'데이터 엔지니어'로 성장하기

정리하는 걸 좋아하고, 남이 읽으면 더 좋아함

Data/Airflow

Airflow) Pool 활용해서 dag 겹침 이슈 해결

MightyTedKim 2022. 3. 15. 16:27
728x90
반응형
 

평화로운 어느날

sparkKubernetesOperator가 미쳐 날뛰기 시작했다.

 

분명 10분정도면 끝나는 spark job인데, 30분이 넘게 걸려서 뒤의 작업과 겹쳐버렸다. (내 잘못ㅜ)

grafana 보니까, spark-driver가 여러개 띄어져 있고 executor는 더 난리.

혹시 몰라 grafana에 대시보드 spark 용으로 만들어둬서 다행

spark thrift server ui를 보니까, 0.1초면 끝나는 작업이 hang 걸려있고 난리도 난리가 아니었다.

 

schedule_interval="0 4 * * * " -> 10분이면 끝나는 작업

schedule_interval="30 4 * * * " -> 30분 넘게 걸려서 다른 작업과 겹침

줄줄이 비엔나로 겹침

 

쨋든 문제는 발견해서 다행

1. 순서가 있는 dag 2개를 합치기 

-> 비슷한 성격의 dag들이 얼마나 늘어날지 모르고, 큰 작업들을 하나의 dag로 몰아넣기가 부담스러웠음 

2. taskSensor

-> dag dependency, 멋있는 작업이지만 관리하기 힘들다는 면에서 이것도 부담스러웟음

3. pool

-> 무조건 k8s 서버에 1개의 task만 실행되도 되는 작업이기 때문에 적합하다고 판단


예시는 예전에 작성했던 sko 예시에 bash_operator 1개를 추가

- ui에서 connection pool을 slot 1개로 생성 (Kubernetes Cluster Connection) -> kubeConn

- task 1개 추가

wait_task= BashOperator(task_id="task_1",bash_command="echo 'task_1'",pool= "single_pool",dag=dag)

https://mightytedkim.tistory.com/43

 

Kubernetes) k8s와 Airflow 이용한 spark_submit (SparkK8sOperator)

요약 1. kubenetes 환경에서 airflow를 이용해 spark_submit을 함 2. SparkKubernetesOpertor(SKO)를 선택함 개요 상황 쿠버네티스 클러스터 환경에서 spark_submit 관리 필요 후보 (3가지) KubernetePodOperator..

mightytedkim.tistory.com

 

from datetime import datetime, timedelta

# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash import BashOperator

from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

#etc
import pathlib
 
dag = DAG(
    'sample-manual-spark-sko',
    default_args={'pool''single_pool'}, #max_active_runs': 1, '
    description='submit spark-pi as sparkApplication on kubernetes',
    schedule_interval=timedelta(days=1),
    start_date=datetime(202111),
    catchup=False,
    tags=['sample','manual','spark','sko']
)

start_task  = DummyOperator(task_id='start_task', retries=3)
wait_task= BashOperator(task_id="task_1",bash_command="echo 'task_1'",pool= "single_pool",dag=dag)
end_task    = DummyOperator(task_id='end_task', retries=3)

############################
# Pod
############################
# spark-pi-20211122t070418-1-driver                    1/1     Running     0          15s
# spark-pi-20211122t070418-1-a882cc7d46760183-exec-1   1/1     Running     0          4s
# spark-pi-20211122t070418-1-a882cc7d46760183-exec-2   1/1     Running     0          4s
# spark-pi-20211122t070418-1-a882cc7d46760183-exec-3   1/1     Running     0          4s
spark_task = SparkKubernetesOperator(
    task_id='task1-spark',
    namespace="spark-operator",
    application_file=pathlib.Path("/opt/airflow/dags/repo/script/sample/sample-sko-spark.yaml").read_text(),
    kubernetes_conn_id="kubeConn"#ns default in airflow connection UI
    do_xcom_push=True,
    dag=dag
)

############################
# logs
############################
# saving log from executors 
# k logs spark-pi-20211122t065844-2-driver
logging_task = SparkKubernetesSensor(
    task_id='sample-sko-spark-log',
    namespace="spark-operator",
    application_name="{{ task_instance.xcom_pull(task_ids='task1-spark')['metadata']['name'] }}",
    kubernetes_conn_id="kubeConn",
    attach_log=True#
    dag=dag,
)

start_task >> wait_task>> spark_task >> logging_task >> end_task

 

돌려보니까 원하는 결과가 나온다.

이제 아침에 놀랄일은 줄어들것 같다.

 

- dummyoperator, SparkKubernetesOperator, SparkKubernetesSensor 등의 오퍼레이터는 pool이 안먹힌다.

그래서 bashoperator를 넣었다. python으로 해도 상관은 없음

728x90
반응형