728x90
반응형
s3 부분에서 에러가 간혹 발생되서, 핵심 로직과 분리하기로 결정
근데 이거는 파일 하나만 가능함. 폴더는 어떡하지
요약
- 상황
- 결과
- 참고
설명
상황
as-is
- python 코드로 모든 로직을 처리
- 단점
- 코드가 복잡해지고 관리가 되지 않음
- image안에 코드가 들어가면 어디서 에러가 나는지 알 수 없음
갑자기 모든 것을 바꿀 수 없겠지만, 하나씩 바꿔가기로 함
to-be
- s3 copy,delete를 aws.operators를 이용하기로함
- 이유
- 핵심 로직과 사이드카 같은 로직 분리(복사/이동/삭제)
- s3 operator는 많이 사용하기 때문에 버그가 적을거라고 생각
결과
from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.operators.s3_copy_object import S3CopyObjectOperator
from airflow.providers.amazon.aws.operators.s3_delete_objects import S3DeleteObjectsOperator
with DAG(
dag_id="copy_test",
start_date=datetime(2021, 6, 29),
schedule_interval="@daily",
catchup=False,
#default_args={"retries": 1},
#template_searchpath="include/sql",
#default_view="graph",
) as dag:
begin = DummyOperator(task_id="begin")
end = DummyOperator(task_id="end")
copy1 = S3CopyObjectOperator(
task_id="copy1",
source_bucket_name="hgkim",
source_bucket_key="tmp/sample.parquet",
dest_bucket_name="hgkim",
dest_bucket_key="tmp/sample_for_delete.parquet",
aws_conn_id="CephObjectConn",
)
copy2 = S3CopyObjectOperator(
task_id="copy2",
source_bucket_name="hgkim",
source_bucket_key="tmp/sample.parquet",
dest_bucket_name="hgkim",
dest_bucket_key="tmp/sample_for_copy.parquet",
aws_conn_id="CephObjectConn",
)
# Delete the landed Accounts data file after persisting to the data lake
delete_copy1 = S3DeleteObjectsOperator(
task_id="delete_copy1",
bucket="hgkim",
keys="tmp/sample_for_delete.parquet",
aws_conn_id="CephObjectConn",
)
# Set task dependencies.
begin >> copy1 >> copy2 >> delete_copy1 >> end
|
참고
A Complete Guide to Airflow S3 Connection Simplified
- https://hevodata.com/learn/airflow-s3-connection/#s3
Amazon Web Services Connection
- https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html
Providers/Amazon/Modules/S3CopyObjectOperator
- https://registry.astronomer.io/providers/amazon/modules/s3copyobjectoperator/#example-dags
Amazon S3 Operators
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3.html
728x90
반응형
'Data > Airflow' 카테고리의 다른 글
Airflow) custom operator 만들기_s3 Prefix copy (0) | 2022.06.12 |
---|---|
Airflow) pod external sigterm으로 죽는 현상_해결못해서 retries함 (0) | 2022.06.02 |
Slipp) Airflow2.0스터디_워크플로 트러거_4주차(6장) (0) | 2022.05.28 |
Slipp) Airflow2.0 스터디_3주차(4/5장) (0) | 2022.05.15 |
Airflow) context, kwargs 혼용 실습 (0) | 2022.05.09 |