1 - Dagster

W&B を Dagster と統合するためのガイド。

Dagster と W&B (W&B) を使用して MLOps パイプラインを調整し、ML アセットを維持します。W&B とのインテグレーションにより、Dagster 内で以下が簡単になります:

  • W&B Artifacts の使用と作成。
  • W&B Registry で Registered Models の使用と作成。
  • W&B Launch を使用して専用のコンピュートでトレーニングジョブを実行します。
  • ops とアセットで wandb クライアントを使用します。

W&B Dagster インテグレーションは W&B 専用の Dagster リソースと IO マネージャーを提供します:

  • wandb_resource: W&B API への認証と通信に使用される Dagster リソース。
  • wandb_artifacts_io_manager: W&B Artifacts を処理するために使用される Dagster IO マネージャー。

以下のガイドでは、Dagster で W&B を使用するための前提条件の満たし方、ops とアセットで W&B Artifacts を作成して使用する方法、W&B Launch の利用方法、そして推奨されるベストプラクティスについて説明します。

始める前に

Dagster を Weights and Biases 内で使用するためには、以下のリソースが必要です:

  1. W&B API Key
  2. W&B entity (ユーザーまたはチーム): Entity は W&B Runs と Artifacts を送信する場所のユーザー名またはチーム名です。Runs をログに記録する前に、W&B App の UI でアカウントまたはチームエンティティを作成しておいてください。エンティティを指定しない場合、その run はデフォルトのエンティティに送信されます。通常、これはあなたのユーザー名です。設定の「Project Defaults」内でデフォルトのエンティティを変更できます。
  3. W&B project: W&B Runs が保存されるプロジェクトの名前。

W&B entity は、W&B App のそのユーザーまたはチームページのプロフィールページをチェックすることで見つけられます。既存の W&B project を使用するか、新しいものを作成することができます。新しいプロジェクトは、W&B App のホームページまたはユーザー/チームのプロフィールページで作成できます。プロジェクトが存在しない場合は、初回使用時に自動的に作成されます。以下の手順は API キーを取得する方法を示しています:

APIキーの取得方法

  1. W&B にログインします。注:W&B サーバーを使用している場合は、管理者にインスタンスのホスト名を尋ねてください。
  2. 認証ページ またはユーザー/チーム設定で APIキーを集めます。プロダクション環境では、そのキーを所有するために サービスアカウント を使用することをお勧めします。
  3. その APIキー用に環境変数を設定します。WANDB_API_KEY=YOUR_KEY をエクスポートします。

以下の例は、Dagster コード内で API キーを指定する場所を示しています。wandb_config のネストされた辞書内でエンティティとプロジェクト名を必ず指定してください。異なる W&B Project を使用したい場合は、異なる wandb_config の値を異なる ops/assets に渡すことができます。渡すことができる可能性のあるキーについての詳細は、以下の設定セクションを参照してください。

例: @job の設定

# これを config.yaml に追加します
# 代わりに、Dagit's Launchpad または JobDefinition.execute_in_process で設定することもできます
# 参考: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
 wandb_config:
   config:
     entity: my_entity # これをあなたの W&B entity に置き換えます
     project: my_project # これをあなたの W&B project に置き換えます

@job(
   resource_defs={
       "wandb_config": make_values_resource(
           entity=str,
           project=str,
       ),
       "wandb_resource": wandb_resource.configured(
           {"api_key": {"env": "WANDB_API_KEY"}}
       ),
       "io_manager": wandb_artifacts_io_manager,
   }
)
def simple_job_example():
   my_op()
Python

例: アセットを使用する @repository の設定

from dagster_wandb import wandb_artifacts_io_manager, wandb_resource
from dagster import (
   load_assets_from_package_module,
   make_values_resource,
   repository,
   with_resources,
)

from . import assets

@repository
def my_repository():
   return [
       *with_resources(
           load_assets_from_package_module(assets),
           resource_defs={
               "wandb_config": make_values_resource(
                   entity=str,
                   project=str,
               ),
               "wandb_resource": wandb_resource.configured(
                   {"api_key": {"env": "WANDB_API_KEY"}}
               ),
               "wandb_artifacts_manager": wandb_artifacts_io_manager.configured(
                   {"cache_duration_in_minutes": 60} # ファイルを 1 時間だけキャッシュする
               ),
           },
           resource_config_by_key={
               "wandb_config": {
                   "config": {
                       "entity": "my_entity", # これをあなたの W&B entity に置き換えます
                       "project": "my_project", # これをあなたの W&B project に置き換えます
                   }
               }
           },
       ),
   ]
Python

この例では @job の例と異なり IO Manager キャッシュ期間を設定しています。

設定

以下の設定オプションは、インテグレーションによって提供される W&B 専用 Dagster リソースと IO マネージャーの設定として使用されます。

  • wandb_resource: W&B API と通信するために使用される Dagster リソース。提供された APIキー を使用して自動的に認証されます。プロパティ:
    • api_key: (ストリング, 必須): W&B API と通信するために必要な W&B APIキー。
    • host: (ストリング, オプショナル): 使用したい API ホストサーバー。W&B Server を使用している場合にのみ必要です。デフォルトはパブリッククラウドのホスト、https://api.wandb.ai です。
  • wandb_artifacts_io_manager: W&B Artifacts を消費するための Dagster IO マネージャー。プロパティ:
    • base_dir: (整数, オプショナル) ローカルストレージとキャッシュに使用される基本ディレクトリ。W&B Artifacts と W&B Run のログはそのディレクトリから読み書きされます。デフォルトでは DAGSTER_HOME ディレクトリを使用します。
    • cache_duration_in_minutes: (整数, オプショナル) W&B Artifacts と W&B Run ログをローカルストレージに保持する時間。指定された時間が経過しアクセスされなかったファイルとディレクトリはキャッシュから削除されます。キャッシュのクリアは IO マネージャーの実行の終了時に行われます。キャッシュを無効にしたい場合は 0 に設定してください。キャッシュはジョブ間でアーティファクトが再利用されるときに速度を向上させます。デフォルトは30日間です。
    • run_id: (ストリング, オプショナル): この run の一意のIDで再開に使用されます。プロジェクト内で一意である必要があり、run を削除した場合、IDを再利用することはできません。短い説明名は name フィールドを使用し、ハイパーパラメーターを保存して runs 間で比較するために config を使用してください。IDには /\#?%: という特殊文字を含めることはできません。Dagster 内で実験管理を行う場合、IO マネージャーが run を再開できるように Run ID を設定する必要があります。デフォルトでは Dagster Run ID に設定されます。例:7e4df022-1bf2-44b5-a383-bb852df4077e
    • run_name: (ストリング, オプショナル) この run を UI で識別しやすくするための短い表示名。デフォルトでは、以下の形式の文字列です:dagster-run-[8最初のDagster Run IDの文字]。たとえば、dagster-run-7e4df022
    • run_tags: (list[str], オプショナル): この run の UI にタグ一覧を埋める文字列リスト。タグは runs をまとめて整理したり baselineproduction など一時的なラベルを適用するのに便利です。UIでタグを追加・削除したり特定のタグを持つ run だけを絞り込むのは簡単です。インテグレーションで使用される W&B Run には dagster_wandb タグが付きます。

W&B Artifacts を使用する

W&B Artifact とのインテグレーションは Dagster IO マネージャーに依存しています。

IO マネージャー は、アセットまたは op の出力を保存し、それを下流のアセットまたは ops への入力として読み込む責任を持つユーザ提供のオブジェクトです。たとえば、IO マネージャーはファイルシステム上のファイルからオブジェクトを保存および読み込む可能性があります。

今回のインテグレーションは W&B Artifacts 用のIO マネージャーを提供します。これにより Dagster の @op または @asset は W&B Artifacts をネイティブに作成および消費できます。ここに Python リストを含むデータセットタイプの W&B Artifact を生み出す @asset の簡単な例があります。

@asset(
    name="my_artifact",
    metadata={
        "wandb_artifact_arguments": {
            "type": "dataset",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
    return [1, 2, 3] # これは Artifact に保存されます
Python

@op@asset@multi_asset をメタデータ設定で注釈を付けてアーティファクトを記述できます。同様に、W&B Artifacts を Dagster 外部で作成された場合でも消費できます。

W&B Artifacts を書き込む

続行する前に、W&B Artifacts の使用方法について十分な理解を持っていることをお勧めします。Guide on Artifacts を検討してください。

Python 関数からオブジェクトを返すことで W&B Artifact を書き込みます。W&B でサポートされているオブジェクトは以下の通りです:

  • Python オブジェクト (int, dict, list…)
  • W&B オブジェクト (Table, Image, Graph…)
  • W&B Artifact オブジェクト

以下の例は、Dagster アセット (@asset) を使用して W&B Artifacts を書き込む方法を示しています:

pickle モジュールでシリアライズできるものは何でも、インテグレーションによって作成された Artifact にピクルスされて追加されます。ダグスター内でその Artifact を読むときに内容が読み込まれます(さらなる詳細については Read artifacts を参照してください)。

@asset(
    name="my_artifact",
    metadata={
        "wandb_artifact_arguments": {
            "type": "dataset",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
    return [1, 2, 3]
Python

W&B は複数のピクルスベースのシリアライズモジュール(pickle, dill, cloudpickle, joblib) をサポートしています。また、ONNXPMML といったより高度なシリアライズも利用できます。Serialization セクションを参照してください。

ネイティブ W&B オブジェクト (例: Table, Image, or Graph) のいずれかが作成された Artifact にインテグレーションによって追加されます。以下は Table を使った例です。

import wandb

@asset(
    name="my_artifact",
    metadata={
        "wandb_artifact_arguments": {
            "type": "dataset",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_dataset_in_table():
    return wandb.Table(columns=["a", "b", "c"], data=[[1, 2, 3]])
Python

複雑なユースケースの場合、独自の Artifact オブジェクトを構築する必要があるかもしれません。インテグレーションは、統合の両側のメタデータを拡充するなど、便利な追加機能も提供しています。

import wandb

MY_ASSET = "my_asset"

@asset(
    name=MY_ASSET,
    io_manager_key="wandb_artifacts_manager",
)
def create_artifact():
   artifact = wandb.Artifact(MY_ASSET, "dataset")
   table = wandb.Table(columns=["a", "b", "c"], data=[[1, 2, 3]])
   artifact.add(table, "my_table")
   return artifact
Python

設定

@op@asset、および @multi_asset の設定を行うために使用される辞書 wandb_artifact_configuration があり、この辞書はメタデータとしてデコレータの引数で渡される必要があります。この設定は、W&B Artifacts の IO マネージャーの読み取りと書き込みを制御するために必要です。

@op の場合、Out メタデータ引数を介して出力メタデータにあります。 @asset の場合、アセットのメタデータ引数にあります。 @multi_asset の場合、AssetOut メタデータ引数を介して各出力メタデータにあります。

以下のコード例は、@op@asset、および @multi_asset 計算で辞書を構成する方法を示しています:

@op の例:

@op(
   out=Out(
       metadata={
           "wandb_artifact_configuration": {
               "name": "my_artifact",
               "type": "dataset",
           }
       }
   )
)
def create_dataset():
   return [1, 2, 3]
Python

@asset の例:

@asset(
   name="my_artifact",
   metadata={
       "wandb_artifact_configuration": {
           "type": "dataset",
       }
   },
   io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
   return [1, 2, 3]
Python

設定を通じて名前を渡す必要はありません。@asset にはすでに名前があります。インテグレーションはアセット名として Artifact 名を設定します。

@multi_asset の例:

@multi_asset(
   name="create_datasets",
   outs={
       "first_table": AssetOut(
           metadata={
               "wandb_artifact_configuration": {
                   "type": "training_dataset",
               }
           },
           io_manager_key="wandb_artifacts_manager",
       ),
       "second_table": AssetOut(
           metadata={
               "wandb_artifact_configuration": {
                   "type": "validation_dataset",
               }
           },
           io_manager_key="wandb_artifacts_manager",
       ),
   },
   group_name="my_multi_asset_group",
)
def create_datasets():
   first_table = wandb.Table(columns=["a", "b", "c"], data=[[1, 2, 3]])
   second_table = wandb.Table(columns=["d", "e"], data=[[4, 5]])

   return first_table, second_table
Python

サポートされたプロパティ:

  • name: (str) このアーティファクトの人間が読み取り可能な名前で、その名前で UI内でこのアーティファクトを識別したり use_artifact 呼び出しで参照したりできます。名前には文字、数字、アンダースコア、ハイフン、ドットを含めることができます。プロジェクト内で一意である必要があります。@op に必須です。
  • type: (str) アーティファクトのタイプで、アーティファクトを整理し差別化するために使用されます。一般的なタイプにはデータセットやモデルがありますが、任意の文字列を使用することができ、数字、アンダースコア、ハイフン、ドットを含めることができます。出力がすでにアーティファクトでない場合に必要です。
  • description: (str) アーティファクトを説明するための自由なテキスト.説明は Markdownとして UIでレンダリングされるため,テーブル,リンクなどを配置するのに良い場所です。
  • aliases: (list[str]) アーティファクトに適用したい 1つ以上のエイリアスを含む配列。インテグレーションは、それが設定されていようとなかろうと「最新」のタグもそのリストに追加します。これはモデルとデータセットのバージョン管理に効果的な方法です。
  • add_dirs: 配列(list[dict[str, Any]]): Artifact に含める各ローカルディレクトリの設定を含む配列。SDK内の同名メソッドと同じ引数をサポートしています。
  • add_files: 配列(list[dict[str, Any]]): Artifact に含める各ローカルファイルの設定を含む配列。SDK内の同名メソッドと同じ引数をサポートしています。
  • add_references: 配列(list[dict[str, Any]]): Artifact に含める各外部リファレンスの設定を含む配列。SDK内の同名メソッドと同じ引数をサポートしています。
  • serialization_module: (dict) 使用するシリアライズモジュールの設定。詳細については シリアル化 セクションを参照してください。
    • name: (str) シリアライズモジュールの名前。受け入れられる値: pickle, dill, cloudpickle, joblib。モジュールはローカルで使用可能である必要があります。
    • parameters: (dict[str, Any]) シリアライズ関数に渡されるオプション引数。モジュールの dump メソッドと同じ引数を受け入れます。例えば、{"compress": 3, "protocol": 4}

高度な例:

@asset(
   name="my_advanced_artifact",
   metadata={
       "wandb_artifact_configuration": {
           "type": "dataset",
           "description": "My *Markdown* description",
           "aliases": ["my_first_alias", "my_second_alias"],
           "add_dirs": [
               {
                   "name": "My directory",
                   "local_path": "path/to/directory",
               }
           ],
           "add_files": [
               {
                   "name": "validation_dataset",
                   "local_path": "path/to/data.json",
               },
               {
                   "is_tmp": True,
                   "local_path": "path/to/temp",
               },
           ],
           "add_references": [
               {
                   "uri": "https://picsum.photos/200/300",
                   "name": "External HTTP reference to an image",
               },
               {
                   "uri": "s3://my-bucket/datasets/mnist",
                   "name": "External S3 reference",
               },
           ],
       }
   },
   io_manager_key="wandb_artifacts_manager",
)
def create_advanced_artifact():
   return [1, 2, 3]
Python

アセットは統合の両側で有用なメタデータとともに実体化されます:

  • W&B 側: ソースインテグレーション名とバージョン、使用された python バージョン、pickle プロトコルバージョンなど。
  • Dagster 側:
    • Dagster Run ID
    • W&B Run: ID、名前、パス、URL
    • W&B Artifact: ID、名前、タイプ、バージョン、サイズ、URL
    • W&B エンティティ
    • W&B プロジェクト

以下の画像は、Dagster アセットに追加された W&B からのメタデータを示しています。この情報は、インテグレーションがなければ利用できませんでした。

以下の画像は、与えられた設定が W&B アーティファクト上の有用なメタデータでどのように充実されたかを示しています。この情報は、再現性とメンテナンスに役立ちます。インテグレーションがなければ利用できませんでした。

パーティションの利用

インテグレーションはネイティブにDagster パーティションをサポートしています。

以下は DailyPartitionsDefinition を使用したパーティション化の例です。

@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01", end_date="2023-02-01"),
    name="my_daily_partitioned_asset",
    compute_kind="wandb",
    metadata={
        "wandb_artifact_configuration": {
            "type": "dataset",
        }
    },
)
def create_my_daily_partitioned_asset(context):
    partition_key = context.asset_partition_key_for_output()
    context.log.info(f"Creating partitioned asset for {partition_key}")
    return random.randint(0, 100)
Python

このコードはパーティションごとに一つの W&B Artifact を生成します。アーティファクトは、アセット名の下にパーティションキーを追加して Artifact パネル (UI) で表示されます。例: my_daily_partitioned_asset.2023-01-01my_daily_partitioned_asset.2023-01-02、または my_daily_partitioned_asset.2023-01-03。複数の次元でパーティション化されたアセットは、次元を点で区切った形式で表示されます。例: my_asset.car.blue

高度な使用法

W&B Artifacts を読み取る

W&B Artifacts の読み取りは、それらを書くのと似ています。@op または @assetwandb_artifact_configuration と呼ばれる設定辞書を設定することができます。唯一の違いは、その設定を出力ではなく入力に設定する必要がある点です。

@op の場合、In メタデータ引数を介して入力メタデータにあります。Artifact の名前を明示的に渡す必要があります。

@asset の場合、Asset の In メタデータ引数の入力メタデータにあります。親アセットの名前がそれに一致する必要があるため、アーティファクトの名前を渡す必要はありません。

インテグレーションの外部で作成されたアーティファクトに依存関係を持たせたい場合は、SourceAsset を使用する必要があります。それは常にそのアセットの最新バージョンを読み込みます。

次の例は、さまざまな ops から Artifact を読み取る方法を示しています。

@op からアーティファクトを読み取る

@op(
   ins={
       "artifact": In(
           metadata={
               "wandb_artifact_configuration": {
                   "name": "my_artifact",
               }
           }
       )
   },
   io_manager_key="wandb_artifacts_manager"
)
def read_artifact(context, artifact):
   context.log.info(artifact)
Python

別の @asset によって作成されたアーティファクトを読み取る

@asset(
   name="my_asset",
   ins={
       "artifact": AssetIn(
           # 入力引数をリネームしたくない場合は 'key' を削除できます
           key="parent_dagster_asset_name",
           input_manager_key="wandb_artifacts_manager",
       )
   },
)
def read_artifact(context, artifact):
   context.log.info(artifact)
Python

Dagster の外部で作成された Artifact を読み取る:

my_artifact = SourceAsset(
   key=AssetKey("my_artifact"),  # W&B Artifact の名前
   description="Artifact created outside Dagster",
   io_manager_key="wandb_artifacts_manager",
)


@asset
def read_artifact(context, my_artifact):
   context.log.info(my_artifact)
Python

設定

以下の設定は、IO マネージャーが収集するものを装飾された関数への入力として提供するべきかを示すために使用されます。以下の読み取りパターンがサポートされています。

  1. アーティファクト内にある名前付きオブジェクトを取得するには、get を使用します:
@asset(
   ins={
       "table": AssetIn(
           key="my_artifact_with_table",
           metadata={
               "wandb_artifact_configuration": {
                   "get": "my_table",
               }
           },
           input_manager_key="wandb_artifacts_manager",
       )
   }
)
def get_table(context, table):
   context.log.info(table.get_column("a"))
Python
  1. アーティファクト内にあるダウンロードされたファイルのローカルパスを取得するには、get_path を使用します:
@asset(
   ins={
       "path": AssetIn(
           key="my_artifact_with_file",
           metadata={
               "wandb_artifact_configuration": {
                   "get_path": "name_of_file",
               }
           },
           input_manager_key="wandb_artifacts_manager",
       )
   }
)
def get_path(context, path):
   context.log.info(path)
Python
  1. アーティファクトオブジェクト全体を取得する(コンテンツをローカルでダウンロードします):
@asset(
   ins={
       "artifact": AssetIn(
           key="my_artifact",
           input_manager_key="wandb_artifacts_manager",
       )
   },
)
def get_artifact(context, artifact):
   context.log.info(artifact.name)
Python

サポートされているプロパティ

  • get: (str) アーティファクト相対の名前にある W&B オブジェクトを取得します。
  • get_path: (str) アーティファクト相対の名前にあるファイルへのパスを取得します。

シリアル化設定

デフォルトでは、インテグレーションは標準の pickle モジュールを使用しますが、一部のオブジェクトはこれと互換性がありません。たとえば、yield を持つ関数はシリアライズしようとした場合にエラーを発生させます。

より多くのピクルスベースのシリアライズモジュール (dill, cloudpickle, joblib) をサポートしています。また、より高度なシリアル化を使用して ONNX または PMML など、シリアル化された文字列を返すか、直接アーティファクトを作成することもできます。あなたのユースケースに最適な選択肢は、利用可能な文献を参考にしてください。

ピクルスベースのシリアル化モジュール

使用するシリアル化を wandb_artifact_configuration 内の serialization_module 辞書を通じて設定することができます。Dagster を実行しているマシンでモジュールが利用可能であることを確認してください。

インテグレーションは、そのアーティファクトを読む際にどのシリアル化モジュールを使用するべきかを自動的に判断します。

現在サポートされているモジュールは pickledillcloudpickle、および joblib です。

こちらが、joblib でシリアル化された「モデル」を作成し、推論に使用する例です。

@asset(
    name="my_joblib_serialized_model",
    compute_kind="Python",
    metadata={
        "wandb_artifact_configuration": {
            "type": "model",
            "serialization_module": {
                "name": "joblib"
            },
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_model_serialized_with_joblib():
    # これは本物の ML モデルではありませんが、pickle モジュールでは不可能であるものです
    return lambda x, y: x + y

@asset(
    name="inference_result_from_joblib_serialized_model",
    compute_kind="Python",
    ins={
        "my_joblib_serialized_model": AssetIn(
            input_manager_key="wandb_artifacts_manager",
        )
    },
    metadata={
        "wandb_artifact_configuration": {
            "type": "results",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def use_model_serialized_with_joblib(
    context: OpExecutionContext, my_joblib_serialized_model
):
    inference_result = my_joblib_serialized_model(1, 2)
    context.log.info(inference_result)  # 出力: 3
    return inference_result
Python

高度なシリアル化フォーマット (ONNX, PMML)

交換ファイル形式として ONNX や PMML を使用することは一般的です。インテグレーションはこれらの形式をサポートしていますが、Pickle ベースのシリアル化の場合よりも少し多くの作業が必要です。

これらの形式を使用する方法は 2 種類あります。

  1. モデルを選択した形式に変換してから、通常の Python オブジェクトのようにその形式の文字列表現を返します。インテグレーションはその文字列をピクルスします。それから、その文字列を使用してモデルを再構築することができます。
  2. シリアル化されたモデルを持つ新しいローカルファイルを作成し、そのファイルをカスタムアーティファクトに追加するために add_file 設定を実行します。

こちらは、Scikit-learn モデルを ONNX を使用してシリアル化する例です。

import numpy
import onnxruntime as rt
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

from dagster import AssetIn, AssetOut, asset, multi_asset

@multi_asset(
    compute_kind="Python",
    outs={
        "my_onnx_model": AssetOut(
            metadata={
                "wandb_artifact_configuration": {
                    "type": "model",
                }
            },
            io_manager_key="wandb_artifacts_manager",
        ),
        "my_test_set": AssetOut(
            metadata={
                "wandb_artifact_configuration": {
                    "type": "test_set",
                }
            },
            io_manager_key="wandb_artifacts_manager",
        ),
    },
    group_name="onnx_example",
)
def create_onnx_model():
    # https://onnx.ai/sklearn-onnx/ からインスパイアされたサンプル

    # モデルのトレーニング
    iris = load_iris()
    X, y = iris.data, iris.target
    X_train, X_test, y_train, y_test = train_test_split(X, y)
    clr = RandomForestClassifier()
    clr.fit(X_train, y_train)

    # ONNX 形式に変換
    initial_type = [("float_input", FloatTensorType([None, 4]))]
    onx = convert_sklearn(clr, initial_types=initial_type)

    # アーティファクトの書き込み(モデル + テストセット)
    return onx.SerializeToString(), {"X_test": X_test, "y_test": y_test}

@asset(
    name="experiment_results",
    compute_kind="Python",
    ins={
        "my_onnx_model": AssetIn(
            input_manager_key="wandb_artifacts_manager",
        ),
        "my_test_set": AssetIn(
            input_manager_key="wandb_artifacts_manager",
        ),
    },
    group_name="onnx_example",
)
def use_onnx_model(context, my_onnx_model, my_test_set):
    # https://onnx.ai/sklearn-onnx/ からインスパイアされたサンプル

    # ONNX ランタイムを使用して予測を計算します
    sess = rt.InferenceSession(my_onnx_model)
    input_name = sess.get_inputs()[0].name
    label_name = sess.get_outputs()[0].name
    pred_onx = sess.run(
        [label_name], {input_name: my_test_set["X_test"].astype(numpy.float32)}
    )[0]
    context.log.info(pred_onx)
    return pred_onx
Python

パーティションの利用

インテグレーションはネイティブにDagster パーティションをサポートしています。

1つ、複数またはすべてのアセットパーティションを選別的に読み取ります。

すべてのパーティションは辞書で提供され、キーと値はそれぞれパーティションキーとアーティファクトコンテンツを表します。

上流の @asset のすべてのパーティションを読み取り、それらは辞書として与えられます。この辞書で、キーはパーティションキー、値はアーティファクトコンテンツに関連しています。

@asset(
    compute_kind="wandb",
    ins={"my_daily_partitioned_asset": AssetIn()},
    output_required=False,
)
def read_all_partitions(context, my_daily_partitioned_asset):
    for partition, content in my_daily_partitioned_asset.items():
        context.log.info(f"partition={partition}, content={content}")
Python

指定したパーティションを選ぶために AssetInpartition_mapping 設定を使用します。この例では TimeWindowPartitionMapping を使用しています。

@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01", end_date="2023-02-01"),
    compute_kind="wandb",
    ins={
        "my_daily_partitioned_asset": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1)
        )
    },
    output_required=False,
)
def read_specific_partitions(context, my_daily_partitioned_asset):
    for partition, content in my_daily_partitioned_asset.items():
        context.log.info(f"partition={partition}, content={content}")
Python

設定オブジェクト metadata は、プロジェクト内の異なるアーティファクトパーティションと wandb のやり取りを設定するために使用されます。

オブジェクト metadata は、wandb_artifact_configuration というキーを含んでおり、さらに partitions というネストされたオブジェクトを含んでいます。

partitions オブジェクトは、各パーティションの名前とその設定をマッピングします。各パーティションの設定は、データの取得方法を指定でき、それには getversion、および alias のキーを含む場合があります。

設定キー

  1. get: get キーは、データを取得する W&B オブジェクト (テーブル、イメージなど) の名前を指定します。
  2. version: version キーは、特定のバージョンをアーティファクトから取得したいときに使用されます。
  3. alias: alias キーにより、エイリアスによってアーティファクトを取得することができます。

ワイルドカード設定

ワイルドカード "*" は、全ての非設定パーティションを表します。明示的に partitions オブジェクトに記載されていないパーティションに対するデフォルト設定を提供します。

例、

"*": {
    "get": "default_table_name",
},
Python

この設定は、明示的に設定されていないすべてのパーティションに対し、データが default_table_name というテーブルから取得されることを意味します。

特定のパーティション設定

ワイルドカード設定を、特定のキーを持つ特定のパーティション設定で上書きできます。

例、

"yellow": {
    "get": "custom_table_name",
},
Python

この設定は、yellow という名前のパーティションに対し、データが custom_table_name というテーブルから取得されることを意味し、ワイルドカード設定を上書きします。

バージョニングとエイリアス

バージョニングおよびエイリアスのために、設定で特定の version および alias のキーを指定することができます。

バージョンの場合、

"orange": {
    "version": "v0",
},
Python

この設定は、orange アーティファクトパーティションのバージョン v0 からのデータを取得します。

エイリアスの場合、

"blue": {
    "alias": "special_alias",
},
Python

この設定は、アーティファクトパーティションのエイリアス special_alias (設定では blue として参照) の default_table_name テーブルからデータを取得します。

高度な使用法

インテグレーションの高度な使用法を確認するには、以下の完全なコード例を参照してください:

W&B Launch の使用

継続する前に、W&B Launch の使用方法について十分な理解を持っていることをお勧めします。Launch のガイドを読むことを検討してください: /guides/launch。

Dagster インテグレーションは以下を補助します:

  • Dagster インスタンス内での1つまたは複数の Launch エージェントの実行。
  • あなたの Dagster インスタンス内でのローカル Launch ジョブの実行。
  • オンプレミスまたはクラウドでのリモート Launch ジョブ。

Launch エージェント

インテグレーションには run_launch_agent というインポート可能な @op が提供されます。この @op は Launch エージェントを起動し、手動で停止されるまで長時間実行プロセスとして実行します。

エージェントは launch キューをポールし、ジョブを(またはそれらを実行するために外部サービスにディスパッチ)発行するプロセスです。

設定については、リファレンスドキュメント を参照してください

Launchingpad で全プロパティに対する有用な説明を見ることもできます。

シンプルな例

# これを config.yaml に追加します
# 代わりに、Dagit's Launchpad または JobDefinition.execute_in_process で設定することもできます
# 参考: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
 wandb_config:
   config:
     entity: my_entity # これをあなたの W&B entity に置き換えます
     project: my_project # これをあなたの W&B project に置き換えます
ops:
 run_launch_agent:
   config:
     max_jobs: -1
     queues:
       - my_dagster_queue

from dagster_wandb.launch.ops import run_launch_agent
from dagster_wandb.resources import wandb_resource

from dagster import job, make_values_resource

@job(
   resource_defs={
       "wandb_config": make_values_resource(
           entity=str,
           project=str,
       ),
       "wandb_resource": wandb_resource.configured(
           {"api_key": {"env": "WANDB_API_KEY"}}
       ),
   },
)
def run_launch_agent_example():
   run_launch_agent()
Python

Launch ジョブ

インテグレーションには run_launch_job というインポート可能な @op が提供されます。この @op はあなたの Launch ジョブを実行します。

Launch ジョブは実行されるためにキューに割り当てられます。キューを作成するか、デフォルトのものを使用することができます。キューを監視する有効なエージェントがあることを確認します。あなたの Dagster インスタンス内でエージェントを実行するだけでなく、Kubernetes でデプロイ可能なエージェントを使用することも考慮に入れることができます。

設定については、リファレンスドキュメント を参照してください。

Launchpad では、すべてのプロパティに対する有用な説明も見ることができます。

シンプルな例

# これを config.yaml に追加します
# 代わりに、Dagit's Launchpad または JobDefinition.execute_in_process で設定することもできます
# 参考: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
 wandb_config:
   config:
     entity: my_entity # これをあなたの W&B entity に置き換えます
     project: my_project # これをあなたの W&B project に置き換えます
ops:
 my_launched_job:
   config:
     entry_point:
       - python
       - train.py
     queue: my_dagster_queue
     uri: https://github.com/wandb/example-dagster-integration-with-launch

from dagster_wandb.launch.ops import run_launch_job
from dagster_wandb.resources import wandb_resource

from dagster import job, make_values_resource

@job(resource_defs={
       "wandb_config": make_values_resource(
           entity=str,
           project=str,
       ),
       "wandb_resource": wandb_resource.configured(
           {"api_key": {"env": "WANDB_API_KEY"}}
       ),
   },
)
def run_launch_job_example():
   run_launch_job.alias("my_launched_job")() # 私たちはエイリアスを使ってジョブの名前を変更します。
Python

ベストプラクティス

  1. IO マネージャーを使用して Artifacts を読み書きします。 Artifact.download()Run.log_artifact() を直接使用する必要はありません。これらのメソッドはインテグレーションによって処理されます。Artifacts に保存したいデータを単に返し、インテグレーションに任せてください。これにより W&B での Artifact リネージが改善されます。

  2. 複雑なユースケースのためにのみ Artifact オブジェクトを自分で構築します。 Python オブジェクトと W&B オブジェクトを ops/assets から返すべきです。インテグレーションは Artifact のバンドルを扱います。 複雑なユースケースに対しては、Dagster ジョブ内で直接 Artifact を構築できます。インテグレーション名とバージョン、使用された Python バージョン、ピクルスプロトコルバージョンなどのメタデータ拡充のために、インテグレーションに Artifact を渡すことをお勧めします。

  3. メタデータを介してアーティファクトにファイル、ディレクトリ、外部リファレンスを追加します。 インテグレーション wandb_artifact_configuration オブジェクトを使用して、任意のファイル、ディレクトリ、外部リファレンス(Amazon S3、GCS、HTTP…)を追加します。詳細については Artifact 設定セクション の高度ない例を参照してください。

  4. アーティファクトが生成される場合は、@op より @asset を使用してください。 Artifacts はなんらかのアセットです。Dagster がそのアセットを管理する場合は、アセットを使用することをお勧めします。これにより、Dagit Asset Catalog の可観測性が向上します。

  5. Dagster 外部で作成されたアーティファクトを読み取るために SourceAsset を使用してください。 これにより、インテグレーションを活用して外部で作成されたアーティファクトを読むことができます。それ以外の場合、インテグレーションで作成されたアーティファクトのみを使用できます。

  6. 大規模なモデルのための専用コンピュートでのトレーニングを調整するために W&B Launch を使用してください。 小さなモデルは Dagster クラスター内でトレーニングできますし、GPU ノードを持つ Kubernetes クラスターで Dagster を実行することもできます。W&B Launch を使用して大規模なモデルのトレーニングを行うことをお勧めします。これによりインスタンスの負荷が軽減され、より適切なコンピュートへのアクセスが得られます。

  7. Dagster 内で実験管理を行う際は、W&B Run ID を Dagster Run ID の値に設定してください。 Run を再開可能にする ことと、W&B Run ID を Dagster Run ID またはお好みの文字列に設定することの両方をお勧めします。この推奨事項に従うことで、Dagster 内でモデルをトレーニングする際に W&B メトリクスと W&B Artifacts がすべて同じ W&B Run に格納されていることが保証されます。

W&B Run ID を Dagster Run ID に設定するか、

wandb.init(
    id=context.run_id,
    resume="allow",
    ...
)
Python

独自の W&B Run ID を選び、それを IO マネージャー設定に渡します。

wandb.init(
    id="my_resumable_run_id",
    resume="allow",
    ...
)

@job(
   resource_defs={
       "io_manager": wandb_artifacts_io_manager.configured(
           {"wandb_run_id": "my_resumable_run_id"}
       ),
   }
)
Python
  1. 大きな W&B Artifacts のために必要なデータだけを get や get_path で収集します。 デフォルトでインテグレーションはアーティファクト全体をダウンロードします。非常に大きなアーティファクトを使用している場合は、特定のファイルやオブジェクトだけを収集することをお勧めします。これにより速度が向上し、リソースの利用が向上します。

  2. Python オブジェクトに対してユースケースに合わせてピクルスモジュールを適応させます。 デフォルトで W&Bインテグレーションは標準の pickle モジュールを使用します。しかし、一部のオブジェクトはこれと互換性がありません。例えば、yield を持つ関数はシリアライズしようとするとエラーを発生します。W&B は他のピクルスベースのシリアライズモジュール(dill, cloudpickle, joblib) をサポートしています。

また、ONNXPMML など、より高度なシリアライズによってシリアライズされた文字列を返すか、直接 Artifact を作成することもできます。適切な選択はユースケースに依存します。このテーマに関しては、利用可能な文献を参考にしてください。

2 - Minikube でシングルノード GPU クラスターを起動する

W&B LaunchをMinikubeクラスターにセットアップし、GPU のワークロードをスケジュールして実行できるようにします。

背景

Nvidia コンテナツールキットのおかげで、DockerでGPUを有効にしたワークフローを簡単に実行できるようになりました。制限の一つに、ボリュームによるGPUのスケジューリングのネイティブなサポートがない点があります。docker run コマンドでGPUを使用したい場合は、特定のGPUをIDでリクエストするか、存在するすべてのGPUをリクエストする必要がありますが、これは多くの分散GPUを有効にしたワークロードを非現実的にします。Kubernetesはボリュームリクエストによるスケジューリングをサポートしていますが、GPUスケジューリングを備えたローカルKubernetesクラスターのセットアップには、最近までかなりの時間と労力がかかっていました。Minikubeは、シングルノードKubernetesクラスターを実行するための最も人気のあるツールの1つであり、最近 GPUスケジューリングのサポート をリリースしました 🎉 このチュートリアルでは、マルチGPUマシンにMinikubeクラスターを作成し、W&B Launchを使用してクラスターに並行して安定的な拡散推論ジョブを起動します 🚀

前提条件

始める前に、次のものが必要です:

  1. W&Bアカウント。
  2. 以下がインストールされているLinuxマシン:
    1. Docker runtime
    2. 使用したいGPU用のドライバ
    3. Nvidiaコンテナツールキット

Launchジョブ用のキューを作成

最初に、launchジョブ用のlaunchキューを作成します。

  1. wandb.ai/launch(またはプライベートW&Bサーバーを使用している場合は <your-wandb-url>/launch)に移動します。
  2. 画面の右上隅にある青い Create a queue ボタンをクリックします。キュー作成のドロワーが画面の右側からスライドアウトします。
  3. エンティティを選択し、名前を入力し、キューのタイプとして Kubernetes を選択します。
  4. ドロワーの Config セクションには、launchキュー用のKubernetesジョブ仕様を入力します。このキューから起動されたrunは、このジョブ仕様を使用して作成されるため、必要に応じてジョブをカスタマイズするためにこの設定を変更できます。このチュートリアルでは、下記のサンプル設定をキューの設定にYAMLまたはJSONとしてコピー&ペーストできます:
spec:
  template:
    spec:
      containers:
        - image: ${image_uri}
          resources:
            limits:
              cpu: 4
              memory: 12Gi
              nvidia.com/gpu: '{{gpus}}'
      restartPolicy: Never
  backoffLimit: 0
YAML
{
  "spec": {
    "template": {
      "spec": {
        "containers": [
          {
            "image": "${image_uri}",
            "resources": {
              "limits": {
                "cpu": 4,
                "memory": "12Gi",
                "nvidia.com/gpu": "{{gpus}}"
              }
            }
          }
        ],
        "restartPolicy": "Never"
      }
    },
    "backoffLimit": 0
  }
}
JSON

キュー設定の詳細については、 Set up Launch on KubernetesAdvanced queue setup guide を参照してください。

${image_uri}{{gpus}} 文字列は、キュー設定で使用できる2種類の変数テンプレートの例です。${image_uri} テンプレートは、エージェントが起動するジョブの画像URIに置き換えられます。{{gpus}} テンプレートは、ジョブを送信する際にlaunch UI、CLI、またはSDKからオーバーライドできるテンプレート変数の作成に使用されます。これらの値はジョブ仕様に配置され、ジョブで使用される画像とGPUリソースを制御する正しいフィールドを変更します。

  1. Parse configuration ボタンをクリックして gpus テンプレート変数をカスタマイズし始めます。
  2. TypeInteger に設定し、 DefaultMinMax を選択した値に設定します。このテンプレート変数の制約を違反するrunをこのキューに送信しようとすると、拒否されます。
gpusテンプレート変数を使用したキュー作成ドロワーの画像
  1. Create queue をクリックしてキューを作成します。新しいキューのキューページにリダイレクトされます。

次のセクションでは、作成したキューからジョブをプルして実行できるエージェントをセットアップします。

Docker + NVIDIA CTKのセットアップ

既にマシンでDockerとNvidiaコンテナツールキットを設定している場合は、このセクションをスキップできます。

Dockerのドキュメントを参照して、システム上でのDockerコンテナエンジンのセットアップ手順を確認してください。

Dockerがインストールされたら、その後にNvidiaコンテナツールキットをNvidiaのドキュメント に従ってインストールします。

コンテナランタイムがGPUにアクセスできることを確認するには、次を実行します:

docker run --gpus all ubuntu nvidia-smi
Bash

マシンに接続されているGPUを記述する nvidia-smi の出力が表示されるはずです。たとえば、私たちのセットアップでは、出力は次のようになります:

Wed Nov  8 23:25:53 2023
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 525.105.17   Driver Version: 525.105.17   CUDA Version: 12.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|===============================+======================+======================|
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   38C    P8     9W /  70W |      2MiB / 15360MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
|   1  Tesla T4            Off  | 00000000:00:05.0 Off |                    0 |
| N/A   38C    P8     9W /  70W |      2MiB / 15360MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
|   2  Tesla T4            Off  | 00000000:00:06.0 Off |                    0 |
| N/A   40C    P8     9W /  70W |      2MiB / 15360MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
|   3  Tesla T4            Off  | 00000000:00:07.0 Off |                    0 |
| N/A   39C    P8     9W /  70W |      2MiB / 15360MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                                  |
|  GPU   GI   CI        PID   Type   Process name                  GPU Memory |
|        ID   ID                                                   Usage      |
|=============================================================================|
|  No running processes found                                                 |
+-----------------------------------------------------------------------------+

Minikubeの設定

MinikubeのGPUサポートにはバージョンv1.32.0以上が必要です。Minikubeのインストールドキュメント を参照して、最新のインストール方法を確認してください。このチュートリアルでは、次のコマンドを使用して最新のMinikubeリリースをインストールしました:

curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
sudo install minikube-linux-amd64 /usr/local/bin/minikube
YAML

次のステップは、GPUを使用してminikubeクラスターを開始することです。マシン上で以下を実行します:

minikube start --gpus all
YAML

上記のコマンドの出力は、クラスターが正常に作成されたかどうかを示します。

launch エージェントを開始

新しいクラスター向けのlaunchエージェントは、wandb launch-agentを直接呼び出すか、W&Bによって管理されるHelmチャートを使用してlaunchエージェントをデプロイすることによって開始されます。

このチュートリアルでは、エージェントをホストマシンで直接実行します。

エージェントをローカルで実行するには、デフォルトのKubernetes APIコンテキストがMinikubeクラスターを指していることを確認してください。次に、以下を実行してエージェントの依存関係をインストールします:

pip install "wandb[launch]"
Bash

エージェントの認証を設定するには、wandb loginを実行するか、WANDB_API_KEY環境変数を設定します。

エージェントを開始するには、次のコマンドを実行します:

wandb launch-agent -j <max-number-concurrent-jobs> -q <queue-name> -e <queue-entity>
Bash

ターミナル内でlaunchエージェントがポーリングメッセージを印刷し始めるのを確認できます。

おめでとうございます、launchエージェントがlaunchキューのポーリングを行っています。キューにジョブが追加されると、エージェントがそれを受け取り、Minikubeクラスターで実行するようスケジュールします。

ジョブを起動

エージェントにジョブを送信しましょう。W&Bアカウントにログインしているターミナルから、以下のコマンドで簡単な “hello world” を起動できます:

wandb launch -d wandb/job_hello_world:main -p <target-wandb-project> -q <your-queue-name> -e <your-queue-entity>
YAML

好きなジョブやイメージでテストできますが、クラスターがイメージをプルできることを確認してください。追加のガイダンスについては、Minikubeのドキュメントを参照してください。また、私たちの公開ジョブを使用してテストすることもできます。

(オプション) NFSを用いたモデルとデータキャッシング

ML ワークロードのために、複数のジョブが同じデータにアクセスできるようにしたい場合があります。たとえば、大規模なアセット(データセットやモデルの重みなど)の再ダウンロードを避けるために共有キャッシュを持つことができます。Kubernetesは、永続ボリュームと永続ボリュームクレームを通じてこれをサポートしています。永続ボリュームは、Kubernetesワークロードにおいて volumeMounts を作成し、共有キャッシュへの直接ファイルシステムアクセスを提供します。

このステップでは、モデルの重みをキャッシュとして共有するために使用できるネットワークファイルシステム(NFS)サーバーをセットアップします。最初のステップはNFSをインストールし、設定することです。このプロセスはオペレーティングシステムによって異なります。私たちのVMはUbuntuを実行しているので、nfs-kernel-serverをインストールし、/srv/nfs/kubedataでエクスポートを設定しました:

sudo apt-get install nfs-kernel-server
sudo mkdir -p /srv/nfs/kubedata
sudo chown nobody:nogroup /srv/nfs/kubedata
sudo sh -c 'echo "/srv/nfs/kubedata *(rw,sync,no_subtree_check,no_root_squash,no_all_squash,insecure)" >> /etc/exports'
sudo exportfs -ra
sudo systemctl restart nfs-kernel-server
Bash

ホストファイルシステムのサーバーのエクスポート先と、NFSサーバーのローカルIPアドレスをメモしておいてください。次のステップでこの情報が必要です。

次に、このNFSの永続ボリュームと永続ボリュームクレームを作成する必要があります。永続ボリュームは非常にカスタマイズ可能ですが、シンプlicityのために、ここではシンプルな設定を使用します。

以下のyamlを nfs-persistent-volume.yaml というファイルにコピーし、希望のボリュームキャパシティとクレームリクエストを記入してください。PersistentVolume.spec.capcity.storage フィールドは、基になるボリュームの最大サイズを制御します。PersistentVolumeClaim.spec.resources.requests.storage は、特定のクレームに割り当てられるボリュームキャパシティを制限するために使用できます。私たちのユースケースでは、それぞれに同じ値を使用するのが理にかなっています。

apiVersion: v1
kind: PersistentVolume
metadata:
  name: nfs-pv
spec:
  capacity:
    storage: 100Gi # あなたの希望の容量に設定してください。
  accessModes:
    - ReadWriteMany
  nfs:
    server: <your-nfs-server-ip> # TODO: ここを記入してください。
    path: '/srv/nfs/kubedata' # またはあなたのカスタムパス
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: nfs-pvc
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 100Gi # あなたの希望の容量に設定してください。
  storageClassName: ''
  volumeName: nfs-pv
YAML

以下のコマンドでクラスターにリソースを作成します:

kubectl apply -f nfs-persistent-volume.yaml
YAML

私たちのrunがこのキャッシュを使用できるようにするためには、launchキューの設定に volumesvolumeMounts を追加する必要があります。launchの設定を編集するには、再び wandb.ai/launch(またはwandbサーバー上のユーザーの場合は<your-wandb-url>/launch)に戻り、キューを見つけ、キューページに移動し、その後、Edit config タブをクリックします。元の設定を次のように変更できます:

spec:
  template:
    spec:
      containers:
        - image: ${image_uri}
          resources:
            limits:
              cpu: 4
              memory: 12Gi
              nvidia.com/gpu: "{{gpus}}"
					volumeMounts:
            - name: nfs-storage
              mountPath: /root/.cache
      restartPolicy: Never
			volumes:
        - name: nfs-storage
          persistentVolumeClaim:
            claimName: nfs-pvc
  backoffLimit: 0
YAML
{
  "spec": {
    "template": {
      "spec": {
        "containers": [
          {
            "image": "${image_uri}",
            "resources": {
              "limits": {
                "cpu": 4,
                "memory": "12Gi",
                "nvidia.com/gpu": "{{gpus}}"
              },
              "volumeMounts": [
                {
                  "name": "nfs-storage",
                  "mountPath": "/root/.cache"
                }
              ]
            }
          }
        ],
        "restartPolicy": "Never",
        "volumes": [
          {
            "name": "nfs-storage",
            "persistentVolumeClaim": {
              "claimName": "nfs-pvc"
            }
          }
        ]
      }
    },
    "backoffLimit": 0
  }
}
JSON

これで、NFSはジョブを実行しているコンテナの /root/.cache にマウントされます。コンテナが root 以外のユーザーとして実行される場合、マウントパスは調整が必要です。HuggingfaceのライブラリとW&B Artifactsは、デフォルトで $HOME/.cache/ を利用しているため、ダウンロードは一度だけ行われるはずです。

安定拡散と遊ぶ

新しいシステムをテストするために、安定的な拡散の推論パラメータを実験します。 デフォルトのプロンプトと常識的なパラメータでシンプルな安定的拡散推論ジョブを実行するには、次のコマンドを実行します:

wandb launch -d wandb/job_stable_diffusion_inference:main -p <target-wandb-project> -q <your-queue-name> -e <your-queue-entity>

上記のコマンドは、あなたのキューに wandb/job_stable_diffusion_inference:main コンテナイメージを送信します。 エージェントがジョブを受け取り、クラスターで実行するためにスケジュールするとき、接続に依存してイメージがプルされるまで時間がかかることがあります。 ジョブのステータスはwandb.ai/launch(またはwandbサーバー上のユーザーの場合の<your-wandb-url>/launch)キューページで確認できます。

runが終了すると、指定したプロジェクトにジョブアーティファクトがあるはずです。 プロジェクトのジョブページ (<project-url>/jobs) にアクセスしてジョブアーティファクトを見つけることができます。デフォルトの名前は job-wandb_job_stable_diffusion_inference ですが、ジョブのページでジョブ名の横にある鉛筆アイコンをクリックして好きなように変更できます。

このジョブを使って、クラスター上でさらに安定的な拡散推論を実行することができます。 ジョブページから、右上隅にある Launch ボタンをクリックして新しい推論ジョブを設定し、キューに送信します。ジョブの設定ページは、元のrunからのパラメータで自動的に入力されますが、 Overrides セクションで値を変更することで好きなように変更できます。

安定拡散推論ジョブのlaunch UIの画像

3 - NVIDIA NeMo 推論マイクロサービスデプロイジョブ

モデルアーティファクトを W&B から NVIDIA NeMo Inference Microservice にデプロイします。これを行うには、W&B Launch を使用します。W&B Launch はモデルアーティファクトを NVIDIA NeMo Model に変換し、稼働中の NIM/Triton サーバーにデプロイします。

W&B Launch は現在、以下の互換性のあるモデルタイプを受け入れています:

  1. Llama2
  2. StarCoder
  3. NV-GPT (近日公開)

クイックスタート

  1. launch キューを作成する まだ持っていない場合は、以下に例としてキュー設定を示します。

    net: host
    gpus: all # 特定の GPU セットまたは `all` を使用してすべてを使うこともできます
    runtime: nvidia # nvidia コンテナランタイムも必要です
    volume:
      - model-store:/model-store/
    
    YAML
    image
  2. プロジェクトにこのジョブを作成します:

    wandb job create -n "deploy-to-nvidia-nemo-inference-microservice" \
       -e $ENTITY \
       -p $PROJECT \
       -E jobs/deploy_to_nvidia_nemo_inference_microservice/job.py \
       -g andrew/nim-updates \
       git https://github.com/wandb/launch-jobs
    
    Bash
  3. GPU マシンでエージェントを起動します:

    wandb launch-agent -e $ENTITY -p $PROJECT -q $QUEUE
    
    Bash
  4. 希望する設定でデプロイメントローンチジョブを Launch UI から送信します。

    1. CLI から送信することもできます:
      wandb launch -d gcr.io/playground-111/deploy-to-nemo:latest \
        -e $ENTITY \
        -p $PROJECT \
        -q $QUEUE \
        -c $CONFIG_JSON_FNAME
      
      Bash
      image
  5. Launch UI でデプロイメントプロセスを追跡できます。 image

  6. 完了すると、すぐにエンドポイントに curl してモデルをテストできます。モデル名は常に ensemble です。

     #!/bin/bash
     curl -X POST "http://0.0.0.0:9999/v1/completions" \
         -H "accept: application/json" \
         -H "Content-Type: application/json" \
         -d '{
             "model": "ensemble",
             "prompt": "Tell me a joke",
             "max_tokens": 256,
             "temperature": 0.5,
             "n": 1,
             "stream": false,
             "stop": "string",
             "frequency_penalty": 0.0
             }'
    
    Bash

4 - Volcano でマルチノードジョブをローンチする

このチュートリアルでは、Kubernetes上でW&BとVolcanoを使用してマルチノードトレーニングのジョブをローンチするプロセスを説明します。

概要

このチュートリアルでは、W&B Launchを使用してKubernetes上でマルチノードジョブを実行する方法を学びます。私たちが従うステップは以下の通りです:

  • Weights & BiasesのアカウントとKubernetesクラスターを確認する。
  • Volcanoジョブ用のローンチキューを作成する。
  • KubernetesクラスターにLaunchエージェントをデプロイする。
  • 分散トレーニングジョブを作成する。
  • 分散トレーニングをローンチする。

必要条件

開始する前に必要なもの:

  • Weights & Biasesアカウント
  • Kubernetesクラスター

ローンチキューを作成する

最初のステップはローンチキューを作成することです。wandb.ai/launchにアクセスし、画面の右上隅にある青いCreate a queueボタンを押します。右側からキュー作成ドロワーがスライドアウトします。エンティティを選択し、名前を入力し、キューのタイプとしてKubernetesを選択します。

設定セクションで、volcanoのジョブのテンプレートを入力します。このキューからローンチされたすべてのrunはこのジョブ仕様を使用して作成されるため、ジョブをカスタマイズしたい場合はこの設定を変更できます。

この設定ブロックには、Kubernetesジョブ仕様、volcanoジョブ仕様、または他のカスタムリソース定義(CRD)をローンチするために使用することができます。設定ブロック内のマクロを利用して、この仕様の内容を動的に設定することができます。

このチュートリアルでは、volcanoのpytorchプラグインを利用したマルチノードpytorchトレーニングの設定を使用します。以下の設定をYAMLまたはJSONとしてコピーして貼り付けることができます:

kind: Job
spec:
  tasks:
    - name: master
      policies:
        - event: TaskCompleted
          action: CompleteJob
      replicas: 1
      template:
        spec:
          containers:
            - name: master
              image: ${image_uri}
              imagePullPolicy: IfNotPresent
          restartPolicy: OnFailure
    - name: worker
      replicas: 1
      template:
        spec:
          containers:
            - name: worker
              image: ${image_uri}
              workingDir: /home
              imagePullPolicy: IfNotPresent
          restartPolicy: OnFailure
  plugins:
    pytorch:
      - --master=master
      - --worker=worker
      - --port=23456
  minAvailable: 1
  schedulerName: volcano
metadata:
  name: wandb-job-${run_id}
  labels:
    wandb_entity: ${entity_name}
    wandb_project: ${project_name}
  namespace: wandb
apiVersion: batch.volcano.sh/v1alpha1
YAML
{
  "kind": "Job",
  "spec": {
    "tasks": [
      {
        "name": "master",
        "policies": [
          {
            "event": "TaskCompleted",
            "action": "CompleteJob"
          }
        ],
        "replicas": 1,
        "template": {
          "spec": {
            "containers": [
              {
                "name": "master",
                "image": "${image_uri}",
                "imagePullPolicy": "IfNotPresent"
              }
            ],
            "restartPolicy": "OnFailure"
          }
        }
      },
      {
        "name": "worker",
        "replicas": 1,
        "template": {
          "spec": {
            "containers": [
              {
                "name": "worker",
                "image": "${image_uri}",
                "workingDir": "/home",
                "imagePullPolicy": "IfNotPresent"
              }
            ],
            "restartPolicy": "OnFailure"
          }
        }
      }
    ],
    "plugins": {
      "pytorch": [
        "--master=master",
        "--worker=worker",
        "--port=23456"
      ]
    },
    "minAvailable": 1,
    "schedulerName": "volcano"
  },
  "metadata": {
    "name": "wandb-job-${run_id}",
    "labels": {
      "wandb_entity": "${entity_name}",
      "wandb_project": "${project_name}"
    },
    "namespace": "wandb"
  },
  "apiVersion": "batch.volcano.sh/v1alpha1"
}
JSON

ドロワーの下部にあるCreate queueボタンをクリックしてキューの作成を完了します。

Volcanoをインストールする

KubernetesクラスターにVolcanoをインストールするには、公式インストールガイドに従ってください。

ローンチエージェントをデプロイする

キューを作成した後は、キューからジョブを引き出して実行するためにローンチエージェントをデプロイする必要があります。これを行う最も簡単な方法は、W&Bの公式helm-chartsリポジトリからlaunch-agentチャートを使用することです。READMEに記載された指示に従って、Kubernetesクラスターにチャートをインストールし、エージェントが先ほど作成したキューをポーリングするように設定してください。

トレーニングジョブを作成する

Volcanoのpytorchプラグインは、pytorch DPPが機能するために必要な環境変数(MASTER_ADDRRANKWORLD_SIZEなど)を自動で設定します。ただし、pytorchコードがDDPを正しく使用している場合に限ります。カスタムのPythonコードでDDPを使用する方法の詳細については、pytorchのドキュメントを参照してください。

ローンチ 🚀

キューとクラスターのセットアップが完了したので、分散トレーニングを開始する時がきました。最初に、Volcanoのpytorchプラグインを使用してランダムデータ上でシンプルなマルチレイヤパーセプトロンをトレーニングするa jobを使用します。このジョブのソースコードはこちらで見つけることができます。

このジョブをローンチするには、ジョブのページにアクセスし、画面の右上にあるLaunchボタンをクリックします。ジョブをローンチするキューを選択するように促されます。

  1. ジョブのパラメータを好きなように設定し、
  2. 先ほど作成したキューを選択します。
  3. Resource configセクションでVolcanoジョブを変更してジョブのパラメータを変更します。例えば、workerタスクのreplicasフィールドを変更することによってワーカーの数を変更できます。
  4. Launchをクリック 🚀

W&B UIからジョブの進捗をモニターし、必要に応じてジョブを停止できます。