(Update on Dec 30, 2021: I created a Python library, luigiflow, for the workflow management I describe here. Any feedback is appreciated!)
I’m a research engineer of machine learning (ML), exploring ML models for various tasks. Like many ML researchers, I spend a lot of time doing experiments. So improving the efficiency of experiment workflows would yield considerable benefits.
MLOps is one of the hottest topics in the field, but I’ve found that its primary goal isn’t mine. MLOps aims to improve ML models continuously, whereas I’m responsible for developing new ML models for new tasks. And I don’t think this is the case just for me: I’ve met many people, from academia to industry, who have the same trouble. So I started to think about what I don’t need, in addition to what I need, for managing ML experiments.
Considering my typical work scenarios, I found two components that most MLOps practices consider but that I don’t need.
- I don’t need to consider production (at the beginning).
- I don’t need to take too much time to develop pipelines.
On the other hand, here are the things I actually need to manage my experiments.
- I need to control task dependencies.
- I need to manage parameters and outputs of each task.
This article explains why I (don’t) need the first two components. Then I introduce a simple yet effective workflow that I employ in my daily work.
I don’t need to consider production (at the beginning)
I wouldn’t say that considering production doesn’t matter. It’s important to monitor performance and to have continuous deployment pipelines to improve product quality. But do you have to think about production? If your goal is to publish a paper, you may not need to. Even if you’re working toward production, you may not need to think about it while exploring algorithms for new tasks. I’d like to have another workflow for managing ML experiments, independent of the complex flow of MLOps.
I don’t need to take too much time to develop pipelines
While developing an experimental environment is fun, we have to remember that it’s not the main goal for most of us. My main responsibility is to develop and evaluate ML models to improve performance on the task at hand, and I believe many people have the same responsibility. If so, spending too much time improving experimental efficiency wouldn’t be a good idea. It’s totally fine not to use “modern” and “state-of-the-art” frameworks if they’re costly.
I need to control task dependencies
I usually break a machine learning problem down into small tasks. This approach helps me add new experiments quickly. Take training a news article classification model as an example. I would deconstruct it into the following tasks, some of which depend on others.
1. Fetch target article IDs from the database.
2. Get article titles for the IDs fetched in Task 1.
3. Get article bodies for the IDs fetched in Task 1.
4. Remove HTML tags from the bodies obtained in Task 3.
5. Generate BERT features from the outputs of Tasks 2 and 4.
6. Build a BERT classifier model.
7. Train the BERT classifier from Task 6 on the data from Task 5.
Why isolate fetching IDs from fetching article bodies? To enhance the modularity of tasks. If, for example, you later want to add article titles to the features, you can simply add another task that fetches titles instead of changing the behavior of existing tasks. There is a trade-off between modularity and the complexity of dependencies; I prioritize modularity because workflow engines are good at dealing with complex dependencies.
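To make the dependencies concrete, here is a rough sketch of my own (the task names are made up) that writes the structure above down as a plain mapping from each task to the tasks it directly depends on.
# A hypothetical sketch of the dependency graph above (task names are made up).
# Each entry maps a task to the tasks it directly depends on.
TASK_DEPENDENCIES = {
    'fetch_article_ids': [],                                                  # Task 1
    'fetch_article_titles': ['fetch_article_ids'],                            # Task 2
    'fetch_article_bodies': ['fetch_article_ids'],                            # Task 3
    'remove_html_tags': ['fetch_article_bodies'],                             # Task 4
    'generate_bert_features': ['fetch_article_titles', 'remove_html_tags'],   # Task 5
    'build_bert_classifier': [],                                              # Task 6
    'train_classifier': ['build_bert_classifier', 'generate_bert_features'],  # Task 7
}
Adding a new feature source later, say article categories, would only add another entry that depends on Task 1 instead of modifying the existing ones.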
I need to manage parameters and outputs of each task
Comparison is essential when developing machine learning algorithms. I often compare different parameters by looking at outputs, so I need functions to support that.
My workflow: Luigi and MLflow tracking
In the following sections, I introduce my simple workflow using Luigi and MLflow. Considering the “need” and “don’t need” features above, I decided to use these two tools to manage my experiments: Luigi manages task dependencies and statuses, and MLflow manages parameters, metrics, and results (artifacts) of individual tasks. Both of them are easy to deploy.
Each task has its own MLflow experiment. A Luigi task stores parameters, outputs (artifacts), and metrics to its corresponding MLflow experiment. The workflow above has some tasks that don’t look like “experiments”, but it’s totally fine to manage them as experiments in MLflow. (In fact, MLflow is helpful just as storage of parameters and artifacts.)
Lack of some features? You may notice that some features are missing, such as support for scale-out or UI-based task launching. But that’s totally fine because I don’t need them!
Overview of Luigi
Luigi is a workflow manager for Python. It helps us run tasks that have complex dependencies among them. Unlike language-agnostic workflow managers, you can write a task simply by implementing a subclass of luigi.Task. A task class implements three methods: requires(), output(), and run(), which specify task dependencies, output path(s), and the main procedure, respectively.
import luigi


class HelloWorld(luigi.Task):
    def requires(self):
        # requires no tasks
        return None

    def output(self):
        # output to a local file
        # In this example, I assume that the output file is on the local machine,
        # but you can put it somewhere else by replacing `luigi.LocalTarget`.
        return luigi.LocalTarget('helloworld.txt')

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('Hello World!\n')
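The HelloWorld task above has no dependencies, so here is a minimal sketch of my own (the Greet class is hypothetical) showing how requires() wires two tasks together: Luigi runs HelloWorld first, and self.input() gives access to its output.
class Greet(luigi.Task):
    def requires(self):
        # depends on the HelloWorld task defined above
        return HelloWorld()

    def output(self):
        return luigi.LocalTarget('greeting.txt')

    def run(self):
        # read the upstream output and write a transformed version
        with self.input().open('r') as infile, self.output().open('w') as outfile:
            outfile.write(infile.read().replace('World', 'Luigi'))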
A task can have parameters as class attributes, as follows.
class MyTask(luigi.Task):
    date = luigi.DateParameter()
    learning_rate = luigi.FloatParameter(default=.005)
And you can specify parameter values at runtime by having a luigi.cfg file like the following. (See the official documentation for more details.)
[MyTask]
date = 2021-10-01
learning_rate = 0.1
You can run a task by calling luigi.build().
luigi.build([MyTask(), ])
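One practical note from my side: if you don’t run Luigi’s central scheduler daemon (luigid), you can pass local_scheduler=True so the task graph is resolved in-process.
# run without a central luigid scheduler
luigi.build([MyTask(), ], local_scheduler=True)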
Overview of MLflow
MLflow is a platform to help us manage ML lifecycles. I use MLflow Tracking, which is a component of MLflow for managing ML experiments. (Hereafter, I call MLflow Tracking just MLflow.)
MLflow offers simple APIs. You can create a run in an experiment and make logs as follows.
import mlflow

with mlflow.start_run():
    mlflow.log_param('lr', .005)
    mlflow.log_metric('accuracy', .5)
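Besides parameters and metrics, the workflow below also relies on MLflow tags, artifacts, and run search. Here is a small sketch of those APIs; the experiment name, tag, and file name are made up for illustration.
import mlflow

mlflow.set_experiment('my_experiment')  # created automatically if it doesn't exist
with mlflow.start_run():
    mlflow.set_tags({'dataset': 'news-2021'})  # tags identify a run
    mlflow.log_artifact('helloworld.txt')      # upload a local file as an artifact

# find existing runs in the active experiment by their tags
runs = mlflow.search_runs(
    filter_string='tag.dataset = "news-2021"',
    output_format='list',
)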
Sample implementation of luigi.Task integrated with MLflow
For each Luigi task, such as fetching IDs or clustering, I prepare an experiment in MLflow. To make a template for Luigi tasks that integrate with MLflow, I made a subclass of luigi.Task and customized the following methods:
- output(): This method specifies the path(s) of a task’s output. I override it to retrieve the paths by searching MLflow for the corresponding run.
- complete(): This method determines whether the task has been done. The default implementation determines completion by the existence of its outputs.
import hashlib
import json
import logging
from abc import abstractmethod
from pathlib import Path
from typing import Dict, Any, Optional, final, TypeVar, Callable, List, Tuple

import luigi
import mlflow
from luigi import LocalTarget
from mlflow.entities import Run
from mlflow.protos.service_pb2 import ACTIVE_ONLY

from .utils import get_tmp_filepath  # dummy

logger = logging.getLogger('luigi-interface')

T = TypeVar('T')


class MlflowTask(luigi.Task):

    class Meta:
        mlflow_experiment_name = None  # name of the MLflow experiment
        mlflow_artifact_fnames = dict()  # {artifact_name: artifact_file_name}

    @abstractmethod
    def requires(self) -> Dict[str, luigi.Task]:
        """
        Declare what tasks need to be done before running this task.
        :return: A dictionary consisting of {task_name: task}
        """
        raise NotImplementedError()

    @abstractmethod
    def to_mlflow_tags(self) -> Dict[str, Any]:
        """
        Serialize this task into a tag dictionary.
        """
        raise NotImplementedError()

    @abstractmethod
    def _run(self):
        """
        The main script.
        """
        raise NotImplementedError()

    @final
    def run(self):
        mlflow.set_experiment(self.Meta.mlflow_experiment_name)
        self._run()

    def search_for_mlflow_run(self) -> Optional[Run]:
        """
        Return an existing run with the same tags if it exists.
        """
        experiment = mlflow.get_experiment_by_name(self.Meta.mlflow_experiment_name)
        if experiment is None:
            # The experiment hasn't been created yet, so there is no matching run.
            return None
        query_items = [
            f'tag.{pname} = "{pval}"'
            for pname, pval in self.to_mlflow_tags().items()
        ]
        query = ' and '.join(query_items)  # convert into an MLflow query string
        res = mlflow.search_runs(
            experiment_ids=[experiment.experiment_id, ],
            filter_string=query,
            max_results=1,
            run_view_type=ACTIVE_ONLY,
            output_format='list',
        )
        if len(res) > 0:
            return res[0]
        else:
            return None

    def complete(self):
        """
        A task's completion is determined by the existence of a run with the same tags in MLflow.
        """
        is_complete = (self.search_for_mlflow_run() is not None)
        logger.info(f'is_complete: {is_complete}')
        return is_complete

    def output(self) -> Dict[str, LocalTarget]:
        """
        - If this task has already been completed, return the paths to its artifacts.
        - If it hasn't been completed yet, return temporary paths to put the artifacts in.
          `self.save_to_mlflow` then copies these temporary files to MLflow's artifact directory.
        """
        maybe_mlf_run = self.search_for_mlflow_run()
        if maybe_mlf_run is None:
            paths = {
                key: get_tmp_filepath(self, fname)
                for key, fname in self.Meta.mlflow_artifact_fnames.items()
            }
        else:
            paths = {
                key: Path(maybe_mlf_run.info.artifact_uri) / fname
                for key, fname in self.Meta.mlflow_artifact_fnames.items()
            }
        return {
            key: LocalTarget(str(p))
            for key, p in paths.items()
        }

    @final
    def save_to_mlflow(
        self,
        artifacts_and_save_funcs: Dict[str, Tuple[T, Callable[[T, str], None]]] = None,
        metrics: Dict[str, float] = None,
    ):
        """
        Register artifacts and/or metrics to MLflow.
        :param artifacts_and_save_funcs:
            A dictionary mapping an artifact name to a tuple of `(artifact, artifact_saver)`.
            An `artifact_saver` accepts an object and a filepath and dumps the artifact to the filepath,
            e.g. `lambda df, path: df.to_csv(path)` is an artifact_saver for a pandas dataframe.
        """
        artifact_paths: List[str] = []
        artifacts_and_save_funcs = artifacts_and_save_funcs or dict()
        # Save artifacts
        for name, (artifact, save_fn) in artifacts_and_save_funcs.items():
            out_path = self.output()[name].path
            save_fn(artifact, out_path)
            artifact_paths.append(out_path)
        # Log into MLflow
        with mlflow.start_run():
            mlflow.set_tags(self.to_mlflow_tags())
            if metrics is not None:
                mlflow.log_metrics(metrics)
            for path in artifact_paths:
                mlflow.log_artifact(path)
And these are what each subclass of MlflowTask has to override:
- class Meta to declare both the name of its MLflow experiment and the filenames of its artifacts.
- requires() to declare dependent tasks.
- to_mlflow_tags() to specify how to serialize the task's run into tags.
- _run() to declare what the task actually does.
For example, FetchArticleTitles (Task 2) can be implemented as follows:
from typing import Any, Dict

import luigi
import pandas as pd

from .tasks.base import MlflowTask
from .tasks.extract_ids import FetchArticleIds  # dummy
from .utils import get_title_from_id  # dummy


class FetchArticleTitles(MlflowTask):

    class Meta:
        mlflow_experiment_name = 'fetch_article_titles'
        mlflow_artifact_fnames = {
            'title': 'titles.csv',
        }

    def requires(self) -> Dict[str, luigi.Task]:
        return {
            'ids': FetchArticleIds(),
        }

    def to_mlflow_tags(self) -> Dict[str, Any]:
        return {
            'ids_path': self.input()['ids']['ids'].path,
        }

    def _run(self):
        articles_df = pd.read_csv(self.input()['ids']['ids'].path)
        # get titles
        articles_df['title'] = articles_df['id'].apply(
            lambda x: get_title_from_id(x)
        )
        # save
        self.save_to_mlflow(
            {
                'title': (articles_df, lambda df, path: df.to_csv(path)),
            }
        )
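With these pieces in place, a hypothetical entry point could build only the final task; Luigi then walks the dependency graph and skips any task whose matching MLflow run already exists. The module path and TrainBertClassifier class below are made-up placeholders, analogous to Task 7.
import luigi

from .tasks.train import TrainBertClassifier  # dummy, analogous to Task 7

if __name__ == '__main__':
    # building the last task triggers every incomplete upstream task
    luigi.build([TrainBertClassifier(), ], local_scheduler=True)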
Summary
This article introduced two things I don’t need and two things I do need for ML experiments, along with my experiment pipeline. I don’t assume that this pipeline works for everyone; some people may need features that Luigi and MLflow don’t support. But I believe that thinking about the “don’t need”s is becoming more and more important nowadays.
I’d like to thank Sajjadur Rahman and Sara Evensen for giving great feedback!