ミニマルな機械学習の実験管理を目指して

必要なもの・必要でないものから考える実験環境構築

December 15, 2021

(追記 (2021/12/30): ここで述べた方法を実践するための Python パッケージ luigiflow を公開しました。)

(この記事は私が書いた英語版の記事 Toward the minimalism of machine learning experiment workflows を私自身が翻訳したものです。 元記事の作成にあたり友人の Sajjadur RahmanSara Evensen にフィードバックをいただいたことを感謝します。)

本記事は、ストックマーク Advent Calendar 2021 の 15 日目の記事です。

機械学習分野の研究者やエンジニアの多くにとって実験は業務の大きなコストを占めます。 そのため実験環境の構築が業務のパフォーマンスに直結することは言うまでもありません。 それと関連して近年 MLOps への注目が非常に高まっていますが、 MLOps にまつわる手法はプロダクトへの反映に重点が置かれており、 単に機械学習の実験だけ行いたい場合はオーバースペックになりがちです。 これは私だけの問題ではなく社内外でよく聞く話だったため、 私がどのようにして実験環境をデザインしたかという実例が少しでも参考になればと思いこの記事を書きました。

MLOps is overwhelming

MLOps で考えることは実に様々。

私は実験環境について考える際、まず業務フローにおいてどんな機能が必要でどんな機能が必要でないかを考えることから始めました。 その結果、以下の 2 つの必要ないものと 2 つの必要なものが浮き彫りになりました。

  • (最初から) プロダクションについて考える必要はない
  • 実験環境の構築に時間をかけすぎる必要はない
  • タスクの依存性を管理する必要がある
  • 各タスクのパラメータと出力を管理する必要がある

この記事ではこれらの必要・不必要の背景について説明した後、これらの点を反映して私が構築した Luigi と MLflow で作るシンプルな実験管理環境について紹介します。

(最初から) プロダクションについて考える必要はない

最初に断っておきますが、プロダクションとモデル開発との持続的な連携は非常に重要です。 機械学習のモデルは一度作っておしまいではないですし、継続的なテストやデプロイ、 あるいは精度のモニタリングはプロダクトの質を向上に直結します。 ただ個人的には、 プロダクションに向けたモデル開発とディスカバリー段階でのモデル実験は分けた方がすっきりするのではないかと思います。 つまりまず新しいタスクにして様々なモデルを検証し、そこで定まったモデルをプロダクションに向けて再び実装するのが良いと思っています。 この方針は一見モデル実装が 2 度必要になるため非効率に思えますが、私はそれよりも前者の実験環境をシンプルに保てるメリットが大きいと思っています。

I want to focus on experiments

実験とプロダクションの連携ではなく、実験そのものを整理したい!

実験環境構築に時間をかけすぎる必要はない

この部分をを自動化してここはリアルタイムで UI 上ににステータスが反映されて… といったように実験環境を構築するることはそれ自身とても楽しい作業です。 しかし多くの機械学習研究者・エンジニアにとって実験環境の構築は主な目的ではなあく、 あくまで実験によって新しい知見を得ることが主な目的です。 私自身もこうした作業に時間をかけすぎてしまい、 本来の問題であるモデルの検証に割く時間がなくなってしまった苦い経験がたくさんあります。 このような自分への戒めも込めて、私はできるだけデプロイや管理に時間がかからない方法を選びました。

Building pipelines is fun, but...

環境作りは楽しいですが、しばしば本当の問題を置き去りにしてしまいます...

タスクの依存関係を管理する必要がある

一方で私にとって必要な機能の1つがタスクの依存関係の管理です。 私の場合、ある ML のタスクが与えられた時にまずそれを細かいタスク (機能) に分類します。 例えばニュース記事の分類というタスクを考えた際、私は以下のような細かいタスクに分割します。

  1. DB から対象となる記事 ID を取得する。
  2. 1の ID から記事タイトルを取得する。
  3. 1の ID から記事本文を取得する。
  4. 3で取得した本文から HTML タグを除去する。
  5. 2と4の出力から BERT への入力を生成する。
  6. 5で生成した入力に対して BERT の分類器を適用し、結果を保存する。

いくつかのタスクの間には依存関係が存在します。 例えば上記のタスク4はタスク3の実行を待たなくてはなりません (=タスク4はタスク3に依存する) が、 このような制御は煩雑で人手で行うとミスのもとになります。 そのためタスクの依存関係の管理はライブラリに任せたいと思います。

なぜここまでタスクを細かく分けるのか?   ここではタスクをかなり細かく分けています。 例えば 1 の記事 ID の取得と 2 の記事タイトルの取得は同じタスクでも良いかもしれません。 しかし例えば記事タイトルに新しい前処理を追加して効果を検証したい場合、 記事 ID の取得とタイトルの取得を分けていれば既存のタスクを修正することなく新しいタスク (前処理) を追加するだけで前処理を追加できます。 タスクの細かさとタスクの依存関係の複雑さはトレードオフの関係にありますが、 私の場合はタスクを細かくすることを優先し、その結果生じる依存関係の複雑さを Luigi によって緩和するという方針を取っています。

タスクのパラメータと出力を管理する必要がある

比較は実験の重要な要素です。 機械学習の実験においても複数のパラメータを Accuracy や F1-score などのメトリクスを用いて比較することが多々あります。 私の場合たいていのタスクのパラメータの数はとても多く手動で管理するにはコストがかかりすぎるので、 タスクのパラメータや出力、あるいはメトリクスを管理する機能は必要です。

tasks

タスクとその依存関係の例。それぞれのタスクはパラメータや出力を持つ。

Luigi と MLflow によるワークフロー

以下では私が現在採用している LuigiMLflow による実験管理方法を紹介します。 ここでは Luigi はタスクの実行とタスク間の依存関係の管理に使用し、 MLflow は個々のタスクのパラメータ・出力・メトリクスを管理するのに用います。 両者とも運用が非常に簡単なため (どちらもコマンド1つで起動します)、 これから紹介する方法は気軽に試していただけると思います。

mlflow-and-luigi

Roles of Luigi and MLflow.

Luigi のタスクはそれぞれ MLflow の対応する experiment を1つ持ちます。 記事 ID を取得、などは “experiment” と呼ぶには違和感があるかもしれませんが、 MLflow はこれらのパラメータ (引数) や出力を保存するだけでも十分使う価値があります。

機能が少なすぎ? Python 以外のプログラムとの連携やデバッグ、様々なモニタリングツールなど この方法ではサポートされていない便利な機能がたくさんあります。 もちろんそれらがあるに越したことはないのですが、 それによって管理のオーバーヘッドが増大する場合は (私のユースケースでは) 無くても良いと思います。 もちろんこれはトレードオフの問題で中にはこれらの機能が必要な場合もあると思いますが、 “必要ないものは必要ない” というミニマリスト的な判断基準はあると便利です。

Luigi について

Luigi は Python で使用できるワークフローマネージャーです。 Luigi を用いることでタスクの実行管理および依存関係を簡単に管理することができます。 Luigi では個々のタスクを luigi.Task をオーバーライドすることで実装します。 その際に実装するメソッドは requires(), outputs(), run() の 3 つで、 それぞれに依存するタスク、出力先、実行内容を記述します。 例えば Hellow World! という文字列をテキストファイルに出力するタスクは以下のように書くことができます。

import luigi

class HelloWorld(luigi.Task):
    def requires(self):
        # requires no tasks
        return None

    def output(self):
        # output to a local file
        # `luigi.LocalTarget` を置き換えることでローカル以外にもファイルを保存することができます。
        return luigi.LocalTarget('helloworld.txt')

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('Hello World!\n')

タスクはパラメータを持つこともできます。パラメータは luigi.Parameter() (あるいはその子クラス) のインスタンスであるようなインスタンスオブジェクト (あるいはクラスオブジェクト) として定義します。

class MyTask(luigi.Task):
    date = luigi.DataParameter()
    learning_rate = luigi.FloatParameter(default=.005)

パラメータは luigi.cfg という外部ファイルに記述することもできます。 これによってプログラムをパラメータの値を分離できるため、 私は常にこの方法を用いています。 (これについて詳しくは 公式ドキュメント を参照してください。) 例えば上の MyTask のパラメータは、次のような luigi.cfg を書くことで指定できます。

[MyTask]
date = 2021-10-01
learning_rate = 0.1

タスクを実行するには luigi.build を呼び出します。

luigi.build([MyTask(), ]) 

MLflow について

MLflow は MLOps を総合的にサポートするツール群です。 今回は MLflow のパッケージのうち、機械学習の実験をサポートする MLflow Tracking を使用します。 (以降では簡単のため MLflow Tracking を単に MLflow と呼びます。) MLflow を用いることで、パラメータや出力などをブラウザから簡単に閲覧・比較できるようになります。

my mlflow tracking UI

MLflow の画面例。

mlflow run

MLflow ではブラウザ上でタスクの出力がプレビューできます。

MLflow の Python API は非常にシンプルで、 例えばパラメータやメトリクスは以下のようなコードで保存することができます。

import mlflow

with mlflow.start_run():
    mlflow.log_param('lr', .005)
    mlflow.log_metric('accuracy', .5)

MLflow と連携する luigi.Task のテンプレート

luigi.Task と MLflow の連携を自動化するために、 私は以下のようなクラスを全てのタスクの基底クラスとしています。 このクラスでは以下の 2 つのメソッドの挙動をオリジナルの luigi.Task から変更しています。

  • def output() このメソッドはタスクの出力先を指定します。 ここでは MLflow の対応する run の artifact path を保存します。
  • def complete() このメソッドにはタスクが実行済みかどうかを判断するロジックが格納されます。 元の実装ではこれをアウトプットが存在してるかどうかで判断していましたが、 今回は MLflow で該当する run が完了したかどうかで判断することにしました。

基底クラスはこのような感じです。

import hashlib
import json
from pathlib import Path
from typing import Dict, Any, Optional, final, TypeVar, Callable, List, Tuple

import luigi
import mlflow
from luigi import LocalTarget
from mlflow.entities import Run
from mlflow.protos.service_pb2 import ACTIVE_ONLY

from .utils import get_tmp_filepath  # dummy


T = TypeVar('T')


class MlflowTask(luigi.Task):

    class Meta:
        # 対応する MLflow の experiment の名前 
        mlflow_experiment_name = None
        # {出力ファイルの識別名: 出力ファイルのファイル名}
        # 例 `{"ids": "ids.csv", "embeddings": "embeddings.pickle"}`
        mlflow_artifact_fnames = dict()
        
    @abstractmethod
    def requires(self) -> Dict[str, luigi.Task]:
        """
        :return: {タスク名: タスク} からなる dict。例: `{"texts": FetchText(), "embeddings": EmbeddingTexts()}`
        """
        raise NotImplementedError()

    @abstractmethod
    def to_mlflow_tags(self) -> Dict[str, Any]:
        """
        パラメータを MLflow のタグにシリアライズする。
        """
        raise NotImplementedError()
    
    @abstractmethod
    def _run(self):
        """
        メインの処理 (入力から出力まで書く)
        """
        raise NotImplementedError()

    @final
    def run(self):
        mlflow.set_experiment(self.Meta.mlflow_experiment_name)
        self._run()

    def search_for_mlflow_run(self) -> Optional[Run]:
        """
        :return: 同じタグを持つ run が MLflow に既に存在する場合はそれを返す。無ければ None を返す。
        """
        experiment = mlflow.get_experiment_by_name(self.Meta.mlflow_experiment_name)
        # MLflow 用のクエリ文字列を生成する
        query_items = [
            f'tag.{pname} = "{pval}"'
            for pname, pval in self.to_mlflow_tags().items()
        ]
        query = ' and '.join(query_items)  # Convert into a query string used in MLflow
        res = mlflow.search_runs(
            experiment_ids=[experiment.experiment_id, ],
            filter_string=query,
            max_results=1,
            run_view_type=ACTIVE_ONLY,
            output_format='list',
        )
        if len(res) > 0:
            return res[0]
        else:
            return None

    def complete(self):
        # run が MLflow に存在すれば完了済みとみなす。
        is_complete = (self.search_for_mlflow_run() is not None)
        self.logger.info(f'is_complete: {is_complete}')
        return is_complete

    def output(self) -> Dict[str, LocalTarget]:
        """
        試用
        - タスクが既に完了してる (= MLflow に run がある) 場合は、その artifact path(s) を返す。
        - タスクがまだ完了していない  (= MLflow に run がない) 場合は、一時ディレクトリ path を返す。
          (MLflow run を作るまでの一時的な待避先)
        """
        maybe_mlf_run = self.search_for_mlflow_run()
        if maybe_mlf_run is None:
            paths = {
                key: get_tmp_filepath(self, fname)
                for key, fname in self.Meta.mlflow_artifact_fnames.items()
            }
        else:
            paths = {
                key: Path(maybe_mlf_run.info.artifact_uri) / fname
                for key, fname in self.Meta.mlflow_artifact_fnames.items()
            }
        return {
            key: LocalTarget(str(p))
            for key, p in paths.items()
        }

    @final
    def save_to_mlflow(
        self,
        artifacts_and_save_funcs: Dict[str, Tuple[T, Callable[[T, str], None]]] = None,
        metrics: Dict[str, float] = None,
    ):
        """
        出力 (artifact), メトリクスを MLflow に登録する。`_run()` から呼び出されることを想定。
        
        :param artifacts_and_save_funcs:
            `(artifact_obj, artifact_saver)`. のリスト。
           `artifact_saver` は artifact とファイルパスを受け取り、artifact をそのパスに保存する関数。
           `artifact_saver` の一例:  `lambda df, path: df.to_csv(path)`
        """
        artifact_paths: List[str] = []
        artifacts_and_save_funcs = artifacts_and_save_funcs or []
        # Save artifacts
        for name, (artifact, save_fn) in artifacts_and_save_funcs.items():
            out_path = self.output()[name].path
            save_fn(artifact, out_path)
            artifact_paths.append(out_path)
        # Log into MLflow
        with mlflow.start_run():
            mlflow.set_tags(self.to_mlflow_tags())
            if metrics is not None:
                mlflow.log_metrics(metrics)
            for path in artifact_paths:
                mlflow.log_artifact(path)

workflow

`MLflowTask` のフローチャート。

個々のタスクは MlflowTask を継承し、次の 4 つをオーバーライドすることで実装します。

  • class Meta クラスのメタ情報 (MLflow の experiment 名、出力ファイル名) を指定する。
  • def requires 依存タスクを dict 形式で指定する。
  • def to_mlflow_tags MLflow の Tag へのシリアライズ方法を指定する。
  • def _run メインの処理を指定する。

例えば、記事のタイトルを DB から取得するタスク FetchArticleTitles は次のように実装できます。

from typing import Any, Dict

import luigi
import pandas as pd

from .tasks.base import MlflowTask
from .tasks.extract_ids import FetchArticleIds  # dummy
from .utils import get_title_from_id  # dummy


class FetchArticleTitles(MlflowTask):

    class Meta:
        mlflow_experiment_name = 'fetch_article_titles'
        mlflow_artifact_fnames = {
            'title': 'titles.csv'
        }

    def requires(self) -> Dict[str, luigi.Task]:
        return {
            'ids': FetchArticleIds(),
        }

    def to_mlflow_tags(self) -> Dict[str, Any]:
        return {
            'ids_path': self.input()['ids']['ids'].path,
        }


    def _run(self):
        articles_df = pd.read_csv(self.input()['ids']['ids'].path)
        # get title
        articles_df['title'] = articles_df['id'].apply(
            lambda x: get_title_from_id(x)
        )
        # Save
        self.save_to_mlflow(
            {
                'title': (articles_df, lambda df, path: df.to_csv(path)),
            }
        )

このタスクに実行するには前述した luigi.build() を用います。

luigi.build([FetchArticleTitles(), ])

まとめ

この記事では機械学習の環境構築について、“必要なこと” と “必要ではないこと” を考えてデザインするというプロセスを紹介しました。 ここで紹介した具体例はあらゆる人に最適とは言い切れませんが、 あらゆる技術スタックがあふれる今日において “必要でないこと” を考えてミニマルに環境構築するという考え方が 役立つことがあれば幸いです。

[原文: Toward the minimalism of machine learning experiment workflows ]


Profile picture

Thank you for reading this article! This article is written by Wataru Hirota (email: {my_first_name}@whiro.me). Please follow me on Twitter  or connect via LinkedIn  if you're interested in articles like this. :)

© 2022 Wataru Hirota