Airflow) custom operator 만들기_s3 Prefix copy
+ 요즘 docker Image에 다 몰려있는 로직을 task로 쪼개는 작업을 진행하고 있어요.
처음부터 airflow를 공부하고 적용했어야했는데, 과거의 내가 원망스럽네요.
요약
hook/operator 공부한김에 자주 사용할만한 기능을 operator로 하나 만들었어요
from custom.s3.operators.S3CopyFolderObjectOperator import S3CopyFolderObjectOperator
with DAG(
'hook-S3CopyFolderObjectOperator',
) as dag:
S3CopyFolderObjectOperator(
task_id="check_ready",
aws_conn_id="CephObjectConn",
source_bucket_name="hgkim",
source_bucket_key_prefix="tmp/", #custom
dest_bucket_name="hgkim",
dest_bucket_key_prefix="tmp_test/" #custom
)
구성은 1.상황 -> 2.고민 -> 3. 적용 로 잡았습니다
내용
1. 상황
airflow 이용해서 s3 파일들(폴더)를 이동시키는 효율적인 방법은 뭐가 있을까?
PythonOperator에서 s3fs를 사용하고 있는데, 여기에는 copy 가 없더라고요.
move 만 있어요. 그럴리가 없다고 생각했는데, 진짜 없음
그래서 boto3에는 copy가 있길래 사용하려다가, airflow를 airflow답게 사용해보기로 햇어요
k8s에서 돌릴건데, 환경은 아래와 같아요
- 환경
- k8s, ceph object storage, airflow
적용하려는 파이프라인의 데이터는 아래 같다고 가정했어요
- 데이터
- 용량: 3,40G/day 이상
- 파일: 3~4000/day
- 성격 : 365일 들어옴
pythonOperator로 하던 걸 변경하는 거라서, 아래 조건을 최대한 만족하려 해요
- 조건
- 영향도 줄이기 -> airflow image에 pip install 하고 싶지 않음 (불안함)
- 관리 편의성 -> 적업용 image를 따로 관리하고 싶지 않음 (귀찮음)
- 효율성 -> for문 돌리는 것도 괜찮은데, connection 은 하나로 진행하고 싶음 (불안함)
2. 고민
여기서 부터는 폭풍 자문자답
AS-IS
기존에는 pythonOperator로 모든 로직을 풀고 있어서, 관리가 안되는 문제가 발생 ㅜ
Q. pythonOperator는 왜 사용하면 안될까?
- 사용은 가능함
- 하지만 자주 사용할 작업이라 만드는게 좋을 것 같음
- hook을 사용하면, connection을 이용해서 보안 정보들을 관리 가능
TO-BE
여러 파일을 복사하는 operator는 따로 없음
- 단일 파일만 있음
Q. Operator로 만들어서 사용할 수 는 없을까?
A. class 만들어서 실행
- boto의 s3객체를 뽑을 수 있어서 그대로 사용!
- dest_bucket = s3_hook.get_bucket(self.dest_bucket_name)
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
class S3CopyFolderObjectOperator(BaseOperator):
template_fields: Sequence[str] = (
'source_bucket_key_prefix',
'dest_bucket_key_prefix',
'source_bucket_name',
'dest_bucket_name',
)
def __init__(
self,
*,
source_bucket_key_prefix: str,
dest_bucket_key_prefix: str,
source_bucket_name: Optional[str] = None,
dest_bucket_name: Optional[str] = None,
source_version_id: Optional[str] = None,
aws_conn_id: str = 'aws_default',
verify: Optional[Union[str, bool]] = None,
acl_policy: Optional[str] = None,
**kwargs,
):
super().__init__(**kwargs)
self.source_bucket_key_prefix = source_bucket_key_prefix
self.dest_bucket_key_prefix = dest_bucket_key_prefix
self.source_bucket_name = source_bucket_name
self.dest_bucket_name = dest_bucket_name
self.source_version_id = source_version_id
self.aws_conn_id = aws_conn_id
self.verify = verify
self.acl_policy = acl_policy
def execute(self, context: 'Context'):
s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
dest_bucket = s3_hook.get_bucket(self.dest_bucket_name)
for old_key in s3_hook.list_keys(bucket_name=self.source_bucket_name, prefix=self.source_bucket_key_prefix):
old_source = {'Bucket': self.source_bucket_name, 'Key': old_key}
new_key = old_key.replace(self.source_bucket_key_prefix, self.dest_bucket_key_prefix, 1)
new_obj = dest_bucket.Object(new_key)
new_obj.copy(old_source)
print(f"copy {old_source} -> {new_key}")
실행하는 DAG는 아래와 같아요
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from custom.s3.operators.S3CopyFolderObjectOperator import S3CopyFolderObjectOperator
import time
import logging
args = { "owner": "hgkim", "description": "An example"}
logging.basicConfig(format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
datefmt='%Y-%m-%d:%H:%M:%S',
level=logging.DEBUG)
logger = logging.getLogger("hook-s3fs-sample")
with DAG(
'hook-S3CopyFolderObjectOperator',
default_args=args,
schedule_interval=None,
start_date=datetime(2021, 6, 29),
catchup=False,
tags=['hook','hgkim']
) as dag:
start_task = DummyOperator(task_id="start_task")
end_task = DummyOperator(task_id="end_task")
copy_task = S3CopyFolderObjectOperator(
task_id="check_ready",
aws_conn_id="CephObjectConn",
source_bucket_name="hgkim",
source_bucket_key_prefix="tmp/", #custom
dest_bucket_name="hgkim",
dest_bucket_key_prefix="tmp_test/" #custom
)
start_task >> copy_task >> end_task
- 로그
AIRFLOW_CTX_DAG_OWNER=hgkim
AIRFLOW_CTX_DAG_ID=hook-S3CopyFolderObjectOperator
AIRFLOW_CTX_TASK_ID=check_ready
AIRFLOW_CTX_EXECUTION_DATE=2022-06-17T03:39:50.828392+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-06-17T03:39:50.828392+00:00
[2022-06-17 03:39:58,391] {base_aws.py:400} INFO - Airflow Connection: aws_conn_id=CephObjectConn
[2022-06-17 03:39:58,405] {base.py:79} INFO - Using connection to: id: CephObjectConn. Host: http://***:3***1, Port: None, Schema: , Login: , Password: None, extra: {'aws_access_key_id': '***', 'aws_secret_access_key': '***'}
[2022-06-17 03:39:58,406] {base_aws.py:180} INFO - Credentials retrieved from extra_config
[2022-06-17 03:39:58,407] {base_aws.py:92} INFO - Creating session with aws_access_key_id=****-key region_name=None
[2022-06-17 03:39:58,422] {base_aws.py:167} INFO - role_arn is None
[2022-06-17 03:39:58,456] {base_aws.py:400} INFO - Airflow Connection: aws_conn_id=CephObjectConn
[2022-06-17 03:39:58,466] {base.py:79} INFO - Using connection to: id: CephObjectConn. Host: http://***:3***1, Port: None, Schema: , Login: , Password: None, extra: {'aws_access_key_id': '***', 'aws_secret_access_key': '***'}
[2022-06-17 03:39:58,466] {base_aws.py:180} INFO - Credentials retrieved from extra_config
[2022-06-17 03:39:58,466] {base_aws.py:92} INFO - Creating session with aws_access_key_id=****-key region_name=None
[2022-06-17 03:39:58,473] {base_aws.py:167} INFO - role_arn is None
[2022-06-17 03:39:58,544] {logging_mixin.py:109} INFO - copy {'Bucket': 'hgkim', 'Key': 'tmp/'} -> tmp_test/
[2022-06-17 03:39:58,608] {logging_mixin.py:109} INFO - copy {'Bucket': 'hgkim', 'Key': 'tmp/dt=20220225/'} -> tmp_test/dt=20220225/
[2022-06-17 03:39:58,631] {logging_mixin.py:109} INFO - copy {'Bucket': 'hgkim', 'Key': 'tmp/dt=20220225/test/'} -> tmp_test/dt=20220225/test/
[2022-06-17 03:39:58,851] {logging_mixin.py:109} INFO - copy {'Bucket': 'hgkim', 'Key': 'tmp/dt=20220225/test/tmp1.parquet'} -> tmp_test/dt=20220225/test/tmp1.parquet
[2022-06-17 03:39:58,906] {logging_mixin.py:109} INFO - copy {'Bucket': 'hgkim', 'Key': 'tmp/sample2.parquet'} -> tmp_test/sample2.parquet
[2022-06-17 03:39:58,923] {taskinstance.py:1219} INFO - Marking task as SUCCESS. dag_id=hook-S3CopyFolderObjectOperator, task_id=check_ready, execution_date=20220617T033950, start_date=20220617T033958, end_date=20220617T033958
[2022-06-17 03:39:58,972] {local_task_job.py:151} INFO - Task exited with return code 0
+ 기타
Awscli가 boto에 비해서 30배나 빠르다고 해서 잠깐 생각해봄
'awscli does the job 30 times faster for me than boto coping and deleting each key.'
Q. 만약 Awscli 를 사용한다면 어떻게 진행해야할까?
A. 4가지 방법이 떠오름 생각만해봄
1. pythonOperator
- airflow 기본 이미지에 pip install
import os
if os.environ.get('LC_CTYPE', '') == 'UTF-8':
os.environ['LC_CTYPE'] = 'en_US.UTF-8'
from awscli.clidriver import create_clidriver
driver = create_clidriver()
driver.main('s3 mv source_bucket target_bucket --recursive'.split())
2. K8sPodOpearotor
- 파라미터 전달만 받으면 실행할 수 있는 이미지 만들기
docker pull amazon/aws-cli |
3. SSHOperator
- remote server에 command 날려서 실행하기
aws s3 cp s3://source-awsexamplebucket/ s3://destination-awsexamplebucket/ \
--recursive \
--exclude "*"\
--include "0*"\
--include "1*"
https://aws.amazon.com/ko/premiumsupport/knowledge-center/s3-large-transfer-between-buckets/
참고
Apache Airflow: operator to copy s3 to s3
- https://stackoverflow.com/questions/55135735/apache-airflow-operator-to-copy-s3-to-s3
How to read multiple files in a directory, all of which are csv.gzip with Airflow S3 Hook or boto3?
- https://stackoverflow.com/questions/63642502/how-to-read-multiple-files-in-a-directory-all-of-which-are-csv-gzip-with-airflo
Python boto3 복사, 업로드, 동기화 유용한 기능
- https://blog.actin.kr/devlog/2020/01/21/boto3-copy-folder/
S3 storage를 위한 boto3 사용법 정리
https://gaussian37.github.io/python-etc-s3_storage_for_boto3/htt
Airflow provider amazon
- https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/operators/s3.py
- https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/example_dags/example_s3.py