본문 바로가기

AWS

[AWS] S3, Athena, Lambda - 최근 액세스 날짜 기준 이후의 객체 삭제

어느 한 버킷에 오래된 객체들이 있지만 이 객체들 중에서 서비스 운영에서 필요한 객체들이 있습니다.
하지만 쓰지 않는 객체들이 많아 해당 객체들을 삭제하여 비용을 절감하고 싶을 때 S3의 수명관리로는 해당 작업이 불가합니다.
S3의 수명관리로 삭제를 하려고 한다면 최근 액세스 된 객체를 잡아내지 못하고 특정 기간이 지난 객체를 삭제하게 됩니다.
최근 액세스된 객체를 제외한 오래된 객체를 삭제하기 위해서는 S3 서버 액세스 로그를 뽑아내고 해당 로그를 Athena를 통해 분석 후 람다를 통해 삭제를 진행해야 합니다. 진행하면서 참고한 자료들은 맨 밑에 링크 첨부하도록 하겠습니다.
 
 

S3 액세스 로그 활성화

  • S3 Bucket -> 속성 -> 서버 액세스 로깅

  • S3 Bucket -> 관리 -> 인벤토리 구성

  • S3 액세스 로그 확인
    • 액세스 로그

  • 인벤토리 확인
    • 인벤토리

 
 

Athena를 통해 분석

  • 액세스 로그 분석
CREATE EXTERNAL TABLE `ydy_access_log_test_db.s3_access_logs`(
  `bucketowner` STRING,
  `bucket_name` STRING,
  `requestdatetime` STRING,
  `remoteip` STRING,
  `requester` STRING,
  `requestid` STRING,
  `operation` STRING,
  `key` STRING,
  `request_uri` STRING,
  `httpstatus` STRING,
  `errorcode` STRING,
  `bytessent` BIGINT,
  `objectsize` BIGINT,
  `totaltime` STRING,
  `turnaroundtime` STRING,
  `referrer` STRING,
  `useragent` STRING,
  `versionid` STRING,
  `hostid` STRING,
  `sigv` STRING,
  `ciphersuite` STRING,
  `authtype` STRING,
  `endpoint` STRING,
  `tlsversion` STRING)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  'input.regex'='([^ ]*) ([^ ]*) \\[(.*?)\\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\"|-) (-|[0-9]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\"|-) ([^ ]*)(?: ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*))?.*$')
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
 's3://ydy-s3-access-log-test-02/Access-Logs/'
 TBLPROPERTIES (
  'storage.location.template'='s3://ydy-s3-access-log-test-02/Access-Logs/logs/${timestamp}')

  • 인벤토리 분석
CREATE EXTERNAL TABLE ydy_access_log_test_db.s3_inventory(
         bucket string,
         key string,
         version_id string,
         is_latest boolean,
         is_delete_marker boolean,
         size string,
         last_modified_date string,
         e_tag string,
         storage_class string,
         is_multipart_uploaded boolean,
         replication_status string,
         encryption_status string,
         object_lock_retain_until_date string,
         object_lock_mode string,
         object_lock_legal_hold_status string,
         intelligent_tiering_access_tier string,
         bucket_key_status string,
         checksum_algorithm string,
         object_access_control_list string,
         object_owner string
) PARTITIONED BY (
        dt string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
  STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
  LOCATION 's3://ydy-s3-access-log-test-02/inventory/ydy-s3-access-log-test-01/ydytest/hive/'
  TBLPROPERTIES (
    "projection.enabled" = "true",
    "projection.dt.type" = "date",
    "projection.dt.format" = "yyyy-MM-dd-HH-mm",
    "projection.dt.range" = "2022-01-01-00-00,NOW",
    "projection.dt.interval" = "1",
    "projection.dt.interval.unit" = "HOURS"
  );

  • 분석 값을 통한 결과 값 S3에 저장
    • UNLOAD 활용
UNLOAD(
WITH latest_access AS (
    SELECT
        key,
        MAX(requestdatetime) AS last_access_time
    FROM
        ydy_access_log_test_db.s3_access_logs
    GROUP BY
        key
),
inventory_data AS (
    SELECT
        key,
        last_modified_date,
        size,
        storage_class,
        e_tag,
        is_latest,
        version_id,
        is_multipart_uploaded,
        replication_status,
        encryption_status,
        object_lock_retain_until_date,
        object_lock_mode,
        object_lock_legal_hold_status,
        intelligent_tiering_access_tier,
        bucket_key_status,
        checksum_algorithm,
        object_access_control_list,
        object_owner
    FROM
        ydy_access_log_test_db.s3_inventory
)
SELECT
    i.key,
    i.last_modified_date,
    i.size,
    i.storage_class,
    i.e_tag,
    i.is_latest,
    i.version_id,
    i.is_multipart_uploaded,
    i.replication_status,
    i.encryption_status,
    i.object_lock_retain_until_date,
    i.object_lock_mode,
    i.object_lock_legal_hold_status,
    i.intelligent_tiering_access_tier,
    i.bucket_key_status,
    i.checksum_algorithm,
    i.object_access_control_list,
    i.object_owner,
    l.last_access_time
FROM
    inventory_data i
LEFT JOIN
    latest_access l ON i.key = l.key
ORDER BY
    l.last_access_time DESC
LIMIT 100)
TO 's3://ydy-s3-access-log-test-02/Access-Logs/logs/' 
WITH (format = 'JSON')

 
 

Lambda를 통해 최근 액세스 기준 날짜보다 하루 지난 객체 삭제

  • 코드 작성 및 실행
import json
import boto3
import datetime
import gzip
from io import BytesIO

def lambda_handler(event, context):
    # S3 클라이언트 생성
    s3_client = boto3.client('s3')
    source_bucket_name = 'ydy-s3-access-log-test-02'
    target_bucket_name = 'ydy-s3-access-log-test-01'
    prefix = 'Access-Logs/logs/'

    # 현재 시간과 하루 전 시간 계산
    current_time = datetime.datetime.now(datetime.timezone.utc)
    cutoff_time = current_time - datetime.timedelta(days=1)
    
    # 목록 가져오기
    objects_to_delete = []

    # S3에서 접근 로그 파일 가져오기
    response = s3_client.list_objects_v2(Bucket=source_bucket_name, Prefix=prefix)
    
    if 'Contents' in response:
        for obj in response['Contents']:
            key = obj['Key']
            
            # 접근 로그 파일 읽기
            access_log_object = s3_client.get_object(Bucket=source_bucket_name, Key=key)
            with gzip.GzipFile(fileobj=BytesIO(access_log_object['Body'].read())) as gz:
                access_log_content = gz.read().decode('utf-8')
            
            # 접근 로그 파일의 각 줄을 처리
            for line in access_log_content.strip().split('\n'):
                if line.strip():  # 빈 줄 무시
                    try:
                        log_entry = json.loads(line)
                        key = log_entry['key']
                        last_access_time_str = log_entry.get('last_access_time')
                        
                        if last_access_time_str:
                            # 날짜 형식을 명시적으로 파싱
                            last_access_time = datetime.datetime.strptime(last_access_time_str, '%d/%b/%Y:%H:%M:%S %z')
                            
                            # 하루 전 시간보다 이전에 접근된 객체만 삭제
                            if last_access_time < cutoff_time:
                                objects_to_delete.append({'Key': key})
                    except json.JSONDecodeError as e:
                        print(f"Error decoding JSON for line: {line} | Error: {e}")
                    except KeyError as e:
                        print(f"Missing key in log entry: {line} | Error: {e}")
                    except ValueError as e:
                        print(f"Error parsing date for line: {line} | Error: {e}")

    # 객체 삭제
    if objects_to_delete:
        response = s3_client.delete_objects(
            Bucket=target_bucket_name,
            Delete={
                'Objects': objects_to_delete
            }
        )
        print(f'Deleted objects: {response}')
    else:
        print('No objects to delete.')

    return {
        'statusCode': 200,
        'body': json.dumps('Process completed successfully.')
    }

 
 

참고자료

 

Amazon S3 서버 액세스 로그 형식 - Amazon Simple Storage Service

아무 필드나 -로 설정하여 데이터를 알 수 없거나 사용할 수 없음 또는 해당 필드에 이 요청이 적용되지 않음을 표시할 수 있습니다.

docs.aws.amazon.com

 

Athena를 사용하여 Amazon S3 서버 액세스 로그를 분석하세요.

Amazon Athena에서 Amazon Simple Storage Service(S3) 서버 액세스 로그를 쿼리하고 싶습니다.

repost.aws