
Kubernetes) Installing Airflow with Helm

MightyTedKim 2022. 2. 21. 14:24

Version: airflow-1.2.0.tgz

Link: https://airflow.apache.org/docs/helm-chart/stable/index.html
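The chart can be fetched from the upstream repo (a minimal sketch; this post later installs from an internal mirror named hgkim):

$ helm repo add apache-airflow https://airflow.apache.org
$ helm repo update
$ helm pull apache-airflow/airflow --version 1.2.0   # produces airflow-1.2.0.tgz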

Result

$ k get all -n airflow
NAME                                    READY   STATUS    RESTARTS   AGE
pod/airflow-postgresql-0                1/1     Running   0          28d
pod/airflow-scheduler-88588ff5c-9cr2x   3/3     Running   0          6d17h
pod/airflow-statsd-5df44cb959-h64bn     1/1     Running   0          28d
pod/airflow-webserver-ffc95467d-s6gx9   1/1     Running   0          17d

NAME                                  TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)             AGE
service/airflow-postgresql            ClusterIP   10.107.117.120   <none>        5432/TCP            28d
service/airflow-postgresql-headless   ClusterIP   None             <none>        5432/TCP            28d
service/airflow-postgresql-nodeport   NodePort    10.103.116.105   <none>        30016:30016/TCP     56d
service/airflow-statsd                ClusterIP   10.104.226.69    <none>        9125/UDP,9102/TCP   28d
service/airflow-webserver             ClusterIP   10.110.31.162    <none>        8080/TCP            28d
service/airflow-webserver-svc-np      NodePort    10.107.203.140   <none>        8080:30012/TCP      59d

NAME                                READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/airflow-scheduler   1/1     1            1           28d
deployment.apps/airflow-statsd      1/1     1            1           28d
deployment.apps/airflow-webserver   1/1     1            1           28d

NAME                                          DESIRED   CURRENT   READY   AGE
replicaset.apps/airflow-scheduler-88588ff5c   1         1         1       28d
replicaset.apps/airflow-statsd-5df44cb959     1         1         1       28d
replicaset.apps/airflow-webserver-ffc95467d   1         1         1       28d

NAME                                  READY   AGE
statefulset.apps/airflow-postgresql   1/1     28d

$ k get pvc -n airflow
NAME                        STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS      AGE
data-airflow-postgresql-0   Bound    pvc-d8d79772-5bfa-4e83-b826-320df9dacb5f   8Gi        RWO            rook-ceph-block   28d

$ k get secret -n airflow
NAME                                       TYPE                                  DATA   AGE
airflow-airflow-metadata                   Opaque                                1      28d
airflow-broker-url                         Opaque                                1      28d
airflow-create-user-job-token-tz2g7        kubernetes.io/service-account-token   3      28d
airflow-fernet-key                         Opaque                                1      28d
airflow-migrate-database-job-token-hbd56   kubernetes.io/service-account-token   3      28d
airflow-postgresql                         Opaque                                1      28d
airflow-redis-password                     Opaque                                1      28d
airflow-scheduler-token-n59mm              kubernetes.io/service-account-token   3      28d
airflow-ssh-git-secret                     Opaque                                1      59d
airflow-statsd-token-7pnrc                 kubernetes.io/service-account-token   3      28d
airflow-webserver-secret-key               Opaque                                1      28d
airflow-webserver-token-2pnzj              kubernetes.io/service-account-token   3      28d
airflow-worker-token-xxmtf                 kubernetes.io/service-account-token   3      28d
default-token-xzzjw                        kubernetes.io/service-account-token   3      59d
sh.helm.release.v1.airflow.v1              helm.sh/release.v1                    1      28d

Installation

1. secret

Git connection
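The secret wraps an SSH private key that git-sync will use. A minimal sketch of generating the key pair (registering the public key as a deploy key in GitLab is assumed to happen separately):

$ ssh-keygen -t rsa -b 4096 -f ./id_rsa -N ""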
$ kubectl create secret generic airflow-ssh-git-secret --from-file=gitSshKey=./id_rsa -n airflow
$ k get secret airflow-ssh-git-secret -n airflow
NAME                     TYPE     DATA   AGE
airflow-ssh-git-secret   Opaque   1      8d

2. pvc

airflowlog-pv-claim.yaml
airflowpgsql-pv-claim.yaml
# airflowlog-pv-claim.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: log-airflow-0
  namespace: airflow #-webinar
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 50Gi
  storageClassName: rook-ceph-block

# airflowpgsql-pv-claim.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: data-airflow-postgresql-0
  namespace: airflow
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
  storageClassName: rook-ceph-block

3. service

# airflow-webserver-svc-np.yaml
apiVersion: v1
kind: Service
metadata:
  name: airflow-webserver-svc-np
  namespace: airflow
spec:
  type: NodePort
  selector:
    component: webserver
    release: airflow
    tier: airflow

  ports:
      # By default and for convenience, the `targetPort` is set to the same value as the `port` field.
    - port: 8080
      targetPort: 8080
      # Optional field
      # By default and for convenience, the Kubernetes control plane will allocate a port from a range (default: 30000-32767)
      nodePort: 30012
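A minimal sketch of applying the two PVCs and the NodePort Service, then checking they bound:

$ kubectl apply -f airflowlog-pv-claim.yaml -f airflowpgsql-pv-claim.yaml -f airflow-webserver-svc-np.yaml
$ kubectl get pvc,svc -n airflow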

4. ./values.yaml

Changes: point git-sync at the internal GitLab, change the image paths, and add Prometheus metric mappings.
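To get a baseline file to edit, the chart defaults can be dumped first (a sketch against the upstream repo; the actual install below uses the internal hgkim repo):

$ helm show values apache-airflow/airflow --version 1.2.0 > values.yaml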

# Default airflow repository -- overrides all the specific images below
#defaultAirflowRepository: apache/airflow
defaultAirflowRepository: hgkim/library/apache/airflow-custom #hgkim

# Default airflow tag to deploy
#defaultAirflowTag: "2.1.4-20211216-2e54a0d7" 
defaultAirflowTag: "latest" 

# Airflow version (Used to make some decisions based on Airflow Version being deployed)
airflowVersion: "2.1.4" 

# Images
images:
  useDefaultImageForMigration: true #false, hgkim
  pod_template:
    pullPolicy: Always #IfNotPresent,hgkim
  flower:
    pullPolicy: Always #IfNotPresent,hgkim
  statsd:
    repository: hgkim/library/apache/airflow
    tag: airflow-statsd-exporter-2021.04.28-v0.17.0-20211208
    pullPolicy: IfNotPresent
  gitSync:
    repository: hgkim/library/k8s.gcr.io/git-sync/git-sync
    tag: v3.3.0-20211208

# Airflow executor
# Options: LocalExecutor, CeleryExecutor, KubernetesExecutor, CeleryKubernetesExecutor
#executor: "CeleryExecutor" 
executor: "KubernetesExecutor" #hgkim

#   hgkim_start
env:
  - name: "AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY" 
    value: "hgkim/library/apache/airflow-custom" 
  - name: "AIRFLOW__KUBERNETES__WORKER_CONATINER_TAG" 
    value: "latest" 
    #value: "2.1.4-20211216-2e54a0d7" 
  - name: "AIRFLOW__KUBERNETES__RUN_AS_USER" 
    value: "50000" 
  - name: "AIRFLOW__KUBERNETES__DAGS_IN_IMAGE" 
    value: "True" 
  - name: "AIRFLOW__CORE__REMOTE_LOGGING" 
    value: "True" 
  - name: "AIRFLOW__CORE__REMOTE_LOG_CONN_ID" 
    value: "CephObjectConn" 
  - name: "AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER" 
    value: "s3://hgkim/airflow/logs" 
  - name: "AIRFLOW__CORE__DEFAULT_TIMEZONE" 
    value: "Asia/Seoul" 
#   hgkim_end
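To confirm these overrides actually reach the pods, the scheduler's environment can be inspected (a sketch; the container name scheduler follows the official chart's convention):

$ kubectl exec -n airflow deploy/airflow-scheduler -c scheduler -- env | grep AIRFLOW__KUBERNETES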


workers:
  persistence:
    # Enable persistent volumes
    enabled: false #true hgkim
    # Volume size for worker StatefulSet
    size: 10Gi # 100Gi, hgkim
    # If using a custom storageClass, pass name ref to all statefulSets here
    storageClassName:
    # Execute init container to chown log directory.
    # This is currently only needed in kind, due to usage
    # of local-path provisioner.
    fixPermissions: false

 

statsd:
  # Additional mappings for statsd exporter.
  #extraMappings: []
  extraMappings:
    # === Counters ===
    - match: "(.+)\\.(.+)_start$" 
      match_metric_type: counter
      name: "af_agg_job_start" 
      match_type: regex
      labels:
        airflow_id: "$1" 
        job_name: "$2" 
    - match: "(.+)\\.(.+)_end$" 
      match_metric_type: counter
      name: "af_agg_job_end" 
      match_type: regex
      labels:
        airflow_id: "$1" 
        job_name: "$2" 
    - match: "(.+)\\.operator_failures_(.+)$" 
      match_metric_type: counter
      name: "af_agg_operator_failures" 
      match_type: regex
      labels:
        airflow_id: "$1" 
        operator_name: "$2" 
    - match: "(.+)\\.operator_successes_(.+)$" 
      match_metric_type: counter
      name: "af_agg_operator_successes" 
      match_type: regex
      labels:
        airflow_id: "$1" 
        operator_name: "$2" 
    - match: "*.ti_failures" 
      match_metric_type: counter
      name: "af_agg_ti_failures" 
      labels:
        airflow_id: "$1" 
    - match: "*.ti_successes" 
      match_metric_type: counter
      name: "af_agg_ti_successes" 
      labels:
        airflow_id: "$1" 
    - match: "*.zombies_killed" 
      match_metric_type: counter
      name: "af_agg_zombies_killed" 
      labels:
        airflow_id: "$1" 
    - match: "*.scheduler_heartbeat" 
      match_metric_type: counter
      name: "af_agg_scheduler_heartbeat" 
      labels:
        airflow_id: "$1" 
        instance: "$2" 
    - match: "*.dag_processing.processes" 
      match_metric_type: counter
      name: "af_agg_dag_processing_processes" 
      labels:
        airflow_id: "$1" 
    - match: "*.scheduler.tasks.killed_externally" 
      match_metric_type: counter
      name: "af_agg_scheduler_tasks_killed_externally" 
      labels:
        airflow_id: "$1" 
    - match: "*.scheduler.tasks.running" 
      match_metric_type: counter
      name: "af_agg_scheduler_tasks_running" 
      labels:
        airflow_id: "$1" 
    - match: "*.scheduler.tasks.starving" 
      match_metric_type: counter
      name: "af_agg_scheduler_tasks_starving" 
      labels:
        airflow_id: "$1" 
    - match: "*.scheduler.orphaned_tasks.cleared" 
      match_metric_type: counter
      name: "af_agg_scheduler_orphaned_tasks_cleared" 
      labels:
        airflow_id: "$1" 
    - match: "*.scheduler.orphaned_tasks.adopted" 
      match_metric_type: counter
      name: "af_agg_scheduler_orphaned_tasks_adopted" 
      labels:
        airflow_id: "$1" 
    - match: "*.scheduler.critical_section_busy" 
      match_metric_type: counter
      name: "af_agg_scheduler_critical_section_busy" 
      labels:
        airflow_id: "$1" 
    - match: "*.sla_email_notification_failure" 
      match_metric_type: counter
      name: "af_agg_sla_email_notification_failure" 
      labels:
        airflow_id: "$1" 
    - match: "*.ti.start.*.*" 
      match_metric_type: counter
      name: "af_agg_ti_start" 
      labels:
        airflow_id: "$1" 
        dag_id: "$2" 
        task_id: "$3" 
    - match: "*.ti.finish.*.*.*" 
      match_metric_type: counter
      name: "af_agg_ti_finish" 
      labels:
        airflow_id: "$1" 
        dag_id: "$2" 
        task_id: "$3" 
        state: "$4" 
    - match: "*.dag.callback_exceptions" 
      match_metric_type: counter
      name: "af_agg_dag_callback_exceptions" 
      labels:
        airflow_id: "$1" 
    - match: "*.celery.task_timeout_error" 
      match_metric_type: counter
      name: "af_agg_celery_task_timeout_error" 
      labels:
        airflow_id: "$1" 
    # === Gauges ===
    - match: "*.dagbag_size" 
      match_metric_type: gauge
      name: "af_agg_dagbag_size" 
      labels:
        airflow_id: "$1" 
    - match: "*.dag_processing.import_errors" 
      match_metric_type: gauge
      name: "af_agg_dag_processing_import_errors" 
      labels:
        airflow_id: "$1" 
    - match: "*.dag_processing.total_parse_time" 
      match_metric_type: gauge
      name: "af_agg_dag_processing_total_parse_time" 
      labels:
        airflow_id: "$1" 
    - match: "*.dag_processing.last_runtime.*" 
      match_metric_type: gauge
      name: "af_agg_dag_processing_last_runtime" 
      labels:
        airflow_id: "$1" 
        dag_file: "$2" 
    - match: "*.dag_processing.last_run.seconds_ago.*" 
      match_metric_type: gauge
      name: "af_agg_dag_processing_last_run_seconds" 
      labels:
        airflow_id: "$1" 
        dag_file: "$2" 
    - match: "*.dag_processing.processor_timeouts" 
      match_metric_type: gauge
      name: "af_agg_dag_processing_processor_timeouts" 
      labels:
        airflow_id: "$1" 
    - match: "*.executor.open_slots" 
      match_metric_type: gauge
      name: "af_agg_executor_open_slots" 
      labels:
        airflow_id: "$1" 
    - match: "*.executor.queued_tasks" 
      match_metric_type: gauge
      name: "af_agg_executor_queued_tasks" 
      labels:
        airflow_id: "$1" 
    - match: "*.executor.running_tasks" 
      match_metric_type: gauge
      name: "af_agg_executor_running_tasks" 
      labels:
        airflow_id: "$1" 
    - match: "*.pool.open_slots.*" 
      match_metric_type: gauge
      name: "af_agg_pool_open_slots" 
      labels:
        airflow_id: "$1" 
        pool_name: "$2" 
    - match: "*.pool.queued_slots.*" 
      match_metric_type: gauge
      name: "af_agg_pool_queued_slots" 
      labels:
        airflow_id: "$1" 
        pool_name: "$2" 
    - match: "*.pool.running_slots.*" 
      match_metric_type: gauge
      name: "af_agg_pool_running_slots" 
      labels:
        airflow_id: "$1" 
        pool_name: "$2" 
    - match: "*.pool.starving_tasks.*" 
      match_metric_type: gauge
      name: "af_agg_pool_starving_tasks" 
      labels:
        airflow_id: "$1" 
        pool_name: "$2" 
    - match: "*.smart_sensor_operator.poked_tasks" 
      match_metric_type: gauge
      name: "af_agg_smart_sensor_operator_poked_tasks" 
      labels:
        airflow_id: "$1" 
    - match: "*.smart_sensor_operator.poked_success" 
      match_metric_type: gauge
      name: "af_agg_smart_sensor_operator_poked_success" 
      labels:
        airflow_id: "$1" 
    - match: "*.smart_sensor_operator.poked_exception" 
      match_metric_type: gauge
      name: "af_agg_smart_sensor_operator_poked_exception" 
      labels:
        airflow_id: "$1" 
    - match: "*.smart_sensor_operator.exception_failures" 
      match_metric_type: gauge
      name: "af_agg_smart_sensor_operator_exception_failures" 
      labels:
        airflow_id: "$1" 
    - match: "*.smart_sensor_operator.infra_failures" 
      match_metric_type: gauge
      name: "af_agg_smart_sensor_operator_infra_failures" 
      labels:
        airflow_id: "$1" 

    # === Timers ===
    - match: "*.dagrun.dependency-check.*" 
      match_metric_type: observer
      name: "af_agg_dagrun_dependency_check" 
      labels:
        airflow_id: "$1" 
        dag_id: "$2" 
    - match: "*.dag.*.*.duration" 
      match_metric_type: observer
      name: "af_agg_dag_task_duration" 
      labels:
        airflow_id: "$1" 
        dag_id: "$2" 
        task_id: "$3" 
    - match: "*.dag_processing.last_duration.*" 
      match_metric_type: observer
      name: "af_agg_dag_processing_duration" 
      labels:
        airflow_id: "$1" 
        dag_file: "$2" 
    - match: "*.dagrun.duration.success.*" 
      match_metric_type: observer
      name: "af_agg_dagrun_duration_success" 
      labels:
        airflow_id: "$1" 
        dag_id: "$2" 
    - match: "*.dagrun.duration.failed.*" 
      match_metric_type: observer
      name: "af_agg_dagrun_duration_failed" 
      labels:
        airflow_id: "$1" 
        dag_id: "$2" 
    - match: "*.dagrun.schedule_delay.*" 
      match_metric_type: observer
      name: "af_agg_dagrun_schedule_delay" 
      labels:
        airflow_id: "$1" 
        dag_id: "$2" 
    - match: "*.scheduler.critical_section_duration" 
      match_metric_type: observer
      name: "af_agg_scheduler_critical_section_duration" 
      labels:
        airflow_id: "$1" 
    - match: "*.dagrun.*.first_task_scheduling_delay" 
      match_metric_type: observer
      name: "af_agg_dagrun_first_task_scheduling_delay" 
      labels:
        airflow_id: "$1" 
        dag_id: "$2" 

 

# Git sync
dags:
  persistence:
    # Enable persistent volume for storing dags
    enabled: false
    # Volume size for dags
    size: 1Gi
    # If using a custom storageClass, pass name here
    storageClassName:
    # access mode of the persistent volume
    accessMode: ReadWriteOnce
    ## the name of an existing PVC to use
    existingClaim:
  gitSync:
    #enabled: false
    enabled: true #hgkim
    repo: git@10.***.29.37:hakgeon.kim/sfg-cmm-airflowdags-mb.git
    branch: main #master #hgkim
    rev: HEAD
    depth: 1
    # the number of consecutive failures allowed before aborting
    maxFailures: 0
    # subpath within the repo where dags are located
    # should be "" if dags are at repo root
    subPath: dags/ #"tests/dags" 
    # if your repo needs a user name password
    sshKeySecret: airflow-ssh-git-secret #hgkim
    wait: 10 #60 hgkim
    containerName: git-sync
    uid: 65533
    extraVolumeMounts: []
    env: []
    resources: #{}, hgkim
      limits:
        cpu: 200m
        memory: 256Mi
      requests:
        cpu: 100m
        memory: 128Mi
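If DAGs don't show up after a push, the git-sync sidecar's logs are the first place to look (a sketch; with KubernetesExecutor the sidecar runs alongside the scheduler):

$ kubectl logs -n airflow deploy/airflow-scheduler -c git-sync --tail=20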

5. run

helm upgrade --install airflow hgkim/airflow -n airflow -f ./airflow/values.yaml --debug
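A quick way to confirm the release and watch the pods come up:

$ helm list -n airflow
$ kubectl get pods -n airflow -w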

Configuration (web UI)

- login
http://10.***.**.**:30012 (the NodePort exposed by airflow-webserver-svc-np)

- variables

Admin > Variables > `+`

Key                Value
ceph_endpoint      http://10.**.74.170:30071
ceph_access_key    access-key
ceph_secret_key    ********
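The same variables can be set from the CLI instead of the UI (a sketch; exec'ing into the webserver container is an assumption about where the airflow CLI is reachable):

$ kubectl exec -n airflow deploy/airflow-webserver -c webserver -- \
    airflow variables set ceph_endpoint http://10.**.74.170:30071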

- connection

Conn Id   : MyS3Conn
Conn Type : S3
(leave the remaining fields blank)
Extra     : {"host": "http://10.**.74.170:30071", "aws_access_key_id": "access_key", "aws_secret_access_key": "secret_key"}
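The equivalent connection can also be created from the CLI (a sketch; Airflow 2.x expects the lowercase conn-type s3):

$ kubectl exec -n airflow deploy/airflow-webserver -c webserver -- \
    airflow connections add MyS3Conn --conn-type s3 \
    --conn-extra '{"host": "http://10.**.74.170:30071", "aws_access_key_id": "access_key", "aws_secret_access_key": "secret_key"}'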

If the bucket is not created first, logs will not be saved.
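A minimal sketch of creating the bucket against the Ceph RGW endpoint (assumes the aws CLI is configured with the access/secret keys above):

$ aws --endpoint-url http://10.**.74.170:30071 s3 mb s3://hgkim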

Notes

1. PVC not binding

The cause was that the StorageClass had not been created; once it existed, the claim bound:

$ k get pvc -n airflow
NAME            STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS      AGE
log-airflow-0   Bound    pvc-8e8e1356-615b-4e39-90bf-aeea7d3089af   50Gi       RWO            rook-ceph-block   1
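If a claim stays Pending, checking for the StorageClass first saves time (a sketch):

$ kubectl get storageclass
$ kubectl describe pvc log-airflow-0 -n airflow   # the Events section shows why binding failed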
