'데이터 엔지니어'로 성장하기

정리하는 걸 좋아하고, 남이 읽으면 더 좋아함

Data/Airflow

Airflow) log를 minio에 저장_connection,yaml

MightyTedKim 2021. 11. 12. 08:44
728x90
반응형

하고 나면 너무 간단한데생각보다 인터넷에 관련된 정보가 명확히 나와잇지 않음

똑같은데 그럼 endpoint는 어디다가 입력하는데ㅜㅜ

성공하고 나서 문서를 보면 당연하지만, 작업 중에는 너무나 막막했던 내용

 


설정 방법

1. minio

  • 실제 url에서는 이런식으로 http://1**.**.**.**:9000/object-browser/airflow-logs
  1. bucket 만들기
    1. airflow-logs
      1. http://1**.**.**.**:9000

참고

pyspark 테스트하던 코드는 여기

https://mightytedkim.tistory.com/28 

from pyspark import SparkContext
from pyspark.sql import SparkSession
from time import sleep

spark = SparkSession.builder.getOrCreate()

def load_config(spark_context: SparkContext):
    spark_context._jsc.hadoopConfiguration().set('fs.s3a.access.key', "<your_access_key>")
    spark_context._jsc.hadoopConfiguration().set('fs.s3a.secret.key', "<your_secret_key>")
    spark_context._jsc.hadoopConfiguration().set('fs.s3a.path.style.access', 'true')
    spark_context._jsc.hadoopConfiguration().set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
    spark_context._jsc.hadoopConfiguration().set('fs.s3a.endpoint', 'http://1**.**.**.**:9000')
    spark_context._jsc.hadoopConfiguration().set('fs.s3a.connection.ssl.enabled', 'false')

load_config(spark.sparkContext)

dataframe = spark.read.json('s3a://test/*')
average = dataframe.agg({'amount': 'avg'})
average.show()

df=spark.read.parquet("s3a://orders/*")
df.show()

 

2. airflow : connection

  • id/pwd는 보안상 문제가 있지만 일단 여기에 입력, host명 endpoint를 여기에다가
  1.  apache-airflow[s3]
    1. pip install  apache-airflow[s3]
      airflow db init
  2. airflow webserver ui
    1. admin > connection
  3. 정보 입력
conn id : MyS3Conn
conn Type : S3
(나머지는 공란)
Extra : 
  1. {"host" : "http://1**.**.**.**:9000", "aws_access_key_id" : "<your_access_key>" , "aws_secret_access_key": "<your_secret_key>" }

 

3. airflow : airflow.cfg

굵은 글씨 3개만 수정하면 됨

조심해야할거는 remote_base_log_folder에 `"` 를 넣으면 안된다는거. 그러면 airflow 자체가 안돌아감

$ airflow config list
(...)
[logging]
base_log_folder = /home/test/airflow/logs
remote_logging = True
remote_log_conn_id = MyS3Conn
google_key_path =
remote_base_log_folder = s3://airflow-logs/test
encrypt_s3_logs = False
logging_level = INFO
fab_logging_level = WARN
logging_config_class =
colored_console_log = True
colored_log_format = [%(blue)s%(asctime)s%(reset)s] {%(blue)s%(filename)s:%(reset)s%(lineno)d} %(log_color)s%(levelname)s%(reset)s - %(log_color)s%(message)s%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format = [%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s
simple_log_format = %(asctime)s %(levelname)s - %(message)s
task_log_prefix_template =
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /home/test
/airflow/logs/dag_processor_manager/dag_processor_manager.log
task_log_reader = task
extra_loggers =
(...)

이후에 나는 airflow db init을 때려줌

 


특이사항

minio console(ui)에 들어가서 보면 로그가 안보임, 그런데 폴더는 생김

 

UI(minio console)에서 확인

 

http://1**.**.**.**:9090/object-browser/airflow-logs/test/DEV_01_pyspark_minio/main/2021-11-11T09:19:32.388080+00:00

나오는 메시지
There are no Rewind Objects yet.

(minio console 들어가서 readWrite policy 주고 이거거것 해봤는데 안됨)

아니 뭐지 왜 폴더는 잘 생기는데 로그는 안생기지..


command에서 확인 : mc(minioclient)

root@kube01:~/project/minio:]# mc ls -r minio3/airflow-logs/test/DEV_01_pyspark_minio/
[2021-11-11 18:28:34 KST] 6.6KiB main/2021-11-11T09:19:32.388080+00:00/1.log

아 그냥 minio 화면에서만 안나오는구나 하고 안심 ㅋㅋ


그리고 나머지

  1. default port (ui : 9000), (endpoint:9090)
  2. minio console이 UI임 헷갈림
  3. airflow 1.*, 2* 부터 로깅 표시하는곳이 다름
    1. 2.*부터는 [loggging] 부분에만 입력하면 됨

이상 너무나 간단했지만 막막했던 airflow logs to minio 예시

helm 설정도 성공하면 포스팅 예정

 

Helm

helm에는 이렇게 설정하면 됨 ㅎ

# vi values.yaml

env:
  - name: "AIRFLOW__CORE__REMOTE_LOGGING"
    value: True
  - name: "AIRFLOW__CORE__REMOTE_LOG_CONN_ID"
    value: "MyS3Conn"
  - name: "AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER"
    value:  "s3://airflow-logs/temp"

 

airflow webserver에 들어가서 보니, 성공적으로 log가 mount 된 것을 확인할 수 있었음

helm으로 로그 값을 세팅했을 때 scheduler의 로그는 어디서 봐야하는지 아직 모르겠음


saving logs to minio

 

 

 

728x90
반응형