'데이터 엔지니어'로 성장하기

정리하는 걸 좋아하고, 남이 읽으면 더 좋아함

Data/Airflow

Airflow) docker-compose로 dockerOperator 실행하기

MightyTedKim 2024. 10. 27. 14:44
728x90
반응형

급하게 회사 업무로 airflow를 사용할 일이 있었는데

docker-compose에서 dockerOperator를 사용한 경험입니다

# 상황

airflow 3년 전에 너가 세팅했지? 지금 바로 해줘

 

Airflow야 여러 버전별로 여러번 설치해봤으니까 

알겠다고 대답했습니다.

 

생각없이 그냥 작업하다가 삽질을 2번이나 했습니다.

# 삽질1: k8s 버전 생각 안하고 기존 소스로 설치하기

2년 전에 세팅해둔 gitlab repo(airflow helm 2.3.0)가 있길래,

회사 on-prem 쿠버(1.30) 에 실행해봤어요. 

 

그런데 역시나 버전이 에러가 났습니다.(멍청한 나)

- 현상: scheduler가 죽은 다음 살아나지를 않음

 

[원인]

혹시나가 역시나. 버전이 안맞음

gitlab repo가 2년 전 세팅한

- gitlab repo: airflow 2.3.0 (k8s 1.24까지 지원)

- 내가 받은 k8s는 1.30

 

발생했던 scheduler error log

  File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 281, in <listcomp>
    for sub_data in data]
  File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 303, in __deserialize
    return self.__deserialize_model(data, klass)
  File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 641, in __deserialize_model
    instance = klass(**kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/models/v1_pod_condition.py", line 76, in __init__
    self.type = type
  File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/models/v1_pod_condition.py", line 221, in type
    .format(type, allowed_values)
ValueError: Invalid value for type (PodReadyToStartContainers), must be one of ['ContainersReady', 'Initialized', 'PodScheduled', 'Ready']

[삽질2: Docker-compose로 설치했는데, Python 버전이 안맞음]

빠르게 멍청했던 과거를 반성하고

ubuntu 22.04에 docker-comopse로 실행했어요

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'

여기까지는 1분 컷이죠

 

끝!

 

이면 좋겠지만 세상 일은 그렇게 아름답지 않죠 


 

이제서야 요구사항이 들어옵니다.

 

python 3.9, scikit-learn 1.1.0 기반의

모델을 실행해야한다고 합니다.

?? 아니 미리 말해주지

 

제가 설치한 Airflow 는 Python 3.7을 사용하고 있어서,


Python Operator를 쓰려고 해도 

라이브러리 설치가 안되요...

 

Scikit-learn 1.0.2까지 설치할 수 있어서 돌려봤는데

역시 모델을 못읽음 ㅜ

/home/airflow/.local/lib/python3.7/site-packages/sklearn/base.py:338:
UserWarning: Trying to unpickle estimator DecisionTreeClassifier from version 1.1.1 when using version 1.0.2. This might lead to breaking code or invalid results.

 

[해결: Docker-Compose에서 DockerOperator 사용]

급하게, DockerOpertor를 실행했습니다.

2가지를 해야한다고 합니다.

gpt: docker socket 연결과 root user id 설정

 

user: 0:0 과 로컬 서버의 docker 소켓을 연결시켜줍니다. 

version: '3'
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.5.1}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - /var/run/docker.sock:/var/run/docker.sock  # Mount Docker socket to allow DockerOperator
  user: "0:0"  # Run as root to access Docker socket
  depends_on:
    - redis
    - postgres

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      - airflow-init

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      - airflow-init

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      - airflow-init

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      - airflow-init

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
      - -c
      - |
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    user: "0:0"  # Run as root for init
    volumes:
      - ${AIRFLOW_PROJ_DIR:-.}:/sources

volumes:
  postgres-db-volume:

 

이제 테스트를 위해 dockerOperator를 실행했어요

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from datetime import datetime

# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

# Create the DAG
with DAG(
    dag_id='docker_operator_python_slim_3_9',
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,  # Run on-demand
    catchup=False,
) as dag:

    # Define a DockerOperator to run a Python script
    run_python_script = DockerOperator(
        task_id='run_python_script',
        image='python:3.9-slim',  # Use Python slim image
        api_version='auto',
        auto_remove=True,  # Remove container after running
        command="python -c 'print(\"Hello from Python 3.9-slim!\")'",
        docker_url='unix://var/run/docker.sock',  # Docker socket
        network_mode='bridge',
    )

    run_python_script

 

이제 됩니다. 다행이에요

전달받은 model 코드를 이용해 docker image를 빌드하고

실행하니 결과가 나오네요

 

후기

급하다고 해서 

혹시나 버전이 되지 않을까라는 생각에

2번이나 실수를 했어요.

 

그래서 오히려 더 오래걸렸던 것 같습니다

다음부터는 좀 더 신중히 생각하고 작업을 해야겠어요.

 

 

 

참고

https://velog.io/@sophi_e/Airflow-docker-compose%EB%A1%9C-%EC%97%90%EC%96%B4%ED%94%8C%EB%A1%9C%EC%9A%B0-%EC%84%A4%EC%B9%98%ED%95%98%EA%B8%B0

728x90
반응형