1 - Dagster

Guide on how to integrate W&B with Dagster.

Use Dagster and Weights & Biases (W&B) to orchestrate your MLOps pipelines and maintain ML assets. Within Dagster, the integration makes it easy to create and consume W&B Artifacts and to run W&B Launch jobs.

The W&B Dagster integration provides a W&B-specific Dagster resource and IO Manager:

  • wandb_resource: a Dagster resource used to authenticate and communicate to the W&B API.
  • wandb_artifacts_io_manager: a Dagster IO Manager used to consume W&B Artifacts.

The following guide demonstrates how to satisfy prerequisites to use W&B in Dagster, how to create and use W&B Artifacts in ops and assets, how to use W&B Launch and recommended best practices.

Before you get started

You will need the following resources to use Dagster with Weights & Biases:

  1. W&B API Key.
  2. W&B entity (user or team): An entity is a username or team name where you send W&B Runs and Artifacts. Make sure to create your account or team entity in the W&B App UI before you log runs. If you do not specify an entity, the run will be sent to your default entity, which is usually your username. Change your default entity in your settings under Project Defaults.
  3. W&B project: The name of the project where W&B Runs are stored.

Find your W&B entity by checking the profile page for that user or team in the W&B App. You can use a pre-existing W&B project or create a new one. New projects can be created on the W&B App homepage or on a user or team profile page. If a project does not exist it will be automatically created when you first use it. The following instructions demonstrate how to get an API key:

How to get an API key

  1. Log in to W&B. Note: if you are using W&B Server ask your admin for the instance host name.
  2. Collect your API key by navigating to the authorize page or in your user/team settings. For a production environment we recommend using a service account to own that key.
  3. Set an environment variable for that API key: export WANDB_API_KEY=YOUR_KEY.

The following examples demonstrate where to specify your API key in your Dagster code. Make sure to specify your entity and project name within the wandb_config nested dictionary. You can pass different wandb_config values to different ops/assets if you want to use a different W&B Project. For more information about possible keys you can pass, see the Configuration section below.

Example: configuration for @job

# add this to your config.yaml
# alternatively you can set the config in Dagit's Launchpad or JobDefinition.execute_in_process
# Reference: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
 wandb_config:
   config:
     entity: my_entity # replace this with your W&B entity
     project: my_project # replace this with your W&B project


@job(
   resource_defs={
       "wandb_config": make_values_resource(
           entity=str,
           project=str,
       ),
       "wandb_resource": wandb_resource.configured(
           {"api_key": {"env": "WANDB_API_KEY"}}
       ),
       "io_manager": wandb_artifacts_io_manager,
   }
)
def simple_job_example():
   my_op()
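
As noted in the configuration comments above, you can also supply the same run configuration programmatically instead of through config.yaml or Dagit's Launchpad. Here is a minimal sketch, assuming the simple_job_example job defined above:

# Sketch: pass the run configuration directly to JobDefinition.execute_in_process
# instead of config.yaml or Dagit's Launchpad.
simple_job_example.execute_in_process(
    run_config={
        "resources": {
            "wandb_config": {
                "config": {
                    "entity": "my_entity",  # replace this with your W&B entity
                    "project": "my_project",  # replace this with your W&B project
                }
            }
        }
    }
)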

Example: configuration for @repository using assets

from dagster_wandb import wandb_artifacts_io_manager, wandb_resource
from dagster import (
   load_assets_from_package_module,
   make_values_resource,
   repository,
   with_resources,
)

from . import assets

@repository
def my_repository():
   return [
       *with_resources(
           load_assets_from_package_module(assets),
           resource_defs={
               "wandb_config": make_values_resource(
                   entity=str,
                   project=str,
               ),
               "wandb_resource": wandb_resource.configured(
                   {"api_key": {"env": "WANDB_API_KEY"}}
               ),
               "wandb_artifacts_manager": wandb_artifacts_io_manager.configured(
                   {"cache_duration_in_minutes": 60} # only cache files for one hour
               ),
           },
           resource_config_by_key={
               "wandb_config": {
                   "config": {
                       "entity": "my_entity", # replace this with your W&B entity
                       "project": "my_project", # replace this with your W&B project
                   }
               }
           },
       ),
   ]

Note that this example configures the IO Manager cache duration, unlike the example for @job.

Configuration

The following configuration options are used as settings on the W&B-specific Dagster resource and IO Manager provided by the integration.

  • wandb_resource: Dagster resource used to communicate with the W&B API. It automatically authenticates using the provided API key. Properties:
    • api_key: (str, required): a W&B API key necessary to communicate with the W&B API.
    • host: (str, optional): the API host server you wish to use. Only required if you are using W&B Server. It defaults to the Public Cloud host: https://api.wandb.ai
  • wandb_artifacts_io_manager: Dagster IO Manager to consume W&B Artifacts. Properties:
    • base_dir: (str, optional) Base directory used for local storage and caching. W&B Artifacts and W&B Run logs will be written to and read from that directory. By default, it uses the DAGSTER_HOME directory.
    • cache_duration_in_minutes: (int, optional) to define the amount of time W&B Artifacts and W&B Run logs should be kept in the local storage. Only files and directories that were not opened for that amount of time are removed from the cache. Cache purging happens at the end of an IO Manager execution. You can set it to 0, if you want to turn off caching completely. Caching improves speed when an Artifact is reused between jobs running on the same machine. It defaults to 30 days.
    • run_id: (str, optional): A unique ID for this run, used for resuming. It must be unique in the project, and if you delete a run you can’t reuse the ID. Use the name field for a short descriptive name, or config for saving hyperparameters to compare across runs. The ID cannot contain the following special characters: /\#?%:.. You need to set the Run ID when you are doing experiment tracking inside Dagster to allow the IO Manager to resume the run. By default it’s set to the Dagster Run ID, e.g., 7e4df022-1bf2-44b5-a383-bb852df4077e.
    • run_name: (str, optional) A short display name for this run to help you identify this run in the UI. By default, it is a string with the following format: dagster-run-[8 first characters of the Dagster Run ID]. For example, dagster-run-7e4df022.
    • run_tags: (list[str], optional): A list of strings, which will populate the list of tags on this run in the UI. Tags are useful for organizing runs together, or applying temporary labels like baseline or production. It’s easy to add and remove tags in the UI, or filter down to just runs with a specific tag. Any W&B Run used by the integration will have the dagster_wandb tag.
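
For example, here is a minimal sketch of a job whose IO Manager is configured with some of the properties above. The run name, tags, and cache duration are placeholder values, and my_op stands in for your own op as in the earlier examples:

from dagster_wandb import wandb_artifacts_io_manager, wandb_resource

from dagster import job, make_values_resource

@job(
    resource_defs={
        "wandb_config": make_values_resource(entity=str, project=str),
        "wandb_resource": wandb_resource.configured(
            {"api_key": {"env": "WANDB_API_KEY"}}
        ),
        "io_manager": wandb_artifacts_io_manager.configured(
            {
                "run_name": "my-custom-run-name",  # display name in the W&B UI
                "run_tags": ["baseline", "dagster"],  # placeholder tags
                "cache_duration_in_minutes": 0,  # turn off the local cache
            }
        ),
    }
)
def configured_io_manager_example():
    my_op()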

Use W&B Artifacts

The integration with W&B Artifact relies on a Dagster IO Manager.

IO Managers are user-provided objects that are responsible for storing the output of an asset or op and loading it as input to downstream assets or ops. For example, an IO Manager might store and load objects from files on a filesystem.

The integration provides an IO Manager for W&B Artifacts. This allows any Dagster @op or @asset to create and consume W&B Artifacts natively. Here’s a simple example of an @asset producing a W&B Artifact of type dataset containing a Python list.

@asset(
    name="my_artifact",
    metadata={
        "wandb_artifact_arguments": {
            "type": "dataset",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
    return [1, 2, 3] # this will be stored in an Artifact

You can annotate your @op, @asset, and @multi_asset with a metadata configuration in order to write Artifacts. Similarly, you can consume W&B Artifacts even if they were created outside Dagster.

Write W&B Artifacts

Before continuing, we recommend you have a good understanding of how to use W&B Artifacts. Consider reading the Guide on Artifacts.

Return an object from a Python function to write a W&B Artifact. The following objects are supported by W&B:

  • Python objects (int, dict, list…)
  • W&B objects (Table, Image, Graph…)
  • W&B Artifact objects

The following examples demonstrate how to write W&B Artifacts with Dagster assets (@asset):

Anything that can be serialized with the pickle module is pickled and added to an Artifact created by the integration. The content is unpickled when you read that Artifact inside Dagster (see Read artifacts for more details).

@asset(
    name="my_artifact",
    metadata={
        "wandb_artifact_arguments": {
            "type": "dataset",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
    return [1, 2, 3]

W&B supports multiple Pickle-based serialization modules (pickle, dill, cloudpickle, joblib). You can also use more advanced serialization like ONNX or PMML. Please refer to the Serialization section for more information.

Any native W&B object (e.g., Table, Image, or Graph) is added to an Artifact created by the integration. Here’s an example using a Table.

import wandb

@asset(
    name="my_artifact",
    metadata={
        "wandb_artifact_arguments": {
            "type": "dataset",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_dataset_in_table():
    return wandb.Table(columns=["a", "b", "c"], data=[[1, 2, 3]])

For complex use cases, it might be necessary to build your own Artifact object. The integration still provides useful additional features like augmenting the metadata on both sides of the integration.

import wandb

MY_ASSET = "my_asset"

@asset(
    name=MY_ASSET,
    io_manager_key="wandb_artifacts_manager",
)
def create_artifact():
   artifact = wandb.Artifact(MY_ASSET, "dataset")
   table = wandb.Table(columns=["a", "b", "c"], data=[[1, 2, 3]])
   artifact.add(table, "my_table")
   return artifact

Configuration

A configuration dictionary called wandb_artifact_configuration can be set on an @op, @asset and @multi_asset. This dictionary must be passed in the decorator arguments as metadata. This configuration controls how the IO Manager reads and writes W&B Artifacts.

For @op, it’s located in the output metadata through the Out metadata argument. For @asset, it’s located in the metadata argument on the asset. For @multi_asset, it’s located in each output metadata through the AssetOut metadata arguments.

The following code examples demonstrate how to configure the dictionary on @op, @asset, and @multi_asset computations:

Example for @op:

@op(
   out=Out(
       metadata={
           "wandb_artifact_configuration": {
               "name": "my_artifact",
               "type": "dataset",
           }
       }
   )
)
def create_dataset():
   return [1, 2, 3]

Example for @asset:

@asset(
   name="my_artifact",
   metadata={
       "wandb_artifact_configuration": {
           "type": "dataset",
       }
   },
   io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
   return [1, 2, 3]

You do not need to pass a name through the configuration because the @asset already has a name. The integration sets the Artifact name as the asset name.

Example for @multi_asset:

@multi_asset(
   name="create_datasets",
   outs={
       "first_table": AssetOut(
           metadata={
               "wandb_artifact_configuration": {
                   "type": "training_dataset",
               }
           },
           io_manager_key="wandb_artifacts_manager",
       ),
       "second_table": AssetOut(
           metadata={
               "wandb_artifact_configuration": {
                   "type": "validation_dataset",
               }
           },
           io_manager_key="wandb_artifacts_manager",
       ),
   },
   group_name="my_multi_asset_group",
)
def create_datasets():
   first_table = wandb.Table(columns=["a", "b", "c"], data=[[1, 2, 3]])
   second_table = wandb.Table(columns=["d", "e"], data=[[4, 5]])

   return first_table, second_table

Supported properties:

  • name: (str) human-readable name for this artifact, which is how you can identify this artifact in the UI or reference it in use_artifact calls. Names can contain letters, numbers, underscores, hyphens, and dots. The name must be unique across a project. Required for @op.
  • type: (str) The type of the artifact, which is used to organize and differentiate artifacts. Common types include dataset or model, but you can use any string containing letters, numbers, underscores, hyphens, and dots. Required when the output is not already an Artifact.
  • description: (str) Free text that offers a description of the artifact. The description is rendered as Markdown in the UI, so this is a good place to put tables, links, etc.
  • aliases: (list[str]) An array containing one or more aliases you want to apply on the Artifact. The integration will also add the “latest” alias to that list whether it’s set or not. This is an effective way for you to manage versioning of models and datasets.
  • add_dirs: (list[dict[str, Any]]): An array containing configuration for each local directory to include in the Artifact. It supports the same arguments as the SDK’s add_dir method.
  • add_files: (list[dict[str, Any]]): An array containing configuration for each local file to include in the Artifact. It supports the same arguments as the SDK’s add_file method.
  • add_references: (list[dict[str, Any]]): An array containing configuration for each external reference to include in the Artifact. It supports the same arguments as the SDK’s add_reference method.
  • serialization_module: (dict) Configuration of the serialization module to be used. Refer to the Serialization section for more information.
    • name: (str) Name of the serialization module. Accepted values: pickle, dill, cloudpickle, joblib. The module needs to be available locally.
    • parameters: (dict[str, Any]) Optional arguments passed to the serialization function. It accepts the same parameters as the dump method for that module. For example, {"compress": 3, "protocol": 4}.

Advanced example:

@asset(
   name="my_advanced_artifact",
   metadata={
       "wandb_artifact_configuration": {
           "type": "dataset",
           "description": "My *Markdown* description",
           "aliases": ["my_first_alias", "my_second_alias"],
           "add_dirs": [
               {
                   "name": "My directory",
                   "local_path": "path/to/directory",
               }
           ],
           "add_files": [
               {
                   "name": "validation_dataset",
                   "local_path": "path/to/data.json",
               },
               {
                   "is_tmp": True,
                   "local_path": "path/to/temp",
               },
           ],
           "add_references": [
               {
                   "uri": "https://picsum.photos/200/300",
                   "name": "External HTTP reference to an image",
               },
               {
                   "uri": "s3://my-bucket/datasets/mnist",
                   "name": "External S3 reference",
               },
           ],
       }
   },
   io_manager_key="wandb_artifacts_manager",
)
def create_advanced_artifact():
   return [1, 2, 3]

The asset is materialized with useful metadata on both sides of the integration:

  • W&B side: the source integration name and version, the python version used, the pickle protocol version and more.
  • Dagster side:
    • Dagster Run ID
    • W&B Run: ID, name, path, URL
    • W&B Artifact: ID, name, type, version, size, URL
    • W&B Entity
    • W&B Project

The following image demonstrates the metadata from W&B that was added to the Dagster asset. This information would not be available without the integration.

The following image demonstrates how the provided configuration was enriched with useful metadata on the W&B Artifact. This information should help for reproducibility and maintenance. It would not be available without the integration.

Using partitions

The integration natively supports Dagster partitions.

The following is an example of a partitioned asset using DailyPartitionsDefinition.

import random

from dagster import DailyPartitionsDefinition, asset

@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01", end_date="2023-02-01"),
    name="my_daily_partitioned_asset",
    compute_kind="wandb",
    metadata={
        "wandb_artifact_configuration": {
            "type": "dataset",
        }
    },
)
def create_my_daily_partitioned_asset(context):
    partition_key = context.asset_partition_key_for_output()
    context.log.info(f"Creating partitioned asset for {partition_key}")
    return random.randint(0, 100)

This code will produce one W&B Artifact for each partition. View artifacts in the Artifact panel (UI) under the asset name, which has the partition key appended. For example, my_daily_partitioned_asset.2023-01-01, my_daily_partitioned_asset.2023-01-02, or my_daily_partitioned_asset.2023-01-03. Assets that are partitioned across multiple dimensions show each dimension in dot-delimited format. For example, my_asset.car.blue.

Advanced usage

Read W&B Artifacts

Reading W&B Artifacts is similar to writing them. A configuration dictionary called wandb_artifact_configuration can be set on an @op or @asset. The only difference is that we must set the configuration on the input instead of the output.

For @op, it’s located in the input metadata through the In metadata argument. You need to explicitly pass the name of the Artifact.

For @asset, it’s located in the input metadata through the AssetIn metadata argument. You should not pass an Artifact name because the name of the parent asset should match it.

If you want to have a dependency on an Artifact created outside the integration you will need to use SourceAsset. It will always read the latest version of that asset.

The following examples demonstrate how to read an Artifact from various ops.

Reading an artifact from an @op

@op(
   ins={
       "artifact": In(
           metadata={
               "wandb_artifact_configuration": {
                   "name": "my_artifact",
               }
           }
       )
   },
   io_manager_key="wandb_artifacts_manager"
)
def read_artifact(context, artifact):
   context.log.info(artifact)

Reading an artifact created by another @asset

@asset(
   name="my_asset",
   ins={
       "artifact": AssetIn(
           # if you don't want to rename the input argument you can remove 'key'
           key="parent_dagster_asset_name",
           input_manager_key="wandb_artifacts_manager",
       )
   },
)
def read_artifact(context, artifact):
   context.log.info(artifact)

Reading an Artifact created outside Dagster:

my_artifact = SourceAsset(
   key=AssetKey("my_artifact"),  # the name of the W&B Artifact
   description="Artifact created outside Dagster",
   io_manager_key="wandb_artifacts_manager",
)


@asset
def read_artifact(context, my_artifact):
   context.log.info(my_artifact)

Configuration

This configuration is used to indicate what the IO Manager should collect and provide as inputs to the decorated functions. The following read patterns are supported.

  1. To get a named object contained within an Artifact, use get:
@asset(
   ins={
       "table": AssetIn(
           key="my_artifact_with_table",
           metadata={
               "wandb_artifact_configuration": {
                   "get": "my_table",
               }
           },
           input_manager_key="wandb_artifacts_manager",
       )
   }
)
def get_table(context, table):
   context.log.info(table.get_column("a"))
  2. To get the local path of a downloaded file contained within an Artifact, use get_path:
@asset(
   ins={
       "path": AssetIn(
           key="my_artifact_with_file",
           metadata={
               "wandb_artifact_configuration": {
                   "get_path": "name_of_file",
               }
           },
           input_manager_key="wandb_artifacts_manager",
       )
   }
)
def get_path(context, path):
   context.log.info(path)
  3. To get the entire Artifact object (with the content downloaded locally):
@asset(
   ins={
       "artifact": AssetIn(
           key="my_artifact",
           input_manager_key="wandb_artifacts_manager",
       )
   },
)
def get_artifact(context, artifact):
   context.log.info(artifact.name)

Supported properties

  • get: (str) Gets the W&B object stored at that relative name within the Artifact.
  • get_path: (str) Gets the local path to the file stored at that relative name within the Artifact.

Serialization configuration

By default, the integration will use the standard pickle module, but some objects are not compatible with it. For example, functions with yield will raise an error if you try to pickle them.

We support more Pickle-based serialization modules (dill, cloudpickle, joblib). You can also use more advanced serialization like ONNX or PMML by returning a serialized string or creating an Artifact directly. The right choice will depend on your use case, please refer to the available literature on this subject.

Pickle-based serialization modules

You can configure the serialization used through the serialization_module dictionary in the wandb_artifact_configuration. Please make sure the module is available on the machine running Dagster.

The integration will automatically know which serialization module to use when you read that Artifact.

The currently supported modules are pickle, dill, cloudpickle, and joblib.

Here’s a simplified example where we create a “model” serialized with joblib and then use it for inference.

@asset(
    name="my_joblib_serialized_model",
    compute_kind="Python",
    metadata={
        "wandb_artifact_configuration": {
            "type": "model",
            "serialization_module": {
                "name": "joblib"
            },
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_model_serialized_with_joblib():
    # This is not a real ML model but this would not be possible with the pickle module
    return lambda x, y: x + y

@asset(
    name="inference_result_from_joblib_serialized_model",
    compute_kind="Python",
    ins={
        "my_joblib_serialized_model": AssetIn(
            input_manager_key="wandb_artifacts_manager",
        )
    },
    metadata={
        "wandb_artifact_configuration": {
            "type": "results",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def use_model_serialized_with_joblib(
    context: OpExecutionContext, my_joblib_serialized_model
):
    inference_result = my_joblib_serialized_model(1, 2)
    context.log.info(inference_result)  # Prints: 3
    return inference_result

Advanced serialization formats (ONNX, PMML)

It’s common to use interchange file formats like ONNX and PMML. The integration supports those formats but it requires a bit more work than for Pickle-based serialization.

There are two different methods to use those formats.

  1. Convert your model to the selected format, then return the string representation of that format as if it were a normal Python object. The integration will pickle that string. You can then rebuild your model using that string.
  2. Create a new local file with your serialized model, then build a custom Artifact with that file using the add_files configuration.

Here’s an example of a Scikit-learn model being serialized using ONNX.

import numpy
import onnxruntime as rt
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

from dagster import AssetIn, AssetOut, asset, multi_asset

@multi_asset(
    compute_kind="Python",
    outs={
        "my_onnx_model": AssetOut(
            metadata={
                "wandb_artifact_configuration": {
                    "type": "model",
                }
            },
            io_manager_key="wandb_artifacts_manager",
        ),
        "my_test_set": AssetOut(
            metadata={
                "wandb_artifact_configuration": {
                    "type": "test_set",
                }
            },
            io_manager_key="wandb_artifacts_manager",
        ),
    },
    group_name="onnx_example",
)
def create_onnx_model():
    # Inspired from https://onnx.ai/sklearn-onnx/

    # Train a model.
    iris = load_iris()
    X, y = iris.data, iris.target
    X_train, X_test, y_train, y_test = train_test_split(X, y)
    clr = RandomForestClassifier()
    clr.fit(X_train, y_train)

    # Convert into ONNX format
    initial_type = [("float_input", FloatTensorType([None, 4]))]
    onx = convert_sklearn(clr, initial_types=initial_type)

    # Write artifacts (model + test_set)
    return onx.SerializeToString(), {"X_test": X_test, "y_test": y_test}

@asset(
    name="experiment_results",
    compute_kind="Python",
    ins={
        "my_onnx_model": AssetIn(
            input_manager_key="wandb_artifacts_manager",
        ),
        "my_test_set": AssetIn(
            input_manager_key="wandb_artifacts_manager",
        ),
    },
    group_name="onnx_example",
)
def use_onnx_model(context, my_onnx_model, my_test_set):
    # Inspired from https://onnx.ai/sklearn-onnx/

    # Compute the prediction with ONNX Runtime
    sess = rt.InferenceSession(my_onnx_model)
    input_name = sess.get_inputs()[0].name
    label_name = sess.get_outputs()[0].name
    pred_onx = sess.run(
        [label_name], {input_name: my_test_set["X_test"].astype(numpy.float32)}
    )[0]
    context.log.info(pred_onx)
    return pred_onx

Using partitions

The integration natively supports Dagster partitions.

You can selectively read one, multiple, or all partitions of an asset.

The following example reads all partitions of the upstream @asset. They are provided as a dictionary in which the key and value correspond to the partition key and the Artifact content, respectively.

@asset(
    compute_kind="wandb",
    ins={"my_daily_partitioned_asset": AssetIn()},
    output_required=False,
)
def read_all_partitions(context, my_daily_partitioned_asset):
    for partition, content in my_daily_partitioned_asset.items():
        context.log.info(f"partition={partition}, content={content}")

The AssetIn’s partition_mapping configuration allows you to choose specific partitions. In this case, we are employing the TimeWindowPartitionMapping.

@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01", end_date="2023-02-01"),
    compute_kind="wandb",
    ins={
        "my_daily_partitioned_asset": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1)
        )
    },
    output_required=False,
)
def read_specific_partitions(context, my_daily_partitioned_asset):
    for partition, content in my_daily_partitioned_asset.items():
        context.log.info(f"partition={partition}, content={content}")

The configuration object, metadata, is used to configure how Weights & Biases (wandb) interacts with different artifact partitions in your project.

The object metadata contains a key named wandb_artifact_configuration which further contains a nested object partitions.

The partitions object maps the name of each partition to its configuration. The configuration for each partition can specify how to retrieve data from it. These configurations can contain different keys, namely get, version, and alias, depending on the requirements of each partition.

Configuration keys

  1. get: The get key specifies the name of the W&B object (Table, Image…) from which to fetch the data.
  2. version: The version key is used when you want to fetch a specific version of the Artifact.
  3. alias: The alias key allows you to get the Artifact by its alias.

Wildcard configuration

The wildcard "*" stands for all non-configured partitions. This provides a default configuration for partitions that are not explicitly mentioned in the partitions object.

For example,

"*": {
    "get": "default_table_name",
},

This configuration means that for all partitions not explicitly configured, data is fetched from the table named default_table_name.

Specific partition configuration

You can override the wildcard configuration for specific partitions by providing their specific configurations using their keys.

For example,

"yellow": {
    "get": "custom_table_name",
},

This configuration means that for the partition named yellow, data will be fetched from the table named custom_table_name, overriding the wildcard configuration.

Versioning and aliasing

For versioning and aliasing purposes, you can provide specific version and alias keys in your configuration.

For versions,

"orange": {
    "version": "v0",
},

This configuration will fetch data from the version v0 of the orange Artifact partition.

For aliases,

"blue": {
    "alias": "special_alias",
},

This configuration will fetch data from the table default_table_name of the Artifact partition with the alias special_alias (referred to as blue in the configuration).
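
Putting these fragments together, a downstream asset that reads the partitioned Artifact could configure its input like the following sketch. The partition names (yellow, orange, blue) and table names are the illustrative values used above:

@asset(
    compute_kind="wandb",
    ins={
        "my_daily_partitioned_asset": AssetIn(
            metadata={
                "wandb_artifact_configuration": {
                    "partitions": {
                        "*": {"get": "default_table_name"},
                        "yellow": {"get": "custom_table_name"},
                        "orange": {"version": "v0"},
                        "blue": {"alias": "special_alias"},
                    }
                }
            },
            input_manager_key="wandb_artifacts_manager",
        )
    },
    output_required=False,
)
def read_configured_partitions(context, my_daily_partitioned_asset):
    for partition, content in my_daily_partitioned_asset.items():
        context.log.info(f"partition={partition}, content={content}")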

Advanced usage

To view advanced usage of the integration, refer to the full code examples.

Using W&B Launch

Before continuing, we recommend you have a good understanding of how to use W&B Launch. Consider reading the Guide on Launch: /guides/launch.

The Dagster integration helps with:

  • Running one or multiple Launch agents in your Dagster instance.
  • Executing local Launch jobs within your Dagster instance.
  • Executing remote Launch jobs on-premises or in the cloud.

Launch agents

The integration provides an importable @op called run_launch_agent. It starts a Launch Agent and runs it as a long-running process until stopped manually.

Agents are processes that poll launch queues and execute the jobs (or dispatch them to external services to be executed) in order.

Refer to the reference documentation for configuration.

You can also view useful descriptions for all properties in Launchpad.

Simple example

# add this to your config.yaml
# alternatively you can set the config in Dagit's Launchpad or JobDefinition.execute_in_process
# Reference: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
 wandb_config:
   config:
     entity: my_entity # replace this with your W&B entity
     project: my_project # replace this with your W&B project
ops:
 run_launch_agent:
   config:
     max_jobs: -1
     queues: 
       - my_dagster_queue

from dagster_wandb.launch.ops import run_launch_agent
from dagster_wandb.resources import wandb_resource

from dagster import job, make_values_resource

@job(
   resource_defs={
       "wandb_config": make_values_resource(
           entity=str,
           project=str,
       ),
       "wandb_resource": wandb_resource.configured(
           {"api_key": {"env": "WANDB_API_KEY"}}
       ),
   },
)
def run_launch_agent_example():
   run_launch_agent()

Launch jobs

The integration provides an importable @op called run_launch_job. It executes your Launch job.

A Launch job is assigned to a queue in order to be executed. You can create a queue or use the default one. Make sure you have an active agent listening to that queue. You can run an agent inside your Dagster instance but can also consider using a deployable agent in Kubernetes.

Refer to the reference documentation for configuration.

You can also view useful descriptions for all properties in Launchpad.

Simple example

# add this to your config.yaml
# alternatively you can set the config in Dagit's Launchpad or JobDefinition.execute_in_process
# Reference: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
 wandb_config:
   config:
     entity: my_entity # replace this with your W&B entity
     project: my_project # replace this with your W&B project
ops:
 my_launched_job:
   config:
     entry_point:
       - python
       - train.py
     queue: my_dagster_queue
     uri: https://github.com/wandb/example-dagster-integration-with-launch


from dagster_wandb.launch.ops import run_launch_job
from dagster_wandb.resources import wandb_resource

from dagster import job, make_values_resource


@job(resource_defs={
       "wandb_config": make_values_resource(
           entity=str,
           project=str,
       ),
       "wandb_resource": wandb_resource.configured(
           {"api_key": {"env": "WANDB_API_KEY"}}
       ),
   },
)
def run_launch_job_example():
   run_launch_job.alias("my_launched_job")() # we rename the job with an alias

Best practices

  1. Use the IO Manager to read and write Artifacts. You should never need to use Artifact.download() or Run.log_artifact() directly. Those methods are handled by the integration. Simply return the data you wish to store in an Artifact and let the integration do the rest. This will provide better lineage for the Artifact in W&B.

  2. Only build an Artifact object yourself for complex use cases. Python objects and W&B objects should be returned from your ops/assets. The integration handles bundling the Artifact. For complex use cases, you can build an Artifact directly in a Dagster job. We recommend you pass an Artifact object to the integration for metadata enrichment such as the source integration name and version, the python version used, the pickle protocol version and more.

  3. Add files, directories and external references to your Artifacts through the metadata. Use the integration wandb_artifact_configuration object to add any file, directory or external references (Amazon S3, GCS, HTTP…). See the advanced example in the Artifact configuration section for more information.

  4. Use an @asset instead of an @op when an Artifact is produced. Artifacts are assets. It is recommended to use an asset when Dagster maintains that asset. This will provide better observability in the Dagit Asset Catalog.

  5. Use a SourceAsset to consume an Artifact created outside Dagster. This allows you to take advantage of the integration to read externally created Artifacts. Otherwise, you can only use Artifacts created by the integration.

  6. Use W&B Launch to orchestrate training on dedicated compute for large models. You can train small models inside your Dagster cluster and you can run Dagster in a Kubernetes cluster with GPU nodes. We recommend using W&B Launch for large model training. This will prevent overloading your instance and provide access to more adequate compute.

  7. When experiment tracking within Dagster, set your W&B Run ID to the value of your Dagster Run ID. We recommend that you both: make the Run resumable and set the W&B Run ID to the Dagster Run ID or to a string of your choice. Following this recommendation ensures your W&B metrics and W&B Artifacts are stored in the same W&B Run when you train models inside of Dagster.

Either set the W&B Run ID to the Dagster Run ID.

wandb.init(
    id=context.run_id,
    resume="allow",
    ...
)

Or choose your own W&B Run ID and pass it to the IO Manager configuration.

wandb.init(
    id="my_resumable_run_id",
    resume="allow",
    ...
)

@job(
   resource_defs={
       "io_manager": wandb_artifacts_io_manager.configured(
           {"wandb_run_id": "my_resumable_run_id"}
       ),
   }
)
def my_resumable_job():
   my_op()  # placeholder op, as in the earlier examples

  8. Only collect data you need with get or get_path for large W&B Artifacts. By default, the integration will download an entire Artifact. If you are using very large artifacts you might want to only collect the specific files or objects you need. This will improve speed and resource utilization.

  9. For Python objects adapt the pickling module to your use case. By default, the W&B integration will use the standard pickle module. But some objects are not compatible with it. For example, functions with yield will raise an error if you try to pickle them. W&B supports other Pickle-based serialization modules (dill, cloudpickle, joblib).

You can also use more advanced serialization like ONNX or PMML by returning a serialized string or creating an Artifact directly. The right choice will depend on your use case, refer to the available literature on this subject.

2 - Launch multinode jobs with Volcano

This tutorial will guide you through the process of launching multinode training jobs with W&B and Volcano on Kubernetes.

Overview

In this tutorial, you will learn how to use W&B Launch to run multinode jobs on Kubernetes. The steps we will follow are:

  • Ensure that you have a Weights & Biases account and a Kubernetes cluster.
  • Create a launch queue for our Volcano jobs.
  • Deploy a Launch agent into our Kubernetes cluster.
  • Create a distributed training job.
  • Launch our distributed training.

Prerequisites

Before you get started, you will need:

  • A Weights & Biases account
  • A Kubernetes cluster

Create a launch queue

The first step is to create a launch queue. Head to wandb.ai/launch and in the top right corner of your screen, hit the blue Create a queue button. A queue creation drawer will slide out from the right side of your screen. Select an entity, enter a name, and select Kubernetes as the type for your queue.

In the configuration section, we will enter a Volcano job template. Any runs launched from this queue will be created using this job specification, so you can modify this configuration as needed to customize your jobs.

This configuration block can accept a Kubernetes job specification, Volcano job specification, or any other custom resource definition (CRD) that you are interested in launching. You can make use of macros in the configuration block to dynamically set the contents of this spec.

In this tutorial, we will use a configuration for multinode PyTorch training that makes use of Volcano’s PyTorch plugin. You can copy and paste the following config as YAML or JSON:

kind: Job
spec:
  tasks:
    - name: master
      policies:
        - event: TaskCompleted
          action: CompleteJob
      replicas: 1
      template:
        spec:
          containers:
            - name: master
              image: ${image_uri}
              imagePullPolicy: IfNotPresent
          restartPolicy: OnFailure
    - name: worker
      replicas: 1
      template:
        spec:
          containers:
            - name: worker
              image: ${image_uri}
              workingDir: /home
              imagePullPolicy: IfNotPresent
          restartPolicy: OnFailure
  plugins:
    pytorch:
      - --master=master
      - --worker=worker
      - --port=23456
  minAvailable: 1
  schedulerName: volcano
metadata:
  name: wandb-job-${run_id}
  labels:
    wandb_entity: ${entity_name}
    wandb_project: ${project_name}
  namespace: wandb
apiVersion: batch.volcano.sh/v1alpha1

The same configuration as JSON:

{
  "kind": "Job",
  "spec": {
    "tasks": [
      {
        "name": "master",
        "policies": [
          {
            "event": "TaskCompleted",
            "action": "CompleteJob"
          }
        ],
        "replicas": 1,
        "template": {
          "spec": {
            "containers": [
              {
                "name": "master",
                "image": "${image_uri}",
                "imagePullPolicy": "IfNotPresent"
              }
            ],
            "restartPolicy": "OnFailure"
          }
        }
      },
      {
        "name": "worker",
        "replicas": 1,
        "template": {
          "spec": {
            "containers": [
              {
                "name": "worker",
                "image": "${image_uri}",
                "workingDir": "/home",
                "imagePullPolicy": "IfNotPresent"
              }
            ],
            "restartPolicy": "OnFailure"
          }
        }
      }
    ],
    "plugins": {
      "pytorch": [
        "--master=master",
        "--worker=worker",
        "--port=23456"
      ]
    },
    "minAvailable": 1,
    "schedulerName": "volcano"
  },
  "metadata": {
    "name": "wandb-job-${run_id}",
    "labels": {
      "wandb_entity": "${entity_name}",
      "wandb_project": "${project_name}"
    },
    "namespace": "wandb"
  },
  "apiVersion": "batch.volcano.sh/v1alpha1"
}

Click the Create queue button at the bottom of the drawer to finish creating your queue.

Install Volcano

To install Volcano in your Kubernetes cluster, you can follow the official installation guide.

Deploy your launch agent

Now that you have created a queue, you will need to deploy a launch agent to pull and execute jobs from the queue. The easiest way to do this is with the launch-agent chart from W&B’s official helm-charts repository. Follow the instructions in the README to install the chart into your Kubernetes cluster, and be sure to configure the agent to poll the queue you created earlier.

Create a training job

Volcano’s PyTorch plugin automatically configures the environment variables that PyTorch DDP needs, such as MASTER_ADDR, RANK, and WORLD_SIZE, as long as your PyTorch code uses DDP correctly. Refer to PyTorch’s documentation for more details on how to use DDP in your custom Python code.
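
For reference, here is a minimal sketch of how your training code might consume those variables. With the default env:// initialization, init_process_group reads MASTER_ADDR, MASTER_PORT, RANK, and WORLD_SIZE from the environment that the plugin sets up:

import torch
import torch.distributed as dist

def setup_distributed():
    # Volcano's PyTorch plugin injects MASTER_ADDR, MASTER_PORT, RANK, and
    # WORLD_SIZE into each pod, so the default env:// init method can use them.
    dist.init_process_group(backend="nccl" if torch.cuda.is_available() else "gloo")
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    print(f"Initialized DDP process {rank} of {world_size}")
    return rank, world_size

if __name__ == "__main__":
    setup_distributed()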

Launch 🚀

Now that our queue and cluster are set up, it’s time to launch some distributed training. To start off, we will use a job that trains a simple multi-layer perceptron on random data using Volcano’s PyTorch plugin. You can find the source code for the job here.

To launch this job, head to the job’s page and click the Launch button in the top right corner of the screen. You will be prompted to select a queue to launch the job from.

  1. Set the job’s parameters however you like.
  2. Select the queue you created earlier.
  3. Modify the Volcano job in the Resource config section to adjust the parameters of your job. For example, you can change the number of workers by changing the replicas field in the worker task.
  4. Click Launch 🚀

You can monitor the progress of your job, and stop it if necessary, from the W&B UI.

3 - NVIDIA NeMo Inference Microservice Deploy Job

Deploy a model artifact from W&B to an NVIDIA NeMo Inference Microservice. To do this, use W&B Launch. W&B Launch converts model artifacts to NVIDIA NeMo models and deploys them to a running NIM/Triton server.

W&B Launch currently accepts the following compatible model types:

  1. Llama2
  2. StarCoder
  3. NV-GPT (coming soon)

Quickstart

  1. Create a launch queue if you don’t have one already. See an example queue config below.

    net: host
    gpus: all # can be a specific set of GPUs or `all` to use everything
    runtime: nvidia # also requires nvidia container runtime
    volume:
      - model-store:/model-store/
    
  2. Create this job in your project:

    wandb job create -n "deploy-to-nvidia-nemo-inference-microservice" \
       -e $ENTITY \
       -p $PROJECT \
       -E jobs/deploy_to_nvidia_nemo_inference_microservice/job.py \
       -g andrew/nim-updates \
       git https://github.com/wandb/launch-jobs
    
  3. Launch an agent on your GPU machine:

    wandb launch-agent -e $ENTITY -p $PROJECT -q $QUEUE
    
  4. Submit the deployment launch job with your desired configs from the Launch UI

    1. You can also submit via the CLI:
      wandb launch -d gcr.io/playground-111/deploy-to-nemo:latest \
        -e $ENTITY \
        -p $PROJECT \
        -q $QUEUE \
        -c $CONFIG_JSON_FNAME
      
  5. You can track the deployment process in the Launch UI.

  6. Once complete, you can immediately curl the endpoint to test the model. The model name is always ensemble.

     #!/bin/bash
     curl -X POST "http://0.0.0.0:9999/v1/completions" \
         -H "accept: application/json" \
         -H "Content-Type: application/json" \
         -d '{
             "model": "ensemble",
             "prompt": "Tell me a joke",
             "max_tokens": 256,
             "temperature": 0.5,
             "n": 1,
             "stream": false,
             "stop": "string",
             "frequency_penalty": 0.0
             }'
    

4 - Spin up a single node GPU cluster with Minikube

Set up W&B Launch on a Minikube cluster that can schedule and run GPU workloads.

Background

The Nvidia container toolkit has made it easy to run GPU-enabled workloads on Docker. One limitation is the lack of native support for scheduling GPUs by volume. If you want to use a GPU with the docker run command, you must either request specific GPUs by ID or all GPUs present, which makes many distributed GPU-enabled workloads impractical. Kubernetes offers support for scheduling by a volume request, but setting up a local Kubernetes cluster with GPU scheduling used to take considerable time and effort. Minikube, one of the most popular tools for running single node Kubernetes clusters, recently released support for GPU scheduling 🎉 In this tutorial, we will create a Minikube cluster on a multi-GPU machine and launch concurrent stable diffusion inference jobs to the cluster using W&B Launch 🚀

Prerequisites

Before getting started, you will need:

  1. A W&B account.
  2. Linux machine with the following installed and running:
    1. Docker runtime
    2. Drivers for any GPU you want to use
    3. Nvidia container toolkit

Create a queue for launch jobs

First, create a launch queue for our launch jobs.

  1. Navigate to wandb.ai/launch (or <your-wandb-url>/launch if you use a private W&B server).
  2. In the top right corner of your screen, click the blue Create a queue button. A queue creation drawer will slide out from the right side of your screen.
  3. Select an entity, enter a name, and select Kubernetes as the type for your queue.
  4. The Config section of the drawer is where you will enter a Kubernetes job specification for the launch queue. Any runs launched from this queue will be created using this job specification, so you can modify this configuration as needed to customize your jobs. For this tutorial, you can copy and paste the sample config below in your queue config as YAML or JSON:
spec:
  template:
    spec:
      containers:
        - image: ${image_uri}
          resources:
            limits:
              cpu: 4
              memory: 12Gi
              nvidia.com/gpu: '{{gpus}}'
      restartPolicy: Never
  backoffLimit: 0

The same configuration as JSON:

{
  "spec": {
    "template": {
      "spec": {
        "containers": [
          {
            "image": "${image_uri}",
            "resources": {
              "limits": {
                "cpu": 4,
                "memory": "12Gi",
                "nvidia.com/gpu": "{{gpus}}"
              }
            }
          }
        ],
        "restartPolicy": "Never"
      }
    },
    "backoffLimit": 0
  }
}

For more information about queue configurations, see the Set up Launch on Kubernetes and the Advanced queue setup guide.

The ${image_uri} and {{gpus}} strings are examples of the two kinds of variable templates that you can use in your queue configuration. The ${image_uri} template will be replaced with the image URI of the job you are launching by the agent. The {{gpus}} template will be used to create a template variable that you can override from the launch UI, CLI, or SDK when submitting a job. These values are placed in the job specification so that they will modify the correct fields to control the image and GPU resources used by the job.

  1. Click the Parse configuration button to begin customizing your gpus template variable.
  2. Set the Type to Integer and the Default, Min, and Max to values of your choosing. Attempts to submit a run to this queue which violate the constraints of the template variable will be rejected.
Image of queue creation drawer with gpus template variable
  3. Click Create queue to create your queue. You will be redirected to the queue page for your new queue.

In the next section, we will set up an agent that can pull and execute jobs from the queue you created.

Setup Docker + NVIDIA CTK

If you already have Docker and the Nvidia container toolkit set up on your machine, you can skip this section.

Refer to Docker’s documentation for instructions on setting up the Docker container engine on your system.

Once you have Docker installed, install the Nvidia container toolkit following the instructions in Nvidia’s documentation.

To validate that your container runtime has access to your GPU, you can run:

docker run --gpus all ubuntu nvidia-smi

You should see nvidia-smi output describing the GPU connected to your machine. For example, on our setup the output looks like this:

Wed Nov  8 23:25:53 2023
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 525.105.17   Driver Version: 525.105.17   CUDA Version: 12.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|===============================+======================+======================|
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   38C    P8     9W /  70W |      2MiB / 15360MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
|   1  Tesla T4            Off  | 00000000:00:05.0 Off |                    0 |
| N/A   38C    P8     9W /  70W |      2MiB / 15360MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
|   2  Tesla T4            Off  | 00000000:00:06.0 Off |                    0 |
| N/A   40C    P8     9W /  70W |      2MiB / 15360MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
|   3  Tesla T4            Off  | 00000000:00:07.0 Off |                    0 |
| N/A   39C    P8     9W /  70W |      2MiB / 15360MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                                  |
|  GPU   GI   CI        PID   Type   Process name                  GPU Memory |
|        ID   ID                                                   Usage      |
|=============================================================================|
|  No running processes found                                                 |
+-----------------------------------------------------------------------------+

Setup Minikube

Minikube’s GPU support requires version v1.32.0 or later. Refer to Minikube’s install documentation for up to date installation help. For this tutorial, we installed the latest Minikube release using the command:

curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
sudo install minikube-linux-amd64 /usr/local/bin/minikube

The next step is to start a minikube cluster using your GPU. On your machine, run:

minikube start --gpus all

The output of the command above will indicate whether a cluster has been successfully created.

Start launch agent

The launch agent for your new cluster can either be started by invoking wandb launch-agent directly or by deploying the launch agent using a helm chart managed by W&B.

In this tutorial we will run the agent directly on our host machine.

To run the agent locally, make sure your default Kubernetes API context refers to the Minikube cluster. Then, execute the following:

pip install "wandb[launch]"

to install the agent’s dependencies. To set up authentication for the agent, run wandb login or set the WANDB_API_KEY environment variable.

To start the agent, execute this command:

wandb launch-agent -j <max-number-concurrent-jobs> -q <queue-name> -e <queue-entity>

Within your terminal you should see the launch agent start to print polling messages.

Congratulations, you have a launch agent polling your launch queue. When a job is added to your queue, your agent will pick it up and schedule it to run on your Minikube cluster.

Launch a job

Let’s send a job to our agent. You can launch a simple “hello world” from a terminal logged into your W&B account with:

wandb launch -d wandb/job_hello_world:main -p <target-wandb-project> -q <your-queue-name> -e <your-queue-entity>

You can test with any job or image you like, but make sure your cluster can pull your image. See Minikube’s documentation for additional guidance. You can also test using one of our public jobs.

(Optional) Model and data caching with NFS

For ML workloads we will often want multiple jobs to have access to the same data. For example, you might want to have a shared cache to avoid repeatedly downloading large assets like datasets or model weights. Kubernetes supports this through persistent volumes and persistent volume claims. Persistent volumes can be used to create volumeMounts in our Kubernetes workloads, providing direct filesystem access to the shared cache.

In this step, we will set up a network file system (NFS) server that can be used as a shared cache for model weights. The first step is to install and configure NFS. This process varies by operating system. Since our VM is running Ubuntu, we installed nfs-kernel-server and configured an export at /srv/nfs/kubedata:

sudo apt-get install nfs-kernel-server
sudo mkdir -p /srv/nfs/kubedata
sudo chown nobody:nogroup /srv/nfs/kubedata
sudo sh -c 'echo "/srv/nfs/kubedata *(rw,sync,no_subtree_check,no_root_squash,no_all_squash,insecure)" >> /etc/exports'
sudo exportfs -ra
sudo systemctl restart nfs-kernel-server

Keep note of the export location of the server in your host filesystem, as well as the local IP address of your NFS server. You need this information in the next step.

Next, you will need to create a persistent volume and persistent volume claim for this NFS. Persistent volumes are highly customizable, but we will use straightforward configuration here for the sake of simplicity.

Copy the yaml below into a file named nfs-persistent-volume.yaml, making sure to fill out your desired volume capacity and claim request. The PersistentVolume.spec.capacity.storage field controls the maximum size of the underlying volume. The PersistentVolumeClaim.spec.resources.requests.storage field can be used to limit the volume capacity allotted for a particular claim. For our use case, it makes sense to use the same value for each.

apiVersion: v1
kind: PersistentVolume
metadata:
  name: nfs-pv
spec:
  capacity:
    storage: 100Gi # Set this to your desired capacity.
  accessModes:
    - ReadWriteMany
  nfs:
    server: <your-nfs-server-ip> # TODO: Fill this in.
    path: '/srv/nfs/kubedata' # Or your custom path
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: nfs-pvc
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 100Gi # Set this to your desired capacity.
  storageClassName: ''
  volumeName: nfs-pv

Create the resources in your cluster with:

kubectl apply -f nfs-persistent-volume.yaml

In order for our runs to make use of this cache, we will need to add volumes and volumeMounts to our launch queue config. To edit the launch config, head back to wandb.ai/launch (or <your-wandb-url>/launch for users on wandb server), find your queue, click to the queue page, and then click the Edit config tab. The original config can be modified to:

spec:
  template:
    spec:
      containers:
        - image: ${image_uri}
          resources:
            limits:
              cpu: 4
              memory: 12Gi
              nvidia.com/gpu: "{{gpus}}"
          volumeMounts:
            - name: nfs-storage
              mountPath: /root/.cache
      restartPolicy: Never
      volumes:
        - name: nfs-storage
          persistentVolumeClaim:
            claimName: nfs-pvc
  backoffLimit: 0

The same configuration as JSON:

{
  "spec": {
    "template": {
      "spec": {
        "containers": [
          {
            "image": "${image_uri}",
            "resources": {
              "limits": {
                "cpu": 4,
                "memory": "12Gi",
                "nvidia.com/gpu": "{{gpus}}"
              }
            },
            "volumeMounts": [
              {
                "name": "nfs-storage",
                "mountPath": "/root/.cache"
              }
            ]
          }
        ],
        "restartPolicy": "Never",
        "volumes": [
          {
            "name": "nfs-storage",
            "persistentVolumeClaim": {
              "claimName": "nfs-pvc"
            }
          }
        ]
      }
    },
    "backoffLimit": 0
  }
}

Now, our NFS will be mounted at /root/.cache in the containers running our jobs. The mount path will require adjustment if your container runs as a user other than root. Hugging Face’s libraries and W&B Artifacts both make use of $HOME/.cache/ by default, so downloads should only happen once.

Playing with stable diffusion

To test out our new system, we are going to experiment with stable diffusion’s inference parameters. To run a simple stable diffusion inference job with a default prompt and sane parameters, you can run:

wandb launch -d wandb/job_stable_diffusion_inference:main -p <target-wandb-project> -q <your-queue-name> -e <your-queue-entity>

The command above will submit the container image wandb/job_stable_diffusion_inference:main to your queue. Once your agent picks up the job and schedules it for execution on your cluster, it may take a while for the image to be pulled, depending on your connection. You can follow the status of the job on the queue page on wandb.ai/launch (or <your-wandb-url>/launch for users on wandb server).

Once the run has finished, you should have a job artifact in the project you specified. You can check your project’s job page (<project-url>/jobs) to find the job artifact. Its default name should be job-wandb_job_stable_diffusion_inference but you can change that to whatever you like on the job’s page by clicking the pencil icon next to the job name.

You can now use this job to run more stable diffusion inference on your cluster. From the job page, we can click the Launch button in the top right hand corner to configure a new inference job and submit it to our queue. The job configuration page will be pre-populated with the parameters from the original run, but you can change them to whatever you like by modifying their values in the Overrides section of the launch drawer.

Image of launch UI for stable diffusion inference job