'데이터 엔지니어'로 성장하기

정리하는 걸 좋아하고, 남이 읽으면 더 좋아함

Data/Airflow 28

Airflow) Base image에 라이브러리 추가하기

Airflow에 여러 operator들이 있는데, python Operator에서도 특정 라이브러리들이 필요할 때가 있어요 예상 독자는 다음과 같아요. 1. K8S Airflow에서 추가 패키지들을 어떻게 관리하는지 궁금한 분 2. dependency 최소로 db connection을 맺고 싶으신 분 저 같은 경우, 3가지를 추가해요 1. 디버깅을 위한 apt install - 디버깅의 경우 pod 안에서 curl, vim, tcpdump 등이 필요해서 root로 설치를 해줫어요 2. jdbc connection을 위한 jar - dependency 관리 측면에서 득이되기 때문에, 추가해주고 있어요. 3. pip install - 마지막으로는 jdbc 사용에 필요한 jaydebeapi, pymysql -..

Data/Airflow 2023.04.23

airflow) dag clear 하지 않고 특정 시점부터 재시작하기

airflow 돌리다보면 다시 dag를 실행해야할 때가 있어요 dag clear가 멱등성 원칙을 지켜진다면 최고지만 그렇지 않거나, history를 남겨야할 때는 어떡할가요 cli에서 dag clear와 dag backfill 명령어로 실행할 수 잇어요 airflow tasks clear test-d-del-log-dag --start-date 2023-02-24 --end-date 2023-02-26 airflow dags backfill test-batch-d-del-log-dag --start-date 2023-02-24 --end-date 2023-02-26 첫번째 tasks clear는 해당 날짜 범위사이를 clear 해줘요. ui 상에서는 하얀색으로 변하게 되겠죠 그럼 그 날짜를 토대로 bac..

Data/Airflow 2023.02.27

Airflow) 'Custom Operator' 실무 적용하기_s3,hook

스터디를 통해 배운 내용을 실무에 정리한 글이에요 책의 8장 내용 custom operator를 참고했습니다. airflow를 운영하면서 가장 불안했던 부분은 '관리가 힘든 것' 이었어요 Python을 사용해서 개발자/분석가 접근성이 높은 것은 좋은데, 자유도가 너무 높아서 산으로 가더라고요. 예상 독자는 아래와 같아요 airflow 도입을 고민하는 분 dag 관리가 산으로 가는 분 확산을 해야 하는 분 요약 문제: PythonOperator는 관리가 되지 않고, 높은 자유도가 오히려 독이 되고 있음, 분석: 재사용 가능한 모듈을 적용해야하고, 모듈은 커밋 전 코드 리뷰가 필요 적용: Custom Operator를 적용함 결과: 문제 발생 부분이 일원화되고, 반복 코드가 감소함 설명 1. 문제 Pytho..

Data/Airflow 2022.08.01

Slipp) Airflow 2.0 스터디 후기_22기

airflow 스터디가 끝나서 후기를 작성해보려고 해요 (MBTI 극J인 조장을 만나서 고생하신 팀원들께 다시 감사합니다) 3개월 동안 진행했던 'Airflow 2.0' 을 통해 얻은 것을 정리했어요 스터디를 통해 저희가 무엇을 배웠는지를 이야기해보려 해요 이 포스팅의 독자는 아래와 같아요 airflow 스터디가 어떻게 진행되는지 궁금하신 분 책 내용이 궁금하지만 직접 보기는 망설여지시는 분들 구성 스터디 소개 스터디 회고 실무 적용 마무리 1. 스터디 소개 'slipp X 우아한 스터디'를 통해 스터디원을 모집했고, 어마어마한 선배님들이 신청을 해주셨어요. 주제: Airflow 2.0에 익숙해지고 상황별 사례 공유하기 목표: 배워서 실무에 적용해보기 산출물: 챕터별 스터디 내용 + Q&A 모음 자료: ..

Data/Airflow 2022.07.29

글또) 7기 다짐글(2022.05 ~ 2022.11)_airflow

벌써 3번째 다짐글이네요 이 포스팅의 독자는 아래와 같아요 글또 가입을 고민하시는 분 글또 7기를 같은 그룹 분들 중 제가 궁금하신 분 글또 바이럴에 도움이 되기를 바라면서 아래 3가지 내용을 준비했어요 글또를 통해 얻게된 좋은 습관 5,6기에 작성했던 글 7기에 작성하게될 글 1) 글또를 통해 얻게된 좋은 습관 글또 덕분에 여러 습관을 얻게 되었어요 주기적으로 글쓰는 습관 피드백 받을 걸 생각하고 글쓰는 습관 어떻게 효과적으로 피드백을 줄지 생각하는 습관 아직도 메모처럼 블로그에 글을 작성하고 있지만, 매 기수마다 발전하는 저를 볼 수 있어요. 그리고 어떤 글을 썼는지 목차를 보며 그 동안 어떤 기술에 제가 관심이 있엇는지 볼 수 있어 좋아요 2) 5,6기에 작성했던 글 21년에는 elk, k8s를 공..

Data/Airflow 2022.07.10

Airflow) 'Task 분리' 실무 적용하기 _k8s/spark

2022.05~07, 3달동안 진행한 airflow 스터디를 끝내고 실무에 적용한 내용 일부를 정리햇어요 airflow를 운영하면서 가장 불편했던 부분은 'Task가 분리되지 않은 것' 이었어요 어디서 문제가 생겼는지 추적할 수도 없는게 불편했어요 데이터 흐름이 어떻게 되있는지 알 수 없었어요 멱등성, 원자성도 지켜지지 않았어요 예상 독자는 아래와 같아요 airflow 도입을 고민하는 분 airflow를 이제 막 사용하시는 분 airflow 관리가 어려운 운영 담당자 요약 문제: airflow에서 spark job이 간헐적으로 실패함 분석: task의 분리가 필요함 적용: task를 분리함 결과: 문제 발생 시점을 알 수 있고, 불필요한 작업을 줄일 수 있게됨 설명 1. 문제 airflow에서 spark..

Data/Airflow 2022.07.04

Airflow) custom operator 만들기_s3 Prefix copy

+ 요즘 docker Image에 다 몰려있는 로직을 task로 쪼개는 작업을 진행하고 있어요. 처음부터 airflow를 공부하고 적용했어야했는데, 과거의 내가 원망스럽네요. 요약 hook/operator 공부한김에 자주 사용할만한 기능을 operator로 하나 만들었어요 from custom.s3.operators.S3CopyFolderObjectOperator import S3CopyFolderObjectOperator with DAG( 'hook-S3CopyFolderObjectOperator', ) as dag: S3CopyFolderObjectOperator( task_id="check_ready", aws_conn_id="CephObjectConn", source_bucket_name="hgk..

Data/Airflow 2022.06.12

Airflow) pod external sigterm으로 죽는 현상_해결못해서 retries함

요약 상황 원인 조치 설명 상황 sparkK8sOperator 사용할때 sensor가 poking하다가 갑자기 죽음 결과를 보면 정상 실행되는데, sensor가 로그를 가져오지 못해서 에러로 표시됨 [2022-06-01 20:01:18,396] {spark_kubernetes.py:121} INFO - Spark application is still in state: RUNNING [2022-06-01 20:02:18,402] {spark_kubernetes.py:101} INFO - Poking: spark-test-20220531t193000-1 [2022-06-01 20:02:18,424] {spark_kubernetes.py:121} INFO - Spark application is still i..

Data/Airflow 2022.06.02

Airflow) S3CopyObjectOperator 이용해서 copy 하기

s3 부분에서 에러가 간혹 발생되서, 핵심 로직과 분리하기로 결정 근데 이거는 파일 하나만 가능함. 폴더는 어떡하지 요약 상황 결과 참고 설명 상황 as-is python 코드로 모든 로직을 처리 단점 코드가 복잡해지고 관리가 되지 않음 image안에 코드가 들어가면 어디서 에러가 나는지 알 수 없음 갑자기 모든 것을 바꿀 수 없겠지만, 하나씩 바꿔가기로 함 to-be s3 copy,delete를 aws.operators를 이용하기로함 이유 핵심 로직과 사이드카 같은 로직 분리(복사/이동/삭제) s3 operator는 많이 사용하기 때문에 버그가 적을거라고 생각 결과 from datetime import datetime from airflow.models import DAG from airflow.ope..

Data/Airflow 2022.06.02

Slipp) Airflow2.0스터디_워크플로 트러거_4주차(6장)

이번 챕터는 평소 궁금하던 sensor에 대해서 공부할 수 있어서 재미있었어요. + TriggerDagRunOperator의 경우 복잡해서 사용안하려고 했는데, s3 이동 같은 작업은 클래스처럼 정의해서 사용할 수 있다는 이야기를 들어서 유레카였어요 요약 센서 : 특정 조건을 센서에 만족하도록 대기하기 트리거 : 서로 다른 DAG의 태스크간 의존성 설정하기 CLI/API : REST API 를 통해 워크 플로 실행하기 마무리 설명 1. 센서 : 특정 조건을 센서에 만족하도록 대기하기 로그를 당겨오거나, 작업이 언제 끝날지 기다리거나, hive 테이블에 파티션이 있는지 확인하는 작업에 활용 할 수 있어요 Sensor 특정 조건이 참인지 여부를 지속적으로 확인(polling)하는 특수 유형 오퍼레이터 필요한..

Data/Airflow 2022.05.28