
MLflow) Testing with Python and Airflow

MightyTedKim 2022. 6. 26. 21:00
I installed MLflow and ran a hello-world example.
From a modeler's point of view you can track model history the way git tracks code, and from an engineer's point of view you can switch the serving API back to a specific version (see the sketch below), which is nice.
The advantages I found: it's easy and it looks good.
- The UI is clean.
- You can compare models against each other.
- Model artifacts can be stored in S3 (pickle, source, parameters, etc.)
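As a minimal sketch of that rollback idea (assuming the tracking server and registered model name that appear later in this post, and version 1 purely as an example):

import os
import mlflow

# same tracking server and artifact store as in the configs below
mlflow.set_tracking_uri("http://10.***.35.32:30013")
os.environ["MLFLOW_S3_ENDPOINT_URL"] = "http://10.***.35.32:30071"
# (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY must also be set, as in main.py)

# "models:/<name>/<version>" pins the serving code to an exact registered
# version; changing the version number (or pointing at a stage) is the rollback
model = mlflow.pyfunc.load_model("models:/sample-model-wine/1")
# preds = model.predict(X_val)  # same input schema the model was trained on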

Summary

1. Install MLflow on k8s

2. Test with JupyterHub

3. Test with Airflow

4. Conclusion

Details

1. Install MLflow on k8s

mlflow-pgsql.yaml
mlflow-pvc.yaml
mlflow.yaml

https://mightytedkim.tistory.com/55

 

2. Test with JupyterHub

Below are the requirements:

- Since I don't really know machine learning, I took a scikit-learn example that uses the well-known wine dataset.

$ cat requirements.txt
toml
pyarrow
s3fs==2021.11.1
boto3
mlflow
scikit-learn

Below are the config values:

- MinIO or Ceph object storage can be used as the artifact store.

$ cat conf/configs.toml
[app.mlflow]

object_storage_endpoint = 'http://10.***.35.32:30071'
object_storage_bucket   = "***" 
object_storage_key      = "model/1.raw_data/train.csv" 
mlflow_tracking_url     = "http://10.***.35.32:30013" 

#mlflow
alpha = 0.1
l1_ratio = 0.1
artifact_path = '***-model-wine'
registered_model_name = '***-model-wine'

Below is the code I ran:

- I slightly reworked code found online so it's easier for me to follow.

 

$ cat main.py
# Importing necessary libraries
import os
import numpy as np
import pandas as pd

import logging.config
import toml

from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.linear_model import ElasticNet

import mlflow
import mlflow.sklearn

import io
import pyarrow.parquet as pq
import s3fs
import argparse

configs = toml.load("./conf/configs.toml")
app_configs = configs["app"]
print(app_configs)

class MlflowOperations:
    def __init__(self, conf):
        print("==init==")
        self.conf = conf
        self.alpha = self.conf["alpha"]
        self.l1_ratio = self.conf["l1_ratio"]

        #for logging the model to MLflow
        os.environ['MLFLOW_S3_ENDPOINT_URL'] = self.conf["object_storage_endpoint"]
        os.environ['AWS_ACCESS_KEY_ID'] = 'access-key'
        os.environ['AWS_SECRET_ACCESS_KEY'] = 'secret-key'

    def open_mlflow_session(self):
        print("==open_mlflow_session==")
        mlflow.set_tracking_uri(self.conf["mlflow_tracking_url"])
        mlflow.set_experiment("***-wine-test")
        return mlflow

    def load(self):
        print("==load==")
        object_storage_fs = s3fs.S3FileSystem(
            anon=False,
            use_ssl=False,
            client_kwargs={
                "region_name": "",
                "endpoint_url": self.conf['object_storage_endpoint'], #'http://10.233.21.208:80',
                "aws_access_key_id": 'access-key',
                "aws_secret_access_key": 'secret-key',
                "verify": False,
            }
        )
        return pd.read_csv(object_storage_fs.open('{}/{}'.format( self.conf['object_storage_bucket'], self.conf['object_storage_key']),mode='rb'))

    def split_data(self):
        print("==split_data==")
        # Loading data from a CSV file
        df_wine = self.load()
        print(df_wine)

        # Separating the target class ('quality') from the rest of the training data
        X = df_wine.drop(columns = ['quality','kind'])
        y = df_wine[['quality']]
        # Splitting the data into training and validation sets
        X_train, self.X_val, y_train, self.y_val = train_test_split(X, y, random_state = 30)
        # cast to float32, then replace NaN/inf values so ElasticNet can fit
        self.X_train = np.nan_to_num(np.float32(X_train), nan=-9999, posinf=33333333, neginf=33333333)
        self.y_train = np.nan_to_num(np.float32(y_train), nan=-9999, posinf=33333333, neginf=33333333)

    def train_data(self):
        self.open_mlflow_session()  # sets the tracking URI and experiment
        # Running MLFlow script
        with mlflow.start_run():
            # Instantiating model with model parameters
            model = ElasticNet(alpha = self.alpha, l1_ratio = self.l1_ratio)
            # Fitting training data to the model
            model.fit(self.X_train, self.y_train)
            # Running prediction on the validation dataset
            preds = model.predict(self.X_val)
            # Getting metrics on the validation dataset (y_true first, then predictions)
            rmse = mean_squared_error(self.y_val, preds, squared=False)
            abs_error = mean_absolute_error(self.y_val, preds)
            r2 = r2_score(self.y_val, preds)

            # Logging params and metrics to MLflow
            mlflow.log_param('alpha', self.alpha)
            mlflow.log_param('l1_ratio', self.l1_ratio)
            mlflow.log_metric('rmse', rmse)
            mlflow.log_metric('abs_error', abs_error)
            mlflow.log_metric('r2', r2)
            # Logging training data
            #mlflow.log_artifact(local_path = './train.csv')
            # Logging training code
            #mlflow.log_artifact(local_path = './main.py')

            # Logging the model to MLflow
            mlflow.sklearn.log_model(sk_model = model,
                                     artifact_path = self.conf["artifact_path"],
                                     registered_model_name = self.conf["registered_model_name"] )

def main(conf):
    print("==main==")
    # argparse
    parser = argparse.ArgumentParser(description='Argparse')
    parser.add_argument('--alpha',    type=float,   default=0.1)
    parser.add_argument('--l1ratio',  type=float,   default=0.1)
    args    = parser.parse_args()

    # argument check
    if args.alpha != 0.1:
        conf["mlflow"]["alpha"] = args.alpha
    if args.l1ratio != 0.1:
        conf["mlflow"]["l1_ratio"] = args.l1ratio

    print("------")
    print("alpha: " + str(conf["mlflow"]["alpha"]))
    print("l1ratio: " + str(conf["mlflow"]["l1_ratio"]))
    print("------")

    session = MlflowOperations(conf["mlflow"])
    session.split_data()
    session.train_data()

if __name__ == "__main__":
    main(configs["app"])

I ran it from the terminal.

https://github.com/mlflow/mlflow-example/blob/master/train.py
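For reference, a run from the terminal looks like this; the flags are the ones main.py defines via argparse, and the values here are just examples:

$ python main.py --alpha 0.5 --l1ratio 0.5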

 

3. Test with Airflow

I applied it to Airflow, which I've been very interested in lately.

 

Below is the DAG; it runs the image with a KubernetesPodOperator.

- One notable point: it accepts arguments, so the DAG can be triggered externally through the REST API (see the trigger sketch after the DAG).

from airflow import DAG
from datetime import datetime
#operator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

dag = DAG(
    'sample-manual-model-wine',
    start_date=datetime(2022, 6, 25),
    schedule_interval=None,
    catchup=False,
    default_args={ 'owner': 'hgkim' },
    tags=['sample','manual','model','wine']
)
##########################
start_task 	= DummyOperator(task_id='start_task', dag=dag)
end_task 	= DummyOperator(task_id='end_task', dag=dag)
train = KubernetesPodOperator(
    task_id='train',
    name='train',
    namespace='mlflow',
    image='hgkim.harbor/model/sample-train-wine:latest',
    image_pull_policy='Always',
    cmds=['python','/app/mlflow/main.py'],
    arguments=[ 
        '--alpha',   '{{ dag_run.conf["alpha"]   if dag_run.conf else 0.1 }}',
        '--l1ratio', '{{ dag_run.conf["l1ratio"] if dag_run.conf else 0.1 }}'
    ],
    in_cluster=True,
    config_file=None,
    is_delete_operator_pod=True,
    get_logs=True, 
    dag=dag
)

start_task >> train >> end_task
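As a sketch of that external trigger, assuming Airflow 2.x with the stable REST API and basic auth enabled (the host, port, and credentials below are placeholders):

import requests

# POST /api/v1/dags/{dag_id}/dagRuns creates a run; "conf" is exactly what
# the templated arguments above read via dag_run.conf
resp = requests.post(
    "http://airflow-host:8080/api/v1/dags/sample-manual-model-wine/dagRuns",
    auth=("airflow-user", "airflow-password"),  # placeholder credentials
    json={"conf": {"alpha": 0.5, "l1ratio": 0.5}},
)
resp.raise_for_status()
print(resp.json()["dag_run_id"])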

Below is the Dockerfile used for the build.

- Hooking it up to CI/CD makes this painless.

FROM harbor-hgkim/library/python:3.8.7-slim

#docker proxy
COPY ./requirements.txt /tmp
RUN https_proxy=http://168.219.61.252:8080 http_proxy=http://168.219.61.252:8080 pip --trusted-host pypi.org --trusted-host files.pythonhosted.org install --no-cache-dir -r /tmp/requirements.txt

WORKDIR /app/mlflow
COPY python_source/main.py .
COPY python_source/conf/configs.toml conf/
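Building and pushing by hand looks roughly like this; the tag matches the image the DAG pulls:

$ docker build -t hgkim.harbor/model/sample-train-wine:latest .
$ docker push hgkim.harbor/model/sample-train-wine:latest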

Below is configs.toml.

- The config differs slightly from the JupyterHub version: the object-storage endpoint and bucket moved into their own [s3] section, so main.py below merges the two sections before handing them to MlflowOperations.

[s3]
object_storage_endpoint = 'http://10.***.104.240:80' 
object_storage_bucket   = "sample"

[mlflow]
object_storage_key      = "model/1.raw_data/train.csv"
mlflow_tracking_url     = "http://10.*.74.170:30013"

alpha = 0.33 #default
l1_ratio = 0.99 #default
artifact_path = 'sample-model-wine'
registered_model_name = 'sample-model-wine'

Below is main.py.

# Importing necessary libraries
import os
import numpy as np
import pandas as pd

import logging.config
import toml

from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.linear_model import ElasticNet

import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient

import io
import pyarrow.parquet as pq
import s3fs
import argparse

class MlflowOperations:
    def __init__(self, conf):
        logger.info("==init==")
        self.conf = conf
        self.alpha = self.conf["alpha"]
        self.l1_ratio = self.conf["l1_ratio"]
            
        #for Logging model to MLFlow
        os.environ['MLFLOW_S3_ENDPOINT_URL'] = self.conf["object_storage_endpoint"]
        os.environ['AWS_ACCESS_KEY_ID'] = '***-key'
        os.environ['AWS_SECRET_ACCESS_KEY'] = '***-key'

    def open_mlflow_session(self):
        logger.info("==open_mlflow_session==")
        mlflow.set_tracking_uri(self.conf["mlflow_tracking_url"])
        mlflow.set_experiment("gtc-wine-test")
        return mlflow

    def load(self):
        logger.info("==load==")
        object_storage_fs = s3fs.S3FileSystem(
            anon=False,
            use_ssl=False,
            client_kwargs={
                "region_name": "",
                "endpoint_url": self.conf['object_storage_endpoint'],
                "aws_access_key_id": 'access-key',
                "aws_secret_access_key": 'secret-key',
                "verify": False,
            }
        )
        return pd.read_csv(object_storage_fs.open('{}/{}'.format( self.conf['object_storage_bucket'], self.conf['object_storage_key']),mode='rb'))
    
    def split_data(self):
        logger.info("==split_data==")
        # Loading data from a CSV file
        df_wine = self.load()
        #print(df_wine)

        # Separating the target class ('quality') from the rest of the training data
        X = df_wine.drop(columns = ['quality','kind'])
        y = df_wine[['quality']]
        # Splitting the data into training and validation sets
        X_train, self.X_val, y_train, self.y_val = train_test_split(X, y, random_state = 30)
        # cast to float32, then replace NaN/inf values so ElasticNet can fit
        self.X_train = np.nan_to_num(np.float32(X_train), nan=-9999, posinf=33333333, neginf=33333333)
        self.y_train = np.nan_to_num(np.float32(y_train), nan=-9999, posinf=33333333, neginf=33333333)

    def print_auto_logged_info(self, r):
        tags = {k: v for k, v in r.data.tags.items() if not k.startswith("mlflow.")}
        artifacts = [f.path for f in MlflowClient().list_artifacts(r.info.run_id, "model")]
        logger.info("run_id: {}".format(r.info.run_id))
        logger.info("artifacts: {}".format(artifacts))
        logger.info("params: {}".format(r.data.params))
        logger.info("metrics: {}".format(r.data.metrics))
        logger.info("tags: {}".format(tags))
    
    def train_data(self):
        self.open_mlflow_session()  # sets the tracking URI and experiment
        # Running MLFlow script
        with mlflow.start_run() as run:
            # Instantiating model with model parameters
            model = ElasticNet(alpha = self.alpha, l1_ratio = self.l1_ratio)
            # Fitting training data to the model
            model.fit(self.X_train, self.y_train)
            # Running prediction on validation dataset
            preds = model.predict(self.X_val)
            # Getting metrics on the validation dataset (y_true first, then predictions)
            rmse = mean_squared_error(self.y_val, preds, squared=False)
            abs_error = mean_absolute_error(self.y_val, preds)
            r2 = r2_score(self.y_val, preds)

            # Logging params and metrics to MLFlow
            mlflow.log_param('alpha', self.alpha)
            mlflow.log_param('l1_ratio', self.l1_ratio)
            mlflow.log_metric('rmse', rmse)
            mlflow.log_metric('abs_error', abs_error)
            mlflow.log_metric('r2', r2)
            # Logging training data
            #mlflow.log_artifact(local_path = './train.csv')
            # Logging training code
            #mlflow.log_artifact(local_path = './main.py')

            # Logging model to MLFlow
            try:
                mlflow.sklearn.log_model(sk_model = model, artifact_path = self.conf["artifact_path"], registered_model_name = self.conf["registered_model_name"] )
            except Exception as e:
                err_msg = str(e)
                logger.info(f"Error = {err_msg} ")
                raise
            finally:
                # fetch the auto logged parameters and metrics for ended run
                self.print_auto_logged_info(mlflow.get_run(run_id=run.info.run_id))

def main(conf):
    print("==main==")
    # argparse
    parser = argparse.ArgumentParser(description='Argparse')
    parser.add_argument('--alpha',    type=float)
    parser.add_argument('--l1ratio',  type=float)
    args    = parser.parse_args()
    print(args)
    # argument check
    if args.alpha is not None:
        conf["mlflow"]["alpha"] = args.alpha
    if args.l1ratio is not None:
        conf["mlflow"]["l1_ratio"] = args.l1ratio

    print("------")
    print("alpha: " + str(conf["mlflow"]["alpha"]))
    print("l1ratio: " + str(conf["mlflow"]["l1_ratio"]))
    print("------")

    # merge the [s3] keys into [mlflow] so MlflowOperations sees one flat config
    conf["mlflow"].update(conf["s3"])
    session = MlflowOperations(conf["mlflow"])
    session.split_data()
    session.train_data()
    
if __name__ == "__main__":
    configs = toml.load("./conf/configs.toml")
    logging.basicConfig(level=logging.INFO)  # so logger.info actually shows in the pod logs
    logger = logging.getLogger("default")
    main(configs)

 

4. Conclusion

 

As I understand it, MLflow is an open-source tool that data analysts use to keep a history of their models.

I haven't used it in earnest yet, but it looks like it would make deployments efficient, since the serving API can be rolled back to a specific model version.

 
