Airflow) celery worker 환경 구성하기
airflow는 airbnb에서 만든 workflow 관리 플랫폼이에요
- 배치 파일을 순차적으로 돌릴 수도 있어요
- 이쁜 젠킨스
최근에 공부를 하게되면서 알게된 구조와 centos7에서 테스트한 예시를 메모하려고해요
airflow hello-world하면서 느낀점은 여기에 정리했어요 :)
구성
airflow webserver
- airflow UI
- workflow 상태 표시하고 실행, 재시작, 수동 조작, 로그 확인
airflow scheduler
- 작업 기준이 충족되는지 여부를 확인
- 종속 작업이 성공적으로 완료되었고, 예약 간격이 주어지면 실행할 수 있는 작업인지, 실행 조건이 충족되는지 등
- 위 충족 여부가 DB에 기록되면, task들이 worker에게 선택되서 작업을 실행함
airflow celery worker
- 여러개의 worker로 작업
- default는 한개의 worker로 로컬에서 작업이 돌아감
- celery를 따로 설치하지 않고, airflow 설치 후 `airflow celery worker -H worker_name -q queue_name`으로 실행(따로 celery 설치하느라 고생함)
airflow celery flower
- celery UI
- worker 상태, queue 등 확인
구성 설명
가장 기본적인 구성은 아래와 같아요
node1 : scheduler -> airflow scheduler -D node2 : worker1 -> airflow celery worker -q queue_1 -D node3 : worker2 -> airflow celery worker -q queue_2 -D |
그런데 관리하기 힘드니까 ui를 제공해요
저는 node1에다가 함께 설치했어요
> node1
airflow webserver : airflow webserver -D airflow celery ui : airflow celery flower -D |
그럼 이제 scheduler와 worker들이 어떻게 소통하는지?
> 그건 broker를 이용해요. rabbitMq 또는 redis를 사용할 수 있어요
$ broker_url = redis://111.111.111.214:16379/0 |
그러면 message que가 브로커 역할을 해줘요
그러면 또 worker들의 결과는 어디에 저장되는지?
> result backend 부분을 설정하면 결과가 저장되요
result_backend = db+mysql://airflow:airflow@111.111.111.216:14381/airflow?charset=utf8 |
마지막으로 저장된 결과를 조회하고, 실제 실행을 하는 airflow webserver는 어떻게 작동되냐?
> 파이썬의 db 툴킷인 sql_alchemy_conn을 사용해요. 저는 동일한 DB를 보도록 설정했어요
sql_alchemy_conn = mysql+mysqldb://airflow_celery:airflow_celery@111.111.111.216:14381/airflow_celery |
install
pip download
> offline 상태일 때는 이렇게 명시적으로 contstraint를 주지 않으면 오류가 남
- airflow : 기본
- extra : celery(workers), mysql(webserber meta, backend_result), redis(broker), crypto(db encrypt)
$ pip download "apache-airflow[celery,mysql,redis,crypto]==2.1.2" \ --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.1.2/constraints-3.8.txt" |
pip install
mysql-devel 설치안되어있으면 에러남ㅜㅜ
$ sudo yum install mysql-devel # 없으면 mysql 연결시 에러남 $ pip install --no-index -f ./ apache-airflow[celery,mysql,redis,crypto]==2.1.2 |
확인
$ airflow version 2.1.2 $ airflow celery -h usage: airflow celery [-h] COMMAND ... Start celery components. Works only when using CeleryExecutor. For more information, see https://airflow.apache.org/docs/apache-airflow/stable/executor/celery.html positional arguments: COMMAND flower Start a Celery Flower stop Stop the Celery worker gracefully worker Start a Celery worker node optional arguments: -h, --help show this help message and exit |
h1.mysql DB 설정
my.cnf
explicit_defaults_for_timestamp=1 CREATE DATABASE airflow_celery CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; CREATE USER 'airflow_celery' IDENTIFIED BY 'airflow_celery'; GRANT ALL PRIVILEGES ON airflow_celery.* TO 'airflow_celery'; ALTER DATABASE `airflow_celery` CHARACTER SET utf8; |
airflow 설정
user 밑의 airflow에 폴더가 생김 $ cd /home/manager/airflow $ mkdir logs dags $ vi /home/manager/airflow/airflow.cfg [core] dags_folder = /home/manager/airflow/dags # celery 사용하기 위해 설정 executor = CeleryExecutor # webserber metadata ## pymysql 사용하면, pool 관련 에러가 나옴, 버그라고 함 # 출처: https://stackoverflow.com/questions/61035608/airflow-scheduler-works-normally-fails-with- #sql_alchemy_conn = mysql+pymysql://airflow:airflow@111.111.111.216:14381/airflow?charset=utf8 #sql_alchemy_pool_enabled = FALSE # pymysql 대신 mysqlclient 사용 sql_alchemy_conn = mysql+mysqldb://airflow_celery:airflow_celery@111.111.111.216:14381/airflow_celery sql_alchemy_pool_enabled = True [logging] base_log_folder = /home/manager/airflow/logs [webserver] # port 수정 base_url = localhost:14380 web_server_port = 14380 [celery] # queue, rabbitMq도 가능 broker_url = redis://111.111.111.214:16379/0 # worker 결과 result_backend = db+mysql://airflow:airflow@111.111.111.216:14381/airflow?charset=utf8 # celery UI flower_port = 14384 |
run
init
$ airflow db init $ airflow users create \ --firstname admin \ --lastname admin \ --email admin \ --password admin \ --username admin \ --role Admin |
webserver, scheduler
$ airflow webserver -D $ airflow scheduler -D |
celery flower, worker
$ airflow celery flower -D $ airflow celery worker -H worker_bash -q worker_bash -D $airflow celery worker -H worker_celery -q worker_celery -D |
stop
pid를 기준으로 kill 해야함
airflow celery stop은 있지만, airflow stop은 없음
kill `cat /home/manager/airflow/airflow-webserver.pid` kill `cat /home/manager/airflow/airflow-scheduler.pid` kill `cat /home/manager/airflow/airflow-flower.pid` kill `cat /home/manager/airflow/airflow-worker.pid` |
이상으로 airflow celery worker를 이용한 hello world 였습니다.
+ 실제 운영은 native보다는 k8s에서 많이 하기 때문에
아래 글도 함께 보는걸 추천합니다.
https://mightytedkim.tistory.com/40
https://mightytedkim.tistory.com/53?category=922753