Amazon Athena의 SQL 기초를 정리한다. 이번 내용에는 Amazon Athena의 기본 사용 방법을 익히고, 활용하여 S3에 업로드 된 CloudTrail 로그들을 이용해서 MFA 없이 AWS Web Console에 접근하는 사용자를 찾아보도록 하자.
Amazon Athena의 기본적인 구성은 아래와 같다.
S3 Location 확인
분석하기 위한 로그 파일들이 S3 버킷에 업로드 되었다는 가정하에 진행한다. 따라서 사전에 각종 로그 파일들을 S3에 업로드 해놓도록 하자.
DDL Create Query(테이블 생성)
기본적으로 Athena 쿼리 엔진의 DDL은 Hive DDL을 기반으로 한다. Create Syntax는 아래와 같다.
CREATE Syntax
CREATE [EXTERNAL] TABLE [IF NOT EXISTS]
[db_name.]table_name [(col_name data_type [COMMENT col_comment] [, ...] )]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
[ROW FORMAT row_format]
[STORED AS file_format] [WITH SERDEPROPERTIES (...)] ]
[LOCATION 's3_loc']
[TBLPROPERTIES ( ['has_encrypted_data'='true | false',] ['classification'='aws_glue_classification',] property_name=property_value [, ...] ) ]
참고
Create Table - CloudTrail 로그 테이블 생성
CloudTrail 로그를 S3에 저장되면 아래와 같은 형식의 path에 로그파일이 위치하게 된다.
아래 형태의 Path로 생성됨으로 파티션을 생성할 수 있다.
s3://"BucketName"/AWSLogs/"AccountNumber"/CloudTrail/"Region"/"year"/"month"/"day"/"LogFileName.gz"
Athena Document에서 CloudTrail 로그 쿼리 관련 정보를 참고해서 partition을 추가하여 테이블을 생성하겠다.
CREATE EXTERNAL TABLE cloudtrail.cloudtrail_log(
eventversion string,
useridentity struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>>,
eventtime string,
eventsource string,
eventname string,
awsregion string,
sourceipaddress string,
useragent string,
errorcode string,
errormessage string,
requestparameters string,
responseelements string,
additionaleventdata string,
requestid string,
eventid string,
resources array<struct<arn:string,accountid:string,type:string>>,
eventtype string,
apiversion string,
readonly string,
recipientaccountid string,
serviceeventdetails string,
sharedeventid string,
vpcendpointid string)
PARTITIONED BY (
accountnumber string,
region string,
year string,
month string,
day string)
ROW FORMAT SERDE
'com.amazon.emr.hive.serde.CloudTrailSerde'
STORED AS INPUTFORMAT
'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://<bucketname>/AWSLogs/'
테이블 이름과 Location을 정확하게 입력해서 Athena 콘솔에서 Run query를 실행한다.
쿼리가 성공적으로 수행되면 데이터베이스에 쿼리 실행 결과로 테이블이 생성된 것을 확인할 수 있다.
테이블에 파티션을 선언했음으로 파티션을 추가해야 한다. 파티션 컬럼 값과 S3 Path와 일치되도록 ADD 파티션을 해준다. 파티션을 미리 만들어 놓거나, 주기적으로 생성되도록 별도의 스케쥴링 구성을 권장한다.
ALTER TABLE cloudtrail_log ADD
PARTITION (accountnumber='00000000000' ,region='ap-northeast-2',year='2019',month='05',day='01') location 's3://<bucketname>/AWSLogs/00000000000/CloudTrail/ap-northeast-2/2019/05/01'
PARTITION (accountnumber='00000000000' ,region='ap-northeast-2',year='2019',month='05',day='02') location 's3://<bucketname>/AWSLogs/00000000000/CloudTrail/ap-northeast-2/2019/05/02'
추가적으로 Glue Crawler를 통해 데이터 카탈로그에 테이블 스키마를 자동 등록하고, Athena의 데이터 카탈로그에 등록된 테이블 스키마를 사용하는 방법도 가능하다. 이번에는 다루지 않기 때문에 자세한 내용은 Glue 가이드 문서를 참고하자.
DML Select Query(테이블 조회)
앞서 말했듯이 Athena의 쿼리 엔진은 Presto를 기반으로 하기 때문에 ANSI SQL 표준 구문을 지원한다.
[ WITH with_query [, ...] ]
SELECT [ ALL | DISTINCT ] select_expression [, ...]
[ FROM from_item [, ...] ]
[ WHERE condition ]
[ GROUP BY [ ALL | DISTINCT ] grouping_element [, ...] ]
[ HAVING condition ]
[ UNION [ ALL | DISTINCT ] union_query ]
[ ORDER BY expression [ ASC | DESC ] [ NULLS FIRST | NULLS LAST] [, ...] ]
[ LIMIT [ count | ALL ] ]
참고
그럼 Select Query 문을 통해서 생성한 Cloudtrail 로그 테이블을 조회해본다. 특정 기간 동안 MFA를 사용하지 않고 AWS Console에 로그인 한 사용자 계정과 로그인 회수를 알아보겠다.
1. 테이블의 모든 칼럼 및 모든 데이터 조회
SELECT *
FROM cloudtrail. cloudtrail_log
Limit 10;
약 10개의 Cloudtrail 로그 테이블 칼럼들을 확인할 수 있다.
2. 테이블의 User(useridentity.username, responseelements) 칼럼 조회
SELECT useridentity.username, responseelements
FROM cloudtrail. cloudtrail_log
Limit 10;
3. 테이블의 User(useridentity.username, responseelements)의 evetname이 "ConsoleLogin"인 칼럼 조회
SELECT useridentity.username, responseelements
FROM cloudtrail. cloudtrail_log
WHERE eventname = 'ConsoleLogin'
Limit 10;
그럼 어떤 유저가 언제 AWSConsole에 로그인 했는지 확인할 수 있다.
4. 테이블의 User(useridentity.username, responseelements)의 evetname이 "ConsoleLogin" / Date가 "2019-05-02"부터 "2019-05-09"인 칼럼 조회
SELECT useridentity.username, responseelements
FROM cloudtrail. cloudtrail_log
WHERE eventname = 'ConsoleLogin'
AND date_parse(concat(year,month,day), '%Y%m%d') >= from_iso8601_date('2019-05-02')
AND date_parse(concat(year,month,day), '%Y%m%d') <= from_iso8601_date('2019-05-09')
Limit 10;
특정 기간까지 추가했다. 계속 조건을 추가해서 진행해보자.
5. 테이블의 User(useridentity.username, responseelements)의 evetname이 "ConsoleLogin" / Date가 "2019-05-02"부터 "2019-05-09" / additionaleventdata의 MFAUsed 값이 "No" / responseelements에서 ConsoleLogin 값이 "Success"인 칼럼 조회
SELECT useridentity.username, responseelements
FROM cloudtrail. cloudtrail_log
WHERE eventname = 'ConsoleLogin'
AND date_parse(concat(year,month,day), '%Y%m%d') >= from_iso8601_date('2019-05-02')
AND date_parse(concat(year,month,day), '%Y%m%d') <= from_iso8601_date('2019-05-09')
AND json_extract_scalar(additionaleventdata, '$.MFAUsed') = 'No'
AND ((CAST(json_extract(responseelements, '$.ConsoleLogin') as VARCHAR) = 'Success'))
Limit 10;
6. 테이블의 User(useridentity.username, responseelements)의 evetname이 "ConsoleLogin" / Date가 "2019-05-02"부터 "2019-05-09" / additionaleventdata의 MFAUsed 값이 "No" / responseelements에서 ConsoleLogin 값이 "Success"인 칼럼들을 Group by로 조회
SELECT useridentity.username, responseelements
FROM cloudtrail. cloudtrail_log
WHERE eventname = 'ConsoleLogin'
AND date_parse(concat(year,month,day), '%Y%m%d') >= from_iso8601_date('2019-05-02')
AND date_parse(concat(year,month,day), '%Y%m%d') <= from_iso8601_date('2019-05-09')
AND json_extract_scalar(additionaleventdata, '$.MFAUsed') = 'No'
AND ((CAST(json_extract(responseelements, '$.ConsoleLogin') as VARCHAR) = 'Success'))
GROUP BY useridentity.username, responseelements
Limit 10;
추가로 Count를 주어서 얼마나 MFA 없이 로그인 했는지 회수를 알 수 있다.
SELECT useridentity.username, responseelements, COUNT(*) AS "count"
FROM cloudtrail. cloudtrail_log
WHERE eventname = 'ConsoleLogin'
AND date_parse(concat(year,month,day), '%Y%m%d') >= from_iso8601_date('2019-05-02')
AND date_parse(concat(year,month,day), '%Y%m%d') <= from_iso8601_date('2019-05-09')
AND json_extract_scalar(additionaleventdata, '$.MFAUsed') = 'No'
AND ((CAST(json_extract(responseelements, '$.ConsoleLogin') as VARCHAR) = 'Success'))
GROUP BY useridentity.username, responseelements;
Athena는 당연히 VIEW도 지원한다.
AWS Athena Document에는 각종 AWS 로그 제어에 대한 Query를 제공한다. 현재 제공되는 로그들은 CloudTrail, CloudFront, CLB, NLB, ALB, VPC Flow, WAF 로그가 있다. 자세한 내용은 아래 링크를 참고하자.
CloudFront 컨텐츠 통계
SELECT
domain, uri, download_bytes_GB
FROM(
SELECT
domain, uri, download_bytes, download_bytes_GB, rank() OVER (PARTITION BY domain ORDER BY download_bytes DESC) AS rank
FROM(
SELECT
domain, SPLIT_PART(uri, '/', 2) as uri, SUM(bytes) as download_bytes, ROUND((SUM(bytes)*1.0/1024/1024/1024), 2) AS download_bytes_GB
FROM
xx_temp.xxx_poc_temp
WHERE
status >= 200
AND
status < 300
AND
domain LIKE 'blog.leedoing.com'
AND
pdate BETWEEN date('2021-03-10') AND date('2021-03-10')
GROUP BY
1, 2
)
)
WHERE
rank <= 1000
ORDER BY
1 ASC, 3 DESC
SELECT
domain, uri, download_bytes_GB
FROM(
SELECT
domain, uri, download_bytes, download_bytes_GB, rank() OVER (PARTITION BY domain ORDER BY download_bytes DESC) AS rank
FROM(
SELECT
domain, SPLIT_PART(uri, '/', 2) as uri, SUM(bytes) as download_bytes, ROUND((SUM(bytes)*1.0/1024/1024/1024), 2) AS download_bytes_GB
FROM
xx_temp.xxx_poc_temp
WHERE
status >= 200
AND
status < 300
AND
domain LIKE 'blog.leedoing.com'
AND
FROM_iso8601_timestamp(concat(concat(date_format(date, '%Y-%m-%d'), 'T'), time)) >= FROM_iso8601_timestamp('2021-03-07T11:00:00')
AND
FROM_iso8601_timestamp(concat(concat(date_format(date, '%Y-%m-%d'), 'T'), time)) <= FROM_iso8601_timestamp('2021-03-08T03:00:00')
GROUP BY
1, 2
)
)
ORDER BY
1 ASC, 3 DESC