'데이터 엔지니어'로 성장하기

정리하는 걸 좋아하고, 남이 읽으면 더 좋아함

Data/Airflow

Airflow) Hello world와 시행착오_celery worker,버그

MightyTedKim 2021. 10. 9. 20:58
728x90
반응형

최근에 airflow를 공부하고 있어요ㅎ 

 

 
대표사진 삭제
  • 사진 편집
  •  
  • 작게문서 너비옆트임
  •  
  • 삭제

사진 설명을 입력하세요.

 

그냥 "배치 돌리는 어플리케이션" 으로만 알고 있던 제가

하나씩 알아가며 메모를 하는 포스팅입니다 :)


Airflow

'워크플로우 관리 플랫폼' 으로 최근에 핫하게 사용되고 있어요.

airbnb 에서 2016년에 만들어졌어요.

2020년에 2.0이 나오면서 더 강력해졌어요(이뻐짐)

https://airflow.apache.org/docs/apache-airflow/stable/concepts.html

 

장점

1. python : 가장 큰 장점, python으로 쉽게 접근이 가능 (데이터 분석가, 현업)

2. hello world  쉬움 : 한시간이면 만들 수 있어요

3. UI : 이쁘고 직관적이에요. 2.0으로 넘어오면서 엄청 이뻐졋어요

4. cloud 벤더 : cloud에서 운영하는 서비스를 쉽게 찾을 수 있어요

5. 파이프 라인 관리 : worker, queue로 작업 분산 // 한 곳에서 배치를 관리해서 너무 좋음

단점

(세팅하는 엔지니어 관점에서의 단점 ㅋㅋ)

1. 좀 깊이 들어가면 레퍼런스가 잘 없음

 - 문서가 hello world에만 친절한 느낌

 - sparkKubernetesOperator, KubernetesPodOperator 등

2. 쉽다보니까, 사용자들이 resource를 많이 줌 ㅜ

 - 이건 추후 저희가 해결해야되는 부분이에요

 

쓰고 보니 딱히 단점이 없네요 ㅋㅋ

테스트 환경

  • airflow 2.1.2
  • celery worker
  • centos 7
  • pyenv -> 3.8.9

내가 생각한 유용했던 기능

1. variable : 비밀번호 같은 변수를 따로 관리해서 좋음

2. xcom : task 끼리 짧은 변수를 넘겨줘서 좋음

3. connection : UI에서 선택하고 입력해서 관리가 편함 (s3, kubernetes Cluster Connection 등)


경험했던 시행착오

celery 의 개념을 이해하는게 힘들었음

native 환경에서 airflow 여러개를 띄우고 worker를 테스트함
  1. airflow가 한개 떠있고, celery를 따로 설치하는 것으로 이해해서 힘들었음
  2. airflow 안에 내장되어 있음
    1. 명령어 : airflow celery worker

아래 그림에 celery worker 도 airflow 이미지를 사용한다고 생각하면 됨

https://engineering.linecorp.com/ko/blog/data-engineering-with-airflow-k8s-1/

그래서 celery 를 설치하고 worker 와 airflow 연결하는데 설명된 곳이 없어서 혼란스러웟음 ㅜ

 

python 이용해서 설치할 때는 아래처럼 필요한 플러그인들을 추가해주면 됨

 

celery worker용 linux

$ pip install "apache-airflow[celery,mysql,redis,crypto]==2.1.2" \
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.1.2/constraints-3.8.txt"

# 예시
$ airflow celery worker -H worker_1 -q queue_1 

$ airflow celery -h
usage: airflow celery [-h] COMMAND ...
Start celery components. Works only when using CeleryExecutor.
For more information, see https://airflow.apache.org/docs/apache-airflow/stable/executor/celery.html positional arguments: COMMAND flower Start a Celery Flower stop Stop the Celery worker gracefully worker Start a Celery worker node optional arguments:
-h, --help show this help message and exit

 

자세한 내용은 다른 글에서

https://mightytedkim.tistory.com/32?category=928713 

 

Airflow) airflow+celery worker 구성하기

airflow는 airbnb에서 만든 workflow 관리 플랫폼이에요 - 배치 파일을 순차적으로 돌릴 수도 있고 - 쿠버네티스의 yml 파일을 전달하는데도 사용할 수 있어요 최근에 공부를 하게되면서 알게된 구조와

mightytedkim.tistory.com


2. sql_alchemy에 pymysql 을 적용하면, airflow-scheduler -D 가 안됨

sql_alchemy 연결할 때는 pymsql이 아닌 mysqldb 사용하기

상황

  1. sql_alchemy를 이용해서 airflow는 DB에 접근할 수 있음
  2. pymysql 사용하면, pool 관련 에러가 나옴
  3. known 버그라고 함
    1. 출처 : https://stackoverflow.com/questions/61035608/airflow-scheduler-works-normally-fails-with-d 
# 에러 환경 
sql_alchemy_conn
 = mysql+pymysql://airflow:airflow@111.111.111.216:14381/airflow?charset=utf8

sql_alchemy_pool_enabled = True

로그

OSError: [Errno 9] Bad file descriptor Exception ignored in: 
<function _ConnectionRecord.checkout.<locals>.<lambda> at 0x7f2eb025e268> Traceback (most recent call last): File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/pool/base.py",
line 503, in <lambda> File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/pool/base.py", line 702, in _finalize_fairy File "/usr/lib/python3.6/logging/__init__.py", line 1337, in error File "/usr/lib/python3.6/logging/__init__.py",
line 1444, in _log File "/usr/lib/python3.6/logging/__init__.py", line 1454, in handle File "/usr/lib/python3.6/logging/__init__.py",
line 1516, in callHandlers File "/usr/lib/python3.6/logging/__init__.py", line 865, in handle File "/usr/lib/python3.6/logging/__init__.py", line 1071, in emit File "/usr/lib/python3.6/logging/__init__.py"
,
line 1061, in _open NameError: name 'open' is not definedOSError: [Errno 9] Bad file descriptor Exception ignored in: <function _ConnectionRecord.checkout.<locals>.<lambda> at 0x7f2eb025e268> Traceback (most recent call last): File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/pool/base.py", line 503, in <lambda> File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/pool/base.py",

line 702, in _finalize_fairy File "/usr/lib/python3.6/logging/__init__.py",
line 1337, in error File "/usr/lib/python3.6/logging/__init__.py", line 1444, in _log File "/usr/lib/python3.6/logging/__init__.py",
line 1454, in handle File "/usr/lib/python3.6/logging/__init__.py", line 1516, in callHandlers File "/usr/lib/python3.6/logging/__init__.py",
line 865, in handle File "/usr/lib/python3.6/logging/__init__.py", l
ine 1071, in emit File "/usr/lib/python3.6/logging/__init__.py", line 1061, in _open NameError: name 'open' is not defined

해결

  1. pool 관련 설정 오류라고 함
  2. pool 설정을 끔
    1. sql_alchemy_pool_enabled=FALSE 하면 잘됨
  3. 하지만 pool을 끄는건 아닌 것 같아서 mysqlconnector (mysql+mysqldb)를 사용했음
# 되는 예시
sql_alchemy_conn
 = mysql+mysqldb://airflow_celery:airflow_celery@111.111.111.216:14381/airflow_celery

sql_alchemy_pool_enabled = True

 


3. celery_taskmeta, Field 'id' doesn't have a default value

DDL 다시 만듦, localExecutor 테스트하다가 celeryExectuor 사용하다 꼬인듯

상황

  1. celeryExecutor를 테스트하려고 작업 중, mysql로 sqlAlchemy를 바꿈 
  2. celery_taskmeta 의 id 값에 default value가 없어서 생기는 오류 
  3. meta 데이터를 초기화해준다는 글을 발견한 걸로 봐서, 문제가 많이 생기는 테이블인듯

로그

Sqlalchemy.exc.IntegrityError: (MySQLdb._exceptions.IntegrityError)
(1364, "Field 'id' doesn't have a default value")
[SQL: INSERT INTO celery_taskmeta (task_id, `status`, result, date_done, traceback) VALUES (%s, %s, %s, %s, %s)]
[parameters:
('7f4a503d-0eeb-4d02-80f6-64b2ac79b697', 'PENDING', None, datetime.datetime(2021, 10, 6, 7, 28, 17, 556083), None)] (Background on this error at: http://sqlalche.me/e/13/gkpj)

해결

  1. field `id` doesn't have a default value -> 보통 db에서 not null 관련 오류
  2. 테이블 명으로 봐서 meta 데이터를 가지고 있는데, 그게 꼬인 것으로 추정
  3. ddl이 잘못 만들어졌다고 생각해서, database를 새로 만들어서 실행하니 됨
    1. 정확히 기억은 안나는데  'id' 쪽에 auto incremenet가 빠져있었던 것 같음
    2. local Excecutor로  db init 한 상태에서, celeryExecutor로 db init  해서 그런 것으로 추측
    3. 아래는 로그에서 긁은 DDL
    4.  [SQL: '\nCREATE TABLE celery_taskmeta 
       (\n\tid INTEGER NOT NULL, 
       \n\ttask_id VARCHAR(155),
       \n\tstatus VARCHAR(50), 
       \n\tresult BYTEA, 
       \n\tdate_done TIMESTAMP WITHOUT TIME ZONE, 
       \n\ttraceback TEXT, 
       \n\tPRIMARY KEY (id), 
       \n\tUNIQUE (task_id)\n)\n\n']
       (Background on this error at: http://sqlalche.me/e/gkpj)

4. dag_id 찾지 못해서 scheduler에서 에러 뱉음

airflow dags delete [dag_id]

상황

  1. dag_id 여러번 수정하다보니까, DAGS 가 업데이트가 안됨
  2. tail -f airflow-scheuler* 하면 에러 미친듯이 뱉음
  3. 확인해보니 dag_id를 찾지 못해서, dag는 삭제되었는데 db에서 있다고 생각하는 것 같음

로그

021-10-13 12:09:46,448 ERROR - DAG 'DEV_01_baseInfo_daily' not found in serialized_dag table
Traceback (most recent call last):
  File "/home/manager/anaconda3/envs/airflow-3.8.9/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1668, in _update_dag_next_dagruns
    dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
  File "/home/manager/anaconda3/envs/airflow-3.8.9/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/home/manager/anaconda3/envs/airflow-3.8.9/lib/python3.8/site-packages/airflow/models/dagbag.py", line 186, in get_dag
    self._add_dag_from_db(dag_id=dag_id, session=session)
  File "/home/manager/anaconda3/envs/airflow-3.8.9/lib/python3.8/site-packages/airflow/models/dagbag.py", line 258, in _add_dag_from_db
    raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'DEV_01_baseInfo_dailynot found in serialized_dag table

해결

  1. airflow dags delete [dag_id]
  2. 로그에 나온 dag_id 입력하면 다 지워짐
  3. db에 직접 update나 delete하는것은 위험해서 테스트 안해봄

더 공부해야할 부분

execution_date, start_date -> 하다보면 알게됨

pyenv로 시작했으니, systemctl에 등록할 방법 찾아야함 -> helm 으로 설치 (갓 헬름)

xcom ->  48kb 이하의 변수만 이동 가능

비밀번호 같은 값들은 어떻게 관리할지 -> variables, k8s 를 사용하면 secret

 

21.10.12 : celery worker를 이용하면 부하를 분산할 수 있음

https://mightytedkim.tistory.com/32?category=928713 

 

Airflow) airflow+celery worker 구성하기

airflow는 airbnb에서 만든 workflow 관리 플랫폼이에요 - 배치 파일을 순차적으로 돌릴 수도 있고 - 쿠버네티스의 yml 파일을 전달하는데도 사용할 수 있어요 최근에 공부를 하게되면서 알게된 구조와

mightytedkim.tistory.com

21.11.01 : 하다보면 k8s 에 올리게 될거임.. Operatorer 들이 다양함. Helm으로 설치하는게 좋음

- KupernetesPodOperator를 helm으로 설정하기

https://mightytedkim.tistory.com/40?category=928713

 

Airflow) kuberentesPodOperator 설정_helm,logging,git-sync

airlow 1.10 부터 적용되고, airflow 2.0에서 본격적으로 사용되는 KPO 자료가 없어서 2주간의 삽질기 ㅎㅎ 아 진짜 쫄깃 쫄깃하고, 안되서 도중에 argo를 파야하나 생각하고 휴.. 요약 airflow는 airbnb에서

mightytedkim.tistory.com

참고 자료

- https://engineering.linecorp.com/ko/blog/data-engineering-with-airflow-k8s-1/

 

Kubernetes를 이용한 효율적인 데이터 엔지니어링(Airflow on Kubernetes VS Airflow Kubernetes Executor) - 1 - LINE

안녕하세요. LINE Financial Data Platform을 운영하고 개발하고 있는 이웅규입니다. 저는 지난 NAVER DEVIEW 2020에서 발표했던 Kubernetes를 이용한 효율적인 데이터 엔지니어링 (Airflow on Kubernetes VS Airflow Kubern

engineering.linecorp.com

- https://www.bucketplace.co.kr/post/2021-04-13-%EB%B2%84%ED%82%B7%ED%94%8C%EB%A0%88%EC%9D%B4%EC%8A%A4-airflow-%EB%8F%84%EC%9E%85%EA%B8%B0/

 

버킷플레이스 Airflow 도입기 - 오늘의집 블로그

탁월한 데이터플랫폼을 위한 Airflow 도입기

www.bucketplace.co.kr

- https://zzsza.github.io/data/2018/01/04/airflow-1/

 

Apache Airflow - Workflow 관리 도구(1)

오늘은 Workflow Management Tool인 Apache Airflow 관련 포스팅을 하려고 합니다. 이 글은 1.10.3 버전에서 작성되었습니다 최초 작성은 2018년 1월 4일이지만, 2020년 2월 9일에 글을 리뉴얼했습니다 슬라이드

zzsza.github.io

 

- https://airflow.apache.org/

 

Home

Platform created by the community to programmatically author, schedule and monitor workflows.

airflow.apache.org

 

728x90
반응형