udemy)Airflow Operators Guide_1,2장 :: mightytedkim
kubernetes Pod Operator 공부할 때 참고했던 사람의 강의였기 때문에 신뢰가 감
ETL파이프 라인을 실무에서 세팅하는 방법을 급하게 알아야하기 때문에 좋음
심지어 버전도 2.x임, 회사에서 2.1.4사용하고 있어서 너무 좋음
총 7시간이고 해당 포스트는 1,2 섹션 메모용
Section 1: Introduction 5/5 10min
specific operator에 대해서 배우는 강의
dag dependency와 다양한 operator에대해서 배우게 됨
Section 2: The BaseOperator Exposed 26/26 3hr 28min
Bash Operator에 대해서 알려주며, 근본을 알아야한다고 강조함
Operator의 특징에 대해 알려주는데, 너무 좋았음
Idempotency -> same input, same ouput
Context
def _my_function_(**context): #airflow 2.xx 이후 부터는 provide_context=True 생략 가능
def _my_function_(ds): #current execution date
모든 operator는 BaseOperator를 상속함
BashOperator가 근본이라는 걸 알려줌
task_id, 중복나면 에러나는 것을 알려줌
dag versioning 얼탱 없음... 그냥 dag_id를 바꾸라함
불편할 수 있지만 과거 내역을 볼 필요가 있기 때문에 이렇게 관리하는게 맞다고함
그런데 실제 이렇게하면 귓방망이 맞을거같은데.. 좀 아쉽다.
owners, 처음 알았던 개념 디테일 ㅋㅋ
t1 = PythonOperator(
task_id='t1',
python_callable=hello,
owner='hgkim',
dag=dag,
)
t2 = PythonOperator(
task_id='t2',
python_callable=hello,
owner='tedkim',
dag=dag,
)
|
audit_logs 에서 Owner가 무슨 작업했는지 볼 수 있음
- user 또는 task의 owner 중 어느 것인지 구분할 필요가 있음
- 어떤 task는 task의 owner가 표시됨
- 어떤 task는 airflow 같은 user가 표시됨(ex. cli_task_run)
start_date, schedule_interval, 아주 중요한 개념
dag = DAG(
'my_dag',
start_date=datetime.now(),
schedule_interval='*/1 * * * *',
catchup=False,
tags=['hgkim']
)
|
task가 실패했을 떄, api 가져올 때 유용함, 실패할 수 있으니까
- bash_command="echo 'task_b' && exit 1"
- retries=3
- retry_delay=timedelta(seconds=10), bash_command="sleep 5 && exit 1"
- retry_exponential_backoff=True -> 재시작 시작 시간이 계속 길어짐
- {{ti.try_number}} -> 재식작 횟수를 알 수 있음
실패했을 때 email 보내기, 이거는 내부망에서 smtp 정보 받아서 진행해야겠음 편할듯
- param
- email = [xxxx@naver.com]
- email_on_retry
- email_on_failure
docker exec -it [airflow-scheduler]
- airflow tasks test [dag_id] [task_id] [2021-01-01]
dag간의 의존성에 대해서 설명
선행 task 가 성공해야지 뒤에 작업이 실행됨
depends_on_past -> 선행 task 가 성공해야지 뒤에 작업이 실행됨
wait_for_downstream : 사실 정확히 이해가지 않음
-> depends_on_past가 true 됨, 앞의 job가 성공해야지 작동됨. (에러난 job을 뒤의 scheudle interval로 계속 실행하지 않음)
- 모든 task를 확인하는게 아니라, task에 해당 설정값이 정의된것만 확인 (direct downstream tasks)
ex.) 같은 리소스를 여러 job들에서 사용할 경우
1 | 2 | 3 |
Read - wait_for_downstream=True |
Read - wait_for_downstream=True |
Read - wait_for_downstream=True |
Drop | Drop | Drop |
Create | Create | Create |
api 받아오는 경우 한번에 많은 요청이 가면 오류가 날 수 있기 때문에, 동시에 여러개가 실행되지 않게하는 이 작업은 의미가 있을듯
17. Pool party! 12min
순차적으로 진행되는 작업, 동시에 진행될 수 잇는 작업 수를 보기 위해서 알아야함
TBD
priority_weight
높을 수록 중요도가 높아짐. 기본적으로 1임. 그래서 어떤게 먼저 실행될지 모름
A (1) ->
B (2) -> D (6)
C (3) ->
결과 : C-> B-> A -> D (앞의 dependency가 우선)
동일한 pool에 있을 때만 효과가 있음, 따라서 pool도 신경써야함
Pool A : task a(4), task b(3)
Pool B : task c(2) task d(1)
weight_rule
모든 task는 동일한 weight을 가지고 있는데, 이게 upstream/downstream등에 따라 달라짐
(downstream)
A(4) -> C(3) -> E(2) -> F(1)
B(3) -> D(2) --------------
결과 : depth가 깊을 수록 달라짐, A->B->D->D->E->F
(upstream)
A(1) -> C(2) -> E(3) -> F(6)
B(1) -> D(2) --------------
결과: 총 6, 이걸 나눠가짐, A->B->D->D->E->F
그럼 upstream은 왜 필요한가?
-> 실무에서는 큰 의미 없음.
(absolute)
A(5) -> C(2) -> E(3) -> F(6)
B(6) -> D(2) --------------
결과: 절대값 B->A->D->D->E->F
과제
process_a = BashOperator( owner='marc', task_id="process_a", retries=3, retry_exponential_backoff=True, retry_delay=timedelta(seconds=10), bash_command="echo '{{ ti.try_number }}' && sleep 20", priority_weight=2, # HERE pool="process_tasks" ) |
과제
너무 오래걸리면 warining 받을 수 있음. 2가지가 있음
이걸 더 자주 사용할 것 같음
data pipeline이 죽지 않고 stuck 될 수 있는 상황을 고려
execution_timeout=timedelta(seconds=12)
하지만 execution_timeout가 slas는 다름
SLA는 그냥 warning, timeout은 fail 때림
24. Define a timeout 1min
내용 없음, 강의 만들다 실수한듯
여기는 아무 내용이 없
callback 4가지가 있음
1. on_success_callback=_extract_a_on_success
def _on_success_callback=_extract_a_on_success(context):
print(context)
2. on_failure_callback=_extract_a_on_failure
def _extract_a_on_failure=_extract_a_on_failure(context):
print(context)
exception이 나오는데, 이걸 잘 활용하면 좋음
3. on_retry_callback
4. on_failure_callback
26. Catch a timeout with the failure callback 1min
관련 링크와 내용을 공유해줌
xcom, task간 파라미터 전달할 수 잇음
pool과는 다른 task 레벨의 concurrency
더 좋은 표현법
from airflow.models.baseoperator import cross_downstream, chain
t1 >> t2
t1 >> t3
t1 >> t4
|
cross_downstream(t1, [t2,t3,t4]) |
다른 예시
t1 >> t3 t1 >> t4 t2 >> t3
t2 >> t4
|
chain([t1,t2], [t3,t4]) # list의 길이가 동일해야함 앞에 t0넣어도됨chain(t0,[t1,t2], [t3,t4]) # list의 길이가 동일해야함 |
아 빡셋다. 그래도 알고 사용안하는것과 모르는것은 천지차이라서
나중에 적용할 때 참고해야겠다. 기존 파이프라인 건드리는건 위험해서 일단 패스