Kubernetes) Airflow Installation via Helm
Version: airflow-1.2.0.tgz
Link: https://airflow.apache.org/docs/helm-chart/stable/index.html
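The chart archive above can be pulled from the official Helm repository beforehand; a minimal sketch, assuming the workstation can reach the repository:
$ helm repo add apache-airflow https://airflow.apache.org
$ helm repo update
$ helm pull apache-airflow/airflow --version 1.2.0   # downloads airflow-1.2.0.tgz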
Result
$ k get all -n airflow
NAME READY STATUS RESTARTS AGE
pod/airflow-postgresql-0 1/1 Running 0 28d
pod/airflow-scheduler-88588ff5c-9cr2x 3/3 Running 0 6d17h
pod/airflow-statsd-5df44cb959-h64bn 1/1 Running 0 28d
pod/airflow-webserver-ffc95467d-s6gx9 1/1 Running 0 17d
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/airflow-postgresql ClusterIP 10.107.117.120 <none> 5432/TCP 28d
service/airflow-postgresql-headless ClusterIP None <none> 5432/TCP 28d
service/airflow-postgresql-nodeport NodePort 10.103.116.105 <none> 30016:30016/TCP 56d
service/airflow-statsd ClusterIP 10.104.226.69 <none> 9125/UDP,9102/TCP 28d
service/airflow-webserver ClusterIP 10.110.31.162 <none> 8080/TCP 28d
service/airflow-webserver-svc-np NodePort 10.107.203.140 <none> 8080:30012/TCP 59d
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/airflow-scheduler 1/1 1 1 28d
deployment.apps/airflow-statsd 1/1 1 1 28d
deployment.apps/airflow-webserver 1/1 1 1 28d
NAME DESIRED CURRENT READY AGE
replicaset.apps/airflow-scheduler-88588ff5c 1 1 1 28d
replicaset.apps/airflow-statsd-5df44cb959 1 1 1 28d
replicaset.apps/airflow-webserver-ffc95467d 1 1 1 28d
NAME READY AGE
statefulset.apps/airflow-postgresql 1/1 28d
$ k get pvc -n airflow
NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
data-airflow-postgresql-0 Bound pvc-d8d79772-5bfa-4e83-b826-320df9dacb5f 8Gi RWO rook-ceph-block 28d
$ k get secret -n airflow
NAME TYPE DATA AGE
airflow-airflow-metadata Opaque 1 28d
airflow-broker-url Opaque 1 28d
airflow-create-user-job-token-tz2g7 kubernetes.io/service-account-token 3 28d
airflow-fernet-key Opaque 1 28d
airflow-migrate-database-job-token-hbd56 kubernetes.io/service-account-token 3 28d
airflow-postgresql Opaque 1 28d
airflow-redis-password Opaque 1 28d
airflow-scheduler-token-n59mm kubernetes.io/service-account-token 3 28d
airflow-ssh-git-secret Opaque 1 59d
airflow-statsd-token-7pnrc kubernetes.io/service-account-token 3 28d
airflow-webserver-secret-key Opaque 1 28d
airflow-webserver-token-2pnzj kubernetes.io/service-account-token 3 28d
airflow-worker-token-xxmtf kubernetes.io/service-account-token 3 28d
default-token-xzzjw kubernetes.io/service-account-token 3 59d
sh.helm.release.v1.airflow.v1 helm.sh/release.v1 1 28d
Installation
1. secret
Git connection
$ kubectl create secret generic airflow-ssh-git-secret --from-file=gitSshKey=./id_rsa -n airflow
$ k get secret airflow-ssh-git-secret -n airflow-test
NAME TYPE DATA AGE
airflow-ssh-git-secret Opaque 1 8d
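The id_rsa file used above is the private half of a deploy key registered on the GitLab project that git-sync will clone; a minimal sketch of generating it (file name and comment are illustrative):
$ ssh-keygen -t rsa -b 4096 -N "" -C "airflow-git-sync" -f ./id_rsa
# register ./id_rsa.pub as a read-only deploy key in the GitLab project,
# then create the secret from ./id_rsa as shown above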
2. pvc
airflowlog-pv-claim.yaml
airflowpgsql-pv-claim.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: log-airflow-0
  namespace: airflow #-webinar
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 50Gi
  storageClassName: rook-ceph-block
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: data-airflow-postgresql-0
  namespace: airflow
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
  storageClassName: rook-ceph-block
3. svc
airflow-webserver-svc-np.yaml
apiVersion: v1
kind: Service
metadata:
  name: airflow-webserver-svc-np
  namespace: airflow
spec:
  type: NodePort
  selector:
    component: webserver
    release: airflow
    tier: airflow
  ports:
    # By default and for convenience, the `targetPort` is set to the same value as the `port` field.
    - port: 8080
      targetPort: 8080
      # Optional field
      # By default and for convenience, the Kubernetes control plane will allocate a port from a range (default: 30000-32767)
      nodePort: 30012
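Assuming the manifests are saved under the file names listed in steps 2 and 3, they can be applied before installing the chart:
$ kubectl apply -f airflowlog-pv-claim.yaml -f airflowpgsql-pv-claim.yaml -f airflow-webserver-svc-np.yaml
$ kubectl get pvc,svc -n airflow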
4. ./values.yaml
Change the GitLab address, change the image paths, and add Prometheus metric mappings.
# Default airflow repository -- overrides all the specific images below
#defaultAirflowRepository: apache/airflow
defaultAirflowRepository: hgkim/library/apache/airflow-custom #hgkim
# Default airflow tag to deploy
#defaultAirflowTag: "2.1.4-20211216-2e54a0d7"
defaultAirflowTag: "latest"
# Airflow version (Used to make some decisions based on Airflow Version being deployed)
airflowVersion: "2.1.4"
# Images
images:
  useDefaultImageForMigration: true #false, hgkim
  pod_template:
    pullPolicy: Always #IfNotPresent,hgkim
  flower:
    pullPolicy: Always #IfNotPresent,hgkim
  statsd:
    repository: hgkim/library/apache/airflow
    tag: airflow-statsd-exporter-2021.04.28-v0.17.0-20211208
    pullPolicy: IfNotPresent
  gitSync:
    repository: hgkim/library/k8s.gcr.io/git-sync/git-sync
    tag: v3.3.0-20211208
# Airflow executor
# Options: LocalExecutor, CeleryExecutor, KubernetesExecutor, CeleryKubernetesExecutor
#executor: "CeleryExecutor"
executor: "KubernetesExecutor" #hgkim
# hgkim_start
env:
  - name: "AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY"
    value: "hgkim/library/apache/airflow-custom"
  - name: "AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG"
    value: "latest"
    #value: "2.1.4-20211216-2e54a0d7"
  - name: "AIRFLOW__KUBERNETES__RUN_AS_USER"
    value: "50000"
  - name: "AIRFLOW__KUBERNETES__DAGS_IN_IMAGE"
    value: "True"
  - name: "AIRFLOW__CORE__REMOTE_LOGGING"
    value: "True"
  - name: "AIRFLOW__CORE__REMOTE_LOG_CONN_ID"
    value: "CephObjectConn"
  - name: "AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER"
    value: "s3://hgkim/airflow/logs"
  - name: "AIRFLOW__CORE__DEFAULT_TIMEZONE"
    value: "Asia/Seoul"
# hgkim_end
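Each AIRFLOW__<SECTION>__<KEY> environment variable above overrides the matching airflow.cfg entry; the block above is roughly equivalent to the following config (sketch for reference only):
[kubernetes]
worker_container_repository = hgkim/library/apache/airflow-custom
worker_container_tag = latest
run_as_user = 50000
dags_in_image = True
[core]
remote_logging = True
remote_log_conn_id = CephObjectConn
remote_base_log_folder = s3://hgkim/airflow/logs
default_timezone = Asia/Seoul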
persistence:
  # Enable persistent volumes
  enabled: false #true hgkim
  # Volume size for worker StatefulSet
  size: 10Gi # 100Gi, hgkim
  # If using a custom storageClass, pass name ref to all statefulSets here
  storageClassName:
  # Execute init container to chown log directory.
  # This is currently only needed in kind, due to usage
  # of local-path provisioner.
  fixPermissions: false
# Additional mappings for statsd exporter.
#extraMappings: []
extraMappings:
  # === Counters ===
  - match: "(.+)\\.(.+)_start$"
    match_metric_type: counter
    name: "af_agg_job_start"
    match_type: regex
    labels:
      airflow_id: "$1"
      job_name: "$2"
  - match: "(.+)\\.(.+)_end$"
    match_metric_type: counter
    name: "af_agg_job_end"
    match_type: regex
    labels:
      airflow_id: "$1"
      job_name: "$2"
  - match: "(.+)\\.operator_failures_(.+)$"
    match_metric_type: counter
    name: "af_agg_operator_failures"
    match_type: regex
    labels:
      airflow_id: "$1"
      operator_name: "$2"
  - match: "(.+)\\.operator_successes_(.+)$"
    match_metric_type: counter
    name: "af_agg_operator_successes"
    match_type: regex
    labels:
      airflow_id: "$1"
      operator_name: "$2"
  - match: "*.ti_failures"
    match_metric_type: counter
    name: "af_agg_ti_failures"
    labels:
      airflow_id: "$1"
  - match: "*.ti_successes"
    match_metric_type: counter
    name: "af_agg_ti_successes"
    labels:
      airflow_id: "$1"
  - match: "*.zombies_killed"
    match_metric_type: counter
    name: "af_agg_zombies_killed"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler_heartbeat"
    match_metric_type: counter
    name: "af_agg_scheduler_heartbeat"
    labels:
      airflow_id: "$1"
      instance: "$2"
  - match: "*.dag_processing.processes"
    match_metric_type: counter
    name: "af_agg_dag_processing_processes"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.tasks.killed_externally"
    match_metric_type: counter
    name: "af_agg_scheduler_tasks_killed_externally"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.tasks.running"
    match_metric_type: counter
    name: "af_agg_scheduler_tasks_running"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.tasks.starving"
    match_metric_type: counter
    name: "af_agg_scheduler_tasks_starving"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.orphaned_tasks.cleared"
    match_metric_type: counter
    name: "af_agg_scheduler_orphaned_tasks_cleared"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.orphaned_tasks.adopted"
    match_metric_type: counter
    name: "af_agg_scheduler_orphaned_tasks_adopted"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.critical_section_busy"
    match_metric_type: counter
    name: "af_agg_scheduler_critical_section_busy"
    labels:
      airflow_id: "$1"
  - match: "*.sla_email_notification_failure"
    match_metric_type: counter
    name: "af_agg_sla_email_notification_failure"
    labels:
      airflow_id: "$1"
  - match: "*.ti.start.*.*"
    match_metric_type: counter
    name: "af_agg_ti_start"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
      task_id: "$3"
  - match: "*.ti.finish.*.*.*"
    match_metric_type: counter
    name: "af_agg_ti_finish"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
      task_id: "$3"
      state: "$4"
  - match: "*.dag.callback_exceptions"
    match_metric_type: counter
    name: "af_agg_dag_callback_exceptions"
    labels:
      airflow_id: "$1"
  - match: "*.celery.task_timeout_error"
    match_metric_type: counter
    name: "af_agg_celery_task_timeout_error"
    labels:
      airflow_id: "$1"
  # === Gauges ===
  - match: "*.dagbag_size"
    match_metric_type: gauge
    name: "af_agg_dagbag_size"
    labels:
      airflow_id: "$1"
  - match: "*.dag_processing.import_errors"
    match_metric_type: gauge
    name: "af_agg_dag_processing_import_errors"
    labels:
      airflow_id: "$1"
  - match: "*.dag_processing.total_parse_time"
    match_metric_type: gauge
    name: "af_agg_dag_processing_total_parse_time"
    labels:
      airflow_id: "$1"
  - match: "*.dag_processing.last_runtime.*"
    match_metric_type: gauge
    name: "af_agg_dag_processing_last_runtime"
    labels:
      airflow_id: "$1"
      dag_file: "$2"
  - match: "*.dag_processing.last_run.seconds_ago.*"
    match_metric_type: gauge
    name: "af_agg_dag_processing_last_run_seconds"
    labels:
      airflow_id: "$1"
      dag_file: "$2"
  - match: "*.dag_processing.processor_timeouts"
    match_metric_type: gauge
    name: "af_agg_dag_processing_processor_timeouts"
    labels:
      airflow_id: "$1"
  - match: "*.executor.open_slots"
    match_metric_type: gauge
    name: "af_agg_executor_open_slots"
    labels:
      airflow_id: "$1"
  - match: "*.executor.queued_tasks"
    match_metric_type: gauge
    name: "af_agg_executor_queued_tasks"
    labels:
      airflow_id: "$1"
  - match: "*.executor.running_tasks"
    match_metric_type: gauge
    name: "af_agg_executor_running_tasks"
    labels:
      airflow_id: "$1"
  - match: "*.pool.open_slots.*"
    match_metric_type: gauge
    name: "af_agg_pool_open_slots"
    labels:
      airflow_id: "$1"
      pool_name: "$2"
  - match: "*.pool.queued_slots.*"
    match_metric_type: gauge
    name: "af_agg_pool_queued_slots"
    labels:
      airflow_id: "$1"
      pool_name: "$2"
  - match: "*.pool.running_slots.*"
    match_metric_type: gauge
    name: "af_agg_pool_running_slots"
    labels:
      airflow_id: "$1"
      pool_name: "$2"
  - match: "*.pool.starving_tasks.*"
    match_metric_type: gauge
    name: "af_agg_pool_starving_tasks"
    labels:
      airflow_id: "$1"
      pool_name: "$2"
  - match: "*.smart_sensor_operator.poked_tasks"
    match_metric_type: gauge
    name: "af_agg_smart_sensor_operator_poked_tasks"
    labels:
      airflow_id: "$1"
  - match: "*.smart_sensor_operator.poked_success"
    match_metric_type: gauge
    name: "af_agg_smart_sensor_operator_poked_success"
    labels:
      airflow_id: "$1"
  - match: "*.smart_sensor_operator.poked_exception"
    match_metric_type: gauge
    name: "af_agg_smart_sensor_operator_poked_exception"
    labels:
      airflow_id: "$1"
  - match: "*.smart_sensor_operator.exception_failures"
    match_metric_type: gauge
    name: "af_agg_smart_sensor_operator_exception_failures"
    labels:
      airflow_id: "$1"
  - match: "*.smart_sensor_operator.infra_failures"
    match_metric_type: gauge
    name: "af_agg_smart_sensor_operator_infra_failures"
    labels:
      airflow_id: "$1"
  # === Timers ===
  - match: "*.dagrun.dependency-check.*"
    match_metric_type: observer
    name: "af_agg_dagrun_dependency_check"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
  - match: "*.dag.*.*.duration"
    match_metric_type: observer
    name: "af_agg_dag_task_duration"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
      task_id: "$3"
  - match: "*.dag_processing.last_duration.*"
    match_metric_type: observer
    name: "af_agg_dag_processing_duration"
    labels:
      airflow_id: "$1"
      dag_file: "$2"
  - match: "*.dagrun.duration.success.*"
    match_metric_type: observer
    name: "af_agg_dagrun_duration_success"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
  - match: "*.dagrun.duration.failed.*"
    match_metric_type: observer
    name: "af_agg_dagrun_duration_failed"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
  - match: "*.dagrun.schedule_delay.*"
    match_metric_type: observer
    name: "af_agg_dagrun_schedule_delay"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
  - match: "*.scheduler.critical_section_duration"
    match_metric_type: observer
    name: "af_agg_scheduler_critical_section_duration"
    labels:
      airflow_id: "$1"
  - match: "*.dagrun.*.first_task_scheduling_delay"
    match_metric_type: observer
    name: "af_agg_dagrun_first_task_scheduling_delay"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
# Git sync
dags:
  persistence:
    # Enable persistent volume for storing dags
    enabled: false
    # Volume size for dags
    size: 1Gi
    # If using a custom storageClass, pass name here
    storageClassName:
    # access mode of the persistent volume
    accessMode: ReadWriteOnce
    ## the name of an existing PVC to use
    existingClaim:
  gitSync:
    #enabled: false
    enabled: true #hgkim
    repo: git@10.***.29.37:hakgeon.kim/sfg-cmm-airflowdags-mb.git
    branch: main #master #hgkim
    rev: HEAD
    depth: 1
    # the number of consecutive failures allowed before aborting
    maxFailures: 0
    # subpath within the repo where dags are located
    # should be "" if dags are at repo root
    subPath: dags/ #"tests/dags"
    # if your repo needs a user name password
    sshKeySecret: airflow-ssh-git-secret #hgkim
    wait: 10 #60 hgkim
    containerName: git-sync
    uid: 65533
    extraVolumeMounts: []
    env: []
    resources: #{}, hgkim
      limits:
        cpu: 200m
        memory: 256Mi
      requests:
        cpu: 100m
        memory: 128Mi
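The extraMappings above only rename and relabel the StatsD metrics; Prometheus still has to scrape the airflow-statsd service on port 9102 (see the service listing at the top). A minimal scrape job sketch for prometheus.yml, with the job name assumed:
scrape_configs:
  - job_name: airflow-statsd
    metrics_path: /metrics
    static_configs:
      - targets: ["airflow-statsd.airflow.svc.cluster.local:9102"]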
5. run
$ helm upgrade --install airflow hgkim/airflow -n airflow -f ./airflow/values.yaml --debug
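After the upgrade/install, the release and pods can be checked; in this setup the git-sync sidecar runs alongside the scheduler, using the containerName set in values.yaml (a sketch):
$ helm status airflow -n airflow
$ kubectl get pods -n airflow -w
$ kubectl logs deploy/airflow-scheduler -c git-sync -n airflow   # verify the DAG repo is being cloned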
Configuration (Web)
- login
10.***.**.**:30012
- variables
Admin > Variables > `+`
Key Val
ceph_endpoint http://10.**.74.170:30071
ceph_access_key access-key
ceph_secret_key ********
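The variables can also be set from the CLI instead of the UI, e.g. by exec-ing into the webserver (a sketch; values as above):
$ kubectl exec -n airflow deploy/airflow-webserver -- airflow variables set ceph_endpoint http://10.**.74.170:30071
$ kubectl exec -n airflow deploy/airflow-webserver -- airflow variables set ceph_access_key access-key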
- connection
Conn Id : MyS3Conn
Conn Type : S3
(leave the other fields blank)
Extra : {"host": "http://10.**.74.170:30071", "aws_access_key_id": "access_key", "aws_secret_access_key": "secret_key"}
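The same connection can be created from the CLI (a sketch; the JSON mirrors the Extra field above):
$ kubectl exec -n airflow deploy/airflow-webserver -- airflow connections add MyS3Conn \
    --conn-type s3 \
    --conn-extra '{"host": "http://10.**.74.170:30071", "aws_access_key_id": "access_key", "aws_secret_access_key": "secret_key"}'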
If the bucket is not created, logs will not be saved.
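A minimal sketch of creating the bucket against the Ceph RGW endpoint (assuming the aws CLI is available and the bucket name follows the remote log folder s3://hgkim/airflow/logs):
$ export AWS_ACCESS_KEY_ID=access-key
$ export AWS_SECRET_ACCESS_KEY=<secret-key>
$ aws --endpoint-url http://10.**.74.170:30071 s3 mb s3://hgkim
$ aws --endpoint-url http://10.**.74.170:30071 s3 ls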
Notes
1. PVC did not bind
It failed because the rook-ceph-block StorageClass had not been created (see the sketch below).
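A sketch of the missing StorageClass, based on the standard Rook-Ceph RBD example (the pool name and secret locations depend on the Rook cluster and are assumptions here):
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: rook-ceph-block
provisioner: rook-ceph.rbd.csi.ceph.com
parameters:
  clusterID: rook-ceph
  pool: replicapool
  imageFormat: "2"
  imageFeatures: layering
  csi.storage.k8s.io/provisioner-secret-name: rook-csi-rbd-provisioner
  csi.storage.k8s.io/provisioner-secret-namespace: rook-ceph
  csi.storage.k8s.io/controller-expand-secret-name: rook-csi-rbd-provisioner
  csi.storage.k8s.io/controller-expand-secret-namespace: rook-ceph
  csi.storage.k8s.io/node-stage-secret-name: rook-csi-rbd-node
  csi.storage.k8s.io/node-stage-secret-namespace: rook-ceph
  csi.storage.k8s.io/fstype: ext4
reclaimPolicy: Delete
allowVolumeExpansion: true
After creating the StorageClass, the PVC bound: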
$ k get pvc -n airflow
NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
log-airflow-0 Bound pvc-8e8e1356-615b-4e39-90bf-aeea7d3089af 50Gi RWO rook-ceph-block 1