728x90
반응형
질문 타임이 1시간이나 있었음
https://www.slipp.net/wiki/pages/viewpage.action?pageId=3276808650952
요약
1. 4장 : 태스크 콘텍스트와 Jinja 템플릿 작업
2. 5장 : 태스크 템플릿 + 의존성 정의
3. 마무리
설명
1. 4장 : 태스크 콘텍스트와 Jinja 템플릿 작업
스터디원 중에서 airflow 사용 경험이 없는 분이 발표를 진행했는데
가닥을 잡는데 도움이 되는 챕터였다는 이야기를 들었습니다
{{ds}}, 증분 등의 실습이 많아서 그런 것 같습니다 ㅎ
execution_date은 오버라이딩 되나?
- kwargs로 오버라이딩하면 **context에 오버라이딩 됨
- https://mightytedkim.tistory.com/112
parameter를 명시하면, **context에서 제외되나
- 제외되서, key/value에서 빠짐
- https://mightytedkim.tistory.com/112
하루 전날 date로 실행하려면 어떡하나?,timedelta(days=1) 했는데 왜 하루 전날이 아닌지
- timezone이나 execution_date에 따라 다를 수 있음
retry_delay는 어떻게 사용하는지
- 실무 예시
default_args = {
'owner': 'slipp',
'start_date': datetime(2020, 10, 20),
'email': ['slipp@gmail.com'],
'retries': 6,
'retry_delay': timedelta(minutes=5),
'depends_on_past': False
}
실무에서는 실패하면 어떻게 예외 처리하는지
- retry 나 skip + slack 알람
많이 사용하는 jinja_template이 뭐가 있는지(ti, ds)
- task instance 많이 사용함 (애정함)
- ds, ds_nodash, yesterday_ds_nodash
jinja 템플릿 안에서는 계산이 안되는지, {{파람}} - timedelta(days=1) 안됨
- 연산은 안되고, 있는 템플릿의 변수를 활용하는 단계까지만
{{execution_date.year}}
멱등성 원칙을 위반한 예시는 뭐가 있을까
- datetime.today()를 사용하는 것
# merge date
today = datetime.today() - timedelta(days=1)
yesterday = today.strftime('%Y%m%d')
source = conf["spark"]["warehouse_dir"] + "/dt=" + yesterday
target = conf["spark"]["hist_dir"] + "/dt=" + yesterday
airflow UI에서는 코드를 수정 할 수 없는데 다른 방법은 없을까? (젠킨스는 되는데)
- 서버에서 직접 파이썬 코드를 수정하니까 오탈자도 많고 불편함
- 보통 git을 연동해서 commit → push해서 사용함 (이 방법을 보통 사용)
- 도커에서도 k8s 싱크 이미지를 사용함(테스트 완료)
- airflow code editor 플러그인을 설치해서 사용하는 방법도 있음
- 비추천, 관리하기 힘듦(믿어주삼.. 실시간 경험중)
UTC start_date, execute_date 너무 헷갈리는데 방법이 없을까
- 우측 상단에 KST로 설정하면 혼선을 줄일 수 있음
- start_date, execution_date 적용 됌 (기존 task 인스턴스에도)
- 설정파일로 바꾸는 법이 있기는 함 (Airflow 2.0부터 적용된듯?!)
- 다만 로그에 남아있는 날짜 데이터의 경우 보정되지 않음.(이건 text로 저장되니까 어쩔수 없음)
2. 5장 : 태스크 템플릿 + 의존성 정의
5장은 3명에서 나눠서 발표했어요
혼자하기에는 양도 많고, 중요한 내용들이라 자세히 보고 싶었어요
- all_success, one_failed 같은 트리거 규칙
- 테스크간 데이터 공유 (xcom)
- @task (데코레이터) : taskflow api
xcom은 왜 무분별하게 사용하면 안될끼?
- xcom 과다 사용하다 metadata db가 crush 남
용량이 초과된 원인은 XCom의 무분별한 사용에 있었습니다. Airflow1에서는 XCom 전송이 Database를 통해 이루어지기 때문에 과도한 데이터를 XCom을 통해 전송하면 Database에 거대한 데이터를 저장하는 것과 마찬가지였고, 우리의 경우 거대한 교육 컨텐츠를 잘게 쪼개놓은 테스크간에 전달하기 때문에 막대한 데이터를 여러 번 Database에 저장하고 조회하는 비효율적인 방식으로 동작했습니다. https://medium.com/riiid-teamblog-kr/airflow2를-이용한-데이터-파이프라인-구성기-ab1ff1471546 |
- 공식 문서
If possible, use XCom to communicate small messages between tasks and a good way of passing larger data between tasks is to use a remote storage such as S3/HDFS. -> 큰 데이터는 S3/HDFS For example, if we have a task that stores processed data in S3 that task can push the S3 path for the output data in Xcom, and the downstream tasks can pull the path from XCom and use it to read the data. -> xcom에는 path만 저장하는 것 추천 https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html |
왜 피클링이 불가능한가요?
-> 책: "피클링이 불가능한 개체로는 데이터베이스 연결과 파일 핸들러가 있습니다."
attrbute은 가져오지 않기 때문에 connection이 앋뇐다는거 아닐까? -> 공부 필요
* 공식 문서에서는 파일 오브젝트, 네트워크 커넥션, 데이터베이스커넥션 안된다고 함
With pickle protocol v1, you cannot pickle open file objects, network connections, or database connections. When you think about it, it makes sense -- pickle cannot will the connection for file object to exist when you unpickle your object, and the process of creating that connection goes beyond what pickle can automatically do for you. If you really want to pickle something that has an attribute that is causing problems, look at the pickle documentation for __getstate__, __setstate__, and __getinitargs__ -- using these you can exclude problematic attributes. https://wiki.python.org/moin/UsingPickle |
XCom 내용도 디비에 저장하는 것으로 이해했는데, XCom은 디스크에 결과를 저장하는게 아닌건가요?
-> 책: "향후 더 많은 페이지 처리로 데이터 크기가 커질 수 있다는 점을 염두에 두고 XCom 대신 디스크에 결과를 저장합니다."
디비에 저장하는것도 결국 디스크에 저장하는것
TaskFlow Api의 한계는?
python operator만 지원하고, 아직은 shift 연산자와 혼용해야되서 헷갈릴 수 있을듯
(관리자가 없이 사용하게 되면 혼선이 올 수도 있음)
TaskFlow Api의 장점은?
[2.0의 가장 큰 장점] Airflow 2 TaskFlow API의 가장 큰 장점은 ti(TaskInstance)를 통해 XCom data를 직접 push/pull 할 필요 없이 @task 로 정의한 함수의 반환 값을 그대로 다른 테스크의 인자로 제공하면 XCom 전송이 일어나도록 구성되어 데이터의 전송 흐름과 테스크의 흐름이 동일해지고 XCom 호출 과정을 생략할 수 있어서 간결한 테스크 구성과 데이터 전송 흐름을 바탕으로 빠른 파이프라인 진단이 가능해집니다. https://medium.com/riiid-teamblog-kr/airflow2를-이용한-데이터-파이프라인-구성기-ab1ff1471546 |
3. 마무리
이번 스터디는 고운님의 폭풍 질문이 없었으면 늘어졌을 것 같아요.
airflow 사용해본 다른 분들에게도 도움이 될 질문을 해주셨으니까요 ㅎ
그리고 저는 뤼이드 블로그에서 많이 배웠습니다. 마침 저도 데코레이터(@task)를 적용해야하난 생각 중이었거든요
연휴 끝나고 오프라인 모임해요 :)
728x90
반응형
'Data > Airflow' 카테고리의 다른 글
Airflow) S3CopyObjectOperator 이용해서 copy 하기 (0) | 2022.06.02 |
---|---|
Slipp) Airflow2.0스터디_워크플로 트러거_4주차(6장) (0) | 2022.05.28 |
Airflow) context, kwargs 혼용 실습 (0) | 2022.05.09 |
Airflow) 2.0부터는 scheduler 는 replica 여러개 설정 추천 (0) | 2022.04.28 |
Slipp) Airflow2.0 스터디_2주차(3장) Airflow의 스케줄링 (2) | 2022.04.28 |