チュートリアル
Weights & Biases を使用するためのインタラクティブなチュートリアルを始めましょう。
基本
次のチュートリアルでは、機械学習の実験管理、モデル評価、ハイパーパラメータチューニング、モデルとデータセットのバージョン管理などにおける W&B の基本を紹介します。
人気のある ML フレームワークチュートリアル
以下のチュートリアルでは、W&B を使用して人気のある ML フレームワークやライブラリを使用するためのステップバイステップの情報を提供します。
その他のリソース
W&B AI アカデミーで、アプリケーションでの LLM のトレーニング、ファインチューン、および使用方法を学ぶことができます。 MLOps および LLMOps ソリューションを実装します。 W&B コースを使用して、現実世界の ML 課題に取り組みましょう。
- 大規模言語モデル (LLMs)
- 効果的な MLOps
- W&B Models
1 - 実験をトラッキングする
W&B を使って機械学習の実験管理、モデルのチェックポイント、チームとの共同作業などを行いましょう。
このノートブックでは、簡単な PyTorch モデルを使用して機械学習の実験を作成し、追跡します。ノートブックの最後には、チームの他のメンバーと共有してカスタマイズ可能なインタラクティブなプロジェクトダッシュボードを持つことになるでしょう。ここで例のダッシュボードを閲覧できます。
前提条件
W&B Python SDK をインストールしてログインします:
# W&B アカウントにログイン
import wandb
import random
import math
# wandb の新しいバックエンド用に wandb-core を使用
wandb.require("core")
W&B を使用して機械学習の実験をシミュレーションし、追跡する
機械学習の実験を作成、追跡、視覚化します。これを行うには:
- W&B run を初期化し、追跡したいハイパーパラメーターを渡します。
- トレーニングループ内で、精度や損失などのメトリクスをログに記録します。
import random
import math
# シミュレートされた実験を 5 回実行します
total_runs = 5
for run in range(total_runs):
# 1️. このスクリプトを追跡するための新しい run を開始します
wandb.init(
# この run がログされるプロジェクトを設定
project="basic-intro",
# run 名を渡します(そうでなければ sunshine-lollypop-10 のようにランダムに割り当てられます)
name=f"experiment_{run}",
# ハイパーパラメーターと run メタデータを追跡
config={
"learning_rate": 0.02,
"architecture": "CNN",
"dataset": "CIFAR-100",
"epochs": 10,
})
# この簡単なブロックは、メトリクスをログに記録するトレーニングループをシミュレーションします
epochs = 10
offset = random.random() / 5
for epoch in range(2, epochs):
acc = 1 - 2 ** -epoch - random.random() / epoch - offset
loss = 2 ** -epoch + random.random() / epoch + offset
# 2️. スクリプトから W&B にメトリクスをログ
wandb.log({"acc": acc, "loss": loss})
# run を終了としてマーク
wandb.finish()
W&B プロジェクトでの機械学習のパフォーマンスを確認します。前のセルから出力される URL リンクをコピーして貼り付けてください。その URL は、グラフを表示するダッシュボードを含む W&B プロジェクトにリダイレクトされます。
以下の画像は、ダッシュボードがどのように見えるかを示しています。
W&B を疑似的な機械学習トレーニングループに統合する方法を理解したので、基本的な PyTorch ニューラルネットワークを使用して機械学習の実験を追跡してみましょう。以下のコードは、他の組織内チームと共有するために W&B にモデルのチェックポイントをアップロードすることもできます。
Pytorch を使用して機械学習の実験を追跡する
以下のコードセルは、簡単な MNIST クラス分類器を定義しトレーニングします。トレーニング中は、W&B が URL を表示します。プロジェクトページリンクをクリックして、W&B プロジェクトでリアルタイムに結果を確認してください。
W&B run では自動で メトリクス、システム情報、ハイパーパラメーター、ターミナル出力 をログし、モデルの入力と出力を含む インタラクティブテーブル が表示されます。
PyTorch Dataloader をセットアップする
次のセルでは、機械学習モデルをトレーニングするために必要な便利な関数を定義します。これらの関数は W&B に特化したものではないため、ここでは詳しくは説明しません。詳細については、forward および backward training loop の定義方法、トレーニングデータをロードするための PyTorch DataLoaders の使用方法、および torch.nn.Sequential
クラス を使用して PyTorch モデルを定義する方法について、PyTorch のドキュメントを参照してください。
# @title
import torch, torchvision
import torch.nn as nn
from torchvision.datasets import MNIST
import torchvision.transforms as T
MNIST.mirrors = [
mirror for mirror in MNIST.mirrors if "http://yann.lecun.com/" not in mirror
]
device = "cuda:0" if torch.cuda.is_available() else "cpu"
def get_dataloader(is_train, batch_size, slice=5):
"トレーニングデータローダーを取得する"
full_dataset = MNIST(
root=".", train=is_train, transform=T.ToTensor(), download=True
)
sub_dataset = torch.utils.data.Subset(
full_dataset, indices=range(0, len(full_dataset), slice)
)
loader = torch.utils.data.DataLoader(
dataset=sub_dataset,
batch_size=batch_size,
shuffle=True if is_train else False,
pin_memory=True,
num_workers=2,
)
return loader
def get_model(dropout):
"簡単なモデル"
model = nn.Sequential(
nn.Flatten(),
nn.Linear(28 * 28, 256),
nn.BatchNorm1d(256),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(256, 10),
).to(device)
return model
def validate_model(model, valid_dl, loss_func, log_images=False, batch_idx=0):
"検証データセットでのモデルの性能を計算し wandb.Table をログ"
model.eval()
val_loss = 0.0
with torch.inference_mode():
correct = 0
for i, (images, labels) in enumerate(valid_dl):
images, labels = images.to(device), labels.to(device)
# フォワードパス ➡
outputs = model(images)
val_loss += loss_func(outputs, labels) * labels.size(0)
# 精度を計算して累積する
_, predicted = torch.max(outputs.data, 1)
correct += (predicted == labels).sum().item()
# ダッシュボードに1バッチの画像を常に同じbatch_idxでログ
if i == batch_idx and log_images:
log_image_table(images, predicted, labels, outputs.softmax(dim=1))
return val_loss / len(valid_dl.dataset), correct / len(valid_dl.dataset)
予測値と真の値を比較するためのテーブルを作成する
以下のセルは W&B に固有のものなので、一緒に見ていきましょう。
このセルでは log_image_table
という関数を定義しています。技術的にはオプションですが、この関数は W&B Table オブジェクトを作成します。このテーブルオブジェクトを使用して、各画像に対してモデルがどのように予測したかを示すテーブルを作成します。
具体的には、それぞれの行にはモデルに入力された画像、予測された値、そして実際の値 (ラベル) が含まれます。
def log_image_table(images, predicted, labels, probs):
"wandb.Table を (img, pred, target, scores) としてログ"
# 画像、ラベル、および予測を記録するための wandb テーブルを作成
table = wandb.Table(
columns=["image", "pred", "target"] + [f"score_{i}" for i in range(10)]
)
for img, pred, targ, prob in zip(
images.to("cpu"), predicted.to("cpu"), labels.to("cpu"), probs.to("cpu")
):
table.add_data(wandb.Image(img[0].numpy() * 255), pred, targ, *prob.numpy())
wandb.log({"predictions_table": table}, commit=False)
モデルをトレーニングし、チェックポイントをアップロードする
以下のコードは、プロジェクトにモデルのチェックポイントを保存します。トレーニング中にモデルのパフォーマンスを評価するために、通常どおりモデルのチェックポイントを使用します。
W&B は、保存したモデルやモデルのチェックポイントをチームや組織の他のメンバーと容易に共有することも可能です。チーム外のメンバーとモデルやモデルのチェックポイントを共有する方法は、W&B Registry をご覧ください。
# 3 つの異なるドロップアウト率を試して 3 つの実験を開始する
for _ in range(3):
# wandb run を初期化
wandb.init(
project="pytorch-intro",
config={
"epochs": 5,
"batch_size": 128,
"lr": 1e-3,
"dropout": random.uniform(0.01, 0.80),
},
)
# config をコピー
config = wandb.config
# データを取得
train_dl = get_dataloader(is_train=True, batch_size=config.batch_size)
valid_dl = get_dataloader(is_train=False, batch_size=2 * config.batch_size)
n_steps_per_epoch = math.ceil(len(train_dl.dataset) / config.batch_size)
# 簡単な MLP モデル
model = get_model(config.dropout)
# 損失とオプティマイザーを作成
loss_func = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=config.lr)
# トレーニング
example_ct = 0
step_ct = 0
for epoch in range(config.epochs):
model.train()
for step, (images, labels) in enumerate(train_dl):
images, labels = images.to(device), labels.to(device)
outputs = model(images)
train_loss = loss_func(outputs, labels)
optimizer.zero_grad()
train_loss.backward()
optimizer.step()
example_ct += len(images)
metrics = {
"train/train_loss": train_loss,
"train/epoch": (step + 1 + (n_steps_per_epoch * epoch))
/ n_steps_per_epoch,
"train/example_ct": example_ct,
}
if step + 1 < n_steps_per_epoch:
# トレーニングメトリクスを wandb にログ
wandb.log(metrics)
step_ct += 1
val_loss, accuracy = validate_model(
model, valid_dl, loss_func, log_images=(epoch == (config.epochs - 1))
)
# トレーニングと検証のメトリクスを wandb にログ
val_metrics = {"val/val_loss": val_loss, "val/val_accuracy": accuracy}
wandb.log({**metrics, **val_metrics})
# モデルのチェックポイントを wandb に保存
torch.save(model, "my_model.pt")
wandb.log_model(
"./my_model.pt",
"my_mnist_model",
aliases=[f"epoch-{epoch+1}_dropout-{round(wandb.config.dropout, 4)}"],
)
print(
f"Epoch: {epoch+1}, Train Loss: {train_loss:.3f}, Valid Loss: {val_loss:3f}, Accuracy: {accuracy:.2f}"
)
# テストセットがあった場合、以下のようにして Summary メトリクスとしてログすることができます
wandb.summary["test_accuracy"] = 0.8
# wandb run を閉じる
wandb.finish()
これで W&B を使用して最初のモデルをトレーニングしました。上記のリンクをクリックしてメトリクスを確認し、W&B App UI の Artifacts タブで保存したモデルのチェックポイントを確認してください。
(オプション) W&B アラートを設定する
W&B アラート を作成して、Python コードからあなたの Slack やメールにアラートを送信します。
コードから発生する 1 回目の Slack またはメールのアラートに対して実施する 2 つの手順は次のとおりです:
- W&B のユーザー設定でアラートをオンにする
wandb.alert()
をコードに追加します。例:
wandb.alert(title="低精度", text=f"精度が許容範囲を下回りました")
以下のセルでは、wandb.alert
の使い方を見るための最小限の例を示しています。
# wandb run を開始
wandb.init(project="pytorch-intro")
# モデルトレーニングループをシミュレーション
acc_threshold = 0.3
for training_step in range(1000):
# 精度のランダムな数値を生成
accuracy = round(random.random() + random.random(), 3)
print(f"Accuracy is: {accuracy}, {acc_threshold}")
# 精度を wandb にログ
wandb.log({"精度": accuracy})
# 精度がしきい値を下回った場合、W&B アラートを発動し run を停止する
if accuracy <= acc_threshold:
# wandb アラートを送信
wandb.alert(
title="低精度",
text=f"精度 {accuracy} はステップ {training_step} で許容しきい値 {acc_threshold} を下回っています",
)
print("アラートが発動されました")
break
# run を終了としてマーク(Jupyter ノートブックで便利)
wandb.finish()
W&B アラートの完全なドキュメントはこちらで見つけることができます。
次のステップ
次のチュートリアルでは、W&B Sweeps を使用したハイパーパラメーター最適化について学びます:
PyTorchを使ったハイパーパラメータースイープ
2 - 予測をテーブルで視覚化する
これは PyTorch を使用して MNIST データ上でトレーニングの過程でモデルの予測を追跡、可視化、比較する方法をカバーしています。
次のことを学びます:
- モデルトレーニングまたは評価中に
wandb.Table()
にメトリクス、画像、テキストなどを記録する
- これらのテーブルを表示、ソート、フィルター、グループ化、結合、対話的にクエリし、探索する
- モデルの予測または結果を比較する: 特定の画像、ハイパーパラメーター/モデルバージョン、またはタイムステップにわたって動的に比較する
例
特定の画像に対する予測スコアを比較する
ライブ例: トレーニング1 エポック目と5 エポック目の予測を比較する →
ヒストグラムは、2つのモデル間のクラスごとのスコアを比較します。各ヒストグラムの上部の緑のバーは、1回のエポックしかトレーニングされていないモデル「CNN-2, 1 epoch」(ID 0)を表しています。下部の紫のバーは、5エポックでトレーニングされたモデル「CNN-2, 5 epochs」(ID 1)を表しています。画像は、モデル間で意見が分かれるケースにフィルターされています。例えば、最初の行では、「4」が1エポック後すべての可能な数字で高いスコアを得ていますが、5エポック後には正しいラベルで最も高いスコアを獲得し、他のものは非常に低いスコアを得ています。
時間経過に伴う主要なエラーに焦点を当てる
ライブ例 →
テストデータ全体で誤った予測(“guess” != “truth” でフィルター)を参照します。1回のトレーニングエポック後には229件の間違った予測がありますが、5エポック後には98件のみです。
モデルのパフォーマンスを比較し、パターンを見つける
ライブ例で詳細を確認する →
正解を除外し、推測ごとにグループ化して、画像の誤分類例と真のラベルの根底にある分布を参照することができます。2倍のレイヤーサイズと学習率のモデルバリアントが左にあり、ベースラインが右にあります。ベースラインは推測された各クラスの間違いをわずかに多くします。
サインアップまたはログイン
サインアップまたはログインして、ブラウザで実験を見て操作します。
この例では、Google Colab を便利なホスト環境として使用していますが、どこからでもトレーニングスクリプトを実行し、W&B の実験管理ツールを使用してメトリクスを視覚化することができます。
アカウントにログインする
import wandb
wandb.login()
WANDB_PROJECT = "mnist-viz"
0. セットアップ
依存関係をインストールし、MNIST をダウンロードし、PyTorch を使用してトレインとテストのデータセットを作成します。
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as T
import torch.nn.functional as F
device = "cuda:0" if torch.cuda.is_available() else "cpu"
# トレインとテストのデータローダーを作成する
def get_dataloader(is_train, batch_size, slice=5):
"トレーニングデータローダーを取得する"
ds = torchvision.datasets.MNIST(root=".", train=is_train, transform=T.ToTensor(), download=True)
loader = torch.utils.data.DataLoader(dataset=ds,
batch_size=batch_size,
shuffle=True if is_train else False,
pin_memory=True, num_workers=2)
return loader
1. モデルとトレーニングスケジュールの定義
- 各エポックがトレーニングステップと検証(テスト)ステップで構成されるように、実行するエポック数を設定します。オプションで、各テストステップで記録するデータ量を設定します。ここでは、デモを簡略化するために、バッチ数とバッチごとに可視化する画像の数を少なく設定しています。
- シンプルな畳み込みニューラルネットワークを定義します(pytorch-tutorial のコードに従います)。
- PyTorch を使用してトレインとテストセットをロードする
# 実行するエポック数
# 各エポックはトレーニングステップとテストステップを含みます。これで、ログするテスト予測のテーブル数が設定されます
EPOCHS = 1
# 各テストステップのテストデータからログするバッチ数
# (シンプルなデモのためにデフォルトを低く設定)
NUM_BATCHES_TO_LOG = 10 #79
# 各テストバッチごとにログする画像の数
# (シンプルなデモのためにデフォルトを低く設定)
NUM_IMAGES_PER_BATCH = 32 #128
# トレーニング設定とハイパーパラメーター
NUM_CLASSES = 10
BATCH_SIZE = 32
LEARNING_RATE = 0.001
L1_SIZE = 32
L2_SIZE = 64
# これを変更すると、隣接するレイヤーの形状を変更する必要があります
CONV_KERNEL_SIZE = 5
# 二層の畳み込みニューラルネットワークを定義
class ConvNet(nn.Module):
def __init__(self, num_classes=10):
super(ConvNet, self).__init__()
self.layer1 = nn.Sequential(
nn.Conv2d(1, L1_SIZE, CONV_KERNEL_SIZE, stride=1, padding=2),
nn.BatchNorm2d(L1_SIZE),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2))
self.layer2 = nn.Sequential(
nn.Conv2d(L1_SIZE, L2_SIZE, CONV_KERNEL_SIZE, stride=1, padding=2),
nn.BatchNorm2d(L2_SIZE),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2))
self.fc = nn.Linear(7*7*L2_SIZE, NUM_CLASSES)
self.softmax = nn.Softmax(NUM_CLASSES)
def forward(self, x):
# 指定されたレイヤーの形状を確認するためにコメント解除:
#print("x: ", x.size())
out = self.layer1(x)
out = self.layer2(out)
out = out.reshape(out.size(0), -1)
out = self.fc(out)
return out
train_loader = get_dataloader(is_train=True, batch_size=BATCH_SIZE)
test_loader = get_dataloader(is_train=False, batch_size=2*BATCH_SIZE)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
2. トレーニングを実行し、テスト予測をログします
各エポックごとにトレーニングステップとテストステップを実行します。各テストステップごとに、テスト予測を保存する wandb.Table()
を作成します。これらはブラウザで視覚化され、動的にクエリされ、並べて比較されます。
# W&B: このモデルのトレーニングを追跡する新しい run を初期化します
wandb.init(project="table-quickstart")
# W&B: config を使用してハイパーパラメーターをログ
cfg = wandb.config
cfg.update({"epochs" : EPOCHS, "batch_size": BATCH_SIZE, "lr" : LEARNING_RATE,
"l1_size" : L1_SIZE, "l2_size": L2_SIZE,
"conv_kernel" : CONV_KERNEL_SIZE,
"img_count" : min(10000, NUM_IMAGES_PER_BATCH*NUM_BATCHES_TO_LOG)})
# モデル、損失関数、オプティマイザーを定義
model = ConvNet(NUM_CLASSES).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
# テスト画像のバッチの予測をログするための便利な関数
def log_test_predictions(images, labels, outputs, predicted, test_table, log_counter):
# すべてのクラスの信頼スコアを取得
scores = F.softmax(outputs.data, dim=1)
log_scores = scores.cpu().numpy()
log_images = images.cpu().numpy()
log_labels = labels.cpu().numpy()
log_preds = predicted.cpu().numpy()
# 画像の順序に基づいて ID を追加
_id = 0
for i, l, p, s in zip(log_images, log_labels, log_preds, log_scores):
# データテーブルに必要な情報を追加:
# ID、画像ピクセル、モデルの推測、真のラベル、すべてのクラスのスコア
img_id = str(_id) + "_" + str(log_counter)
test_table.add_data(img_id, wandb.Image(i), p, l, *s)
_id += 1
if _id == NUM_IMAGES_PER_BATCH:
break
# モデルをトレーニングする
total_step = len(train_loader)
for epoch in range(EPOCHS):
# トレーニングステップ
for i, (images, labels) in enumerate(train_loader):
images = images.to(device)
labels = labels.to(device)
# forward pass
outputs = model(images)
loss = criterion(outputs, labels)
# backward and optimize
optimizer.zero_grad()
loss.backward()
optimizer.step()
# W&B: トレーニングステップでの損失をログし、UIでライブ視覚化
wandb.log({"loss" : loss})
if (i+1) % 100 == 0:
print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'
.format(epoch+1, EPOCHS, i+1, total_step, loss.item()))
# W&B: 各テストステップでの予測を保存するためのテーブルを作成
columns=["id", "image", "guess", "truth"]
for digit in range(10):
columns.append("score_" + str(digit))
test_table = wandb.Table(columns=columns)
# モデルをテスト
model.eval()
log_counter = 0
with torch.no_grad():
correct = 0
total = 0
for images, labels in test_loader:
images = images.to(device)
labels = labels.to(device)
outputs = model(images)
_, predicted = torch.max(outputs.data, 1)
if log_counter < NUM_BATCHES_TO_LOG:
log_test_predictions(images, labels, outputs, predicted, test_table, log_counter)
log_counter += 1
total += labels.size(0)
correct += (predicted == labels).sum().item()
acc = 100 * correct / total
# W&B: トレーニングエポックの精度をログして、UIで可視化
wandb.log({"epoch" : epoch, "acc" : acc})
print('Test Accuracy of the model on the 10000 test images: {} %'.format(acc))
# W&B: 予測テーブルを wandb にログ
wandb.log({"test_predictions" : test_table})
# W&B: run を完了としてマークする(マルチセルノートブックに便利)
wandb.finish()
次は何ですか?
次のチュートリアルでは、W&B Sweeps を使用してハイパーパラメーターを最適化する方法を学びます。
3 - ハイパーパラメーターをスイープでチューニングする
望みのメトリクス(例えばモデルの精度)を満たす機械学習モデルを見つけることは、通常、複数回のイテレーションを必要とする冗長な作業です。さらに悪いことに、特定のトレーニング run にどのハイパーパラメータの組み合わせを使用すべきか不明瞭な場合があります。
W&B Sweeps を使用して、学習率、バッチサイズ、隠れ層の数、オプティマイザーの種類などのハイパーパラメータ値の組み合わせを自動的に検索し、望みのメトリクスに基づいてモデルを最適化するための組織化された効率的な方法を作成します。
このチュートリアルでは、W&B PyTorch インテグレーションを使用してハイパーパラメータ探索を作成します。ビデオチュートリアルと一緒に進めてください。
Sweeps: 概要
Weights & Biases を使ったハイパーパラメータ探索 run はとても簡単です。以下のシンプルな3ステップで行います:
-
スイープの定義: 検索するパラメータ、検索戦略、最適化メトリクスなどを指定する辞書またはYAMLファイルを作成します。
-
スイープの初期化: 1行のコードでスイープを初期化し、sweep 設定の辞書を渡します:
sweep_id = wandb.sweep(sweep_config)
-
スイープエージェントの実行: こちらも1行のコードで完了し、wandb.agent()
を使ってsweep_id
を渡し、モデルアーキテクチャーを定義してトレーニングする関数と共に実行します:
wandb.agent(sweep_id, function=train)
始める前に
W&B をインストールし、W&B Python SDK をノートブックにインポートします:
!pip install
を使用してインストールします:
!pip install wandb -Uq
- W&B をインポートします:
import wandb
- W&B にログインし、プロンプトが表示されたら APIキー を提供します:
wandb.login()
ステップ 1️: スイープを定義する
W&B スイープは、ハイパーパラメータ値を試行するための戦略と、それを評価するコードを組み合わせたものです。
スイープを開始する前に、スイープ戦略を_スイープ設定_で定義する必要があります。
Jupyter ノートブックでスイープを開始する場合、スイープ設定はネストされた辞書でなければなりません。
コマンドラインでスイープを行う場合は、YAMLファイルでスイープ設定を指定する必要があります。
検索メソッドの選択
最初に、設定辞書内でハイパーパラメータ検索メソッドを指定します。選べるハイパーパラメータ検索戦略は、グリッド、ランダム、ベイズ探索です。
このチュートリアルでは、ランダム検索を使用します。ノートブック内で辞書を作成し、method
キーに対して random
を指定します。
sweep_config = {
'method': 'random'
}
最適化したいメトリクスを指定します。ランダム検索メソッドを使用するスイープでは、必ずしもメトリクスとゴールを指定する必要はありません。しかし、スイープの目標を把握しておくことで、後で参照することができるため、良い習慣です。
metric = {
'name': 'loss',
'goal': 'minimize'
}
sweep_config['metric'] = metric
検索するハイパーパラメータの指定
スイープ設定で検索メソッドが指定されたので、検索したいハイパーパラメータを指定します。
これを行うには、parameter
キーに1つ以上のハイパーパラメータ名を指定し、value
キーに1つ以上のハイパーパラメータ値を指定します。
特定のハイパーパラメータに対して検索する値は、調査しているハイパーパラメータの種類に依存します。
例えば、機械学習オプティマイザーを選択した場合、Adam オプティマイザーや確率的勾配降下法など、1つ以上の有限のオプティマイザー名を指定する必要があります。
parameters_dict = {
'optimizer': {
'values': ['adam', 'sgd']
},
'fc_layer_size': {
'values': [128, 256, 512]
},
'dropout': {
'values': [0.3, 0.4, 0.5]
},
}
sweep_config['parameters'] = parameters_dict
時にはハイパーパラメータを追跡したいが、その値を変えたくない場合があります。この場合、スイープ設定にハイパーパラメータを追加し、使用したい正確な値を指定します。以下のコードセルでは、epochs
が1に設定されています。
parameters_dict.update({
'epochs': {
'value': 1}
})
random
検索の場合、
特定の run で選ばれるパラメータのvalues
は均等に確率が分布しています。
あるいは、
特定のdistribution
を指定し、
例えばnormal
分布の平均mu
と標準偏差sigma
をパラメータとして指定することもできます。
parameters_dict.update({
'learning_rate': {
# 0から0.1のフラットな分布
'distribution': 'uniform',
'min': 0,
'max': 0.1
},
'batch_size': {
# 32から256の整数
# 対数が均等分布
'distribution': 'q_log_uniform_values',
'q': 8,
'min': 32,
'max': 256,
}
})
最後に完成したsweep_config
は、試してみたいparameters
を具体的に示したネストされた辞書で、
それらを試すためのmethod
を指定します。
次に、スイープ設定がどのように見えるかを確認しましょう:
import pprint
pprint.pprint(sweep_config)
設定オプションの全リストについては、Sweep 設定オプションをご覧ください。
可能性のある無限の選択肢を持つハイパーパラメータの場合
選択した少数のvalues
を試してみるのが通常意味があります。例えば、先のスイープ設定では、layer_size
およびdropout
パラメータキーに対して限定された値のリストが指定されています。
ステップ 2️: スイープの初期化
検索戦略を定義したら、それを実装するための準備をします。
W&Bは、クラウドまたはローカルで複数のマシンにわたってスイープを管理する Sweep Controller を使用します。このチュートリアルでは、W&Bにより管理されるスイープコントローラを使用します。
スイープコントローラーはスイープを管理しますが、実際にスイープを実行するコンポーネントは、_sweep agent_として知られています。
デフォルトでは、スイープコントローラのコンポーネントはW&Bのサーバー上で開始され、スイープエージェントはローカルマシン上で作成されます。
ノートブック内で、wandb.sweep
メソッドを使用してスイープコントローラをアクティブにできます。先ほど定義したスイープ設定辞書をsweep_config
フィールドに渡します:
sweep_id = wandb.sweep(sweep_config, project="pytorch-sweeps-demo")
wandb.sweep
関数は、後のステップでスイープをアクティブ化するために使用するsweep_id
を返します。
コマンドラインでは、この関数は次のように置き換えられます
ターミナルで W&B Sweeps を作成する方法についての詳細は、W&B Sweep ウォークスルーを参照してください。
ステップ 3: 機械学習コードを定義する
スイープを実行する前に、
試してみたいハイパーパラメータ値を使用するトレーニング手順を定義します。W&B Sweepsをトレーニングコードに統合するための鍵は、各トレーニング実験のために、あなたのトレーニングロジックがスイープ設定で定義されたハイパーパラメータ値にアクセスできるようにすることです。
次のコード例では、ヘルパー関数build_dataset
、build_network
、build_optimizer
、およびtrain_epoch
が、スイープハイパーパラメータ設定辞書にアクセスします。
次の機械学習トレーニングコードをノートブック内で実行します。これらの関数は、PyTorchでの基本的な全結合ニューラルネットワークを定義しています。
import torch
import torch.optim as optim
import torch.nn.functional as F
import torch.nn as nn
from torchvision import datasets, transforms
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def train(config=None):
# 新しい wandb run を初期化する
with wandb.init(config=config):
# wandb.agent によって呼ばれた場合、以下のようにスイープコントローラによってこの構成が設定されます
config = wandb.config
loader = build_dataset(config.batch_size)
network = build_network(config.fc_layer_size, config.dropout)
optimizer = build_optimizer(network, config.optimizer, config.learning_rate)
for epoch in range(config.epochs):
avg_loss = train_epoch(network, loader, optimizer)
wandb.log({"loss": avg_loss, "epoch": epoch})
train
関数内では、次のW&B Python SDKメソッドが見られます:
次のセルでは、4つの関数を定義します:
build_dataset
、build_network
、build_optimizer
、およびtrain_epoch
。
これらの関数は基本的なPyTorchパイプラインの標準的な部分であり、
W&Bの使用によってその実装に影響しません。
def build_dataset(batch_size):
transform = transforms.Compose(
[transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))])
# MNISTトレーニングデータセットをダウンロード
dataset = datasets.MNIST(".", train=True, download=True,
transform=transform)
sub_dataset = torch.utils.data.Subset(
dataset, indices=range(0, len(dataset), 5))
loader = torch.utils.data.DataLoader(sub_dataset, batch_size=batch_size)
return loader
def build_network(fc_layer_size, dropout):
network = nn.Sequential( # 完全連結のシングル隠れ層
nn.Flatten(),
nn.Linear(784, fc_layer_size), nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(fc_layer_size, 10),
nn.LogSoftmax(dim=1))
return network.to(device)
def build_optimizer(network, optimizer, learning_rate):
if optimizer == "sgd":
optimizer = optim.SGD(network.parameters(),
lr=learning_rate, momentum=0.9)
elif optimizer == "adam":
optimizer = optim.Adam(network.parameters(),
lr=learning_rate)
return optimizer
def train_epoch(network, loader, optimizer):
cumu_loss = 0
for _, (data, target) in enumerate(loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
# ➡ Forward pass
loss = F.nll_loss(network(data), target)
cumu_loss += loss.item()
# ⬅ Backward pass + weight update
loss.backward()
optimizer.step()
wandb.log({"batch loss": loss.item()})
return cumu_loss / len(loader)
PyTorch を使用した W&B のインストゥルメントについての詳細は、この Colabを参照してください。
ステップ 4: スイープエージェントのアクティベート
スイープ設定が定義され、ハイパーパラメータを対話的に利用できるトレーニングスクリプトが準備できたので、スイープエージェントをアクティベートする準備ができました。スイープエージェントは、スイープ設定で定義されたハイパーパラメータ値のセットを使用して実験を実行する責任を負っています。
sweep_id
を持つスイープの一部として、スイープが実行する関数(この例ではtrain
関数を使用)、(オプションで)スイープコントローラーに何個の設定を要求するかを指定してください。
同じsweep_id
を持つスイープエージェントを異なる計算リソース上で起動することができます。スイープコントローラーがあなたの定義したスイープ設定に基づいて協働することを保証します。
次のセルは、トレーニング関数(train
)を5回実行するスイープエージェントをアクティベートします:
wandb.agent(sweep_id, train, count=5)
スイープ設定でrandom
検索メソッドが指定されていたため、スイープコントローラーはランダムに生成されたハイパーパラメータ値を提供します。
ターミナルで W&B Sweeps を作成する方法についての詳細は、W&B Sweep ウォークスルーを参照してください。
スイープ結果の可視化
Parallel Coordinates プロット
このプロットは、ハイパーパラメータ値をモデルメトリクスにマッピングします。最も良いモデルパフォーマンスをもたらしたハイパーパラメータの組み合わせを特定するのに役立ちます。
ハイパーパラメータの重要度プロット
ハイパーパラメータの重要度プロットは、あなたのメトリクスの最良の予測因子だったハイパーパラメータを表面化します。
特徴重要度(ランダムフォレストモデルによる)と相関関係(暗黙の線形モデル)を報告します。
これらの可視化は、最も重要であり、それによって更に探求する価値があるパラメータ(および値範囲)に絞り込むことによって、高価なハイパーパラメータ最適化にかかる時間とリソースを節約するのに役立ちます。
W&B Sweeps についてもっと学ぶ
私たちは、あなたが試してみるためのシンプルなトレーニングスクリプトといくつかのスイープ設定のバリエーションを用意しました。これらを試すことを強くお勧めします。
そのリポジトリには、より高度なスイープ機能を試すための例もあります。例えば、Bayesian HyperbandとHyperoptが含まれています。
4 - モデルとデータセットをトラッキングする
このノートブックでは、W&B Artifacts を使用して ML 実験パイプラインを追跡する方法を紹介します。
ビデオチュートリアルとともに進めてください。
アーティファクトについて
ギリシャのアンフォラのように、アーティファクトは生成されたオブジェクト、つまりプロセスの出力です。
MLでは、最も重要なアーティファクトは datasets と models です。
そして、コロナドの十字架のように、これらの重要なアーティファクトは博物館に保管されるべきです。
それは、あなたやあなたのチーム、そして広範な ML コミュニティがそれらから学べるように、カタログ化されて整理されるべきだということです。
結局のところ、トレーニングを追跡しない人々はそれを繰り返す運命にあります。
私たちの Artifacts API を使用することで、Artifact
を W&B Run
の出力としてログしたり、Run
の入力として Artifact
を使用したりできます。
この図では、トレーニング run がデータセットを取り込み、モデルを生成します。
1つの run が他の run の出力を入力として使用できるため、Artifact
と Run
は、Artifact
と Run
のノードを持ち、それを消費または生成する Run
に接続する矢印を持つ、有向グラフ(二部 DAG)を形成します。
アーティファクトを使用してモデルとデータセットを追跡する
インストールとインポート
Artifacts は、バージョン 0.9.2
以降の私たちの Python ライブラリの一部です。
ほとんどの ML Python スタックの部分と同様に、pip
で利用可能です。
# wandb バージョン 0.9.2+ 互換
!pip install wandb -qqq
!apt install tree
データセットをログする
まず、いくつかの Artifacts を定義しましょう。
この例は、PyTorch の
“Basic MNIST Example”
を基にしていますが、TensorFlow や他のフレームワーク、
あるいは純粋な Python でも同様に行うことができます。
データセットを train
、validation
、test
として次のように定義します。
- パラメータを選択するための
train
セット
- ハイパーパラメータを選択するための
validation
セット
- 最終モデルを評価するための
test
セット
以下の最初のセルでこれらの3つのデータセットを定義します。
import random
import torch
import torchvision
from torch.utils.data import TensorDataset
from tqdm.auto import tqdm
# 挙動を決定的に
torch.backends.cudnn.deterministic = True
random.seed(0)
torch.manual_seed(0)
torch.cuda.manual_seed_all(0)
# デバイスの設定
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# データパラメータ
num_classes = 10
input_shape = (1, 28, 28)
# 遅いミラーを MNIST ミラーリストから削除
torchvision.datasets.MNIST.mirrors = [mirror for mirror in torchvision.datasets.MNIST.mirrors
if not mirror.startswith("http://yann.lecun.com")]
def load(train_size=50_000):
"""
# データをロードする
"""
# データ、train と test セットで分割
train = torchvision.datasets.MNIST("./", train=True, download=True)
test = torchvision.datasets.MNIST("./", train=False, download=True)
(x_train, y_train), (x_test, y_test) = (train.data, train.targets), (test.data, test.targets)
# ハイパーパラメータチューニングのために検証セットを分割
x_train, x_val = x_train[:train_size], x_train[train_size:]
y_train, y_val = y_train[:train_size], y_train[train_size:]
training_set = TensorDataset(x_train, y_train)
validation_set = TensorDataset(x_val, y_val)
test_set = TensorDataset(x_test, y_test)
datasets = [training_set, validation_set, test_set]
return datasets
このコードでは、アーティファクトとしてデータをログするコードがデータを生成するコードにラップされているというパターンが示されています。
この場合、データを load
するコードはデータを load_and_log
するコードとは分かれています。
これは良い実践です。
これらのデータセットをアーティファクトとしてログするには、次の手順が必要です。
wandb.init
で Run
を作成する (L4)
- データセットの
Artifact
を作成する (L10)
- 関連する
file
を保存してログする (L20, L23)
以下のコードセルの例を確認し、その後のセクションで詳細を展開してください。
def load_and_log():
# 🚀 run を開始し、ラベルを付けてどのプロジェクトに属するか呼び出せるようにする
with wandb.init(project="artifacts-example", job_type="load-data") as run:
datasets = load() # データセットをロードするコードを分ける
names = ["training", "validation", "test"]
# 🏺 アーティファクトを作成
raw_data = wandb.Artifact(
"mnist-raw", type="dataset",
description="Raw MNIST dataset, split into train/val/test",
metadata={"source": "torchvision.datasets.MNIST",
"sizes": [len(dataset) for dataset in datasets]})
for name, data in zip(names, datasets):
# 🐣 アーティファクトに新しいファイルを保存し、その内容に何かを書き込む
with raw_data.new_file(name + ".pt", mode="wb") as file:
x, y = data.tensors
torch.save((x, y), file)
# ✍️ W&B にアーティファクトを保存
run.log_artifact(raw_data)
load_and_log()
wandb.init
Artifact
を生成する Run
を作成するとき、その project
に属することを明示する必要があります。
ワークフローに応じて、
プロジェクトは car-that-drives-itself
のように大きいものであったり、
iterative-architecture-experiment-117
のように小さいものであったりします。
良い実践のルール: 可能であれば、同じ Artifact
を共有するすべての Run
を単一のプロジェクト内に保持してください。これによりシンプルになりますが、心配しないでください – Artifact
はプロジェクト間で移動可能です。
実行する様々な種類のジョブを追跡するために、Runs
を作成する際に job_type
を提供することをお勧めします。
これにより、Artifacts のグラフが整然とした状態を保つことができます。
良い実践のルール: job_type
は説明的であり、単一のパイプラインのステップに対応するものであるべきです。ここでは、preprocess
データと load
データを分けて処理しています。
wandb.Artifact
何かを Artifact
としてログするためには、最初に Artifact
オブジェクトを作成する必要があります。
すべての Artifact
には name
があり – これが第一引数で設定されます。
良い実践のルール: name
は説明的で、記憶しやすくタイプしやすいものであるべきです – 私たちは、コード内の変数名に対応するハイフンで区切られた名前を使用するのが好きです。
また、それには type
があります。Run
の job_type
と同様に、これも Run
と Artifact
のグラフを整理するために使用されます。
良い実践のルール: type
はシンプルであるべきです。
mnist-data-YYYYMMDD
よりも dataset
や model
のように。
また、辞書として description
と metadata
をアタッチすることもできます。
metadata
は JSON にシリアライズ可能である必要があります。
良い実践のルール: metadata
はできる限り詳しく記述するべきです。
artifact.new_file
および run.log_artifact
一度 Artifact
オブジェクトを作成したら、ファイルを追加する必要があります。
あなたはそれを正しく読みました。複数のファイル(s
付き)です。
Artifact
はディレクトリーのように構造化されており、
ファイルとサブディレクトリーを持っています。
良い実践のルール: 可能な限り、Artifact
の内容を複数のファイルに分割してください。スケールする時が来たときに役立ちます。
new_file
メソッドを使用して、ファイルを書き込むと同時にArtifact
にアタッチします。
次のセクションでは、これらの2つの手順を分ける add_file
メソッドを使用します。
すべてのファイルを追加したら、run.log_artifact
を使用して wandb.ai にログする必要があります。
出力には Run
ページの URL などのいくつかの URL が表示されることに気付くでしょう。
そこでは、ログされた Artifact
を含む Run
の結果を確認できます。
下記では、Run ページの他のコンポーネントをより活用する例をいくつか見ていきます。
ログされたデータセットアーティファクトを使用する
W&B 内の Artifact
は博物館内のアーティファクトとは異なり、
単に保存されるだけでなく 使用 を想定しています。
それがどのように機能するか見てみましょう。
以下のセルでは、生のデータセットを取り込み、それを preprocess
したデータセット(つまり normalize
され正しい形に整形されたもの)を生成するパイプラインステップを定義しています。
また、コードの重要部分である preprocess
を wandb
とインターフェースするコードから分けています。
def preprocess(dataset, normalize=True, expand_dims=True):
"""
## データを準備する
"""
x, y = dataset.tensors
if normalize:
# 画像を[0, 1]の範囲にスケール
x = x.type(torch.float32) / 255
if expand_dims:
# 画像が形状(1, 28, 28)を持つことを確認
x = torch.unsqueeze(x, 1)
return TensorDataset(x, y)
次に示すのは、この preprocess
ステップを wandb.Artifact
ログで装備するためのコードです。
以下の例では、use
する Artifact
が新たに登場し、log
することは前のステップと同様です。
Artifact
は、Run
の入力と出力の両方です。
このステップが前のものとは異なる種類のジョブであることを明確にするために、新しい job_type
、preprocess-data
を使用します。
def preprocess_and_log(steps):
with wandb.init(project="artifacts-example", job_type="preprocess-data") as run:
processed_data = wandb.Artifact(
"mnist-preprocess", type="dataset",
description="Preprocessed MNIST dataset",
metadata=steps)
# ✔️ どのアーティファクトを使用するかを宣言
raw_data_artifact = run.use_artifact('mnist-raw:latest')
# 📥 必要に応じてアーティファクトをダウンロード
raw_dataset = raw_data_artifact.download()
for split in ["training", "validation", "test"]:
raw_split = read(raw_dataset, split)
processed_dataset = preprocess(raw_split, **steps)
with processed_data.new_file(split + ".pt", mode="wb") as file:
x, y = processed_dataset.tensors
torch.save((x, y), file)
run.log_artifact(processed_data)
def read(data_dir, split):
filename = split + ".pt"
x, y = torch.load(os.path.join(data_dir, filename))
return TensorDataset(x, y)
ここで注意すべきことは、前処理の ステップ
が preprocessed_data
に metadata
とともに保存されているということです。
実験を再現可能にしようとしている場合、多くのメタデータを収集することは良いアイデアです。
また、データセットは “large artifact
” であるにもかかわらず、download
ステップは1秒もかからずに完了します。
詳細は以下のマークダウンセルを展開してください。
steps = {"normalize": True,
"expand_dims": True}
preprocess_and_log(steps)
run.use_artifact
これらのステップはよりシンプルです。消費者はその Artifact
の name
を知っていればいいだけです。その “bit more” とは、欲しい Artifact
の特定のバージョンの alias
です。
デフォルトでは、最後にアップロードされたバージョンが latest
としてタグ付けされます。
それ以外の場合は v0
や v1
などで古いバージョンを選択することもでき、
また best
や jit-script
のような独自の alias を提供することもできます。
Docker Hub タグのように、alias は名前から :
で分離されるため、私たちの求める Artifact
は mnist-raw:latest
になります。
良い実践のルール: alias を短く簡潔に保持します。
カスタム alias
である latest
や best
のような Artifact
を使用して、ある条件を満たすものを指定します。
artifact.download
さて、あなたは download
呼び出しについて心配しているかもしれません。
別のコピーをダウンロードすると、メモリの負担が2倍になるのではないでしょうか?
心配しないでください。実際に何かをダウンロードする前に、
正しいバージョンがローカルにあるかどうか確認します。
これは torrenting や git
を使用したバージョン管理 と同じ技術、ハッシュ化を使用しています。
Artifact
が作成されログされると、
作業ディレクトリ内の artifacts
というフォルダに
各 Artifact
用のサブディレクトリが埋まり始めます。
その内容を !tree artifacts
で確認してください。
アーティファクトページ
Artifact
をログし、使用したので、Run ページの Artifacts タブを見てみましょう。
wandb
出力からの Run ページの URL に移動し、
左サイドバーから “Artifacts” タブを選択します
(これはデータベースのアイコンで、
ホッケークラブを3つ重ねたようなものに見えるアイコンです)。
Input Artifacts テーブルや Output Artifacts テーブルのいずれかの行をクリックし、
その後に表示されるタブ (Overview, Metadata)
でログされた Artifact
についての情報を確認してください。
私たちは特に Graph View を好んでいます。
デフォルトでは、Artifact
の type
と
Run
の job_type
を 2 つの種類のノードとして持ち、
消費と生成を表す矢印を持つグラフを表示します。
モデルをログする
これで Artifact
API の動作がわかりましたが、
ML ワークフローを改善するために Artifact
がどのように役立つかを理解するために、
パイプラインの終わりまでこの例を追ってみましょう。
ここでの最初のセルは、PyTorch で DNN model
を構築します – 本当にシンプルな ConvNet です。
まず、model
を初期化するだけで、トレーニングはしません。
そのため、すべての他の要素を一定に保ちながら複数のトレーニングを繰り返すことができます。
from math import floor
import torch.nn as nn
class ConvNet(nn.Module):
def __init__(self, hidden_layer_sizes=[32, 64],
kernel_sizes=[3],
activation="ReLU",
pool_sizes=[2],
dropout=0.5,
num_classes=num_classes,
input_shape=input_shape):
super(ConvNet, self).__init__()
self.layer1 = nn.Sequential(
nn.Conv2d(in_channels=input_shape[0], out_channels=hidden_layer_sizes[0], kernel_size=kernel_sizes[0]),
getattr(nn, activation)(),
nn.MaxPool2d(kernel_size=pool_sizes[0])
)
self.layer2 = nn.Sequential(
nn.Conv2d(in_channels=hidden_layer_sizes[0], out_channels=hidden_layer_sizes[-1], kernel_size=kernel_sizes[-1]),
getattr(nn, activation)(),
nn.MaxPool2d(kernel_size=pool_sizes[-1])
)
self.layer3 = nn.Sequential(
nn.Flatten(),
nn.Dropout(dropout)
)
fc_input_dims = floor((input_shape[1] - kernel_sizes[0] + 1) / pool_sizes[0]) # layer 1 output size
fc_input_dims = floor((fc_input_dims - kernel_sizes[-1] + 1) / pool_sizes[-1]) # layer 2 output size
fc_input_dims = fc_input_dims*fc_input_dims*hidden_layer_sizes[-1] # layer 3 output size
self.fc = nn.Linear(fc_input_dims, num_classes)
def forward(self, x):
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.fc(x)
return x
ここでは、W&B を使用して run を追跡しており、すべてのハイパーパラメーターを保存するために wandb.config
オブジェクトを使用しています。
その dict
ionary バージョンの config
object は本当に便利な metadata
のピースなので、必ずそれを含めてください。
def build_model_and_log(config):
with wandb.init(project="artifacts-example", job_type="initialize", config=config) as run:
config = wandb.config
model = ConvNet(**config)
model_artifact = wandb.Artifact(
"convnet", type="model",
description="Simple AlexNet style CNN",
metadata=dict(config))
torch.save(model.state_dict(), "initialized_model.pth")
# ➕ アーティファクトにファイルを追加する別の方法
model_artifact.add_file("initialized_model.pth")
wandb.save("initialized_model.pth")
run.log_artifact(model_artifact)
model_config = {"hidden_layer_sizes": [32, 64],
"kernel_sizes": [3],
"activation": "ReLU",
"pool_sizes": [2],
"dropout": 0.5,
"num_classes": 10}
build_model_and_log(model_config)
artifact.add_file
データセットのログ例のように、new_file
を使用してファイルを書き込みアーティファクトに追加する代わりに、
ファイルを書き込んだ後に(ここでは torch.save
により)それをアーティファクトに追加することもできます。
👍のルール: 重複を避けるために、できるだけ new_file
を使用してください。
ログされたモデルアーティファクトを使用する
私たちがデータセットに対して use_artifact
を呼び出すことができたように、別の Run
で initialized_model
を使用することもできます。
この時は、model
を train
しましょう。
詳細については、
instrumenting W&B with PyTorch に関する Colab を参照してください。
import torch.nn.functional as F
def train(model, train_loader, valid_loader, config):
optimizer = getattr(torch.optim, config.optimizer)(model.parameters())
model.train()
example_ct = 0
for epoch in range(config.epochs):
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.cross_entropy(output, target)
loss.backward()
optimizer.step()
example_ct += len(data)
if batch_idx % config.batch_log_interval == 0:
print('Train Epoch: {} [{}/{} ({:.0%})]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), len(train_loader.dataset),
batch_idx / len(train_loader), loss.item()))
train_log(loss, example_ct, epoch)
# エポック毎に検証セットでモデルを評価
loss, accuracy = test(model, valid_loader)
test_log(loss, accuracy, example_ct, epoch)
def test(model, test_loader):
model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
test_loss += F.cross_entropy(output, target, reduction='sum') # バッチ損失を合計
pred = output.argmax(dim=1, keepdim=True) # 最大のlog確率のインデックスを取得
correct += pred.eq(target.view_as(pred)).sum()
test_loss /= len(test_loader.dataset)
accuracy = 100. * correct / len(test_loader.dataset)
return test_loss, accuracy
def train_log(loss, example_ct, epoch):
loss = float(loss)
# 魔法が起こる場所
wandb.log({"epoch": epoch, "train/loss": loss}, step=example_ct)
print(f"Loss after " + str(example_ct).zfill(5) + f" examples: {loss:.3f}")
def test_log(loss, accuracy, example_ct, epoch):
loss = float(loss)
accuracy = float(accuracy)
# 魔法が起こる場所
wandb.log({"epoch": epoch, "validation/loss": loss, "validation/accuracy": accuracy}, step=example_ct)
print(f"Loss/accuracy after " + str(example_ct).zfill(5) + f" examples: {loss:.3f}/{accuracy:.3f}")
今回は 2 つの別々の Artifact
を生成する Run
を実行します。
最初の model
ファイルを train
する Run
が終了し次第、
second
は test_dataset
上の評価性能を evaluate
することにより
trained-model
Artifact
を消費します。
また、ネットワークが最も混乱する32の例、
つまり categorical_crossentropy
が最高の例を引き出します。
これは、データセットとモデルの問題を診断するための良い方法です。
def evaluate(model, test_loader):
"""
## トレーニングされたモデルを評価する
"""
loss, accuracy = test(model, test_loader)
highest_losses, hardest_examples, true_labels, predictions = get_hardest_k_examples(model, test_loader.dataset)
return loss, accuracy, highest_losses, hardest_examples, true_labels, predictions
def get_hardest_k_examples(model, testing_set, k=32):
model.eval()
loader = DataLoader(testing_set, 1, shuffle=False)
# データセット内の各アイテムに対する損失と予測を取得する
losses = None
predictions = None
with torch.no_grad():
for data, target in loader:
data, target = data.to(device), target.to(device)
output = model(data)
loss = F.cross_entropy(output, target)
pred = output.argmax(dim=1, keepdim=True)
if losses is None:
losses = loss.view((1, 1))
predictions = pred
else:
losses = torch.cat((losses, loss.view((1, 1))), 0)
predictions = torch.cat((predictions, pred), 0)
argsort_loss = torch.argsort(losses, dim=0)
highest_k_losses = losses[argsort_loss[-k:]]
hardest_k_examples = testing_set[argsort_loss[-k:]][0]
true_labels = testing_set[argsort_loss[-k:]][1]
predicted_labels = predictions[argsort_loss[-k:]]
return highest_k_losses, hardest_k_examples, true_labels, predicted_labels
これらのログ関数は新しい Artifact
機能を追加することはないため、
コメントしません: これらは単に、
Artifact
を use
し、download
し、
log
しているだけです。
from torch.utils.data import DataLoader
def train_and_log(config):
with wandb.init(project="artifacts-example", job_type="train", config=config) as run:
config = wandb.config
data = run.use_artifact('mnist-preprocess:latest')
data_dir = data.download()
training_dataset = read(data_dir, "training")
validation_dataset = read(data_dir, "validation")
train_loader = DataLoader(training_dataset, batch_size=config.batch_size)
validation_loader = DataLoader(validation_dataset, batch_size=config.batch_size)
model_artifact = run.use_artifact("convnet:latest")
model_dir = model_artifact.download()
model_path = os.path.join(model_dir, "initialized_model.pth")
model_config = model_artifact.metadata
config.update(model_config)
model = ConvNet(**model_config)
model.load_state_dict(torch.load(model_path))
model = model.to(device)
train(model, train_loader, validation_loader, config)
model_artifact = wandb.Artifact(
"trained-model", type="model",
description="Trained NN model",
metadata=dict(model_config))
torch.save(model.state_dict(), "trained_model.pth")
model_artifact.add_file("trained_model.pth")
wandb.save("trained_model.pth")
run.log_artifact(model_artifact)
return model
def evaluate_and_log(config=None):
with wandb.init(project="artifacts-example", job_type="report", config=config) as run:
data = run.use_artifact('mnist-preprocess:latest')
data_dir = data.download()
testing_set = read(data_dir, "test")
test_loader = torch.utils.data.DataLoader(testing_set, batch_size=128, shuffle=False)
model_artifact = run.use_artifact("trained-model:latest")
model_dir = model_artifact.download()
model_path = os.path.join(model_dir, "trained_model.pth")
model_config = model_artifact.metadata
model = ConvNet(**model_config)
model.load_state_dict(torch.load(model_path))
model.to(device)
loss, accuracy, highest_losses, hardest_examples, true_labels, preds = evaluate(model, test_loader)
run.summary.update({"loss": loss, "accuracy": accuracy})
wandb.log({"high-loss-examples":
[wandb.Image(hard_example, caption=str(int(pred)) + "," + str(int(label)))
for hard_example, pred, label in zip(hardest_examples, preds, true_labels)]})
train_config = {"batch_size": 128,
"epochs": 5,
"batch_log_interval": 25,
"optimizer": "Adam"}
model = train_and_log(train_config)
evaluate_and_log()
5 - ワークスペースのプログラム管理
プログラムでワークスペースを作成、管理、カスタマイズすることにより、機械学習実験をより効果的に整理し、可視化できます。
wandb-workspaces
W&B ライブラリを使用して設定を定義し、パネルのレイアウトを設定し、セクションを整理できます。URLでワークスペースを読み込み、変更したり、runをフィルタリングしてグループ化するための式を使用したり、runの外観をカスタマイズすることも可能です。
wandb-workspaces
は、W&Bの Workspaces と Reports をプログラムで作成しカスタマイズするためのPythonライブラリです。
このチュートリアルでは、wandb-workspaces
を使って設定を定義し、パネルレイアウトを設定し、セクションを整理することでワークスペースを作成しカスタマイズする方法を学びます。
このノートブックの使い方
- 各セルを一度に1つずつ実行してください。
- セルの実行後に表示されるURLをコピーして貼り付けることで、ワークスペースへの変更を確認できます。
プログラムによるワークスペースとの対話は、現在
Saved workspaces viewsでサポートされています。Saved workspaces viewsは、ワークスペースの協働スナップショットです。同じチームのメンバーであれば、誰でも保存されたワークスペースビューを表示、編集、保存できます。
1. 依存関係のインストールとインポート
# 依存関係のインストール
!pip install wandb wandb-workspaces rich
# 依存関係のインポート
import os
import wandb
import wandb_workspaces.workspaces as ws
import wandb_workspaces.reports.v2 as wr # パネルを追加するためのReports APIを使用します
# 出力フォーマットを改善
%load_ext rich
2. 新しいプロジェクトとワークスペースの作成
このチュートリアルでは、新しいプロジェクトを作成し、wandb_workspaces
APIを使用して実験を行います。
注:ユニークな Saved view
URLを使用して既存のワークスペースを読み込むことができます。この方法は次のコードブロックで説明しています。
# Weights & Biases を初期化してログイン
wandb.login()
# 新しいプロジェクトとサンプルデータをログに記録するための関数
def create_project_and_log_data():
project = "workspace-api-example" # デフォルトのプロジェクト名
# サンプルデータをログに記録するためにrunを初期化
with wandb.init(project=project, name="sample_run") as run:
for step in range(100):
wandb.log({
"Step": step,
"val_loss": 1.0 / (step + 1),
"val_accuracy": step / 100.0,
"train_loss": 1.0 / (step + 2),
"train_accuracy": step / 110.0,
"f1_score": step / 100.0,
"recall": step / 120.0,
})
return project
# 新しいプロジェクトを作成してデータをログ
project = create_project_and_log_data()
entity = wandb.Api().default_entity
(オプション)既存のプロジェクトとワークスペースを読み込む
新しいプロジェクトを作成する代わりに、既存のプロジェクトとワークスペースを読み込むことができます。そのためには、ユニークなワークスペースURLを見つけて、それを文字列として ws.Workspace.from_url
に渡します。URLの形式は https://wandb.ai/[SOURCE-ENTITY]/[SOURCE-USER]?nw=abc
です。
例:
wandb.login()
workspace = ws.Workspace.from_url("https://wandb.ai/[SOURCE-ENTITY]/[SOURCE-USER]?nw=abc").
workspace = ws.Workspace(
entity="NEW-ENTITY",
project=NEW-PROJECT,
name="NEW-SAVED-VIEW-NAME"
)
3. プログラムによるワークスペースの例
以下に、プログラムによるワークスペースの特徴を使用するための例を示します:
# ワークスペース、セクション、およびパネルに利用可能なすべての設定を確認します。
all_settings_objects = [x for x in dir(ws) if isinstance(getattr(ws, x), type)]
all_settings_objects
saved view
を使ってワークスペースを作成
この例では、新しいワークスペースを作成し、それにセクションとパネルを配置する方法を示します。ワークスペースは通常のPythonオブジェクトのように編集できますので、柔軟性と使いやすさを提供します。
def sample_workspace_saved_example(entity: str, project: str) -> str:
workspace: ws.Workspace = ws.Workspace(
name="Example W&B Workspace",
entity=entity,
project=project,
sections=[
ws.Section(
name="Validation Metrics",
panels=[
wr.LinePlot(x="Step", y=["val_loss"]),
wr.BarPlot(metrics=["val_accuracy"]),
wr.ScalarChart(metric="f1_score", groupby_aggfunc="mean"),
],
is_open=True,
),
],
)
workspace.save()
print("Sample Workspace saved.")
return workspace.url
workspace_url: str = sample_workspace_saved_example(entity, project)
URLからワークスペースを読み込む
元のセットアップに影響を与えずにワークスペースを複製してカスタマイズします。これを行うためには、既存のワークスペースを読み込み、新しいビューとして保存します。
def save_new_workspace_view_example(url: str) -> None:
workspace: ws.Workspace = ws.Workspace.from_url(url)
workspace.name = "Updated Workspace Name"
workspace.save_as_new_view()
print(f"Workspace saved as new view.")
save_new_workspace_view_example(workspace_url)
今、あなたのワークスペースの名前は “Updated Workspace Name” です。
基本設定
次のコードは、ワークスペースを作成し、パネルを追加して、ワークスペース、個々のセクション、およびパネルの設定を構成する方法を示します。
# カスタム設定でワークスペースを作成して構成するための関数
def custom_settings_example(entity: str, project: str) -> None:
workspace: ws.Workspace = ws.Workspace(name="An example workspace", entity=entity, project=project)
workspace.sections = [
ws.Section(
name="Validation",
panels=[
wr.LinePlot(x="Step", y=["val_loss"]),
wr.LinePlot(x="Step", y=["val_accuracy"]),
wr.ScalarChart(metric="f1_score", groupby_aggfunc="mean"),
wr.ScalarChart(metric="recall", groupby_aggfunc="mean"),
],
is_open=True,
),
ws.Section(
name="Training",
panels=[
wr.LinePlot(x="Step", y=["train_loss"]),
wr.LinePlot(x="Step", y=["train_accuracy"]),
],
is_open=False,
),
]
workspace.settings = ws.WorkspaceSettings(
x_axis="Step",
x_min=0,
x_max=75,
smoothing_type="gaussian",
smoothing_weight=20.0,
ignore_outliers=False,
remove_legends_from_panels=False,
tooltip_number_of_runs="default",
tooltip_color_run_names=True,
max_runs=20,
point_visualization_method="bucketing",
auto_expand_panel_search_results=False,
)
section = workspace.sections[0]
section.panel_settings = ws.SectionPanelSettings(
x_min=25,
x_max=50,
smoothing_type="none",
)
panel = section.panels[0]
panel.title = "Validation Loss Custom Title"
panel.title_x = "Custom x-axis title"
workspace.save()
print("Workspace with custom settings saved.")
# ワークスペースを作成して設定する関数を実行
custom_settings_example(entity, project)
今、あなたは “An example workspace” という名前の別の保存済みビューを見ています。
Run のカスタマイズ
次のコードセルでは、runをフィルタリングし、色を変え、グループ化し、プログラムで並べ替える方法を示します。
各例では、一般的なワークフローとして、適切なパラメータに引数として指定されたカスタマイズ情報を ws.RunsetSettings
に渡します。
Run のフィルタリング
Pythonの式と wandb.log
でログしたメトリクス、または Created Timestamp のようにrunの一部として自動的にログされるメトリクスを使って、フィルターを作成できます。また、W&B App UI での表示方法、たとえば Name、Tags、または ID に基づいてフィルターを参照することもできます。
次の例では、検証損失のサマリ、検証精度のサマリ、および指定された正規表現に基づいてrunをフィルタリングする方法を示します。
def advanced_filter_example(entity: str, project: str) -> None:
# プロジェクト内のすべてのrunを取得
runs: list = wandb.Api().runs(f"{entity}/{project}")
# 複数のフィルターを適用:val_loss < 0.1、val_accuracy > 0.8、およびrun名が正規表現に一致
workspace: ws.Workspace = ws.Workspace(
name="Advanced Filtered Workspace with Regex",
entity=entity,
project=project,
sections=[
ws.Section(
name="Advanced Filtered Section",
panels=[
wr.LinePlot(x="Step", y=["val_loss"]),
wr.LinePlot(x="Step", y=["val_accuracy"]),
],
is_open=True,
),
],
runset_settings=ws.RunsetSettings(
filters=[
(ws.Summary("val_loss") < 0.1), # 'val_loss' のサマリでrunをフィルタ
(ws.Summary("val_accuracy") > 0.8), # 'val_accuracy' のサマリでrunをフィルタ
(ws.Metric("ID").isin([run.id for run in wandb.Api().runs(f"{entity}/{project}")])),
],
regex_query=True,
)
)
# run名が 's' で始まるものと一致するように正規表現検索を追加します
workspace.runset_settings.query = "^s"
workspace.runset_settings.regex_query = True
workspace.save()
print("Workspace with advanced filters and regex search saved.")
advanced_filter_example(entity, project)
フィルター式のリストを渡すと、ブールの「AND」論理が適用されることに注意してください。
Run の色を変更
この例では、ワークスペース内のrunの色を変更する方法を示します。
def run_color_example(entity: str, project: str) -> None:
# プロジェクト内のすべてのrunを取得
runs: list = wandb.Api().runs(f"{entity}/{project}")
# Runの色を動的に割り当てる
run_colors: list = ['purple', 'orange', 'teal', 'magenta']
run_settings: dict = {}
for i, run in enumerate(runs):
run_settings[run.id] = ws.RunSettings(color=run_colors[i % len(run_colors)])
workspace: ws.Workspace = ws.Workspace(
name="Run Colors Workspace",
entity=entity,
project=project,
sections=[
ws.Section(
name="Run Colors Section",
panels=[
wr.LinePlot(x="Step", y=["val_loss"]),
wr.LinePlot(x="Step", y=["val_accuracy"]),
],
is_open=True,
),
],
runset_settings=ws.RunsetSettings(
run_settings=run_settings
)
)
workspace.save()
print("Workspace with run colors saved.")
run_color_example(entity, project)
Run のグループ化
この例では、特定のメトリクスでrunをグループ化する方法を示します。
def grouping_example(entity: str, project: str) -> None:
workspace: ws.Workspace = ws.Workspace(
name="Grouped Runs Workspace",
entity=entity,
project=project,
sections=[
ws.Section(
name="Grouped Runs",
panels=[
wr.LinePlot(x="Step", y=["val_loss"]),
wr.LinePlot(x="Step", y=["val_accuracy"]),
],
is_open=True,
),
],
runset_settings=ws.RunsetSettings(
groupby=[ws.Metric("Name")]
)
)
workspace.save()
print("Workspace with grouped runs saved.")
grouping_example(entity, project)
Run をソート
この例では、検証損失のサマリに基づいてrunをソートする方法を示します。
def sorting_example(entity: str, project: str) -> None:
workspace: ws.Workspace = ws.Workspace(
name="Sorted Runs Workspace",
entity=entity,
project=project,
sections=[
ws.Section(
name="Sorted Runs",
panels=[
wr.LinePlot(x="Step", y=["val_loss"]),
wr.LinePlot(x="Step", y=["val_accuracy"]),
],
is_open=True,
),
],
runset_settings=ws.RunsetSettings(
order=[ws.Ordering(ws.Summary("val_loss"))] #val_lossサマリを使用して注文
)
)
workspace.save()
print("Workspace with sorted runs saved.")
sorting_example(entity, project)
4. まとめ: 包括的な例
この例では、包括的なワークスペースを作成し、その設定を構成し、セクションにパネルを追加する方法を示します。
def full_end_to_end_example(entity: str, project: str) -> None:
# プロジェクト内のすべてのrunを取得
runs: list = wandb.Api().runs(f"{entity}/{project}")
# Runの色を動的に割り当ててrunの設定を作成する
run_colors: list = ['red', 'blue', 'green', 'orange', 'purple', 'teal', 'magenta', '#FAC13C']
run_settings: dict = {}
for i, run in enumerate(runs):
run_settings[run.id] = ws.RunSettings(color=run_colors[i % len(run_colors)], disabled=False)
workspace: ws.Workspace = ws.Workspace(
name="My Workspace Template",
entity=entity,
project=project,
sections=[
ws.Section(
name="Main Metrics",
panels=[
wr.LinePlot(x="Step", y=["val_loss"]),
wr.LinePlot(x="Step", y=["val_accuracy"]),
wr.ScalarChart(metric="f1_score", groupby_aggfunc="mean"),
],
is_open=True,
),
ws.Section(
name="Additional Metrics",
panels=[
wr.ScalarChart(metric="precision", groupby_aggfunc="mean"),
wr.ScalarChart(metric="recall", groupby_aggfunc="mean"),
],
),
],
settings=ws.WorkspaceSettings(
x_axis="Step",
x_min=0,
x_max=100,
smoothing_type="none",
smoothing_weight=0,
ignore_outliers=False,
remove_legends_from_panels=False,
tooltip_number_of_runs="default",
tooltip_color_run_names=True,
max_runs=20,
point_visualization_method="bucketing",
auto_expand_panel_search_results=False,
),
runset_settings=ws.RunsetSettings(
query="",
regex_query=False,
filters=[
ws.Summary("val_loss") < 1,
ws.Metric("Name") == "sample_run",
],
groupby=[ws.Metric("Name")],
order=[ws.Ordering(ws.Summary("Step"), ascending=True)],
run_settings=run_settings
)
)
workspace.save()
print("Workspace created and saved.")
full_end_to_end_example(entity, project)
6 - Weave と Models インテグレーション デモ
このノートブックは、W&B Weave を W&B Models と一緒に使用する方法を示しています。具体的には、2つの異なるチームを検討しています。
- モデルチーム: モデル作成チームは、新しいチャットモデル (Llama 3.2) をファインチューニングし、W&B Models を使用してそれをレジストリに保存します。
- アプリチーム: アプリ開発チームはチャットモデルを取得して、新しいRAGチャットボットを作成および評価するために W&B Weave を使用します。
W&B Models と W&B Weave のパブリックワークスペースを こちら から見つけることができます。
ワークフローは次のステップをカバーしています:
- RAGアプリのコードを W&B Weave で計測する
- LLM(Llama 3.2 など、他のLLMに置き換えることも可能)をファインチューニングし、W&B Models でトラッキングする
- ファインチューニングされたモデルを W&B Registry にログする
- 新しいファインチューニングされたモデルを使用してRAGアプリを実装し、W&B Weave でアプリを評価する
- 結果に満足したら、更新されたRAGアプリの参照を W&B Registry に保存する
注意:
以下で参照される RagModel
は、完全なRAGアプリと考えられるトップレベルの weave.Model
です。これは ChatModel
、ベクトルデータベース、プロンプトを含みます。ChatModel
もまた別の weave.Model
であり、W&B Registry からアーティファクトをダウンロードする機能を持つコードを含んでおり、RagModel
の一部として任意の他のチャットモデルをサポートするために変更可能です。詳細は Weaveでの完全なモデル を参照してください。
1. セットアップ
まず、weave
と wandb
をインストールし、APIキーでログインします。APIキーは https://wandb.ai/settings で作成し、表示できます。
import wandb
import weave
import pandas as pd
PROJECT = "weave-cookboook-demo"
ENTITY = "wandb-smle"
wandb.login()
weave.init(ENTITY + "/" + PROJECT)
2. アーティファクトに基づく ChatModel
を作成する
Registry からファインチューニングされたチャットモデルを取得し、weave.Model
を作成して次のステップでRagModel
に直接プラグインします。既存の ChatModel と同じパラメータを取りますが、init
と predict
は変更されます。
pip install unsloth
pip uninstall unsloth -y && pip install --upgrade --no-cache-dir "unsloth[colab-new] @ git+https://github.com/unslothai/unsloth.git"
モデルチームは unsloth
ライブラリを使用して異なる Llama-3.2 モデルをファインチューニングし、より高速にしました。したがって、特殊な unsloth.FastLanguageModel
または peft.AutoPeftModelForCausalLM
モデルとアダプターを使用してモデルをダウンロードする必要があります。Registry の「使用」タブからロードコードをコピーして model_post_init
に貼り付けます。
import weave
from pydantic import PrivateAttr
from typing import Any, List, Dict, Optional
from unsloth import FastLanguageModel
import torch
class UnslothLoRAChatModel(weave.Model):
"""
モデル名だけでなく、より多くのパラメータを保存し、バージョン管理するための追加の ChatModel クラスを定義します。
これにより、特定のパラメータでファインチューニングが可能になります。
"""
chat_model: str
cm_temperature: float
cm_max_new_tokens: int
cm_quantize: bool
inference_batch_size: int
dtype: Any
device: str
_model: Any = PrivateAttr()
_tokenizer: Any = PrivateAttr()
def model_post_init(self, __context):
# Registry の「使用」タブからこれを貼り付けます
run = wandb.init(project=PROJECT, job_type="model_download")
artifact = run.use_artifact(f"{self.chat_model}")
model_path = artifact.download()
# unsloth バージョン(ネイティブで2倍の速度で推論)
self._model, self._tokenizer = FastLanguageModel.from_pretrained(
model_name=model_path,
max_seq_length=self.cm_max_new_tokens,
dtype=self.dtype,
load_in_4bit=self.cm_quantize,
)
FastLanguageModel.for_inference(self._model)
@weave.op()
async def predict(self, query: List[str]) -> dict:
# 生成プロンプトを追加 = true - 生成するために必須
input_ids = self._tokenizer.apply_chat_template(
query,
tokenize=True,
add_generation_prompt=True,
return_tensors="pt",
).to("cuda")
output_ids = self._model.generate(
input_ids=input_ids,
max_new_tokens=64,
use_cache=True,
temperature=1.5,
min_p=0.1,
)
decoded_outputs = self._tokenizer.batch_decode(
output_ids[0][input_ids.shape[1] :], skip_special_tokens=True
)
return "".join(decoded_outputs).strip()
次に、Registry から特定のリンクで新しいモデルを作成します:
MODEL_REG_URL = "wandb32/wandb-registry-RAG Chat Models/Finetuned Llama-3.2:v3"
max_seq_length = 2048
dtype = None
load_in_4bit = True
new_chat_model = UnslothLoRAChatModel(
name="UnslothLoRAChatModelRag",
chat_model=MODEL_REG_URL,
cm_temperature=1.0,
cm_max_new_tokens=max_seq_length,
cm_quantize=load_in_4bit,
inference_batch_size=max_seq_length,
dtype=dtype,
device="auto",
)
そして最後に非同期で評価を実行します:
await new_chat_model.predict(
[{"role": "user", "content": "What is the capital of Germany?"}]
)
3. 新しい ChatModel
バージョンを RagModel
に統合する
ファインチューニングされたチャットモデルを使用してRAGアプリを構築することは、特に会話型AIシステムの性能と多様性を向上させる上でいくつかの利点を提供します。
現在のWeaveプロジェクトから RagModel
を取得し、新しい ChatModel
に交換します。他のコンポーネント(VDB、プロンプトなど)を変更または再作成する必要はありません!
pip install litellm faiss-gpu
RagModel = weave.ref(
"weave:///wandb-smle/weave-cookboook-demo/object/RagModel:cqRaGKcxutBWXyM0fCGTR1Yk2mISLsNari4wlGTwERo"
).get()
# MAGIC: chat_model を交換して新しいバージョンを公開する(他のRAGコンポーネントについて心配する必要はありません)
RagModel.chat_model = new_chat_model
# 予測中に参照されるように新しいバージョンを最初に公開します
PUB_REFERENCE = weave.publish(RagModel, "RagModel")
await RagModel.predict("When was the first conference on climate change?")
4. 既存のモデルの run に接続する新しい weave.Evaluation
を実行する
最後に、既存の weave.Evaluation
上で新しい RagModel
を評価します。統合をできるだけシンプルにするために、以下の変更を含みます。
モデルの観点から:
- Registry からモデルを取得すると新しい
wandb.run
が作成され、チャットモデルのE2Eリネージの一部になります
- 現在の評価IDを持つTrace IDを実行設定に追加し、モデルチームがリンクをクリックして対応する Weave ページに移動できるようにします
Weave の観点から:
- アーティファクト / レジストリリンクを
ChatModel
(つまり RagModel
)への入力として保存します
weave.attributes
を使用して run.id をトレースの追加列として保存します
# MAGIC: 評価データセットや評価スコアラーと一緒に評価を取得して使用します
WEAVE_EVAL = "weave:///wandb-smle/weave-cookboook-demo/object/climate_rag_eval:ntRX6qn3Tx6w3UEVZXdhIh1BWGh7uXcQpOQnIuvnSgo"
climate_rag_eval = weave.ref(WEAVE_EVAL).get()
with weave.attributes({"wandb-run-id": wandb.run.id}):
# 結果およびそのコールを取得するために.evaluate.call 属性を使用して評価トレースを Models に保存します
summary, call = await climate_rag_eval.evaluate.call(climate_rag_eval, ` RagModel `)
5. レジストリに新しいRAGモデルを保存する
新しいRAGモデルを効果的に共有するために、参照アーティファクトとしてそれをレジストリにプッシュし、weaveバージョンをエイリアスとして追加します。
MODELS_OBJECT_VERSION = PUB_REFERENCE.digest # weave オブジェクトバージョン
MODELS_OBJECT_NAME = PUB_REFERENCE.name # weave オブジェクト名
models_url = f"https://wandb.ai/{ENTITY}/{PROJECT}/weave/objects/{MODELS_OBJECT_NAME}/versions/{MODELS_OBJECT_VERSION}"
models_link = (
f"weave:///{ENTITY}/{PROJECT}/object/{MODELS_OBJECT_NAME}:{MODELS_OBJECT_VERSION}"
)
with wandb.init(project=PROJECT, entity=ENTITY) as run:
# 新しいアーティファクトを作成
artifact_model = wandb.Artifact(
name="RagModel",
type="model",
description="Models Link from RagModel in Weave",
metadata={"url": models_url},
)
artifact_model.add_reference(models_link, name="model", checksum=False)
# 新しいアーティファクトをログする
run.log_artifact(artifact_model, aliases=[MODELS_OBJECT_VERSION])
# レジストリにリンクする
run.link_artifact(
artifact_model, target_path="wandb32/wandb-registry-RAG Models/RAG Model"
)
7 - インテグレーションチュートリアル
7.1 - PyTorch
Weights & Biases を使用して、機械学習の実験管理、データセットのバージョン管理、およびプロジェクトのコラボレーションを行います。
このノートブックでカバーする内容
PyTorch のコードに Weights & Biases を統合して、実験管理をパイプラインに追加する方法を示します。
# ライブラリをインポート
import wandb
# 新しい実験を開始
wandb.init(project="new-sota-model")
# ハイパーパラメータの辞書を config でキャプチャ
wandb.config = {"learning_rate": 0.001, "epochs": 100, "batch_size": 128}
# モデルとデータを設定
model, dataloader = get_model(), get_data()
# オプション: 勾配を追跡
wandb.watch(model)
for batch in dataloader:
metrics = model.training_step()
# トレーニングループ内でメトリクスをログしてモデルの性能を視覚化
wandb.log(metrics)
# オプション: モデルを最後に保存
model.to_onnx()
wandb.save("model.onnx")
ビデオチュートリアルに沿って進めましょう。
注意: Step で始まるセクションは、既存のパイプラインに W&B を統合するために必要な部分です。それ以外はデータを読み込み、モデルを定義するだけです。
インストール、インポート、ログイン
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from tqdm.auto import tqdm
# 決定論的な振る舞いを保証
torch.backends.cudnn.deterministic = True
random.seed(hash("setting random seeds") % 2**32 - 1)
np.random.seed(hash("improves reproducibility") % 2**32 - 1)
torch.manual_seed(hash("by removing stochasticity") % 2**32 - 1)
torch.cuda.manual_seed_all(hash("so runs are repeatable") % 2**32 - 1)
# デバイスの設定
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# MNIST ミラーのリストから遅いミラーを削除
torchvision.datasets.MNIST.mirrors = [mirror for mirror in torchvision.datasets.MNIST.mirrors
if not mirror.startswith("http://yann.lecun.com")]
ステップ 0: W&B をインストール
まずは、このライブラリを入手する必要があります。
wandb
は pip
を使用して簡単にインストールできます。
!pip install wandb onnx -Uq
ステップ 1: W&B をインポートしてログイン
データをウェブサービスにログするためには、
ログインが必要です。
W&B を初めて使用する場合は、
表示されるリンクから無料アカウントに登録する必要があります。
import wandb
wandb.login()
実験とパイプラインを定義
wandb.init
でメタデータとハイパーパラメータを追跡
プログラム上、最初に行うのは実験を定義することです。
ハイパーパラメータとは何か?この run に関連するメタデータは何か?
この情報を config
辞書(または類似オブジェクト)に保存し、
必要に応じてアクセスするのは非常に一般的なワークフローです。
この例では、数少ないハイパーパラメータのみを変動させ、
残りを手動でコーディングします。
しかし、あなたのモデルのどの部分も config
の一部にすることができます。
また、いくつかのメタデータも含めます:私たちは MNIST データセットと畳み込み
ニューラルネットワークを使用しているので、
同じプロジェクトで、たとえば全結合ニューラルネットワークで CIFAR を扱う場合、
この情報が run を分ける助けになります。
config = dict(
epochs=5,
classes=10,
kernels=[16, 32],
batch_size=128,
learning_rate=0.005,
dataset="MNIST",
architecture="CNN")
さて、全体のパイプラインを定義しましょう。
これは非常に典型的なモデルトレーニングの手順です:
- まず、モデル、関連するデータ、オプティマイザーを
make
し、
- それに基づいてモデルを
train
し、最後に
- トレーニングがどのように行われたかを確認するために
test
します。
これらの関数を以下で実装します。
def model_pipeline(hyperparameters):
# wandb に開始を伝えます
with wandb.init(project="pytorch-demo", config=hyperparameters):
# すべての HP を wandb.config 経由でアクセスし、ログが実行と一致するようにします。
config = wandb.config
# モデル、データ、最適化問題を作成します
model, train_loader, test_loader, criterion, optimizer = make(config)
print(model)
# それらを使用してモデルをトレーニングします
train(model, train_loader, criterion, optimizer, config)
# 最終的なパフォーマンスをテストします
test(model, test_loader)
return model
ここでの標準パイプラインとの唯一の違いは、
すべてが wandb.init
のコンテキスト内で行われる点です。
この関数を呼び出すことで、
コードとサーバーとの間に通信のラインが確立されます。
config
辞書を wandb.init
に渡すことで、
すべての情報が即座にログされるため、
実験に使用するハイパーパラメータの値を常に把握できます。
選んでログした値が常にモデルに使用されることを保証するために、
wandb.config
のオブジェクトコピーを使用することをお勧めします。
以下で make
の定義をチェックして、いくつかの例を見てください。
サイドノート: コードを別々のプロセスで実行するようにします、
私たちの側で問題があっても
(たとえば、巨大な海の怪物がデータセンターを攻撃した場合でも)
コードはクラッシュしません。
問題が解決したら、クラーケンが深海に戻る頃に、
wandb sync
でデータをログできます。
def make(config):
# データを作成
train, test = get_data(train=True), get_data(train=False)
train_loader = make_loader(train, batch_size=config.batch_size)
test_loader = make_loader(test, batch_size=config.batch_size)
# モデルを作成
model = ConvNet(config.kernels, config.classes).to(device)
# 損失とオプティマイザーを作成
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(
model.parameters(), lr=config.learning_rate)
return model, train_loader, test_loader, criterion, optimizer
データのロードとモデルを定義
次に、データがどのようにロードされるか、およびモデルの見た目を指定する必要があります。
これは非常に重要な部分ですが、wandb
なしで通常行われるのと何も違いませんので、
ここで詳しくは説明しません。
def get_data(slice=5, train=True):
full_dataset = torchvision.datasets.MNIST(root=".",
train=train,
transform=transforms.ToTensor(),
download=True)
# [::slice] でスライスするのと同等
sub_dataset = torch.utils.data.Subset(
full_dataset, indices=range(0, len(full_dataset), slice))
return sub_dataset
def make_loader(dataset, batch_size):
loader = torch.utils.data.DataLoader(dataset=dataset,
batch_size=batch_size,
shuffle=True,
pin_memory=True, num_workers=2)
return loader
モデルの定義は通常楽しい部分です。
しかし wandb
でも何も変わらないので、
標準的な ConvNet アーキテクチャーにとどまりましょう。
この部分をいじっていくつかの実験を試みるのを恐れないでください –
あなたの結果はすべて wandb.ai にログされます。
# 従来の畳み込みニューラルネットワーク
class ConvNet(nn.Module):
def __init__(self, kernels, classes=10):
super(ConvNet, self).__init__()
self.layer1 = nn.Sequential(
nn.Conv2d(1, kernels[0], kernel_size=5, stride=1, padding=2),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2))
self.layer2 = nn.Sequential(
nn.Conv2d(16, kernels[1], kernel_size=5, stride=1, padding=2),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2))
self.fc = nn.Linear(7 * 7 * kernels[-1], classes)
def forward(self, x):
out = self.layer1(x)
out = self.layer2(out)
out = out.reshape(out.size(0), -1)
out = self.fc(out)
return out
トレーニングロジックを定義
model_pipeline
を進めると、train
の指定を行う時がきます。
ここでは wandb
の 2 つの関数、watch
と log
が活躍します。
wandb.watch
で勾配を追跡し、他のすべてを wandb.log
で管理
wandb.watch
はトレーニングの log_freq
ステップごとに
モデルの勾配とパラメータをログします。
トレーニングを開始する前にこれを呼び出すだけで済みます。
トレーニングコードの残りは前と変わらず、
エポックとバッチを繰り返し、
forward pass と backward pass を実行し、
optimizer
を適用します。
def train(model, loader, criterion, optimizer, config):
# モデルを追跡している wandb に情報を伝えます: 勾配、重み、その他。
wandb.watch(model, criterion, log="all", log_freq=10)
# トレーニングを実行し wandb で追跡
total_batches = len(loader) * config.epochs
example_ct = 0 # これまでに見た例の数
batch_ct = 0
for epoch in tqdm(range(config.epochs)):
for _, (images, labels) in enumerate(loader):
loss = train_batch(images, labels, model, optimizer, criterion)
example_ct += len(images)
batch_ct += 1
# 25 バッチおきにメトリクスを報告
if ((batch_ct + 1) % 25) == 0:
train_log(loss, example_ct, epoch)
def train_batch(images, labels, model, optimizer, criterion):
images, labels = images.to(device), labels.to(device)
# Forward pass ➡
outputs = model(images)
loss = criterion(outputs, labels)
# Backward pass ⬅
optimizer.zero_grad()
loss.backward()
# オプティマイザーでステップ
optimizer.step()
return loss
唯一の違いは、記録のコードです:
以前ターミナルにメトリクスを報告していたところを、
wandb.log
に同じ情報を渡します。
wandb.log
は文字列をキーとする辞書を期待します。
これらの文字列は、ログされるオブジェクトを識別し、値を構成します。
また、オプションでトレーニングのどの step
にいるかをログできます。
サイドノート: 私はモデルが見た例の数を使用するのが好きです、
これはバッチサイズを超えて比較しやすくなるためですが、
生のステップやバッチカウントを使用することもできます。
長いトレーニングランの場合、epoch
単位でログすることも理にかなっているかもしれません。
def train_log(loss, example_ct, epoch):
# マジックが起こるところ
wandb.log({"epoch": epoch, "loss": loss}, step=example_ct)
print(f"Loss after {str(example_ct).zfill(5)} examples: {loss:.3f}")
テストロジックを定義
モデルのトレーニングが完了したら、テストしてみましょう:
本番環境から新たなデータに対して実行したり、
手作業で準備した例に適用したりします。
(オプション) wandb.save
を呼び出す
この時点で、モデルのアーキテクチャと最終パラメータをディスクに保存するのも良い時です。
最大の互換性を考慮して、モデルを
Open Neural Network eXchange (ONNX) フォーマット でエクスポートします。
そのファイル名を wandb.save
に渡すことで、モデルのパラメータが W&B のサーバーに保存されます:
もはやどの .h5
や .pb
がどのトレーニングrunに対応しているのかを見失うことはありません。
モデルの保存、バージョン管理、配布のための、高度な wandb
機能については、
Artifacts ツールをご覧ください。
def test(model, test_loader):
model.eval()
# いくつかのテスト例にモデルを実行
with torch.no_grad():
correct, total = 0, 0
for images, labels in test_loader:
images, labels = images.to(device), labels.to(device)
outputs = model(images)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
print(f"Accuracy of the model on the {total} " +
f"test images: {correct / total:%}")
wandb.log({"test_accuracy": correct / total})
# 交換可能な ONNX フォーマットでモデルを保存
torch.onnx.export(model, images, "model.onnx")
wandb.save("model.onnx")
トレーニングを実行し、wandb.ai でライブメトリクスを確認
さて、全体のパイプラインを定義し、数行の W&B コードを追加したら、
完全に追跡された実験を実行する準備が整いました。
いくつかのリンクを報告します:
ドキュメンテーション、
プロジェクトページ、これにはプロジェクトのすべての run が整理されています。
この run の結果が保存されるランページ。
ランページに移動して、これらのタブを確認:
- Charts、トレーニング中にモデルの勾配、パラメーター値、損失がログされます
- System、ここにはディスク I/O 応答率、CPU および GPU メトリクス(温度上昇も監視)、その他のさまざまなシステムメトリクスが含まれます
- Logs、トレーニング中に標準出力にプッシュされたもののコピーがあります
- Files、トレーニングが完了すると、
model.onnx
をクリックして Netron モデルビューア でネットワークを表示できます。
with wandb.init
ブロック終了時にランが終了すると、
結果の要約もセルの出力で印刷されます。
# パイプラインでモデルを構築、トレーニング、分析
model = model_pipeline(config)
ハイパーパラメータをスイープでテスト
この例では、ハイパーパラメータのセットを 1 つしか見ていません。
しかし、ほとんどの ML ワークフローの重要な部分は、
多くのハイパーパラメータを反復することです。
Weights & Biases Sweeps を使用してハイパーパラメータのテストを自動化し、モデルと最適化戦略の空間を探索できます。
Weights & Biases を使用したハイパーパラメータ探索です。非常に簡単です。たった3つの簡単なステップがあります:
-
スイープを定義する: これは、検索するパラメータ、検索戦略、最適化メトリクスなどを指定する辞書またはYAML ファイルを作成することで行います。
-
スイープを初期化する:
sweep_id = wandb.sweep(sweep_config)
-
スイープエージェントを実行する:
wandb.agent(sweep_id, function=train)
これだけでハイパーパラメータ探索が実行できます。
Example Gallery
W&B で追跡および視覚化されたプロジェクトの例をギャラリー →でご覧ください。
Advanced Setup
- 環境変数: 環境変数に API キーを設定して、管理されたクラスターでトレーニングを実行できます。
- オフラインモード:
dryrun
モードを使用してオフラインでトレーニングし、後で結果を同期。
- オンプレ: プライベートクラウドまたはエアギャップされたサーバーに W&B をインストールします。学術機関から企業チームまで、すべての人に向けたローカルインストールを提供。
- スイープ: ハイパーパラメータ探索を迅速に設定できる、チューニングのための軽量ツール。
7.2 - PyTorch Lightning
私たちは PyTorch Lightning を使用して画像分類パイプラインを構築します。この
スタイルガイドに従って、コードの読みやすさと再現性を高めます。このすばらしい説明は
こちらで利用可能です。
PyTorch Lightning と W&B のセットアップ
このチュートリアルでは、PyTorch Lightning と Weights & Biases が必要です。
pip install lightning -q
pip install wandb -qU
import lightning.pytorch as pl
# あなたのお気に入りの機械学習トラッキングツール
from lightning.pytorch.loggers import WandbLogger
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import random_split, DataLoader
from torchmetrics import Accuracy
from torchvision import transforms
from torchvision.datasets import CIFAR10
import wandb
これで、wandb アカウントにログインする必要があります。
wandb.login()
DataModule - 私たちが求めるデータパイプライン
DataModules は、データに関連するフックを LightningModule から分離する方法であり、データセットに依存しないモデルを開発できます。
これは、データパイプラインを1つの共有可能で再利用可能なクラスにまとめます。データモジュールは PyTorch のデータプロセッシングに関わる5つのステップをカプセル化します:
- ダウンロード / トークン化 / プロセス。
- クリーンし、(場合によっては)ディスクに保存。
- データセット内にロード。
- 変換を適用(回転、トークン化など)。
- DataLoader 内にラップ。
データモジュールについて詳しくはこちらをご覧ください。Cifar-10 データセット用のデータモジュールを構築しましょう。
class CIFAR10DataModule(pl.LightningDataModule):
def __init__(self, batch_size, data_dir: str = './'):
super().__init__()
self.data_dir = data_dir
self.batch_size = batch_size
self.transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
self.num_classes = 10
def prepare_data(self):
CIFAR10(self.data_dir, train=True, download=True)
CIFAR10(self.data_dir, train=False, download=True)
def setup(self, stage=None):
# データローダーで使用する train/val データセットを割り当て
if stage == 'fit' or stage is None:
cifar_full = CIFAR10(self.data_dir, train=True, transform=self.transform)
self.cifar_train, self.cifar_val = random_split(cifar_full, [45000, 5000])
# データローダーで使用するテストデータセットを割り当て
if stage == 'test' or stage is None:
self.cifar_test = CIFAR10(self.data_dir, train=False, transform=self.transform)
def train_dataloader(self):
return DataLoader(self.cifar_train, batch_size=self.batch_size, shuffle=True)
def val_dataloader(self):
return DataLoader(self.cifar_val, batch_size=self.batch_size)
def test_dataloader(self):
return DataLoader(self.cifar_test, batch_size=self.batch_size)
コールバック
コールバックは、プロジェクト間で再利用可能な自己完結型プログラムです。PyTorch Lightning は、定期的に使用されるいくつかの組み込みコールバックを提供しています。
PyTorch Lightning のコールバックについて詳しくはこちらをご覧ください。
組み込みコールバック
このチュートリアルでは、Early Stopping と Model Checkpoint の組み込みコールバックを使用します。それらは Trainer
に渡すことができます。
カスタムコールバック
カスタム Keras コールバックに慣れている場合、PyTorch パイプラインで同じことができる能力は、まさにケーキの上のさくらんぼです。
画像分類を実行しているため、モデルのいくつかの画像サンプルに対する予測を視覚化する能力は役立つかもしれません。このコールバックの形式で提供されることで、モデルを早期段階でデバッグするのに役立ちます。
class ImagePredictionLogger(pl.callbacks.Callback):
def __init__(self, val_samples, num_samples=32):
super().__init__()
self.num_samples = num_samples
self.val_imgs, self.val_labels = val_samples
def on_validation_epoch_end(self, trainer, pl_module):
# テンソルを CPU に移動
val_imgs = self.val_imgs.to(device=pl_module.device)
val_labels = self.val_labels.to(device=pl_module.device)
# モデル予測を取得
logits = pl_module(val_imgs)
preds = torch.argmax(logits, -1)
# wandb Image として画像をログ
trainer.logger.experiment.log({
"examples":[wandb.Image(x, caption=f"Pred:{pred}, Label:{y}")
for x, pred, y in zip(val_imgs[:self.num_samples],
preds[:self.num_samples],
val_labels[:self.num_samples])]
})
LightningModule - システムの定義
LightningModule はシステムを定義し、モデルではありません。ここでシステムはすべての研究コードを1つのクラスにまとめて自己完結型にします。LightningModule
は PyTorch コードを5つのセクションに整理します:
- 計算(
__init__
)
- トレーニングループ(
training_step
)
- 検証ループ(
validation_step
)
- テストループ(
test_step
)
- オプティマイザー(
configure_optimizers
)
このようにして、容易に共有できるデータセットに依存しないモデルを構築できます。Cifar-10 分類のためのシステムを構築しましょう。
class LitModel(pl.LightningModule):
def __init__(self, input_shape, num_classes, learning_rate=2e-4):
super().__init__()
# ハイパーパラメーターをログ
self.save_hyperparameters()
self.learning_rate = learning_rate
self.conv1 = nn.Conv2d(3, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 32, 3, 1)
self.conv3 = nn.Conv2d(32, 64, 3, 1)
self.conv4 = nn.Conv2d(64, 64, 3, 1)
self.pool1 = torch.nn.MaxPool2d(2)
self.pool2 = torch.nn.MaxPool2d(2)
n_sizes = self._get_conv_output(input_shape)
self.fc1 = nn.Linear(n_sizes, 512)
self.fc2 = nn.Linear(512, 128)
self.fc3 = nn.Linear(128, num_classes)
self.accuracy = Accuracy(task='multiclass', num_classes=num_classes)
# convブロックからLinear層に渡される出力テンソルのサイズを返します。
def _get_conv_output(self, shape):
batch_size = 1
input = torch.autograd.Variable(torch.rand(batch_size, *shape))
output_feat = self._forward_features(input)
n_size = output_feat.data.view(batch_size, -1).size(1)
return n_size
# convブロックからの特徴テンソルを返します
def _forward_features(self, x):
x = F.relu(self.conv1(x))
x = self.pool1(F.relu(self.conv2(x)))
x = F.relu(self.conv3(x))
x = self.pool2(F.relu(self.conv4(x)))
return x
# 推論中に使用されます
def forward(self, x):
x = self._forward_features(x)
x = x.view(x.size(0), -1)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = F.log_softmax(self.fc3(x), dim=1)
return x
def training_step(self, batch, batch_idx):
x, y = batch
logits = self(x)
loss = F.nll_loss(logits, y)
# トレーニングメトリクス
preds = torch.argmax(logits, dim=1)
acc = self.accuracy(preds, y)
self.log('train_loss', loss, on_step=True, on_epoch=True, logger=True)
self.log('train_acc', acc, on_step=True, on_epoch=True, logger=True)
return loss
def validation_step(self, batch, batch_idx):
x, y = batch
logits = self(x)
loss = F.nll_loss(logits, y)
# 検証メトリクス
preds = torch.argmax(logits, dim=1)
acc = self.accuracy(preds, y)
self.log('val_loss', loss, prog_bar=True)
self.log('val_acc', acc, prog_bar=True)
return loss
def test_step(self, batch, batch_idx):
x, y = batch
logits = self(x)
loss = F.nll_loss(logits, y)
# 検証メトリクス
preds = torch.argmax(logits, dim=1)
acc = self.accuracy(preds, y)
self.log('test_loss', loss, prog_bar=True)
self.log('test_acc', acc, prog_bar=True)
return loss
def configure_optimizers(self):
optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
return optimizer
トレーニングと評価
DataModule
を使用してデータパイプラインを整理し、 LightningModule
を使用してモデルアーキテクチャ+トレーニングループを整理したので、PyTorch Lightning の Trainer
が他のすべてを自動化します。
Trainer は次のことを自動化します:
- エポックとバッチの反復
optimizer.step()
, backward
, zero_grad()
の呼び出し
.eval()
の呼び出し、グラッドの有効化/無効化
- 重みの保存と読み込み
- Weights & Biases ログ
- マルチ GPU トレーニングサポート
- TPU サポート
- 16 ビットトレーニングサポート
dm = CIFAR10DataModule(batch_size=32)
# x_dataloader にアクセスするには、prepare_data および setup を呼び出す必要があります。
dm.prepare_data()
dm.setup()
# 画像予測をログするカスタム ImagePredictionLogger コールバックに必要なサンプル。
val_samples = next(iter(dm.val_dataloader()))
val_imgs, val_labels = val_samples[0], val_samples[1]
val_imgs.shape, val_labels.shape
model = LitModel((3, 32, 32), dm.num_classes)
# wandb ロガーを初期化
wandb_logger = WandbLogger(project='wandb-lightning', job_type='train')
# コールバックを初期化
early_stop_callback = pl.callbacks.EarlyStopping(monitor="val_loss")
checkpoint_callback = pl.callbacks.ModelCheckpoint()
# トレーナーを初期化
trainer = pl.Trainer(max_epochs=2,
logger=wandb_logger,
callbacks=[early_stop_callback,
ImagePredictionLogger(val_samples),
checkpoint_callback],
)
# モデルのトレーニング
trainer.fit(model, dm)
# 保留中のテストセットでモデルを評価 ⚡⚡
trainer.test(dataloaders=dm.test_dataloader())
# wandb run を閉じる
wandb.finish()
最終的な考え
私は TensorFlow/Keras エコシステムから来ており、PyTorch は洗練されたフレームワークであるにもかかわらず、ちょっと難しいと感じています。個人的な経験にすぎませんが。PyTorch Lightning を探索して、私が PyTorch から遠ざけていた理由のほとんどが解消されていることに気づきました。ここに私の興奮の概要があります:
- 以前: 従来の PyTorch モデル定義はバラバラでした。モデルは
model.py
スクリプトに、トレーニングループは train.py
ファイルにありました。パイプラインを理解するためには多くの見直しが必要でした。
- 現在:
LightningModule
は、モデルが training_step
、validation_step
などと共に定義されているシステムとして機能します。今ではモジュール化され、共有可能です。
- 以前: TensorFlow/Keras の最良の部分は入力データパイプラインでした。彼らのデータセットカタログは豊富で成長しています。PyTorch のデータパイプラインは、かつて最大の痛点でした。通常の PyTorch コードでは、データのダウンロード/クリーニング/準備は通常、多くのファイルに分散しています。
- 現在: DataModule は、データパイプラインを1つの共有可能で再利用可能なクラスに組織します。それは単に
train_dataloader
、val_dataloader
(s)、test_dataloader
(s) と、必要な変換やデータプロセッシング/ダウンロードステップの集まりです。
- 以前: Keras では
model.fit
を呼び出してモデルをトレーニングし、 model.predict
で推論を実行することができました。model.evaluate
は、テストデータに基づく昔ながらのシンプルな評価を提供しましたが、これは PyTorch ではありませんでした。通常、別々の train.py
および test.py
ファイルが見つかります。
- 現在:
LightningModule
が整備されることで、Trainer
がすべてを自動化します。ただ trainer.fit
と trainer.test
を呼び出してモデルをトレーニングと評価すればよいのです。
- 以前: TensorFlow は TPU を好む、PyTorch は…
- 現在: PyTorch Lightning では、複数の GPU で同じモデルをトレーニングするのがとても簡単ですし、TPU でも可能です。
- 以前: 私はコールバックの大ファンで、カスタムコールバックを書くことを好んでいます。従来の PyTorch では、Early Stopping のような些細なことが議論の対象になることがありました。
- 現在: PyTorch Lightning を使用すると、Early Stopping と Model Checkpointing が簡単です。カスタムコールバックを書くことさえもできます。
🎨 結論とリソース
このレポートが役に立つことを願っています。コードを試して、好きなデータセットで画像分類器をトレーニングすることをお勧めします。
PyTorch Lightningについてもっと学ぶためのリソース:
7.3 - Hugging Face
Hugging Face モデルのパフォーマンスをシームレスな
W&B インテグレーションで素早く可視化しましょう。
ハイパーパラメーター、アウトプットメトリクス、GPU利用率などのシステム統計をモデル間で比較します。
なぜW&Bを使うべきか?
- 統一されたダッシュボード: モデルのすべてのメトリクスと予測のための中央リポジトリ
- 軽量: Hugging Faceとのインテグレーションにコード変更は不要
- アクセス可能: 個人や学術チームには無料
- セキュア: すべてのプロジェクトはデフォルトでプライベート
- 信頼性: OpenAI、トヨタ、Lyftなどの機械学習チームで使用されている
W&Bを機械学習モデル用のGitHubのように考えてください。プライベートでホストされたダッシュボードに機械学習の実験管理を保存します。スクリプトをどこで実行しても、モデルのすべてのバージョンが保存されることを確信して、素早く実験できます。
W&Bの軽量なインテグレーションは、任意のPythonスクリプトで動作し、モデルのトラッキングと可視化を開始するには無料のW&Bアカウントにサインアップするだけです。
Hugging Face Transformersレポジトリでは、Trainingと評価メトリクスを各ログステップでW&Bに自動的にログするようにTrainerを設定しました。
インテグレーションの仕組みを詳しく見るにはこちら: Hugging Face + W&B Report
インストール、インポート、ログイン
このチュートリアルのためにHugging FaceとWeights & Biasesのライブラリ、GLUEデータセット、トレーニングスクリプトをインストールします。
!pip install datasets wandb evaluate accelerate -qU
!wget https://raw.githubusercontent.com/huggingface/transformers/refs/heads/main/examples/pytorch/text-classification/run_glue.py
# run_glue.pyスクリプトはtransformers devを必要とします
!pip install -q git+https://github.com/huggingface/transformers
続行する前に、無料アカウントにサインアップしてください。
APIキーを入力
サインアップしたら、次のセルを実行してリンクをクリックし、APIキーを取得してこのノートブックを認証してください。
import wandb
wandb.login()
オプションで、W&Bロギングをカスタマイズするために環境変数を設定できます。ドキュメントを参照してください。
# オプション: 勾配とパラメータの両方をログします
%env WANDB_WATCH=all
モデルをトレーニング
次に、ダウンロードしたトレーニングスクリプト run_glue.py を呼び出し、トレーニングがWeights & Biasesダッシュボードに自動的にトラックされるのを確認します。このスクリプトは、Microsoft Research Paraphrase CorpusでBERTをファインチューンし、意味的に同等であることを示す人間の注釈付きの文のペアを使用します。
%env WANDB_PROJECT=huggingface-demo
%env TASK_NAME=MRPC
!python run_glue.py \
--model_name_or_path bert-base-uncased \
--task_name $TASK_NAME \
--do_train \
--do_eval \
--max_seq_length 256 \
--per_device_train_batch_size 32 \
--learning_rate 2e-4 \
--num_train_epochs 3 \
--output_dir /tmp/$TASK_NAME/ \
--overwrite_output_dir \
--logging_steps 50
ダッシュボードで結果を可視化
上記で印刷されたリンクをクリックするか、wandb.ai にアクセスして、結果がリアルタイムでストリームされるのを確認してください。ブラウザでrunを表示するリンクは、すべての依存関係がロードされた後に表示されます。次のような出力を探します: “wandb: 🚀 View run at [URL to your unique run]”
モデルのパフォーマンスを可視化
数十の実験管理を一目で確認し、興味深い学びにズームインし、高次元のデータを可視化するのは簡単です。
アーキテクチャーを比較
こちらは BERT vs DistilBERT を比較する例です。異なるアーキテクチャーがトレーニング中の評価精度にどのように影響するかを、自動ラインプロット可視化で簡単に確認できます。
重要な情報をデフォルトで簡単にトラック
Weights & Biasesは、各実験で新しいrunを保存します。デフォルトで保存される情報は次の通りです:
- ハイパーパラメーター: モデルの設定がConfigに保存されます
- モデルメトリクス: ストリーミングメトリクスの時系列データはLogに保存されます
- ターミナルログ: コマンドラインの出力は保存され、タブで利用可能です
- システムメトリクス: GPUとCPUの使用率、メモリ、温度など
詳しく知る
- ドキュメント: Weights & BiasesとHugging Faceのインテグレーションに関するドキュメント
- ビデオ: YouTubeチャンネルでのチュートリアル、実務者とのインタビュー、その他
- お問い合わせ: contact@wandb.com までご質問をお寄せください
7.4 - TensorFlow
このノートブックでカバーする内容
- Weights & Biases と TensorFlow パイプラインの簡単なインテグレーションによる実験管理。
keras.metrics
を使用したメトリクスの計算
wandb.log
を使用して、カスタムトレーニングループでこれらのメトリクスをログに記録する方法。
注: ステップ から始まるセクションは、既存のコードに W&B を統合するために必要なすべてです。それ以外は通常の MNIST の例です。
import os
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.datasets import cifar10
インストール、インポート、ログイン
W&B のインストール
%%capture
!pip install wandb
W&B のインポートとログイン
import wandb
from wandb.integration.keras import WandbMetricsLogger
wandb.login()
サイドノート: もしこれが初めて W&B を使う場合や、まだログインしていない場合、wandb.login()
実行後に表示されるリンクはサインアップ/ログインページに移動します。サインアップはワンクリックで簡単です。
データセットの準備
# トレーニング用データセットの準備
BATCH_SIZE = 64
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = np.reshape(x_train, (-1, 784))
x_test = np.reshape(x_test, (-1, 784))
# tf.data を使用して入力パイプラインを構築
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(BATCH_SIZE)
val_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
val_dataset = val_dataset.batch(BATCH_SIZE)
モデルとトレーニングループの定義
def make_model():
inputs = keras.Input(shape=(784,), name="digits")
x1 = keras.layers.Dense(64, activation="relu")(inputs)
x2 = keras.layers.Dense(64, activation="relu")(x1)
outputs = keras.layers.Dense(10, name="predictions")(x2)
return keras.Model(inputs=inputs, outputs=outputs)
def train_step(x, y, model, optimizer, loss_fn, train_acc_metric):
with tf.GradientTape() as tape:
logits = model(x, training=True)
loss_value = loss_fn(y, logits)
grads = tape.gradient(loss_value, model.trainable_weights)
optimizer.apply_gradients(zip(grads, model.trainable_weights))
train_acc_metric.update_state(y, logits)
return loss_value
def test_step(x, y, model, loss_fn, val_acc_metric):
val_logits = model(x, training=False)
loss_value = loss_fn(y, val_logits)
val_acc_metric.update_state(y, val_logits)
return loss_value
トレーニングループに wandb.log
を追加
def train(
train_dataset,
val_dataset,
model,
optimizer,
train_acc_metric,
val_acc_metric,
epochs=10,
log_step=200,
val_log_step=50,
):
for epoch in range(epochs):
print("\nStart of epoch %d" % (epoch,))
train_loss = []
val_loss = []
# データセットのバッチに対して繰り返し処理
for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
loss_value = train_step(
x_batch_train,
y_batch_train,
model,
optimizer,
loss_fn,
train_acc_metric,
)
train_loss.append(float(loss_value))
# 各エポックの終了時に検証ループを実行
for step, (x_batch_val, y_batch_val) in enumerate(val_dataset):
val_loss_value = test_step(
x_batch_val, y_batch_val, model, loss_fn, val_acc_metric
)
val_loss.append(float(val_loss_value))
# 各エポックの終了時にメトリクスを表示
train_acc = train_acc_metric.result()
print("Training acc over epoch: %.4f" % (float(train_acc),))
val_acc = val_acc_metric.result()
print("Validation acc: %.4f" % (float(val_acc),))
# 各エポックの終了時にメトリクスをリセット
train_acc_metric.reset_states()
val_acc_metric.reset_states()
# ⭐: wandb.log を使用してメトリクスをログに記録
wandb.log(
{
"epochs": epoch,
"loss": np.mean(train_loss),
"acc": float(train_acc),
"val_loss": np.mean(val_loss),
"val_acc": float(val_acc),
}
)
トレーニングを実行
wandb.init
を呼び出して run を開始
これにより、実験を起動したことがわかり、ユニークな ID とダッシュボードを提供できます。
公式ドキュメントをチェック
# プロジェクト名で wandb を初期化し、設定をオプションで指定します。
# 設定の値を変えて、wandb ダッシュボードでの結果を確認してください。
config = {
"learning_rate": 0.001,
"epochs": 10,
"batch_size": 64,
"log_step": 200,
"val_log_step": 50,
"architecture": "CNN",
"dataset": "CIFAR-10",
}
run = wandb.init(project='my-tf-integration', config=config)
config = run.config
# モデルの初期化
model = make_model()
# モデルをトレーニングするためのオプティマイザーをインスタンス化
optimizer = keras.optimizers.SGD(learning_rate=config.learning_rate)
# 損失関数をインスタンス化
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# メトリクスを準備
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = keras.metrics.SparseCategoricalAccuracy()
train(
train_dataset,
val_dataset,
model,
optimizer,
train_acc_metric,
val_acc_metric,
epochs=config.epochs,
log_step=config.log_step,
val_log_step=config.val_log_step,
)
run.finish() # Jupyter/Colab では、完了したことを知らせます!
結果を可視化
上記の run ページ リンクをクリックして、ライブ結果を確認してください。
Sweep 101
Weights & Biases Sweeps を使用してハイパーパラメータの最適化を自動化し、可能なモデルのスペースを探索しましょう。
W&B Sweeps を使用するメリット
- 簡単なセットアップ: 数行のコードで W&B sweeps を実行できます。
- 透明性: 使用するアルゴリズムをすべて引用しており、コードはオープンソースです。
- 強力: スイープは完全にカスタマイズ可能で設定可能です。何十台ものマシンでスイープを起動することができ、それはラップトップでスイープを開始するのと同じくらい簡単です。
サンプルギャラリー
W&B を使って記録・可視化されたプロジェクトの例を見ることができます。Fully Connected →
ベストプラクティス
- Projects: 複数の runs をプロジェクトにログして、それらを比較します。
wandb.init(project="project-name")
- Groups: 複数のプロセスや交差検証フォールドの場合は、それぞれのプロセスを個別の run としてログし、まとめてグループ化します。
wandb.init(group="experiment-1")
- Tags: 現在のベースラインやプロダクションモデルを追跡するためにタグを追加します。
- Notes: テーブルにメモを入力して、runs 間の変更を追跡します。
- Reports: 進捗について同僚と共有するために迅速にメモを取り、ML プロジェクトのダッシュボードとスナップショットを作成します。
高度なセットアップ
- 環境変数: APIキーを環境変数に設定して、管理されたクラスターでトレーニングを実行できるようにします。
- オフラインモード
- オンプレミス: W&B をプライベートクラウドやエアギャップサーバーのあなたのインフラストラクチャ上にインストールします。学術的なユーザーから企業間のチームまで、みんなのためにローカルインストールを提供しています。
- Artifacts: モデルとデータセットを一元化された方法で追跡し、バージョン管理することで、モデルをトレーニングする際にパイプラインステップを自動的にキャプチャします。
7.5 - TensorFlow スイープ
W&B を使用して、機械学習実験管理、データセットのバージョン管理、プロジェクトコラボレーションを行いましょう。
W&B Sweeps を使用してハイパーパラメーターの最適化を自動化し、インタラクティブな ダッシュボードでモデルの可能性を探りましょう:
なぜ sweeps を使うのか
- クイックセットアップ: W&B sweeps を数行のコードで実行。
- 透明性: このプロジェクトは使用されるすべてのアルゴリズムを引用し、コードはオープンソースです。
- 強力: Sweeps はカスタマイズオプションを提供し、複数のマシンやノートパソコンで簡単に実行できます。
詳しくは、Sweep ドキュメントを参照してください。
このノートブックで扱う内容
- W&B Sweep と TensorFlow でのカスタム トレーニングループを開始する手順。
- 画像分類タスクのための最適なハイパーパラメーターを見つけること。
注意: Step から始まるセクションには、ハイパーパラメータースイープを実行するために必要なコードが示されています。それ以外はシンプルな例を設定します。
インストール、インポート、ログイン
W&B のインストール
W&B のインポートとログイン
import tqdm
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.datasets import cifar10
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import wandb
from wandb.integration.keras import WandbMetricsLogger
wandb.login()
W&B が初めてであるか、ログインしていない場合、wandb.login()
を実行した後のリンクは、新規登録/ログインページへと案内します。
データセットの準備
# トレーニングデータセットの準備
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = x_train / 255.0
x_test = x_test / 255.0
x_train = np.reshape(x_train, (-1, 784))
x_test = np.reshape(x_test, (-1, 784))
分類器 MLP の構築
def Model():
inputs = keras.Input(shape=(784,), name="digits")
x1 = keras.layers.Dense(64, activation="relu")(inputs)
x2 = keras.layers.Dense(64, activation="relu")(x1)
outputs = keras.layers.Dense(10, name="predictions")(x2)
return keras.Model(inputs=inputs, outputs=outputs)
def train_step(x, y, model, optimizer, loss_fn, train_acc_metric):
with tf.GradientTape() as tape:
logits = model(x, training=True)
loss_value = loss_fn(y, logits)
grads = tape.gradient(loss_value, model.trainable_weights)
optimizer.apply_gradients(zip(grads, model.trainable_weights))
train_acc_metric.update_state(y, logits)
return loss_value
def test_step(x, y, model, loss_fn, val_acc_metric):
val_logits = model(x, training=False)
loss_value = loss_fn(y, val_logits)
val_acc_metric.update_state(y, val_logits)
return loss_value
トレーニングループの記述
def train(
train_dataset,
val_dataset,
model,
optimizer,
loss_fn,
train_acc_metric,
val_acc_metric,
epochs=10,
log_step=200,
val_log_step=50,
):
for epoch in range(epochs):
print("\nStart of epoch %d" % (epoch,))
train_loss = []
val_loss = []
# データセットのバッチを繰り返す
for step, (x_batch_train, y_batch_train) in tqdm.tqdm(
enumerate(train_dataset), total=len(train_dataset)
):
loss_value = train_step(
x_batch_train,
y_batch_train,
model,
optimizer,
loss_fn,
train_acc_metric,
)
train_loss.append(float(loss_value))
# 各エポックの終わりに検証ループを実行
for step, (x_batch_val, y_batch_val) in enumerate(val_dataset):
val_loss_value = test_step(
x_batch_val, y_batch_val, model, loss_fn, val_acc_metric
)
val_loss.append(float(val_loss_value))
# 各エポック終了時にメトリクスを表示
train_acc = train_acc_metric.result()
print("Training acc over epoch: %.4f" % (float(train_acc),))
val_acc = val_acc_metric.result()
print("Validation acc: %.4f" % (float(val_acc),))
# 各エポック終了時にメトリクスをリセット
train_acc_metric.reset_states()
val_acc_metric.reset_states()
# 3️⃣ wandb.log を使用してメトリクスをログ
wandb.log(
{
"epochs": epoch,
"loss": np.mean(train_loss),
"acc": float(train_acc),
"val_loss": np.mean(val_loss),
"val_acc": float(val_acc),
}
)
sweep を設定する
sweep を設定する手順:
- 最適化するハイパーパラメーターを定義する
- 最適化メソッドを選択する:
random
、grid
、または bayes
bayes
の目標とメトリクスを設定する。例えば val_loss
を最小化する
hyperband
を使用して、実行中のものを早期終了する
詳しくは W&B Sweeps ドキュメントを参照してください。
sweep_config = {
"method": "random",
"metric": {"name": "val_loss", "goal": "minimize"},
"early_terminate": {"type": "hyperband", "min_iter": 5},
"parameters": {
"batch_size": {"values": [32, 64, 128, 256]},
"learning_rate": {"values": [0.01, 0.005, 0.001, 0.0005, 0.0001]},
},
}
トレーニングループを包む
wandb.config
を使用してハイパーパラメーターを設定してから train
を呼び出すような関数 sweep_train
を作成します。
def sweep_train(config_defaults=None):
# デフォルト値の設定
config_defaults = {"batch_size": 64, "learning_rate": 0.01}
# サンプルプロジェクト名で wandb を初期化
wandb.init(config=config_defaults) # これは Sweep で上書きされます
# 他のハイパーパラメータを設定に指定、ある場合
wandb.config.epochs = 2
wandb.config.log_step = 20
wandb.config.val_log_step = 50
wandb.config.architecture_name = "MLP"
wandb.config.dataset_name = "MNIST"
# tf.data を使用して入力パイプラインを構築
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = (
train_dataset.shuffle(buffer_size=1024)
.batch(wandb.config.batch_size)
.prefetch(buffer_size=tf.data.AUTOTUNE)
)
val_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
val_dataset = val_dataset.batch(wandb.config.batch_size).prefetch(
buffer_size=tf.data.AUTOTUNE
)
# モデルを初期化
model = Model()
# モデルをトレーニングするためのオプティマイザーをインスタンス化
optimizer = keras.optimizers.SGD(learning_rate=wandb.config.learning_rate)
# 損失関数をインスタンス化
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# メトリクスを準備
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = keras.metrics.SparseCategoricalAccuracy()
train(
train_dataset,
val_dataset,
model,
optimizer,
loss_fn,
train_acc_metric,
val_acc_metric,
epochs=wandb.config.epochs,
log_step=wandb.config.log_step,
val_log_step=wandb.config.val_log_step,
)
sweep を初期化し、パーソナルデジタルアシスタントを実行
sweep_id = wandb.sweep(sweep_config, project="sweeps-tensorflow")
count
パラメーターで実行の数を制限します。迅速な実行のために 10 に設定します。必要に応じて増やしてください。
wandb.agent(sweep_id, function=sweep_train, count=10)
結果を視覚化
ライブ結果を見るには、先行する Sweep URL リンクをクリックしてください。
サンプルギャラリー
W&B で追跡および視覚化されたプロジェクトを Gallery で探索してください。
ベストプラクティス
- Projects: 複数の実行をプロジェクトに記録し、それらを比較します。
wandb.init(project="project-name")
- Groups: 複数プロセスまたはクロスバリデーション折りたたみのために各プロセスを run として記録し、それらをグループ化します。
wandb.init(group='experiment-1')
- Tags: ベースラインまたはプロダクションモデルを追跡するためにタグを使用します。
- Notes: テーブルのメモに run 間の変更を追跡するためのメモを入力します。
- Reports: レポートを使用して進捗メモを作成し、同僚と共有し、MLプロジェクトのダッシュボードとスナップショットを作成します。
高度なセットアップ
- 環境変数: 管理されたクラスターでのトレーニングのために API キーを設定します。
- オフラインモード
- オンプレミス: W&B をプライベートクラウドまたはインフラストラクチャ内のエアギャップサーバーにインストールします。ローカルインストールは学術機関および企業チームに適しています。
7.6 - 3D 脳腫瘍セグメンテーション with MONAI
このチュートリアルでは、MONAIを使用して、マルチラベルの3D脳腫瘍セグメンテーションタスクのトレーニングワークフローを構築し、Weights & Biasesの実験管理とデータ可視化機能を使用する方法をデモンストレーションします。チュートリアルには以下の機能が含まれています。
- Weights & Biasesのrunを初期化し、再現性を確保するためにrunに関連するすべての設定を同期。
- MONAIトランスフォームAPI:
- 辞書形式のデータ用のMONAIトランスフォーム。
- MONAIの
transforms
APIに従った新しいトランスフォームの定義方法。
- データ拡張のための強度をランダムに調整する方法。
- データのロードと可視化:
- メタデータで
Nifti
画像をロードし、画像のリストをロードしてスタック。
- IOとトランスフォームをキャッシュしてトレーニングと検証を加速。
wandb.Table
とWeights & Biasesでインタラクティブなセグメンテーションオーバーレイを用いてデータを可視化。
- 3D
SegResNet
モデルのトレーニング
- MONAIの
networks
, losses
, metrics
APIsを使用。
- PyTorchトレーニングループを使用して3D
SegResNet
モデルをトレーニング。
- Weights & Biasesを使用してトレーニングの実験管理を追跡。
- Weights & Biases上でモデルのチェックポイントをモデルアーティファクトとしてログとバージョン管理。
wandb.Table
とWeights & Biasesでインタラクティブなセグメンテーションオーバーレイを使用して、検証データセット上の予測を可視化して比較。
セットアップとインストール
まず、MONAIとWeights & Biasesの最新バージョンをインストールします。
!python -c "import monai" || pip install -q -U "monai[nibabel, tqdm]"
!python -c "import wandb" || pip install -q -U wandb
import os
import numpy as np
from tqdm.auto import tqdm
import wandb
from monai.apps import DecathlonDataset
from monai.data import DataLoader, decollate_batch
from monai.losses import DiceLoss
from monai.inferers import sliding_window_inference
from monai.metrics import DiceMetric
from monai.networks.nets import SegResNet
from monai.transforms import (
Activations,
AsDiscrete,
Compose,
LoadImaged,
MapTransform,
NormalizeIntensityd,
Orientationd,
RandFlipd,
RandScaleIntensityd,
RandShiftIntensityd,
RandSpatialCropd,
Spacingd,
EnsureTyped,
EnsureChannelFirstd,
)
from monai.utils import set_determinism
import torch
次に、ColabインスタンスをW&Bで認証します。
W&B Runの初期化
新しいW&B runを開始して実験を追跡します。
wandb.init(project="monai-brain-tumor-segmentation")
適切な設定システムの使用は、再現性のある機械学習のための推奨されるベストプラクティスです。W&Bを使用して、各実験のハイパーパラメーターを追跡できます。
config = wandb.config
config.seed = 0
config.roi_size = [224, 224, 144]
config.batch_size = 1
config.num_workers = 4
config.max_train_images_visualized = 20
config.max_val_images_visualized = 20
config.dice_loss_smoothen_numerator = 0
config.dice_loss_smoothen_denominator = 1e-5
config.dice_loss_squared_prediction = True
config.dice_loss_target_onehot = False
config.dice_loss_apply_sigmoid = True
config.initial_learning_rate = 1e-4
config.weight_decay = 1e-5
config.max_train_epochs = 50
config.validation_intervals = 1
config.dataset_dir = "./dataset/"
config.checkpoint_dir = "./checkpoints"
config.inference_roi_size = (128, 128, 64)
config.max_prediction_images_visualized = 20
また、ランダムシードを設定して、モジュールの決定的なトレーニングを有効または無効にする必要があります。
set_determinism(seed=config.seed)
# ディレクトリを作成
os.makedirs(config.dataset_dir, exist_ok=True)
os.makedirs(config.checkpoint_dir, exist_ok=True)
データのロードと変換
ここでは、monai.transforms
APIを使用して、マルチクラスラベルをワンホット形式でのマルチラベルセグメンテーションタスクに変換するカスタムトランスフォームを作成します。
class ConvertToMultiChannelBasedOnBratsClassesd(MapTransform):
"""
脳腫瘍クラスに基づいてラベルをマルチチャネルに変換します:
ラベル 1 は腫瘍周辺の浮腫
ラベル 2 はGD 増強腫瘍
ラベル 3 は壊死および非増強腫瘍コア
考えられるクラスはTC (腫瘍コア)、WT (全腫瘍)
ET (増強腫瘍)です。
参考:https://github.com/Project-MONAI/tutorials/blob/main/3d_segmentation/brats_segmentation_3d.ipynb
"""
def __call__(self, data):
d = dict(data)
for key in self.keys:
result = []
# label 2 と label 3 を組み合わせて TC を構築
result.append(torch.logical_or(d[key] == 2, d[key] == 3))
# label 1, 2 と 3 を組み合わせて WT を構築
result.append(
torch.logical_or(
torch.logical_or(d[key] == 2, d[key] == 3), d[key] == 1
)
)
# label 2 は ET
result.append(d[key] == 2)
d[key] = torch.stack(result, axis=0).float()
return d
次に、トレーニングデータセットと検証データセット用にそれぞれトランスフォームを設定します。
train_transform = Compose(
[
# 4つの Nifti 画像を読み込み、それらをスタック
LoadImaged(keys=["image", "label"]),
EnsureChannelFirstd(keys="image"),
EnsureTyped(keys=["image", "label"]),
ConvertToMultiChannelBasedOnBratsClassesd(keys="label"),
Orientationd(keys=["image", "label"], axcodes="RAS"),
Spacingd(
keys=["image", "label"],
pixdim=(1.0, 1.0, 1.0),
mode=("bilinear", "nearest"),
),
RandSpatialCropd(
keys=["image", "label"], roi_size=config.roi_size, random_size=False
),
RandFlipd(keys=["image", "label"], prob=0.5, spatial_axis=0),
RandFlipd(keys=["image", "label"], prob=0.5, spatial_axis=1),
RandFlipd(keys=["image", "label"], prob=0.5, spatial_axis=2),
NormalizeIntensityd(keys="image", nonzero=True, channel_wise=True),
RandScaleIntensityd(keys="image", factors=0.1, prob=1.0),
RandShiftIntensityd(keys="image", offsets=0.1, prob=1.0),
]
)
val_transform = Compose(
[
LoadImaged(keys=["image", "label"]),
EnsureChannelFirstd(keys="image"),
EnsureTyped(keys=["image", "label"]),
ConvertToMultiChannelBasedOnBratsClassesd(keys="label"),
Orientationd(keys=["image", "label"], axcodes="RAS"),
Spacingd(
keys=["image", "label"],
pixdim=(1.0, 1.0, 1.0),
mode=("bilinear", "nearest"),
),
NormalizeIntensityd(keys="image", nonzero=True, channel_wise=True),
]
)
データセット
この実験で使用されるデータセットは、http://medicaldecathlon.com/ から入手可能です。マルチモーダルおよびマルチサイトMRIデータ(FLAIR, T1w, T1gd, T2w)を使用して、膠芽腫、壊死/活動中の腫瘍、および浮腫をセグメント化します。データセットは750個の4Dボリューム(484 トレーニング + 266 テスト)で構成されています。
DecathlonDataset
を使用してデータセットを自動的にダウンロードし、抽出します。MONAI CacheDataset
を継承し、cache_num=N
を設定してトレーニングのために N
アイテムをキャッシュし、メモリサイズに応じてデフォルト引数を使用して検証のためにすべてのアイテムをキャッシュできます。
train_dataset = DecathlonDataset(
root_dir=config.dataset_dir,
task="Task01_BrainTumour",
transform=val_transform,
section="training",
download=True,
cache_rate=0.0,
num_workers=4,
)
val_dataset = DecathlonDataset(
root_dir=config.dataset_dir,
task="Task01_BrainTumour",
transform=val_transform,
section="validation",
download=False,
cache_rate=0.0,
num_workers=4,
)
注意: train_transform
を train_dataset
に適用する代わりに、val_transform
をトレーニングおよび検証データセットの両方に適用します。これは、トレーニングの前に、データセットの2つのスプリットからのサンプルを視覚化するためです。
データセットの可視化
Weights & Biasesは画像、ビデオ、オーディオなどをサポートしています。リッチメディアをログに取り込み、結果を探索し、run、モデル、データセットを視覚的に比較できます。セグメンテーションマスクオーバーレイシステムを使用してデータボリュームを可視化します。 テーブルにセグメンテーションマスクをログに追加するには、テーブル内の各行にwandb.Image
オブジェクトを提供する必要があります。
次の擬似コードは、その例です。
table = wandb.Table(columns=["ID", "Image"])
for id, img, label in zip(ids, images, labels):
mask_img = wandb.Image(
img,
masks={
"prediction": {"mask_data": label, "class_labels": class_labels}
# ...
},
)
table.add_data(id, img)
wandb.log({"Table": table})
次に、サンプル画像、ラベル、wandb.Table
オブジェクトと関連するメタデータを受け取り、Weights & Biases ダッシュボードにログされるテーブルの行を埋めるユーティリティ関数を作成します。
def log_data_samples_into_tables(
sample_image: np.array,
sample_label: np.array,
split: str = None,
data_idx: int = None,
table: wandb.Table = None,
):
num_channels, _, _, num_slices = sample_image.shape
with tqdm(total=num_slices, leave=False) as progress_bar:
for slice_idx in range(num_slices):
ground_truth_wandb_images = []
for channel_idx in range(num_channels):
ground_truth_wandb_images.append(
masks = {
"ground-truth/Tumor-Core": {
"mask_data": sample_label[0, :, :, slice_idx],
"class_labels": {0: "background", 1: "Tumor Core"},
},
"ground-truth/Whole-Tumor": {
"mask_data": sample_label[1, :, :, slice_idx] * 2,
"class_labels": {0: "background", 2: "Whole Tumor"},
},
"ground-truth/Enhancing-Tumor": {
"mask_data": sample_label[2, :, :, slice_idx] * 3,
"class_labels": {0: "background", 3: "Enhancing Tumor"},
},
}
wandb.Image(
sample_image[channel_idx, :, :, slice_idx],
masks=masks,
)
)
table.add_data(split, data_idx, slice_idx, *ground_truth_wandb_images)
progress_bar.update(1)
return table
次に、wandb.Table
オブジェクトと、それが含む列を定義し、データ可視化を使用してそれを埋めます。
table = wandb.Table(
columns=[
"Split",
"Data Index",
"Slice Index",
"Image-Channel-0",
"Image-Channel-1",
"Image-Channel-2",
"Image-Channel-3",
]
)
次に、それぞれtrain_dataset
とval_dataset
をループして、データサンプルの可視化を生成し、ダッシュボードにログを取るためのテーブルの行を埋めます。
# train_dataset の可視化を生成
max_samples = (
min(config.max_train_images_visualized, len(train_dataset))
if config.max_train_images_visualized > 0
else len(train_dataset)
)
progress_bar = tqdm(
enumerate(train_dataset[:max_samples]),
total=max_samples,
desc="Generating Train Dataset Visualizations:",
)
for data_idx, sample in progress_bar:
sample_image = sample["image"].detach().cpu().numpy()
sample_label = sample["label"].detach().cpu().numpy()
table = log_data_samples_into_tables(
sample_image,
sample_label,
split="train",
data_idx=data_idx,
table=table,
)
# val_dataset の可視化を生成
max_samples = (
min(config.max_val_images_visualized, len(val_dataset))
if config.max_val_images_visualized > 0
else len(val_dataset)
)
progress_bar = tqdm(
enumerate(val_dataset[:max_samples]),
total=max_samples,
desc="Generating Validation Dataset Visualizations:",
)
for data_idx, sample in progress_bar:
sample_image = sample["image"].detach().cpu().numpy()
sample_label = sample["label"].detach().cpu().numpy()
table = log_data_samples_into_tables(
sample_image,
sample_label,
split="val",
data_idx=data_idx,
table=table,
)
# ダッシュボードにテーブルをログ
wandb.log({"Tumor-Segmentation-Data": table})
データはW&Bダッシュボード上でインタラクティブな表形式で表示されます。我々はデータボリュームの特定のスライスの各チャンネルを、各行の対応するセグメンテーションマスクと重ね合わせて見ることができます。テーブルのデータを論理的にフィルタリングして、特定の行に集中するために Weave クエリ を書くことができます。
 |
ログされたテーブルデータの例。 |
画像を開き、インタラクティブなオーバーレイを使用して、各セグメンテーションマスクをどのように操作できるかを見てみてください。
 |
セグメンテーションマップの可視化例。 |
注意: データセットのラベルはクラス間で重ならないマスクで構成されています。オーバーレイはラベルをオーバーレイ内の個別のマスクとしてログします。
データのロード
データセットからデータをロードするための PyTorch DataLoaders を作成します。 DataLoaders を作成する前に、train_dataset
の transform
を train_transform
に設定して、トレーニング用のデータを前処理および変換します。
# トレーニングデータセットにtrain_transformsを適用
train_dataset.transform = train_transform
# train_loaderを作成
train_loader = DataLoader(
train_dataset,
batch_size=config.batch_size,
shuffle=True,
num_workers=config.num_workers,
)
# val_loaderを作成
val_loader = DataLoader(
val_dataset,
batch_size=config.batch_size,
shuffle=False,
num_workers=config.num_workers,
)
モデル、損失、およびオプティマイザーの作成
このチュートリアルでは、 3D MRI brain tumor segmentation using auto-encoder regularization に基づいた SegResNet
モデルを作成します。SegResNet
モデルは、 monai.networks
API の一部として PyTorch モジュールとして実装されています。これはオプティマイザーと学習率スケジューラともに利用可能です。
device = torch.device("cuda:0")
# モデルの作成
model = SegResNet(
blocks_down=[1, 2, 2, 4],
blocks_up=[1, 1, 1],
init_filters=16,
in_channels=4,
out_channels=3,
dropout_prob=0.2,
).to(device)
# オプティマイザーの作成
optimizer = torch.optim.Adam(
model.parameters(),
config.initial_learning_rate,
weight_decay=config.weight_decay,
)
# 学習率スケジューラの作成
lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
optimizer, T_max=config.max_train_epochs
)
損失を monai.losses
API を使用してマルチラベル DiceLoss
として定義し、それに対応するダイスメトリクスを monai.metrics
API を使用して定義します。
loss_function = DiceLoss(
smooth_nr=config.dice_loss_smoothen_numerator,
smooth_dr=config.dice_loss_smoothen_denominator,
squared_pred=config.dice_loss_squared_prediction,
to_onehot_y=config.dice_loss_target_onehot,
sigmoid=config.dice_loss_apply_sigmoid,
)
dice_metric = DiceMetric(include_background=True, reduction="mean")
dice_metric_batch = DiceMetric(include_background=True, reduction="mean_batch")
post_trans = Compose([Activations(sigmoid=True), AsDiscrete(threshold=0.5)])
# 自動混合精度を使用してトレーニングを加速
scaler = torch.cuda.amp.GradScaler()
torch.backends.cudnn.benchmark = True
混合精度推論のための小さなユーティリティを定義します。これはトレーニングプロセスの検証ステップおよびトレーニング後にモデルを実行したいときに役立ちます。
def inference(model, input):
def _compute(input):
return sliding_window_inference(
inputs=input,
roi_size=(240, 240, 160),
sw_batch_size=1,
predictor=model,
overlap=0.5,
)
with torch.cuda.amp.autocast():
return _compute(input)
トレーニングと検証
トレーニングの前に、トレーニングと検証の実験管理を追跡するために wandb.log()
でログを取るメトリクスプロパティを定義します。
wandb.define_metric("epoch/epoch_step")
wandb.define_metric("epoch/*", step_metric="epoch/epoch_step")
wandb.define_metric("batch/batch_step")
wandb.define_metric("batch/*", step_metric="batch/batch_step")
wandb.define_metric("validation/validation_step")
wandb.define_metric("validation/*", step_metric="validation/validation_step")
batch_step = 0
validation_step = 0
metric_values = []
metric_values_tumor_core = []
metric_values_whole_tumor = []
metric_values_enhanced_tumor = []
標準的な PyTorch トレーニングループの実行
# W&B アーティファクトオブジェクトを定義
artifact = wandb.Artifact(
name=f"{wandb.run.id}-checkpoint", type="model"
)
epoch_progress_bar = tqdm(range(config.max_train_epochs), desc="Training:")
for epoch in epoch_progress_bar:
model.train()
epoch_loss = 0
total_batch_steps = len(train_dataset) // train_loader.batch_size
batch_progress_bar = tqdm(train_loader, total=total_batch_steps, leave=False)
# トレーニングステップ
for batch_data in batch_progress_bar:
inputs, labels = (
batch_data["image"].to(device),
batch_data["label"].to(device),
)
optimizer.zero_grad()
with torch.cuda.amp.autocast():
outputs = model(inputs)
loss = loss_function(outputs, labels)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
epoch_loss += loss.item()
batch_progress_bar.set_description(f"train_loss: {loss.item():.4f}:")
## バッチ単位のトレーニング損失を W&B にログ
wandb.log({"batch/batch_step": batch_step, "batch/train_loss": loss.item()})
batch_step += 1
lr_scheduler.step()
epoch_loss /= total_batch_steps
## エポック単位のトレーニング損失と学習率を W&B にログ
wandb.log(
{
"epoch/epoch_step": epoch,
"epoch/mean_train_loss": epoch_loss,
"epoch/learning_rate": lr_scheduler.get_last_lr()[0],
}
)
epoch_progress_bar.set_description(f"Training: train_loss: {epoch_loss:.4f}:")
# 検証とモデルのチェックポイントステップ
if (epoch + 1) % config.validation_intervals == 0:
model.eval()
with torch.no_grad():
for val_data in val_loader:
val_inputs, val_labels = (
val_data["image"].to(device),
val_data["label"].to(device),
)
val_outputs = inference(model, val_inputs)
val_outputs = [post_trans(i) for i in decollate_batch(val_outputs)]
dice_metric(y_pred=val_outputs, y=val_labels)
dice_metric_batch(y_pred=val_outputs, y=val_labels)
metric_values.append(dice_metric.aggregate().item())
metric_batch = dice_metric_batch.aggregate()
metric_values_tumor_core.append(metric_batch[0].item())
metric_values_whole_tumor.append(metric_batch[1].item())
metric_values_enhanced_tumor.append(metric_batch[2].item())
dice_metric.reset()
dice_metric_batch.reset()
checkpoint_path = os.path.join(config.checkpoint_dir, "model.pth")
torch.save(model.state_dict(), checkpoint_path)
# W&Bアーティファクトを使用してモデルのチェックポイントをログとバージョン管理。
artifact.add_file(local_path=checkpoint_path)
wandb.log_artifact(artifact, aliases=[f"epoch_{epoch}"])
# W&Bダッシュボードに検証メトリクスをログ。
wandb.log(
{
"validation/validation_step": validation_step,
"validation/mean_dice": metric_values[-1],
"validation/mean_dice_tumor_core": metric_values_tumor_core[-1],
"validation/mean_dice_whole_tumor": metric_values_whole_tumor[-1],
"validation/mean_dice_enhanced_tumor": metric_values_enhanced_tumor[-1],
}
)
validation_step += 1
# このアーティファクトがログを終了するのを待ちます。
artifact.wait()
コードを wandb.log
で計装することで、トレーニングと検証プロセスに関連するメトリクスすべてを追跡するだけでなく、W&Bダッシュボード上のすべてのシステムメトリクス(この場合はCPUとGPU)も追跡できます。
 |
W&Bでのトレーニングと検証プロセス追跡の例。 |
W&Bの run ダッシュボードのアーティファクトタブに移動して、トレーニング中にログされたモデルチェックポイントアーティファクトの異なるバージョンにアクセスします。
 |
W&Bでのモデルチェックポイントのログとバージョン管理の例。 |
推論
アーティファクトインターフェースを使用して、このケースでは平均エポック単位のトレーニング損失が最良のモデルチェックポイントであるアーティファクトのバージョンを選択できます。アーティファクト全体のリネージを探索し、必要なバージョンを使用することもできます。
 |
W&Bでのモデルアーティファクト追跡の例。 |
最良のエポック単位の平均トレーニング損失を持つモデルアーティファクトのバージョンをフェッチし、チェックポイントステート辞書をモデルにロードします。
model_artifact = wandb.use_artifact(
"geekyrakshit/monai-brain-tumor-segmentation/d5ex6n4a-checkpoint:v49",
type="model",
)
model_artifact_dir = model_artifact.download()
model.load_state_dict(torch.load(os.path.join(model_artifact_dir, "model.pth")))
model.eval()
予測の可視化と正解ラベルとの比較
予測されたセグメンテーションマスクと対応する正解のセグメンテーションマスクをインタラクティブなセグメンテーションマスクオーバーレイを使用して視覚化するためのユーティリティ関数を作成します。
def log_predictions_into_tables(
sample_image: np.array,
sample_label: np.array,
predicted_label: np.array,
split: str = None,
data_idx: int = None,
table: wandb.Table = None,
):
num_channels, _, _, num_slices = sample_image.shape
with tqdm(total=num_slices, leave=False) as progress_bar:
for slice_idx in range(num_slices):
wandb_images = []
for channel_idx in range(num_channels):
wandb_images += [
wandb.Image(
sample_image[channel_idx, :, :, slice_idx],
masks={
"ground-truth/Tumor-Core": {
"mask_data": sample_label[0, :, :, slice_idx],
"class_labels": {0: "background", 1: "Tumor Core"},
},
"prediction/Tumor-Core": {
"mask_data": predicted_label[0, :, :, slice_idx] * 2,
"class_labels": {0: "background", 2: "Tumor Core"},
},
},
),
wandb.Image(
sample_image[channel_idx, :, :, slice_idx],
masks={
"ground-truth/Whole-Tumor": {
"mask_data": sample_label[1, :, :, slice_idx],
"class_labels": {0: "background", 1: "Whole Tumor"},
},
"prediction/Whole-Tumor": {
"mask_data": predicted_label[1, :, :, slice_idx] * 2,
"class_labels": {0: "background", 2: "Whole Tumor"},
},
},
),
wandb.Image(
sample_image[channel_idx, :, :, slice_idx],
masks={
"ground-truth/Enhancing-Tumor": {
"mask_data": sample_label[2, :, :, slice_idx],
"class_labels": {0: "background", 1: "Enhancing Tumor"},
},
"prediction/Enhancing-Tumor": {
"mask_data": predicted_label[2, :, :, slice_idx] * 2,
"class_labels": {0: "background", 2: "Enhancing Tumor"},
},
},
),
]
table.add_data(split, data_idx, slice_idx, *wandb_images)
progress_bar.update(1)
return table
予測結果を予測テーブルにログします。
# 予測テーブルを作成
prediction_table = wandb.Table(
columns=[
"Split",
"Data Index",
"Slice Index",
"Image-Channel-0/Tumor-Core",
"Image-Channel-1/Tumor-Core",
"Image-Channel-2/Tumor-Core",
"Image-Channel-3/Tumor-Core",
"Image-Channel-0/Whole-Tumor",
"Image-Channel-1/Whole-Tumor",
"Image-Channel-2/Whole-Tumor",
"Image-Channel-3/Whole-Tumor",
"Image-Channel-0/Enhancing-Tumor",
"Image-Channel-1/Enhancing-Tumor",
"Image-Channel-2/Enhancing-Tumor",
"Image-Channel-3/Enhancing-Tumor",
]
)
# 推論と可視化を実行
with torch.no_grad():
config.max_prediction_images_visualized
max_samples = (
min(config.max_prediction_images_visualized, len(val_dataset))
if config.max_prediction_images_visualized > 0
else len(val_dataset)
)
progress_bar = tqdm(
enumerate(val_dataset[:max_samples]),
total=max_samples,
desc="Generating Predictions:",
)
for data_idx, sample in progress_bar:
val_input = sample["image"].unsqueeze(0).to(device)
val_output = inference(model, val_input)
val_output = post_trans(val_output[0])
prediction_table = log_predictions_into_tables(
sample_image=sample["image"].cpu().numpy(),
sample_label=sample["label"].cpu().numpy(),
predicted_label=val_output.cpu().numpy(),
data_idx=data_idx,
split="validation",
table=prediction_table,
)
wandb.log({"Predictions/Tumor-Segmentation-Data": prediction_table})
# 実験終了
wandb.finish()
インタラクティブなセグメンテーションマスクオーバーレイを使用して、予測されたセグメンテーションマスクと各クラスの正解ラベルを分析および比較します。
 |
W&Bでの予測と正解の可視化例。 |
謝辞と追加リソース
7.7 - Keras
Weights & Biases を使用して、機械学習の実験管理、データセット バージョン管理、プロジェクトのコラボレーションを行いましょう。
この Colabノートブックでは、WandbMetricsLogger
コールバックを紹介します。このコールバックは 実験管理 に使用できます。これにより、トレーニングと検証のメトリクスとシステムメトリクスを Weights & Biases に記録します。
セットアップとインストール
まず、最新バージョンの Weights & Biases をインストールしましょう。次に、この Colabインスタンスを認証して W&B を使用できるようにします。
import os
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import models
import tensorflow_datasets as tfds
# Weights and Biases 関連のインポート
import wandb
from wandb.integration.keras import WandbMetricsLogger
W&B を初めて使用する場合、またはログインしていない場合、wandb.login()
を実行した後に表示されるリンクはサインアップ/ログインページに導きます。無料アカウント のサインアップは数クリックで完了します。
ハイパーパラメーター
適切な設定システムの使用は、再現可能な機械学習の推奨ベストプラクティスです。W&B を使用して、実験ごとにハイパーパラメーターを追跡できます。この Colab では、シンプルな Python の dict
を設定システムとして使用します。
configs = dict(
num_classes=10,
shuffle_buffer=1024,
batch_size=64,
image_size=28,
image_channels=1,
earlystopping_patience=3,
learning_rate=1e-3,
epochs=10,
)
データセット
この Colab では、TensorFlow データセットカタログから CIFAR100 データセットを使用します。私たちの目標は、TensorFlow/Keras を使用してシンプルな画像分類パイプラインを構築することです。
train_ds, valid_ds = tfds.load("fashion_mnist", split=["train", "test"])
AUTOTUNE = tf.data.AUTOTUNE
def parse_data(example):
# 画像を取得する
image = example["image"]
# image = tf.image.convert_image_dtype(image, dtype=tf.float32)
# ラベルを取得する
label = example["label"]
label = tf.one_hot(label, depth=configs["num_classes"])
return image, label
def get_dataloader(ds, configs, dataloader_type="train"):
dataloader = ds.map(parse_data, num_parallel_calls=AUTOTUNE)
if dataloader_type == "train":
dataloader = dataloader.shuffle(configs["shuffle_buffer"])
dataloader = dataloader.batch(configs["batch_size"]).prefetch(AUTOTUNE)
return dataloader
trainloader = get_dataloader(train_ds, configs)
validloader = get_dataloader(valid_ds, configs, dataloader_type="valid")
モデル
def get_model(configs):
backbone = tf.keras.applications.mobilenet_v2.MobileNetV2(
weights="imagenet", include_top=False
)
backbone.trainable = False
inputs = layers.Input(
shape=(configs["image_size"], configs["image_size"], configs["image_channels"])
)
resize = layers.Resizing(32, 32)(inputs)
neck = layers.Conv2D(3, (3, 3), padding="same")(resize)
preprocess_input = tf.keras.applications.mobilenet.preprocess_input(neck)
x = backbone(preprocess_input)
x = layers.GlobalAveragePooling2D()(x)
outputs = layers.Dense(configs["num_classes"], activation="softmax")(x)
return models.Model(inputs=inputs, outputs=outputs)
tf.keras.backend.clear_session()
model = get_model(configs)
model.summary()
モデルのコンパイル
model.compile(
optimizer="adam",
loss="categorical_crossentropy",
metrics=[
"accuracy",
tf.keras.metrics.TopKCategoricalAccuracy(k=5, name="top@5_accuracy"),
],
)
トレーニング
# W&B の run を初期化する
run = wandb.init(project="intro-keras", config=configs)
# あなたのモデルをトレーニングする
model.fit(
trainloader,
epochs=configs["epochs"],
validation_data=validloader,
callbacks=[
WandbMetricsLogger(log_freq=10)
], # ここで WandbMetricsLogger を使用することに注意
)
# W&B の run を終了する
run.finish()
7.8 - Keras テーブル
機械学習の実験管理、データセットバージョン管理、およびプロジェクトコラボレーションに Weights & Biases を使用します。
この Colabノートブックでは、WandbEvalCallback
を紹介します。これは抽象的なコールバックで、モデル予測の可視化とデータセットの可視化に役立つコールバックを構築するために継承されます。
セットアップとインストール
まず、最新バージョンの Weights and Biases をインストールしましょう。その後、この Colab インスタンスを認証して W&B を利用できるようにします。
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import models
import tensorflow_datasets as tfds
# Weights and Biases に関連するインポート
import wandb
from wandb.integration.keras import WandbMetricsLogger
from wandb.integration.keras import WandbModelCheckpoint
from wandb.integration.keras import WandbEvalCallback
もしこれが初めての W&B の使用であるかまだログインしていない場合、wandb.login()
を実行した後に表示されるリンクがサインアップ/ログインページに誘導します。無料アカウントへのサインアップは、数クリックで簡単です。
ハイパーパラメーター
適切なコンフィグシステムの使用は、再現可能な機械学習のための推奨ベストプラクティスです。W&B を使用して、各実験のハイパーパラメーターを管理することができます。この Colab では、シンプルな Python の dict
をコンフィグシステムとして使用します。
configs = dict(
num_classes=10,
shuffle_buffer=1024,
batch_size=64,
image_size=28,
image_channels=1,
earlystopping_patience=3,
learning_rate=1e-3,
epochs=10,
)
データセット
この Colab では、TensorFlow データセットカタログから CIFAR100 データセットを使用します。TensorFlow/Keras を使用して、シンプルな画像分類 パイプラインを構築することを目指します。
train_ds, valid_ds = tfds.load("fashion_mnist", split=["train", "test"])
AUTOTUNE = tf.data.AUTOTUNE
def parse_data(example):
# 画像を取得
image = example["image"]
# image = tf.image.convert_image_dtype(image, dtype=tf.float32)
# ラベルを取得
label = example["label"]
label = tf.one_hot(label, depth=configs["num_classes"])
return image, label
def get_dataloader(ds, configs, dataloader_type="train"):
dataloader = ds.map(parse_data, num_parallel_calls=AUTOTUNE)
if dataloader_type=="train":
dataloader = dataloader.shuffle(configs["shuffle_buffer"])
dataloader = (
dataloader
.batch(configs["batch_size"])
.prefetch(AUTOTUNE)
)
return dataloader
trainloader = get_dataloader(train_ds, configs)
validloader = get_dataloader(valid_ds, configs, dataloader_type="valid")
モデル
def get_model(configs):
backbone = tf.keras.applications.mobilenet_v2.MobileNetV2(
weights="imagenet", include_top=False
)
backbone.trainable = False
inputs = layers.Input(
shape=(configs["image_size"], configs["image_size"], configs["image_channels"])
)
resize = layers.Resizing(32, 32)(inputs)
neck = layers.Conv2D(3, (3, 3), padding="same")(resize)
preprocess_input = tf.keras.applications.mobilenet.preprocess_input(neck)
x = backbone(preprocess_input)
x = layers.GlobalAveragePooling2D()(x)
outputs = layers.Dense(configs["num_classes"], activation="softmax")(x)
return models.Model(inputs=inputs, outputs=outputs)
tf.keras.backend.clear_session()
model = get_model(configs)
model.summary()
モデルのコンパイル
model.compile(
optimizer="adam",
loss="categorical_crossentropy",
metrics=[
"accuracy",
tf.keras.metrics.TopKCategoricalAccuracy(k=5, name="top@5_accuracy"),
],
)
WandbEvalCallback
WandbEvalCallback
は主にモデル予測の可視化、そして二次的にはデータセットの可視化のための Keras コールバックを構築するための抽象基底クラスです。
これはデータセットやタスクに依存しない抽象コールバックです。これを使用するには、この基底コールバッククラスから継承し、add_ground_truth
と add_model_prediction
メソッドを実装します。
WandbEvalCallback
は以下のような便利なメソッドを提供するユーティリティクラスです:
- データと予測の
wandb.Table
インスタンスを作成、
wandb.Artifact
としてデータと予測テーブルをログ、
on_train_begin
にデータテーブルをログ、
on_epoch_end
に予測テーブルをログ。
例として、画像分類タスクのために WandbClfEvalCallback
を以下に実装しました。この例では:
- W&B にバリデーションデータ (
data_table
) をログ、
- 推論を行い、各エポックの終わりに W&B に予測 (
pred_table
) をログします。
メモリ使用量が削減される仕組み
on_train_begin
メソッドが呼び出される時に data_table
を W&B にログします。一度 W&B のアーティファクトとしてアップロードされると、このテーブルへの参照を取得できます。それはクラス変数 data_table_ref
を使用してアクセスできます。data_table_ref
は 2D リストで、self.data_table_ref[idx][n]
のようにインデックス付けできます。ここで idx
は行番号、n
は列番号です。以下の例で使用方法を見てみましょう。
class WandbClfEvalCallback(WandbEvalCallback):
def __init__(
self, validloader, data_table_columns, pred_table_columns, num_samples=100
):
super().__init__(data_table_columns, pred_table_columns)
self.val_data = validloader.unbatch().take(num_samples)
def add_ground_truth(self, logs=None):
for idx, (image, label) in enumerate(self.val_data):
self.data_table.add_data(idx, wandb.Image(image), np.argmax(label, axis=-1))
def add_model_predictions(self, epoch, logs=None):
# 予測を得る
preds = self._inference()
table_idxs = self.data_table_ref.get_index()
for idx in table_idxs:
pred = preds[idx]
self.pred_table.add_data(
epoch,
self.data_table_ref.data[idx][0],
self.data_table_ref.data[idx][1],
self.data_table_ref.data[idx][2],
pred,
)
def _inference(self):
preds = []
for image, label in self.val_data:
pred = self.model(tf.expand_dims(image, axis=0))
argmax_pred = tf.argmax(pred, axis=-1).numpy()[0]
preds.append(argmax_pred)
return preds
トレーニング
# W&B の run を初期化
run = wandb.init(project="intro-keras", config=configs)
# モデルをトレーニング
model.fit(
trainloader,
epochs=configs["epochs"],
validation_data=validloader,
callbacks=[
WandbMetricsLogger(log_freq=10),
WandbClfEvalCallback(
validloader,
data_table_columns=["idx", "image", "ground_truth"],
pred_table_columns=["epoch", "idx", "image", "ground_truth", "prediction"],
), # ここで WandbEvalCallback を使用
],
)
# W&B の run を終了
run.finish()
7.9 - Keras モデル
Weights & Biases を使用して機械学習の実験管理、データセットのバージョン管理、プロジェクトのコラボレーションを行いましょう。
この Colab ノートブックは WandbModelCheckpoint
コールバックを紹介します。このコールバックを使用して、モデルのチェックポイントを Weights & Biases Artifacts にログします。
セットアップとインストール
まず、最新バージョンの Weights & Biases をインストールします。次に、この colab インスタンスを認証して W&B を使用します。
!pip install -qq -U wandb
import os
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import models
import tensorflow_datasets as tfds
# Weights & Biases に関連するインポート
import wandb
from wandb.integration.keras import WandbMetricsLogger
from wandb.integration.keras import WandbModelCheckpoint
もし初めて W&B を使用するか、ログインしていない場合、wandb.login()
を実行した後に表示されるリンクをクリックするとサインアップ/ログインページに移動します。無料アカウント の登録は数回のクリックで簡単に行えます。
ハイパーパラメーター
適切なコンフィグシステムの使用は、再現性のある機械学習のベストプラクティスとして推奨されます。各実験のハイパーパラメーターを W&B を使用して管理できます。この colab では、コンフィグシステムとしてシンプルな Python の dict
を使用します。
configs = dict(
num_classes = 10,
shuffle_buffer = 1024,
batch_size = 64,
image_size = 28,
image_channels = 1,
earlystopping_patience = 3,
learning_rate = 1e-3,
epochs = 10
)
データセット
この colab では、 TensorFlow データセットカタログから CIFAR100 データセットを使用します。TensorFlow/Keras でシンプルな画像分類パイプラインを構築することを目指します。
train_ds, valid_ds = tfds.load('fashion_mnist', split=['train', 'test'])
AUTOTUNE = tf.data.AUTOTUNE
def parse_data(example):
# 画像を取得
image = example["image"]
# image = tf.image.convert_image_dtype(image, dtype=tf.float32)
# ラベルを取得
label = example["label"]
label = tf.one_hot(label, depth=configs["num_classes"])
return image, label
def get_dataloader(ds, configs, dataloader_type="train"):
dataloader = ds.map(parse_data, num_parallel_calls=AUTOTUNE)
if dataloader_type=="train":
dataloader = dataloader.shuffle(configs["shuffle_buffer"])
dataloader = (
dataloader
.batch(configs["batch_size"])
.prefetch(AUTOTUNE)
)
return dataloader
trainloader = get_dataloader(train_ds, configs)
validloader = get_dataloader(valid_ds, configs, dataloader_type="valid")
モデル
def get_model(configs):
backbone = tf.keras.applications.mobilenet_v2.MobileNetV2(weights='imagenet', include_top=False)
backbone.trainable = False
inputs = layers.Input(shape=(configs["image_size"], configs["image_size"], configs["image_channels"]))
resize = layers.Resizing(32, 32)(inputs)
neck = layers.Conv2D(3, (3,3), padding="same")(resize)
preprocess_input = tf.keras.applications.mobilenet.preprocess_input(neck)
x = backbone(preprocess_input)
x = layers.GlobalAveragePooling2D()(x)
outputs = layers.Dense(configs["num_classes"], activation="softmax")(x)
return models.Model(inputs=inputs, outputs=outputs)
tf.keras.backend.clear_session()
model = get_model(configs)
model.summary()
モデルのコンパイル
model.compile(
optimizer = "adam",
loss = "categorical_crossentropy",
metrics = ["accuracy", tf.keras.metrics.TopKCategoricalAccuracy(k=5, name='top@5_accuracy')]
)
トレーニング
# W&B の run を初期化
run = wandb.init(
project = "intro-keras",
config = configs
)
# モデルをトレーニング
model.fit(
trainloader,
epochs = configs["epochs"],
validation_data = validloader,
callbacks = [
WandbMetricsLogger(log_freq=10),
WandbModelCheckpoint(filepath="models/") # ここで WandbModelCheckpoint を使用しています
]
)
# W&B の run を終了
run.finish()
7.10 - XGBoost Sweeps
Weights & Biases を使って機械学習の実験管理、データセットのバージョン管理、プロジェクトの協力を行いましょう。
ツリー型モデルから最良のパフォーマンスを引き出すには、適切なハイパーパラメーターを選択する必要があります。いくつの early_stopping_rounds
が必要でしょうか? ツリーの max_depth
はどのくらいにすべきですか?
高次元のハイパーパラメータ空間を検索して最適なモデルを見つけることは非常に困難です。ハイパーパラメーター探索は、モデルのバトルロイヤルを組織的かつ効率的に実施し、勝者を決定する方法を提供します。これにより、ハイパーパラメータの組み合わせを自動的に検索して、最も最適な値を見つけ出すことができます。
このチュートリアルでは、Weights & Biases を使用して XGBoost モデルで洗練されたハイパーパラメーター探索を 3 つの簡単なステップで実行する方法を紹介します。
興味を引くために、以下のプロットをチェックしてください:
Sweeps: 概要
Weights & Biases を使ったハイパーパラメーター探索の実行は非常に簡単です。以下の3つのシンプルなステップです:
-
スイープを定義する: スイープを定義するためには、スイープを構成するパラメータ、使用する検索戦略、最適化するメトリクスを指定する辞書のようなオブジェクトを作成します。
-
スイープを初期化する: 1 行のコードでスイープを初期化し、スイープの設定の辞書を渡します:
sweep_id = wandb.sweep(sweep_config)
-
スイープエージェントを実行する: これも 1 行のコードで、wandb.agent()
を呼び出し、sweep_id
とモデルアーキテクチャを定義してトレーニングする関数を渡します:
wandb.agent(sweep_id, function=train)
これだけでハイパーパラメーター探索を実行することができます。
以下のノートブックでは、これらの 3 ステップを詳細に説明します。
ぜひこのノートブックをフォークして、パラメータを調整したり、独自のデータセットでモデルを試してみてください。
リソース
import wandb
wandb.login()
1. スイープを定義する
Weights & Biases の Sweeps は、希望通りにスイープを設定するための強力なレバーを少ないコード行で提供します。スイープの設定は、辞書または YAML ファイルとして定義できます。
それらのいくつかを一緒に見ていきましょう:
- メトリック: これは、スイープが最適化しようとしているメトリックです。メトリクスは
name
(トレーニングスクリプトでログされるべきメトリック名) と goal
(maximize
か minimize
) を取ることができます。
- 検索戦略:
"method"
キーを使用して指定されます。スイープでは、いくつかの異なる検索戦略をサポートしています。
- グリッド検索: ハイパーパラメーター値のすべての組み合わせを反復します。
- ランダム検索: ランダムに選ばれたハイパーパラメーター値の組み合わせを反復します。
- ベイズ探索: ハイパーパラメーターをメトリクススコアの確率とマッピングする確率モデルを作成し、メトリクスを改善する高い確率のパラメータを選択します。ベイズ最適化の目的は、ハイパーパラメーター値の選択に時間をかけることですが、その代わりにより少ないハイパーパラメーター値を試すことを試みます。
- パラメータ: ハイパーパラメータ名、離散値、範囲、または各反復で値を取り出す分布を含む辞書です。
詳細については、すべてのスイープ設定オプションのリストを参照してください。
sweep_config = {
"method": "random", # try grid or random
"metric": {
"name": "accuracy",
"goal": "maximize"
},
"parameters": {
"booster": {
"values": ["gbtree","gblinear"]
},
"max_depth": {
"values": [3, 6, 9, 12]
},
"learning_rate": {
"values": [0.1, 0.05, 0.2]
},
"subsample": {
"values": [1, 0.5, 0.3]
}
}
}
2. スイープを初期化する
wandb.sweep
を呼び出すとスイープコントローラー、つまり parameters
の設定を問い合わせるすべての者に提供し、metrics
のパフォーマンスを wandb
ログを介して返すことを期待する集中プロセスが開始されます。
sweep_id = wandb.sweep(sweep_config, project="XGBoost-sweeps")
トレーニングプロセスを定義する
スイープを実行する前に、モデルを作成してトレーニングする関数を定義する必要があります。
この関数は、ハイパーパラメーター値を取り込み、メトリクスを出力するものです。
また、スクリプト内に wandb
を統合する必要があります。
主なコンポーネントは3つです:
wandb.init()
: 新しい W&B run を初期化します。各 run はトレーニングスクリプトの単一の実行です。
wandb.config
: すべてのハイパーパラメーターを設定 オブジェクトに保存します。これにより、私たちのアプリを使用して、ハイパーパラメーター値ごとに run をソートおよび比較することができます。
wandb.log()
: 画像、ビデオ、オーディオファイル、HTML、プロット、またはポイントクラウドなどのメトリクスとカスタムオブジェクトをログします。
また、データをダウンロードする必要があります:
!wget https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv
# Pima Indians データセット用の XGBoost モデル
from numpy import loadtxt
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
# データを読み込む
def train():
config_defaults = {
"booster": "gbtree",
"max_depth": 3,
"learning_rate": 0.1,
"subsample": 1,
"seed": 117,
"test_size": 0.33,
}
wandb.init(config=config_defaults) # スイープ中にデフォルトが上書きされる
config = wandb.config
# データを読み込み、予測変数とターゲットに分ける
dataset = loadtxt("pima-indians-diabetes.data.csv", delimiter=",")
X, Y = dataset[:, :8], dataset[:, 8]
# データをトレインセットとテストセットに分割
X_train, X_test, y_train, y_test = train_test_split(X, Y,
test_size=config.test_size,
random_state=config.seed)
# トレインセットでモデルを適合させる
model = XGBClassifier(booster=config.booster, max_depth=config.max_depth,
learning_rate=config.learning_rate, subsample=config.subsample)
model.fit(X_train, y_train)
# テストセットで予測を行う
y_pred = model.predict(X_test)
predictions = [round(value) for value in y_pred]
# 予測を評価する
accuracy = accuracy_score(y_test, predictions)
print(f"Accuracy: {accuracy:.0%}")
wandb.log({"accuracy": accuracy})
3. エージェントでスイープを実行する
次に、wandb.agent
を呼び出してスイープを起動します。
wandb.agent
は W&B にログインしているすべてのマシンで呼び出すことができ、
sweep_id
- データセットと
train
関数
があるので、そのマシンはスイープに参加します。
注意: random
スイープはデフォルトでは永遠に実行され、新しいパラメータの組み合わせを試し続けます。しかし、それはアプリの UI からスイープをオフにするまでです。完了する run の合計 count
を agent
に指定することで、この動作を防ぐことができます。
wandb.agent(sweep_id, train, count=25)
結果を可視化する
スイープが終了したら、結果を確認します。
Weights & Biases は、あなたのために便利なプロットをいくつか自動的に生成します。
パラレル座標プロット
このプロットは、ハイパーパラメーター値をモデルのメトリクスにマッピングします。最良のモデルパフォーマンスをもたらしたハイパーパラメーターの組み合わせに焦点を当てるのに役立ちます。
このプロットは、単純な線形モデルを学習者として使用するよりも、ツリーを学習者として使用する方がやや優れていることを示しているようです。
ハイパーパラメーターの重要度プロット
ハイパーパラメーターの重要度プロットは、メトリクスに最も大きな影響を与えたハイパーパラメーター値を示しています。
相関関係(線形予測子として扱う)と特徴量の重要性(結果に基づいてランダムフォレストをトレーニングした後) の両方を報告しますので、どのパラメータが最も大きな影響を与えたか、そしてその影響がどちらの方向であったかを確認できます。
このチャートを読むと、上記のパラレル座標チャートで気づいた傾向の定量的な確認が得られます。検証精度に最大の影響を与えたのは学習者の選択であり、gblinear
学習者は一般的に gbtree
学習者よりも劣っていました。
これらの可視化は、最も重要で、さらに探索する価値のあるパラメータ(とその値の範囲)に焦点を当てることにより、高価なハイパーパラメーターの最適化を実行する時間とリソースを節約するのに役立ちます。