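While studying Airflow I got curious about a few things, so I ran them one by one.
Summary
- Questions that came up while studying the task context
- Tests
- What happens if a function declares both **context and **kwargs as parameters?
- execution_date can be used once it is declared as an explicit parameter; is it then removed from **context?
- Can execution_date be overridden?
Details
1. Questions that came up while studying the task context
- execution_date can be used once it is declared as an explicit parameter; is it then removed from **context?
- What happens if a function declares both **context and **kwargs as parameters?
- Can execution_date be overridden?
Below is the base template used for the tests.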
from airflow import DAG
# Airflow 2.x import paths (the older *_operator modules are deprecated)
from airflow.operators.dummy import DummyOperator
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from time import sleep
from datetime import datetime, timedelta

def _my_func(**context):
    # Print the whole task context, then two individual keys from it
    print(f'context : {context}')
    print(f'context["ds_nodash"] : {context["ds_nodash"]}')
    print(f'context["prev_ds_nodash"]: {context["prev_ds_nodash"]}')

with DAG('sample-manual-test-hgkim',
         description='really First DAG',
         schedule_interval='@daily',
         start_date=datetime(2021, 12, 12),
         catchup=False,
         tags=['sample', 'hgkim']
         ) as dag:
    start_task = DummyOperator(task_id='start_task', retries=3)
    end_task = DummyOperator(task_id='end_task', retries=3)
    task_a = PythonOperator(
        task_id='python_task',
        python_callable=_my_func,
        op_kwargs={"arg1": "hello"},
        depends_on_past=True
    )
    start_task >> task_a >> end_task
2. Tests
execution_date can be used once it is declared as an explicit parameter; is it then removed from **context?
-> Yes. A parameter that is declared explicitly is excluded from kwargs.
def _my_func(arg1, execution_date, **kwargs):
    print(f'arg1 : {arg1}')
    print(f'execution_date : {execution_date}')  # check whether this is dropped from kwargs
    print(f'kwargs : {kwargs}')
[2022-05-03 00:32:18,780] {logging_mixin.py:109} INFO - arg1 : hello
[2022-05-03 00:32:18,780] {logging_mixin.py:109} INFO - execution_date : 2022-05-03
[2022-05-03 00:32:18,780] {logging_mixin.py:109} INFO - kwargs : {'conf': <airflow.configuration.AirflowConfigParser object at 0x7fafdc85fc50>, 'dag': <DAG: sample-manual-test-hgkim>, 'dag_run': <DagRun sample-manual-test-hgkim @ 2022-05-03 00:30:46.136474+00:00: manual__2022-05-03T00:30:46.136474+00:00, externally triggered: True>, 'ds': '2022-05-03', 'ds_nodash': '20220503', 'inlets': [], 'macros': <module 'airflow.macros' from '/home/airflow/.local/lib/python3.6/site-packages/airflow/macros/__init__.py'>, 'next_ds': '2022-05-03', 'next_ds_nodash': '20220503', 'next_execution_date': DateTime(2022, 5, 3, 0, 30, 46, 136474, tzinfo=Timezone('+00:00')), 'outlets': [], 'params': {}, 'prev_ds': '2022-05-03', 'prev_ds_nodash': '20220503', 'prev_execution_date': DateTime(2022, 5, 3, 0, 30, 46, 136474, tzinfo=Timezone('+00:00')), 'prev_execution_date_success': <Proxy at 0x7fafd548e508 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7fafd53e4048>>, 'prev_start_date_success': <Proxy at 0x7fafd53ebac8 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7fafd53f5a60>>, 'run_id': 'manual__2022-05-03T00:30:46.136474+00:00', 'task': <Task(PythonOperator): python_task>, 'task_instance': <TaskInstance: sample-manual-test-hgkim.python_task 2022-05-03T00:30:46.136474+00:00 [running]>, 'task_instance_key_str': 'sample-manual-test-hgkim__python_task__20220503', 'test_mode': False, 'ti': <TaskInstance: sample-manual-test-hgkim.python_task 2022-05-03T00:30:46.136474+00:00 [running]>, 'tomorrow_ds': '2022-05-04', 'tomorrow_ds_nodash': '20220504', 'ts': '2022-05-03T00:30:46.136474+00:00', 'ts_nodash': '20220503T003046', 'ts_nodash_with_tz': '20220503T003046.136474+0000', 'var': {'json': None, 'value': None}, 'yesterday_ds': '2022-05-02', 'yesterday_ds_nodash': '20220502', 'templates_dict': None}
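The same effect can be reproduced without Airflow, since it is ordinary Python keyword-argument binding. The sketch below is only a model: `call_with_context` and `fake_context` are made-up names standing in for the operator and the real task context.

```python
def call_with_context(func, context):
    """Hypothetical stand-in for the operator: pass every entry as a keyword argument."""
    return func(**context)


def _my_func(arg1, execution_date, **kwargs):
    # Named parameters are bound first; only the leftovers end up in kwargs.
    return {
        "arg1": arg1,
        "execution_date": execution_date,
        "rest": sorted(kwargs),
    }


# A tiny fake context; the real one has many more keys.
fake_context = {
    "arg1": "hello",
    "execution_date": "2022-05-03",
    "ds": "2022-05-03",
    "ds_nodash": "20220503",
}

result = call_with_context(_my_func, fake_context)
print(result["rest"])  # execution_date and arg1 are not in the leftovers
```

So nothing Airflow-specific is needed to explain why `execution_date` disappears from `kwargs`: Python itself routes it to the named parameter.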
Can execution_date be overridden?
- Looking at the kwargs value in the log:
- 'execution_date': '2022-05-03', # overridden (the original context value is no longer in **kwargs)
- 'arg1': 'hello', # added (confirmed that it was added to **kwargs)
def _my_func(**kwargs):
    print(f'kwargs : {kwargs}')
    print(f'kwargs["arg1"] : {kwargs["arg1"]}')
    print(f'kwargs["execution_date"] : {kwargs["execution_date"]}')

task_a = PythonOperator(
    task_id='python_task',
    python_callable=_my_func,
    op_kwargs={
        "arg1": "hello",
        # Same key as the context entry, rendered as a date string in Seoul time
        "execution_date": "{{execution_date.in_timezone('Asia/Seoul').strftime('%Y-%m-%d')}}"
    },
    depends_on_past=True
)
[2022-05-03 00:09:02,748] {logging_mixin.py:109} INFO - kwargs : {'conf': <airflow.configuration.AirflowConfigParser object at 0x7fb33fa46cc0>, 'dag': <DAG: sample-manual-test-hgkim>, 'dag_run': <DagRun sample-manual-test-hgkim @ 2022-05-03 00:08:56.645561+00:00: manual__2022-05-03T00:08:56.645561+00:00, externally triggered: True>, 'ds': '2022-05-03', 'ds_nodash': '20220503', 'execution_date': '2022-05-03', 'inlets': [], 'macros': <module 'airflow.macros' from '/home/airflow/.local/lib/python3.6/site-packages/airflow/macros/__init__.py'>, 'next_ds': '2022-05-03', 'next_ds_nodash': '20220503', 'next_execution_date': DateTime(2022, 5, 3, 0, 8, 56, 645561, tzinfo=Timezone('+00:00')), 'outlets': [], 'params': {}, 'prev_ds': '2022-05-03', 'prev_ds_nodash': '20220503', 'prev_execution_date': DateTime(2022, 5, 3, 0, 8, 56, 645561, tzinfo=Timezone('+00:00')), 'prev_execution_date_success': <Proxy at 0x7fb338608fc8 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7fb338663e18>>, 'prev_start_date_success': <Proxy at 0x7fb3386080c8 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7fb3385dc950>>, 'run_id': 'manual__2022-05-03T00:08:56.645561+00:00', 'task': <Task(PythonOperator): python_task>, 'task_instance': <TaskInstance: sample-manual-test-hgkim.python_task 2022-05-03T00:08:56.645561+00:00 [running]>, 'task_instance_key_str': 'sample-manual-test-hgkim__python_task__20220503', 'test_mode': False, 'ti': <TaskInstance: sample-manual-test-hgkim.python_task 2022-05-03T00:08:56.645561+00:00 [running]>, 'tomorrow_ds': '2022-05-04', 'tomorrow_ds_nodash': '20220504', 'ts': '2022-05-03T00:08:56.645561+00:00', 'ts_nodash': '20220503T000856', 'ts_nodash_with_tz': '20220503T000856.645561+0000', 'var': {'json': None, 'value': None}, 'yesterday_ds': '2022-05-02', 'yesterday_ds_nodash': '20220502', 'arg1': 'hello', 'templates_dict': None}
[2022-05-03 00:09:02,748] {logging_mixin.py:109} INFO - kwargs["arg1"] : hello
[2022-05-03 00:09:02,748] {logging_mixin.py:109} INFO - kwargs["execution_date"] : 2022-05-03
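In the log, execution_date keeps its usual position in the dict but carries the new value, while arg1 shows up near the end. That is what a plain dict update would produce, so the behaviour can be modelled as below. This is only a sketch of the observed result, not Airflow's actual code; `build_call_kwargs` and the sample values are hypothetical.

```python
def build_call_kwargs(context, op_kwargs):
    """Hypothetical model: merge op_kwargs on top of the task context."""
    merged = dict(context)    # copy so the original context is left untouched
    merged.update(op_kwargs)  # on a key collision the op_kwargs value wins
    return merged


fake_context = {
    "ds": "2022-05-03",
    "execution_date": "2022-05-03T00:08:56+00:00",  # stands in for the DateTime object
    "run_id": "manual__example",
}
op_kwargs = {"arg1": "hello", "execution_date": "2022-05-03"}

merged = build_call_kwargs(fake_context, op_kwargs)
print(list(merged))              # existing keys keep their order, arg1 is appended
print(merged["execution_date"])  # the op_kwargs value replaced the context value
```

The practical takeaway is that reusing a context key name in op_kwargs silently hides the original value from the callable.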
How to avoid getting confused by overriding?
-> Declare the variables defined in op_kwargs explicitly as function parameters.
def _my_func(arg1, execution_date, **context):
    print(f'arg1 : {arg1}')
    print(f'execution_date : {execution_date}')
    print(f'context : {context}')

t1 = PythonOperator(
    task_id='my_task',
    python_callable=_my_func,  # must reference the function defined above
    op_kwargs={
        "arg1": "hello",
        "execution_date": "{{execution_date.in_timezone('Asia/Seoul').strftime('%Y-%m-%d')}}"
    },
    dag=dag,
)
[2022-05-03 01:01:51,758] {logging_mixin.py:109} INFO - arg1 : hello
[2022-05-03 01:01:51,758] {logging_mixin.py:109} INFO - execution_date : 2022-05-03
[2022-05-03 01:01:51,758] {logging_mixin.py:109} INFO - context : {'conf': <airflow.configuration.AirflowConfigParser object at 0x7f70f8e18cf8>, 'dag': <DAG: sample-manual-test-hgkim>, 'dag_run': <DagRun sample-manual-test-hgkim @ 2022-05-03 01:01:45.637966+00:00: manual__2022-05-03T01:01:45.637966+00:00, externally triggered: True>, 'ds': '2022-05-03', 'ds_nodash': '20220503', 'inlets': [], 'macros': <module 'airflow.macros' from '/home/airflow/.local/lib/python3.6/site-packages/airflow/macros/__init__.py'>, 'next_ds': '2022-05-03', 'next_ds_nodash': '20220503', 'next_execution_date': DateTime(2022, 5, 3, 1, 1, 45, 637966, tzinfo=Timezone('+00:00')), 'outlets': [], 'params': {}, 'prev_ds': '2022-05-03', 'prev_ds_nodash': '20220503', 'prev_execution_date': DateTime(2022, 5, 3, 1, 1, 45, 637966, tzinfo=Timezone('+00:00')), 'prev_execution_date_success': <Proxy at 0x7f70f19dae88 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7f70f1a37e18>>, 'prev_start_date_success': <Proxy at 0x7f70f19a0e48 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7f70f19ac950>>, 'run_id': 'manual__2022-05-03T01:01:45.637966+00:00', 'task': <Task(PythonOperator): python_task>, 'task_instance': <TaskInstance: sample-manual-test-hgkim.python_task 2022-05-03T01:01:45.637966+00:00 [running]>, 'task_instance_key_str': 'sample-manual-test-hgkim__python_task__20220503', 'test_mode': False, 'ti': <TaskInstance: sample-manual-test-hgkim.python_task 2022-05-03T01:01:45.637966+00:00 [running]>, 'tomorrow_ds': '2022-05-04', 'tomorrow_ds_nodash': '20220504', 'ts': '2022-05-03T01:01:45.637966+00:00', 'ts_nodash': '20220503T010145', 'ts_nodash_with_tz': '20220503T010145.637966+0000', 'var': {'json': None, 'value': None}, 'yesterday_ds': '2022-05-02', 'yesterday_ds_nodash': '20220502', 'templates_dict': None}
[2022-05-03 01:01:51,758] {python.py:151} INFO - Done. Returned value was: None
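What happens if a function declares both **context and **kwargs as parameters?
-> Nope, it raises an error. Python allows only one ** parameter per function, so this is a SyntaxError.
**kwargs is the conventional name, but **context is often used instead to make it clear that the dict holds the task context.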
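The error can be checked without touching a DAG at all. The snippet below compiles the offending signature from a string (so the file itself still runs) and then shows the single-parameter form that works; the sample keyword values are made up.

```python
# Two ** parameters in one signature: Python rejects this at parse time.
source = "def _my_func(**context, **kwargs):\n    pass\n"

try:
    compile(source, "<example>", "exec")
    outcome = "compiled"
except SyntaxError as exc:
    outcome = f"SyntaxError: {exc.msg}"

print(outcome)


# One ** parameter is enough; the name is only a convention.
def _my_func(**context):
    return sorted(context)


keys = _my_func(ds="2022-05-03", arg1="hello")
print(keys)
```

Because this is a syntax error, the whole DAG file would fail to import rather than just the one task failing at run time.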
+ Checking op_kwargs in advance in the web UI
The Rendered Template view lets you see the rendered kwargs in advance.
op_kwargs
{
"arg1": "hello",
"execution_date": "2022-05-03"
}
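The rendered "2022-05-03" comes from shifting the UTC execution time to Seoul time and formatting it. As a rough check, the same arithmetic can be done with the standard library. This is a stand-in for the `in_timezone('Asia/Seoul')` call in the template, assuming a fixed UTC+9 offset (Korea currently observes no DST); `to_seoul_date`, `KST`, and the second timestamp are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC+9 offset used as a simple stand-in for the Asia/Seoul zone.
KST = timezone(timedelta(hours=9), name="KST")


def to_seoul_date(utc_dt):
    """Convert an aware UTC datetime to a YYYY-MM-DD string in Seoul time."""
    return utc_dt.astimezone(KST).strftime("%Y-%m-%d")


# Execution time taken from the run shown in the logs (microseconds dropped).
run_at = datetime(2022, 5, 3, 0, 8, 56, tzinfo=timezone.utc)
# A made-up later run, to show when the Seoul date differs from the UTC date.
late_run = datetime(2022, 5, 3, 16, 0, 0, tzinfo=timezone.utc)

print(to_seoul_date(run_at))    # 2022-05-03
print(to_seoul_date(late_run))  # 2022-05-04
```

For runs at 15:00 UTC or later, the Seoul date is already the next day, which is the case where overriding execution_date with a local date string actually changes what the task sees.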
Just wrote this up because I got curious while studying.