'데이터 엔지니어'로 성장하기

정리하는 걸 좋아하고, 남이 읽으면 더 좋아함

Data/Airflow

Airflow) custom operator 만들기_s3 Prefix copy

MightyTedKim 2022. 6. 12. 23:48
728x90
반응형

 

 
+ 요즘 docker Image에 다 몰려있는 로직을 task로 쪼개는 작업을 진행하고 있어요.
처음부터 airflow를 공부하고 적용했어야했는데, 과거의 내가 원망스럽네요.

요약

hook/operator 공부한김에 자주 사용할만한 기능을 operator로 하나 만들었어요

from custom.s3.operators.S3CopyFolderObjectOperator import S3CopyFolderObjectOperator

with DAG(
    'hook-S3CopyFolderObjectOperator', 
) as dag:
    S3CopyFolderObjectOperator(
        task_id="check_ready",
        aws_conn_id="CephObjectConn",
        source_bucket_name="hgkim",
        source_bucket_key_prefix="tmp/", #custom
        dest_bucket_name="hgkim",
        dest_bucket_key_prefix="tmp_test/" #custom
    )

구성은 1.상황 -> 2.고민 -> 3. 적용 로 잡았습니다

내용

1. 상황

airflow 이용해서 s3 파일들(폴더)를 이동시키는 효율적인 방법은 뭐가 있을까?

PythonOperator에서 s3fs를 사용하고 있는데, 여기에는 copy 가 없더라고요.
move 만 있어요. 그럴리가 없다고 생각했는데, 진짜 없음
그래서 boto3에는 copy가 있길래 사용하려다가, airflow를 airflow답게 사용해보기로 햇어요


k8s에서 돌릴건데, 환경은 아래와 같아요

  • 환경
    • k8s, ceph object storage, airflow 

적용하려는 파이프라인의 데이터는 아래 같다고 가정했어요

  • 데이터
    • 용량: 3,40G/day 이상
    • 파일: 3~4000/day
    • 성격 : 365일 들어옴

pythonOperator로 하던 걸 변경하는 거라서, 아래 조건을 최대한 만족하려 해요

  • 조건
    • 영향도 줄이기 -> airflow image에 pip install 하고 싶지 않음 (불안함)
    • 관리 편의성 -> 적업용 image를 따로 관리하고 싶지 않음 (귀찮음)
    • 효율성 -> for문 돌리는 것도 괜찮은데, connection 은 하나로 진행하고 싶음 (불안함)

2. 고민

여기서 부터는 폭풍 자문자답

AS-IS

기존에는 pythonOperator로 모든 로직을 풀고 있어서, 관리가 안되는 문제가 발생 ㅜ

 

Q. pythonOperator는 왜 사용하면 안될까?

  • 사용은 가능함
  • 하지만 자주 사용할 작업이라 만드는게 좋을 것 같음
  • hook을 사용하면, connection을 이용해서 보안 정보들을 관리 가능

TO-BE

여러 파일을 복사하는 operator는 따로 없음

  • 단일 파일만 있음

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3.html

 

Q. Operator로 만들어서 사용할 수 는 없을까?
A. class 만들어서 실행

  • boto의 s3객체를 뽑을 수 있어서 그대로 사용!
    • dest_bucket = s3_hook.get_bucket(self.dest_bucket_name)
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

class S3CopyFolderObjectOperator(BaseOperator):
    template_fields: Sequence[str] = (
        'source_bucket_key_prefix',
        'dest_bucket_key_prefix',
        'source_bucket_name',
        'dest_bucket_name',
    )

    def __init__(
        self,
        *,
        source_bucket_key_prefix: str,
        dest_bucket_key_prefix: str,
        source_bucket_name: Optional[str] = None,
        dest_bucket_name: Optional[str] = None,
        source_version_id: Optional[str] = None,
        aws_conn_id: str = 'aws_default',
        verify: Optional[Union[str, bool]] = None,
        acl_policy: Optional[str] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)

        self.source_bucket_key_prefix = source_bucket_key_prefix
        self.dest_bucket_key_prefix = dest_bucket_key_prefix
        self.source_bucket_name = source_bucket_name
        self.dest_bucket_name = dest_bucket_name
        self.source_version_id = source_version_id
        self.aws_conn_id = aws_conn_id
        self.verify = verify
        self.acl_policy = acl_policy

    def execute(self, context: 'Context'):
        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
        dest_bucket = s3_hook.get_bucket(self.dest_bucket_name)
        for old_key in s3_hook.list_keys(bucket_name=self.source_bucket_name, prefix=self.source_bucket_key_prefix):
            old_source = {'Bucket': self.source_bucket_name, 'Key': old_key}
            new_key = old_key.replace(self.source_bucket_key_prefix, self.dest_bucket_key_prefix, 1)
            new_obj = dest_bucket.Object(new_key)
            new_obj.copy(old_source)
            print(f"copy {old_source} -> {new_key}")

실행하는 DAG는 아래와 같아요

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from custom.s3.operators.S3CopyFolderObjectOperator import S3CopyFolderObjectOperator

import time
import logging

args = { "owner": "hgkim", "description": "An example"}

logging.basicConfig(format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
    datefmt='%Y-%m-%d:%H:%M:%S',
    level=logging.DEBUG)

logger = logging.getLogger("hook-s3fs-sample")

with DAG(
    'hook-S3CopyFolderObjectOperator', 
    default_args=args,
    schedule_interval=None,
    start_date=datetime(2021, 6, 29),
    catchup=False,
    tags=['hook','hgkim']
) as dag:
    start_task = DummyOperator(task_id="start_task")
    end_task = DummyOperator(task_id="end_task")
    
    copy_task = S3CopyFolderObjectOperator(
        task_id="check_ready",
        aws_conn_id="CephObjectConn",
        source_bucket_name="hgkim",
        source_bucket_key_prefix="tmp/", #custom
        dest_bucket_name="hgkim",
        dest_bucket_key_prefix="tmp_test/" #custom
    )

    start_task >> copy_task >> end_task

- 로그

AIRFLOW_CTX_DAG_OWNER=hgkim
AIRFLOW_CTX_DAG_ID=hook-S3CopyFolderObjectOperator
AIRFLOW_CTX_TASK_ID=check_ready
AIRFLOW_CTX_EXECUTION_DATE=2022-06-17T03:39:50.828392+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-06-17T03:39:50.828392+00:00
[2022-06-17 03:39:58,391] {base_aws.py:400} INFO - Airflow Connection: aws_conn_id=CephObjectConn
[2022-06-17 03:39:58,405] {base.py:79} INFO - Using connection to: id: CephObjectConn. Host: http://***:3***1, Port: None, Schema: , Login: , Password: None, extra: {'aws_access_key_id': '***', 'aws_secret_access_key': '***'}
[2022-06-17 03:39:58,406] {base_aws.py:180} INFO - Credentials retrieved from extra_config
[2022-06-17 03:39:58,407] {base_aws.py:92} INFO - Creating session with aws_access_key_id=****-key region_name=None
[2022-06-17 03:39:58,422] {base_aws.py:167} INFO - role_arn is None
[2022-06-17 03:39:58,456] {base_aws.py:400} INFO - Airflow Connection: aws_conn_id=CephObjectConn
[2022-06-17 03:39:58,466] {base.py:79} INFO - Using connection to: id: CephObjectConn. Host: http://***:3***1, Port: None, Schema: , Login: , Password: None, extra: {'aws_access_key_id': '***', 'aws_secret_access_key': '***'}
[2022-06-17 03:39:58,466] {base_aws.py:180} INFO - Credentials retrieved from extra_config
[2022-06-17 03:39:58,466] {base_aws.py:92} INFO - Creating session with aws_access_key_id=****-key region_name=None
[2022-06-17 03:39:58,473] {base_aws.py:167} INFO - role_arn is None
[2022-06-17 03:39:58,544] {logging_mixin.py:109} INFO - copy {'Bucket': 'hgkim', 'Key': 'tmp/'} -> tmp_test/
[2022-06-17 03:39:58,608] {logging_mixin.py:109} INFO - copy {'Bucket': 'hgkim', 'Key': 'tmp/dt=20220225/'} -> tmp_test/dt=20220225/
[2022-06-17 03:39:58,631] {logging_mixin.py:109} INFO - copy {'Bucket': 'hgkim', 'Key': 'tmp/dt=20220225/test/'} -> tmp_test/dt=20220225/test/
[2022-06-17 03:39:58,851] {logging_mixin.py:109} INFO - copy {'Bucket': 'hgkim', 'Key': 'tmp/dt=20220225/test/tmp1.parquet'} -> tmp_test/dt=20220225/test/tmp1.parquet
[2022-06-17 03:39:58,906] {logging_mixin.py:109} INFO - copy {'Bucket': 'hgkim', 'Key': 'tmp/sample2.parquet'} -> tmp_test/sample2.parquet
[2022-06-17 03:39:58,923] {taskinstance.py:1219} INFO - Marking task as SUCCESS. dag_id=hook-S3CopyFolderObjectOperator, task_id=check_ready, execution_date=20220617T033950, start_date=20220617T033958, end_date=20220617T033958
[2022-06-17 03:39:58,972] {local_task_job.py:151} INFO - Task exited with return code 0

+ 기타

Awscli가 boto에 비해서 30배나 빠르다고 해서 잠깐 생각해봄 
'awscli does the job 30 times faster for me than boto coping and deleting each key.'

Q. 만약 Awscli 를 사용한다면 어떻게 진행해야할까?
A. 4가지 방법이 떠오름 생각만해봄

1. pythonOperator
- airflow 기본 이미지에 pip install

import os
if os.environ.get('LC_CTYPE', '') == 'UTF-8':
    os.environ['LC_CTYPE'] = 'en_US.UTF-8'

from awscli.clidriver import create_clidriver
driver = create_clidriver()
driver.main('s3 mv source_bucket target_bucket --recursive'.split())

2. K8sPodOpearotor
- 파라미터 전달만 받으면 실행할 수 있는 이미지 만들기

docker pull amazon/aws-cli

3. SSHOperator
- remote server에 command 날려서 실행하기

aws s3 cp s3://source-awsexamplebucket/ s3://destination-awsexamplebucket/ \
--recursive \
--exclude "*"\ 
--include "0*"\
--include "1*"

https://aws.amazon.com/ko/premiumsupport/knowledge-center/s3-large-transfer-between-buckets/

참고

Apache Airflow: operator to copy s3 to s3
- https://stackoverflow.com/questions/55135735/apache-airflow-operator-to-copy-s3-to-s3

How to read multiple files in a directory, all of which are csv.gzip with Airflow S3 Hook or boto3?
- https://stackoverflow.com/questions/63642502/how-to-read-multiple-files-in-a-directory-all-of-which-are-csv-gzip-with-airflo
Python boto3 복사, 업로드, 동기화 유용한 기능
- https://blog.actin.kr/devlog/2020/01/21/boto3-copy-folder/
S3 storage를 위한 boto3 사용법 정리
https://gaussian37.github.io/python-etc-s3_storage_for_boto3/htt

Airflow provider amazon
- https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/operators/s3.py
- https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/example_dags/example_s3.py

728x90
반응형