최근에 airflow를 공부하고 있어요ㅎ
![](https://blog.kakaocdn.net/dn/biOsVQ/btrwn4pUKCS/kgkdNP8r8KuLkdxEr1Y7o0/img.png)
- 사진 편집
-
-
작게문서 너비옆트임
-
- 삭제
사진 설명을 입력하세요.
그냥 "배치 돌리는 어플리케이션" 으로만 알고 있던 제가
하나씩 알아가며 메모를 하는 포스팅입니다 :)
Airflow
'워크플로우 관리 플랫폼' 으로 최근에 핫하게 사용되고 있어요.
airbnb 에서 2016년에 만들어졌어요.
2020년에 2.0이 나오면서 더 강력해졌어요(이뻐짐)
장점
1. python : 가장 큰 장점, python으로 쉽게 접근이 가능 (데이터 분석가, 현업)
2. hello world 쉬움 : 한시간이면 만들 수 있어요
3. UI : 이쁘고 직관적이에요. 2.0으로 넘어오면서 엄청 이뻐졋어요
4. cloud 벤더 : cloud에서 운영하는 서비스를 쉽게 찾을 수 있어요
5. 파이프 라인 관리 : worker, queue로 작업 분산 // 한 곳에서 배치를 관리해서 너무 좋음
단점
(세팅하는 엔지니어 관점에서의 단점 ㅋㅋ)
1. 좀 깊이 들어가면 레퍼런스가 잘 없음
- 문서가 hello world에만 친절한 느낌
- sparkKubernetesOperator, KubernetesPodOperator 등
2. 쉽다보니까, 사용자들이 resource를 많이 줌 ㅜ
- 이건 추후 저희가 해결해야되는 부분이에요
쓰고 보니 딱히 단점이 없네요 ㅋㅋ
테스트 환경
- airflow 2.1.2
- celery worker
- centos 7
- pyenv -> 3.8.9
내가 생각한 유용했던 기능
1. variable : 비밀번호 같은 변수를 따로 관리해서 좋음
2. xcom : task 끼리 짧은 변수를 넘겨줘서 좋음
3. connection : UI에서 선택하고 입력해서 관리가 편함 (s3, kubernetes Cluster Connection 등)
경험했던 시행착오
celery 의 개념을 이해하는게 힘들었음
native 환경에서 airflow 여러개를 띄우고 worker를 테스트함
- airflow가 한개 떠있고, celery를 따로 설치하는 것으로 이해해서 힘들었음
- airflow 안에 내장되어 있음
- 명령어 : airflow celery worker
아래 그림에 celery worker 도 airflow 이미지를 사용한다고 생각하면 됨
그래서 celery 를 설치하고 worker 와 airflow 연결하는데 설명된 곳이 없어서 혼란스러웟음 ㅜ
python 이용해서 설치할 때는 아래처럼 필요한 플러그인들을 추가해주면 됨
celery worker용 linux
$ pip install "apache-airflow[celery,mysql,redis,crypto]==2.1.2" \ --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.1.2/constraints-3.8.txt" # 예시 $ airflow celery worker -H worker_1 -q queue_1 $ airflow celery -h usage: airflow celery [-h] COMMAND ... Start celery components. Works only when using CeleryExecutor. For more information, see https://airflow.apache.org/docs/apache-airflow/stable/executor/celery.html positional arguments: COMMAND flower Start a Celery Flower stop Stop the Celery worker gracefully worker Start a Celery worker node optional arguments: -h, --help show this help message and exit |
자세한 내용은 다른 글에서
https://mightytedkim.tistory.com/32?category=928713
Airflow) airflow+celery worker 구성하기
airflow는 airbnb에서 만든 workflow 관리 플랫폼이에요 - 배치 파일을 순차적으로 돌릴 수도 있고 - 쿠버네티스의 yml 파일을 전달하는데도 사용할 수 있어요 최근에 공부를 하게되면서 알게된 구조와
mightytedkim.tistory.com
2. sql_alchemy에 pymysql 을 적용하면, airflow-scheduler -D 가 안됨
sql_alchemy 연결할 때는 pymsql이 아닌 mysqldb 사용하기
상황
- sql_alchemy를 이용해서 airflow는 DB에 접근할 수 있음
- pymysql 사용하면, pool 관련 에러가 나옴
- known 버그라고 함
# 에러 환경 sql_alchemy_conn = mysql+pymysql://airflow:airflow@111.111.111.216:14381/airflow?charset=utf8 sql_alchemy_pool_enabled = True |
로그
OSError: [Errno 9] Bad file descriptor Exception ignored in: <function _ConnectionRecord.checkout.<locals>.<lambda> at 0x7f2eb025e268> Traceback (most recent call last): File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/pool/base.py", line 503, in <lambda> File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/pool/base.py", line 702, in _finalize_fairy File "/usr/lib/python3.6/logging/__init__.py", line 1337, in error File "/usr/lib/python3.6/logging/__init__.py", line 1444, in _log File "/usr/lib/python3.6/logging/__init__.py", line 1454, in handle File "/usr/lib/python3.6/logging/__init__.py", line 1516, in callHandlers File "/usr/lib/python3.6/logging/__init__.py", line 865, in handle File "/usr/lib/python3.6/logging/__init__.py", line 1071, in emit File "/usr/lib/python3.6/logging/__init__.py" , line 1061, in _open NameError: name 'open' is not definedOSError: [Errno 9] Bad file descriptor Exception ignored in: <function _ConnectionRecord.checkout.<locals>.<lambda> at 0x7f2eb025e268> Traceback (most recent call last): File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/pool/base.py", line 503, in <lambda> File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/pool/base.py", line 702, in _finalize_fairy File "/usr/lib/python3.6/logging/__init__.py", line 1337, in error File "/usr/lib/python3.6/logging/__init__.py", line 1444, in _log File "/usr/lib/python3.6/logging/__init__.py", line 1454, in handle File "/usr/lib/python3.6/logging/__init__.py", line 1516, in callHandlers File "/usr/lib/python3.6/logging/__init__.py", line 865, in handle File "/usr/lib/python3.6/logging/__init__.py", l ine 1071, in emit File "/usr/lib/python3.6/logging/__init__.py", line 1061, in _open NameError: name 'open' is not defined |
해결
- pool 관련 설정 오류라고 함
- pool 설정을 끔
- sql_alchemy_pool_enabled=FALSE 하면 잘됨
- 하지만 pool을 끄는건 아닌 것 같아서 mysqlconnector (mysql+mysqldb)를 사용했음
# 되는 예시 sql_alchemy_conn = mysql+mysqldb://airflow_celery:airflow_celery@111.111.111.216:14381/airflow_celery sql_alchemy_pool_enabled = True |
3. celery_taskmeta, Field 'id' doesn't have a default value
DDL 다시 만듦, localExecutor 테스트하다가 celeryExectuor 사용하다 꼬인듯
상황
- celeryExecutor를 테스트하려고 작업 중, mysql로 sqlAlchemy를 바꿈
- celery_taskmeta 의 id 값에 default value가 없어서 생기는 오류
- meta 데이터를 초기화해준다는 글을 발견한 걸로 봐서, 문제가 많이 생기는 테이블인듯
로그
Sqlalchemy.exc.IntegrityError: (MySQLdb._exceptions.IntegrityError) (1364, "Field 'id' doesn't have a default value") [SQL: INSERT INTO celery_taskmeta (task_id, `status`, result, date_done, traceback) VALUES (%s, %s, %s, %s, %s)] [parameters: ('7f4a503d-0eeb-4d02-80f6-64b2ac79b697', 'PENDING', None, datetime.datetime(2021, 10, 6, 7, 28, 17, 556083), None)] (Background on this error at: http://sqlalche.me/e/13/gkpj) |
해결
- field `id` doesn't have a default value -> 보통 db에서 not null 관련 오류
- 테이블 명으로 봐서 meta 데이터를 가지고 있는데, 그게 꼬인 것으로 추정
- ddl이 잘못 만들어졌다고 생각해서, database를 새로 만들어서 실행하니 됨
- 정확히 기억은 안나는데 'id' 쪽에 auto incremenet가 빠져있었던 것 같음
- local Excecutor로 db init 한 상태에서, celeryExecutor로 db init 해서 그런 것으로 추측
- 아래는 로그에서 긁은 DDL
-
[SQL: '\nCREATE TABLE celery_taskmeta (\n\tid INTEGER NOT NULL, \n\ttask_id VARCHAR(155), \n\tstatus VARCHAR(50), \n\tresult BYTEA, \n\tdate_done TIMESTAMP WITHOUT TIME ZONE, \n\ttraceback TEXT, \n\tPRIMARY KEY (id), \n\tUNIQUE (task_id)\n)\n\n'] (Background on this error at: http://sqlalche.me/e/gkpj)
4. dag_id 찾지 못해서 scheduler에서 에러 뱉음
airflow dags delete [dag_id]
상황
- dag_id 여러번 수정하다보니까, DAGS 가 업데이트가 안됨
- tail -f airflow-scheuler* 하면 에러 미친듯이 뱉음
- 확인해보니 dag_id를 찾지 못해서, dag는 삭제되었는데 db에서 있다고 생각하는 것 같음
로그
021-10-13 12:09:46,448 ERROR - DAG 'DEV_01_baseInfo_daily' not found in serialized_dag table Traceback (most recent call last): File "/home/manager/anaconda3/envs/airflow-3.8.9/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1668, in _update_dag_next_dagruns dag = self.dagbag.get_dag(dag_model.dag_id, session=session) File "/home/manager/anaconda3/envs/airflow-3.8.9/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper return func(*args, **kwargs) File "/home/manager/anaconda3/envs/airflow-3.8.9/lib/python3.8/site-packages/airflow/models/dagbag.py", line 186, in get_dag self._add_dag_from_db(dag_id=dag_id, session=session) File "/home/manager/anaconda3/envs/airflow-3.8.9/lib/python3.8/site-packages/airflow/models/dagbag.py", line 258, in _add_dag_from_db raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table") airflow.exceptions.SerializedDagNotFound: DAG 'DEV_01_baseInfo_daily' not found in serialized_dag table |
해결
- airflow dags delete [dag_id]
- 로그에 나온 dag_id 입력하면 다 지워짐
- db에 직접 update나 delete하는것은 위험해서 테스트 안해봄
더 공부해야할 부분
execution_date, start_date -> 하다보면 알게됨
pyenv로 시작했으니, systemctl에 등록할 방법 찾아야함 -> helm 으로 설치 (갓 헬름)
xcom -> 48kb 이하의 변수만 이동 가능
비밀번호 같은 값들은 어떻게 관리할지 -> variables, k8s 를 사용하면 secret
21.10.12 : celery worker를 이용하면 부하를 분산할 수 있음
https://mightytedkim.tistory.com/32?category=928713
Airflow) airflow+celery worker 구성하기
airflow는 airbnb에서 만든 workflow 관리 플랫폼이에요 - 배치 파일을 순차적으로 돌릴 수도 있고 - 쿠버네티스의 yml 파일을 전달하는데도 사용할 수 있어요 최근에 공부를 하게되면서 알게된 구조와
mightytedkim.tistory.com
21.11.01 : 하다보면 k8s 에 올리게 될거임.. Operatorer 들이 다양함. Helm으로 설치하는게 좋음
- KupernetesPodOperator를 helm으로 설정하기
https://mightytedkim.tistory.com/40?category=928713
Airflow) kuberentesPodOperator 설정_helm,logging,git-sync
airlow 1.10 부터 적용되고, airflow 2.0에서 본격적으로 사용되는 KPO 자료가 없어서 2주간의 삽질기 ㅎㅎ 아 진짜 쫄깃 쫄깃하고, 안되서 도중에 argo를 파야하나 생각하고 휴.. 요약 airflow는 airbnb에서
mightytedkim.tistory.com
참고 자료
- https://engineering.linecorp.com/ko/blog/data-engineering-with-airflow-k8s-1/
Kubernetes를 이용한 효율적인 데이터 엔지니어링(Airflow on Kubernetes VS Airflow Kubernetes Executor) - 1 - LINE
안녕하세요. LINE Financial Data Platform을 운영하고 개발하고 있는 이웅규입니다. 저는 지난 NAVER DEVIEW 2020에서 발표했던 Kubernetes를 이용한 효율적인 데이터 엔지니어링 (Airflow on Kubernetes VS Airflow Kubern
engineering.linecorp.com
버킷플레이스 Airflow 도입기 - 오늘의집 블로그
탁월한 데이터플랫폼을 위한 Airflow 도입기
www.bucketplace.co.kr
- https://zzsza.github.io/data/2018/01/04/airflow-1/
Apache Airflow - Workflow 관리 도구(1)
오늘은 Workflow Management Tool인 Apache Airflow 관련 포스팅을 하려고 합니다. 이 글은 1.10.3 버전에서 작성되었습니다 최초 작성은 2018년 1월 4일이지만, 2020년 2월 9일에 글을 리뉴얼했습니다 슬라이드
zzsza.github.io
Home
Platform created by the community to programmatically author, schedule and monitor workflows.
airflow.apache.org
'Data > Airflow' 카테고리의 다른 글
Kubernetes) k8s와 Airflow 이용한 spark작업_SparkKubernetesOperator (14) | 2021.11.25 |
---|---|
Airflow) K8S Pod 만들기_k8sPodOperator, helm (5) | 2021.11.12 |
Airflow) log를 minio에 저장_connection,yaml (0) | 2021.11.12 |
Airflow) Kubernetes에 올리기(yaml) (0) | 2021.10.28 |
Airflow) celery worker 환경 구성하기 (0) | 2021.10.12 |