'데이터 엔지니어'로 성장하기

정리하는 걸 좋아하고, 남이 읽으면 더 좋아함

Data 50

Airflow) 'Custom Operator' 실무 적용하기_s3,hook

스터디를 통해 배운 내용을 실무에 정리한 글이에요 책의 8장 내용 custom operator를 참고했습니다. airflow를 운영하면서 가장 불안했던 부분은 '관리가 힘든 것' 이었어요 Python을 사용해서 개발자/분석가 접근성이 높은 것은 좋은데, 자유도가 너무 높아서 산으로 가더라고요. 예상 독자는 아래와 같아요 airflow 도입을 고민하는 분 dag 관리가 산으로 가는 분 확산을 해야 하는 분 요약 문제: PythonOperator는 관리가 되지 않고, 높은 자유도가 오히려 독이 되고 있음, 분석: 재사용 가능한 모듈을 적용해야하고, 모듈은 커밋 전 코드 리뷰가 필요 적용: Custom Operator를 적용함 결과: 문제 발생 부분이 일원화되고, 반복 코드가 감소함 설명 1. 문제 Pytho..

Data/Airflow 2022.08.01

Slipp) Airflow 2.0 스터디 후기_22기

airflow 스터디가 끝나서 후기를 작성해보려고 해요 (MBTI 극J인 조장을 만나서 고생하신 팀원들께 다시 감사합니다) 3개월 동안 진행했던 'Airflow 2.0' 을 통해 얻은 것을 정리했어요 스터디를 통해 저희가 무엇을 배웠는지를 이야기해보려 해요 이 포스팅의 독자는 아래와 같아요 airflow 스터디가 어떻게 진행되는지 궁금하신 분 책 내용이 궁금하지만 직접 보기는 망설여지시는 분들 구성 스터디 소개 스터디 회고 실무 적용 마무리 1. 스터디 소개 'slipp X 우아한 스터디'를 통해 스터디원을 모집했고, 어마어마한 선배님들이 신청을 해주셨어요. 주제: Airflow 2.0에 익숙해지고 상황별 사례 공유하기 목표: 배워서 실무에 적용해보기 산출물: 챕터별 스터디 내용 + Q&A 모음 자료: ..

Data/Airflow 2022.07.29

kafka) cmak 설치 (kafka manager)

kafka cluster를 설정하기 위해서, 이전 포스팅에서 zk와 kafka를 실행했어요 이제 관리를 위해 yahoo에서 만든 kafka manager를 설치해보려합니다. 예상 독자 kafka manager를 설치하려고 하는 분 미래의 나 요약 cmak binary 다운로드 설정 변경 시작/중지 설명 1. cmak binary 다운로드 wget https://github.com/yahoo/CMAK/releases/download/3.0.0.6/cmak-3.0.0.6.zip unzip cmak-3.0.0.6.zip 2. 설정 변경 # conf/application.conf 수정 kafka-manager.zkhosts="kafka1:2181,kafka2:2181,kafka3:2181" cmak.zkhost..

Data/Kafka 2022.07.28

kafka) kafka cluster 설정

이전 포스팅에서 zookeeper cluster 생성했고 kafka) zookeeper cluster 설정하기 kafka cluster를 만들기 위해서는 zookeeper cluster 생성이 필요해요 zookeeper를 걷어낸다고는 하지만 아직 한참 남은 것 같아요 예상 독자 zookeeper cluster 설정하려고 하는 분 미래의 나 요약 zookeepr 다운.. mightytedkim.tistory.com 이제 kafka cluster 세팅해야해요 예상 독자 kafka cluster 설정하려고 하는 분 미래의 나 요약 kafka다운로드 설정 변경 실행/중지 설명 1. Kafka 다운로드 다운로드 받아서 압축 풀기: https://archive.apache.org/dist/kafka/3.1.1/ka..

Data/Kafka 2022.07.28

kafka) zookeeper cluster 설정하기

kafka cluster를 만들기 위해서는 zookeeper cluster 생성이 필요해요 zookeeper를 걷어낸다고는 하지만 아직 한참 남은 것 같아요 예상 독자 zookeeper cluster 설정하려고 하는 분 미래의 나 요약 zookeepr 다운로드 설정 변경 실행/중지 설명 1. zookeepr 다운로드 다운로드 받아서 압축 풀기: https://zookeeper.apache.org/releases.html bin conf 폴더 안의 파일을 수정하고 zookeeper_data는 자동 생성 $ wget https://dlcdn.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz $ ls /home/manager/apac..

Data/Kafka 2022.07.28

글또) 7기 다짐글(2022.05 ~ 2022.11)_airflow

벌써 3번째 다짐글이네요 이 포스팅의 독자는 아래와 같아요 글또 가입을 고민하시는 분 글또 7기를 같은 그룹 분들 중 제가 궁금하신 분 글또 바이럴에 도움이 되기를 바라면서 아래 3가지 내용을 준비했어요 글또를 통해 얻게된 좋은 습관 5,6기에 작성했던 글 7기에 작성하게될 글 1) 글또를 통해 얻게된 좋은 습관 글또 덕분에 여러 습관을 얻게 되었어요 주기적으로 글쓰는 습관 피드백 받을 걸 생각하고 글쓰는 습관 어떻게 효과적으로 피드백을 줄지 생각하는 습관 아직도 메모처럼 블로그에 글을 작성하고 있지만, 매 기수마다 발전하는 저를 볼 수 있어요. 그리고 어떤 글을 썼는지 목차를 보며 그 동안 어떤 기술에 제가 관심이 있엇는지 볼 수 있어 좋아요 2) 5,6기에 작성했던 글 21년에는 elk, k8s를 공..

Data/Airflow 2022.07.10

Spark) parquet file merge하기

parquet 파일이 너무 작은 조각으로 저장되서, 조회 속도에 문제가 생겼어요. 검색해보니 읽어서 원하는 크기로 저장하는 방법을 사용한다고 해요 요약 1. parquet 파일 너무 많아서 조회 속도 느림 2. repartition으로 원하는 사이즈로 저장함 설명 hdfs든, s3든 file이 많으면 속도가 느려지죠 그래서 file을 merge하는 방법을 사용해요 인터넷을 검색하면서 아래 코드처럼 repartition을 하는 방법을 찾았어요 def get_repartition_factor(dir_size): block_size = sc._jsc.hadoopConfiguration().get(“dfs.blocksize”) return math.ceil(dir_size/block_size) # returns..

Data/Spark 2022.07.09

Airflow) 'Task 분리' 실무 적용하기 _k8s/spark

2022.05~07, 3달동안 진행한 airflow 스터디를 끝내고 실무에 적용한 내용 일부를 정리햇어요 airflow를 운영하면서 가장 불편했던 부분은 'Task가 분리되지 않은 것' 이었어요 어디서 문제가 생겼는지 추적할 수도 없는게 불편했어요 데이터 흐름이 어떻게 되있는지 알 수 없었어요 멱등성, 원자성도 지켜지지 않았어요 예상 독자는 아래와 같아요 airflow 도입을 고민하는 분 airflow를 이제 막 사용하시는 분 airflow 관리가 어려운 운영 담당자 요약 문제: airflow에서 spark job이 간헐적으로 실패함 분석: task의 분리가 필요함 적용: task를 분리함 결과: 문제 발생 시점을 알 수 있고, 불필요한 작업을 줄일 수 있게됨 설명 1. 문제 airflow에서 spark..

Data/Airflow 2022.07.04

Airflow) custom operator 만들기_s3 Prefix copy

+ 요즘 docker Image에 다 몰려있는 로직을 task로 쪼개는 작업을 진행하고 있어요. 처음부터 airflow를 공부하고 적용했어야했는데, 과거의 내가 원망스럽네요. 요약 hook/operator 공부한김에 자주 사용할만한 기능을 operator로 하나 만들었어요 from custom.s3.operators.S3CopyFolderObjectOperator import S3CopyFolderObjectOperator with DAG( 'hook-S3CopyFolderObjectOperator', ) as dag: S3CopyFolderObjectOperator( task_id="check_ready", aws_conn_id="CephObjectConn", source_bucket_name="hgk..

Data/Airflow 2022.06.12

Airflow) pod external sigterm으로 죽는 현상_해결못해서 retries함

요약 상황 원인 조치 설명 상황 sparkK8sOperator 사용할때 sensor가 poking하다가 갑자기 죽음 결과를 보면 정상 실행되는데, sensor가 로그를 가져오지 못해서 에러로 표시됨 [2022-06-01 20:01:18,396] {spark_kubernetes.py:121} INFO - Spark application is still in state: RUNNING [2022-06-01 20:02:18,402] {spark_kubernetes.py:101} INFO - Poking: spark-test-20220531t193000-1 [2022-06-01 20:02:18,424] {spark_kubernetes.py:121} INFO - Spark application is still i..

Data/Airflow 2022.06.02