Airflow) Hello world와 시행착오_celery worker,버그
최근에 airflow를 공부하고 있어요ㅎ
- 사진 편집
-
-
작게문서 너비옆트임
-
- 삭제
사진 설명을 입력하세요.
그냥 "배치 돌리는 어플리케이션" 으로만 알고 있던 제가
하나씩 알아가며 메모를 하는 포스팅입니다 :)
Airflow
'워크플로우 관리 플랫폼' 으로 최근에 핫하게 사용되고 있어요.
airbnb 에서 2016년에 만들어졌어요.
2020년에 2.0이 나오면서 더 강력해졌어요(이뻐짐)
장점
1. python : 가장 큰 장점, python으로 쉽게 접근이 가능 (데이터 분석가, 현업)
2. hello world 쉬움 : 한시간이면 만들 수 있어요
3. UI : 이쁘고 직관적이에요. 2.0으로 넘어오면서 엄청 이뻐졋어요
4. cloud 벤더 : cloud에서 운영하는 서비스를 쉽게 찾을 수 있어요
5. 파이프 라인 관리 : worker, queue로 작업 분산 // 한 곳에서 배치를 관리해서 너무 좋음
단점
(세팅하는 엔지니어 관점에서의 단점 ㅋㅋ)
1. 좀 깊이 들어가면 레퍼런스가 잘 없음
- 문서가 hello world에만 친절한 느낌
- sparkKubernetesOperator, KubernetesPodOperator 등
2. 쉽다보니까, 사용자들이 resource를 많이 줌 ㅜ
- 이건 추후 저희가 해결해야되는 부분이에요
쓰고 보니 딱히 단점이 없네요 ㅋㅋ
테스트 환경
- airflow 2.1.2
- celery worker
- centos 7
- pyenv -> 3.8.9
내가 생각한 유용했던 기능
1. variable : 비밀번호 같은 변수를 따로 관리해서 좋음
2. xcom : task 끼리 짧은 변수를 넘겨줘서 좋음
3. connection : UI에서 선택하고 입력해서 관리가 편함 (s3, kubernetes Cluster Connection 등)
경험했던 시행착오
celery 의 개념을 이해하는게 힘들었음
native 환경에서 airflow 여러개를 띄우고 worker를 테스트함
- airflow가 한개 떠있고, celery를 따로 설치하는 것으로 이해해서 힘들었음
- airflow 안에 내장되어 있음
- 명령어 : airflow celery worker
아래 그림에 celery worker 도 airflow 이미지를 사용한다고 생각하면 됨
그래서 celery 를 설치하고 worker 와 airflow 연결하는데 설명된 곳이 없어서 혼란스러웟음 ㅜ
python 이용해서 설치할 때는 아래처럼 필요한 플러그인들을 추가해주면 됨
celery worker용 linux
$ pip install "apache-airflow[celery,mysql,redis,crypto]==2.1.2" \ --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.1.2/constraints-3.8.txt" # 예시 $ airflow celery worker -H worker_1 -q queue_1 $ airflow celery -h usage: airflow celery [-h] COMMAND ... Start celery components. Works only when using CeleryExecutor. For more information, see https://airflow.apache.org/docs/apache-airflow/stable/executor/celery.html positional arguments: COMMAND flower Start a Celery Flower stop Stop the Celery worker gracefully worker Start a Celery worker node optional arguments: -h, --help show this help message and exit |
자세한 내용은 다른 글에서
https://mightytedkim.tistory.com/32?category=928713
2. sql_alchemy에 pymysql 을 적용하면, airflow-scheduler -D 가 안됨
sql_alchemy 연결할 때는 pymsql이 아닌 mysqldb 사용하기
상황
- sql_alchemy를 이용해서 airflow는 DB에 접근할 수 있음
- pymysql 사용하면, pool 관련 에러가 나옴
- known 버그라고 함
# 에러 환경 sql_alchemy_conn = mysql+pymysql://airflow:airflow@111.111.111.216:14381/airflow?charset=utf8 sql_alchemy_pool_enabled = True |
로그
OSError: [Errno 9] Bad file descriptor Exception ignored in: <function _ConnectionRecord.checkout.<locals>.<lambda> at 0x7f2eb025e268> Traceback (most recent call last): File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/pool/base.py", line 503, in <lambda> File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/pool/base.py", line 702, in _finalize_fairy File "/usr/lib/python3.6/logging/__init__.py", line 1337, in error File "/usr/lib/python3.6/logging/__init__.py", line 1444, in _log File "/usr/lib/python3.6/logging/__init__.py", line 1454, in handle File "/usr/lib/python3.6/logging/__init__.py", line 1516, in callHandlers File "/usr/lib/python3.6/logging/__init__.py", line 865, in handle File "/usr/lib/python3.6/logging/__init__.py", line 1071, in emit File "/usr/lib/python3.6/logging/__init__.py" , line 1061, in _open NameError: name 'open' is not definedOSError: [Errno 9] Bad file descriptor Exception ignored in: <function _ConnectionRecord.checkout.<locals>.<lambda> at 0x7f2eb025e268> Traceback (most recent call last): File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/pool/base.py", line 503, in <lambda> File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/pool/base.py", line 702, in _finalize_fairy File "/usr/lib/python3.6/logging/__init__.py", line 1337, in error File "/usr/lib/python3.6/logging/__init__.py", line 1444, in _log File "/usr/lib/python3.6/logging/__init__.py", line 1454, in handle File "/usr/lib/python3.6/logging/__init__.py", line 1516, in callHandlers File "/usr/lib/python3.6/logging/__init__.py", line 865, in handle File "/usr/lib/python3.6/logging/__init__.py", l ine 1071, in emit File "/usr/lib/python3.6/logging/__init__.py", line 1061, in _open NameError: name 'open' is not defined |
해결
- pool 관련 설정 오류라고 함
- pool 설정을 끔
- sql_alchemy_pool_enabled=FALSE 하면 잘됨
- 하지만 pool을 끄는건 아닌 것 같아서 mysqlconnector (mysql+mysqldb)를 사용했음
# 되는 예시 sql_alchemy_conn = mysql+mysqldb://airflow_celery:airflow_celery@111.111.111.216:14381/airflow_celery sql_alchemy_pool_enabled = True |
3. celery_taskmeta, Field 'id' doesn't have a default value
DDL 다시 만듦, localExecutor 테스트하다가 celeryExectuor 사용하다 꼬인듯
상황
- celeryExecutor를 테스트하려고 작업 중, mysql로 sqlAlchemy를 바꿈
- celery_taskmeta 의 id 값에 default value가 없어서 생기는 오류
- meta 데이터를 초기화해준다는 글을 발견한 걸로 봐서, 문제가 많이 생기는 테이블인듯
로그
Sqlalchemy.exc.IntegrityError: (MySQLdb._exceptions.IntegrityError) (1364, "Field 'id' doesn't have a default value") [SQL: INSERT INTO celery_taskmeta (task_id, `status`, result, date_done, traceback) VALUES (%s, %s, %s, %s, %s)] [parameters: ('7f4a503d-0eeb-4d02-80f6-64b2ac79b697', 'PENDING', None, datetime.datetime(2021, 10, 6, 7, 28, 17, 556083), None)] (Background on this error at: http://sqlalche.me/e/13/gkpj) |
해결
- field `id` doesn't have a default value -> 보통 db에서 not null 관련 오류
- 테이블 명으로 봐서 meta 데이터를 가지고 있는데, 그게 꼬인 것으로 추정
- ddl이 잘못 만들어졌다고 생각해서, database를 새로 만들어서 실행하니 됨
- 정확히 기억은 안나는데 'id' 쪽에 auto incremenet가 빠져있었던 것 같음
- local Excecutor로 db init 한 상태에서, celeryExecutor로 db init 해서 그런 것으로 추측
- 아래는 로그에서 긁은 DDL
-
[SQL: '\nCREATE TABLE celery_taskmeta (\n\tid INTEGER NOT NULL, \n\ttask_id VARCHAR(155), \n\tstatus VARCHAR(50), \n\tresult BYTEA, \n\tdate_done TIMESTAMP WITHOUT TIME ZONE, \n\ttraceback TEXT, \n\tPRIMARY KEY (id), \n\tUNIQUE (task_id)\n)\n\n'] (Background on this error at: http://sqlalche.me/e/gkpj)
4. dag_id 찾지 못해서 scheduler에서 에러 뱉음
airflow dags delete [dag_id]
상황
- dag_id 여러번 수정하다보니까, DAGS 가 업데이트가 안됨
- tail -f airflow-scheuler* 하면 에러 미친듯이 뱉음
- 확인해보니 dag_id를 찾지 못해서, dag는 삭제되었는데 db에서 있다고 생각하는 것 같음
로그
021-10-13 12:09:46,448 ERROR - DAG 'DEV_01_baseInfo_daily' not found in serialized_dag table Traceback (most recent call last): File "/home/manager/anaconda3/envs/airflow-3.8.9/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1668, in _update_dag_next_dagruns dag = self.dagbag.get_dag(dag_model.dag_id, session=session) File "/home/manager/anaconda3/envs/airflow-3.8.9/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper return func(*args, **kwargs) File "/home/manager/anaconda3/envs/airflow-3.8.9/lib/python3.8/site-packages/airflow/models/dagbag.py", line 186, in get_dag self._add_dag_from_db(dag_id=dag_id, session=session) File "/home/manager/anaconda3/envs/airflow-3.8.9/lib/python3.8/site-packages/airflow/models/dagbag.py", line 258, in _add_dag_from_db raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table") airflow.exceptions.SerializedDagNotFound: DAG 'DEV_01_baseInfo_daily' not found in serialized_dag table |
해결
- airflow dags delete [dag_id]
- 로그에 나온 dag_id 입력하면 다 지워짐
- db에 직접 update나 delete하는것은 위험해서 테스트 안해봄
더 공부해야할 부분
execution_date, start_date -> 하다보면 알게됨
pyenv로 시작했으니, systemctl에 등록할 방법 찾아야함 -> helm 으로 설치 (갓 헬름)
xcom -> 48kb 이하의 변수만 이동 가능
비밀번호 같은 값들은 어떻게 관리할지 -> variables, k8s 를 사용하면 secret
21.10.12 : celery worker를 이용하면 부하를 분산할 수 있음
https://mightytedkim.tistory.com/32?category=928713
21.11.01 : 하다보면 k8s 에 올리게 될거임.. Operatorer 들이 다양함. Helm으로 설치하는게 좋음
- KupernetesPodOperator를 helm으로 설정하기
https://mightytedkim.tistory.com/40?category=928713
참고 자료
- https://engineering.linecorp.com/ko/blog/data-engineering-with-airflow-k8s-1/
- https://zzsza.github.io/data/2018/01/04/airflow-1/