これは、このセクションの複数ページの印刷可能なビューです。 印刷するには、ここをクリックしてください.
インテグレーションチュートリアル
- 1: PyTorch
- 2: PyTorch Lightning
- 3: Hugging Face
- 4: TensorFlow
- 5: TensorFlow スイープ
- 6: 3D 脳腫瘍セグメンテーション with MONAI
- 7: Keras
- 8: Keras テーブル
- 9: Keras モデル
- 10: XGBoost Sweeps
1 - PyTorch
Weights & Biases を使用して、機械学習の実験管理、データセットのバージョン管理、およびプロジェクトのコラボレーションを行います。

このノートブックでカバーする内容
PyTorch のコードに Weights & Biases を統合して、実験管理をパイプラインに追加する方法を示します。

# ライブラリをインポート
import wandb
# 新しい実験を開始
wandb.init(project="new-sota-model")
# ハイパーパラメータの辞書を config でキャプチャ
wandb.config = {"learning_rate": 0.001, "epochs": 100, "batch_size": 128}
# モデルとデータを設定
model, dataloader = get_model(), get_data()
# オプション: 勾配を追跡
wandb.watch(model)
for batch in dataloader:
metrics = model.training_step()
# トレーニングループ内でメトリクスをログしてモデルの性能を視覚化
wandb.log(metrics)
# オプション: モデルを最後に保存
model.to_onnx()
wandb.save("model.onnx")
ビデオチュートリアルに沿って進めましょう。
注意: Step で始まるセクションは、既存のパイプラインに W&B を統合するために必要な部分です。それ以外はデータを読み込み、モデルを定義するだけです。
インストール、インポート、ログイン
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from tqdm.auto import tqdm
# 決定論的な振る舞いを保証
torch.backends.cudnn.deterministic = True
random.seed(hash("setting random seeds") % 2**32 - 1)
np.random.seed(hash("improves reproducibility") % 2**32 - 1)
torch.manual_seed(hash("by removing stochasticity") % 2**32 - 1)
torch.cuda.manual_seed_all(hash("so runs are repeatable") % 2**32 - 1)
# デバイスの設定
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# MNIST ミラーのリストから遅いミラーを削除
torchvision.datasets.MNIST.mirrors = [mirror for mirror in torchvision.datasets.MNIST.mirrors
if not mirror.startswith("http://yann.lecun.com")]
ステップ 0: W&B をインストール
まずは、このライブラリを入手する必要があります。
wandb
は pip
を使用して簡単にインストールできます。
!pip install wandb onnx -Uq
ステップ 1: W&B をインポートしてログイン
データをウェブサービスにログするためには、 ログインが必要です。
W&B を初めて使用する場合は、 表示されるリンクから無料アカウントに登録する必要があります。
import wandb
wandb.login()
実験とパイプラインを定義
wandb.init
でメタデータとハイパーパラメータを追跡
プログラム上、最初に行うのは実験を定義することです。 ハイパーパラメータとは何か?この run に関連するメタデータは何か?
この情報を config
辞書(または類似オブジェクト)に保存し、
必要に応じてアクセスするのは非常に一般的なワークフローです。
この例では、数少ないハイパーパラメータのみを変動させ、
残りを手動でコーディングします。
しかし、あなたのモデルのどの部分も config
の一部にすることができます。
また、いくつかのメタデータも含めます:私たちは MNIST データセットと畳み込み ニューラルネットワークを使用しているので、 同じプロジェクトで、たとえば全結合ニューラルネットワークで CIFAR を扱う場合、 この情報が run を分ける助けになります。
config = dict(
epochs=5,
classes=10,
kernels=[16, 32],
batch_size=128,
learning_rate=0.005,
dataset="MNIST",
architecture="CNN")
さて、全体のパイプラインを定義しましょう。 これは非常に典型的なモデルトレーニングの手順です:
- まず、モデル、関連するデータ、オプティマイザーを
make
し、 - それに基づいてモデルを
train
し、最後に - トレーニングがどのように行われたかを確認するために
test
します。
これらの関数を以下で実装します。
def model_pipeline(hyperparameters):
# wandb に開始を伝えます
with wandb.init(project="pytorch-demo", config=hyperparameters):
# すべての HP を wandb.config 経由でアクセスし、ログが実行と一致するようにします。
config = wandb.config
# モデル、データ、最適化問題を作成します
model, train_loader, test_loader, criterion, optimizer = make(config)
print(model)
# それらを使用してモデルをトレーニングします
train(model, train_loader, criterion, optimizer, config)
# 最終的なパフォーマンスをテストします
test(model, test_loader)
return model
ここでの標準パイプラインとの唯一の違いは、
すべてが wandb.init
のコンテキスト内で行われる点です。
この関数を呼び出すことで、
コードとサーバーとの間に通信のラインが確立されます。
config
辞書を wandb.init
に渡すことで、
すべての情報が即座にログされるため、
実験に使用するハイパーパラメータの値を常に把握できます。
選んでログした値が常にモデルに使用されることを保証するために、
wandb.config
のオブジェクトコピーを使用することをお勧めします。
以下で make
の定義をチェックして、いくつかの例を見てください。
サイドノート: コードを別々のプロセスで実行するようにします、 私たちの側で問題があっても (たとえば、巨大な海の怪物がデータセンターを攻撃した場合でも) コードはクラッシュしません。 問題が解決したら、クラーケンが深海に戻る頃に、
wandb sync
でデータをログできます。
def make(config):
# データを作成
train, test = get_data(train=True), get_data(train=False)
train_loader = make_loader(train, batch_size=config.batch_size)
test_loader = make_loader(test, batch_size=config.batch_size)
# モデルを作成
model = ConvNet(config.kernels, config.classes).to(device)
# 損失とオプティマイザーを作成
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(
model.parameters(), lr=config.learning_rate)
return model, train_loader, test_loader, criterion, optimizer
データのロードとモデルを定義
次に、データがどのようにロードされるか、およびモデルの見た目を指定する必要があります。
これは非常に重要な部分ですが、wandb
なしで通常行われるのと何も違いませんので、
ここで詳しくは説明しません。
def get_data(slice=5, train=True):
full_dataset = torchvision.datasets.MNIST(root=".",
train=train,
transform=transforms.ToTensor(),
download=True)
# [::slice] でスライスするのと同等
sub_dataset = torch.utils.data.Subset(
full_dataset, indices=range(0, len(full_dataset), slice))
return sub_dataset
def make_loader(dataset, batch_size):
loader = torch.utils.data.DataLoader(dataset=dataset,
batch_size=batch_size,
shuffle=True,
pin_memory=True, num_workers=2)
return loader
モデルの定義は通常楽しい部分です。
しかし wandb
でも何も変わらないので、
標準的な ConvNet アーキテクチャーにとどまりましょう。
この部分をいじっていくつかの実験を試みるのを恐れないでください – あなたの結果はすべて wandb.ai にログされます。
# 従来の畳み込みニューラルネットワーク
class ConvNet(nn.Module):
def __init__(self, kernels, classes=10):
super(ConvNet, self).__init__()
self.layer1 = nn.Sequential(
nn.Conv2d(1, kernels[0], kernel_size=5, stride=1, padding=2),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2))
self.layer2 = nn.Sequential(
nn.Conv2d(16, kernels[1], kernel_size=5, stride=1, padding=2),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2))
self.fc = nn.Linear(7 * 7 * kernels[-1], classes)
def forward(self, x):
out = self.layer1(x)
out = self.layer2(out)
out = out.reshape(out.size(0), -1)
out = self.fc(out)
return out
トレーニングロジックを定義
model_pipeline
を進めると、train
の指定を行う時がきます。
ここでは wandb
の 2 つの関数、watch
と log
が活躍します。
wandb.watch
で勾配を追跡し、他のすべてを wandb.log
で管理
wandb.watch
はトレーニングの log_freq
ステップごとに
モデルの勾配とパラメータをログします。
トレーニングを開始する前にこれを呼び出すだけで済みます。
トレーニングコードの残りは前と変わらず、
エポックとバッチを繰り返し、
forward pass と backward pass を実行し、
optimizer
を適用します。
def train(model, loader, criterion, optimizer, config):
# モデルを追跡している wandb に情報を伝えます: 勾配、重み、その他。
wandb.watch(model, criterion, log="all", log_freq=10)
# トレーニングを実行し wandb で追跡
total_batches = len(loader) * config.epochs
example_ct = 0 # これまでに見た例の数
batch_ct = 0
for epoch in tqdm(range(config.epochs)):
for _, (images, labels) in enumerate(loader):
loss = train_batch(images, labels, model, optimizer, criterion)
example_ct += len(images)
batch_ct += 1
# 25 バッチおきにメトリクスを報告
if ((batch_ct + 1) % 25) == 0:
train_log(loss, example_ct, epoch)
def train_batch(images, labels, model, optimizer, criterion):
images, labels = images.to(device), labels.to(device)
# Forward pass ➡
outputs = model(images)
loss = criterion(outputs, labels)
# Backward pass ⬅
optimizer.zero_grad()
loss.backward()
# オプティマイザーでステップ
optimizer.step()
return loss
唯一の違いは、記録のコードです:
以前ターミナルにメトリクスを報告していたところを、
wandb.log
に同じ情報を渡します。
wandb.log
は文字列をキーとする辞書を期待します。
これらの文字列は、ログされるオブジェクトを識別し、値を構成します。
また、オプションでトレーニングのどの step
にいるかをログできます。
サイドノート: 私はモデルが見た例の数を使用するのが好きです、 これはバッチサイズを超えて比較しやすくなるためですが、 生のステップやバッチカウントを使用することもできます。 長いトレーニングランの場合、
epoch
単位でログすることも理にかなっているかもしれません。
def train_log(loss, example_ct, epoch):
# マジックが起こるところ
wandb.log({"epoch": epoch, "loss": loss}, step=example_ct)
print(f"Loss after {str(example_ct).zfill(5)} examples: {loss:.3f}")
テストロジックを定義
モデルのトレーニングが完了したら、テストしてみましょう: 本番環境から新たなデータに対して実行したり、 手作業で準備した例に適用したりします。
(オプション) wandb.save
を呼び出す
この時点で、モデルのアーキテクチャと最終パラメータをディスクに保存するのも良い時です。 最大の互換性を考慮して、モデルを Open Neural Network eXchange (ONNX) フォーマット でエクスポートします。
そのファイル名を wandb.save
に渡すことで、モデルのパラメータが W&B のサーバーに保存されます:
もはやどの .h5
や .pb
がどのトレーニングrunに対応しているのかを見失うことはありません。
モデルの保存、バージョン管理、配布のための、高度な wandb
機能については、
Artifacts ツールをご覧ください。
def test(model, test_loader):
model.eval()
# いくつかのテスト例にモデルを実行
with torch.no_grad():
correct, total = 0, 0
for images, labels in test_loader:
images, labels = images.to(device), labels.to(device)
outputs = model(images)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
print(f"Accuracy of the model on the {total} " +
f"test images: {correct / total:%}")
wandb.log({"test_accuracy": correct / total})
# 交換可能な ONNX フォーマットでモデルを保存
torch.onnx.export(model, images, "model.onnx")
wandb.save("model.onnx")
トレーニングを実行し、wandb.ai でライブメトリクスを確認
さて、全体のパイプラインを定義し、数行の W&B コードを追加したら、 完全に追跡された実験を実行する準備が整いました。
いくつかのリンクを報告します: ドキュメンテーション、 プロジェクトページ、これにはプロジェクトのすべての run が整理されています。 この run の結果が保存されるランページ。
ランページに移動して、これらのタブを確認:
- Charts、トレーニング中にモデルの勾配、パラメーター値、損失がログされます
- System、ここにはディスク I/O 応答率、CPU および GPU メトリクス(温度上昇も監視)、その他のさまざまなシステムメトリクスが含まれます
- Logs、トレーニング中に標準出力にプッシュされたもののコピーがあります
- Files、トレーニングが完了すると、
model.onnx
をクリックして Netron モデルビューア でネットワークを表示できます。
with wandb.init
ブロック終了時にランが終了すると、
結果の要約もセルの出力で印刷されます。
# パイプラインでモデルを構築、トレーニング、分析
model = model_pipeline(config)
ハイパーパラメータをスイープでテスト
この例では、ハイパーパラメータのセットを 1 つしか見ていません。 しかし、ほとんどの ML ワークフローの重要な部分は、 多くのハイパーパラメータを反復することです。
Weights & Biases Sweeps を使用してハイパーパラメータのテストを自動化し、モデルと最適化戦略の空間を探索できます。
W&B Sweeps を使用した PyTorch におけるハイパーパラメータ最適化をチェック
Weights & Biases を使用したハイパーパラメータ探索です。非常に簡単です。たった3つの簡単なステップがあります:
-
スイープを定義する: これは、検索するパラメータ、検索戦略、最適化メトリクスなどを指定する辞書またはYAML ファイルを作成することで行います。
-
スイープを初期化する:
sweep_id = wandb.sweep(sweep_config)
-
スイープエージェントを実行する:
wandb.agent(sweep_id, function=train)
これだけでハイパーパラメータ探索が実行できます。

Example Gallery
W&B で追跡および視覚化されたプロジェクトの例をギャラリー →でご覧ください。
Advanced Setup
2 - PyTorch Lightning
私たちは PyTorch Lightning を使用して画像分類パイプラインを構築します。このスタイルガイドに従って、コードの読みやすさと再現性を高めます。このすばらしい説明はこちらで利用可能です。PyTorch Lightning と W&B のセットアップ
このチュートリアルでは、PyTorch Lightning と Weights & Biases が必要です。
pip install lightning -q
pip install wandb -qU
import lightning.pytorch as pl
# あなたのお気に入りの機械学習トラッキングツール
from lightning.pytorch.loggers import WandbLogger
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import random_split, DataLoader
from torchmetrics import Accuracy
from torchvision import transforms
from torchvision.datasets import CIFAR10
import wandb
これで、wandb アカウントにログインする必要があります。
wandb.login()
DataModule - 私たちが求めるデータパイプライン
DataModules は、データに関連するフックを LightningModule から分離する方法であり、データセットに依存しないモデルを開発できます。
これは、データパイプラインを1つの共有可能で再利用可能なクラスにまとめます。データモジュールは PyTorch のデータプロセッシングに関わる5つのステップをカプセル化します:
- ダウンロード / トークン化 / プロセス。
- クリーンし、(場合によっては)ディスクに保存。
- データセット内にロード。
- 変換を適用(回転、トークン化など)。
- DataLoader 内にラップ。
データモジュールについて詳しくはこちらをご覧ください。Cifar-10 データセット用のデータモジュールを構築しましょう。
class CIFAR10DataModule(pl.LightningDataModule):
def __init__(self, batch_size, data_dir: str = './'):
super().__init__()
self.data_dir = data_dir
self.batch_size = batch_size
self.transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
self.num_classes = 10
def prepare_data(self):
CIFAR10(self.data_dir, train=True, download=True)
CIFAR10(self.data_dir, train=False, download=True)
def setup(self, stage=None):
# データローダーで使用する train/val データセットを割り当て
if stage == 'fit' or stage is None:
cifar_full = CIFAR10(self.data_dir, train=True, transform=self.transform)
self.cifar_train, self.cifar_val = random_split(cifar_full, [45000, 5000])
# データローダーで使用するテストデータセットを割り当て
if stage == 'test' or stage is None:
self.cifar_test = CIFAR10(self.data_dir, train=False, transform=self.transform)
def train_dataloader(self):
return DataLoader(self.cifar_train, batch_size=self.batch_size, shuffle=True)
def val_dataloader(self):
return DataLoader(self.cifar_val, batch_size=self.batch_size)
def test_dataloader(self):
return DataLoader(self.cifar_test, batch_size=self.batch_size)
コールバック
コールバックは、プロジェクト間で再利用可能な自己完結型プログラムです。PyTorch Lightning は、定期的に使用されるいくつかの組み込みコールバックを提供しています。 PyTorch Lightning のコールバックについて詳しくはこちらをご覧ください。
組み込みコールバック
このチュートリアルでは、Early Stopping と Model Checkpoint の組み込みコールバックを使用します。それらは Trainer
に渡すことができます。
カスタムコールバック
カスタム Keras コールバックに慣れている場合、PyTorch パイプラインで同じことができる能力は、まさにケーキの上のさくらんぼです。
画像分類を実行しているため、モデルのいくつかの画像サンプルに対する予測を視覚化する能力は役立つかもしれません。このコールバックの形式で提供されることで、モデルを早期段階でデバッグするのに役立ちます。
class ImagePredictionLogger(pl.callbacks.Callback):
def __init__(self, val_samples, num_samples=32):
super().__init__()
self.num_samples = num_samples
self.val_imgs, self.val_labels = val_samples
def on_validation_epoch_end(self, trainer, pl_module):
# テンソルを CPU に移動
val_imgs = self.val_imgs.to(device=pl_module.device)
val_labels = self.val_labels.to(device=pl_module.device)
# モデル予測を取得
logits = pl_module(val_imgs)
preds = torch.argmax(logits, -1)
# wandb Image として画像をログ
trainer.logger.experiment.log({
"examples":[wandb.Image(x, caption=f"Pred:{pred}, Label:{y}")
for x, pred, y in zip(val_imgs[:self.num_samples],
preds[:self.num_samples],
val_labels[:self.num_samples])]
})
LightningModule - システムの定義
LightningModule はシステムを定義し、モデルではありません。ここでシステムはすべての研究コードを1つのクラスにまとめて自己完結型にします。LightningModule
は PyTorch コードを5つのセクションに整理します:
- 計算(
__init__
) - トレーニングループ(
training_step
) - 検証ループ(
validation_step
) - テストループ(
test_step
) - オプティマイザー(
configure_optimizers
)
このようにして、容易に共有できるデータセットに依存しないモデルを構築できます。Cifar-10 分類のためのシステムを構築しましょう。
class LitModel(pl.LightningModule):
def __init__(self, input_shape, num_classes, learning_rate=2e-4):
super().__init__()
# ハイパーパラメーターをログ
self.save_hyperparameters()
self.learning_rate = learning_rate
self.conv1 = nn.Conv2d(3, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 32, 3, 1)
self.conv3 = nn.Conv2d(32, 64, 3, 1)
self.conv4 = nn.Conv2d(64, 64, 3, 1)
self.pool1 = torch.nn.MaxPool2d(2)
self.pool2 = torch.nn.MaxPool2d(2)
n_sizes = self._get_conv_output(input_shape)
self.fc1 = nn.Linear(n_sizes, 512)
self.fc2 = nn.Linear(512, 128)
self.fc3 = nn.Linear(128, num_classes)
self.accuracy = Accuracy(task='multiclass', num_classes=num_classes)
# convブロックからLinear層に渡される出力テンソルのサイズを返します。
def _get_conv_output(self, shape):
batch_size = 1
input = torch.autograd.Variable(torch.rand(batch_size, *shape))
output_feat = self._forward_features(input)
n_size = output_feat.data.view(batch_size, -1).size(1)
return n_size
# convブロックからの特徴テンソルを返します
def _forward_features(self, x):
x = F.relu(self.conv1(x))
x = self.pool1(F.relu(self.conv2(x)))
x = F.relu(self.conv3(x))
x = self.pool2(F.relu(self.conv4(x)))
return x
# 推論中に使用されます
def forward(self, x):
x = self._forward_features(x)
x = x.view(x.size(0), -1)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = F.log_softmax(self.fc3(x), dim=1)
return x
def training_step(self, batch, batch_idx):
x, y = batch
logits = self(x)
loss = F.nll_loss(logits, y)
# トレーニングメトリクス
preds = torch.argmax(logits, dim=1)
acc = self.accuracy(preds, y)
self.log('train_loss', loss, on_step=True, on_epoch=True, logger=True)
self.log('train_acc', acc, on_step=True, on_epoch=True, logger=True)
return loss
def validation_step(self, batch, batch_idx):
x, y = batch
logits = self(x)
loss = F.nll_loss(logits, y)
# 検証メトリクス
preds = torch.argmax(logits, dim=1)
acc = self.accuracy(preds, y)
self.log('val_loss', loss, prog_bar=True)
self.log('val_acc', acc, prog_bar=True)
return loss
def test_step(self, batch, batch_idx):
x, y = batch
logits = self(x)
loss = F.nll_loss(logits, y)
# 検証メトリクス
preds = torch.argmax(logits, dim=1)
acc = self.accuracy(preds, y)
self.log('test_loss', loss, prog_bar=True)
self.log('test_acc', acc, prog_bar=True)
return loss
def configure_optimizers(self):
optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
return optimizer
トレーニングと評価
DataModule
を使用してデータパイプラインを整理し、 LightningModule
を使用してモデルアーキテクチャ+トレーニングループを整理したので、PyTorch Lightning の Trainer
が他のすべてを自動化します。
Trainer は次のことを自動化します:
- エポックとバッチの反復
optimizer.step()
,backward
,zero_grad()
の呼び出し.eval()
の呼び出し、グラッドの有効化/無効化- 重みの保存と読み込み
- Weights & Biases ログ
- マルチ GPU トレーニングサポート
- TPU サポート
- 16 ビットトレーニングサポート
dm = CIFAR10DataModule(batch_size=32)
# x_dataloader にアクセスするには、prepare_data および setup を呼び出す必要があります。
dm.prepare_data()
dm.setup()
# 画像予測をログするカスタム ImagePredictionLogger コールバックに必要なサンプル。
val_samples = next(iter(dm.val_dataloader()))
val_imgs, val_labels = val_samples[0], val_samples[1]
val_imgs.shape, val_labels.shape
model = LitModel((3, 32, 32), dm.num_classes)
# wandb ロガーを初期化
wandb_logger = WandbLogger(project='wandb-lightning', job_type='train')
# コールバックを初期化
early_stop_callback = pl.callbacks.EarlyStopping(monitor="val_loss")
checkpoint_callback = pl.callbacks.ModelCheckpoint()
# トレーナーを初期化
trainer = pl.Trainer(max_epochs=2,
logger=wandb_logger,
callbacks=[early_stop_callback,
ImagePredictionLogger(val_samples),
checkpoint_callback],
)
# モデルのトレーニング
trainer.fit(model, dm)
# 保留中のテストセットでモデルを評価 ⚡⚡
trainer.test(dataloaders=dm.test_dataloader())
# wandb run を閉じる
wandb.finish()
最終的な考え
私は TensorFlow/Keras エコシステムから来ており、PyTorch は洗練されたフレームワークであるにもかかわらず、ちょっと難しいと感じています。個人的な経験にすぎませんが。PyTorch Lightning を探索して、私が PyTorch から遠ざけていた理由のほとんどが解消されていることに気づきました。ここに私の興奮の概要があります:
- 以前: 従来の PyTorch モデル定義はバラバラでした。モデルは
model.py
スクリプトに、トレーニングループはtrain.py
ファイルにありました。パイプラインを理解するためには多くの見直しが必要でした。 - 現在:
LightningModule
は、モデルがtraining_step
、validation_step
などと共に定義されているシステムとして機能します。今ではモジュール化され、共有可能です。 - 以前: TensorFlow/Keras の最良の部分は入力データパイプラインでした。彼らのデータセットカタログは豊富で成長しています。PyTorch のデータパイプラインは、かつて最大の痛点でした。通常の PyTorch コードでは、データのダウンロード/クリーニング/準備は通常、多くのファイルに分散しています。
- 現在: DataModule は、データパイプラインを1つの共有可能で再利用可能なクラスに組織します。それは単に
train_dataloader
、val_dataloader
(s)、test_dataloader
(s) と、必要な変換やデータプロセッシング/ダウンロードステップの集まりです。 - 以前: Keras では
model.fit
を呼び出してモデルをトレーニングし、model.predict
で推論を実行することができました。model.evaluate
は、テストデータに基づく昔ながらのシンプルな評価を提供しましたが、これは PyTorch ではありませんでした。通常、別々のtrain.py
およびtest.py
ファイルが見つかります。 - 現在:
LightningModule
が整備されることで、Trainer
がすべてを自動化します。ただtrainer.fit
とtrainer.test
を呼び出してモデルをトレーニングと評価すればよいのです。 - 以前: TensorFlow は TPU を好む、PyTorch は…
- 現在: PyTorch Lightning では、複数の GPU で同じモデルをトレーニングするのがとても簡単ですし、TPU でも可能です。
- 以前: 私はコールバックの大ファンで、カスタムコールバックを書くことを好んでいます。従来の PyTorch では、Early Stopping のような些細なことが議論の対象になることがありました。
- 現在: PyTorch Lightning を使用すると、Early Stopping と Model Checkpointing が簡単です。カスタムコールバックを書くことさえもできます。
🎨 結論とリソース
このレポートが役に立つことを願っています。コードを試して、好きなデータセットで画像分類器をトレーニングすることをお勧めします。
PyTorch Lightningについてもっと学ぶためのリソース:
- ステップバイステップのガイド - これは公式のチュートリアルの1つです。そのドキュメントは非常によく書かれており、良い学習リソースとして強くお勧めします。
- Weighs & Biases で Pytorch Lightning を使う - W&B を PyTorch Lightning で使用する方法について学ぶために実行できる短い colab です。
3 - Hugging Face

ハイパーパラメーター、アウトプットメトリクス、GPU利用率などのシステム統計をモデル間で比較します。
なぜW&Bを使うべきか?

- 統一されたダッシュボード: モデルのすべてのメトリクスと予測のための中央リポジトリ
- 軽量: Hugging Faceとのインテグレーションにコード変更は不要
- アクセス可能: 個人や学術チームには無料
- セキュア: すべてのプロジェクトはデフォルトでプライベート
- 信頼性: OpenAI、トヨタ、Lyftなどの機械学習チームで使用されている
W&Bを機械学習モデル用のGitHubのように考えてください。プライベートでホストされたダッシュボードに機械学習の実験管理を保存します。スクリプトをどこで実行しても、モデルのすべてのバージョンが保存されることを確信して、素早く実験できます。
W&Bの軽量なインテグレーションは、任意のPythonスクリプトで動作し、モデルのトラッキングと可視化を開始するには無料のW&Bアカウントにサインアップするだけです。
Hugging Face Transformersレポジトリでは、Trainingと評価メトリクスを各ログステップでW&Bに自動的にログするようにTrainerを設定しました。
インテグレーションの仕組みを詳しく見るにはこちら: Hugging Face + W&B Report
インストール、インポート、ログイン
このチュートリアルのためにHugging FaceとWeights & Biasesのライブラリ、GLUEデータセット、トレーニングスクリプトをインストールします。
- Hugging Face Transformers: 自然言語モデルとデータセット
- Weights & Biases: 実験管理と可視化
- GLUE dataset: 言語理解ベンチマークデータセット
- GLUE script: シーケンス分類用モデルのトレーニングスクリプト
!pip install datasets wandb evaluate accelerate -qU
!wget https://raw.githubusercontent.com/huggingface/transformers/refs/heads/main/examples/pytorch/text-classification/run_glue.py
# run_glue.pyスクリプトはtransformers devを必要とします
!pip install -q git+https://github.com/huggingface/transformers
続行する前に、無料アカウントにサインアップしてください。
APIキーを入力
サインアップしたら、次のセルを実行してリンクをクリックし、APIキーを取得してこのノートブックを認証してください。
import wandb
wandb.login()
オプションで、W&Bロギングをカスタマイズするために環境変数を設定できます。ドキュメントを参照してください。
# オプション: 勾配とパラメータの両方をログします
%env WANDB_WATCH=all
モデルをトレーニング
次に、ダウンロードしたトレーニングスクリプト run_glue.py を呼び出し、トレーニングがWeights & Biasesダッシュボードに自動的にトラックされるのを確認します。このスクリプトは、Microsoft Research Paraphrase CorpusでBERTをファインチューンし、意味的に同等であることを示す人間の注釈付きの文のペアを使用します。
%env WANDB_PROJECT=huggingface-demo
%env TASK_NAME=MRPC
!python run_glue.py \
--model_name_or_path bert-base-uncased \
--task_name $TASK_NAME \
--do_train \
--do_eval \
--max_seq_length 256 \
--per_device_train_batch_size 32 \
--learning_rate 2e-4 \
--num_train_epochs 3 \
--output_dir /tmp/$TASK_NAME/ \
--overwrite_output_dir \
--logging_steps 50
ダッシュボードで結果を可視化
上記で印刷されたリンクをクリックするか、wandb.ai にアクセスして、結果がリアルタイムでストリームされるのを確認してください。ブラウザでrunを表示するリンクは、すべての依存関係がロードされた後に表示されます。次のような出力を探します: “wandb: 🚀 View run at [URL to your unique run]”
モデルのパフォーマンスを可視化 数十の実験管理を一目で確認し、興味深い学びにズームインし、高次元のデータを可視化するのは簡単です。

アーキテクチャーを比較 こちらは BERT vs DistilBERT を比較する例です。異なるアーキテクチャーがトレーニング中の評価精度にどのように影響するかを、自動ラインプロット可視化で簡単に確認できます。

重要な情報をデフォルトで簡単にトラック
Weights & Biasesは、各実験で新しいrunを保存します。デフォルトで保存される情報は次の通りです:
- ハイパーパラメーター: モデルの設定がConfigに保存されます
- モデルメトリクス: ストリーミングメトリクスの時系列データはLogに保存されます
- ターミナルログ: コマンドラインの出力は保存され、タブで利用可能です
- システムメトリクス: GPUとCPUの使用率、メモリ、温度など
詳しく知る
- ドキュメント: Weights & BiasesとHugging Faceのインテグレーションに関するドキュメント
- ビデオ: YouTubeチャンネルでのチュートリアル、実務者とのインタビュー、その他
- お問い合わせ: contact@wandb.com までご質問をお寄せください
4 - TensorFlow
このノートブックでカバーする内容
- Weights & Biases と TensorFlow パイプラインの簡単なインテグレーションによる実験管理。
keras.metrics
を使用したメトリクスの計算wandb.log
を使用して、カスタムトレーニングループでこれらのメトリクスをログに記録する方法。

注: ステップ から始まるセクションは、既存のコードに W&B を統合するために必要なすべてです。それ以外は通常の MNIST の例です。
import os
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.datasets import cifar10
インストール、インポート、ログイン
W&B のインストール
%%capture
!pip install wandb
W&B のインポートとログイン
import wandb
from wandb.integration.keras import WandbMetricsLogger
wandb.login()
サイドノート: もしこれが初めて W&B を使う場合や、まだログインしていない場合、
wandb.login()
実行後に表示されるリンクはサインアップ/ログインページに移動します。サインアップはワンクリックで簡単です。
データセットの準備
# トレーニング用データセットの準備
BATCH_SIZE = 64
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = np.reshape(x_train, (-1, 784))
x_test = np.reshape(x_test, (-1, 784))
# tf.data を使用して入力パイプラインを構築
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(BATCH_SIZE)
val_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
val_dataset = val_dataset.batch(BATCH_SIZE)
モデルとトレーニングループの定義
def make_model():
inputs = keras.Input(shape=(784,), name="digits")
x1 = keras.layers.Dense(64, activation="relu")(inputs)
x2 = keras.layers.Dense(64, activation="relu")(x1)
outputs = keras.layers.Dense(10, name="predictions")(x2)
return keras.Model(inputs=inputs, outputs=outputs)
def train_step(x, y, model, optimizer, loss_fn, train_acc_metric):
with tf.GradientTape() as tape:
logits = model(x, training=True)
loss_value = loss_fn(y, logits)
grads = tape.gradient(loss_value, model.trainable_weights)
optimizer.apply_gradients(zip(grads, model.trainable_weights))
train_acc_metric.update_state(y, logits)
return loss_value
def test_step(x, y, model, loss_fn, val_acc_metric):
val_logits = model(x, training=False)
loss_value = loss_fn(y, val_logits)
val_acc_metric.update_state(y, val_logits)
return loss_value
トレーニングループに wandb.log
を追加
def train(
train_dataset,
val_dataset,
model,
optimizer,
train_acc_metric,
val_acc_metric,
epochs=10,
log_step=200,
val_log_step=50,
):
for epoch in range(epochs):
print("\nStart of epoch %d" % (epoch,))
train_loss = []
val_loss = []
# データセットのバッチに対して繰り返し処理
for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
loss_value = train_step(
x_batch_train,
y_batch_train,
model,
optimizer,
loss_fn,
train_acc_metric,
)
train_loss.append(float(loss_value))
# 各エポックの終了時に検証ループを実行
for step, (x_batch_val, y_batch_val) in enumerate(val_dataset):
val_loss_value = test_step(
x_batch_val, y_batch_val, model, loss_fn, val_acc_metric
)
val_loss.append(float(val_loss_value))
# 各エポックの終了時にメトリクスを表示
train_acc = train_acc_metric.result()
print("Training acc over epoch: %.4f" % (float(train_acc),))
val_acc = val_acc_metric.result()
print("Validation acc: %.4f" % (float(val_acc),))
# 各エポックの終了時にメトリクスをリセット
train_acc_metric.reset_states()
val_acc_metric.reset_states()
# ⭐: wandb.log を使用してメトリクスをログに記録
wandb.log(
{
"epochs": epoch,
"loss": np.mean(train_loss),
"acc": float(train_acc),
"val_loss": np.mean(val_loss),
"val_acc": float(val_acc),
}
)
トレーニングを実行
wandb.init
を呼び出して run を開始
これにより、実験を起動したことがわかり、ユニークな ID とダッシュボードを提供できます。
# プロジェクト名で wandb を初期化し、設定をオプションで指定します。
# 設定の値を変えて、wandb ダッシュボードでの結果を確認してください。
config = {
"learning_rate": 0.001,
"epochs": 10,
"batch_size": 64,
"log_step": 200,
"val_log_step": 50,
"architecture": "CNN",
"dataset": "CIFAR-10",
}
run = wandb.init(project='my-tf-integration', config=config)
config = run.config
# モデルの初期化
model = make_model()
# モデルをトレーニングするためのオプティマイザーをインスタンス化
optimizer = keras.optimizers.SGD(learning_rate=config.learning_rate)
# 損失関数をインスタンス化
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# メトリクスを準備
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = keras.metrics.SparseCategoricalAccuracy()
train(
train_dataset,
val_dataset,
model,
optimizer,
train_acc_metric,
val_acc_metric,
epochs=config.epochs,
log_step=config.log_step,
val_log_step=config.val_log_step,
)
run.finish() # Jupyter/Colab では、完了したことを知らせます!
結果を可視化
上記の run ページ リンクをクリックして、ライブ結果を確認してください。
Sweep 101
Weights & Biases Sweeps を使用してハイパーパラメータの最適化を自動化し、可能なモデルのスペースを探索しましょう。
Weights & Biases Sweeps を使用した TensorFlow におけるハイパーパラメータ最適化をチェック
W&B Sweeps を使用するメリット
- 簡単なセットアップ: 数行のコードで W&B sweeps を実行できます。
- 透明性: 使用するアルゴリズムをすべて引用しており、コードはオープンソースです。
- 強力: スイープは完全にカスタマイズ可能で設定可能です。何十台ものマシンでスイープを起動することができ、それはラップトップでスイープを開始するのと同じくらい簡単です。

サンプルギャラリー
W&B を使って記録・可視化されたプロジェクトの例を見ることができます。Fully Connected →
ベストプラクティス
- Projects: 複数の runs をプロジェクトにログして、それらを比較します。
wandb.init(project="project-name")
- Groups: 複数のプロセスや交差検証フォールドの場合は、それぞれのプロセスを個別の run としてログし、まとめてグループ化します。
wandb.init(group="experiment-1")
- Tags: 現在のベースラインやプロダクションモデルを追跡するためにタグを追加します。
- Notes: テーブルにメモを入力して、runs 間の変更を追跡します。
- Reports: 進捗について同僚と共有するために迅速にメモを取り、ML プロジェクトのダッシュボードとスナップショットを作成します。
高度なセットアップ
5 - TensorFlow スイープ
W&B を使用して、機械学習実験管理、データセットのバージョン管理、プロジェクトコラボレーションを行いましょう。
W&B Sweeps を使用してハイパーパラメーターの最適化を自動化し、インタラクティブな ダッシュボードでモデルの可能性を探りましょう:

なぜ sweeps を使うのか
- クイックセットアップ: W&B sweeps を数行のコードで実行。
- 透明性: このプロジェクトは使用されるすべてのアルゴリズムを引用し、コードはオープンソースです。
- 強力: Sweeps はカスタマイズオプションを提供し、複数のマシンやノートパソコンで簡単に実行できます。
詳しくは、Sweep ドキュメントを参照してください。
このノートブックで扱う内容
- W&B Sweep と TensorFlow でのカスタム トレーニングループを開始する手順。
- 画像分類タスクのための最適なハイパーパラメーターを見つけること。
注意: Step から始まるセクションには、ハイパーパラメータースイープを実行するために必要なコードが示されています。それ以外はシンプルな例を設定します。
インストール、インポート、ログイン
W&B のインストール
pip install wandb
W&B のインポートとログイン
import tqdm
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.datasets import cifar10
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import wandb
from wandb.integration.keras import WandbMetricsLogger
wandb.login()
wandb.login()
を実行した後のリンクは、新規登録/ログインページへと案内します。データセットの準備
# トレーニングデータセットの準備
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = x_train / 255.0
x_test = x_test / 255.0
x_train = np.reshape(x_train, (-1, 784))
x_test = np.reshape(x_test, (-1, 784))
分類器 MLP の構築
def Model():
inputs = keras.Input(shape=(784,), name="digits")
x1 = keras.layers.Dense(64, activation="relu")(inputs)
x2 = keras.layers.Dense(64, activation="relu")(x1)
outputs = keras.layers.Dense(10, name="predictions")(x2)
return keras.Model(inputs=inputs, outputs=outputs)
def train_step(x, y, model, optimizer, loss_fn, train_acc_metric):
with tf.GradientTape() as tape:
logits = model(x, training=True)
loss_value = loss_fn(y, logits)
grads = tape.gradient(loss_value, model.trainable_weights)
optimizer.apply_gradients(zip(grads, model.trainable_weights))
train_acc_metric.update_state(y, logits)
return loss_value
def test_step(x, y, model, loss_fn, val_acc_metric):
val_logits = model(x, training=False)
loss_value = loss_fn(y, val_logits)
val_acc_metric.update_state(y, val_logits)
return loss_value
トレーニングループの記述
def train(
train_dataset,
val_dataset,
model,
optimizer,
loss_fn,
train_acc_metric,
val_acc_metric,
epochs=10,
log_step=200,
val_log_step=50,
):
for epoch in range(epochs):
print("\nStart of epoch %d" % (epoch,))
train_loss = []
val_loss = []
# データセットのバッチを繰り返す
for step, (x_batch_train, y_batch_train) in tqdm.tqdm(
enumerate(train_dataset), total=len(train_dataset)
):
loss_value = train_step(
x_batch_train,
y_batch_train,
model,
optimizer,
loss_fn,
train_acc_metric,
)
train_loss.append(float(loss_value))
# 各エポックの終わりに検証ループを実行
for step, (x_batch_val, y_batch_val) in enumerate(val_dataset):
val_loss_value = test_step(
x_batch_val, y_batch_val, model, loss_fn, val_acc_metric
)
val_loss.append(float(val_loss_value))
# 各エポック終了時にメトリクスを表示
train_acc = train_acc_metric.result()
print("Training acc over epoch: %.4f" % (float(train_acc),))
val_acc = val_acc_metric.result()
print("Validation acc: %.4f" % (float(val_acc),))
# 各エポック終了時にメトリクスをリセット
train_acc_metric.reset_states()
val_acc_metric.reset_states()
# 3️⃣ wandb.log を使用してメトリクスをログ
wandb.log(
{
"epochs": epoch,
"loss": np.mean(train_loss),
"acc": float(train_acc),
"val_loss": np.mean(val_loss),
"val_acc": float(val_acc),
}
)
sweep を設定する
sweep を設定する手順:
- 最適化するハイパーパラメーターを定義する
- 最適化メソッドを選択する:
random
、grid
、またはbayes
bayes
の目標とメトリクスを設定する。例えばval_loss
を最小化するhyperband
を使用して、実行中のものを早期終了する
詳しくは W&B Sweeps ドキュメントを参照してください。
sweep_config = {
"method": "random",
"metric": {"name": "val_loss", "goal": "minimize"},
"early_terminate": {"type": "hyperband", "min_iter": 5},
"parameters": {
"batch_size": {"values": [32, 64, 128, 256]},
"learning_rate": {"values": [0.01, 0.005, 0.001, 0.0005, 0.0001]},
},
}
トレーニングループを包む
wandb.config
を使用してハイパーパラメーターを設定してから train
を呼び出すような関数 sweep_train
を作成します。
def sweep_train(config_defaults=None):
# デフォルト値の設定
config_defaults = {"batch_size": 64, "learning_rate": 0.01}
# サンプルプロジェクト名で wandb を初期化
wandb.init(config=config_defaults) # これは Sweep で上書きされます
# 他のハイパーパラメータを設定に指定、ある場合
wandb.config.epochs = 2
wandb.config.log_step = 20
wandb.config.val_log_step = 50
wandb.config.architecture_name = "MLP"
wandb.config.dataset_name = "MNIST"
# tf.data を使用して入力パイプラインを構築
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = (
train_dataset.shuffle(buffer_size=1024)
.batch(wandb.config.batch_size)
.prefetch(buffer_size=tf.data.AUTOTUNE)
)
val_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
val_dataset = val_dataset.batch(wandb.config.batch_size).prefetch(
buffer_size=tf.data.AUTOTUNE
)
# モデルを初期化
model = Model()
# モデルをトレーニングするためのオプティマイザーをインスタンス化
optimizer = keras.optimizers.SGD(learning_rate=wandb.config.learning_rate)
# 損失関数をインスタンス化
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# メトリクスを準備
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = keras.metrics.SparseCategoricalAccuracy()
train(
train_dataset,
val_dataset,
model,
optimizer,
loss_fn,
train_acc_metric,
val_acc_metric,
epochs=wandb.config.epochs,
log_step=wandb.config.log_step,
val_log_step=wandb.config.val_log_step,
)
sweep を初期化し、パーソナルデジタルアシスタントを実行
sweep_id = wandb.sweep(sweep_config, project="sweeps-tensorflow")
count
パラメーターで実行の数を制限します。迅速な実行のために 10 に設定します。必要に応じて増やしてください。
wandb.agent(sweep_id, function=sweep_train, count=10)
結果を視覚化
ライブ結果を見るには、先行する Sweep URL リンクをクリックしてください。
サンプルギャラリー
W&B で追跡および視覚化されたプロジェクトを Gallery で探索してください。
ベストプラクティス
- Projects: 複数の実行をプロジェクトに記録し、それらを比較します。
wandb.init(project="project-name")
- Groups: 複数プロセスまたはクロスバリデーション折りたたみのために各プロセスを run として記録し、それらをグループ化します。
wandb.init(group='experiment-1')
- Tags: ベースラインまたはプロダクションモデルを追跡するためにタグを使用します。
- Notes: テーブルのメモに run 間の変更を追跡するためのメモを入力します。
- Reports: レポートを使用して進捗メモを作成し、同僚と共有し、MLプロジェクトのダッシュボードとスナップショットを作成します。
高度なセットアップ
6 - 3D 脳腫瘍セグメンテーション with MONAI
このチュートリアルでは、MONAIを使用して、マルチラベルの3D脳腫瘍セグメンテーションタスクのトレーニングワークフローを構築し、Weights & Biasesの実験管理とデータ可視化機能を使用する方法をデモンストレーションします。チュートリアルには以下の機能が含まれています。
- Weights & Biasesのrunを初期化し、再現性を確保するためにrunに関連するすべての設定を同期。
- MONAIトランスフォームAPI:
- 辞書形式のデータ用のMONAIトランスフォーム。
- MONAIの
transforms
APIに従った新しいトランスフォームの定義方法。 - データ拡張のための強度をランダムに調整する方法。
- データのロードと可視化:
- メタデータで
Nifti
画像をロードし、画像のリストをロードしてスタック。 - IOとトランスフォームをキャッシュしてトレーニングと検証を加速。
wandb.Table
とWeights & Biasesでインタラクティブなセグメンテーションオーバーレイを用いてデータを可視化。
- メタデータで
- 3D
SegResNet
モデルのトレーニング- MONAIの
networks
,losses
,metrics
APIsを使用。 - PyTorchトレーニングループを使用して3D
SegResNet
モデルをトレーニング。 - Weights & Biasesを使用してトレーニングの実験管理を追跡。
- Weights & Biases上でモデルのチェックポイントをモデルアーティファクトとしてログとバージョン管理。
- MONAIの
wandb.Table
とWeights & Biasesでインタラクティブなセグメンテーションオーバーレイを使用して、検証データセット上の予測を可視化して比較。
セットアップとインストール
まず、MONAIとWeights & Biasesの最新バージョンをインストールします。
!python -c "import monai" || pip install -q -U "monai[nibabel, tqdm]"
!python -c "import wandb" || pip install -q -U wandb
import os
import numpy as np
from tqdm.auto import tqdm
import wandb
from monai.apps import DecathlonDataset
from monai.data import DataLoader, decollate_batch
from monai.losses import DiceLoss
from monai.inferers import sliding_window_inference
from monai.metrics import DiceMetric
from monai.networks.nets import SegResNet
from monai.transforms import (
Activations,
AsDiscrete,
Compose,
LoadImaged,
MapTransform,
NormalizeIntensityd,
Orientationd,
RandFlipd,
RandScaleIntensityd,
RandShiftIntensityd,
RandSpatialCropd,
Spacingd,
EnsureTyped,
EnsureChannelFirstd,
)
from monai.utils import set_determinism
import torch
次に、ColabインスタンスをW&Bで認証します。
wandb.login()
W&B Runの初期化
新しいW&B runを開始して実験を追跡します。
wandb.init(project="monai-brain-tumor-segmentation")
適切な設定システムの使用は、再現性のある機械学習のための推奨されるベストプラクティスです。W&Bを使用して、各実験のハイパーパラメーターを追跡できます。
config = wandb.config
config.seed = 0
config.roi_size = [224, 224, 144]
config.batch_size = 1
config.num_workers = 4
config.max_train_images_visualized = 20
config.max_val_images_visualized = 20
config.dice_loss_smoothen_numerator = 0
config.dice_loss_smoothen_denominator = 1e-5
config.dice_loss_squared_prediction = True
config.dice_loss_target_onehot = False
config.dice_loss_apply_sigmoid = True
config.initial_learning_rate = 1e-4
config.weight_decay = 1e-5
config.max_train_epochs = 50
config.validation_intervals = 1
config.dataset_dir = "./dataset/"
config.checkpoint_dir = "./checkpoints"
config.inference_roi_size = (128, 128, 64)
config.max_prediction_images_visualized = 20
また、ランダムシードを設定して、モジュールの決定的なトレーニングを有効または無効にする必要があります。
set_determinism(seed=config.seed)
# ディレクトリを作成
os.makedirs(config.dataset_dir, exist_ok=True)
os.makedirs(config.checkpoint_dir, exist_ok=True)
データのロードと変換
ここでは、monai.transforms
APIを使用して、マルチクラスラベルをワンホット形式でのマルチラベルセグメンテーションタスクに変換するカスタムトランスフォームを作成します。
class ConvertToMultiChannelBasedOnBratsClassesd(MapTransform):
"""
脳腫瘍クラスに基づいてラベルをマルチチャネルに変換します:
ラベル 1 は腫瘍周辺の浮腫
ラベル 2 はGD 増強腫瘍
ラベル 3 は壊死および非増強腫瘍コア
考えられるクラスはTC (腫瘍コア)、WT (全腫瘍)
ET (増強腫瘍)です。
参考:https://github.com/Project-MONAI/tutorials/blob/main/3d_segmentation/brats_segmentation_3d.ipynb
"""
def __call__(self, data):
d = dict(data)
for key in self.keys:
result = []
# label 2 と label 3 を組み合わせて TC を構築
result.append(torch.logical_or(d[key] == 2, d[key] == 3))
# label 1, 2 と 3 を組み合わせて WT を構築
result.append(
torch.logical_or(
torch.logical_or(d[key] == 2, d[key] == 3), d[key] == 1
)
)
# label 2 は ET
result.append(d[key] == 2)
d[key] = torch.stack(result, axis=0).float()
return d
次に、トレーニングデータセットと検証データセット用にそれぞれトランスフォームを設定します。
train_transform = Compose(
[
# 4つの Nifti 画像を読み込み、それらをスタック
LoadImaged(keys=["image", "label"]),
EnsureChannelFirstd(keys="image"),
EnsureTyped(keys=["image", "label"]),
ConvertToMultiChannelBasedOnBratsClassesd(keys="label"),
Orientationd(keys=["image", "label"], axcodes="RAS"),
Spacingd(
keys=["image", "label"],
pixdim=(1.0, 1.0, 1.0),
mode=("bilinear", "nearest"),
),
RandSpatialCropd(
keys=["image", "label"], roi_size=config.roi_size, random_size=False
),
RandFlipd(keys=["image", "label"], prob=0.5, spatial_axis=0),
RandFlipd(keys=["image", "label"], prob=0.5, spatial_axis=1),
RandFlipd(keys=["image", "label"], prob=0.5, spatial_axis=2),
NormalizeIntensityd(keys="image", nonzero=True, channel_wise=True),
RandScaleIntensityd(keys="image", factors=0.1, prob=1.0),
RandShiftIntensityd(keys="image", offsets=0.1, prob=1.0),
]
)
val_transform = Compose(
[
LoadImaged(keys=["image", "label"]),
EnsureChannelFirstd(keys="image"),
EnsureTyped(keys=["image", "label"]),
ConvertToMultiChannelBasedOnBratsClassesd(keys="label"),
Orientationd(keys=["image", "label"], axcodes="RAS"),
Spacingd(
keys=["image", "label"],
pixdim=(1.0, 1.0, 1.0),
mode=("bilinear", "nearest"),
),
NormalizeIntensityd(keys="image", nonzero=True, channel_wise=True),
]
)
データセット
この実験で使用されるデータセットは、http://medicaldecathlon.com/ から入手可能です。マルチモーダルおよびマルチサイトMRIデータ(FLAIR, T1w, T1gd, T2w)を使用して、膠芽腫、壊死/活動中の腫瘍、および浮腫をセグメント化します。データセットは750個の4Dボリューム(484 トレーニング + 266 テスト)で構成されています。
DecathlonDataset
を使用してデータセットを自動的にダウンロードし、抽出します。MONAI CacheDataset
を継承し、cache_num=N
を設定してトレーニングのために N
アイテムをキャッシュし、メモリサイズに応じてデフォルト引数を使用して検証のためにすべてのアイテムをキャッシュできます。
train_dataset = DecathlonDataset(
root_dir=config.dataset_dir,
task="Task01_BrainTumour",
transform=val_transform,
section="training",
download=True,
cache_rate=0.0,
num_workers=4,
)
val_dataset = DecathlonDataset(
root_dir=config.dataset_dir,
task="Task01_BrainTumour",
transform=val_transform,
section="validation",
download=False,
cache_rate=0.0,
num_workers=4,
)
train_transform
を train_dataset
に適用する代わりに、val_transform
をトレーニングおよび検証データセットの両方に適用します。これは、トレーニングの前に、データセットの2つのスプリットからのサンプルを視覚化するためです。データセットの可視化
Weights & Biasesは画像、ビデオ、オーディオなどをサポートしています。リッチメディアをログに取り込み、結果を探索し、run、モデル、データセットを視覚的に比較できます。セグメンテーションマスクオーバーレイシステムを使用してデータボリュームを可視化します。 テーブルにセグメンテーションマスクをログに追加するには、テーブル内の各行にwandb.Image
オブジェクトを提供する必要があります。
次の擬似コードは、その例です。
table = wandb.Table(columns=["ID", "Image"])
for id, img, label in zip(ids, images, labels):
mask_img = wandb.Image(
img,
masks={
"prediction": {"mask_data": label, "class_labels": class_labels}
# ...
},
)
table.add_data(id, img)
wandb.log({"Table": table})
次に、サンプル画像、ラベル、wandb.Table
オブジェクトと関連するメタデータを受け取り、Weights & Biases ダッシュボードにログされるテーブルの行を埋めるユーティリティ関数を作成します。
def log_data_samples_into_tables(
sample_image: np.array,
sample_label: np.array,
split: str = None,
data_idx: int = None,
table: wandb.Table = None,
):
num_channels, _, _, num_slices = sample_image.shape
with tqdm(total=num_slices, leave=False) as progress_bar:
for slice_idx in range(num_slices):
ground_truth_wandb_images = []
for channel_idx in range(num_channels):
ground_truth_wandb_images.append(
masks = {
"ground-truth/Tumor-Core": {
"mask_data": sample_label[0, :, :, slice_idx],
"class_labels": {0: "background", 1: "Tumor Core"},
},
"ground-truth/Whole-Tumor": {
"mask_data": sample_label[1, :, :, slice_idx] * 2,
"class_labels": {0: "background", 2: "Whole Tumor"},
},
"ground-truth/Enhancing-Tumor": {
"mask_data": sample_label[2, :, :, slice_idx] * 3,
"class_labels": {0: "background", 3: "Enhancing Tumor"},
},
}
wandb.Image(
sample_image[channel_idx, :, :, slice_idx],
masks=masks,
)
)
table.add_data(split, data_idx, slice_idx, *ground_truth_wandb_images)
progress_bar.update(1)
return table
次に、wandb.Table
オブジェクトと、それが含む列を定義し、データ可視化を使用してそれを埋めます。
table = wandb.Table(
columns=[
"Split",
"Data Index",
"Slice Index",
"Image-Channel-0",
"Image-Channel-1",
"Image-Channel-2",
"Image-Channel-3",
]
)
次に、それぞれtrain_dataset
とval_dataset
をループして、データサンプルの可視化を生成し、ダッシュボードにログを取るためのテーブルの行を埋めます。
# train_dataset の可視化を生成
max_samples = (
min(config.max_train_images_visualized, len(train_dataset))
if config.max_train_images_visualized > 0
else len(train_dataset)
)
progress_bar = tqdm(
enumerate(train_dataset[:max_samples]),
total=max_samples,
desc="Generating Train Dataset Visualizations:",
)
for data_idx, sample in progress_bar:
sample_image = sample["image"].detach().cpu().numpy()
sample_label = sample["label"].detach().cpu().numpy()
table = log_data_samples_into_tables(
sample_image,
sample_label,
split="train",
data_idx=data_idx,
table=table,
)
# val_dataset の可視化を生成
max_samples = (
min(config.max_val_images_visualized, len(val_dataset))
if config.max_val_images_visualized > 0
else len(val_dataset)
)
progress_bar = tqdm(
enumerate(val_dataset[:max_samples]),
total=max_samples,
desc="Generating Validation Dataset Visualizations:",
)
for data_idx, sample in progress_bar:
sample_image = sample["image"].detach().cpu().numpy()
sample_label = sample["label"].detach().cpu().numpy()
table = log_data_samples_into_tables(
sample_image,
sample_label,
split="val",
data_idx=data_idx,
table=table,
)
# ダッシュボードにテーブルをログ
wandb.log({"Tumor-Segmentation-Data": table})
データはW&Bダッシュボード上でインタラクティブな表形式で表示されます。我々はデータボリュームの特定のスライスの各チャンネルを、各行の対応するセグメンテーションマスクと重ね合わせて見ることができます。テーブルのデータを論理的にフィルタリングして、特定の行に集中するために Weave クエリ を書くことができます。
![]() |
---|
ログされたテーブルデータの例。 |
画像を開き、インタラクティブなオーバーレイを使用して、各セグメンテーションマスクをどのように操作できるかを見てみてください。
![]() |
---|
セグメンテーションマップの可視化例。 |
データのロード
データセットからデータをロードするための PyTorch DataLoaders を作成します。 DataLoaders を作成する前に、train_dataset
の transform
を train_transform
に設定して、トレーニング用のデータを前処理および変換します。
# トレーニングデータセットにtrain_transformsを適用
train_dataset.transform = train_transform
# train_loaderを作成
train_loader = DataLoader(
train_dataset,
batch_size=config.batch_size,
shuffle=True,
num_workers=config.num_workers,
)
# val_loaderを作成
val_loader = DataLoader(
val_dataset,
batch_size=config.batch_size,
shuffle=False,
num_workers=config.num_workers,
)
モデル、損失、およびオプティマイザーの作成
このチュートリアルでは、 3D MRI brain tumor segmentation using auto-encoder regularization に基づいた SegResNet
モデルを作成します。SegResNet
モデルは、 monai.networks
API の一部として PyTorch モジュールとして実装されています。これはオプティマイザーと学習率スケジューラともに利用可能です。
device = torch.device("cuda:0")
# モデルの作成
model = SegResNet(
blocks_down=[1, 2, 2, 4],
blocks_up=[1, 1, 1],
init_filters=16,
in_channels=4,
out_channels=3,
dropout_prob=0.2,
).to(device)
# オプティマイザーの作成
optimizer = torch.optim.Adam(
model.parameters(),
config.initial_learning_rate,
weight_decay=config.weight_decay,
)
# 学習率スケジューラの作成
lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
optimizer, T_max=config.max_train_epochs
)
損失を monai.losses
API を使用してマルチラベル DiceLoss
として定義し、それに対応するダイスメトリクスを monai.metrics
API を使用して定義します。
loss_function = DiceLoss(
smooth_nr=config.dice_loss_smoothen_numerator,
smooth_dr=config.dice_loss_smoothen_denominator,
squared_pred=config.dice_loss_squared_prediction,
to_onehot_y=config.dice_loss_target_onehot,
sigmoid=config.dice_loss_apply_sigmoid,
)
dice_metric = DiceMetric(include_background=True, reduction="mean")
dice_metric_batch = DiceMetric(include_background=True, reduction="mean_batch")
post_trans = Compose([Activations(sigmoid=True), AsDiscrete(threshold=0.5)])
# 自動混合精度を使用してトレーニングを加速
scaler = torch.cuda.amp.GradScaler()
torch.backends.cudnn.benchmark = True
混合精度推論のための小さなユーティリティを定義します。これはトレーニングプロセスの検証ステップおよびトレーニング後にモデルを実行したいときに役立ちます。
def inference(model, input):
def _compute(input):
return sliding_window_inference(
inputs=input,
roi_size=(240, 240, 160),
sw_batch_size=1,
predictor=model,
overlap=0.5,
)
with torch.cuda.amp.autocast():
return _compute(input)
トレーニングと検証
トレーニングの前に、トレーニングと検証の実験管理を追跡するために wandb.log()
でログを取るメトリクスプロパティを定義します。
wandb.define_metric("epoch/epoch_step")
wandb.define_metric("epoch/*", step_metric="epoch/epoch_step")
wandb.define_metric("batch/batch_step")
wandb.define_metric("batch/*", step_metric="batch/batch_step")
wandb.define_metric("validation/validation_step")
wandb.define_metric("validation/*", step_metric="validation/validation_step")
batch_step = 0
validation_step = 0
metric_values = []
metric_values_tumor_core = []
metric_values_whole_tumor = []
metric_values_enhanced_tumor = []
標準的な PyTorch トレーニングループの実行
# W&B アーティファクトオブジェクトを定義
artifact = wandb.Artifact(
name=f"{wandb.run.id}-checkpoint", type="model"
)
epoch_progress_bar = tqdm(range(config.max_train_epochs), desc="Training:")
for epoch in epoch_progress_bar:
model.train()
epoch_loss = 0
total_batch_steps = len(train_dataset) // train_loader.batch_size
batch_progress_bar = tqdm(train_loader, total=total_batch_steps, leave=False)
# トレーニングステップ
for batch_data in batch_progress_bar:
inputs, labels = (
batch_data["image"].to(device),
batch_data["label"].to(device),
)
optimizer.zero_grad()
with torch.cuda.amp.autocast():
outputs = model(inputs)
loss = loss_function(outputs, labels)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
epoch_loss += loss.item()
batch_progress_bar.set_description(f"train_loss: {loss.item():.4f}:")
## バッチ単位のトレーニング損失を W&B にログ
wandb.log({"batch/batch_step": batch_step, "batch/train_loss": loss.item()})
batch_step += 1
lr_scheduler.step()
epoch_loss /= total_batch_steps
## エポック単位のトレーニング損失と学習率を W&B にログ
wandb.log(
{
"epoch/epoch_step": epoch,
"epoch/mean_train_loss": epoch_loss,
"epoch/learning_rate": lr_scheduler.get_last_lr()[0],
}
)
epoch_progress_bar.set_description(f"Training: train_loss: {epoch_loss:.4f}:")
# 検証とモデルのチェックポイントステップ
if (epoch + 1) % config.validation_intervals == 0:
model.eval()
with torch.no_grad():
for val_data in val_loader:
val_inputs, val_labels = (
val_data["image"].to(device),
val_data["label"].to(device),
)
val_outputs = inference(model, val_inputs)
val_outputs = [post_trans(i) for i in decollate_batch(val_outputs)]
dice_metric(y_pred=val_outputs, y=val_labels)
dice_metric_batch(y_pred=val_outputs, y=val_labels)
metric_values.append(dice_metric.aggregate().item())
metric_batch = dice_metric_batch.aggregate()
metric_values_tumor_core.append(metric_batch[0].item())
metric_values_whole_tumor.append(metric_batch[1].item())
metric_values_enhanced_tumor.append(metric_batch[2].item())
dice_metric.reset()
dice_metric_batch.reset()
checkpoint_path = os.path.join(config.checkpoint_dir, "model.pth")
torch.save(model.state_dict(), checkpoint_path)
# W&Bアーティファクトを使用してモデルのチェックポイントをログとバージョン管理。
artifact.add_file(local_path=checkpoint_path)
wandb.log_artifact(artifact, aliases=[f"epoch_{epoch}"])
# W&Bダッシュボードに検証メトリクスをログ。
wandb.log(
{
"validation/validation_step": validation_step,
"validation/mean_dice": metric_values[-1],
"validation/mean_dice_tumor_core": metric_values_tumor_core[-1],
"validation/mean_dice_whole_tumor": metric_values_whole_tumor[-1],
"validation/mean_dice_enhanced_tumor": metric_values_enhanced_tumor[-1],
}
)
validation_step += 1
# このアーティファクトがログを終了するのを待ちます。
artifact.wait()
コードを wandb.log
で計装することで、トレーニングと検証プロセスに関連するメトリクスすべてを追跡するだけでなく、W&Bダッシュボード上のすべてのシステムメトリクス(この場合はCPUとGPU)も追跡できます。
![]() |
---|
W&Bでのトレーニングと検証プロセス追跡の例。 |
W&Bの run ダッシュボードのアーティファクトタブに移動して、トレーニング中にログされたモデルチェックポイントアーティファクトの異なるバージョンにアクセスします。
![]() |
---|
W&Bでのモデルチェックポイントのログとバージョン管理の例。 |
推論
アーティファクトインターフェースを使用して、このケースでは平均エポック単位のトレーニング損失が最良のモデルチェックポイントであるアーティファクトのバージョンを選択できます。アーティファクト全体のリネージを探索し、必要なバージョンを使用することもできます。
![]() |
---|
W&Bでのモデルアーティファクト追跡の例。 |
最良のエポック単位の平均トレーニング損失を持つモデルアーティファクトのバージョンをフェッチし、チェックポイントステート辞書をモデルにロードします。
model_artifact = wandb.use_artifact(
"geekyrakshit/monai-brain-tumor-segmentation/d5ex6n4a-checkpoint:v49",
type="model",
)
model_artifact_dir = model_artifact.download()
model.load_state_dict(torch.load(os.path.join(model_artifact_dir, "model.pth")))
model.eval()
予測の可視化と正解ラベルとの比較
予測されたセグメンテーションマスクと対応する正解のセグメンテーションマスクをインタラクティブなセグメンテーションマスクオーバーレイを使用して視覚化するためのユーティリティ関数を作成します。
def log_predictions_into_tables(
sample_image: np.array,
sample_label: np.array,
predicted_label: np.array,
split: str = None,
data_idx: int = None,
table: wandb.Table = None,
):
num_channels, _, _, num_slices = sample_image.shape
with tqdm(total=num_slices, leave=False) as progress_bar:
for slice_idx in range(num_slices):
wandb_images = []
for channel_idx in range(num_channels):
wandb_images += [
wandb.Image(
sample_image[channel_idx, :, :, slice_idx],
masks={
"ground-truth/Tumor-Core": {
"mask_data": sample_label[0, :, :, slice_idx],
"class_labels": {0: "background", 1: "Tumor Core"},
},
"prediction/Tumor-Core": {
"mask_data": predicted_label[0, :, :, slice_idx] * 2,
"class_labels": {0: "background", 2: "Tumor Core"},
},
},
),
wandb.Image(
sample_image[channel_idx, :, :, slice_idx],
masks={
"ground-truth/Whole-Tumor": {
"mask_data": sample_label[1, :, :, slice_idx],
"class_labels": {0: "background", 1: "Whole Tumor"},
},
"prediction/Whole-Tumor": {
"mask_data": predicted_label[1, :, :, slice_idx] * 2,
"class_labels": {0: "background", 2: "Whole Tumor"},
},
},
),
wandb.Image(
sample_image[channel_idx, :, :, slice_idx],
masks={
"ground-truth/Enhancing-Tumor": {
"mask_data": sample_label[2, :, :, slice_idx],
"class_labels": {0: "background", 1: "Enhancing Tumor"},
},
"prediction/Enhancing-Tumor": {
"mask_data": predicted_label[2, :, :, slice_idx] * 2,
"class_labels": {0: "background", 2: "Enhancing Tumor"},
},
},
),
]
table.add_data(split, data_idx, slice_idx, *wandb_images)
progress_bar.update(1)
return table
予測結果を予測テーブルにログします。
# 予測テーブルを作成
prediction_table = wandb.Table(
columns=[
"Split",
"Data Index",
"Slice Index",
"Image-Channel-0/Tumor-Core",
"Image-Channel-1/Tumor-Core",
"Image-Channel-2/Tumor-Core",
"Image-Channel-3/Tumor-Core",
"Image-Channel-0/Whole-Tumor",
"Image-Channel-1/Whole-Tumor",
"Image-Channel-2/Whole-Tumor",
"Image-Channel-3/Whole-Tumor",
"Image-Channel-0/Enhancing-Tumor",
"Image-Channel-1/Enhancing-Tumor",
"Image-Channel-2/Enhancing-Tumor",
"Image-Channel-3/Enhancing-Tumor",
]
)
# 推論と可視化を実行
with torch.no_grad():
config.max_prediction_images_visualized
max_samples = (
min(config.max_prediction_images_visualized, len(val_dataset))
if config.max_prediction_images_visualized > 0
else len(val_dataset)
)
progress_bar = tqdm(
enumerate(val_dataset[:max_samples]),
total=max_samples,
desc="Generating Predictions:",
)
for data_idx, sample in progress_bar:
val_input = sample["image"].unsqueeze(0).to(device)
val_output = inference(model, val_input)
val_output = post_trans(val_output[0])
prediction_table = log_predictions_into_tables(
sample_image=sample["image"].cpu().numpy(),
sample_label=sample["label"].cpu().numpy(),
predicted_label=val_output.cpu().numpy(),
data_idx=data_idx,
split="validation",
table=prediction_table,
)
wandb.log({"Predictions/Tumor-Segmentation-Data": prediction_table})
# 実験終了
wandb.finish()
インタラクティブなセグメンテーションマスクオーバーレイを使用して、予測されたセグメンテーションマスクと各クラスの正解ラベルを分析および比較します。
![]() |
---|
W&Bでの予測と正解の可視化例。 |
謝辞と追加リソース
7 - Keras
Weights & Biases を使用して、機械学習の実験管理、データセット バージョン管理、プロジェクトのコラボレーションを行いましょう。
この Colabノートブックでは、WandbMetricsLogger
コールバックを紹介します。このコールバックは 実験管理 に使用できます。これにより、トレーニングと検証のメトリクスとシステムメトリクスを Weights & Biases に記録します。
セットアップとインストール
まず、最新バージョンの Weights & Biases をインストールしましょう。次に、この Colabインスタンスを認証して W&B を使用できるようにします。
pip install -qq -U wandb
import os
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import models
import tensorflow_datasets as tfds
# Weights and Biases 関連のインポート
import wandb
from wandb.integration.keras import WandbMetricsLogger
W&B を初めて使用する場合、またはログインしていない場合、wandb.login()
を実行した後に表示されるリンクはサインアップ/ログインページに導きます。無料アカウント のサインアップは数クリックで完了します。
wandb.login()
ハイパーパラメーター
適切な設定システムの使用は、再現可能な機械学習の推奨ベストプラクティスです。W&B を使用して、実験ごとにハイパーパラメーターを追跡できます。この Colab では、シンプルな Python の dict
を設定システムとして使用します。
configs = dict(
num_classes=10,
shuffle_buffer=1024,
batch_size=64,
image_size=28,
image_channels=1,
earlystopping_patience=3,
learning_rate=1e-3,
epochs=10,
)
データセット
この Colab では、TensorFlow データセットカタログから CIFAR100 データセットを使用します。私たちの目標は、TensorFlow/Keras を使用してシンプルな画像分類パイプラインを構築することです。
train_ds, valid_ds = tfds.load("fashion_mnist", split=["train", "test"])
AUTOTUNE = tf.data.AUTOTUNE
def parse_data(example):
# 画像を取得する
image = example["image"]
# image = tf.image.convert_image_dtype(image, dtype=tf.float32)
# ラベルを取得する
label = example["label"]
label = tf.one_hot(label, depth=configs["num_classes"])
return image, label
def get_dataloader(ds, configs, dataloader_type="train"):
dataloader = ds.map(parse_data, num_parallel_calls=AUTOTUNE)
if dataloader_type == "train":
dataloader = dataloader.shuffle(configs["shuffle_buffer"])
dataloader = dataloader.batch(configs["batch_size"]).prefetch(AUTOTUNE)
return dataloader
trainloader = get_dataloader(train_ds, configs)
validloader = get_dataloader(valid_ds, configs, dataloader_type="valid")
モデル
def get_model(configs):
backbone = tf.keras.applications.mobilenet_v2.MobileNetV2(
weights="imagenet", include_top=False
)
backbone.trainable = False
inputs = layers.Input(
shape=(configs["image_size"], configs["image_size"], configs["image_channels"])
)
resize = layers.Resizing(32, 32)(inputs)
neck = layers.Conv2D(3, (3, 3), padding="same")(resize)
preprocess_input = tf.keras.applications.mobilenet.preprocess_input(neck)
x = backbone(preprocess_input)
x = layers.GlobalAveragePooling2D()(x)
outputs = layers.Dense(configs["num_classes"], activation="softmax")(x)
return models.Model(inputs=inputs, outputs=outputs)
tf.keras.backend.clear_session()
model = get_model(configs)
model.summary()
モデルのコンパイル
model.compile(
optimizer="adam",
loss="categorical_crossentropy",
metrics=[
"accuracy",
tf.keras.metrics.TopKCategoricalAccuracy(k=5, name="top@5_accuracy"),
],
)
トレーニング
# W&B の run を初期化する
run = wandb.init(project="intro-keras", config=configs)
# あなたのモデルをトレーニングする
model.fit(
trainloader,
epochs=configs["epochs"],
validation_data=validloader,
callbacks=[
WandbMetricsLogger(log_freq=10)
], # ここで WandbMetricsLogger を使用することに注意
)
# W&B の run を終了する
run.finish()
8 - Keras テーブル
機械学習の実験管理、データセットバージョン管理、およびプロジェクトコラボレーションに Weights & Biases を使用します。
この Colabノートブックでは、WandbEvalCallback
を紹介します。これは抽象的なコールバックで、モデル予測の可視化とデータセットの可視化に役立つコールバックを構築するために継承されます。
セットアップとインストール
まず、最新バージョンの Weights and Biases をインストールしましょう。その後、この Colab インスタンスを認証して W&B を利用できるようにします。
pip install -qq -U wandb
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import models
import tensorflow_datasets as tfds
# Weights and Biases に関連するインポート
import wandb
from wandb.integration.keras import WandbMetricsLogger
from wandb.integration.keras import WandbModelCheckpoint
from wandb.integration.keras import WandbEvalCallback
もしこれが初めての W&B の使用であるかまだログインしていない場合、wandb.login()
を実行した後に表示されるリンクがサインアップ/ログインページに誘導します。無料アカウントへのサインアップは、数クリックで簡単です。
wandb.login()
ハイパーパラメーター
適切なコンフィグシステムの使用は、再現可能な機械学習のための推奨ベストプラクティスです。W&B を使用して、各実験のハイパーパラメーターを管理することができます。この Colab では、シンプルな Python の dict
をコンフィグシステムとして使用します。
configs = dict(
num_classes=10,
shuffle_buffer=1024,
batch_size=64,
image_size=28,
image_channels=1,
earlystopping_patience=3,
learning_rate=1e-3,
epochs=10,
)
データセット
この Colab では、TensorFlow データセットカタログから CIFAR100 データセットを使用します。TensorFlow/Keras を使用して、シンプルな画像分類 パイプラインを構築することを目指します。
train_ds, valid_ds = tfds.load("fashion_mnist", split=["train", "test"])
AUTOTUNE = tf.data.AUTOTUNE
def parse_data(example):
# 画像を取得
image = example["image"]
# image = tf.image.convert_image_dtype(image, dtype=tf.float32)
# ラベルを取得
label = example["label"]
label = tf.one_hot(label, depth=configs["num_classes"])
return image, label
def get_dataloader(ds, configs, dataloader_type="train"):
dataloader = ds.map(parse_data, num_parallel_calls=AUTOTUNE)
if dataloader_type=="train":
dataloader = dataloader.shuffle(configs["shuffle_buffer"])
dataloader = (
dataloader
.batch(configs["batch_size"])
.prefetch(AUTOTUNE)
)
return dataloader
trainloader = get_dataloader(train_ds, configs)
validloader = get_dataloader(valid_ds, configs, dataloader_type="valid")
モデル
def get_model(configs):
backbone = tf.keras.applications.mobilenet_v2.MobileNetV2(
weights="imagenet", include_top=False
)
backbone.trainable = False
inputs = layers.Input(
shape=(configs["image_size"], configs["image_size"], configs["image_channels"])
)
resize = layers.Resizing(32, 32)(inputs)
neck = layers.Conv2D(3, (3, 3), padding="same")(resize)
preprocess_input = tf.keras.applications.mobilenet.preprocess_input(neck)
x = backbone(preprocess_input)
x = layers.GlobalAveragePooling2D()(x)
outputs = layers.Dense(configs["num_classes"], activation="softmax")(x)
return models.Model(inputs=inputs, outputs=outputs)
tf.keras.backend.clear_session()
model = get_model(configs)
model.summary()
モデルのコンパイル
model.compile(
optimizer="adam",
loss="categorical_crossentropy",
metrics=[
"accuracy",
tf.keras.metrics.TopKCategoricalAccuracy(k=5, name="top@5_accuracy"),
],
)
WandbEvalCallback
WandbEvalCallback
は主にモデル予測の可視化、そして二次的にはデータセットの可視化のための Keras コールバックを構築するための抽象基底クラスです。
これはデータセットやタスクに依存しない抽象コールバックです。これを使用するには、この基底コールバッククラスから継承し、add_ground_truth
と add_model_prediction
メソッドを実装します。
WandbEvalCallback
は以下のような便利なメソッドを提供するユーティリティクラスです:
- データと予測の
wandb.Table
インスタンスを作成、 wandb.Artifact
としてデータと予測テーブルをログ、on_train_begin
にデータテーブルをログ、on_epoch_end
に予測テーブルをログ。
例として、画像分類タスクのために WandbClfEvalCallback
を以下に実装しました。この例では:
- W&B にバリデーションデータ (
data_table
) をログ、 - 推論を行い、各エポックの終わりに W&B に予測 (
pred_table
) をログします。
メモリ使用量が削減される仕組み
on_train_begin
メソッドが呼び出される時に data_table
を W&B にログします。一度 W&B のアーティファクトとしてアップロードされると、このテーブルへの参照を取得できます。それはクラス変数 data_table_ref
を使用してアクセスできます。data_table_ref
は 2D リストで、self.data_table_ref[idx][n]
のようにインデックス付けできます。ここで idx
は行番号、n
は列番号です。以下の例で使用方法を見てみましょう。
class WandbClfEvalCallback(WandbEvalCallback):
def __init__(
self, validloader, data_table_columns, pred_table_columns, num_samples=100
):
super().__init__(data_table_columns, pred_table_columns)
self.val_data = validloader.unbatch().take(num_samples)
def add_ground_truth(self, logs=None):
for idx, (image, label) in enumerate(self.val_data):
self.data_table.add_data(idx, wandb.Image(image), np.argmax(label, axis=-1))
def add_model_predictions(self, epoch, logs=None):
# 予測を得る
preds = self._inference()
table_idxs = self.data_table_ref.get_index()
for idx in table_idxs:
pred = preds[idx]
self.pred_table.add_data(
epoch,
self.data_table_ref.data[idx][0],
self.data_table_ref.data[idx][1],
self.data_table_ref.data[idx][2],
pred,
)
def _inference(self):
preds = []
for image, label in self.val_data:
pred = self.model(tf.expand_dims(image, axis=0))
argmax_pred = tf.argmax(pred, axis=-1).numpy()[0]
preds.append(argmax_pred)
return preds
トレーニング
# W&B の run を初期化
run = wandb.init(project="intro-keras", config=configs)
# モデルをトレーニング
model.fit(
trainloader,
epochs=configs["epochs"],
validation_data=validloader,
callbacks=[
WandbMetricsLogger(log_freq=10),
WandbClfEvalCallback(
validloader,
data_table_columns=["idx", "image", "ground_truth"],
pred_table_columns=["epoch", "idx", "image", "ground_truth", "prediction"],
), # ここで WandbEvalCallback を使用
],
)
# W&B の run を終了
run.finish()
9 - Keras モデル
Weights & Biases を使用して機械学習の実験管理、データセットのバージョン管理、プロジェクトのコラボレーションを行いましょう。
この Colab ノートブックは WandbModelCheckpoint
コールバックを紹介します。このコールバックを使用して、モデルのチェックポイントを Weights & Biases Artifacts にログします。
セットアップとインストール
まず、最新バージョンの Weights & Biases をインストールします。次に、この colab インスタンスを認証して W&B を使用します。
!pip install -qq -U wandb
import os
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import models
import tensorflow_datasets as tfds
# Weights & Biases に関連するインポート
import wandb
from wandb.integration.keras import WandbMetricsLogger
from wandb.integration.keras import WandbModelCheckpoint
もし初めて W&B を使用するか、ログインしていない場合、wandb.login()
を実行した後に表示されるリンクをクリックするとサインアップ/ログインページに移動します。無料アカウント の登録は数回のクリックで簡単に行えます。
wandb.login()
ハイパーパラメーター
適切なコンフィグシステムの使用は、再現性のある機械学習のベストプラクティスとして推奨されます。各実験のハイパーパラメーターを W&B を使用して管理できます。この colab では、コンフィグシステムとしてシンプルな Python の dict
を使用します。
configs = dict(
num_classes = 10,
shuffle_buffer = 1024,
batch_size = 64,
image_size = 28,
image_channels = 1,
earlystopping_patience = 3,
learning_rate = 1e-3,
epochs = 10
)
データセット
この colab では、 TensorFlow データセットカタログから CIFAR100 データセットを使用します。TensorFlow/Keras でシンプルな画像分類パイプラインを構築することを目指します。
train_ds, valid_ds = tfds.load('fashion_mnist', split=['train', 'test'])
AUTOTUNE = tf.data.AUTOTUNE
def parse_data(example):
# 画像を取得
image = example["image"]
# image = tf.image.convert_image_dtype(image, dtype=tf.float32)
# ラベルを取得
label = example["label"]
label = tf.one_hot(label, depth=configs["num_classes"])
return image, label
def get_dataloader(ds, configs, dataloader_type="train"):
dataloader = ds.map(parse_data, num_parallel_calls=AUTOTUNE)
if dataloader_type=="train":
dataloader = dataloader.shuffle(configs["shuffle_buffer"])
dataloader = (
dataloader
.batch(configs["batch_size"])
.prefetch(AUTOTUNE)
)
return dataloader
trainloader = get_dataloader(train_ds, configs)
validloader = get_dataloader(valid_ds, configs, dataloader_type="valid")
モデル
def get_model(configs):
backbone = tf.keras.applications.mobilenet_v2.MobileNetV2(weights='imagenet', include_top=False)
backbone.trainable = False
inputs = layers.Input(shape=(configs["image_size"], configs["image_size"], configs["image_channels"]))
resize = layers.Resizing(32, 32)(inputs)
neck = layers.Conv2D(3, (3,3), padding="same")(resize)
preprocess_input = tf.keras.applications.mobilenet.preprocess_input(neck)
x = backbone(preprocess_input)
x = layers.GlobalAveragePooling2D()(x)
outputs = layers.Dense(configs["num_classes"], activation="softmax")(x)
return models.Model(inputs=inputs, outputs=outputs)
tf.keras.backend.clear_session()
model = get_model(configs)
model.summary()
モデルのコンパイル
model.compile(
optimizer = "adam",
loss = "categorical_crossentropy",
metrics = ["accuracy", tf.keras.metrics.TopKCategoricalAccuracy(k=5, name='top@5_accuracy')]
)
トレーニング
# W&B の run を初期化
run = wandb.init(
project = "intro-keras",
config = configs
)
# モデルをトレーニング
model.fit(
trainloader,
epochs = configs["epochs"],
validation_data = validloader,
callbacks = [
WandbMetricsLogger(log_freq=10),
WandbModelCheckpoint(filepath="models/") # ここで WandbModelCheckpoint を使用しています
]
)
# W&B の run を終了
run.finish()
10 - XGBoost Sweeps
Weights & Biases を使って機械学習の実験管理、データセットのバージョン管理、プロジェクトの協力を行いましょう。
ツリー型モデルから最良のパフォーマンスを引き出すには、適切なハイパーパラメーターを選択する必要があります。いくつの early_stopping_rounds
が必要でしょうか? ツリーの max_depth
はどのくらいにすべきですか?
高次元のハイパーパラメータ空間を検索して最適なモデルを見つけることは非常に困難です。ハイパーパラメーター探索は、モデルのバトルロイヤルを組織的かつ効率的に実施し、勝者を決定する方法を提供します。これにより、ハイパーパラメータの組み合わせを自動的に検索して、最も最適な値を見つけ出すことができます。
このチュートリアルでは、Weights & Biases を使用して XGBoost モデルで洗練されたハイパーパラメーター探索を 3 つの簡単なステップで実行する方法を紹介します。
興味を引くために、以下のプロットをチェックしてください:

Sweeps: 概要
Weights & Biases を使ったハイパーパラメーター探索の実行は非常に簡単です。以下の3つのシンプルなステップです:
-
スイープを定義する: スイープを定義するためには、スイープを構成するパラメータ、使用する検索戦略、最適化するメトリクスを指定する辞書のようなオブジェクトを作成します。
-
スイープを初期化する: 1 行のコードでスイープを初期化し、スイープの設定の辞書を渡します:
sweep_id = wandb.sweep(sweep_config)
-
スイープエージェントを実行する: これも 1 行のコードで、
wandb.agent()
を呼び出し、sweep_id
とモデルアーキテクチャを定義してトレーニングする関数を渡します:wandb.agent(sweep_id, function=train)
これだけでハイパーパラメーター探索を実行することができます。
以下のノートブックでは、これらの 3 ステップを詳細に説明します。
ぜひこのノートブックをフォークして、パラメータを調整したり、独自のデータセットでモデルを試してみてください。
リソース
!pip install wandb -qU
import wandb
wandb.login()
1. スイープを定義する
Weights & Biases の Sweeps は、希望通りにスイープを設定するための強力なレバーを少ないコード行で提供します。スイープの設定は、辞書または YAML ファイルとして定義できます。
それらのいくつかを一緒に見ていきましょう:
- メトリック: これは、スイープが最適化しようとしているメトリックです。メトリクスは
name
(トレーニングスクリプトでログされるべきメトリック名) とgoal
(maximize
かminimize
) を取ることができます。 - 検索戦略:
"method"
キーを使用して指定されます。スイープでは、いくつかの異なる検索戦略をサポートしています。 - グリッド検索: ハイパーパラメーター値のすべての組み合わせを反復します。
- ランダム検索: ランダムに選ばれたハイパーパラメーター値の組み合わせを反復します。
- ベイズ探索: ハイパーパラメーターをメトリクススコアの確率とマッピングする確率モデルを作成し、メトリクスを改善する高い確率のパラメータを選択します。ベイズ最適化の目的は、ハイパーパラメーター値の選択に時間をかけることですが、その代わりにより少ないハイパーパラメーター値を試すことを試みます。
- パラメータ: ハイパーパラメータ名、離散値、範囲、または各反復で値を取り出す分布を含む辞書です。
詳細については、すべてのスイープ設定オプションのリストを参照してください。
sweep_config = {
"method": "random", # try grid or random
"metric": {
"name": "accuracy",
"goal": "maximize"
},
"parameters": {
"booster": {
"values": ["gbtree","gblinear"]
},
"max_depth": {
"values": [3, 6, 9, 12]
},
"learning_rate": {
"values": [0.1, 0.05, 0.2]
},
"subsample": {
"values": [1, 0.5, 0.3]
}
}
}
2. スイープを初期化する
wandb.sweep
を呼び出すとスイープコントローラー、つまり parameters
の設定を問い合わせるすべての者に提供し、metrics
のパフォーマンスを wandb
ログを介して返すことを期待する集中プロセスが開始されます。
sweep_id = wandb.sweep(sweep_config, project="XGBoost-sweeps")
トレーニングプロセスを定義する
スイープを実行する前に、モデルを作成してトレーニングする関数を定義する必要があります。 この関数は、ハイパーパラメーター値を取り込み、メトリクスを出力するものです。
また、スクリプト内に wandb
を統合する必要があります。
主なコンポーネントは3つです:
wandb.init()
: 新しい W&B run を初期化します。各 run はトレーニングスクリプトの単一の実行です。wandb.config
: すべてのハイパーパラメーターを設定 オブジェクトに保存します。これにより、私たちのアプリを使用して、ハイパーパラメーター値ごとに run をソートおよび比較することができます。wandb.log()
: 画像、ビデオ、オーディオファイル、HTML、プロット、またはポイントクラウドなどのメトリクスとカスタムオブジェクトをログします。
また、データをダウンロードする必要があります:
!wget https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv
# Pima Indians データセット用の XGBoost モデル
from numpy import loadtxt
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
# データを読み込む
def train():
config_defaults = {
"booster": "gbtree",
"max_depth": 3,
"learning_rate": 0.1,
"subsample": 1,
"seed": 117,
"test_size": 0.33,
}
wandb.init(config=config_defaults) # スイープ中にデフォルトが上書きされる
config = wandb.config
# データを読み込み、予測変数とターゲットに分ける
dataset = loadtxt("pima-indians-diabetes.data.csv", delimiter=",")
X, Y = dataset[:, :8], dataset[:, 8]
# データをトレインセットとテストセットに分割
X_train, X_test, y_train, y_test = train_test_split(X, Y,
test_size=config.test_size,
random_state=config.seed)
# トレインセットでモデルを適合させる
model = XGBClassifier(booster=config.booster, max_depth=config.max_depth,
learning_rate=config.learning_rate, subsample=config.subsample)
model.fit(X_train, y_train)
# テストセットで予測を行う
y_pred = model.predict(X_test)
predictions = [round(value) for value in y_pred]
# 予測を評価する
accuracy = accuracy_score(y_test, predictions)
print(f"Accuracy: {accuracy:.0%}")
wandb.log({"accuracy": accuracy})
3. エージェントでスイープを実行する
次に、wandb.agent
を呼び出してスイープを起動します。
wandb.agent
は W&B にログインしているすべてのマシンで呼び出すことができ、
sweep_id
- データセットと
train
関数
があるので、そのマシンはスイープに参加します。
注意:
random
スイープはデフォルトでは永遠に実行され、新しいパラメータの組み合わせを試し続けます。しかし、それはアプリの UI からスイープをオフにするまでです。完了する run の合計count
をagent
に指定することで、この動作を防ぐことができます。
wandb.agent(sweep_id, train, count=25)
結果を可視化する
スイープが終了したら、結果を確認します。
Weights & Biases は、あなたのために便利なプロットをいくつか自動的に生成します。
パラレル座標プロット
このプロットは、ハイパーパラメーター値をモデルのメトリクスにマッピングします。最良のモデルパフォーマンスをもたらしたハイパーパラメーターの組み合わせに焦点を当てるのに役立ちます。
このプロットは、単純な線形モデルを学習者として使用するよりも、ツリーを学習者として使用する方がやや優れていることを示しているようです。

ハイパーパラメーターの重要度プロット
ハイパーパラメーターの重要度プロットは、メトリクスに最も大きな影響を与えたハイパーパラメーター値を示しています。
相関関係(線形予測子として扱う)と特徴量の重要性(結果に基づいてランダムフォレストをトレーニングした後) の両方を報告しますので、どのパラメータが最も大きな影響を与えたか、そしてその影響がどちらの方向であったかを確認できます。
このチャートを読むと、上記のパラレル座標チャートで気づいた傾向の定量的な確認が得られます。検証精度に最大の影響を与えたのは学習者の選択であり、gblinear
学習者は一般的に gbtree
学習者よりも劣っていました。

これらの可視化は、最も重要で、さらに探索する価値のあるパラメータ(とその値の範囲)に焦点を当てることにより、高価なハイパーパラメーターの最適化を実行する時間とリソースを節約するのに役立ちます。