급하게 회사 업무로 airflow를 사용할 일이 있었는데
docker-compose에서 dockerOperator를 사용한 경험입니다
# 상황
airflow 3년 전에 너가 세팅했지? 지금 바로 해줘
Airflow야 여러 버전별로 여러번 설치해봤으니까
알겠다고 대답했습니다.
생각없이 그냥 작업하다가 삽질을 2번이나 했습니다.
# 삽질1: k8s 버전 생각 안하고 기존 소스로 설치하기
2년 전에 세팅해둔 gitlab repo(airflow helm 2.3.0)가 있길래,
회사 on-prem 쿠버(1.30) 에 실행해봤어요.
그런데 역시나 버전이 에러가 났습니다.(멍청한 나)
- 현상: scheduler가 죽은 다음 살아나지를 않음
[원인]
혹시나가 역시나. 버전이 안맞음
gitlab repo가 2년 전 세팅한
- gitlab repo: airflow 2.3.0 (k8s 1.24까지 지원)
- 내가 받은 k8s는 1.30
발생했던 scheduler error log
File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 281, in <listcomp>
for sub_data in data]
File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 303, in __deserialize
return self.__deserialize_model(data, klass)
File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 641, in __deserialize_model
instance = klass(**kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/models/v1_pod_condition.py", line 76, in __init__
self.type = type
File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/models/v1_pod_condition.py", line 221, in type
.format(type, allowed_values)
ValueError: Invalid value for type (PodReadyToStartContainers), must be one of ['ContainersReady', 'Initialized', 'PodScheduled', 'Ready']
[삽질2: Docker-compose로 설치했는데, Python 버전이 안맞음]
빠르게 멍청했던 과거를 반성하고
ubuntu 22.04에 docker-comopse로 실행했어요
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
여기까지는 1분 컷이죠
끝!
이면 좋겠지만 세상 일은 그렇게 아름답지 않죠
이제서야 요구사항이 들어옵니다.
python 3.9, scikit-learn 1.1.0 기반의
모델을 실행해야한다고 합니다.
제가 설치한 Airflow 는 Python 3.7을 사용하고 있어서,
Python Operator를 쓰려고 해도
라이브러리 설치가 안되요...
Scikit-learn 1.0.2까지 설치할 수 있어서 돌려봤는데
역시 모델을 못읽음 ㅜ
/home/airflow/.local/lib/python3.7/site-packages/sklearn/base.py:338:
UserWarning: Trying to unpickle estimator DecisionTreeClassifier from version 1.1.1 when using version 1.0.2. This might lead to breaking code or invalid results.
[해결: Docker-Compose에서 DockerOperator 사용]
급하게, DockerOpertor를 실행했습니다.
2가지를 해야한다고 합니다.
user: 0:0 과 로컬 서버의 docker 소켓을 연결시켜줍니다.
version: '3'
x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.5.1}
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
- ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
- ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
- /var/run/docker.sock:/var/run/docker.sock # Mount Docker socket to allow DockerOperator
user: "0:0" # Run as root to access Docker socket
depends_on:
- redis
- postgres
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always
redis:
image: redis:latest
expose:
- 6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
- airflow-init
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
- airflow-init
airflow-worker:
<<: *airflow-common
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
- airflow-init
airflow-triggerer:
<<: *airflow-common
command: triggerer
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
- airflow-init
airflow-init:
<<: *airflow-common
entrypoint: /bin/bash
command:
- -c
- |
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
exec /entrypoint airflow version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
user: "0:0" # Run as root for init
volumes:
- ${AIRFLOW_PROJ_DIR:-.}:/sources
volumes:
postgres-db-volume:
이제 테스트를 위해 dockerOperator를 실행했어요
from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from datetime import datetime
# Define default arguments for the DAG
default_args = {
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=1)
}
# Create the DAG
with DAG(
dag_id='docker_operator_python_slim_3_9',
default_args=default_args,
start_date=datetime(2023, 1, 1),
schedule_interval=None, # Run on-demand
catchup=False,
) as dag:
# Define a DockerOperator to run a Python script
run_python_script = DockerOperator(
task_id='run_python_script',
image='python:3.9-slim', # Use Python slim image
api_version='auto',
auto_remove=True, # Remove container after running
command="python -c 'print(\"Hello from Python 3.9-slim!\")'",
docker_url='unix://var/run/docker.sock', # Docker socket
network_mode='bridge',
)
run_python_script
이제 됩니다. 다행이에요
전달받은 model 코드를 이용해 docker image를 빌드하고
실행하니 결과가 나오네요
후기
급하다고 해서
혹시나 버전이 되지 않을까라는 생각에
2번이나 실수를 했어요.
그래서 오히려 더 오래걸렸던 것 같습니다
다음부터는 좀 더 신중히 생각하고 작업을 해야겠어요.
참고
'Data > Airflow' 카테고리의 다른 글
Airflow) Base image에 라이브러리 추가하기 (0) | 2023.04.23 |
---|---|
airflow) dag clear 하지 않고 특정 시점부터 재시작하기 (0) | 2023.02.27 |
Airflow) 'Custom Operator' 실무 적용하기_s3,hook (0) | 2022.08.01 |
Slipp) Airflow 2.0 스터디 후기_22기 (0) | 2022.07.29 |
글또) 7기 다짐글(2022.05 ~ 2022.11)_airflow (0) | 2022.07.10 |