udemy) Airflow Operators Guide, Sections 5, 6, 7
MightyTedKim
2022. 3. 1. 23:10
I meant to finish the rest over the March 1st holiday, but ended up reading webtoons instead ㅜㅜ
I'll have to carve out time on weekdays this week.
Section 5: DAG dependencies | 1hr 7min
- 45. Introduction 1min
1. Suppose we receive data from partners A/B/C, and each partner has its own DAG.
2. Once they all finish, a single shared task should run.
- 46. SubDagOperator 11min
Suppose you have a complex DAG in which several tasks share the same logic.
SubDAGs let you group those tasks, which also makes the graph visually clearer.
There are trade-offs, though:
1. You have to create a whole new DAG just for the grouping.
2. Calling that extra DAG adds its own friction.
The default_args must be identical between the parent and the child.
The dag_id must also follow a strict convention: <parent_dag_id>.<subdag_id>.
The SubDagOperator itself takes up a worker slot.
And if you defined a pool, the subdag's tasks may not share it.
If, as below, end should run once after a/b/c complete,
you can group them like this:
$ parent_dag.py
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.subdag import SubDagOperator
from subdag import subdag_factory

with DAG("parent_dag", default_args=default_args) as dag:
    start = BashOperator(task_id="start", bash_command="echo 'start'")
    group_training_tasks = SubDagOperator(
        task_id="grouping_training_tasks",
        subdag=subdag_factory("parent_dag", "grouping_training_tasks", default_args),
    )
    # Before grouping, the three tasks lived directly in this DAG:
    # training_a = BashOperator(task_id="training_a", bash_command="echo 'training_a'")
    # training_b = BashOperator(task_id="training_b", bash_command="echo 'training_b'")
    # training_c = BashOperator(task_id="training_c", bash_command="echo 'training_c'")
    # start >> [training_a, training_b, training_c] >> end
    end = BashOperator(task_id="end", bash_command="echo 'end'")

    start >> group_training_tasks >> end

--------------------------

$ subdag.py
from airflow.models import DAG
from airflow.operators.bash import BashOperator

def subdag_factory(parent_dag_id, subdag_id, default_args):
    # the subdag's dag_id must be "<parent_dag_id>.<subdag_id>"
    with DAG(f"{parent_dag_id}.{subdag_id}", default_args=default_args) as dag:
        training_a = BashOperator(task_id="training_a", bash_command="echo 'training_a'")
        training_b = BashOperator(task_id="training_b", bash_command="echo 'training_b'")
        training_c = BashOperator(task_id="training_c", bash_command="echo 'training_c'")
    return dag
- 47. Deep dive into the SubDagOperator 11min
Understanding subdag deadlocks.
To reproduce one, set Airflow's parallelism to 3 and define three grouping tasks as below:
# airflow config -> AIRFLOW__CORE__PARALLELISM=3

$ parent_dag.py
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.subdag import SubDagOperator
from subdag import subdag_factory

with DAG("parent_dag", default_args=default_args) as dag:
    start = BashOperator(task_id="start", bash_command="echo 'start'")
    group_training_tasks = SubDagOperator(
        task_id="grouping_training_tasks",
        subdag=subdag_factory("parent_dag", "grouping_training_tasks", default_args),
    )
    group_training_tasks_2 = SubDagOperator(
        task_id="grouping_training_tasks_2",
        subdag=subdag_factory("parent_dag", "grouping_training_tasks_2", default_args),
    )
    group_training_tasks_3 = SubDagOperator(
        task_id="grouping_training_tasks_3",
        subdag=subdag_factory("parent_dag", "grouping_training_tasks_3", default_args),
    )
    end = BashOperator(task_id="end", bash_command="echo 'end'")

    start >> [group_training_tasks, group_training_tasks_2, group_training_tasks_3] >> end
A subdag is itself a task, so the three SubDagOperators alone already occupy all three slots.
The tasks inside them can then never run, and the DAG deadlocks.
To fix this, change the mode setting (the SubDagOperator behaves like a sensor here).
The example below switches to reschedule mode with a 5-minute timeout:
group_training_tasks_3 = SubDagOperator(
    task_id="grouping_training_tasks_3",
    subdag=subdag_factory("parent_dag", "grouping_training_tasks_3", default_args),
    mode="reschedule",   # release the worker slot between checks
    timeout=60 * 5,      # 5 minutes
)
You might wonder whether a pool could solve this instead; it can.
But it applies only to the SubDagOperator itself; it has no effect on the tasks that run the actual logic.
group_training_tasks_3 = SubDagOperator(
    task_id="grouping_training_tasks_3",
    subdag=subdag_factory("parent_dag", "grouping_training_tasks_3", default_args),
    mode="reschedule",
    pool="training_pool",  # applies only to the SubDagOperator itself;
                           # training_a/b/c do NOT run in this pool
)
So what do you do instead? You can set the pool inside subdag.py:
$ subdag.py
from airflow.models import DAG
from airflow.operators.bash import BashOperator

def subdag_factory(parent_dag_id, subdag_id, default_args):
    default_args["pool"] = "training_pool"  # now the subdag's own tasks use the pool
    with DAG(f"{parent_dag_id}.{subdag_id}", default_args=default_args) as dag:
        training_a = BashOperator(task_id="training_a", bash_command="echo 'training_a'")
        training_b = BashOperator(task_id="training_b", bash_command="echo 'training_b'")
        training_c = BashOperator(task_id="training_c", bash_command="echo 'training_c'")
    return dag
I started using this to make life easier, but it feels like it keeps getting more complicated ㅜㅜ
---
The example below passes parameters to the subdag:
group_training_tasks_3 = SubDagOperator(
    task_id="grouping_training_tasks_3",
    subdag=subdag_factory("parent_dag", "grouping_training_tasks_3", default_args),
    mode="reschedule",
    conf={"output": "/opt/airflow/ml"},
)

$ subdag.py
from airflow.models import DAG
from airflow.operators.bash import BashOperator

def subdag_factory(parent_dag_id, subdag_id, default_args):
    with DAG(f"{parent_dag_id}.{subdag_id}", default_args=default_args) as dag:
        # reads the conf passed in by the SubDagOperator
        training_a = BashOperator(task_id="training_a", bash_command="echo '{{ dag_run.conf['output'] }}'")
        training_b = BashOperator(task_id="training_b", bash_command="echo 'training_b'")
        training_c = BashOperator(task_id="training_c", bash_command="echo 'training_c'")
    return dag
There is also a propagate_skipped_state setting:
- if the subdag contains a skipped task, the next task won't run.
(Honestly, this part didn't quite click for me; a sketch of what propagation means follows the snippet below.)
group_training_tasks_3 = SubDagOperator(
    task_id="grouping_training_tasks_3",
    subdag=subdag_factory("parent_dag", "grouping_training_tasks_3", default_args),
    mode="reschedule",
    conf={"output": "/opt/airflow/ml"},
    propagate_skipped_state=False,  # per the course, False is the default
)
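To make sense of propagation, here is a minimal sketch of my own (not from the course). It assumes the SkippedStatePropagationOptions enum that airflow.operators.subdag exposes for this parameter; ANY_LEAF marks the SubDagOperator itself as skipped whenever any leaf task inside the subdag was skipped:

from airflow.operators.subdag import SkippedStatePropagationOptions, SubDagOperator

group_training_tasks_3 = SubDagOperator(
    task_id="grouping_training_tasks_3",
    subdag=subdag_factory("parent_dag", "grouping_training_tasks_3", default_args),
    # ANY_LEAF: if any leaf task of the subdag (training_a/b/c) ends up skipped,
    # the SubDagOperator is marked skipped too, so downstream tasks such as end
    # are skipped as well. ALL_LEAVES only propagates when every leaf is skipped.
    propagate_skipped_state=SkippedStatePropagationOptions.ANY_LEAF,
)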
Even the course recommends using TaskGroups instead: same result, much easier.
- 48. Discover the TaskGroups 7min
The SubDagOperator is just too fiddly, so the course introduces an alternative.
Airflow 2.0 added TaskGroups.
$ parent_dag_taskgroup.py
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from task_group import training_groups

with DAG("parent_dag_taskgroup") as dag:
    start = BashOperator(task_id="start", bash_command="echo 'start'")
    group_training_tasks = training_groups()
    end = BashOperator(task_id="end", bash_command="echo 'end'")

    start >> group_training_tasks >> end

--------------------------

$ task_group.py
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

def training_groups():
    with TaskGroup("training_tasks") as training_tasks:
        training_a = BashOperator(task_id="training_a", bash_command="echo 'training_a'")
        training_b = BashOperator(task_id="training_b", bash_command="echo 'training_b'")
        training_c = BashOperator(task_id="training_c", bash_command="echo 'training_c'")
    with TaskGroup("publish_tasks") as publish_tasks:
        publish_a = BashOperator(task_id="publish_a", bash_command="echo 'publish_a'")
        publish_b = BashOperator(task_id="publish_b", bash_command="echo 'publish_b'")
        publish_c = BashOperator(task_id="publish_c", bash_command="echo 'publish_c'")
    return training_tasks
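Note that publish_tasks above is defined but never wired into the graph. A small sketch of my own (not from the course) showing that TaskGroups can be chained with >> exactly like tasks:

$ task_group.py (variant)
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

def training_groups():
    with TaskGroup("training_tasks") as training_tasks:
        training_a = BashOperator(task_id="training_a", bash_command="echo 'training_a'")
    with TaskGroup("publish_tasks") as publish_tasks:
        publish_a = BashOperator(task_id="publish_a", bash_command="echo 'publish_a'")
    # groups support the same dependency operators as tasks
    training_tasks >> publish_tasks
    return training_tasks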
- 49. TriggerDagRunOperator 16min
Managing DAG dependencies: a real-world pattern.
You don't want to duplicate the same code in DAGs A, B and C, so you define it once in a separate DAG.
Before this operator existed, this was painful to manage.
It is not a sensor, so its arguments are different.
<Structure>
A --\
B ---- TriggerDagRunOperator --> ProcessingPartnerWork
C --/
ex)
You may want to trigger the target DAG based on an ML model's output and then wait for it to finish -> wait_for_completion
$ parent_dag_taskgroup.py
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from task_group import training_groups

with DAG("parent_dag_taskgroup") as dag:
    start = BashOperator(task_id="start", bash_command="echo 'start'")
    group_training_tasks = training_groups()
    process_ml = TriggerDagRunOperator(
        task_id="process_ml",
        # trigger_dag_id="target_dag",
        trigger_dag_id="{{ var.value.my_dag_id }}",
        conf={"path": "/opt/airflow/ml"},
        execution_date="{{ ds }}",
        # execution_date=datetime(2021, 1, 1),
        reset_dag_run=True,        # if False, the target DAG won't run again for the same execution_date
        wait_for_completion=True,  # wait until the triggered DAG finishes
        poke_interval=60,
        allowed_states=["success"],
        failed_states=[],
    )
    end = BashOperator(task_id="end", bash_command="echo 'end'")

    start >> group_training_tasks >> process_ml >> end

---

$ target_dag.py
from airflow.models import DAG
from airflow.operators.bash import BashOperator

with DAG("target_dag"):
    process_ml = BashOperator(task_id="process_ml", bash_command="echo '{{ dag_run.conf['path'] }}'")
- 50. The famous ExternalTaskSensor 21min
TriggerDagRunOperator is the most popular, but there is also ExternalTaskSensor.
It's the way to make a DAG wait until another DAG finishes.
Dag A: extract -> process
Dag B: extract -> process
Downstream DAG:
  done(A) --\
             start
  done(B) --/
$ parent_dag.py
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from task_group import training_groups

with DAG("parent_dag"):
    start = BashOperator(task_id="start", bash_command="echo 'start'")
    group_training_tasks = training_groups()
    end = BashOperator(task_id="end", bash_command="echo 'end'")

    start >> group_training_tasks >> end

---

$ target_dag.py
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG("target_dag"):
    waiting_end_parent_dag = ExternalTaskSensor(
        task_id="waiting_end_parent_dag",
        external_dag_id="parent_dag",
        external_task_id="end",
    )
    process_ml = BashOperator(task_id="process_ml", bash_command="echo 'processing ml'")

    waiting_end_parent_dag >> process_ml
In this case, the sensor can get stuck if the two DAGs' execution_dates don't line up.
That's why it's important that both DAGs use the same schedule_interval (see the sketch below for why).
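A simplified sketch of my own, not the actual Airflow source, of the check the sensor keeps poking: it only succeeds once the other DAG has a matching task instance with the exact same execution_date, so mismatched schedules mean the match never appears:

# illustration only, not Airflow's real code
from airflow.models import TaskInstance

def poke_sketch(session, logical_date):
    count = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == "parent_dag",
            TaskInstance.task_id == "end",
            TaskInstance.state == "success",
            TaskInstance.execution_date == logical_date,  # exact match required
        )
        .count()
    )
    return count == 1  # keeps poking/rescheduling until this holds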
What if the parent's and target_dag's intervals do differ?
Use execution_delta.
With it, the sensor looks for the parent's run at the target's execution_date shifted back by the delta:
$ parent_dag.py
with DAG("parent_dag", schedule_interval="10 * * * *"):

$ target_dag.py
from datetime import timedelta

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG("target_dag", schedule_interval="15 * * * *"):
    waiting_end_parent_dag = ExternalTaskSensor(
        task_id="waiting_end_parent_dag",
        external_dag_id="parent_dag",
        external_task_id="end",
        execution_delta=timedelta(minutes=5),  # shift back 5 minutes (:15 -> :10)
    )
    process_ml = BashOperator(task_id="process_ml", bash_command="echo 'processing ml'")

    waiting_end_parent_dag >> process_ml
Ugh... if this ever gets tangled, good luck debugging it.
You can control the match even more precisely:
$ target_dag.py
from datetime import timedelta

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor

def _my_delta(execution_date):
    # return the candidate execution_dates of parent_dag to look for
    return [
        execution_date - timedelta(minutes=5),
        execution_date - timedelta(minutes=15),
    ]

with DAG("target_dag", schedule_interval="15 * * * *"):
    waiting_end_parent_dag = ExternalTaskSensor(
        task_id="waiting_end_parent_dag",
        external_dag_id="parent_dag",
        external_task_id="end",
        execution_date_fn=_my_delta,
    )
    process_ml = BashOperator(task_id="process_ml", bash_command="echo 'processing ml'")

    waiting_end_parent_dag >> process_ml
This is tricky too.
The best practice is this: always give the sensor a timeout and use reschedule mode.
$ target_dag.py
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG("target_dag", schedule_interval="15 * * * *"):
    waiting_end_parent_dag = ExternalTaskSensor(
        task_id="waiting_end_parent_dag",
        external_dag_id="parent_dag",
        external_task_id="end",
        timeout=5 * 60,     # give up after 5 minutes
        mode="reschedule",  # free the worker slot between checks
    )
    process_ml = BashOperator(task_id="process_ml", bash_command="echo 'processing ml'")

    waiting_end_parent_dag >> process_ml
- 51. DAG dependencies view 1min
Section 6: The Exotic Ones | 21min
The unusual operators.
- 52. One DAG different schedules with the ShortCircuitOperator 8min
daily: A -> B -> C -> D, where D should only run weekly (e.g. on Mondays)
$ short_dag.py
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import ShortCircuitOperator

def _is_monday(execution_date):
    return execution_date.weekday() == 0

with DAG("short_dag", schedule_interval="@daily") as dag:
    task_a = DummyOperator(task_id="task_a")
    task_b = DummyOperator(task_id="task_b")
    task_c = DummyOperator(task_id="task_c")
    is_monday = ShortCircuitOperator(
        task_id="is_monday",
        python_callable=_is_monday,  # if this returns False, downstream tasks are skipped
    )
    task_d = DummyOperator(task_id="task_d")

    task_a >> task_b >> task_c >> is_monday >> task_d
- 53. Prevent from running past tasks with the LatestOnlyOperator 7min
When you re-run past DagRuns, the old tasks can get executed again too.
$ newsletter_dag.py
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.latest_only import LatestOnlyOperator

with DAG("newsletter_dag", schedule_interval="@daily", catchup=True) as dag:
    getting_data = DummyOperator(task_id="getting_data")
    creating_email = DummyOperator(task_id="creating_email")
    is_latest = LatestOnlyOperator(task_id="is_latest")
    sending_newsletter = DummyOperator(task_id="sending_newsletter")

    getting_data >> creating_email >> is_latest >> sending_newsletter
- 54. Side Notes with the LatestOnlyOperator 1min
My thought: wouldn't catchup=False be enough?
- The concept is different: even when a past run does execute, anything behind the LatestOnlyOperator stays skipped.
- That makes it safe for jobs that must never repeat, like sending emails.
- Note that it has no effect on manually triggered runs.
- Catchup=False and LatestOnlyOperator are different:
- catchup=False prevents past, non-triggered DagRuns from running automatically,
- whereas the LatestOnlyOperator enforces that tasks depending on it cannot run in the past.
(A rough sketch of the check it performs follows below.)
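Here is a rough sketch of my own (simplified, not the actual Airflow source) of what LatestOnlyOperator decides at run time: it skips its downstream tasks unless "now" falls inside the current run's schedule window, and it lets externally (manually) triggered runs pass through untouched:

# illustration only; the real operator lives in airflow.operators.latest_only
import pendulum

def should_run_downstream(context):
    if context["dag_run"].external_trigger:
        return True  # manual runs are never skipped
    dag = context["dag"]
    left_window = dag.following_schedule(context["execution_date"])
    right_window = dag.following_schedule(left_window)
    # only the run whose window contains "now" is the latest one
    return left_window < pendulum.now("UTC") <= right_window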
- 55. You think you know the DummyOperator? 6min
1. The DummyOperator never goes through the executor.
- Even with the KubernetesExecutor, no pod gets created.
- That's because the scheduler handles it entirely on its own.
2. Placeholder
- Handy for sketching out a DAG's structure, e.g. for tests.
$ dummy_dag.py
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator

with DAG("dummy_dag") as dag:
    task_a = BashOperator(task_id="task_a", bash_command='echo "task_a"')
    task_b = BashOperator(task_id="task_b", bash_command='echo "task_b"')
    task_c = BashOperator(task_id="task_c", bash_command='echo "task_c"')
    task_d = BashOperator(task_id="task_d", bash_command='echo "task_d"')
    dummy = DummyOperator(task_id="dummy")

    # the dummy node replaces these four cross dependencies:
    # task_a >> task_c
    # task_a >> task_d
    # task_b >> task_c
    # task_b >> task_d
    [task_a, task_b] >> dummy
    dummy >> task_c
    dummy >> task_d
Section 7: Last Words! | 1min
- 56. Bonus Lecture 1min
Finally finished ㅜㅜ
Now to digest it all by applying it at work.