Slipp) Airflow2.0 스터디_1주차(사용 경험썰풀기)
airflow2.0을 살펴보는 스터디 시작
2달간 책의 필요한 부분을 빠르게 훑어보는 것이 목표!
스터디명 : 'Airflow 2.0 익숙해지고 상황별 사례 공유하기'
책 :Apcache Airflow 기반의 데이터 파이프라인 (에어플로 중심의 워크폴로 구축에서 커스텀 텀포넌트 개발 및 배포,관리까지)
목표: 책 완독하기!
얼마전 한글 번역본이 나와서 책을 구매했는데 내용이 괜찮았어요
그런제 혼자서는 끝까지 보지 않을 것 같아서 스터디원들을 모았어요.
마침 slipp도 스터디 주제 제안 기간이고, 우아한 스터디와 콜라보도 진행하길래 질렀죠.ㅎㅎ
https://puffy-stick-fa1.notion.site/SLiPP-X-Airflow-2-0-c86f6713faff446da4d30bd420dc02fe
MLOps 페북에도 공유될만큼 생각보다 일이 커졌더라고요
OT
불안한 마음을 뒤로하고, OT에 참석했어요
OT 때 이야기해보니, 다행히 스터디원 모두 같은 목표를 가지고 있더라고요
- 데이터 파이프라인 고도화
- 단순 사용을 넘어 프로세스 이해
5명의 스터디원 중 4명이 airflow에 경험이 있기 때문에
빠르게 책을 훑어보는 것을 목표로 진행 방식을 구체화했어요
진행방식
책의 목차를 보고, 관심있는 주제가 무엇인지 물어보니 아래 키워드들이 나왔어요
- trigger, xcom, operator, taskflowapi, 의존성, 테스트
2달 동안 480쪽의 책을 다 보는 것은 힘들기 때문에
각 챕터에서 공유하고 싶은 예제들을 발표하는 식으로 진행하기로 정했습니다.
구체적으로는 EC2에 airflow를 올려놓고, 함께 실습하는 방식을 선택했습니다.
예시 코드 : https://github.com/K9Ns/data-pipelines-with-apache-airflow
1장 airflow 살펴보기
개념 및 다른 툴과 비교하는 부분이 좋았어요
webserver, scheduler, worker 등의 구성 요소에 대해서 설명해요
실시간성 잡에는 적합하지 않지만
배치성, 증분, backfill이 필요한 잡에는 적합하다고 강조해요
dummyoperator로 간단한 예시를 보여주며 넘어가요
스터디하다가 나온 좋은 이야기
- 한마디로 -> airflow 는 젠킨스 이쁜 버젼
- 장점 -> python 코드라서 개발자 접근성이 좋고, 이쁨
- dag 파일은 어떻게 싱크되는지? -> 싱크 데몬을 띄움
- 회사서 어떻게 쓰고 있는지?
- B사: customOperator를 데이터 팀에서 제공
- E사: python,sparkKubernetesOperator, sensor 를 k8s 환경에서
- C사: scalaOperator로 scala 실행
- dummyOperator -> 구조 잡기, 코드 단순화, 시작/종료 표시
- 다행히 dag 파일 싱크는 제가 예시 준비하면서 테스트한게 있어서, 아래 코드로 설명했어요
# build: .
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
AIRFLOW__CORE__DAGS_FOLDER: /opt/airflow/dags/git/slipp #############slipp, default : {AIRFLOW_HOME}/dags
AIRFLOW__CORE__LOAD_EXAMPLES: 'false' #############slipp, default : true
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
# 추가
gitsync:
# <<: *airflow-common
image: k8s.gcr.io/git-sync/git-sync:v3.2.2
container_name: dags-sync
environment:
GIT_SYNC_USERNAME: ############
GIT_SYNC_PASSWORD: ############
GIT_SYNC_REPO: https://github.com/MightyTedKim/airflow-dag-slippstudy.git
GIT_SYNC_DEST: slipp
GIT_SYNC_BRANCH: main
GIT_SYNC_WAIT: 60
volumes:
- ./dags:/tmp:rw
restart: always
---
# 결과
[ec2-user@ip-172-31-39-60 airflow]$ tree
.
├── dags
│ └── git
│ ├── slipp -> rev-f0a85f25a2f6e601fcbae6d335f76b1bd3a7d3fd
│ └── rev-f0a85f25a2f6e601fcbae6d335f76b1bd3a7d3fd
│ ├── LICENSE
│ ├── README.md
│ └── test.py
2장 DAG 구조
UI 설명하는 부분이 많아서, 화면 띄워놓고 어떤 기능이 있는지 서로 이야기를 하며 진행했어요
2장도 python, bashoperator 가 다여서 빠르게 넘어갔어요.
UI화면을 공유하면서 어떤 기능이 있는지 설명했어요
- 재실행하는 법 clear
- 초기 좌절 포인트 -> start_dt, scheduler_interval
- connection -> kubeConn, S3Conn
- role 관리하는 법 -> 팀별로 부여하고 있다고함
- pool 사용하는법 -> slot1으로 항상 병렬로 되는 잡 조절할 수도 있음
- xcom 변수가 저장되는 형식 -> xcom 저장을 많이 못하고, db별로 차이가남 + 남발하면 db 죽을수도 있음
- 2.0부터 autorefresh 가 생김 -> 너무 좋음
- UTC로 되어있는데 default를 바꿀수도 있음
등등 화면보면서 썰을 풀었어요.
아래 내용들도 중요한데, 썰풀다가 넘어가버렸어요
- >> (연산자)
- python_callable 에 함수 연결
- 총 1시간 20분이 걸렸어요
첫날에 무리해서 1,2장을 함께 본 건 잘했다고 생각해요.
이제 갈길이 멀기 때문에 시간을 아껴야하기 때문이죠
다음인 3장 부터는 스케줄링으로 처음에 모두를 괴롭히는 날짜가 나오는데
복습하는 차원에서 기대가 됩니다 :)
아래는 slipp에 정리한 스터디 기록입니다.
1주차: https://www.slipp.net/wiki/pages/viewpage.action?pageId=3276807864429