
udemy) Airflow Operators Guide, Sections 3 & 4 :: mightytedkim

MightyTedKim 2022. 2. 22. 11:33

 

 

SECTION 3: The most Common Operator  | 1hr 12min

  • 32. Introduction to Providers
provider packages
 - let you plug third-party integrations straight into Airflow
 - advantage: no need to wait for an Airflow core release
 
 for S3, installing the Amazon provider is enough
 - pip install apache-airflow-providers-amazon
the details for each operator are in the provider's operator guide
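Once the Amazon provider is installed, its modules import like any other; a minimal sketch, assuming an S3 connection named aws_default and a hypothetical bucket my-bucket:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# list the keys in a bucket through the provider's hook
hook = S3Hook(aws_conn_id="aws_default")
print(hook.list_keys(bucket_name="my-bucket"))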
 
the Astronomer website also lists the available providers
 
Provider install example
- requirements.txt
 airflow-provider-great-expectations==0.0.6
 
In docker-compose, x-airflow-common is evaluated first; you can replace its image key with a build section so the provider gets baked into the image -> in practice we'll probably just use a pre-built image
  • 33. The PythonOperator 15min
We actually know the PythonOperator less well than we think.
1. airflow tasks
  - test a single task from the CLI
  - airflow tasks test <dag_id> <task_id> 2021-01-01
 
2. Add a task that prints something when the DAG starts (included in the sketch after this list)
 
3. op_args (list)
def _process(path, filename):
  print(f"{path}/{filename}")
 
PythonOperator(
  task_id="process",
  python_callable=_process,
  op_args=['/usr/local/airflow', 'data.csv']
)
 
4. op_kwargs (key, value)
def _process(path, filename):
  print(f"{path}/{filename}")

PythonOperator(
  task_id="process",
  python_callable=_process,
  op_kwargs={
    'path': '/usr/local/airflow',
    'filename': 'data.csv'
  }
)
5. Jinja templates
# add the variables (key, value) in the UI
  - path: /usr/local/airflow
  - filename: data.csv

PythonOperator(
  task_id="process",
  python_callable=_process,
  op_kwargs={
    'path': '{{ var.value.path }}',
    'filename': '{{ var.value.filename }}'
  }
)
 
# or fetch them with Variable (one JSON variable instead of several lookups)
  - my_settings: {"path": "/usr/local/airflow", "filename": "data.csv"}

from airflow.models import Variable

PythonOperator(
  task_id="process",
  python_callable=_process,
  op_kwargs=Variable.get("my_settings", deserialize_json=True)
)
 
6. Task context (covered last)
- provide_context: since Airflow 2.0 the context is passed by default, so the flag is gone
def _process(path, filename, **context):
  print(f"{path}/{filename} - {context['ds']}")
 
# or ask for specific context keys by name
def _process(path, filename, ti): ...
def _process(path, filename, ds): ...
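Putting the pieces above together, a minimal runnable sketch (the DAG id is a placeholder and the my_settings Variable is assumed to exist in the UI):

from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator

def _start():
  print("DAG started")

def _process(path, filename, **context):
  print(f"{path}/{filename} - {context['ds']}")

with DAG('my_python_dag', start_date=datetime(2021, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:

  start = PythonOperator(task_id="start", python_callable=_start)

  process = PythonOperator(
    task_id="process",
    python_callable=_process,
    # the Variable holds {"path": "...", "filename": "..."}
    op_kwargs=Variable.get("my_settings", deserialize_json=True),
  )

  start >> process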
  • 34. The PythonOperator with the TaskFlow API 7min
Using the TaskFlow API is convenient; it's new in Airflow 2.0.
 
from airflow.decorators import task
from airflow.models import Variable
 
@task(task_id="task_a")
def process(my_settings):
  print(f"{my_settings['path']}/{my_settings['filename']}")

process(Variable.get("my_settings", deserialize_json=True))

Fetch the task context with get_current_context

from airflow.decorators import task
from airflow.models import Variable
from airflow.operators.python import get_current_context
 
@task(task_id="task_a")
def process(my_settings):
  context = get_current_context()
  print(f"{my_settings['path']}/{my_settings['filename']} - {context['ds']}")

process(Variable.get("my_settings", deserialize_json=True))

Example mixing the TaskFlow API with a classic operator

from airflow.decorators import task
from airflow.models import Variable
from airflow.operators.bash import BashOperator
from airflow.operators.python import get_current_context
 
@task(task_id="task_a")
def process(my_settings):
  context = get_current_context()
  print(f"{my_settings['path']}/{my_settings['filename']} - {context['ds']}")

store = BashOperator(task_id="store", bash_command="echo 'store'")

process(Variable.get("my_settings", deserialize_json=True)) >> store
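These fragments still need to live inside a DAG; a minimal sketch of the full file using the @dag decorator (the DAG id and the Variable name are placeholders):

from datetime import datetime

from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.operators.bash import BashOperator

@dag(start_date=datetime(2021, 1, 1), schedule_interval='@daily', catchup=False)
def my_taskflow_dag():

  @task(task_id="task_a")
  def process(my_settings):
    print(f"{my_settings['path']}/{my_settings['filename']}")

  store = BashOperator(task_id="store", bash_command="echo 'store'")

  process(Variable.get("my_settings", deserialize_json=True)) >> store

dag = my_taskflow_dag()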

 

  • 35. The BashOperator 15min

Don't get overconfident: don't reach for fancier operators while not knowing bash well.

from airflow.operators.bash import BashOperator

with DAG('my_bash_dag', start_date=datetime(2021,1,1), schedule_interval='@daily', catchup=False) as dag:
  execute_command = BashOperator(task_id="execute_command", bash_command="echo 'execute'")

---
with DAG('my_bash_dag', start_date=datetime(2021,1,1), schedule_interval='@daily', catchup=False) as dag:
  execute_command = BashOperator(task_id="execute_command", bash_command="scripts/commands.sh")

$ cat scripts/commands.sh
#!/bin/bash
echo 'execute'
exit 0
---

By default, the last line the bash command prints is pushed to XCom; if you don't want that, set do_xcom_push=False on the task.
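A minimal sketch (the task id and command are placeholders):

no_push = BashOperator(
  task_id="no_push",
  bash_command="echo 'not pushed'",
  do_xcom_push=False,  # skip pushing the last stdout line to XCom
)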

---

You can pass key:value pairs to the script through env

 

BashOperator(
  task_id="execute_command",
  bash_command="scripts/commands.sh",
  # skip_exit_code=99,  # the default
  # env={"my_var": "my_value"}
  env={"my_var": "my_value", "api_aws": "{{ var.value.api_key_aws }}"}  # value pulled from a Variable
)
--
$ cat scripts/commands.sh
#!/bin/bash
echo 'my api key: {{ var.value.api_key_aws }}'
exit 0
# exit 99  # makes the task end up skipped instead of failed

The BashOperator doesn't change the user it runs as; whoami prints airflow.

  • 36. Quick note about templated fields 2min  

document > airflow.operators.bash

https://airflow.apache.org/docs/apache-airflow/stable/operators-and-hooks-ref.html

template_fields = ['bash_command', 'env']
template_fields_renderers
template_ext = ['.sh', '.bash']
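In practice, Jinja in those fields is rendered before the task runs, while any other argument is passed through literally; a minimal sketch, assuming a Variable named api_key_aws exists:

from airflow.operators.bash import BashOperator

print_key = BashOperator(
  task_id="print_key",
  # 'bash_command' is a templated field, so this expression is rendered first
  bash_command="echo 'key is {{ var.value.api_key_aws }}'",
  # 'env' is templated too, so its values may also contain Jinja
  env={"api_aws": "{{ var.value.api_key_aws }}"},
)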
  • 37. The PostgresOperator 15min

docker exec [container_id] airflow providers list : lists the providers that are installed

from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG('my_postgres_dag', start_date=datetime(2021,1,1), schedule_interval='@daily', catchup=False) as dag:

  create_table = PostgresOperator(
     task_id="create_table",
     postgres_conn_id="postgres",  # connection defined in the UI
     sql="CREATE TABLE IF NOT EXISTS my_table (table_value TEXT NOT NULL, PRIMARY KEY (table_value));"
  )

  store = PostgresOperator(
     task_id="store",
     postgres_conn_id="postgres",  # connection defined in the UI
     sql="INSERT INTO my_table VALUES ('my_value')"
     # sql="sql/CREATE_TABLE_MY_TABLE.sql"
     # sql=[
     #   "INSERT INTO my_table VALUES ('my_value') ON CONFLICT(table_value) DO UPDATE SET table_value='my_new_value'",
     #   "SELECT * FROM my_table",
     # ]
  )


# add a connection in the UI
conn_id: postgres
conn_type: Postgres
login: postgres
password: postgres
port: 5432

Run the create_table task:

$ airflow tasks test my_postgres_dag create_table 2021-01-01

$ airflow tasks test my_postgres_dag store 2021-01-01

 

To check the data, exec into the docker container:

$ psql -Upostgres
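Alternatively, a minimal sketch of checking the table from Python through the provider's hook (assuming the same postgres connection):

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="postgres")
# returns a list of tuples, one per row
print(hook.get_records("SELECT * FROM my_table;"))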

 

  • 38. Passing Dynamic Parameters to the PostgresOperator 9min

---

  store = PostgresOperator(
     task_id="store",
     postgres_conn_id="postgres",  # connection defined in the UI
     sql=["sql/INSERT_INTO_MY_TABLE.sql"],
     parameters={'filename': 'data.csv'}
  )

$ cat sql/INSERT_INTO_MY_TABLE.sql
insert into my_table (table_value)
values (%(filename)s)
on conflict (table_value)
do nothing

(Modifying the operator)

You can also point sql at a .sql file.

 - the parameters argument is not in template_fields, so it has to be added

template_fields: Sequence[str] = ['sql']

 

class CustomPostgresOperator(PostgresOperator):
  template_fields = ('sql', 'parameters')

def _my_task():
  return 'tweets.csv'
---
with DAG(...):  # args omitted

  my_task = PythonOperator(task_id="my_task", python_callable=_my_task)

  store = CustomPostgresOperator(
     task_id="store",
     postgres_conn_id="postgres",  # connection defined in the UI
     sql=["sql/INSERT_INTO_MY_TABLE.sql"],
     parameters={'filename': "{{ ti.xcom_pull(task_ids=['my_task'])[0] }}"}
  )

  my_task >> store

$ cat sql/INSERT_INTO_MY_TABLE.sql
insert into my_table (table_value)
values (%(filename)s)
on conflict (table_value)
do nothing

 

Section 4: Choose your path | 59min

  • 39. Introduction 1min
    run only when certain conditions are met
    - only on weekdays
    - only at certain times
  • 40. The BranchPythonOperator 14min
    training -> check_accuracy (BranchPythonOperator) -> accurate (run)
                                                      -> inaccurate (skipped)
     
    $ ml_dag.py
    from airflow.operators.python import BranchPythonOperator
    from airflow.operators.dummy import DummyOperator

    def _check_accuracy():
      accuracy = 0.16
      if accuracy > 0.15:
        return ['accurate', 'top_accurate']
      return 'inaccurate'

    with DAG(...):  # args omitted
      training_ml = DummyOperator(task_id="training_ml")

      check_accuracy = BranchPythonOperator(
        task_id="check_accuracy",
        python_callable=_check_accuracy
      )
      accurate = DummyOperator(task_id="accurate")
      top_accurate = DummyOperator(task_id="top_accurate")
      inaccurate = DummyOperator(task_id="inaccurate")

      # with the default trigger rule, publish_ml would also be skipped as soon as one branch is skipped -> a trigger rule is needed
      # publish_ml = DummyOperator(task_id="publish_ml")
      # training_ml >> check_accuracy >> [accurate, inaccurate] >> publish_ml

      publish_ml = DummyOperator(task_id="publish_ml", trigger_rule="none_failed_or_skipped")

      training_ml >> check_accuracy >> [accurate, inaccurate, top_accurate] >> publish_ml

     
  • 41. DAG scheduling based on Calendar! 9min
How to avoid running on holidays
 
- all that's needed is a file that defines the holidays
$ cat days_off.yml
- "2021-01-01"
- "2021-03-01"
- "2021-05-05"

$ ml_dag.py

from airflow.operators.python import BranchPythonOperator
from airflow.operators.dummy import DummyOperator
import yaml

def _check_holidays(ds):
  with open('dags/files/days_off.yml', 'r') as f:
    days_off = set(yaml.load(f, Loader=yaml.FullLoader))
    if ds not in days_off:
      return 'process'
  return 'stop'

with DAG(...):  # args omitted

  check_holidays = BranchPythonOperator(
    task_id="check_holidays",
    python_callable=_check_holidays
  )
  process = DummyOperator(task_id="process")
  cleaning_stock = DummyOperator(task_id="cleaning_stock")
  stop = DummyOperator(task_id="stop")

  check_holidays >> [process, stop]
  process >> cleaning_stock
  • 42. The BranchSQLOperator 16min
The advantage is that you can run different operators depending on a value in the database.
 
$ branch_sql_dag.py

from airflow.operators.sql import BranchSQLOperator
from airflow.operators.dummy import DummyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(...):  # args omitted

  create_table = PostgresOperator(
    task_id="create_table",
    sql="sql/CREATE_TABLE_PARTNERS.sql",
    postgres_conn_id="postgres"
  )

  insert_data = PostgresOperator(
    task_id="insert_data",
    sql="sql/INSERT_INTO_PARTNERS.sql",  # assumed filename for the insert script
    postgres_conn_id="postgres"
  )

  choose_task = BranchSQLOperator(
    task_id="choose_task",
    sql="SELECT COUNT(1) FROM partners WHERE partner_status=TRUE",
    follow_task_ids_if_true=['process'],
    follow_task_ids_if_false=['notif_email', 'notif_slack'],
    conn_id='postgres',
  )

  process = DummyOperator(task_id="process")
  notif_email = DummyOperator(task_id="notif_email")
  notif_slack = DummyOperator(task_id="notif_slack")

  create_table >> insert_data >> choose_task >> [process, notif_email, notif_slack]

If the query result is truthy (a partner row with partner_status = TRUE exists) -> the tasks in follow_task_ids_if_true run
If it is falsy (count 0 / FALSE) -> the tasks in follow_task_ids_if_false run
  • 43. The BranchDateTimeOperator 8min
A way to control whether tasks get triggered depending on whether the run falls between 10AM and 11AM
$ target_dag.py

from airflow.operators.datetime import BranchDateTimeOperator
from datetime import datetime, time

  is_in_timeframe = BranchDateTimeOperator(
    task_id="is_in_timeframe",
    follow_task_ids_if_true=['move_forward'],
    follow_task_ids_if_false=['end'],
    target_lower=datetime(2021, 1, 1, 10, 0, 0),
    target_upper=datetime(2021, 1, 1, 11, 0, 0),
    # target_lower=time(10, 0, 0),
    # target_upper=time(11, 0, 0),
    use_task_execution_date=True  # compare against the run's execution date so past runs are checked too; default False checks only the current time
  )

# a run between 10am and 11am on 2021-01-01 follows follow_task_ids_if_true; anything else follows follow_task_ids_if_false

  move_forward = DummyOperator(task_id="move_forward")
  end = DummyOperator(task_id="end")

  is_in_timeframe >> [move_forward, end]
  • 44. The BranchDayOfWeekOperator 11min
The pain point is that a DAG can only have a single schedule_interval; this operator lets you branch by day of the week instead.
 
$ branch_week_dag.py

from airflow.operators.weekday import BranchDayOfWeekOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.weekday import WeekDay

task_a = DummyOperator(task_id="task_a")
task_b = DummyOperator(task_id="task_b")

is_wednesday = BranchDayOfWeekOperator(
  task_id="is_wednesday",
  follow_task_ids_if_true=['task_c'],
  follow_task_ids_if_false=['end'],
  week_day=WeekDay.WEDNESDAY,
  # week_day={WeekDay.TUESDAY, WeekDay.WEDNESDAY} -> check the execution_date in the tree view
  use_task_execution_day=True,  # if False, today's weekday is used instead
)

task_c = DummyOperator(task_id="task_c")
end = DummyOperator(task_id="end")

task_a >> task_b >> is_wednesday >> [task_c, end]
