Mlflow) Testing it out with Python and Airflow
I installed MLflow and ran a hello-world example.
From a modeler's point of view you get git-like history management for models, and from an engineer's point of view you can switch the API back to a specific point in time, which seems great (see the sketch right after the list below).
The advantages as I see them: it's easy, and it looks good.
- The UI is pretty.
- Models can be compared against each other.
- Model information can be stored in S3 (pickle, source, parameters, etc.).
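Switching an API to a specific point in time boils down to loading a specific registered model version. A minimal sketch, assuming the sample-model-wine name registered later in this post and that version 2 is the one to go back to (the version number is just an example):

import mlflow
import mlflow.sklearn

# the tracking server and artifact-store credentials need to be reachable,
# same as in the training code further down (MLFLOW_S3_ENDPOINT_URL, AWS keys)
mlflow.set_tracking_uri("http://10.***.35.32:30013")

# "models:/<name>/<version>" resolves through the model registry, so rolling the
# API back to an earlier state is just a matter of changing the version number
model = mlflow.sklearn.load_model("models:/sample-model-wine/2")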
Summary
1. Install mlflow on k8s
2. Test with jupyterhub
3. Test with airflow
4. Conclusion
Details
1. Install mlflow on k8s
mlflow-pgsql.yaml
mlflow-pvc.yaml
mlflow.yaml
https://mightytedkim.tistory.com/55
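Applying the three manifests is just a kubectl call; a quick sketch assuming they sit in the current directory and that everything goes into an mlflow namespace (the same namespace the Airflow task uses later):

$ kubectl create namespace mlflow
$ kubectl apply -n mlflow -f mlflow-pgsql.yaml -f mlflow-pvc.yaml -f mlflow.yaml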
2. Test with jupyterhub
Below are the requirements:
- I don't really know machine learning, so I took a sklearn example that uses the well-known wine dataset.
$ cat requirements.txt
toml
pyarrow
s3fs==2021.11.1
boto3
mlflow
sklearn
Below are the config values:
- MinIO or Ceph object storage can be used.
$ cat conf/configs.toml
[app.mlflow]
object_storage_endpoint = 'http://10.***.35.32:30071'
object_storage_bucket = "***"
object_storage_key = "model/1.raw_data/train.csv"
mlflow_tracking_url = "http://10.***.35.32:30013"
#mlflow
alpha = 0.1
l1_ratio = 0.1
artifact_path = '***-model-wine'
registered_model_name = '***-model-wine'
Below is the code I ran:
- I slightly reworked code floating around the internet so it's easier for me to follow.
$ cat main.py
# Importing necessary libraries
import os
import numpy as np
import pandas as pd
import logging.config
import toml
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.linear_model import ElasticNet
import mlflow
import mlflow.sklearn
import io
import pyarrow.parquet as pq
import s3fs
import argparse

configs = toml.load("./conf/configs.toml")
app_configs = configs["app"]
print(app_configs)


class MlflowOperations:
    def __init__(self, conf):
        print("==init==")
        self.conf = conf
        self.alpha = self.conf["alpha"]
        self.l1_ratio = self.conf["l1_ratio"]
        # for logging the model to MLflow
        os.environ['MLFLOW_S3_ENDPOINT_URL'] = self.conf["object_storage_endpoint"]
        os.environ['AWS_ACCESS_KEY_ID'] = 'access-key'
        os.environ['AWS_SECRET_ACCESS_KEY'] = 'secret-key'

    def open_mlflow_session(self):
        print("==open_mlflow_session==")
        mlflow.set_tracking_uri(self.conf["mlflow_tracking_url"])
        mlflow.set_experiment("***-wine-test")
        return mlflow

    def load(self):
        print("==load==")
        object_storage_fs = s3fs.S3FileSystem(
            anon=False,
            use_ssl=False,
            client_kwargs={
                "region_name": "",
                "endpoint_url": self.conf['object_storage_endpoint'],  # 'http://10.233.21.208:80'
                "aws_access_key_id": 'access-key',
                "aws_secret_access_key": 'secret-key',
                "verify": False,
            }
        )
        return pd.read_csv(object_storage_fs.open('{}/{}'.format(self.conf['object_storage_bucket'], self.conf['object_storage_key']), mode='rb'))

    def split_data(self):
        print("==split_data==")
        # Loading data from a CSV file
        df_wine = self.load()
        print(df_wine)
        # Separating the target class ('quality') from the remainder of the training data
        X = df_wine.drop(columns=['quality', 'kind'])
        y = df_wine[['quality']]
        # Splitting the data into training and validation sets
        X_train, self.X_val, y_train, self.y_val = train_test_split(X, y, random_state=30)
        # cast to float32 and clean up NaN/inf values before training
        self.X_train = np.float32(X_train)
        self.y_train = np.float32(y_train)
        self.X_train = np.nan_to_num(self.X_train, nan=-9999, posinf=33333333, neginf=33333333)
        self.y_train = np.nan_to_num(self.y_train, nan=-9999, posinf=33333333, neginf=33333333)

    def train_data(self):
        self.open_mlflow_session()
        # Running MLflow script
        with mlflow.start_run():
            # Instantiating model with model parameters
            model = ElasticNet(alpha=self.alpha, l1_ratio=self.l1_ratio)
            # Fitting training data to the model
            model.fit(self.X_train, self.y_train)
            # Running prediction on validation dataset
            preds = model.predict(self.X_val)
            # Getting metrics on the validation dataset
            rmse = np.sqrt(mean_squared_error(self.y_val, preds))
            abs_error = mean_absolute_error(self.y_val, preds)
            r2 = r2_score(self.y_val, preds)
            # Logging params and metrics to MLflow
            mlflow.log_param('alpha', self.alpha)
            mlflow.log_param('l1_ratio', self.l1_ratio)
            mlflow.log_metric('rmse', rmse)
            mlflow.log_metric('abs_error', abs_error)
            mlflow.log_metric('r2', r2)
            # Logging training data
            # mlflow.log_artifact(local_path='./train.csv')
            # Logging training code
            # mlflow.log_artifact(local_path='./main.py')
            # Logging model to MLflow
            mlflow.sklearn.log_model(sk_model=model,
                                     artifact_path=self.conf["artifact_path"],
                                     registered_model_name=self.conf["registered_model_name"])


def main(conf):
    print("==main==")
    # argparse
    parser = argparse.ArgumentParser(description='Argparse')
    parser.add_argument('--alpha', type=float, default=0.1)
    parser.add_argument('--l1ratio', type=float, default=0.1)
    args = parser.parse_args()
    # argument check
    if args.alpha != 0.1:
        conf["mlflow"]["alpha"] = args.alpha
    if args.l1ratio != 0.1:
        conf["mlflow"]["l1_ratio"] = args.l1ratio
    print("------")
    print("alpha: " + str(conf["mlflow"]["alpha"]))
    print("l1ratio: " + str(conf["mlflow"]["l1_ratio"]))
    print("------")
    session = MlflowOperations(conf["mlflow"])
    session.split_data()
    session.train_data()


if __name__ == "__main__":
    main(configs["app"])
I ran it from a terminal (example invocation below).
Reference: https://github.com/mlflow/mlflow-example/blob/master/train.py
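A minimal invocation, assuming conf/configs.toml sits next to main.py; the 0.5 values are just example overrides for the --alpha / --l1ratio flags defined above:

$ python main.py --alpha 0.5 --l1ratio 0.5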
3. Test with airflow
Next I applied it to Airflow, which I've been very interested in lately.
Below is the DAG; it runs the image with a KubernetesPodOperator.
- One notable point: the task accepts arguments, so the DAG can be triggered externally through the REST API (see the trigger sketch after the DAG code).
from airflow import DAG
from airflow.utils.dates import days_ago
from datetime import datetime
# operators
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

dag = DAG(
    'sample-manual-model-wine',
    start_date=datetime(2022, 6, 25),
    schedule_interval=None,
    catchup=False,
    default_args={'owner': 'hgkim'},
    tags=['sample', 'manual', 'model', 'wine']
)

##########################
start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)

train = KubernetesPodOperator(
    task_id='train',
    name='train',
    namespace='mlflow',
    image='hgkim.harbor/model/sample-train-wine:latest',
    image_pull_policy='Always',
    cmds=['python', '/app/mlflow/main.py'],
    arguments=[
        '--alpha', '{{ dag_run.conf["alpha"] if dag_run.conf else 0.1 }}',
        '--l1ratio', '{{ dag_run.conf["l1ratio"] if dag_run.conf else 0.1 }}'
    ],
    in_cluster=True,
    config_file=None,
    is_delete_operator_pod=True,
    get_logs=True,
    dag=dag
)

start_task >> train >> end_task
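To trigger this DAG from outside with parameters, the run conf only needs the alpha / l1ratio keys that the templates above read. A sketch against the Airflow 2.x stable REST API; the webserver address and the airflow:airflow basic-auth credentials are placeholders, and on older 1.10-style deployments the experimental API path is different:

$ curl -X POST "http://<airflow-webserver>/api/v1/dags/sample-manual-model-wine/dagRuns" \
    -H "Content-Type: application/json" \
    --user "airflow:airflow" \
    -d '{"conf": {"alpha": 0.5, "l1ratio": 0.5}}'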
Below is the Dockerfile used for the build.
- It's convenient once you hook it up to CI/CD (example build/push commands follow).
FROM harbor-hgkim/library/python:3.8.7-slim
#docker proxy
COPY ./requirements.txt /tmp
RUN https_proxy=http://192.219.61.252:8080 http_proxy=http://192.219.61.252:8080 pip --trusted-host pypi.org --trusted-host files.pythonhosted.org install --no-cache-dir -r /tmp/requirements.txt
WORKDIR /app/mlflow
COPY python_source/main.py .
COPY python_source/conf/configs.toml conf/
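The build/push step that CI/CD would run is just the usual two commands; a sketch assuming the image name used in the DAG above and push access to that Harbor project:

$ docker build -t hgkim.harbor/model/sample-train-wine:latest .
$ docker push hgkim.harbor/model/sample-train-wine:latest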
Below is configs.toml.
- The conf is laid out slightly differently from the jupyterhub version ([s3] and [mlflow] are separate tables).
[s3]
object_storage_endpoint = 'http://10.***.104.240:80'
object_storage_bucket = "sample"
[mlflow]
object_storage_key = "model/1.raw_data/train.csv"
mlflow_tracking_url = "http://10.*.74.170:30013"
alpha = 0.33 #default
l1_ratio = 0.99 #default
artifact_path = 'sample-model-wine'
registered_model_name = 'sample-model-wine'
Below is main.py.
# Importing necessary libraries
import os
import numpy as np
import pandas as pd
import logging.config
import toml
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.linear_model import ElasticNet
import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
import io
import pyarrow.parquet as pq
import s3fs
import argparse


class MlflowOperations:
    def __init__(self, conf):
        logger.info("==init==")
        self.conf = conf
        self.alpha = self.conf["alpha"]
        self.l1_ratio = self.conf["l1_ratio"]
        # for logging the model to MLflow
        os.environ['MLFLOW_S3_ENDPOINT_URL'] = self.conf["object_storage_endpoint"]
        os.environ['AWS_ACCESS_KEY_ID'] = '***-key'
        os.environ['AWS_SECRET_ACCESS_KEY'] = '***-key'

    def open_mlflow_session(self):
        logger.info("==open_mlflow_session==")
        mlflow.set_tracking_uri(self.conf["mlflow_tracking_url"])
        mlflow.set_experiment("gtc-wine-test")
        return mlflow

    def load(self):
        logger.info("==load==")
        object_storage_fs = s3fs.S3FileSystem(
            anon=False,
            use_ssl=False,
            client_kwargs={
                "region_name": "",
                "endpoint_url": self.conf['object_storage_endpoint'],
                "aws_access_key_id": 'access-key',
                "aws_secret_access_key": 'secret-key',
                "verify": False,
            }
        )
        return pd.read_csv(object_storage_fs.open('{}/{}'.format(self.conf['object_storage_bucket'], self.conf['object_storage_key']), mode='rb'))

    def split_data(self):
        logger.info("==split_data==")
        # Loading data from a CSV file
        df_wine = self.load()
        # print(df_wine)
        # Separating the target class ('quality') from the remainder of the training data
        X = df_wine.drop(columns=['quality', 'kind'])
        y = df_wine[['quality']]
        # Splitting the data into training and validation sets
        X_train, self.X_val, y_train, self.y_val = train_test_split(X, y, random_state=30)
        # cast to float32 and clean up NaN/inf values before training
        self.X_train = np.float32(X_train)
        self.y_train = np.float32(y_train)
        self.X_train = np.nan_to_num(self.X_train, nan=-9999, posinf=33333333, neginf=33333333)
        self.y_train = np.nan_to_num(self.y_train, nan=-9999, posinf=33333333, neginf=33333333)

    def print_auto_logged_info(self, r):
        tags = {k: v for k, v in r.data.tags.items() if not k.startswith("mlflow.")}
        artifacts = [f.path for f in MlflowClient().list_artifacts(r.info.run_id, "model")]
        logger.info("run_id: {}".format(r.info.run_id))
        logger.info("artifacts: {}".format(artifacts))
        logger.info("params: {}".format(r.data.params))
        logger.info("metrics: {}".format(r.data.metrics))
        logger.info("tags: {}".format(tags))

    def train_data(self):
        self.open_mlflow_session()
        # Running MLflow script
        with mlflow.start_run() as run:
            # Instantiating model with model parameters
            model = ElasticNet(alpha=self.alpha, l1_ratio=self.l1_ratio)
            # Fitting training data to the model
            model.fit(self.X_train, self.y_train)
            # Running prediction on validation dataset
            preds = model.predict(self.X_val)
            # Getting metrics on the validation dataset
            rmse = np.sqrt(mean_squared_error(self.y_val, preds))
            abs_error = mean_absolute_error(self.y_val, preds)
            r2 = r2_score(self.y_val, preds)
            # Logging params and metrics to MLflow
            mlflow.log_param('alpha', self.alpha)
            mlflow.log_param('l1_ratio', self.l1_ratio)
            mlflow.log_metric('rmse', rmse)
            mlflow.log_metric('abs_error', abs_error)
            mlflow.log_metric('r2', r2)
            # Logging training data
            # mlflow.log_artifact(local_path='./train.csv')
            # Logging training code
            # mlflow.log_artifact(local_path='./main.py')
            # Logging model to MLflow
            try:
                mlflow.sklearn.log_model(sk_model=model,
                                         artifact_path=self.conf["artifact_path"],
                                         registered_model_name=self.conf["registered_model_name"])
            except Exception as e:
                err_msg = str(e)
                logger.info(f"Error = {err_msg} ")
                raise
            finally:
                # fetch the logged parameters and metrics for the ended run
                self.print_auto_logged_info(mlflow.get_run(run_id=run.info.run_id))


def main(conf):
    print("==main==")
    # argparse
    parser = argparse.ArgumentParser(description='Argparse')
    parser.add_argument('--alpha', type=float)
    parser.add_argument('--l1ratio', type=float)
    args = parser.parse_args()
    print(args)
    # argument check
    if args.alpha is not None:
        conf["mlflow"]["alpha"] = args.alpha
    if args.l1ratio is not None:
        conf["mlflow"]["l1_ratio"] = args.l1ratio
    print("------")
    print("alpha: " + str(conf["mlflow"]["alpha"]))
    print("l1ratio: " + str(conf["mlflow"]["l1_ratio"]))
    print("------")
    # merge the [s3] and [mlflow] tables so the class can look up keys from either one
    session = MlflowOperations({**conf["s3"], **conf["mlflow"]})
    session.split_data()
    session.train_data()


if __name__ == "__main__":
    configs = toml.load("./conf/configs.toml")
    # basic logging config so the logger.info output is actually visible
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("default")
    main(configs)
4. Conclusion
As far as I know, MLflow is an open-source tool that data analysts use to keep a history of their models.
To be honest I haven't used it in earnest yet, but it looks like it would make it efficient to later deploy an API rolled back to a specific point in time (a serving sketch is below).
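For example, serving one specific version straight from the registry could look like the sketch below; it assumes the sample-model-wine model registered above, that version 1 is the point to go back to (an example value), and that the tracking/artifact-store endpoints and credentials from the configs are exported first:

$ export MLFLOW_TRACKING_URI=http://10.*.74.170:30013
$ export MLFLOW_S3_ENDPOINT_URL=http://10.***.104.240:80
$ export AWS_ACCESS_KEY_ID=... AWS_SECRET_ACCESS_KEY=...
$ mlflow models serve -m "models:/sample-model-wine/1" -p 5000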