airflow 1.10 부터 적용되고, airflow 2.0에서 본격적으로 사용되는 KPO
자료가 없어서 2주간의 삽질기 ㅎㅎ
아 진짜 쫄깃 쫄깃하고, 안되서 도중에 argo를 파야하나 생각하고 휴..
요약
airflow는 airbnb에서 만든 파이프라인 관리 어플리케이션입니다.
현재 라인, 쏘카 등에서 사용하고 있습니다.
저는 네이티브 환경의 kubernetes에서 airflow를 사용했어요.
- kuberentesPodOperator 설정
- KubeneresPodOperator : DAG
- LOGGING : minIO, connection
- GIT-SYNC : secret, ssh
- Helm : values.yaml
- KubeneresPodOperator
- GIT-SYNC : secret, ssh
- LOGGING
작동 방식에 대해서는 인터넷에 글들이 많은데, 실제 helm으로 작동하는 설정을 공유하는게 없어서 정리했어요
도움이 되었으면 좋겠습니다. ㅎ
KubeneresPodOperator : DAG
실제 동작하는 컨테이너를 실행하기 위해 worker pod를 임시로 만드는 구조
아래 그림에서는 hadoop3.x Spark2.x 라는 이미지를 실제로 실행하는데, 중간에 airflow worker가 임시로 만들어짐
helm values.yaml 예시
env 로 붙는 것은 airflow.cfg로 들어갑니다. AIRFLOW__ 뒤에 붙는 것들이 하나씩 계층이되는거죠
defaultAirflowRepository: priavate.repo/library/apache/airflow (...) executor: "KubernetesExecutor" #LocalExecutor (...) env: - name: "AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY" value: "private.repo/library/apache/airflow" - name: "AIRFLOW__KUBERNETES__WORKER_CONATINER_TAG" value: "2.1.4" - name: "AIRFLOW__KUBERNETES__RUN_AS_USER" value: "50000" - name: "AIRFLOW__KUBERNETES__DAGS_IN_IMAGE" value: "True" |
* 따옴표를 빼먹으면 이런식으로 소문자로 들어가요.
작동하는데는 크게 없는것 같은데 나중에 혹시 문제 생길 수도 있으니께 메모
- name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE value: true |
DAG 파일 예시
- spark 관련된 설정은 "--conf","spark.driver.host=localhost 이 가장 중요
from airflow import DAG from airflow import configuration as conf #etc from datetime import datetime, timedelta #operator from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator default_args = { 'owner': 'airflow', 'start_date': datetime(2019, 1, 1)} dag = DAG('sample-spark-312-aws-readparquet-dag', schedule_interval='@once', default_args=default_args) ############################## k_2 = KubernetesPodOperator( task_id='spark-312-aws-readparquet-task', name='spark-312-aws-readparquet-name', namespace='airflow-test', image='private.repo/spark-3.1.2-util:1.0.1', # 테스트로 만든 이미지 arguments=[ "/opt/spark/bin/spark-submit", "--master", "local", "--conf","spark.ui.enabled=false", "--conf","spark.driver.host=localhost", # 이거 안해주면 host가 pod 명을 따라가면서 에러가 발생(명명규칙 에러) "/app/main.py"], in_cluster=True, config_file=None, is_delete_operator_pod=False, get_logs=True, dag=dag ) |
LOGGING : minIO, connection
클러스터를 구성하다보니, 로그를 로컬에 저장할 수 없었음. 그래서 log를 minio에 저장함
- 인터넷에 S3 저장하는 방법을 참고했음
- 로그는 airflow-webserver의 것을 저장
- 자세한 설명은 https://mightytedkim.tistory.com/38
1. helm values.yaml 예시
helm으로 연결
env: - name: "AIRFLOW__CORE__REMOTE_LOGGING" value: "True" - name: "AIRFLOW__CORE__REMOTE_LOG_CONN_ID" value: "MyS3Conn" - name: "AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER" value: "s3://airflow-logs/temp" |
2. ui에서 connection 생성
UI에서 connection을 만드는 작업. 테스트를 위해서 일단 extra 부분에 json으로 정보를 입력할 수 있어요
conn id : MyS3Conn conn Type : S3 (나머지는 공란) Extra :
|
3. bucket 생성
minIO나 ceph object storage나 상관 없어요.
두개 모두 s3가 호환이 되니까요
http://1**.**.**.**:9000/object-browser/airflow-logs |
+ apache-airflow[s3] 설치
helm으로 설치하면 포함되어 있어요
하지만 python으로 설치하면 따로 pip 해줘야해요
$ pip install apache-airflow[s3] $ (base) test@airflow:~/airflow:]$ pip freeze | grep s3 s3transfer==0.5.0 |
GIT-SYNC : secret, ssh
DAG 파일을 scheduler/webserver/worker에 공유할 때 설정
(k8s에서는 실제 동작은 scheduler pod 안의 scheduler container에서 함)
helm values.yaml 예시
git ssh 연동할 때 UI에서 복사하면
git@1**.**.1**.**:tedkim/airflow-dags.git 이렇게 나올 텐데 : 이 아니라 / 에요.
# Git sync dags: (...) gitSync: enabled: true #false repo: ssh://git@1**.**.1**.**/tedkim/airflow-dags.git branch: main rev: HEAD depth: 1 # the number of consecutive failures allowed before aborting maxFailures: 0 # subpath within the repo where dags are located # should be "" if dags are at repo root subPath: dags/ #"tests/dags" 예시는 쌍따옴표 있는데, 쌍따옴표 넣으면 에러남 sshKeySecret: airflow-ssh-git-secret # 위에서 만들었던 secret wait: 10 |
kubectl get secret
gitlab에 등록한 private key를 secret에 넣는 작업이에요.
git 인증서를 kubernetes secret에 저장하고, values.yaml에서 호출
$ kubectl create secret generic \ airflow-ssh-git-secret \ # --from-file=gitSshKey=/home/tedkim/gitlab_key/id_ed25519 \ -n airflow-test $ k get secret airflow-ssh-git-secret -n airflow-test NAME TYPE DATA AGE airflow-ssh-git-secret Opaque 1 8d |
HELM : values.yaml
helm을 이용한 설치 진행함 관리하기 편하고, 직관적임
helm 명령어
helm upgrade --install airflow apache-airflow/airflow -n airflow-webinar -f values.yaml --debug |
최종 helm values.yaml
defaultAirflowRepository: priavate.repo/library/apache/airflow (...) executor: "KubernetesExecutor" #LocalExecutor (...) env: - name: "AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY" value: "private.repo/library/apache/airflow" - name: "AIRFLOW__KUBERNETES__WORKER_CONATINER_TAG" value: "2.1.4" - name: "AIRFLOW__KUBERNETES__RUN_AS_USER" value: "50000" - name: "AIRFLOW__KUBERNETES__DAGS_IN_IMAGE" value: "True" - name: "AIRFLOW__CORE__REMOTE_LOGGING" value: True - name: "AIRFLOW__CORE__REMOTE_LOG_CONN_ID" value: "MyS3Conn" - name: "AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER" value: "s3://airflow-logs/temp" (...) # Git sync dags: (...) gitSync: enabled: true #false repo: ssh://git@1**.**.1**.**/tedkim/airflow-dags.git branch: main rev: HEAD depth: 1 # the number of consecutive failures allowed before aborting maxFailures: 0 # subpath within the repo where dags are located # should be "" if dags are at repo root subPath: dags/ #"tests/dags" 예시는 쌍따옴표 있는데, 쌍따옴표 넣으면 에러남 sshKeySecret: airflow-ssh-git-secret # 위에서 만들었던 secret wait: 10 |
참고
https://tech.socarcorp.kr/data/2021/06/01/data-engineering-with-airflow.html
https://bomwo.cc/posts/kubernetespodoperator/
- https://engineering.linecorp.com/ko/blog/data-engineering-with-airflow-k8s-2/
'Data > Airflow' 카테고리의 다른 글
Airflow) helm Api 설정 켜기_auth_backend (0) | 2021.12.17 |
---|---|
Kubernetes) k8s와 Airflow 이용한 spark작업_SparkKubernetesOperator (14) | 2021.11.25 |
Airflow) log를 minio에 저장_connection,yaml (0) | 2021.11.12 |
Airflow) Kubernetes에 올리기(yaml) (0) | 2021.10.28 |
Airflow) celery worker 환경 구성하기 (0) | 2021.10.12 |