Toward the minimalism of machine learning experiment workflows

Think about what you need and what you don't need

November 21, 2021

(Update on Dec 30, 2021: I created a Python library, luigiflow, for the workflow management I describe here. Any feedback is appreciated!)

I’m a machine learning (ML) research engineer, exploring ML models for various tasks. Like many ML researchers, I spend a lot of time running experiments, so improving the efficiency of experiment workflows would yield considerable benefits.

MLOps is one of the hottest topics in the field, but I’ve found that the primary goal of MLOps doesn’t match mine. MLOps aims to improve ML models continuously, whereas I’m responsible for developing new ML models for new tasks. I don’t think this is the case just for me: I’ve met many people, from academia to industry, who have the same trouble. So I started to think about what I don’t need, in addition to what I need, for managing ML experiments.

MLOps is overwhelming

MLOps requires you to think about a lot of stuff!

Considering my typical work scenario, I found two components that most MLOps practices include but that I don’t need.

  • I don’t need to consider production (at the beginning).
  • I don’t need to take too much time to develop pipelines.

On the other hand, here is what I actually need to manage my experiments.

  • I need to control task dependencies.
  • I need to manage parameters and outputs of each task.

This article explains why I don’t need the first two components and why I do need the latter two. Then I introduce a simple yet effective workflow that I employ in my daily work.

I don’t need to consider production (at the beginning)

I wouldn’t say that considering production doesn’t matter. It’s important to monitor performance and to have continuous deployment pipelines to improve product quality. But do you have to think about production? If your goal is to publish a paper, you may not need to. Even if you’re working toward production, you may not need to think about it while exploring algorithms for new tasks. I’d like to have a separate workflow for managing ML experiments, independent of the complex flow of MLOps.

I want to focus on experiments

I'd like to focus on experiments, which are complex themselves!

I don’t need to take too much time to develop pipelines

While developing an experimental environment is fun, we have to remember that it’s not the main goal for most of us. My main responsibility is to develop and evaluate ML models that improve performance on the task at hand. I believe many people have been assigned the same responsibility. If so, spending too much time improving experimental efficiency wouldn’t be a good idea. It’s totally fine not to use “modern” and “state-of-the-art” frameworks if they’re costly.

Building pipelines is fun, but...

Building pipelines is fun, but we tend to ignore actual problems...

I need to control task dependencies

I usually break a machine learning problem down into small tasks. This approach helps me add new experiments quickly. Take training a news article classification model as an example. I would break it down into the following tasks, some of which depend on each other.

  1. Fetch target article IDs from the database.
  2. Get article titles for the IDs from Task 1.
  3. Get article bodies for the IDs from Task 1.
  4. Remove HTML tags from the bodies from Task 3.
  5. Generate BERT features from the outputs of Tasks 2 and 4.
  6. Build a BERT classifier model.
  7. Train the BERT classifier from Task 6 on the data from Task 5.

Why isolate fetching IDs from fetching article bodies? This is to enhance the modularity of tasks. If you want to add article titles to the article features, for example, you can just add another task that fetches article titles instead of changing the behavior of existing tasks. There is a trade-off between modularity and the complexity of dependencies. I prioritize modularity because workflow engines help deal with complex dependencies.
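
For instance, here is a rough sketch of how Task 2 could be added on top of Task 1 without touching it. It uses Luigi, which I introduce below; the class names and file names are only for illustration.

import luigi


class FetchArticleIds(luigi.Task):  # Task 1: already exists and stays unchanged
    def output(self):
        return luigi.LocalTarget('article_ids.csv')

    def run(self):
        ...  # fetch article IDs from the database and write them to the output file


class FetchArticleTitles(luigi.Task):  # Task 2: newly added
    def requires(self):
        # Depend on Task 1's output instead of changing Task 1 itself.
        return FetchArticleIds()

    def output(self):
        return luigi.LocalTarget('article_titles.csv')

    def run(self):
        ...  # read the IDs from self.input(), look up titles, write them to the output file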

I need to manage parameters and outputs of each task

Comparison is essential when developing machine learning algorithms. I often compare runs with different parameters by looking at their outputs, so I need features that support this.

Graph of tasks and their dependencies. Each task has parameters and outputs.

My workflow: Luigi and MLflow tracking

In the rest of this article, I introduce my simple workflow built on Luigi and MLflow. Considering the “need” and “don’t need” features above, I decided to use Luigi to manage task dependencies and statuses, and MLflow to manage parameters, metrics, and results (artifacts) of individual tasks. Both of them are easy to deploy.

Roles of Luigi and MLflow.

Each task has its own MLflow experiment. A Luigi task stores its parameters, outputs (artifacts), and metrics in its corresponding MLflow experiment. The workflow above has some tasks that don’t look like “experiments”, but it’s totally fine to manage them as experiments in MLflow. (In fact, MLflow is helpful simply as storage for parameters and artifacts.)

Lack of some features? You may notice that some features are missing, such as support for scaling out or triggering tasks from a UI. But it’s totally fine because I don’t need them!

Overview of Luigi

Luigi is a workflow manager for Python. It helps us run tasks that have complex dependencies among them. Unlike language-agnostic workflow managers, Luigi lets you write a task easily by implementing a subclass of luigi.Task. A task class implements three methods: requires(), output(), and run(), which specify the task’s dependencies, output path(s), and main procedure, respectively.

import luigi

class HelloWorld(luigi.Task):
    def requires(self):
        # requires no tasks
        return None

    def output(self):
        # output to a local file
        # In this example, I assume that the output file is on the local machine,
        # but you can put it somewhere else by replacing `luigi.LocalTarget`.
        return luigi.LocalTarget('helloworld.txt')

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('Hello World!\n')

A task can have parameters, declared as class attributes, as follows.

class MyTask(luigi.Task):
    date = luigi.DateParameter()
    learning_rate = luigi.FloatParameter(default=.005)

And you can specify parameter values at runtime by providing a luigi.cfg file like the following. (See the official documentation for more details.)

[MyTask]
date = 2021-10-01
learning_rate = 0.1

You can run a task by calling luigi.build().

luigi.build([MyTask()])
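
You can also pass parameter values directly when constructing a task. Here is a small sketch reusing the MyTask class from above (as far as I know, values passed this way take precedence over luigi.cfg):

import datetime

import luigi

# Values passed to the constructor take precedence over those in luigi.cfg.
task = MyTask(date=datetime.date(2021, 10, 1), learning_rate=0.01)
luigi.build([task], local_scheduler=True)  # `local_scheduler=True` runs the task without a central scheduler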

Overview of MLflow

MLflow is a platform to help us manage ML lifecycles. I use MLflow Tracking, which is a component of MLflow for managing ML experiments. (Hereafter, I call MLflow Tracking just MLflow.)

my mlflow tracking UI

This is the UI of MLflow tracking for one of my projects.

mlflow run

For each run, you can preview outputs.

MLflow offers simple APIs. You can create a run in an experiment and log parameters and metrics as follows.

import mlflow

with mlflow.start_run():
    mlflow.log_param('lr', .005)
    mlflow.log_metric('accuracy', .5)
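
My workflow described below also relies on MLflow tags and artifacts, so here is a slightly fuller sketch; the experiment name, tag, and file name are made up for illustration.

import mlflow

mlflow.set_experiment('my_experiment')  # select (or create) the experiment that the run belongs to

with mlflow.start_run():
    mlflow.set_tags({'dataset': 'news-2021'})  # tags that identify this run
    mlflow.log_param('lr', .005)
    mlflow.log_metric('accuracy', .5)
    mlflow.log_artifact('predictions.csv')  # upload an existing local file as an artifact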

Sample implementation of luigi.Task integrated with MLflow

For each Luigi task, such as fetching IDs or clustering, I prepare an experiment in MLflow. To make a template for Luigi tasks that integrate with MLflow, I made a subclass of luigi.Task, customizing the following methods.

  • def output() : This method specifies the path(s) of a task’s output. I override it to retrieve the paths by searching MLflow for the corresponding run.
  • def complete() : This method determines whether the task has already been done.
    The default implementation determines completion by the existence of the task’s outputs; I override it so that completion is determined by the existence of an MLflow run with the same tags.

import logging
from abc import abstractmethod
from pathlib import Path
from typing import Dict, Any, Optional, final, TypeVar, Callable, List, Tuple

import luigi
import mlflow
from luigi import LocalTarget
from mlflow.entities import Run, ViewType

from .utils import get_tmp_filepath  # dummy


logger = logging.getLogger(__name__)
T = TypeVar('T')


class MlflowTask(luigi.Task):

    class Meta:
        mlflow_experiment_name = None  # define a name of the MLflow experiment
        mlflow_artifact_fnames = dict()  # {artifact_name: artifact_file_name}
        
    @abstractmethod
    def requires(self) -> Dict[str, luigi.Task]:
        """
        Declare which tasks need to be completed before running this task.
        :return: A dictionary consisting of {task_name: task}
        """
        raise NotImplementedError()

    @abstractmethod
    def to_mlflow_tags(self) -> Dict[str, Any]:
        """
        Serialize this task into a tag dictionary.
        """
        raise NotImplementedError()
    
    @abstractmethod
    def _run(self):
        """
        The main script
        """
        raise NotImplementedError()

    @final
    def run(self):
        mlflow.set_experiment(self.Meta.mlflow_experiment_name)
        self._run()

    def search_for_mlflow_run(self) -> Optional[Run]:
        """
        Return an existing run with the same tags if one exists.
        """
        experiment = mlflow.get_experiment_by_name(self.Meta.mlflow_experiment_name)
        if experiment is None:
            # The MLflow experiment hasn't been created yet, so no matching run exists.
            return None
        query_items = [
            f'tag.{pname} = "{pval}"'
            for pname, pval in self.to_mlflow_tags().items()
        ]
        query = ' and '.join(query_items)  # Convert into a query string used in MLflow
        res = mlflow.search_runs(
            experiment_ids=[experiment.experiment_id, ],
            filter_string=query,
            max_results=1,
            run_view_type=ViewType.ACTIVE_ONLY,
            output_format='list',
        )
        if len(res) > 0:
            return res[0]
        else:
            return None

    def complete(self):
        """
        A task's completion is determined by the existence of a run with the same tags in MLflow.
        """
        is_complete = (self.search_for_mlflow_run() is not None)
        logger.info(f'is_complete: {is_complete}')
        return is_complete

    def output(self) -> Dict[str, LocalTarget]:
        """
        - If this task has already been completed, return the paths to its artifacts.
        - If it hasn't been completed yet,
          return temporary paths to put the artifacts in.
          Then `self.save_to_mlflow` copies these temporary files to MLflow's artifact directory.
        """
        maybe_mlf_run = self.search_for_mlflow_run()
        if maybe_mlf_run is None:
            paths = {
                key: get_tmp_filepath(self, fname)
                for key, fname in self.Meta.mlflow_artifact_fnames.items()
            }
        else:
            paths = {
                key: Path(maybe_mlf_run.info.artifact_uri) / fname
                for key, fname in self.Meta.mlflow_artifact_fnames.items()
            }
        return {
            key: LocalTarget(str(p))
            for key, p in paths.items()
        }

    @final
    def save_to_mlflow(
        self,
        artifacts_and_save_funcs: Optional[Dict[str, Tuple[T, Callable[[T, str], None]]]] = None,
        metrics: Optional[Dict[str, float]] = None,
    ):
        """
        Register artifacts and/or metrics to mlflow.
        
        :param artifacts_and_save_funcs:
            A dictionary of `{name: (artifact, artifact_saver)}`.
            An `artifact_saver` accepts an object and a filepath and dumps the artifact into the filepath.
            e.g. `lambda df, path: df.to_csv(path)` is an artifact_saver for a pandas dataframe.
        :param metrics: A dictionary of metric names and values to log.
        """
        artifact_paths: List[str] = []
        artifacts_and_save_funcs = artifacts_and_save_funcs or {}  # default to an empty dict so `.items()` works
        # Save artifacts
        for name, (artifact, save_fn) in artifacts_and_save_funcs.items():
            out_path = self.output()[name].path
            save_fn(artifact, out_path)
            artifact_paths.append(out_path)
        # Log into MLflow
        with mlflow.start_run():
            mlflow.set_tags(self.to_mlflow_tags())
            if metrics is not None:
                mlflow.log_metrics(metrics)
            for path in artifact_paths:
                mlflow.log_artifact(path)

Flowchart of MlflowTask.

Here is what each subclass of MlflowTask has to override:

  • class Meta to declare both the name of its MLflow experiment and the filenames of its artifacts.
  • def requires to declare its dependent tasks.
  • def to_mlflow_tags to specify how to serialize the task into MLflow tags (which identify its run).
  • def _run to declare what the task actually does.

For example, FetchArticleTitles (Task 2) can be implemented as follows:

from typing import Any, Dict

import luigi
import pandas as pd

from .tasks.base import MlflowTask
from .tasks.extract_ids import FetchArticleIds  # dummy
from .utils import get_title_from_id  # dummy


class FetchArticleTitles(MlflowTask):

    class Meta:
        mlflow_experiment_name = 'fetch_article_titles'
        mlflow_artifact_fnames = {
            'title': 'titles.csv'
        }

    def requires(self) -> Dict[str, luigi.Task]:
        return {
            'ids': FetchArticleIds(),
        }

    def to_mlflow_tags(self) -> Dict[str, Any]:
        return {
            'ids_path': self.input()['ids']['ids'].path,
        }

    def _run(self):
        articles_df = pd.read_csv(self.input()['ids']['ids'].path)
        # get title
        articles_df['title'] = articles_df['id'].apply(
            lambda x: get_title_from_id(x)
        )
        # Save
        self.save_to_mlflow(
            {
                'title': (articles_df, lambda df, path: df.to_csv(path)),
            }
        )
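
To put it all together, here is a minimal sketch of how I’d kick off such a pipeline (the module path is a dummy, like the ones above). You build the most downstream task you care about, and Luigi resolves its dependencies, skipping any task for which a matching MLflow run already exists thanks to the complete() override.

import luigi

from .tasks.fetch_article_titles import FetchArticleTitles  # dummy

# Luigi runs FetchArticleIds first if its MLflow run doesn't exist yet,
# then FetchArticleTitles; tasks with matching MLflow runs are skipped.
luigi.build([FetchArticleTitles()], local_scheduler=True)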

Summary

This article introduced my two “don’t need”s and two “need”s for ML experiments, along with my experiment pipeline. I don’t assume that this pipeline works for everyone; some people may need additional features that Luigi and MLflow don’t support. But I believe that thinking about the “don’t need”s is becoming more and more important these days.

I’d like to thank Sajjadur Rahman and Sara Evensen for giving great feedback!


Thank you for reading this article! This article is written by Wataru Hirota (email: {my_first_name}@whiro.me). Please follow me on X or connect via LinkedIn if you're interested in articles like this. :)
