'데이터 엔지니어'로 성장하기

정리하는 걸 좋아하고, 남이 읽으면 더 좋아함

Data/Airflow

Airflow) celery worker 환경 구성하기

MightyTedKim 2021. 10. 12. 08:35
728x90
반응형

airflow는 airbnb에서 만든 workflow 관리 플랫폼이에요

- 배치 파일을 순차적으로 돌릴 수도 있어요
- 이쁜 젠킨스

최근에 공부를 하게되면서 알게된 구조와 centos7에서 테스트한 예시를 메모하려고해요

airflow hello-world하면서 느낀점은 여기에 정리했어요 :)

Airflow) 공부 및 적용기(메모)

최근에 airflow를 공부하고 있어요. 원래는 kubernertes의 설정 파일을 던지는 친구로 알고 보고 있엇는데, 배치파일 돌리는 작업으로 세팅 중이에요 이제 며칠봐서 틀릴 수도 있지만 제가 공부한 내

mightytedkim.tistory.com

구성

airflow webserver

- airflow UI
- workflow 상태 표시하고 실행, 재시작, 수동 조작, 로그 확인

airflow scheduler

- 작업 기준이 충족되는지 여부를 확인
- 종속 작업이 성공적으로 완료되었고, 예약 간격이 주어지면 실행할 수 있는 작업인지, 실행 조건이 충족되는지 등
- 위 충족 여부가 DB에 기록되면, task들이 worker에게 선택되서 작업을 실행함

airflow celery worker

- 여러개의 worker로 작업
- default는 한개의 worker로 로컬에서 작업이 돌아감
- celery를 따로 설치하지 않고, airflow 설치 후 `airflow celery worker -H worker_name -q queue_name`으로 실행(따로 celery 설치하느라 고생함)

airflow celery flower

- celery UI
- worker 상태, queue 등 확인

구성 설명

가장 기본적인 구성은 아래와 같아요

node1 : scheduler -> airflow scheduler -D
node2 : worker1 -> airflow celery worker -q queue_1 -D
node3 : worker2 -> airflow celery worker -q queue_2 -D

그런데 관리하기 힘드니까 ui를 제공해요
저는 node1에다가 함께 설치했어요
> node1

airflow webserver : airflow webserver -D
airflow celery ui : airflow celery flower -D


그럼 이제 scheduler와 worker들이 어떻게 소통하는지?
> 그건 broker를 이용해요. rabbitMq 또는 redis를 사용할 수 있어요

$ broker_url = redis://111.111.111.214:16379/0

그러면 message que가 브로커 역할을 해줘요

그러면 또 worker들의 결과는 어디에 저장되는지?
> result backend 부분을 설정하면 결과가 저장되요

result_backend = db+mysql://airflow:airflow@111.111.111.216:14381/airflow?charset=utf8


마지막으로 저장된 결과를 조회하고, 실제 실행을 하는 airflow webserver는 어떻게 작동되냐?
> 파이썬의 db 툴킷인 sql_alchemy_conn을 사용해요. 저는 동일한 DB를 보도록 설정했어요

sql_alchemy_conn = mysql+mysqldb://airflow_celery:airflow_celery@111.111.111.216:14381/airflow_celery

install

pip download

> offline 상태일 때는 이렇게 명시적으로 contstraint를 주지 않으면 오류가 남
- airflow : 기본
- extra : celery(workers), mysql(webserber meta, backend_result), redis(broker), crypto(db encrypt)

$ pip download "apache-airflow[celery,mysql,redis,crypto]==2.1.2" \
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.1.2/constraints-3.8.txt"

pip install

mysql-devel 설치안되어있으면 에러남ㅜㅜ

$ sudo yum install mysql-devel

# 없으면 mysql 연결시 에러남
$ pip install --no-index -f ./ apache-airflow[celery,mysql,redis,crypto]==2.1.2

확인

$ airflow version
2.1.2

$ airflow celery -h
usage: airflow celery [-h] COMMAND ...
Start celery components. Works only when using CeleryExecutor.
For more information, see https://airflow.apache.org/docs/apache-airflow/stable/executor/celery.html positional arguments: COMMAND flower Start a Celery Flower stop Stop the Celery worker gracefully worker Start a Celery worker node optional arguments:
-h, --help show this help message and exit

h1.mysql DB 설정

my.cnf

explicit_defaults_for_timestamp=1
CREATE DATABASE airflow_celery CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER 'airflow_celery' IDENTIFIED BY 'airflow_celery';
GRANT ALL PRIVILEGES ON airflow_celery.* TO 'airflow_celery';
ALTER DATABASE `airflow_celery` CHARACTER SET utf8;

airflow 설정

user 밑의 airflow에 폴더가 생김
$ cd /home/manager/airflow
$ mkdir logs dags
$ vi /home/manager/airflow/airflow.cfg
[core]
dags_folder = /home/manager/airflow/dags
# celery 사용하기 위해 설정
executor = CeleryExecutor # webserber metadata


## pymysql 사용하면, pool 관련 에러가 나옴, 버그라고 함
# 출처: https://stackoverflow.com/questions/61035608/airflow-scheduler-works-normally-fails-with-
#sql_alchemy_conn = mysql+pymysql://airflow:airflow@111.111.111.216:14381/airflow?charset=utf8 #sql_alchemy_pool_enabled = FALSE
# pymysql 대신 mysqlclient 사용
sql_alchemy_conn = mysql+mysqldb://airflow_celery:airflow_celery@111.111.111.216:14381/airflow_celery sql_alchemy_pool_enabled = True


[logging]
base_log_folder = /home/manager/airflow/logs


[webserver]
# port 수정
base_url = localhost:14380
web_server_port = 14380


[celery]
# queue, rabbitMq도 가능
broker_url = redis://111.111.111.214:16379/0
# worker 결과
result_backend = db+mysql://airflow:airflow@111.111.111.216:14381/airflow?charset=utf8
# celery UI flower_port = 14384

run

init

$ airflow db init
$ airflow users create \
--firstname admin \
--lastname admin \
--email admin \
--password admin \
--username admin \
--role Admin

webserver, scheduler

$ airflow webserver -D
$ airflow scheduler -D

celery flower, worker

$ airflow celery flower -D
$ airflow celery worker -H worker_bash -q worker_bash -D
$airflow celery worker -H worker_celery -q worker_celery -D

stop

pid를 기준으로 kill 해야함
airflow celery stop은 있지만, airflow stop은 없음

kill `cat /home/manager/airflow/airflow-webserver.pid`
kill `cat /home/manager/airflow/airflow-scheduler.pid`
kill `cat /home/manager/airflow/airflow-flower.pid`
kill `cat /home/manager/airflow/airflow-worker.pid`


이상으로 airflow celery worker를 이용한 hello world 였습니다.


+ 실제 운영은 native보다는 k8s에서 많이 하기 때문에
아래 글도 함께 보는걸 추천합니다.

https://mightytedkim.tistory.com/40

Airflow) kuberentesPodOperator 설정_helm,logging,git-sync

airlow 1.10 부터 적용되고, airflow 2.0에서 본격적으로 사용되는 KPO 자료가 없어서 2주간의 삽질기 ㅎㅎ 아 진짜 쫄깃 쫄깃하고, 안되서 도중에 argo를 파야하나 생각하고 휴.. 요약 airflow는 airbnb에서

mightytedkim.tistory.com

https://mightytedkim.tistory.com/53?category=922753

Kubernetes)Airflow설치

버전 : airflow-1.2.0.tgz 링크 : https://airflow.apache.org/docs/helm-chart/stable/index.html 결과 $k get all -n airflow NAME READY STATUS RESTARTS AGE pod/airflow-postgresql-0 1/1 Running 0 28d pod..

mightytedkim.tistory.com

728x90
반응형