Airflow) Pool 활용해서 dag 겹침 이슈 해결
평화로운 어느날
sparkKubernetesOperator가 미쳐 날뛰기 시작했다.
분명 10분정도면 끝나는 spark job인데, 30분이 넘게 걸려서 뒤의 작업과 겹쳐버렸다. (내 잘못ㅜ)
grafana 보니까, spark-driver가 여러개 띄어져 있고 executor는 더 난리.
혹시 몰라 grafana에 대시보드 spark 용으로 만들어둬서 다행
spark thrift server ui를 보니까, 0.1초면 끝나는 작업이 hang 걸려있고 난리도 난리가 아니었다.
schedule_interval="0 4 * * * " -> 10분이면 끝나는 작업
schedule_interval="30 4 * * * " -> 30분 넘게 걸려서 다른 작업과 겹침
줄줄이 비엔나로 겹침
쨋든 문제는 발견해서 다행
1. 순서가 있는 dag 2개를 합치기
-> 비슷한 성격의 dag들이 얼마나 늘어날지 모르고, 큰 작업들을 하나의 dag로 몰아넣기가 부담스러웠음
2. taskSensor
-> dag dependency, 멋있는 작업이지만 관리하기 힘들다는 면에서 이것도 부담스러웟음
3. pool
-> 무조건 k8s 서버에 1개의 task만 실행되도 되는 작업이기 때문에 적합하다고 판단
예시는 예전에 작성했던 sko 예시에 bash_operator 1개를 추가
- ui에서 connection pool을 slot 1개로 생성 (Kubernetes Cluster Connection) -> kubeConn
- task 1개 추가
wait_task= BashOperator(task_id="task_1",bash_command="echo 'task_1'",pool= "single_pool",dag=dag) |
https://mightytedkim.tistory.com/43
from datetime import datetime, timedelta
# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash import BashOperator
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
#etc
import pathlib
dag = DAG(
'sample-manual-spark-sko',
default_args={'pool': 'single_pool'}, #max_active_runs': 1, '
description='submit spark-pi as sparkApplication on kubernetes',
schedule_interval=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['sample','manual','spark','sko']
)
start_task = DummyOperator(task_id='start_task', retries=3)
wait_task= BashOperator(task_id="task_1",bash_command="echo 'task_1'",pool= "single_pool",dag=dag)
end_task = DummyOperator(task_id='end_task', retries=3)
############################
# Pod
############################
# spark-pi-20211122t070418-1-driver 1/1 Running 0 15s
# spark-pi-20211122t070418-1-a882cc7d46760183-exec-1 1/1 Running 0 4s
# spark-pi-20211122t070418-1-a882cc7d46760183-exec-2 1/1 Running 0 4s
# spark-pi-20211122t070418-1-a882cc7d46760183-exec-3 1/1 Running 0 4s
spark_task = SparkKubernetesOperator(
task_id='task1-spark',
namespace="spark-operator",
application_file=pathlib.Path("/opt/airflow/dags/repo/script/sample/sample-sko-spark.yaml").read_text(),
kubernetes_conn_id="kubeConn", #ns default in airflow connection UI
do_xcom_push=True,
dag=dag
)
############################
# logs
############################
# saving log from executors
# k logs spark-pi-20211122t065844-2-driver
logging_task = SparkKubernetesSensor(
task_id='sample-sko-spark-log',
namespace="spark-operator",
application_name="{{ task_instance.xcom_pull(task_ids='task1-spark')['metadata']['name'] }}",
kubernetes_conn_id="kubeConn",
attach_log=True, #
dag=dag,
)
start_task >> wait_task>> spark_task >> logging_task >> end_task
|
돌려보니까 원하는 결과가 나온다.
이제 아침에 놀랄일은 줄어들것 같다.
- dummyoperator, SparkKubernetesOperator, SparkKubernetesSensor 등의 오퍼레이터는 pool이 안먹힌다.
그래서 bashoperator를 넣었다. python으로 해도 상관은 없음