'데이터 엔지니어'로 성장하기

정리하는 걸 좋아하고, 남이 읽으면 더 좋아함

Data/Airflow

Airflow) S3CopyObjectOperator 이용해서 copy 하기

MightyTedKim 2022. 6. 2. 09:41
728x90
반응형

s3 부분에서 에러가 간혹 발생되서, 핵심 로직과 분리하기로 결정
근데 이거는 파일 하나만 가능함. 폴더는 어떡하지

요약

  1. 상황
  2. 결과
  3. 참고

설명

상황

as-is

  • python 코드로 모든 로직을 처리
  • 단점
    • 코드가 복잡해지고 관리가 되지 않음
    • image안에 코드가 들어가면 어디서 에러가 나는지 알 수 없음

갑자기 모든 것을 바꿀 수 없겠지만, 하나씩 바꿔가기로 함

 

to-be

  • s3 copy,delete를 aws.operators를 이용하기로함
  • 이유
    • 핵심 로직과 사이드카 같은 로직 분리(복사/이동/삭제)
    • s3 operator는 많이 사용하기 때문에 버그가 적을거라고 생각

결과

 
from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.operators.s3_copy_object import S3CopyObjectOperator
from airflow.providers.amazon.aws.operators.s3_delete_objects import S3DeleteObjectsOperator

with DAG(
    dag_id="copy_test",
    start_date=datetime(2021629),
    schedule_interval="@daily",
    catchup=False,
    #default_args={"retries": 1},
    #template_searchpath="include/sql",
    #default_view="graph",
as dag:
    begin = DummyOperator(task_id="begin")
    end = DummyOperator(task_id="end")

    copy1 = S3CopyObjectOperator(
        task_id="copy1",
        source_bucket_name="hgkim",
        source_bucket_key="tmp/sample.parquet",
        dest_bucket_name="hgkim",
        dest_bucket_key="tmp/sample_for_delete.parquet",
        aws_conn_id="CephObjectConn",
    )

    copy2 = S3CopyObjectOperator(
        task_id="copy2",
        source_bucket_name="hgkim",
        source_bucket_key="tmp/sample.parquet",
        dest_bucket_name="hgkim",
        dest_bucket_key="tmp/sample_for_copy.parquet",
        aws_conn_id="CephObjectConn",
    )

    # Delete the landed Accounts data file after persisting to the data lake
    delete_copy1 = S3DeleteObjectsOperator(
        task_id="delete_copy1",
        bucket="hgkim",
        keys="tmp/sample_for_delete.parquet",
        aws_conn_id="CephObjectConn",
    )

    # Set task dependencies.
    begin >> copy1 >> copy2 >> delete_copy1 >> end

 

참고

A Complete Guide to Airflow S3 Connection Simplified

- https://hevodata.com/learn/airflow-s3-connection/#s3

 

A Complete Guide to Airflow S3 Connection Simplified

This article provides a comprehensive guide on setting up Airflow S3 Connection. It also provides information on Airflow and S3.

hevodata.com

Amazon Web Services Connection

- https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html

 

Amazon Web Services Connection — apache-airflow-providers-amazon Documentation

 

airflow.apache.org

Providers/Amazon/Modules/S3CopyObjectOperator

- https://registry.astronomer.io/providers/amazon/modules/s3copyobjectoperator/#example-dags

 

S3CopyObjectOperator

Creates a copy of an object that is already stored in S3.

registry.astronomer.io

Amazon S3 Operators

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3.html

 

Amazon S3 Operators — apache-airflow-providers-amazon Documentation

 

airflow.apache.org

 

728x90
반응형