udemy) Airflow Operators Guide, Sections 3 & 4 :: mightytedkim
SECTION 3: The Most Common Operators | 1hr 12min
- 32. Introduction to Providers
provider package
- lets you use integrations provided by third parties right away
- advantage: no need to wait for an Airflow release
For S3, install the Amazon provider
- pip install apache-airflow-providers-amazon
Details are in each operator's guide
- e.g. on the Astronomer website (registry)
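Once a provider is installed, its hooks and operators live under the airflow.providers namespace. A minimal sketch using the Amazon provider's S3Hook (the connection id, bucket name and task are my own placeholders, not from the course):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
# available after `pip install apache-airflow-providers-amazon`
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def _list_keys():
    # "aws_default" connection and "my-bucket" are placeholder values
    hook = S3Hook(aws_conn_id="aws_default")
    print(hook.list_keys(bucket_name="my-bucket"))


with DAG('s3_provider_example', start_date=datetime(2021, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:
    list_keys = PythonOperator(task_id="list_keys", python_callable=_list_keys)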
Provider installation example
- requirements.txt
airflow-provider-great-expectations==0.0.6
x-airflow-common is evaluated first (in the docker-compose file)
replace the image entry with build -> in practice, though, we will probably just use a pre-built image
- 33. The PythonOperator 15min
We don't know the PythonOperator as well as we think we do
|
def _process(path, filename):
    print(f"{path}/{filename}")

PythonOperator(
    task_id="process",
    python_callable=_process,
    op_args=['/usr/local/airflow', 'data.csv']
)
|
def _process(path, filename):
    print(f"{path}/{filename}")

PythonOperator(
    task_id="process",
    python_callable=_process,
    op_kwargs={
        'path': '/usr/local/airflow',
        'filename': 'data.csv'
    }
)
|
Airflow Variables (set in the UI):
- path: /usr/local/airflow
- filename: data.csv

PythonOperator(
    task_id="process",
    python_callable=_process,
    op_kwargs={
        'path': '{{ var.value.path }}',
        'filename': '{{ var.value.filename }}'
    }
)
|
- my_settings: {"path": "/usr/local/airflow", "filename": "data.csv"}

PythonOperator(
    task_id="process",
    python_callable=_process,
    op_kwargs=Variable.get("my_settings", deserialize_json=True)
)
|
def _process(path, filename, **context):
    print(f"{path}/{filename} - {context['ds']}")

def _process(path, filename, ti):   # ti: the task instance (see the sketch after this block)
    ...

def _process(path, filename, ds):   # ds: the execution date as a string
    ...
|
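As a quick illustration of the ti argument above, a minimal sketch of pulling an XCom inside the callable (the upstream task_id "extract" is a placeholder of mine, not from the course):

def _process(path, filename, ti):
    # pull a value pushed by an upstream task (placeholder task_id "extract")
    previous_value = ti.xcom_pull(task_ids="extract")
    print(f"{path}/{filename} - {previous_value}")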
- 34. The PythonOperator with the TaskFlow API 7min
from airflow.decorators import task
from airflow.models import Variable

@task(task_id="task_a")
def process(my_settings):
    print(f"{my_settings['path']}/{my_settings['filename']}")

process(Variable.get("my_settings", deserialize_json=True))
|
Use get_current_context to fetch the task context
from airflow.decorators import task
from airflow.models import Variable
from airflow.operators.python import get_current_context

@task(task_id="task_a")
def process(my_settings):
    context = get_current_context()
    print(f"{my_settings['path']}/{my_settings['filename']} - {context['ds']}")

process(Variable.get("my_settings", deserialize_json=True))
|
Example mixing the TaskFlow API with a classic operator
from airflow.decorators import task
from airflow.models import Variable
from airflow.operators.bash import BashOperator
from airflow.operators.python import get_current_context

@task(task_id="task_a")
def process(my_settings):
    context = get_current_context()
    print(f"{my_settings['path']}/{my_settings['filename']} - {context['ds']}")

store = BashOperator(task_id="store", bash_command="echo 'store'")
process(Variable.get("my_settings", deserialize_json=True)) >> store
|
- 35. The BashOperator 15min
Don't be overconfident; don't reach for other operators when you don't even know Bash well
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG('my_bash_dag', start_date=datetime(2021, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:

    execute_command = BashOperator(
        task_id="execute_command",
        bash_command="echo 'execute'"
    )
---
with DAG('my_bash_dag', start_date=datetime(2021, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:

    execute_command = BashOperator(
        task_id="execute_command",
        bash_command="scripts/commands.sh"
    )

$ cat scripts/commands.sh
#!/bin/bash
echo 'execute'
exit 0
---
|
By default, the last line printed by the bash command is pushed to XCom. If you don't want that, set do_xcom_push=False on the task.
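A minimal sketch of turning that push off (task_id and command are placeholders of mine):

BashOperator(
    task_id="execute_command",
    bash_command="echo 'execute'",
    do_xcom_push=False  # don't push the last line of stdout to XCom
)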
---
You can pass key:value pairs with env
BashOperator(
    task_id="execute_command",
    bash_command="scripts/commands.sh",
    # skip_exit_code=99,  # default value
    # env={"my_var": "my_value"},
    env={"my_var": "my_value", "api_aws": "{{ var.value.api_key_aws }}"}  # from an Airflow Variable
)
--
$ cat scripts/commands.sh
#!/bin/bash
echo 'my api key: {{ var.value.api_key_aws }}'
exit 0
# exit 99  # marks the task as skipped
|
The BashOperator cannot change the user; whoami returns airflow
- 36. Quick note about templated fields 2min
document > airflow.operators.bash
https://airflow.apache.org/docs/apache-airflow/stable/operators-and-hooks-ref.html
template_fields = ['bash_command', 'env']
template_fields_renderers
template_ext = ['.sh', '.bash']
|
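Since bash_command and env are both in template_fields, Jinja is rendered in either one; a small sketch of my own (not from the course):

BashOperator(
    task_id="print_ds",
    bash_command="echo 'run date: {{ ds }}'",        # templated field
    env={"API_KEY": "{{ var.value.api_key_aws }}"}   # templated field
)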
- 37. The PostgresOperator 15min
docker exec [container_id] airflow providers list : lists the installed providers
from datetime import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG('my_postgres_dag', start_date=datetime(2021, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:

    create_table = PostgresOperator(
        task_id="create_table",
        postgres_conn_id="postgres",  # connection created in the UI
        sql="CREATE TABLE IF NOT EXISTS my_table (table_value TEXT NOT NULL, PRIMARY KEY (table_value));"
    )

    store = PostgresOperator(
        task_id="store",
        postgres_conn_id="postgres",  # connection created in the UI
        sql="INSERT INTO my_table VALUES ('my_value')"
        # sql="sql/CREATE_TABLE_MY_TABLE.sql"
        # sql=[
        #     "INSERT INTO my_table VALUES ('my_value') ON CONFLICT (table_value) DO UPDATE SET table_value='my_new_value'",
        #     "SELECT * FROM my_table",
        # ]
    )

# add the connection in the UI (Admin > Connections)
conn_id: postgres
conn_type: Postgres
login: postgres
password: postgres
port: 5432
|
Run the create_table task
$ airflow tasks test my_postgres_dag create_table 2021-01-01
$ airflow tasks test my_postgres_dag store 2021-01-01
To check the data, go into the docker container
$ psql -U postgres
- 38. Passing Dynamic Parameters to the PostgresOperator 9min
---
store = PostgresOperator(
    task_id="store",
    postgres_conn_id="postgres",  # connection created in the UI
    sql=["sql/INSERT_INTO_MY_TABLE.sql"],
    parameters={
        'filename': 'data.csv'
    }
)

-- sql/INSERT_INTO_MY_TABLE.sql
INSERT INTO my_table (table_value)
VALUES (%(filename)s)
ON CONFLICT (table_value) DO NOTHING
|
(Customizing the operator)
You can also use a .sql file
- parameters is not in template_fields, so we have to add it ourselves
template_fields: Sequence[str] = ['sql']
class CustomPostgresOperator(PostgresOperator):
    template_fields = ('sql', 'parameters')

def _my_task():
    return 'tweets.csv'
---
with DAG(...):  # DAG arguments omitted

    my_task = PythonOperator(
        task_id="my_task",
        python_callable=_my_task
    )

    store = CustomPostgresOperator(
        task_id="store",
        postgres_conn_id="postgres",  # connection created in the UI
        sql=["sql/INSERT_INTO_MY_TABLE.sql"],
        parameters={
            'filename': '{{ ti.xcom_pull(task_ids=["my_task"])[0] }}'
        }
    )

-- sql/INSERT_INTO_MY_TABLE.sql
INSERT INTO my_table (table_value)
VALUES (%(filename)s)
ON CONFLICT (table_value) DO NOTHING
|
Section 4: Choose your path | 59min
- 39. Introduction 1min
  Start tasks only under specific conditions
  - run only on weekdays
  - run only at specific times
- 40. The BranchPythonOperator 14min
  training -> check_accuracy (BranchPythonOperator) -> accurate (run) -> inaccurate (skipped)
$ ml_dag.py
from datetime import datetime

from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.operators.dummy import DummyOperator

def _check_accuracy():
    accuracy = 0.16
    if accuracy > 0.15:
        return ['accurate', 'top_accurate']
    return 'inaccurate'

with DAG('ml_dag', start_date=datetime(2021, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:

    training_ml = DummyOperator(task_id="training_ml")
    check_accuracy = BranchPythonOperator(
        task_id="check_accuracy",
        python_callable=_check_accuracy
    )
    accurate = DummyOperator(task_id="accurate")
    top_accurate = DummyOperator(task_id="top_accurate")
    inaccurate = DummyOperator(task_id="inaccurate")

    # Problem: with the plain dependency below, publish_ml also gets skipped -> a trigger rule is needed
    # publish_ml = DummyOperator(task_id="publish_ml")
    # training_ml >> check_accuracy >> [accurate, inaccurate] >> publish_ml

    publish_ml = DummyOperator(task_id="publish_ml", trigger_rule="none_failed_or_skipped")
    training_ml >> check_accuracy >> [accurate, inaccurate, top_accurate] >> publish_ml
$ cat days_off.yml
- "2021-01-01"
- "2021-03-01"
- "2021-05-05"
$ ml_dag.py
from airflow.operators.python import BranchPythonOperator
from airflow.operators.dummy import DummyOperator
import yaml

def _check_holidays(ds):
    with open('dags/files/days_off.yml', 'r') as f:
        days_off = set(yaml.load(f, Loader=yaml.FullLoader))
        if ds not in days_off:
            return 'process'
    return 'stop'

check_holidays = BranchPythonOperator(
    task_id="check_holidays",
    python_callable=_check_holidays
)
process = DummyOperator(task_id="process")
cleaning_stock = DummyOperator(task_id="cleaning_stock")
stop = DummyOperator(task_id="stop")

check_holidays >> [process, stop]
process >> cleaning_stock
|
- 42. The BranchSQLOperator 16min
$ branch_sql_dag.py
from datetime import datetime

from airflow import DAG
from airflow.operators.sql import BranchSQLOperator
from airflow.operators.dummy import DummyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG('branch_sql_dag', start_date=datetime(2021, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:

    create_table = PostgresOperator(
        task_id="create_table",
        sql="sql/CREATE_TABLE_PARTNERS.sql",
        postgres_conn_id="postgres"
    )

    insert_data = PostgresOperator(
        task_id="insert_data",
        sql="sql/INSERT_INTO_PARTNERS.sql",
        postgres_conn_id="postgres"
    )

    choose_task = BranchSQLOperator(
        task_id="choose_task",
        sql="SELECT COUNT(1) FROM partners WHERE partner_status=TRUE",
        follow_task_ids_if_true=['process'],
        follow_task_ids_if_false=['notif_email', 'notif_slack'],
        conn_id='postgres',
    )

    process = DummyOperator(task_id="process")
    notif_email = DummyOperator(task_id="notif_email")
    notif_slack = DummyOperator(task_id="notif_slack")

    create_table >> insert_data >> choose_task >> [process, notif_email, notif_slack]

# query result TRUE  => the follow_task_ids_if_true tasks run
# query result FALSE => the follow_task_ids_if_false tasks run
|
- 43. The BranchDateTimeOperator 8min
$ target_dag.py
from airflow.operators.datetime import BranchDateTimeOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime, time

is_in_timeframe = BranchDateTimeOperator(
    task_id="is_in_timeframe",
    follow_task_ids_if_true=['move_forward'],
    follow_task_ids_if_false=['end'],
    target_lower=datetime(2021, 1, 1, 10, 0, 0),
    target_upper=datetime(2021, 1, 1, 11, 0, 0),
    # target_lower=time(10, 0, 0),
    # target_upper=time(11, 0, 0),
    use_task_execution_date=True  # also checks past (triggered/backfilled) runs; default False checks the current time only
)
# runs between 2021-01-01 10am and 11am go to follow_task_ids_if_true; everything else goes to follow_task_ids_if_false

move_forward = DummyOperator(task_id="move_forward")
end = DummyOperator(task_id="end")

is_in_timeframe >> [move_forward, end]
|
- 44. The BranchDayOfWeekOperator 11min
$ branch_week_dag.py
from airflow.operators.weekday import BranchDayOfWeekOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.weekday import WeekDay

task_a = DummyOperator(task_id="task_a")
task_b = DummyOperator(task_id="task_b")

is_wednesday = BranchDayOfWeekOperator(
    task_id="is_wednesday",
    follow_task_ids_if_true=['task_c'],
    follow_task_ids_if_false=['end'],
    week_day=WeekDay.WEDNESDAY,
    # week_day={WeekDay.TUESDAY, WeekDay.WEDNESDAY}  -> check the execution_date in the tree view
    use_task_execution_day=True,  # if False, the check runs against the current date
)
task_c = DummyOperator(task_id="task_c")
end = DummyOperator(task_id="end")
|
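The notes above don't show how these tasks are wired together; one possible dependency chain (my assumption, not from the course) would be:

task_a >> task_b >> is_wednesday >> [task_c, end]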