
udemy) Airflow Operators Guide, Sections 5, 6, 7

MightyTedKim 2022. 3. 1. 23:10
 

I meant to finish the whole thing over the March 1st holiday, but I ended up reading webtoons instead ㅜㅜ

I'll have to make time on weekdays this week.

Section 5: DAG dependencies  | 1hr 7min

  • 45. Introduction 1min
    1. Say we receive data from partners A, B, and C, with one DAG per partner.
    2. Once all of them finish, run a single shared downstream task.
     
  • 46. SubDagOperator 11min
Say you have a complex DAG in which several tasks share the same logic.
The SubDagOperator lets you group those tasks into one node, which also makes the graph visually clearer.
 
It has trade-offs:
1. You have to create a whole new DAG just for the grouping.
2. You then have to wire up and call that extra DAG, which is inconvenient.
 
Constraints to keep in mind:
- default_args must be identical between parent and child.
- The child's dag_id must follow a strict convention: {parent_dag_id}.{subdag_task_id}.
- The SubDagOperator itself also takes up a worker slot.
- If you define a pool, the subdag's tasks may not share it (more on this below).
 
If end should run only after a/b/c have all completed,
 
you can group them like this:
$ parent_dag.py

from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.subdag import SubDagOperator
from subdag import subdag_factory

default_args = {'start_date': datetime(2022, 1, 1)}

with DAG('parent_dag', default_args=default_args, schedule_interval='@daily') as dag:
  start = BashOperator(task_id='start', bash_command="echo 'start'")

  group_training_tasks = SubDagOperator(
    task_id='grouping_training_tasks',
    subdag=subdag_factory('parent_dag', 'grouping_training_tasks', default_args)
  )

  # the grouped subdag replaces these three tasks:
  # training_a = BashOperator(task_id='training_a', bash_command="echo 'training_a'")
  # training_b = BashOperator(task_id='training_b', bash_command="echo 'training_b'")
  # training_c = BashOperator(task_id='training_c', bash_command="echo 'training_c'")

  end = BashOperator(task_id='end', bash_command="echo 'end'")

  # start >> [training_a, training_b, training_c] >> end
  start >> group_training_tasks >> end


--------------------------
$ subdag.py

from airflow.models import DAG
from airflow.operators.bash import BashOperator

def subdag_factory(parent_dag_id, subdag_id, default_args):
  # the child dag_id must be "<parent_dag_id>.<subdag_task_id>"
  with DAG(f"{parent_dag_id}.{subdag_id}", default_args=default_args) as dag:
    training_a = BashOperator(task_id='training_a', bash_command="echo 'training_a'")
    training_b = BashOperator(task_id='training_b', bash_command="echo 'training_b'")
    training_c = BashOperator(task_id='training_c', bash_command="echo 'training_c'")
  return dag
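
As a quick sanity check of that naming rule (my own snippet, not from the course): the SubDagOperator looks up the child DAG by "<parent_dag_id>.<subdag_task_id>", which is exactly the id the factory builds.

from datetime import datetime
from subdag import subdag_factory

sub = subdag_factory('parent_dag', 'grouping_training_tasks', {'start_date': datetime(2022, 1, 1)})
print(sub.dag_id)  # -> parent_dag.grouping_training_tasks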

  • 47. Deep dive into the SubDagOperator 11min
Understanding subdag deadlocks.
 
To reproduce one, set Airflow's parallelism to 3 and define three grouping tasks as below:
 
# airflow config -> AIRFLOW__CORE__PARALLELISM=3

$ parent_dag.py

from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.subdag import SubDagOperator
from subdag import subdag_factory

default_args = {'start_date': datetime(2022, 1, 1)}

with DAG('parent_dag', default_args=default_args, schedule_interval='@daily') as dag:
  start = BashOperator(task_id='start', bash_command="echo 'start'")

  group_training_tasks = SubDagOperator(
    task_id='grouping_training_tasks',
    subdag=subdag_factory('parent_dag', 'grouping_training_tasks', default_args)
  )
  group_training_tasks_2 = SubDagOperator(
    task_id='grouping_training_tasks_2',
    subdag=subdag_factory('parent_dag', 'grouping_training_tasks_2', default_args)
  )
  group_training_tasks_3 = SubDagOperator(
    task_id='grouping_training_tasks_3',
    subdag=subdag_factory('parent_dag', 'grouping_training_tasks_3', default_args)
  )

  end = BashOperator(task_id='end', bash_command="echo 'end'")

  start >> [group_training_tasks, group_training_tasks_2, group_training_tasks_3] >> end
A subdag is itself a task, so the three grouping tasks already occupy all 3 slots just by grouping.
As a result the tasks inside the subdags can never start, and the run deadlocks.
 
To fix this, change the mode setting (in Airflow 2 the SubDagOperator is a sensor, so it accepts sensor arguments).
In reschedule mode it frees its worker slot between checks; the example below also gives it a 5-minute timeout:
 
group_training_tasks = SubDagOperator(
  task_id='grouping_training_tasks_3',
  subdag=subdag_factory('parent_dag', 'grouping_training_tasks_3', default_args),
  mode='reschedule',  # release the worker slot between checks
  timeout=60*5  # give up after 5 minutes
)

You might wonder whether a pool could be used instead. It can,

but it applies only to the SubDagOperator itself and has no effect on the tasks that run the actual logic.

 
group_training_tasks = SubDagOperator(
  task_id='grouping_training_tasks_3',
  subdag=subdag_factory('parent_dag', 'grouping_training_tasks_3', default_args),
  mode='reschedule',
  pool='training_pool'  # applies only to the subdag operator; training_a/b/c do NOT run in this pool
)
So what then? You can set the pool inside subdag.py instead:
$ subdag.py

from airflow.models import DAG
from airflow.operators.bash import BashOperator

def subdag_factory(parent_dag_id, subdag_id, default_args):
  default_args['pool'] = 'training_pool'  # every task in the subdag inherits this pool
  with DAG(f"{parent_dag_id}.{subdag_id}", default_args=default_args) as dag:
    training_a = BashOperator(task_id='training_a', bash_command="echo 'training_a'")
    training_b = BashOperator(task_id='training_b', bash_command="echo 'training_b'")
    training_c = BashOperator(task_id='training_c', bash_command="echo 'training_c'")
  return dag
I started using this to make life easier, but it feels like things keep getting more complicated ㅜㅜ
 
---
The example below passes a parameter to the subdag:
 
 
group_training_tasks = SubDagOperator(
  task_id='grouping_training_tasks_3',
  subdag=subdag_factory('parent_dag', 'grouping_training_tasks_3', default_args),
  mode='reschedule',
  conf={'output': '/opt/airflow/ml'}  # passed to the subdag's DagRun
)

$ subdag.py

from airflow.models import DAG
from airflow.operators.bash import BashOperator

def subdag_factory(parent_dag_id, subdag_id, default_args):
  with DAG(f"{parent_dag_id}.{subdag_id}", default_args=default_args) as dag:
    # renders to: echo '/opt/airflow/ml'
    training_a = BashOperator(task_id='training_a', bash_command="echo '{{ dag_run.conf['output'] }}'")
    training_b = BashOperator(task_id='training_b', bash_command="echo 'training_b'")
    training_c = BashOperator(task_id='training_c', bash_command="echo 'training_c'")
  return dag

 
There is also a propagate_skipped_state setting:
- if tasks inside the subdag get skipped, the skip can be propagated so that the task after the SubDagOperator doesn't run either
(honestly, this one didn't fully click for me)
 
group_training_tasks = SubDagOperator(
  task_id='grouping_training_tasks_3',
  subdag=subdag_factory('parent_dag', 'grouping_training_tasks_3', default_args),
  mode='reschedule',
  conf={'output': '/opt/airflow/ml'},
  propagate_skipped_state=None  # the default: skips inside the subdag are not propagated
)
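
For what it's worth, a minimal sketch of the opposite setting (my own reading of the Airflow 2 source, so treat it as an assumption): propagate_skipped_state takes a SkippedStatePropagationOptions value, and with ALL_LEAVES the SubDagOperator marks itself skipped when all of the subdag's leaf tasks were skipped, which in turn skips its own downstream tasks.

from airflow.operators.subdag import SubDagOperator, SkippedStatePropagationOptions

group_training_tasks = SubDagOperator(
  task_id='grouping_training_tasks_3',
  subdag=subdag_factory('parent_dag', 'grouping_training_tasks_3', default_args),
  # if ALL leaf tasks of the subdag end up skipped, skip downstream tasks too
  propagate_skipped_state=SkippedStatePropagationOptions.ALL_LEAVES
)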

Even the course says to just use TaskGroups instead: same result, much easier.

  • 48. Discover the TaskGroups 7min

The SubDagOperator is just too hard, so the course introduces an alternative:

Airflow 2.0 added TaskGroups.

 

$ parent_dag_taskgroup.py

from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash import BashOperator

from task_group import training_groups

with DAG('parent_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily') as dag:
  start = BashOperator(task_id='start', bash_command="echo 'start'")
  group_training_tasks = training_groups()
  end = BashOperator(task_id='end', bash_command="echo 'end'")

  start >> group_training_tasks >> end


--------------------------
$ task_group.py

from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

def training_groups():
  # must be called inside a DAG context; the group attaches to the current DAG
  with TaskGroup('training_tasks') as training_tasks:
    training_a = BashOperator(task_id='training_a', bash_command="echo 'training_a'")
    training_b = BashOperator(task_id='training_b', bash_command="echo 'training_b'")
    training_c = BashOperator(task_id='training_c', bash_command="echo 'training_c'")

    # task groups can be nested
    with TaskGroup('publish_tasks') as publish_tasks:
      publish_a = BashOperator(task_id='publish_a', bash_command="echo 'publish_a'")
      publish_b = BashOperator(task_id='publish_b', bash_command="echo 'publish_b'")
      publish_c = BashOperator(task_id='publish_c', bash_command="echo 'publish_c'")

  return training_tasks
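
Side note (my own addition, not from the course): newer Airflow 2 releases also ship a @task_group decorator that builds the same grouping with less boilerplate. A minimal sketch, assuming your version includes the decorator:

from airflow.decorators import task_group
from airflow.operators.bash import BashOperator

@task_group(group_id='training_tasks')
def training_groups():
  # same three tasks as above, grouped under "training_tasks"
  for name in ('training_a', 'training_b', 'training_c'):
    BashOperator(task_id=name, bash_command=f"echo '{name}'")

# calling training_groups() inside a DAG context creates the group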
  • 49. TriggerDagRunOperator 16min 
Managing DAG dependencies: a practical example.
You don't want to duplicate the same code in DAGs A, B, and C, so you define it once, separately.
Before this operator existed, this was painful to manage.
It is not a sensor, so its arguments are different.
 
<structure>
A -\
B --> ProcessingPartnerWork
C -/
 
 
TriggerDagRunOperator
e.g.)
  sometimes you want to trigger another DAG depending on an ML model's output -> wait_for_completion
 
 
$ parent_dag_taskgroup.py

from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

from task_group import training_groups

with DAG('parent_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily') as dag:
  start = BashOperator(task_id='start', bash_command="echo 'start'")

  group_training_tasks = training_groups()
  process_ml = TriggerDagRunOperator(
    task_id='process_ml',
    # trigger_dag_id='target_dag',
    trigger_dag_id='{{ var.value.my_dag_id }}',  # the target dag_id can come from a Variable
    conf={
      'path': '/opt/airflow/ml'
    },
    execution_date='{{ ds }}',  # or a fixed datetime(2021, 1, 1)
    reset_dag_run=True,  # if False, the target DAG won't run again for the same execution_date
    wait_for_completion=True,  # block until the triggered DAG finishes
    poke_interval=60,
    allowed_states=['success'],
    failed_states=[]
  )
  end = BashOperator(task_id='end', bash_command="echo 'end'")

  start >> group_training_tasks >> process_ml >> end

---
$ target_dag.py

from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash import BashOperator

with DAG('target_dag', start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
  process_ml = BashOperator(task_id='process_ml', bash_command="echo '{{ dag_run.conf['path'] }}'")
  • 50. The famous ExternalTaskSensor 21min
TriggerDagRunOperator is the most popular, but there is also the ExternalTaskSensor:
a way to wait until a task in another DAG has finished.
 
DAG A
extract -> process
 
DAG B
extract -> process
 
DAG (waits for both)
done A -\
done B --> start
 
 
$ parent_dag_taskgroup.py

from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash import BashOperator

from task_group import training_groups

with DAG('parent_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily') as dag:
  start = BashOperator(task_id='start', bash_command="echo 'start'")

  group_training_tasks = training_groups()
  end = BashOperator(task_id='end', bash_command="echo 'end'")

  start >> group_training_tasks >> end

---
$ target_dag.py

from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG('target_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily') as dag:
  waiting_end_parent_dag = ExternalTaskSensor(
    task_id='waiting_end_parent_dag',
    external_dag_id='parent_dag',
    external_task_id='end'
  )
  process_ml = BashOperator(task_id='process_ml', bash_command="echo 'processing ml'")

  waiting_end_parent_dag >> process_ml
In this case the sensor can get stuck: by default it looks for the parent's task at exactly the same execution_date.
That's why it's important that both DAGs use the same schedule_interval.
 
So what if parent_dag and target_dag run on different intervals?
 
Use execution_delta: it shifts the execution_date the sensor looks for in the parent DAG.
$ parent_dag.py

with DAG('parent_dag', schedule_interval='10 * * * *') as dag:
  ...


$ target_dag.py

from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG('target_dag', start_date=datetime(2022, 1, 1), schedule_interval='15 * * * *') as dag:
  waiting_end_parent_dag = ExternalTaskSensor(
    task_id='waiting_end_parent_dag',
    external_dag_id='parent_dag',
    external_task_id='end',
    execution_delta=timedelta(minutes=5)  # look 5 minutes back: 15 - 10 = 5
  )
  process_ml = BashOperator(task_id='process_ml', bash_command="echo 'processing ml'")

  waiting_end_parent_dag >> process_ml
Ugh... if this ever gets out of sync, good luck untangling it.
You can tune it even more precisely:
 

$ target_dag.py

from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor

# return the candidate execution_dates of the parent run(s) to wait for
def _my_delta(execution_date):
  return [execution_date - timedelta(minutes=5), execution_date - timedelta(minutes=15)]

with DAG('target_dag', start_date=datetime(2022, 1, 1), schedule_interval='15 * * * *') as dag:
  waiting_end_parent_dag = ExternalTaskSensor(
    task_id='waiting_end_parent_dag',
    external_dag_id='parent_dag',
    external_task_id='end',
    execution_date_fn=_my_delta
  )
  process_ml = BashOperator(task_id='process_ml', bash_command="echo 'processing ml'")

  waiting_end_parent_dag >> process_ml

This is hard too.

The best practice is simply this:

$ target_dag.py

from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG('target_dag', start_date=datetime(2022, 1, 1), schedule_interval='15 * * * *') as dag:
  waiting_end_parent_dag = ExternalTaskSensor(
    task_id='waiting_end_parent_dag',
    external_dag_id='parent_dag',
    external_task_id='end',
    timeout=5*60,  # give up after 5 minutes
    mode='reschedule'  # free the worker slot between pokes
  )
  process_ml = BashOperator(task_id='process_ml', bash_command="echo 'processing ml'")

  waiting_end_parent_dag >> process_ml
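
One more knob worth knowing (my addition, not covered in the lecture): ExternalTaskSensor accepts check_existence=True, which makes the sensor fail immediately if the external DAG or task doesn't exist, instead of poking forever against a typo'd id. A sketch:

waiting_end_parent_dag = ExternalTaskSensor(
  task_id='waiting_end_parent_dag',
  external_dag_id='parent_dag',
  external_task_id='end',
  check_existence=True,  # fail fast if 'parent_dag.end' doesn't exist
  timeout=5*60,
  mode='reschedule'
)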

  • 51. DAG dependencies view 1min

 

Section 6: The Exotic Ones  | 21min

The more exotic operators.

  • 52. One DAG different schedules with the ShortCircuitOperator 8min
A daily pipeline A -> B -> C -> D, where D should run only weekly (the check below runs it on Mondays):
 
$ short_dag.py

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import ShortCircuitOperator

# Airflow passes context variables whose names match the callable's arguments
def _is_monday(execution_date):
  return execution_date.weekday() == 0

with DAG('short_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily') as dag:

  task_a = DummyOperator(task_id='task_a')
  task_b = DummyOperator(task_id='task_b')
  task_c = DummyOperator(task_id='task_c')

  # if the callable returns False, everything downstream is skipped
  is_monday = ShortCircuitOperator(
    task_id='is_monday',
    python_callable=_is_monday
  )

  task_d = DummyOperator(task_id='task_d')

  task_a >> task_b >> task_c >> is_monday >> task_d
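
A quick local check of the callable (my own snippet; pendulum ships with Airflow):

import pendulum

assert _is_monday(pendulum.datetime(2022, 2, 28))     # 2022-02-28 was a Monday -> True
assert not _is_monday(pendulum.datetime(2022, 3, 1))  # a Tuesday -> False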

  
  • 53. Prevent from running past tasks with the LatestOnlyOperator 7min
When past DagRuns get re-run (e.g., backfill or catchup), old tasks can execute again too, which is dangerous for things like sending a newsletter.
 
$ newsletter_dag.py

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.latest_only import LatestOnlyOperator

with DAG('newsletter_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily', catchup=True) as dag:
  getting_data = DummyOperator(task_id='getting_data')
  creating_email = DummyOperator(task_id='creating_email')

  # everything downstream of this runs only for the latest scheduled DagRun
  is_latest = LatestOnlyOperator(task_id='is_latest')

  sending_newsletter = DummyOperator(task_id='sending_newsletter')

  getting_data >> creating_email >> is_latest >> sending_newsletter


  • 54. Side Notes with the LatestOnlyOperator 1min

(My take) couldn't you just set catchup=False?

- The concept is different: even when past DagRuns are re-run, tasks behind the LatestOnlyOperator still won't execute.
- That makes it safer for jobs that must never repeat, like sending email.
- It has no effect on manually triggered runs, though.
  • Catchup=False and the LatestOnlyOperator are different.
    • Catchup=False prevents automatically running past, non-triggered DagRuns.
    • Whereas the LatestOnlyOperator enforces that tasks depending on it cannot be run in the past (see the sketch below).
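
A minimal sketch of the catchup=False side of that comparison (my own illustration, not from the course): the scheduler simply never creates the missed past DagRuns, but nothing stops a manual backfill later, and when that happens every task runs, newsletter included.

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator

# no past DagRuns are created automatically, but a manual backfill
# would still execute ALL tasks for those past dates
with DAG('newsletter_dag', start_date=datetime(2022, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:
  sending_newsletter = DummyOperator(task_id='sending_newsletter')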
  • 55. You think you know the DummyOperator? 6min
1. The DummyOperator never goes through the executor.
- Even with the Kubernetes Executor, no pod is created.
- That's because it is handled by the scheduler alone.
 
2. Placeholder
- Used to sketch out a DAG's structure while testing.
 
$ dummy_dag.py

from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator

with DAG('dummy_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily') as dag:
  task_a = BashOperator(task_id='task_a', bash_command='echo "task_a"')
  task_b = BashOperator(task_id='task_b', bash_command='echo "task_b"')
  task_c = BashOperator(task_id='task_c', bash_command='echo "task_c"')
  task_d = BashOperator(task_id='task_d', bash_command='echo "task_d"')

  # one dummy in the middle replaces the four cross dependencies below
  dummy = DummyOperator(task_id='dummy')

  task_a >> dummy << task_b

  dummy >> task_c
  dummy >> task_d

  # task_a >> task_c
  # task_a >> task_d
  # task_b >> task_c
  # task_b >> task_d

Section 7: Last Words! | 1min

  • 56. Bonus Lecture 1min

 

Finally done ㅜㅜ

Now I need to digest all of this by applying it at work.
