Data/Airflow
Airflow) S3CopyObjectOperator 이용해서 copy 하기
MightyTedKim
2022. 6. 2. 09:41
728x90
반응형
s3 부분에서 에러가 간혹 발생되서, 핵심 로직과 분리하기로 결정
근데 이거는 파일 하나만 가능함. 폴더는 어떡하지
요약
- 상황
- 결과
- 참고
설명
상황
as-is
- python 코드로 모든 로직을 처리
- 단점
- 코드가 복잡해지고 관리가 되지 않음
- image안에 코드가 들어가면 어디서 에러가 나는지 알 수 없음
갑자기 모든 것을 바꿀 수 없겠지만, 하나씩 바꿔가기로 함
to-be
- s3 copy,delete를 aws.operators를 이용하기로함
- 이유
- 핵심 로직과 사이드카 같은 로직 분리(복사/이동/삭제)
- s3 operator는 많이 사용하기 때문에 버그가 적을거라고 생각
결과
from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.operators.s3_copy_object import S3CopyObjectOperator
from airflow.providers.amazon.aws.operators.s3_delete_objects import S3DeleteObjectsOperator
with DAG(
dag_id="copy_test",
start_date=datetime(2021, 6, 29),
schedule_interval="@daily",
catchup=False,
#default_args={"retries": 1},
#template_searchpath="include/sql",
#default_view="graph",
) as dag:
begin = DummyOperator(task_id="begin")
end = DummyOperator(task_id="end")
copy1 = S3CopyObjectOperator(
task_id="copy1",
source_bucket_name="hgkim",
source_bucket_key="tmp/sample.parquet",
dest_bucket_name="hgkim",
dest_bucket_key="tmp/sample_for_delete.parquet",
aws_conn_id="CephObjectConn",
)
copy2 = S3CopyObjectOperator(
task_id="copy2",
source_bucket_name="hgkim",
source_bucket_key="tmp/sample.parquet",
dest_bucket_name="hgkim",
dest_bucket_key="tmp/sample_for_copy.parquet",
aws_conn_id="CephObjectConn",
)
# Delete the landed Accounts data file after persisting to the data lake
delete_copy1 = S3DeleteObjectsOperator(
task_id="delete_copy1",
bucket="hgkim",
keys="tmp/sample_for_delete.parquet",
aws_conn_id="CephObjectConn",
)
# Set task dependencies.
begin >> copy1 >> copy2 >> delete_copy1 >> end
|
참고
A Complete Guide to Airflow S3 Connection Simplified
- https://hevodata.com/learn/airflow-s3-connection/#s3
Amazon Web Services Connection
- https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html
Providers/Amazon/Modules/S3CopyObjectOperator
- https://registry.astronomer.io/providers/amazon/modules/s3copyobjectoperator/#example-dags
Amazon S3 Operators
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3.html
728x90
반응형