'데이터 엔지니어'로 성장하기

정리하는 걸 좋아하고, 남이 읽으면 더 좋아함

Data/Airflow

Airflow) K8S Pod 만들기_k8sPodOperator, helm

MightyTedKim 2021. 11. 12. 13:44
728x90
반응형

 

 

airflow 1.10 부터 적용되고, airflow 2.0에서 본격적으로 사용되는 KPO

자료가 없어서 2주간의 삽질기 ㅎㅎ

아 진짜 쫄깃 쫄깃하고, 안되서 도중에 argo를 파야하나 생각하고 휴..

요약

airflow는 airbnb에서 만든 파이프라인 관리 어플리케이션입니다.

현재 라인, 쏘카 등에서 사용하고 있습니다.

저는 네이티브 환경의 kubernetes에서 airflow를 사용했어요.

 

  • kuberentesPodOperator 설정
    • KubeneresPodOperator : DAG
    • LOGGING : minIO, connection
    • GIT-SYNC : secret, ssh
    • Helm : values.yaml
      • KubeneresPodOperator
      • GIT-SYNC : secret, ssh
      • LOGGING

작동 방식에 대해서는 인터넷에 글들이 많은데, 실제 helm으로 작동하는 설정을 공유하는게 없어서 정리했어요

도움이 되었으면 좋겠습니다. ㅎ


KubeneresPodOperator : DAG

실제 동작하는 컨테이너를 실행하기 위해 worker pod를 임시로 만드는 구조

아래 그림에서는 hadoop3.x Spark2.x 라는 이미지를 실제로 실행하는데, 중간에 airflow worker가 임시로 만들어짐

 

helm values.yaml 예시

env 로 붙는 것은 airflow.cfg로 들어갑니다. AIRFLOW__ 뒤에 붙는 것들이 하나씩 계층이되는거죠


defaultAirflowRepository: priavate.repo/library/apache/airflow 
(...)
executor: "KubernetesExecutor#LocalExecutor
(...)
env:
  - name: "AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY"
    value: "private.repo/library/apache/airflow"
  - name: "AIRFLOW__KUBERNETES__WORKER_CONATINER_TAG"
    value: "2.1.4"
  - name: "AIRFLOW__KUBERNETES__RUN_AS_USER"
    value: "50000"
  - name: "AIRFLOW__KUBERNETES__DAGS_IN_IMAGE"
    value: "True"

* 따옴표를 빼먹으면 이런식으로 소문자로 들어가요.

작동하는데는 크게 없는것 같은데 나중에 혹시 문제 생길 수도 있으니께 메모

- name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
  value: true

DAG 파일 예시

from airflow import DAG
from airflow import configuration as conf
#etc
from datetime import datetime, timedelta
#operator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

default_args = { 'owner': 'airflow', 'start_date': datetime(2019, 1, 1)}
dag = DAG('sample-spark-312-aws-readparquet-dag', schedule_interval='@once', default_args=default_args)

##############################
k_2 = KubernetesPodOperator(
    task_id='spark-312-aws-readparquet-task',
    name='spark-312-aws-readparquet-name',
    namespace='airflow-test',
    image='private.repo/spark-3.1.2-util:1.0.1',  # 테스트로 만든 이미지
    arguments=[ "/opt/spark/bin/spark-submit",
                "--master", "local",
                "--conf","spark.ui.enabled=false",
                "--conf","spark.driver.host=localhost", # 이거 안해주면 host가 pod 명을 따라가면서 에러가 발생(명명규칙 에러)
                "/app/main.py"],
    in_cluster=True,
    config_file=None,
    is_delete_operator_pod=False,
    get_logs=True, 
    dag=dag
)

LOGGING : minIO, connection

클러스터를 구성하다보니, 로그를 로컬에 저장할 수 없었음. 그래서 log를 minio에 저장함

1. helm values.yaml 예시

helm으로 연결
env:
  - name: "AIRFLOW__CORE__REMOTE_LOGGING"
    value: "True"
  - name: "AIRFLOW__CORE__REMOTE_LOG_CONN_ID"
    value: "MyS3Conn"
  - name: "AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER"
    value:  "s3://airflow-logs/temp"

2. ui에서 connection 생성

UI에서 connection을 만드는 작업. 테스트를 위해서 일단 extra 부분에 json으로 정보를 입력할 수 있어요
conn id : MyS3Conn
conn Type : S3
(나머지는 공란)
Extra : 

  1. {"host" : "http://1**.**.**.**:9000", "aws_access_key_id" : "<your_access_key>" , "aws_secret_access_key": "<your_secret_key>" }

3. bucket 생성

minIO나 ceph object storage나 상관 없어요.
두개 모두 s3가 호환이 되니까요
http://1**.**.**.**:9000/object-browser/airflow-logs

+ apache-airflow[s3] 설치 

helm으로 설치하면 포함되어 있어요
하지만 python으로 설치하면 따로 pip 해줘야해요
$ pip install apache-airflow[s3]

$ (base) test@airflow:~/airflow:]$ pip freeze | grep s3
s3transfer==0.5.0

 


GIT-SYNC : secret, ssh

DAG 파일을 scheduler/webserver/worker에 공유할 때 설정

(k8s에서는 실제 동작은 scheduler pod 안의 scheduler container에서 함)

 

helm values.yaml 예시

git ssh 연동할 때 UI에서 복사하면
git@1**.**.1**.**:tedkim/airflow-dags.git 이렇게 나올 텐데 : 이 아니라 / 에요.

# Git sync
dags:
  (...)
  gitSync:
    enabled: true #false
    repo: ssh://git@1**.**.1**.**/tedkim/airflow-dags.git
    branch: main 
    rev: HEAD
    depth: 1
    # the number of consecutive failures allowed before aborting
    maxFailures: 0
    # subpath within the repo where dags are located
    # should be "" if dags are at repo root
    subPath: dags/ #"tests/dags" 예시는 쌍따옴표 있는데, 쌍따옴표 넣으면 에러남
    sshKeySecret: airflow-ssh-git-secret  # 위에서 만들었던 secret
    wait: 10 

kubectl get secret

gitlab에 등록한 private key를 secret에 넣는 작업이에요.

git 인증서를 kubernetes secret에 저장하고, values.yaml에서 호출

$ kubectl create secret generic \
    airflow-ssh-git-secret \ #
    --from-file=gitSshKey=/home/tedkim/gitlab_key/id_ed25519 \
    -n airflow-test

$ k get secret airflow-ssh-git-secret -n airflow-test
NAME                     TYPE     DATA   AGE
airflow-ssh-git-secret   Opaque   1      8d

 

 

 


HELM : values.yaml

helm을 이용한 설치  진행함 관리하기 편하고, 직관적임

helm 명령어

helm upgrade --install airflow apache-airflow/airflow -n airflow-webinar -f values.yaml --debug

최종 helm values.yaml


defaultAirflowRepository: priavate.repo/library/apache/airflow 
(...)
executor: "KubernetesExecutor" #LocalExecutor
(...)
env:
  - name: "AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY"
    value: "private.repo/library/apache/airflow"
  - name: "AIRFLOW__KUBERNETES__WORKER_CONATINER_TAG"
    value: "2.1.4"
  - name: "AIRFLOW__KUBERNETES__RUN_AS_USER"
    value: "50000"
  - name: "AIRFLOW__KUBERNETES__DAGS_IN_IMAGE"
    value: "True"

  - name: "AIRFLOW__CORE__REMOTE_LOGGING"
    value: True
  - name: "AIRFLOW__CORE__REMOTE_LOG_CONN_ID"
    value: "MyS3Conn"
  - name: "AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER"
    value:  "s3://airflow-logs/temp"

(...)

# Git sync

dags:
  (...)
  gitSync:
    enabled: true #false
    repo: ssh://git@1**.**.1**.**/tedkim/airflow-dags.git
    branch: main 
    rev: HEAD
    depth: 1
    # the number of consecutive failures allowed before aborting
    maxFailures: 0
    # subpath within the repo where dags are located
    # should be "" if dags are at repo root
    subPath: dags/ #"tests/dags" 예시는 쌍따옴표 있는데, 쌍따옴표 넣으면 에러남
    sshKeySecret: airflow-ssh-git-secret  # 위에서 만들었던 secret
    wait: 10 

 

참고

https://tech.socarcorp.kr/data/2021/06/01/data-engineering-with-airflow.html

 

쏘카 데이터 그룹 - Airflow와 함께한 데이터 환경 구축기(feat. Airflow on Kubernetes)

안녕하세요. 데이터 엔지니어링팀의 하디입니다. 이번 글에서는 쏘카 데이터 그룹의 태동기(2018년)부터 현재(2021년)까지 어떻게 Airflow를 구축하고 운영했는지를 소개합니다. 특히 최근에 쏘카

tech.socarcorp.kr

https://bomwo.cc/posts/kubernetespodoperator/

 

Airflow에서 KubernetesPodOperator 사용하기 - 데이터와 개발을 좋아하는 사람

들어가며 Airflow 2.0의 출시가 임박하게 되면서(현재는 2.0 베타가 나온 상태) Airflow를 Kubernetes 환경에서 사용하기 위해 KubernetesExecutor, helm chart의 변경 등 배포 및 운영을 좀 더 원활히 할 수 있는

bomwo.cc

 - https://engineering.linecorp.com/ko/blog/data-engineering-with-airflow-k8s-2/

 

Kubernetes를 이용한 효율적인 데이터 엔지니어링(Airflow on Kubernetes VS Airflow Kubernetes Executor) - 2 - LINE

안녕하세요. LINE Financial Data Platform을 운영하고 개발하고 있는 이웅규입니다. 이 글은 지난 NAVER DEVIEW 2020에서 발표했던 Kubernetes를 이용한 효율적인 데이터 엔지니어링 (Airflow on Kubernetes VS Airflow Ku

engineering.linecorp.com

- https://medium.com/daria-blog/airflow%EB%A5%BC-%ED%99%9C%EC%9A%A9%ED%95%9C-mlops-%EA%B5%AC%EC%84%B1%EB%B0%A9%EB%B2%95-a4fc9aa0d3ba

 

Airflow를 활용한 MLOps 구성방법

"MLOps란 무엇일까?" 의 글에서 설명했듯이 MLOps 는 사업적 성과를 만들어내기 위해, 사용자에게 예측 서비스를 빠르게 전달하는 개발 문화라고 정의했습니다.

medium.com

 

728x90
반응형