Data/Airflow

Airflow) 'Task 분리' 실무 적용하기 _k8s/spark

MightyTedKim 2022. 7. 4. 11:22
728x90
반응형

 

2022.05~07, 3달동안 진행한 airflow 스터디를 끝내고 실무에 적용한 내용 일부를 정리햇어요

우아한 네트워킹 데이 발표

airflow를 운영하면서 가장 불편했던 부분은 'Task가 분리되지 않은 것' 이었어요

  • 어디서 문제가 생겼는지 추적할 수도 없는게 불편했어요
  • 데이터 흐름이 어떻게 되있는지 알 수 없었어요
  • 멱등성, 원자성도 지켜지지 않았어요

 

예상 독자는 아래와 같아요

  1. airflow 도입을 고민하는 분
  2. airflow를 이제 막 사용하시는 분
  3. airflow 관리가 어려운 운영 담당자

요약

  1. 문제: airflow에서 spark job이 간헐적으로 실패함
  2. 분석: task의 분리가 필요함
  3. 적용: task를 분리함
  4. 결과: 문제 발생 시점을 알 수 있고, 불필요한 작업을 줄일 수 있게됨

설명

1. 문제

airflow에서 spark job이 간헐적으로 실패했는데, 단순 s3 이동 작업 때문이었어요

k8s 에서 spark job을 실행하는데, spark 외의 로직들이 오류를 간헐적으로 일으키고 있었어요

  • spark job이 실행되기 전에 실행되는 validation, backup, cleanse 로직이 문제

단일 이미지에 너무 많은 로직이 들어있는 문제

데이터를 cleanse할 때, file을 이동하는 로직이 있어요

 "object_storage_fs.mv(source, target, recursive=True)"

Traceback (most recent call last):
  File "/app/master_merge/batch_merge_master.py", line 204, in original_backup
    object_storage_fs.mv(source, target, recursive=True)
  File "/usr/local/lib/python3.9/dist-packages/fsspec/spec.py", line 916, in mv
    self.rm(path1, recursive=recursive)    
(생략)
botocore.exceptions.ConnectTimeoutError: Connect timeout on endpoint URL: "http://10.**.**.***:30**/hgkim?delete"
22/07/01 05:08:28 INFO SparkUI: Stopped Spark web UI at http://spark-19bdeb81b6331f5d-driver-svc.spark-operator.svc:4040

여기서 문제가 발생하더라고요. 

spark 때문인지, 단순한 이동 문제인지 알 수 없었기 때문에 원인을 정확히 파악할 수 없었어요

sparkKubernetesOperator 안에서 빌드한 image 호출

스터디를 하고 나니까, 이건 airflow 답지 않은 거라 하더라고요

  • airflow 다운 것 -> 적절한 task로 분리, 멱등성/원장성 준수
    (스터디 안에서도 어디까지가 적절한 task인지 많은 이야기가 오고 갔어요)

2. 분석

task를 분리!

spark image를 파악해보니, 아래 4가지 로직이 하나의 이미지에 포함된 걸 확인했습니다.

  1. validation/ backup (s3fs)
  2. sparkjob(pyspark)
  3. cleanse (s3fs)

spark 앞뒤의 s3 로직들을 분리햇어요

Action Plan: spark image에는 spark만 남기고, 나머지는 task로 분리하기

dummy Operator로 구성 잡기


3. 적용

task를 분리하고, 반복되는 작업들은 customOperator 구현했어요

'task 분리' 부분은 간단하게 끝났어요. CI/CD를 세팅해놔서, 소스를 수정하고 새로 이미지를 만들었거든요

python Operator로 s3 로직들을 모두 꺼내니까 지저분 하더라고요

그래서 Custom operator를 적용해서 모듈화했어요. 이 내용은 다음 글에서 정리할게요


 

4. 결론

task 쪼개기 덕분에, 어디서 문제가 생겼는지 확인할 수 있었어요.

task를 쪼갠 이후에는 spark job을 또 실행하지 않아도 되서
불필요한 자원을 아끼고, 어디서 문제가 생겼는지 명확히 파악 할 수 있었어요

이제 문제가 생겼다고, spark job을 또 돌리지 않아도 되서 뿌듯합니다 :)

 

다음에는 python operator를 줄이고, custom operator를 적용한 후기를 작성해볼게요

728x90
반응형