Airflow) 'Task 분리' 실무 적용하기 _k8s/spark
2022.05~07, 3달동안 진행한 airflow 스터디를 끝내고 실무에 적용한 내용 일부를 정리햇어요
airflow를 운영하면서 가장 불편했던 부분은 'Task가 분리되지 않은 것' 이었어요
- 어디서 문제가 생겼는지 추적할 수도 없는게 불편했어요
- 데이터 흐름이 어떻게 되있는지 알 수 없었어요
- 멱등성, 원자성도 지켜지지 않았어요
예상 독자는 아래와 같아요
- airflow 도입을 고민하는 분
- airflow를 이제 막 사용하시는 분
- airflow 관리가 어려운 운영 담당자
요약
- 문제: airflow에서 spark job이 간헐적으로 실패함
- 분석: task의 분리가 필요함
- 적용: task를 분리함
- 결과: 문제 발생 시점을 알 수 있고, 불필요한 작업을 줄일 수 있게됨
설명
1. 문제
airflow에서 spark job이 간헐적으로 실패했는데, 단순 s3 이동 작업 때문이었어요
k8s 에서 spark job을 실행하는데, spark 외의 로직들이 오류를 간헐적으로 일으키고 있었어요
- spark job이 실행되기 전에 실행되는 validation, backup, cleanse 로직이 문제
데이터를 cleanse할 때, file을 이동하는 로직이 있어요
"object_storage_fs.mv(source, target, recursive=True)"
Traceback (most recent call last):
File "/app/master_merge/batch_merge_master.py", line 204, in original_backup
object_storage_fs.mv(source, target, recursive=True)
File "/usr/local/lib/python3.9/dist-packages/fsspec/spec.py", line 916, in mv
self.rm(path1, recursive=recursive)
(생략)
botocore.exceptions.ConnectTimeoutError: Connect timeout on endpoint URL: "http://10.**.**.***:30**/hgkim?delete"
22/07/01 05:08:28 INFO SparkUI: Stopped Spark web UI at http://spark-19bdeb81b6331f5d-driver-svc.spark-operator.svc:4040
여기서 문제가 발생하더라고요.
spark 때문인지, 단순한 이동 문제인지 알 수 없었기 때문에 원인을 정확히 파악할 수 없었어요
스터디를 하고 나니까, 이건 airflow 답지 않은 거라 하더라고요
- airflow 다운 것 -> 적절한 task로 분리, 멱등성/원장성 준수
(스터디 안에서도 어디까지가 적절한 task인지 많은 이야기가 오고 갔어요)
2. 분석
task를 분리!
spark image를 파악해보니, 아래 4가지 로직이 하나의 이미지에 포함된 걸 확인했습니다.
- validation/ backup (s3fs)
- sparkjob(pyspark)
- cleanse (s3fs)
spark 앞뒤의 s3 로직들을 분리햇어요
Action Plan: spark image에는 spark만 남기고, 나머지는 task로 분리하기
3. 적용
task를 분리하고, 반복되는 작업들은 customOperator 구현했어요
'task 분리' 부분은 간단하게 끝났어요. CI/CD를 세팅해놔서, 소스를 수정하고 새로 이미지를 만들었거든요
python Operator로 s3 로직들을 모두 꺼내니까 지저분 하더라고요
그래서 Custom operator를 적용해서 모듈화했어요. 이 내용은 다음 글에서 정리할게요
4. 결론
task 쪼개기 덕분에, 어디서 문제가 생겼는지 확인할 수 있었어요.
task를 쪼갠 이후에는 spark job을 또 실행하지 않아도 되서
불필요한 자원을 아끼고, 어디서 문제가 생겼는지 명확히 파악 할 수 있었어요
이제 문제가 생겼다고, spark job을 또 돌리지 않아도 되서 뿌듯합니다 :)
다음에는 python operator를 줄이고, custom operator를 적용한 후기를 작성해볼게요