'데이터 엔지니어'로 성장하기

정리하는 걸 좋아하고, 남이 읽으면 더 좋아함

Data/Airflow

Airflow) context, kwargs 혼용 실습

MightyTedKim 2022. 5. 9. 21:07
728x90
반응형

airflow 공부하다가 궁금한게 잇어서, 하나씩 돌려봄 ㅎㅎ

요약

  1. task Context 공부하다가 의문이 생김
  2.  테스트
    1. **context, **kwargs 로 두개를 파라미터로 두면 어떻게 될까?
    2.  execution_date를 변수로 명시하면 사용할 수 있다는데, **context 에서는 그럼 빠지는건가?
    3.  execution_date 오버라이딩가능할까?

설명

1. task Context 공부하다가 의문이 생김

- execution_date를 변수로 명시하면 사용할 수 있다는데, **context 에서는 그럼 빠지는건가?

- **context, **kwargs 로 두개를 파라미터로 두면 어떻게 될까?

- execution_date 오버라이딩가능할까?

아래는 테스트할 때 사용할 기본 틀

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash import BashOperator
from airflow.operators.python_operator import PythonOperator

from time import sleep
from datetime import datetime, timedelta

def _my_func(**context):
  print(f'context                  : {context}')
  print(f'context["ds_nodash"]     : {context["ds_nodash"]}')
  print(f'context["prev_ds_nodash"]: {context["prev_ds_nodash"]}')

with DAG('sample-manual-test-hgkim',
  description='really First DAG',
  schedule_interval='@daily',
  start_date=datetime(2021, 12, 12),
  catchup=False,
  tags=['sample','hgkim']
) as dag:

  start_task = DummyOperator(task_id='start_task', retries=3)
  end_task = DummyOperator(task_id='end_task', retries=3)

  task_a = PythonOperator(
    task_id='python_task',
    python_callable= _my_func,
    op_kwargs={"arg1":"hello"},
    depends_on_past=True
  )

  start_task >> task_a >> end_task

 

2. 테스트

execution_date를 변수로 명시하면 사용할 수 있다는데, **context 에서는 그럼 빠지는건가?

-> 파라미터를 따로 명시하면, kwargs에서 제외됨

def _my_func(arg1, execution_date, **kwargs): 
  print(f'arg1                     : {arg1}')
  print(f'execution_date           : {execution_date}') #kwargs에서 이게 빠지는지 확인
  print(f'kwargs                   : {kwargs}')
[2022-05-03 00:32:18,780] {logging_mixin.py:109} INFO - arg1                     : hello
[2022-05-03 00:32:18,780] {logging_mixin.py:109} INFO - execution_date           : 2022-05-03
[2022-05-03 00:32:18,780] {logging_mixin.py:109} INFO - kwargs                   : {'conf': <airflow.configuration.AirflowConfigParser object at 0x7fafdc85fc50>, 'dag': <DAG: sample-manual-test-hgkim>, 'dag_run': <DagRun sample-manual-test-hgkim @ 2022-05-03 00:30:46.136474+00:00: manual__2022-05-03T00:30:46.136474+00:00, externally triggered: True>, 'ds': '2022-05-03', 'ds_nodash': '20220503', 'inlets': [], 'macros': <module 'airflow.macros' from '/home/airflow/.local/lib/python3.6/site-packages/airflow/macros/__init__.py'>, 'next_ds': '2022-05-03', 'next_ds_nodash': '20220503', 'next_execution_date': DateTime(2022, 5, 3, 0, 30, 46, 136474, tzinfo=Timezone('+00:00')), 'outlets': [], 'params': {}, 'prev_ds': '2022-05-03', 'prev_ds_nodash': '20220503', 'prev_execution_date': DateTime(2022, 5, 3, 0, 30, 46, 136474, tzinfo=Timezone('+00:00')), 'prev_execution_date_success': <Proxy at 0x7fafd548e508 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7fafd53e4048>>, 'prev_start_date_success': <Proxy at 0x7fafd53ebac8 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7fafd53f5a60>>, 'run_id': 'manual__2022-05-03T00:30:46.136474+00:00', 'task': <Task(PythonOperator): python_task>, 'task_instance': <TaskInstance: sample-manual-test-hgkim.python_task 2022-05-03T00:30:46.136474+00:00 [running]>, 'task_instance_key_str': 'sample-manual-test-hgkim__python_task__20220503', 'test_mode': False, 'ti': <TaskInstance: sample-manual-test-hgkim.python_task 2022-05-03T00:30:46.136474+00:00 [running]>, 'tomorrow_ds': '2022-05-04', 'tomorrow_ds_nodash': '20220504', 'ts': '2022-05-03T00:30:46.136474+00:00', 'ts_nodash': '20220503T003046', 'ts_nodash_with_tz': '20220503T003046.136474+0000', 'var': {'json': None, 'value': None}
    , 'yesterday_ds': '2022-05-02', 'yesterday_ds_nodash': '20220502', 'templates_dict': None}

execution_date 오버라이딩가능할까?

  • kwargs값을 보면
    • 'execution_date': '2022-05-03', # 오버라이딩 된 것 확인 (**kwargs에 없음)
    • 'arg1': 'hello', # 추가 (**kwargs에 추가된 것 확인)
def _my_func(**kwargs):
  print(f'kwargs                   : {kwargs}')
  print(f'kwargs["arg1"]           : {kwargs["arg1"]}')
  print(f'kwargs["execution_date"] : {kwargs["execution_date"]}')

  task_a = PythonOperator(
    task_id='python_task',
    python_callable= _my_func,
    op_kwargs={
        "arg1":"hello",
        "execution_date": "{{execution_date.in_timezone('Asia/Seoul').strftime('%Y-%m-%d')}}" 
    },
    depends_on_past=True

 

[2022-05-03 00:09:02,748] {logging_mixin.py:109} INFO - kwargs                   : {'conf': <airflow.configuration.AirflowConfigParser object at 0x7fb33fa46cc0>, 'dag': <DAG: sample-manual-test-hgkim>, 'dag_run': <DagRun sample-manual-test-hgkim @ 2022-05-03 00:08:56.645561+00:00: manual__2022-05-03T00:08:56.645561+00:00, externally triggered: True>, 'ds': '2022-05-03', 'ds_nodash': '20220503', 'execution_date': '2022-05-03',
 'inlets': [], 'macros': <module 'airflow.macros' from '/home/airflow/.local/lib/python3.6/site-packages/airflow/macros/__init__.py'>, 'next_ds': '2022-05-03', 'next_ds_nodash': '20220503', 'next_execution_date': DateTime(2022, 5, 3, 0, 8, 56, 645561, tzinfo=Timezone('+00:00')), 'outlets': [], 'params': {}, 'prev_ds': '2022-05-03', 'prev_ds_nodash': '20220503', 'prev_execution_date': DateTime(2022, 5, 3, 0, 8, 56, 645561, tzinfo=Timezone('+00:00')), 'prev_execution_date_success': <Proxy at 0x7fb338608fc8 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7fb338663e18>>, 'prev_start_date_success': <Proxy at 0x7fb3386080c8 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7fb3385dc950>>, 'run_id': 'manual__2022-05-03T00:08:56.645561+00:00', 'task': <Task(PythonOperator): python_task>, 'task_instance': <TaskInstance: sample-manual-test-hgkim.python_task 2022-05-03T00:08:56.645561+00:00 [running]>, 'task_instance_key_str': 'sample-manual-test-hgkim__python_task__20220503', 'test_mode': False, 'ti': <TaskInstance: sample-manual-test-hgkim.python_task 2022-05-03T00:08:56.645561+00:00 [running]>, 'tomorrow_ds': '2022-05-04', 'tomorrow_ds_nodash': '20220504', 'ts': '2022-05-03T00:08:56.645561+00:00', 'ts_nodash': '20220503T000856', 'ts_nodash_with_tz': '20220503T000856.645561+0000', 'var': {'json': None, 'value': None},
    'yesterday_ds': '2022-05-02', 'yesterday_ds_nodash': '20220502', 'arg1': 'hello',
 'templates_dict': None}

[2022-05-03 00:09:02,748] {logging_mixin.py:109} INFO - kwargs["arg1"]           : hello
[2022-05-03 00:09:02,748] {logging_mixin.py:109} INFO - kwargs["execution_date"] : 2022-05-03

오버라이딩 헷갈리지 않으려면?

-> kwargs에서 정의한 변수는 명시적으로 함수에 넣어주면 됨

def _my_func(arg1,execution_date, **context):
  print(f'arg1                     : {arg1}')
  print(f'execution_date           : {execution_date}')
  print(f'context                  : {context}')

t1 = PythonOperator(
    task_id='my_task',
    python_callable=func_my_task,
    op_kwargs={
        "arg1":"hello",
        "execution_date": "{{execution_date.in_timezone('Asia/Seoul').strftime('%Y-%m-%d')}}" 
    },
    dag=dag,
)

[2022-05-03 01:01:51,758] {logging_mixin.py:109} INFO - arg1                     : hello
[2022-05-03 01:01:51,758] {logging_mixin.py:109} INFO - execution_date           : 2022-05-03

[2022-05-03 01:01:51,758] {logging_mixin.py:109} INFO - context                  : {'conf': <airflow.configuration.AirflowConfigParser object at 0x7f70f8e18cf8>, 'dag': <DAG: sample-manual-test-hgkim>, 'dag_run': <DagRun sample-manual-test-hgkim @ 2022-05-03 01:01:45.637966+00:00: manual__2022-05-03T01:01:45.637966+00:00, externally triggered: True>, 'ds': '2022-05-03', 'ds_nodash': '20220503', 'inlets': [], 'macros': <module 'airflow.macros' from '/home/airflow/.local/lib/python3.6/site-packages/airflow/macros/__init__.py'>, 'next_ds': '2022-05-03', 'next_ds_nodash': '20220503', 'next_execution_date': DateTime(2022, 5, 3, 1, 1, 45, 637966, tzinfo=Timezone('+00:00')), 'outlets': [], 'params': {}, 'prev_ds': '2022-05-03', 'prev_ds_nodash': '20220503', 'prev_execution_date': DateTime(2022, 5, 3, 1, 1, 45, 637966, tzinfo=Timezone('+00:00')), 'prev_execution_date_success': <Proxy at 0x7f70f19dae88 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7f70f1a37e18>>, 'prev_start_date_success': <Proxy at 0x7f70f19a0e48 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7f70f19ac950>>, 'run_id': 'manual__2022-05-03T01:01:45.637966+00:00', 'task': <Task(PythonOperator): python_task>, 'task_instance': <TaskInstance: sample-manual-test-hgkim.python_task 2022-05-03T01:01:45.637966+00:00 [running]>, 'task_instance_key_str': 'sample-manual-test-hgkim__python_task__20220503', 'test_mode': False, 'ti': <TaskInstance: sample-manual-test-hgkim.python_task 2022-05-03T01:01:45.637966+00:00 [running]>, 'tomorrow_ds': '2022-05-04', 'tomorrow_ds_nodash': '20220504', 'ts': '2022-05-03T01:01:45.637966+00:00', 'ts_nodash': '20220503T010145', 'ts_nodash_with_tz': '20220503T010145.637966+0000', 'var': {'json': None, 'value': None}, 'yesterday_ds': '2022-05-02', 'yesterday_ds_nodash': '20220502', 'templates_dict': None}

[2022-05-03 01:01:51,758] {python.py:151} INFO - Done. Returned value was: None

**context, **kwargs 로 두개를 파라미터로 두면 어떻게 될까?

 -> ㄴㄴ 에러남

 

 **kwargs가 실제 변수 이름이지만, text context라는 것을 표현하기 위해 **context로 많이 사용함


+ web UI에서 op_kwargs 미리 확인하기

renederd template 으로 kwargs를 미리 볼 수 있음

op_kwargs
{
    "arg1": "hello",
    "execution_date": "2022-05-03" 
}

 

 

공부하다 그냥 궁금해서 정리해봄 ㅎ

728x90
반응형