1 - PyTorch

Weights & Biases を使用して、機械学習の実験管理、データセットのバージョン管理、およびプロジェクトのコラボレーションを行います。

このノートブックでカバーする内容

PyTorch のコードに Weights & Biases を統合して、実験管理をパイプラインに追加する方法を示します。

# ライブラリをインポート
import wandb

# 新しい実験を開始
wandb.init(project="new-sota-model")

# ハイパーパラメータの辞書を config でキャプチャ
wandb.config = {"learning_rate": 0.001, "epochs": 100, "batch_size": 128}

# モデルとデータを設定
model, dataloader = get_model(), get_data()

# オプション: 勾配を追跡
wandb.watch(model)

for batch in dataloader:
  metrics = model.training_step()
  # トレーニングループ内でメトリクスをログしてモデルの性能を視覚化
  wandb.log(metrics)

# オプション: モデルを最後に保存
model.to_onnx()
wandb.save("model.onnx")

ビデオチュートリアルに沿って進めましょう。

注意: Step で始まるセクションは、既存のパイプラインに W&B を統合するために必要な部分です。それ以外はデータを読み込み、モデルを定義するだけです。

インストール、インポート、ログイン

import os
import random

import numpy as np
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from tqdm.auto import tqdm

# 決定論的な振る舞いを保証
torch.backends.cudnn.deterministic = True
random.seed(hash("setting random seeds") % 2**32 - 1)
np.random.seed(hash("improves reproducibility") % 2**32 - 1)
torch.manual_seed(hash("by removing stochasticity") % 2**32 - 1)
torch.cuda.manual_seed_all(hash("so runs are repeatable") % 2**32 - 1)

# デバイスの設定
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# MNIST ミラーのリストから遅いミラーを削除
torchvision.datasets.MNIST.mirrors = [mirror for mirror in torchvision.datasets.MNIST.mirrors
                                      if not mirror.startswith("http://yann.lecun.com")]

ステップ 0: W&B をインストール

まずは、このライブラリを入手する必要があります。 wandbpip を使用して簡単にインストールできます。

!pip install wandb onnx -Uq

ステップ 1: W&B をインポートしてログイン

データをウェブサービスにログするためには、 ログインが必要です。

W&B を初めて使用する場合は、 表示されるリンクから無料アカウントに登録する必要があります。

import wandb

wandb.login()

実験とパイプラインを定義

wandb.init でメタデータとハイパーパラメータを追跡

プログラム上、最初に行うのは実験を定義することです。 ハイパーパラメータとは何か?この run に関連するメタデータは何か?

この情報を config 辞書(または類似オブジェクト)に保存し、 必要に応じてアクセスするのは非常に一般的なワークフローです。

この例では、数少ないハイパーパラメータのみを変動させ、 残りを手動でコーディングします。 しかし、あなたのモデルのどの部分も config の一部にすることができます。

また、いくつかのメタデータも含めます:私たちは MNIST データセットと畳み込み ニューラルネットワークを使用しているので、 同じプロジェクトで、たとえば全結合ニューラルネットワークで CIFAR を扱う場合、 この情報が run を分ける助けになります。

config = dict(
    epochs=5,
    classes=10,
    kernels=[16, 32],
    batch_size=128,
    learning_rate=0.005,
    dataset="MNIST",
    architecture="CNN")

さて、全体のパイプラインを定義しましょう。 これは非常に典型的なモデルトレーニングの手順です:

  1. まず、モデル、関連するデータ、オプティマイザーを make し、
  2. それに基づいてモデルを train し、最後に
  3. トレーニングがどのように行われたかを確認するために test します。

これらの関数を以下で実装します。

def model_pipeline(hyperparameters):

    # wandb に開始を伝えます
    with wandb.init(project="pytorch-demo", config=hyperparameters):
      # すべての HP を wandb.config 経由でアクセスし、ログが実行と一致するようにします。
      config = wandb.config

      # モデル、データ、最適化問題を作成します
      model, train_loader, test_loader, criterion, optimizer = make(config)
      print(model)

      # それらを使用してモデルをトレーニングします
      train(model, train_loader, criterion, optimizer, config)

      # 最終的なパフォーマンスをテストします
      test(model, test_loader)

    return model

ここでの標準パイプラインとの唯一の違いは、 すべてが wandb.init のコンテキスト内で行われる点です。 この関数を呼び出すことで、 コードとサーバーとの間に通信のラインが確立されます。

config 辞書を wandb.init に渡すことで、 すべての情報が即座にログされるため、 実験に使用するハイパーパラメータの値を常に把握できます。

選んでログした値が常にモデルに使用されることを保証するために、 wandb.config のオブジェクトコピーを使用することをお勧めします。 以下で make の定義をチェックして、いくつかの例を見てください。

サイドノート: コードを別々のプロセスで実行するようにします、 私たちの側で問題があっても (たとえば、巨大な海の怪物がデータセンターを攻撃した場合でも) コードはクラッシュしません。 問題が解決したら、クラーケンが深海に戻る頃に、 wandb sync でデータをログできます。

def make(config):
    # データを作成
    train, test = get_data(train=True), get_data(train=False)
    train_loader = make_loader(train, batch_size=config.batch_size)
    test_loader = make_loader(test, batch_size=config.batch_size)

    # モデルを作成
    model = ConvNet(config.kernels, config.classes).to(device)

    # 損失とオプティマイザーを作成
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(
        model.parameters(), lr=config.learning_rate)
    
    return model, train_loader, test_loader, criterion, optimizer

データのロードとモデルを定義

次に、データがどのようにロードされるか、およびモデルの見た目を指定する必要があります。

これは非常に重要な部分ですが、wandb なしで通常行われるのと何も違いませんので、 ここで詳しくは説明しません。

def get_data(slice=5, train=True):
    full_dataset = torchvision.datasets.MNIST(root=".",
                                              train=train, 
                                              transform=transforms.ToTensor(),
                                              download=True)
    #  [::slice] でスライスするのと同等 
    sub_dataset = torch.utils.data.Subset(
      full_dataset, indices=range(0, len(full_dataset), slice))
    
    return sub_dataset


def make_loader(dataset, batch_size):
    loader = torch.utils.data.DataLoader(dataset=dataset,
                                         batch_size=batch_size, 
                                         shuffle=True,
                                         pin_memory=True, num_workers=2)
    return loader

モデルの定義は通常楽しい部分です。

しかし wandb でも何も変わらないので、 標準的な ConvNet アーキテクチャーにとどまりましょう。

この部分をいじっていくつかの実験を試みるのを恐れないでください – あなたの結果はすべて wandb.ai にログされます。

# 従来の畳み込みニューラルネットワーク

class ConvNet(nn.Module):
    def __init__(self, kernels, classes=10):
        super(ConvNet, self).__init__()
        
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, kernels[0], kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, kernels[1], kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.fc = nn.Linear(7 * 7 * kernels[-1], classes)
        
    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out

トレーニングロジックを定義

model_pipeline を進めると、train の指定を行う時がきます。

ここでは wandb の 2 つの関数、watchlog が活躍します。

wandb.watch で勾配を追跡し、他のすべてを wandb.log で管理

wandb.watch はトレーニングの log_freq ステップごとに モデルの勾配とパラメータをログします。

トレーニングを開始する前にこれを呼び出すだけで済みます。

トレーニングコードの残りは前と変わらず、 エポックとバッチを繰り返し、 forward pass と backward pass を実行し、 optimizer を適用します。

def train(model, loader, criterion, optimizer, config):
    # モデルを追跡している wandb に情報を伝えます: 勾配、重み、その他。
    wandb.watch(model, criterion, log="all", log_freq=10)

    # トレーニングを実行し wandb で追跡
    total_batches = len(loader) * config.epochs
    example_ct = 0  # これまでに見た例の数
    batch_ct = 0
    for epoch in tqdm(range(config.epochs)):
        for _, (images, labels) in enumerate(loader):

            loss = train_batch(images, labels, model, optimizer, criterion)
            example_ct +=  len(images)
            batch_ct += 1

            # 25 バッチおきにメトリクスを報告
            if ((batch_ct + 1) % 25) == 0:
                train_log(loss, example_ct, epoch)


def train_batch(images, labels, model, optimizer, criterion):
    images, labels = images.to(device), labels.to(device)
    
    # Forward pass ➡
    outputs = model(images)
    loss = criterion(outputs, labels)
    
    # Backward pass ⬅
    optimizer.zero_grad()
    loss.backward()

    # オプティマイザーでステップ
    optimizer.step()

    return loss

唯一の違いは、記録のコードです: 以前ターミナルにメトリクスを報告していたところを、 wandb.log に同じ情報を渡します。

wandb.log は文字列をキーとする辞書を期待します。 これらの文字列は、ログされるオブジェクトを識別し、値を構成します。 また、オプションでトレーニングのどの step にいるかをログできます。

サイドノート: 私はモデルが見た例の数を使用するのが好きです、 これはバッチサイズを超えて比較しやすくなるためですが、 生のステップやバッチカウントを使用することもできます。 長いトレーニングランの場合、epoch 単位でログすることも理にかなっているかもしれません。

def train_log(loss, example_ct, epoch):
    # マジックが起こるところ
    wandb.log({"epoch": epoch, "loss": loss}, step=example_ct)
    print(f"Loss after {str(example_ct).zfill(5)} examples: {loss:.3f}")

テストロジックを定義

モデルのトレーニングが完了したら、テストしてみましょう: 本番環境から新たなデータに対して実行したり、 手作業で準備した例に適用したりします。

(オプション) wandb.save を呼び出す

この時点で、モデルのアーキテクチャと最終パラメータをディスクに保存するのも良い時です。 最大の互換性を考慮して、モデルを Open Neural Network eXchange (ONNX) フォーマット でエクスポートします。

そのファイル名を wandb.save に渡すことで、モデルのパラメータが W&B のサーバーに保存されます: もはやどの .h5.pb がどのトレーニングrunに対応しているのかを見失うことはありません。

モデルの保存、バージョン管理、配布のための、高度な wandb 機能については、 Artifacts ツールをご覧ください。

def test(model, test_loader):
    model.eval()

    # いくつかのテスト例にモデルを実行
    with torch.no_grad():
        correct, total = 0, 0
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        print(f"Accuracy of the model on the {total} " +
              f"test images: {correct / total:%}")
        
        wandb.log({"test_accuracy": correct / total})

    # 交換可能な ONNX フォーマットでモデルを保存
    torch.onnx.export(model, images, "model.onnx")
    wandb.save("model.onnx")

トレーニングを実行し、wandb.ai でライブメトリクスを確認

さて、全体のパイプラインを定義し、数行の W&B コードを追加したら、 完全に追跡された実験を実行する準備が整いました。

いくつかのリンクを報告します: ドキュメンテーション、 プロジェクトページ、これにはプロジェクトのすべての run が整理されています。 この run の結果が保存されるランページ。

ランページに移動して、これらのタブを確認:

  1. Charts、トレーニング中にモデルの勾配、パラメーター値、損失がログされます
  2. System、ここにはディスク I/O 応答率、CPU および GPU メトリクス(温度上昇も監視)、その他のさまざまなシステムメトリクスが含まれます
  3. Logs、トレーニング中に標準出力にプッシュされたもののコピーがあります
  4. Files、トレーニングが完了すると、model.onnx をクリックして Netron モデルビューア でネットワークを表示できます。

with wandb.init ブロック終了時にランが終了すると、 結果の要約もセルの出力で印刷されます。

# パイプラインでモデルを構築、トレーニング、分析
model = model_pipeline(config)

ハイパーパラメータをスイープでテスト

この例では、ハイパーパラメータのセットを 1 つしか見ていません。 しかし、ほとんどの ML ワークフローの重要な部分は、 多くのハイパーパラメータを反復することです。

Weights & Biases Sweeps を使用してハイパーパラメータのテストを自動化し、モデルと最適化戦略の空間を探索できます。

W&B Sweeps を使用した PyTorch におけるハイパーパラメータ最適化をチェック

Weights & Biases を使用したハイパーパラメータ探索です。非常に簡単です。たった3つの簡単なステップがあります:

  1. スイープを定義する: これは、検索するパラメータ、検索戦略、最適化メトリクスなどを指定する辞書またはYAML ファイルを作成することで行います。

  2. スイープを初期化する: sweep_id = wandb.sweep(sweep_config)

  3. スイープエージェントを実行する: wandb.agent(sweep_id, function=train)

これだけでハイパーパラメータ探索が実行できます。

W&B で追跡および視覚化されたプロジェクトの例をギャラリー →でご覧ください。

Advanced Setup

  1. 環境変数: 環境変数に API キーを設定して、管理されたクラスターでトレーニングを実行できます。
  2. オフラインモード: dryrun モードを使用してオフラインでトレーニングし、後で結果を同期。
  3. オンプレ: プライベートクラウドまたはエアギャップされたサーバーに W&B をインストールします。学術機関から企業チームまで、すべての人に向けたローカルインストールを提供。
  4. スイープ: ハイパーパラメータ探索を迅速に設定できる、チューニングのための軽量ツール。

2 - PyTorch Lightning

私たちは PyTorch Lightning を使用して画像分類パイプラインを構築します。このスタイルガイドに従って、コードの読みやすさと再現性を高めます。このすばらしい説明はこちらで利用可能です。

PyTorch Lightning と W&B のセットアップ

このチュートリアルでは、PyTorch Lightning と Weights & Biases が必要です。

pip install lightning -q
pip install wandb -qU
import lightning.pytorch as pl

# あなたのお気に入りの機械学習トラッキングツール
from lightning.pytorch.loggers import WandbLogger

import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import random_split, DataLoader

from torchmetrics import Accuracy

from torchvision import transforms
from torchvision.datasets import CIFAR10

import wandb

これで、wandb アカウントにログインする必要があります。

wandb.login()

DataModule - 私たちが求めるデータパイプライン

DataModules は、データに関連するフックを LightningModule から分離する方法であり、データセットに依存しないモデルを開発できます。

これは、データパイプラインを1つの共有可能で再利用可能なクラスにまとめます。データモジュールは PyTorch のデータプロセッシングに関わる5つのステップをカプセル化します:

  • ダウンロード / トークン化 / プロセス。
  • クリーンし、(場合によっては)ディスクに保存。
  • データセット内にロード。
  • 変換を適用(回転、トークン化など)。
  • DataLoader 内にラップ。

データモジュールについて詳しくはこちらをご覧ください。Cifar-10 データセット用のデータモジュールを構築しましょう。

class CIFAR10DataModule(pl.LightningDataModule):
    def __init__(self, batch_size, data_dir: str = './'):
        super().__init__()
        self.data_dir = data_dir
        self.batch_size = batch_size

        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])
        
        self.num_classes = 10
    
    def prepare_data(self):
        CIFAR10(self.data_dir, train=True, download=True)
        CIFAR10(self.data_dir, train=False, download=True)
    
    def setup(self, stage=None):
        # データローダーで使用する train/val データセットを割り当て
        if stage == 'fit' or stage is None:
            cifar_full = CIFAR10(self.data_dir, train=True, transform=self.transform)
            self.cifar_train, self.cifar_val = random_split(cifar_full, [45000, 5000])

        # データローダーで使用するテストデータセットを割り当て
        if stage == 'test' or stage is None:
            self.cifar_test = CIFAR10(self.data_dir, train=False, transform=self.transform)
    
    def train_dataloader(self):
        return DataLoader(self.cifar_train, batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self):
        return DataLoader(self.cifar_val, batch_size=self.batch_size)

    def test_dataloader(self):
        return DataLoader(self.cifar_test, batch_size=self.batch_size)

コールバック

コールバックは、プロジェクト間で再利用可能な自己完結型プログラムです。PyTorch Lightning は、定期的に使用されるいくつかの組み込みコールバックを提供しています。 PyTorch Lightning のコールバックについて詳しくはこちらをご覧ください。

組み込みコールバック

このチュートリアルでは、Early StoppingModel Checkpoint の組み込みコールバックを使用します。それらは Trainer に渡すことができます。

カスタムコールバック

カスタム Keras コールバックに慣れている場合、PyTorch パイプラインで同じことができる能力は、まさにケーキの上のさくらんぼです。

画像分類を実行しているため、モデルのいくつかの画像サンプルに対する予測を視覚化する能力は役立つかもしれません。このコールバックの形式で提供されることで、モデルを早期段階でデバッグするのに役立ちます。

class ImagePredictionLogger(pl.callbacks.Callback):
    def __init__(self, val_samples, num_samples=32):
        super().__init__()
        self.num_samples = num_samples
        self.val_imgs, self.val_labels = val_samples
    
    def on_validation_epoch_end(self, trainer, pl_module):
        # テンソルを CPU に移動
        val_imgs = self.val_imgs.to(device=pl_module.device)
        val_labels = self.val_labels.to(device=pl_module.device)
        # モデル予測を取得
        logits = pl_module(val_imgs)
        preds = torch.argmax(logits, -1)
        # wandb Image として画像をログ
        trainer.logger.experiment.log({
            "examples":[wandb.Image(x, caption=f"Pred:{pred}, Label:{y}") 
                           for x, pred, y in zip(val_imgs[:self.num_samples], 
                                                 preds[:self.num_samples], 
                                                 val_labels[:self.num_samples])]
            })
        

LightningModule - システムの定義

LightningModule はシステムを定義し、モデルではありません。ここでシステムはすべての研究コードを1つのクラスにまとめて自己完結型にします。LightningModule は PyTorch コードを5つのセクションに整理します:

  • 計算(__init__
  • トレーニングループ(training_step
  • 検証ループ(validation_step
  • テストループ(test_step
  • オプティマイザー(configure_optimizers

このようにして、容易に共有できるデータセットに依存しないモデルを構築できます。Cifar-10 分類のためのシステムを構築しましょう。

class LitModel(pl.LightningModule):
    def __init__(self, input_shape, num_classes, learning_rate=2e-4):
        super().__init__()
        
        # ハイパーパラメーターをログ
        self.save_hyperparameters()
        self.learning_rate = learning_rate
        
        self.conv1 = nn.Conv2d(3, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 32, 3, 1)
        self.conv3 = nn.Conv2d(32, 64, 3, 1)
        self.conv4 = nn.Conv2d(64, 64, 3, 1)

        self.pool1 = torch.nn.MaxPool2d(2)
        self.pool2 = torch.nn.MaxPool2d(2)
        
        n_sizes = self._get_conv_output(input_shape)

        self.fc1 = nn.Linear(n_sizes, 512)
        self.fc2 = nn.Linear(512, 128)
        self.fc3 = nn.Linear(128, num_classes)

        self.accuracy = Accuracy(task='multiclass', num_classes=num_classes)

    # convブロックからLinear層に渡される出力テンソルのサイズを返します。
    def _get_conv_output(self, shape):
        batch_size = 1
        input = torch.autograd.Variable(torch.rand(batch_size, *shape))

        output_feat = self._forward_features(input) 
        n_size = output_feat.data.view(batch_size, -1).size(1)
        return n_size
        
    # convブロックからの特徴テンソルを返します
    def _forward_features(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool1(F.relu(self.conv2(x)))
        x = F.relu(self.conv3(x))
        x = self.pool2(F.relu(self.conv4(x)))
        return x
    
    # 推論中に使用されます
    def forward(self, x):
       x = self._forward_features(x)
       x = x.view(x.size(0), -1)
       x = F.relu(self.fc1(x))
       x = F.relu(self.fc2(x))
       x = F.log_softmax(self.fc3(x), dim=1)
       
       return x
    
    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.nll_loss(logits, y)
        
        # トレーニングメトリクス
        preds = torch.argmax(logits, dim=1)
        acc = self.accuracy(preds, y)
        self.log('train_loss', loss, on_step=True, on_epoch=True, logger=True)
        self.log('train_acc', acc, on_step=True, on_epoch=True, logger=True)
        
        return loss
    
    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.nll_loss(logits, y)

        # 検証メトリクス
        preds = torch.argmax(logits, dim=1)
        acc = self.accuracy(preds, y)
        self.log('val_loss', loss, prog_bar=True)
        self.log('val_acc', acc, prog_bar=True)
        return loss
    
    def test_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.nll_loss(logits, y)
        
        # 検証メトリクス
        preds = torch.argmax(logits, dim=1)
        acc = self.accuracy(preds, y)
        self.log('test_loss', loss, prog_bar=True)
        self.log('test_acc', acc, prog_bar=True)
        return loss
    
    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        return optimizer

トレーニングと評価

DataModule を使用してデータパイプラインを整理し、 LightningModule を使用してモデルアーキテクチャ+トレーニングループを整理したので、PyTorch Lightning の Trainer が他のすべてを自動化します。

Trainer は次のことを自動化します:

  • エポックとバッチの反復
  • optimizer.step(), backward, zero_grad() の呼び出し
  • .eval() の呼び出し、グラッドの有効化/無効化
  • 重みの保存と読み込み
  • Weights & Biases ログ
  • マルチ GPU トレーニングサポート
  • TPU サポート
  • 16 ビットトレーニングサポート
dm = CIFAR10DataModule(batch_size=32)
# x_dataloader にアクセスするには、prepare_data および setup を呼び出す必要があります。
dm.prepare_data()
dm.setup()

# 画像予測をログするカスタム ImagePredictionLogger コールバックに必要なサンプル。
val_samples = next(iter(dm.val_dataloader()))
val_imgs, val_labels = val_samples[0], val_samples[1]
val_imgs.shape, val_labels.shape
model = LitModel((3, 32, 32), dm.num_classes)

# wandb ロガーを初期化
wandb_logger = WandbLogger(project='wandb-lightning', job_type='train')

# コールバックを初期化
early_stop_callback = pl.callbacks.EarlyStopping(monitor="val_loss")
checkpoint_callback = pl.callbacks.ModelCheckpoint()

# トレーナーを初期化
trainer = pl.Trainer(max_epochs=2,
                     logger=wandb_logger,
                     callbacks=[early_stop_callback,
                                ImagePredictionLogger(val_samples),
                                checkpoint_callback],
                     )

# モデルのトレーニング
trainer.fit(model, dm)

# 保留中のテストセットでモデルを評価 ⚡⚡
trainer.test(dataloaders=dm.test_dataloader())

# wandb run を閉じる
wandb.finish()

最終的な考え

私は TensorFlow/Keras エコシステムから来ており、PyTorch は洗練されたフレームワークであるにもかかわらず、ちょっと難しいと感じています。個人的な経験にすぎませんが。PyTorch Lightning を探索して、私が PyTorch から遠ざけていた理由のほとんどが解消されていることに気づきました。ここに私の興奮の概要があります:

  • 以前: 従来の PyTorch モデル定義はバラバラでした。モデルは model.py スクリプトに、トレーニングループは train.py ファイルにありました。パイプラインを理解するためには多くの見直しが必要でした。
  • 現在: LightningModule は、モデルが training_stepvalidation_step などと共に定義されているシステムとして機能します。今ではモジュール化され、共有可能です。
  • 以前: TensorFlow/Keras の最良の部分は入力データパイプラインでした。彼らのデータセットカタログは豊富で成長しています。PyTorch のデータパイプラインは、かつて最大の痛点でした。通常の PyTorch コードでは、データのダウンロード/クリーニング/準備は通常、多くのファイルに分散しています。
  • 現在: DataModule は、データパイプラインを1つの共有可能で再利用可能なクラスに組織します。それは単に train_dataloaderval_dataloader(s)、test_dataloader(s) と、必要な変換やデータプロセッシング/ダウンロードステップの集まりです。
  • 以前: Keras では model.fit を呼び出してモデルをトレーニングし、 model.predict で推論を実行することができました。model.evaluate は、テストデータに基づく昔ながらのシンプルな評価を提供しましたが、これは PyTorch ではありませんでした。通常、別々の train.py および test.py ファイルが見つかります。
  • 現在: LightningModule が整備されることで、Trainer がすべてを自動化します。ただ trainer.fittrainer.test を呼び出してモデルをトレーニングと評価すればよいのです。
  • 以前: TensorFlow は TPU を好む、PyTorch は…
  • 現在: PyTorch Lightning では、複数の GPU で同じモデルをトレーニングするのがとても簡単ですし、TPU でも可能です。
  • 以前: 私はコールバックの大ファンで、カスタムコールバックを書くことを好んでいます。従来の PyTorch では、Early Stopping のような些細なことが議論の対象になることがありました。
  • 現在: PyTorch Lightning を使用すると、Early Stopping と Model Checkpointing が簡単です。カスタムコールバックを書くことさえもできます。

🎨 結論とリソース

このレポートが役に立つことを願っています。コードを試して、好きなデータセットで画像分類器をトレーニングすることをお勧めします。

PyTorch Lightningについてもっと学ぶためのリソース:

3 - Hugging Face

Hugging Face モデルのパフォーマンスをシームレスな W&B インテグレーションで素早く可視化しましょう。

ハイパーパラメーター、アウトプットメトリクス、GPU利用率などのシステム統計をモデル間で比較します。

なぜW&Bを使うべきか?

  • 統一されたダッシュボード: モデルのすべてのメトリクスと予測のための中央リポジトリ
  • 軽量: Hugging Faceとのインテグレーションにコード変更は不要
  • アクセス可能: 個人や学術チームには無料
  • セキュア: すべてのプロジェクトはデフォルトでプライベート
  • 信頼性: OpenAI、トヨタ、Lyftなどの機械学習チームで使用されている

W&Bを機械学習モデル用のGitHubのように考えてください。プライベートでホストされたダッシュボードに機械学習の実験管理を保存します。スクリプトをどこで実行しても、モデルのすべてのバージョンが保存されることを確信して、素早く実験できます。

W&Bの軽量なインテグレーションは、任意のPythonスクリプトで動作し、モデルのトラッキングと可視化を開始するには無料のW&Bアカウントにサインアップするだけです。

Hugging Face Transformersレポジトリでは、Trainingと評価メトリクスを各ログステップでW&Bに自動的にログするようにTrainerを設定しました。

インテグレーションの仕組みを詳しく見るにはこちら: Hugging Face + W&B Report

インストール、インポート、ログイン

このチュートリアルのためにHugging FaceとWeights & Biasesのライブラリ、GLUEデータセット、トレーニングスクリプトをインストールします。

!pip install datasets wandb evaluate accelerate -qU
!wget https://raw.githubusercontent.com/huggingface/transformers/refs/heads/main/examples/pytorch/text-classification/run_glue.py
# run_glue.pyスクリプトはtransformers devを必要とします
!pip install -q git+https://github.com/huggingface/transformers

続行する前に、無料アカウントにサインアップしてください

APIキーを入力

サインアップしたら、次のセルを実行してリンクをクリックし、APIキーを取得してこのノートブックを認証してください。

import wandb
wandb.login()

オプションで、W&Bロギングをカスタマイズするために環境変数を設定できます。ドキュメントを参照してください。

# オプション: 勾配とパラメータの両方をログします
%env WANDB_WATCH=all

モデルをトレーニング

次に、ダウンロードしたトレーニングスクリプト run_glue.py を呼び出し、トレーニングがWeights & Biasesダッシュボードに自動的にトラックされるのを確認します。このスクリプトは、Microsoft Research Paraphrase CorpusでBERTをファインチューンし、意味的に同等であることを示す人間の注釈付きの文のペアを使用します。

%env WANDB_PROJECT=huggingface-demo
%env TASK_NAME=MRPC

!python run_glue.py \
  --model_name_or_path bert-base-uncased \
  --task_name $TASK_NAME \
  --do_train \
  --do_eval \
  --max_seq_length 256 \
  --per_device_train_batch_size 32 \
  --learning_rate 2e-4 \
  --num_train_epochs 3 \
  --output_dir /tmp/$TASK_NAME/ \
  --overwrite_output_dir \
  --logging_steps 50

ダッシュボードで結果を可視化

上記で印刷されたリンクをクリックするか、wandb.ai にアクセスして、結果がリアルタイムでストリームされるのを確認してください。ブラウザでrunを表示するリンクは、すべての依存関係がロードされた後に表示されます。次のような出力を探します: “wandb: 🚀 View run at [URL to your unique run]”

モデルのパフォーマンスを可視化 数十の実験管理を一目で確認し、興味深い学びにズームインし、高次元のデータを可視化するのは簡単です。

アーキテクチャーを比較 こちらは BERT vs DistilBERT を比較する例です。異なるアーキテクチャーがトレーニング中の評価精度にどのように影響するかを、自動ラインプロット可視化で簡単に確認できます。

重要な情報をデフォルトで簡単にトラック

Weights & Biasesは、各実験で新しいrunを保存します。デフォルトで保存される情報は次の通りです:

  • ハイパーパラメーター: モデルの設定がConfigに保存されます
  • モデルメトリクス: ストリーミングメトリクスの時系列データはLogに保存されます
  • ターミナルログ: コマンドラインの出力は保存され、タブで利用可能です
  • システムメトリクス: GPUとCPUの使用率、メモリ、温度など

詳しく知る

  • ドキュメント: Weights & BiasesとHugging Faceのインテグレーションに関するドキュメント
  • ビデオ: YouTubeチャンネルでのチュートリアル、実務者とのインタビュー、その他
  • お問い合わせ: contact@wandb.com までご質問をお寄せください

4 - TensorFlow

このノートブックでカバーする内容

  • Weights & Biases と TensorFlow パイプラインの簡単なインテグレーションによる実験管理。
  • keras.metrics を使用したメトリクスの計算
  • wandb.log を使用して、カスタムトレーニングループでこれらのメトリクスをログに記録する方法。
ダッシュボード

: ステップ から始まるセクションは、既存のコードに W&B を統合するために必要なすべてです。それ以外は通常の MNIST の例です。

import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.datasets import cifar10

インストール、インポート、ログイン

W&B のインストール

%%capture
!pip install wandb

W&B のインポートとログイン

import wandb
from wandb.integration.keras import WandbMetricsLogger

wandb.login()

サイドノート: もしこれが初めて W&B を使う場合や、まだログインしていない場合、wandb.login() 実行後に表示されるリンクはサインアップ/ログインページに移動します。サインアップはワンクリックで簡単です。

データセットの準備

# トレーニング用データセットの準備
BATCH_SIZE = 64
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = np.reshape(x_train, (-1, 784))
x_test = np.reshape(x_test, (-1, 784))

# tf.data を使用して入力パイプラインを構築
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(BATCH_SIZE)

val_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
val_dataset = val_dataset.batch(BATCH_SIZE)

モデルとトレーニングループの定義

def make_model():
    inputs = keras.Input(shape=(784,), name="digits")
    x1 = keras.layers.Dense(64, activation="relu")(inputs)
    x2 = keras.layers.Dense(64, activation="relu")(x1)
    outputs = keras.layers.Dense(10, name="predictions")(x2)

    return keras.Model(inputs=inputs, outputs=outputs)
def train_step(x, y, model, optimizer, loss_fn, train_acc_metric):
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss_value = loss_fn(y, logits)

    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))

    train_acc_metric.update_state(y, logits)

    return loss_value
def test_step(x, y, model, loss_fn, val_acc_metric):
    val_logits = model(x, training=False)
    loss_value = loss_fn(y, val_logits)
    val_acc_metric.update_state(y, val_logits)

    return loss_value

トレーニングループに wandb.log を追加

def train(
    train_dataset,
    val_dataset,
    model,
    optimizer,
    train_acc_metric,
    val_acc_metric,
    epochs=10,
    log_step=200,
    val_log_step=50,
):
    for epoch in range(epochs):
        print("\nStart of epoch %d" % (epoch,))

        train_loss = []
        val_loss = []

        # データセットのバッチに対して繰り返し処理
        for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
            loss_value = train_step(
                x_batch_train,
                y_batch_train,
                model,
                optimizer,
                loss_fn,
                train_acc_metric,
            )
            train_loss.append(float(loss_value))

        # 各エポックの終了時に検証ループを実行
        for step, (x_batch_val, y_batch_val) in enumerate(val_dataset):
            val_loss_value = test_step(
                x_batch_val, y_batch_val, model, loss_fn, val_acc_metric
            )
            val_loss.append(float(val_loss_value))

        # 各エポックの終了時にメトリクスを表示
        train_acc = train_acc_metric.result()
        print("Training acc over epoch: %.4f" % (float(train_acc),))

        val_acc = val_acc_metric.result()
        print("Validation acc: %.4f" % (float(val_acc),))

        # 各エポックの終了時にメトリクスをリセット
        train_acc_metric.reset_states()
        val_acc_metric.reset_states()

        # ⭐: wandb.log を使用してメトリクスをログに記録
        wandb.log(
            {
                "epochs": epoch,
                "loss": np.mean(train_loss),
                "acc": float(train_acc),
                "val_loss": np.mean(val_loss),
                "val_acc": float(val_acc),
            }
        )

トレーニングを実行

wandb.init を呼び出して run を開始

これにより、実験を起動したことがわかり、ユニークな ID とダッシュボードを提供できます。

公式ドキュメントをチェック

# プロジェクト名で wandb を初期化し、設定をオプションで指定します。
# 設定の値を変えて、wandb ダッシュボードでの結果を確認してください。
config = {
    "learning_rate": 0.001,
    "epochs": 10,
    "batch_size": 64,
    "log_step": 200,
    "val_log_step": 50,
    "architecture": "CNN",
    "dataset": "CIFAR-10",
}

run = wandb.init(project='my-tf-integration', config=config)
config = run.config

# モデルの初期化
model = make_model()

# モデルをトレーニングするためのオプティマイザーをインスタンス化
optimizer = keras.optimizers.SGD(learning_rate=config.learning_rate)
# 損失関数をインスタンス化
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# メトリクスを準備
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = keras.metrics.SparseCategoricalAccuracy()

train(
    train_dataset,
    val_dataset, 
    model,
    optimizer,
    train_acc_metric,
    val_acc_metric,
    epochs=config.epochs, 
    log_step=config.log_step, 
    val_log_step=config.val_log_step,
)

run.finish()  # Jupyter/Colab では、完了したことを知らせます!

結果を可視化

上記の run ページ リンクをクリックして、ライブ結果を確認してください。

Sweep 101

Weights & Biases Sweeps を使用してハイパーパラメータの最適化を自動化し、可能なモデルのスペースを探索しましょう。

Weights & Biases Sweeps を使用した TensorFlow におけるハイパーパラメータ最適化をチェック

W&B Sweeps を使用するメリット

  • 簡単なセットアップ: 数行のコードで W&B sweeps を実行できます。
  • 透明性: 使用するアルゴリズムをすべて引用しており、コードはオープンソースです。
  • 強力: スイープは完全にカスタマイズ可能で設定可能です。何十台ものマシンでスイープを起動することができ、それはラップトップでスイープを開始するのと同じくらい簡単です。
スイープ結果

サンプルギャラリー

W&B を使って記録・可視化されたプロジェクトの例を見ることができます。Fully Connected →

ベストプラクティス

  1. Projects: 複数の runs をプロジェクトにログして、それらを比較します。 wandb.init(project="project-name")
  2. Groups: 複数のプロセスや交差検証フォールドの場合は、それぞれのプロセスを個別の run としてログし、まとめてグループ化します。 wandb.init(group="experiment-1")
  3. Tags: 現在のベースラインやプロダクションモデルを追跡するためにタグを追加します。
  4. Notes: テーブルにメモを入力して、runs 間の変更を追跡します。
  5. Reports: 進捗について同僚と共有するために迅速にメモを取り、ML プロジェクトのダッシュボードとスナップショットを作成します。

高度なセットアップ

  1. 環境変数: APIキーを環境変数に設定して、管理されたクラスターでトレーニングを実行できるようにします。
  2. オフラインモード
  3. オンプレミス: W&B をプライベートクラウドやエアギャップサーバーのあなたのインフラストラクチャ上にインストールします。学術的なユーザーから企業間のチームまで、みんなのためにローカルインストールを提供しています。
  4. Artifacts: モデルとデータセットを一元化された方法で追跡し、バージョン管理することで、モデルをトレーニングする際にパイプラインステップを自動的にキャプチャします。

5 - TensorFlow スイープ

W&B を使用して、機械学習実験管理、データセットのバージョン管理、プロジェクトコラボレーションを行いましょう。

W&B Sweeps を使用してハイパーパラメーターの最適化を自動化し、インタラクティブな ダッシュボードでモデルの可能性を探りましょう:

なぜ sweeps を使うのか

  • クイックセットアップ: W&B sweeps を数行のコードで実行。
  • 透明性: このプロジェクトは使用されるすべてのアルゴリズムを引用し、コードはオープンソースです。
  • 強力: Sweeps はカスタマイズオプションを提供し、複数のマシンやノートパソコンで簡単に実行できます。

詳しくは、Sweep ドキュメントを参照してください。

このノートブックで扱う内容

  • W&B Sweep と TensorFlow でのカスタム トレーニングループを開始する手順。
  • 画像分類タスクのための最適なハイパーパラメーターを見つけること。

注意: Step から始まるセクションには、ハイパーパラメータースイープを実行するために必要なコードが示されています。それ以外はシンプルな例を設定します。

インストール、インポート、ログイン

W&B のインストール

pip install wandb

W&B のインポートとログイン

import tqdm
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.datasets import cifar10

import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import wandb
from wandb.integration.keras import WandbMetricsLogger

wandb.login()

データセットの準備

# トレーニングデータセットの準備
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

x_train = x_train / 255.0
x_test = x_test / 255.0
x_train = np.reshape(x_train, (-1, 784))
x_test = np.reshape(x_test, (-1, 784))

分類器 MLP の構築

def Model():
    inputs = keras.Input(shape=(784,), name="digits")
    x1 = keras.layers.Dense(64, activation="relu")(inputs)
    x2 = keras.layers.Dense(64, activation="relu")(x1)
    outputs = keras.layers.Dense(10, name="predictions")(x2)

    return keras.Model(inputs=inputs, outputs=outputs)


def train_step(x, y, model, optimizer, loss_fn, train_acc_metric):
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss_value = loss_fn(y, logits)

    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))

    train_acc_metric.update_state(y, logits)

    return loss_value


def test_step(x, y, model, loss_fn, val_acc_metric):
    val_logits = model(x, training=False)
    loss_value = loss_fn(y, val_logits)
    val_acc_metric.update_state(y, val_logits)

    return loss_value

トレーニングループの記述

def train(
    train_dataset,
    val_dataset,
    model,
    optimizer,
    loss_fn,
    train_acc_metric,
    val_acc_metric,
    epochs=10,
    log_step=200,
    val_log_step=50,
):

    for epoch in range(epochs):
        print("\nStart of epoch %d" % (epoch,))

        train_loss = []
        val_loss = []

        # データセットのバッチを繰り返す
        for step, (x_batch_train, y_batch_train) in tqdm.tqdm(
            enumerate(train_dataset), total=len(train_dataset)
        ):
            loss_value = train_step(
                x_batch_train,
                y_batch_train,
                model,
                optimizer,
                loss_fn,
                train_acc_metric,
            )
            train_loss.append(float(loss_value))

        # 各エポックの終わりに検証ループを実行
        for step, (x_batch_val, y_batch_val) in enumerate(val_dataset):
            val_loss_value = test_step(
                x_batch_val, y_batch_val, model, loss_fn, val_acc_metric
            )
            val_loss.append(float(val_loss_value))

        # 各エポック終了時にメトリクスを表示
        train_acc = train_acc_metric.result()
        print("Training acc over epoch: %.4f" % (float(train_acc),))

        val_acc = val_acc_metric.result()
        print("Validation acc: %.4f" % (float(val_acc),))

        # 各エポック終了時にメトリクスをリセット
        train_acc_metric.reset_states()
        val_acc_metric.reset_states()

        # 3️⃣ wandb.log を使用してメトリクスをログ
        wandb.log(
            {
                "epochs": epoch,
                "loss": np.mean(train_loss),
                "acc": float(train_acc),
                "val_loss": np.mean(val_loss),
                "val_acc": float(val_acc),
            }
        )

sweep を設定する

sweep を設定する手順:

  • 最適化するハイパーパラメーターを定義する
  • 最適化メソッドを選択する: randomgrid、または bayes
  • bayes の目標とメトリクスを設定する。例えば val_loss を最小化する
  • hyperband を使用して、実行中のものを早期終了する

詳しくは W&B Sweeps ドキュメントを参照してください。

sweep_config = {
    "method": "random",
    "metric": {"name": "val_loss", "goal": "minimize"},
    "early_terminate": {"type": "hyperband", "min_iter": 5},
    "parameters": {
        "batch_size": {"values": [32, 64, 128, 256]},
        "learning_rate": {"values": [0.01, 0.005, 0.001, 0.0005, 0.0001]},
    },
}

トレーニングループを包む

wandb.config を使用してハイパーパラメーターを設定してから train を呼び出すような関数 sweep_train を作成します。

def sweep_train(config_defaults=None):
    # デフォルト値の設定
    config_defaults = {"batch_size": 64, "learning_rate": 0.01}
    # サンプルプロジェクト名で wandb を初期化
    wandb.init(config=config_defaults)  # これは Sweep で上書きされます

    # 他のハイパーパラメータを設定に指定、ある場合
    wandb.config.epochs = 2
    wandb.config.log_step = 20
    wandb.config.val_log_step = 50
    wandb.config.architecture_name = "MLP"
    wandb.config.dataset_name = "MNIST"

    # tf.data を使用して入力パイプラインを構築
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    train_dataset = (
        train_dataset.shuffle(buffer_size=1024)
        .batch(wandb.config.batch_size)
        .prefetch(buffer_size=tf.data.AUTOTUNE)
    )

    val_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
    val_dataset = val_dataset.batch(wandb.config.batch_size).prefetch(
        buffer_size=tf.data.AUTOTUNE
    )

    # モデルを初期化
    model = Model()

    # モデルをトレーニングするためのオプティマイザーをインスタンス化
    optimizer = keras.optimizers.SGD(learning_rate=wandb.config.learning_rate)
    # 損失関数をインスタンス化
    loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

    # メトリクスを準備
    train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
    val_acc_metric = keras.metrics.SparseCategoricalAccuracy()

    train(
        train_dataset,
        val_dataset,
        model,
        optimizer,
        loss_fn,
        train_acc_metric,
        val_acc_metric,
        epochs=wandb.config.epochs,
        log_step=wandb.config.log_step,
        val_log_step=wandb.config.val_log_step,
    )

sweep を初期化し、パーソナルデジタルアシスタントを実行

sweep_id = wandb.sweep(sweep_config, project="sweeps-tensorflow")

count パラメーターで実行の数を制限します。迅速な実行のために 10 に設定します。必要に応じて増やしてください。

wandb.agent(sweep_id, function=sweep_train, count=10)

結果を視覚化

ライブ結果を見るには、先行する Sweep URL リンクをクリックしてください。

サンプルギャラリー

W&B で追跡および視覚化されたプロジェクトを Gallery で探索してください。

ベストプラクティス

  1. Projects: 複数の実行をプロジェクトに記録し、それらを比較します。wandb.init(project="project-name")
  2. Groups: 複数プロセスまたはクロスバリデーション折りたたみのために各プロセスを run として記録し、それらをグループ化します。wandb.init(group='experiment-1')
  3. Tags: ベースラインまたはプロダクションモデルを追跡するためにタグを使用します。
  4. Notes: テーブルのメモに run 間の変更を追跡するためのメモを入力します。
  5. Reports: レポートを使用して進捗メモを作成し、同僚と共有し、MLプロジェクトのダッシュボードとスナップショットを作成します。

高度なセットアップ

  1. 環境変数: 管理されたクラスターでのトレーニングのために API キーを設定します。
  2. オフラインモード
  3. オンプレミス: W&B をプライベートクラウドまたはインフラストラクチャ内のエアギャップサーバーにインストールします。ローカルインストールは学術機関および企業チームに適しています。

6 - 3D 脳腫瘍セグメンテーション with MONAI

このチュートリアルでは、MONAIを使用して、マルチラベルの3D脳腫瘍セグメンテーションタスクのトレーニングワークフローを構築し、Weights & Biasesの実験管理とデータ可視化機能を使用する方法をデモンストレーションします。チュートリアルには以下の機能が含まれています。

  1. Weights & Biasesのrunを初期化し、再現性を確保するためにrunに関連するすべての設定を同期。
  2. MONAIトランスフォームAPI:
    1. 辞書形式のデータ用のMONAIトランスフォーム。
    2. MONAIのtransforms APIに従った新しいトランスフォームの定義方法。
    3. データ拡張のための強度をランダムに調整する方法。
  3. データのロードと可視化:
    1. メタデータでNifti画像をロードし、画像のリストをロードしてスタック。
    2. IOとトランスフォームをキャッシュしてトレーニングと検証を加速。
    3. wandb.TableとWeights & Biasesでインタラクティブなセグメンテーションオーバーレイを用いてデータを可視化。
  4. 3D SegResNetモデルのトレーニング
    1. MONAIのnetworks, losses, metrics APIsを使用。
    2. PyTorchトレーニングループを使用して3D SegResNetモデルをトレーニング。
    3. Weights & Biasesを使用してトレーニングの実験管理を追跡。
    4. Weights & Biases上でモデルのチェックポイントをモデルアーティファクトとしてログとバージョン管理。
  5. wandb.TableとWeights & Biasesでインタラクティブなセグメンテーションオーバーレイを使用して、検証データセット上の予測を可視化して比較。

セットアップとインストール

まず、MONAIとWeights & Biasesの最新バージョンをインストールします。

!python -c "import monai" || pip install -q -U "monai[nibabel, tqdm]"
!python -c "import wandb" || pip install -q -U wandb
import os

import numpy as np
from tqdm.auto import tqdm
import wandb

from monai.apps import DecathlonDataset
from monai.data import DataLoader, decollate_batch
from monai.losses import DiceLoss
from monai.inferers import sliding_window_inference
from monai.metrics import DiceMetric
from monai.networks.nets import SegResNet
from monai.transforms import (
    Activations,
    AsDiscrete,
    Compose,
    LoadImaged,
    MapTransform,
    NormalizeIntensityd,
    Orientationd,
    RandFlipd,
    RandScaleIntensityd,
    RandShiftIntensityd,
    RandSpatialCropd,
    Spacingd,
    EnsureTyped,
    EnsureChannelFirstd,
)
from monai.utils import set_determinism

import torch

次に、ColabインスタンスをW&Bで認証します。

wandb.login()

W&B Runの初期化

新しいW&B runを開始して実験を追跡します。

wandb.init(project="monai-brain-tumor-segmentation")

適切な設定システムの使用は、再現性のある機械学習のための推奨されるベストプラクティスです。W&Bを使用して、各実験のハイパーパラメーターを追跡できます。

config = wandb.config
config.seed = 0
config.roi_size = [224, 224, 144]
config.batch_size = 1
config.num_workers = 4
config.max_train_images_visualized = 20
config.max_val_images_visualized = 20
config.dice_loss_smoothen_numerator = 0
config.dice_loss_smoothen_denominator = 1e-5
config.dice_loss_squared_prediction = True
config.dice_loss_target_onehot = False
config.dice_loss_apply_sigmoid = True
config.initial_learning_rate = 1e-4
config.weight_decay = 1e-5
config.max_train_epochs = 50
config.validation_intervals = 1
config.dataset_dir = "./dataset/"
config.checkpoint_dir = "./checkpoints"
config.inference_roi_size = (128, 128, 64)
config.max_prediction_images_visualized = 20

また、ランダムシードを設定して、モジュールの決定的なトレーニングを有効または無効にする必要があります。

set_determinism(seed=config.seed)

# ディレクトリを作成
os.makedirs(config.dataset_dir, exist_ok=True)
os.makedirs(config.checkpoint_dir, exist_ok=True)

データのロードと変換

ここでは、monai.transforms APIを使用して、マルチクラスラベルをワンホット形式でのマルチラベルセグメンテーションタスクに変換するカスタムトランスフォームを作成します。

class ConvertToMultiChannelBasedOnBratsClassesd(MapTransform):
    """
    脳腫瘍クラスに基づいてラベルをマルチチャネルに変換します:
    ラベル 1 は腫瘍周辺の浮腫
    ラベル 2 はGD 増強腫瘍
    ラベル 3 は壊死および非増強腫瘍コア
    考えられるクラスはTC (腫瘍コア)、WT (全腫瘍)
    ET (増強腫瘍)です。

    参考:https://github.com/Project-MONAI/tutorials/blob/main/3d_segmentation/brats_segmentation_3d.ipynb

    """

    def __call__(self, data):
        d = dict(data)
        for key in self.keys:
            result = []
            # label 2 と label 3 を組み合わせて TC を構築
            result.append(torch.logical_or(d[key] == 2, d[key] == 3))
            # label 1, 2 と 3 を組み合わせて WT を構築
            result.append(
                torch.logical_or(
                    torch.logical_or(d[key] == 2, d[key] == 3), d[key] == 1
                )
            )
            # label 2 は ET
            result.append(d[key] == 2)
            d[key] = torch.stack(result, axis=0).float()
        return d

次に、トレーニングデータセットと検証データセット用にそれぞれトランスフォームを設定します。

train_transform = Compose(
    [
        # 4つの Nifti 画像を読み込み、それらをスタック
        LoadImaged(keys=["image", "label"]),
        EnsureChannelFirstd(keys="image"),
        EnsureTyped(keys=["image", "label"]),
        ConvertToMultiChannelBasedOnBratsClassesd(keys="label"),
        Orientationd(keys=["image", "label"], axcodes="RAS"),
        Spacingd(
            keys=["image", "label"],
            pixdim=(1.0, 1.0, 1.0),
            mode=("bilinear", "nearest"),
        ),
        RandSpatialCropd(
            keys=["image", "label"], roi_size=config.roi_size, random_size=False
        ),
        RandFlipd(keys=["image", "label"], prob=0.5, spatial_axis=0),
        RandFlipd(keys=["image", "label"], prob=0.5, spatial_axis=1),
        RandFlipd(keys=["image", "label"], prob=0.5, spatial_axis=2),
        NormalizeIntensityd(keys="image", nonzero=True, channel_wise=True),
        RandScaleIntensityd(keys="image", factors=0.1, prob=1.0),
        RandShiftIntensityd(keys="image", offsets=0.1, prob=1.0),
    ]
)
val_transform = Compose(
    [
        LoadImaged(keys=["image", "label"]),
        EnsureChannelFirstd(keys="image"),
        EnsureTyped(keys=["image", "label"]),
        ConvertToMultiChannelBasedOnBratsClassesd(keys="label"),
        Orientationd(keys=["image", "label"], axcodes="RAS"),
        Spacingd(
            keys=["image", "label"],
            pixdim=(1.0, 1.0, 1.0),
            mode=("bilinear", "nearest"),
        ),
        NormalizeIntensityd(keys="image", nonzero=True, channel_wise=True),
    ]
)

データセット

この実験で使用されるデータセットは、http://medicaldecathlon.com/ から入手可能です。マルチモーダルおよびマルチサイトMRIデータ(FLAIR, T1w, T1gd, T2w)を使用して、膠芽腫、壊死/活動中の腫瘍、および浮腫をセグメント化します。データセットは750個の4Dボリューム(484 トレーニング + 266 テスト)で構成されています。

DecathlonDataset を使用してデータセットを自動的にダウンロードし、抽出します。MONAI CacheDataset を継承し、cache_num=N を設定してトレーニングのために N アイテムをキャッシュし、メモリサイズに応じてデフォルト引数を使用して検証のためにすべてのアイテムをキャッシュできます。

train_dataset = DecathlonDataset(
    root_dir=config.dataset_dir,
    task="Task01_BrainTumour",
    transform=val_transform,
    section="training",
    download=True,
    cache_rate=0.0,
    num_workers=4,
)
val_dataset = DecathlonDataset(
    root_dir=config.dataset_dir,
    task="Task01_BrainTumour",
    transform=val_transform,
    section="validation",
    download=False,
    cache_rate=0.0,
    num_workers=4,
)

データセットの可視化

Weights & Biasesは画像、ビデオ、オーディオなどをサポートしています。リッチメディアをログに取り込み、結果を探索し、run、モデル、データセットを視覚的に比較できます。セグメンテーションマスクオーバーレイシステムを使用してデータボリュームを可視化します。 テーブルにセグメンテーションマスクをログに追加するには、テーブル内の各行にwandb.Image オブジェクトを提供する必要があります。

次の擬似コードは、その例です。

table = wandb.Table(columns=["ID", "Image"])

for id, img, label in zip(ids, images, labels):
    mask_img = wandb.Image(
        img,
        masks={
            "prediction": {"mask_data": label, "class_labels": class_labels}
            # ...
        },
    )

    table.add_data(id, img)

wandb.log({"Table": table})

次に、サンプル画像、ラベル、wandb.Table オブジェクトと関連するメタデータを受け取り、Weights & Biases ダッシュボードにログされるテーブルの行を埋めるユーティリティ関数を作成します。

def log_data_samples_into_tables(
    sample_image: np.array,
    sample_label: np.array,
    split: str = None,
    data_idx: int = None,
    table: wandb.Table = None,
):
    num_channels, _, _, num_slices = sample_image.shape
    with tqdm(total=num_slices, leave=False) as progress_bar:
        for slice_idx in range(num_slices):
            ground_truth_wandb_images = []
            for channel_idx in range(num_channels):
                ground_truth_wandb_images.append(
                    masks = {
                        "ground-truth/Tumor-Core": {
                            "mask_data": sample_label[0, :, :, slice_idx],
                            "class_labels": {0: "background", 1: "Tumor Core"},
                        },
                        "ground-truth/Whole-Tumor": {
                            "mask_data": sample_label[1, :, :, slice_idx] * 2,
                            "class_labels": {0: "background", 2: "Whole Tumor"},
                        },
                        "ground-truth/Enhancing-Tumor": {
                            "mask_data": sample_label[2, :, :, slice_idx] * 3,
                            "class_labels": {0: "background", 3: "Enhancing Tumor"},
                        },
                    }
                    wandb.Image(
                        sample_image[channel_idx, :, :, slice_idx],
                        masks=masks,
                    )
                )
            table.add_data(split, data_idx, slice_idx, *ground_truth_wandb_images)
            progress_bar.update(1)
    return table

次に、wandb.Table オブジェクトと、それが含む列を定義し、データ可視化を使用してそれを埋めます。

table = wandb.Table(
    columns=[
        "Split",
        "Data Index",
        "Slice Index",
        "Image-Channel-0",
        "Image-Channel-1",
        "Image-Channel-2",
        "Image-Channel-3",
    ]
)

次に、それぞれtrain_datasetval_dataset をループして、データサンプルの可視化を生成し、ダッシュボードにログを取るためのテーブルの行を埋めます。

# train_dataset の可視化を生成
max_samples = (
    min(config.max_train_images_visualized, len(train_dataset))
    if config.max_train_images_visualized > 0
    else len(train_dataset)
)
progress_bar = tqdm(
    enumerate(train_dataset[:max_samples]),
    total=max_samples,
    desc="Generating Train Dataset Visualizations:",
)
for data_idx, sample in progress_bar:
    sample_image = sample["image"].detach().cpu().numpy()
    sample_label = sample["label"].detach().cpu().numpy()
    table = log_data_samples_into_tables(
        sample_image,
        sample_label,
        split="train",
        data_idx=data_idx,
        table=table,
    )

# val_dataset の可視化を生成
max_samples = (
    min(config.max_val_images_visualized, len(val_dataset))
    if config.max_val_images_visualized > 0
    else len(val_dataset)
)
progress_bar = tqdm(
    enumerate(val_dataset[:max_samples]),
    total=max_samples,
    desc="Generating Validation Dataset Visualizations:",
)
for data_idx, sample in progress_bar:
    sample_image = sample["image"].detach().cpu().numpy()
    sample_label = sample["label"].detach().cpu().numpy()
    table = log_data_samples_into_tables(
        sample_image,
        sample_label,
        split="val",
        data_idx=data_idx,
        table=table,
    )

# ダッシュボードにテーブルをログ
wandb.log({"Tumor-Segmentation-Data": table})

データはW&Bダッシュボード上でインタラクティブな表形式で表示されます。我々はデータボリュームの特定のスライスの各チャンネルを、各行の対応するセグメンテーションマスクと重ね合わせて見ることができます。テーブルのデータを論理的にフィルタリングして、特定の行に集中するために Weave クエリ を書くことができます。

An example of logged table data.
ログされたテーブルデータの例。

画像を開き、インタラクティブなオーバーレイを使用して、各セグメンテーションマスクをどのように操作できるかを見てみてください。

An example of visualized segmentation maps.
セグメンテーションマップの可視化例。

データのロード

データセットからデータをロードするための PyTorch DataLoaders を作成します。 DataLoaders を作成する前に、train_datasettransformtrain_transform に設定して、トレーニング用のデータを前処理および変換します。

# トレーニングデータセットにtrain_transformsを適用
train_dataset.transform = train_transform

# train_loaderを作成
train_loader = DataLoader(
    train_dataset,
    batch_size=config.batch_size,
    shuffle=True,
    num_workers=config.num_workers,
)

# val_loaderを作成
val_loader = DataLoader(
    val_dataset,
    batch_size=config.batch_size,
    shuffle=False,
    num_workers=config.num_workers,
)

モデル、損失、およびオプティマイザーの作成

このチュートリアルでは、 3D MRI brain tumor segmentation using auto-encoder regularization に基づいた SegResNet モデルを作成します。SegResNet モデルは、 monai.networks API の一部として PyTorch モジュールとして実装されています。これはオプティマイザーと学習率スケジューラともに利用可能です。

device = torch.device("cuda:0")

# モデルの作成
model = SegResNet(
    blocks_down=[1, 2, 2, 4],
    blocks_up=[1, 1, 1],
    init_filters=16,
    in_channels=4,
    out_channels=3,
    dropout_prob=0.2,
).to(device)

# オプティマイザーの作成
optimizer = torch.optim.Adam(
    model.parameters(),
    config.initial_learning_rate,
    weight_decay=config.weight_decay,
)

# 学習率スケジューラの作成
lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer, T_max=config.max_train_epochs
)

損失を monai.losses API を使用してマルチラベル DiceLoss として定義し、それに対応するダイスメトリクスを monai.metrics API を使用して定義します。

loss_function = DiceLoss(
    smooth_nr=config.dice_loss_smoothen_numerator,
    smooth_dr=config.dice_loss_smoothen_denominator,
    squared_pred=config.dice_loss_squared_prediction,
    to_onehot_y=config.dice_loss_target_onehot,
    sigmoid=config.dice_loss_apply_sigmoid,
)

dice_metric = DiceMetric(include_background=True, reduction="mean")
dice_metric_batch = DiceMetric(include_background=True, reduction="mean_batch")
post_trans = Compose([Activations(sigmoid=True), AsDiscrete(threshold=0.5)])

# 自動混合精度を使用してトレーニングを加速
scaler = torch.cuda.amp.GradScaler()
torch.backends.cudnn.benchmark = True

混合精度推論のための小さなユーティリティを定義します。これはトレーニングプロセスの検証ステップおよびトレーニング後にモデルを実行したいときに役立ちます。

def inference(model, input):
    def _compute(input):
        return sliding_window_inference(
            inputs=input,
            roi_size=(240, 240, 160),
            sw_batch_size=1,
            predictor=model,
            overlap=0.5,
        )

    with torch.cuda.amp.autocast():
        return _compute(input)

トレーニングと検証

トレーニングの前に、トレーニングと検証の実験管理を追跡するために wandb.log() でログを取るメトリクスプロパティを定義します。

wandb.define_metric("epoch/epoch_step")
wandb.define_metric("epoch/*", step_metric="epoch/epoch_step")
wandb.define_metric("batch/batch_step")
wandb.define_metric("batch/*", step_metric="batch/batch_step")
wandb.define_metric("validation/validation_step")
wandb.define_metric("validation/*", step_metric="validation/validation_step")

batch_step = 0
validation_step = 0
metric_values = []
metric_values_tumor_core = []
metric_values_whole_tumor = []
metric_values_enhanced_tumor = []

標準的な PyTorch トレーニングループの実行

# W&B アーティファクトオブジェクトを定義
artifact = wandb.Artifact(
    name=f"{wandb.run.id}-checkpoint", type="model"
)

epoch_progress_bar = tqdm(range(config.max_train_epochs), desc="Training:")

for epoch in epoch_progress_bar:
    model.train()
    epoch_loss = 0

    total_batch_steps = len(train_dataset) // train_loader.batch_size
    batch_progress_bar = tqdm(train_loader, total=total_batch_steps, leave=False)
    
    # トレーニングステップ
    for batch_data in batch_progress_bar:
        inputs, labels = (
            batch_data["image"].to(device),
            batch_data["label"].to(device),
        )
        optimizer.zero_grad()
        with torch.cuda.amp.autocast():
            outputs = model(inputs)
            loss = loss_function(outputs, labels)
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        epoch_loss += loss.item()
        batch_progress_bar.set_description(f"train_loss: {loss.item():.4f}:")
        ## バッチ単位のトレーニング損失を W&B にログ
        wandb.log({"batch/batch_step": batch_step, "batch/train_loss": loss.item()})
        batch_step += 1

    lr_scheduler.step()
    epoch_loss /= total_batch_steps
    ## エポック単位のトレーニング損失と学習率を W&B にログ
    wandb.log(
        {
            "epoch/epoch_step": epoch,
            "epoch/mean_train_loss": epoch_loss,
            "epoch/learning_rate": lr_scheduler.get_last_lr()[0],
        }
    )
    epoch_progress_bar.set_description(f"Training: train_loss: {epoch_loss:.4f}:")

    # 検証とモデルのチェックポイントステップ
    if (epoch + 1) % config.validation_intervals == 0:
        model.eval()
        with torch.no_grad():
            for val_data in val_loader:
                val_inputs, val_labels = (
                    val_data["image"].to(device),
                    val_data["label"].to(device),
                )
                val_outputs = inference(model, val_inputs)
                val_outputs = [post_trans(i) for i in decollate_batch(val_outputs)]
                dice_metric(y_pred=val_outputs, y=val_labels)
                dice_metric_batch(y_pred=val_outputs, y=val_labels)

            metric_values.append(dice_metric.aggregate().item())
            metric_batch = dice_metric_batch.aggregate()
            metric_values_tumor_core.append(metric_batch[0].item())
            metric_values_whole_tumor.append(metric_batch[1].item())
            metric_values_enhanced_tumor.append(metric_batch[2].item())
            dice_metric.reset()
            dice_metric_batch.reset()

            checkpoint_path = os.path.join(config.checkpoint_dir, "model.pth")
            torch.save(model.state_dict(), checkpoint_path)
            
            # W&Bアーティファクトを使用してモデルのチェックポイントをログとバージョン管理。
            artifact.add_file(local_path=checkpoint_path)
            wandb.log_artifact(artifact, aliases=[f"epoch_{epoch}"])

            # W&Bダッシュボードに検証メトリクスをログ。
            wandb.log(
                {
                    "validation/validation_step": validation_step,
                    "validation/mean_dice": metric_values[-1],
                    "validation/mean_dice_tumor_core": metric_values_tumor_core[-1],
                    "validation/mean_dice_whole_tumor": metric_values_whole_tumor[-1],
                    "validation/mean_dice_enhanced_tumor": metric_values_enhanced_tumor[-1],
                }
            )
            validation_step += 1


# このアーティファクトがログを終了するのを待ちます。
artifact.wait()

コードを wandb.log で計装することで、トレーニングと検証プロセスに関連するメトリクスすべてを追跡するだけでなく、W&Bダッシュボード上のすべてのシステムメトリクス(この場合はCPUとGPU)も追跡できます。

An example of training and validation process tracking on W&B.
W&Bでのトレーニングと検証プロセス追跡の例。

W&Bの run ダッシュボードのアーティファクトタブに移動して、トレーニング中にログされたモデルチェックポイントアーティファクトの異なるバージョンにアクセスします。

An example of model checkpoints logging and versioning on W&B.
W&Bでのモデルチェックポイントのログとバージョン管理の例。

推論

アーティファクトインターフェースを使用して、このケースでは平均エポック単位のトレーニング損失が最良のモデルチェックポイントであるアーティファクトのバージョンを選択できます。アーティファクト全体のリネージを探索し、必要なバージョンを使用することもできます。

An example of model artifact tracking on W&B.
W&Bでのモデルアーティファクト追跡の例。

最良のエポック単位の平均トレーニング損失を持つモデルアーティファクトのバージョンをフェッチし、チェックポイントステート辞書をモデルにロードします。

model_artifact = wandb.use_artifact(
    "geekyrakshit/monai-brain-tumor-segmentation/d5ex6n4a-checkpoint:v49",
    type="model",
)
model_artifact_dir = model_artifact.download()
model.load_state_dict(torch.load(os.path.join(model_artifact_dir, "model.pth")))
model.eval()

予測の可視化と正解ラベルとの比較

予測されたセグメンテーションマスクと対応する正解のセグメンテーションマスクをインタラクティブなセグメンテーションマスクオーバーレイを使用して視覚化するためのユーティリティ関数を作成します。

def log_predictions_into_tables(
    sample_image: np.array,
    sample_label: np.array,
    predicted_label: np.array,
    split: str = None,
    data_idx: int = None,
    table: wandb.Table = None,
):
    num_channels, _, _, num_slices = sample_image.shape
    with tqdm(total=num_slices, leave=False) as progress_bar:
        for slice_idx in range(num_slices):
            wandb_images = []
            for channel_idx in range(num_channels):
                wandb_images += [
                    wandb.Image(
                        sample_image[channel_idx, :, :, slice_idx],
                        masks={
                            "ground-truth/Tumor-Core": {
                                "mask_data": sample_label[0, :, :, slice_idx],
                                "class_labels": {0: "background", 1: "Tumor Core"},
                            },
                            "prediction/Tumor-Core": {
                                "mask_data": predicted_label[0, :, :, slice_idx] * 2,
                                "class_labels": {0: "background", 2: "Tumor Core"},
                            },
                        },
                    ),
                    wandb.Image(
                        sample_image[channel_idx, :, :, slice_idx],
                        masks={
                            "ground-truth/Whole-Tumor": {
                                "mask_data": sample_label[1, :, :, slice_idx],
                                "class_labels": {0: "background", 1: "Whole Tumor"},
                            },
                            "prediction/Whole-Tumor": {
                                "mask_data": predicted_label[1, :, :, slice_idx] * 2,
                                "class_labels": {0: "background", 2: "Whole Tumor"},
                            },
                        },
                    ),
                    wandb.Image(
                        sample_image[channel_idx, :, :, slice_idx],
                        masks={
                            "ground-truth/Enhancing-Tumor": {
                                "mask_data": sample_label[2, :, :, slice_idx],
                                "class_labels": {0: "background", 1: "Enhancing Tumor"},
                            },
                            "prediction/Enhancing-Tumor": {
                                "mask_data": predicted_label[2, :, :, slice_idx] * 2,
                                "class_labels": {0: "background", 2: "Enhancing Tumor"},
                            },
                        },
                    ),
                ]
            table.add_data(split, data_idx, slice_idx, *wandb_images)
            progress_bar.update(1)
    return table

予測結果を予測テーブルにログします。

# 予測テーブルを作成
prediction_table = wandb.Table(
    columns=[
        "Split",
        "Data Index",
        "Slice Index",
        "Image-Channel-0/Tumor-Core",
        "Image-Channel-1/Tumor-Core",
        "Image-Channel-2/Tumor-Core",
        "Image-Channel-3/Tumor-Core",
        "Image-Channel-0/Whole-Tumor",
        "Image-Channel-1/Whole-Tumor",
        "Image-Channel-2/Whole-Tumor",
        "Image-Channel-3/Whole-Tumor",
        "Image-Channel-0/Enhancing-Tumor",
        "Image-Channel-1/Enhancing-Tumor",
        "Image-Channel-2/Enhancing-Tumor",
        "Image-Channel-3/Enhancing-Tumor",
    ]
)

# 推論と可視化を実行
with torch.no_grad():
    config.max_prediction_images_visualized
    max_samples = (
        min(config.max_prediction_images_visualized, len(val_dataset))
        if config.max_prediction_images_visualized > 0
        else len(val_dataset)
    )
    progress_bar = tqdm(
        enumerate(val_dataset[:max_samples]),
        total=max_samples,
        desc="Generating Predictions:",
    )
    for data_idx, sample in progress_bar:
        val_input = sample["image"].unsqueeze(0).to(device)
        val_output = inference(model, val_input)
        val_output = post_trans(val_output[0])
        prediction_table = log_predictions_into_tables(
            sample_image=sample["image"].cpu().numpy(),
            sample_label=sample["label"].cpu().numpy(),
            predicted_label=val_output.cpu().numpy(),
            data_idx=data_idx,
            split="validation",
            table=prediction_table,
        )

    wandb.log({"Predictions/Tumor-Segmentation-Data": prediction_table})


# 実験終了
wandb.finish()

インタラクティブなセグメンテーションマスクオーバーレイを使用して、予測されたセグメンテーションマスクと各クラスの正解ラベルを分析および比較します。

An example of predictions and ground-truth visualization on W&B.
W&Bでの予測と正解の可視化例。

謝辞と追加リソース

7 - Keras

Weights & Biases を使用して、機械学習の実験管理、データセット バージョン管理、プロジェクトのコラボレーションを行いましょう。

この Colabノートブックでは、WandbMetricsLogger コールバックを紹介します。このコールバックは 実験管理 に使用できます。これにより、トレーニングと検証のメトリクスとシステムメトリクスを Weights & Biases に記録します。

セットアップとインストール

まず、最新バージョンの Weights & Biases をインストールしましょう。次に、この Colabインスタンスを認証して W&B を使用できるようにします。

pip install -qq -U wandb
import os
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import models
import tensorflow_datasets as tfds

# Weights and Biases 関連のインポート
import wandb
from wandb.integration.keras import WandbMetricsLogger

W&B を初めて使用する場合、またはログインしていない場合、wandb.login() を実行した後に表示されるリンクはサインアップ/ログインページに導きます。無料アカウント のサインアップは数クリックで完了します。

wandb.login()

ハイパーパラメーター

適切な設定システムの使用は、再現可能な機械学習の推奨ベストプラクティスです。W&B を使用して、実験ごとにハイパーパラメーターを追跡できます。この Colab では、シンプルな Python の dict を設定システムとして使用します。

configs = dict(
    num_classes=10,
    shuffle_buffer=1024,
    batch_size=64,
    image_size=28,
    image_channels=1,
    earlystopping_patience=3,
    learning_rate=1e-3,
    epochs=10,
)

データセット

この Colab では、TensorFlow データセットカタログから CIFAR100 データセットを使用します。私たちの目標は、TensorFlow/Keras を使用してシンプルな画像分類パイプラインを構築することです。

train_ds, valid_ds = tfds.load("fashion_mnist", split=["train", "test"])
AUTOTUNE = tf.data.AUTOTUNE

def parse_data(example):
    # 画像を取得する
    image = example["image"]
    # image = tf.image.convert_image_dtype(image, dtype=tf.float32)

    # ラベルを取得する
    label = example["label"]
    label = tf.one_hot(label, depth=configs["num_classes"])

    return image, label

def get_dataloader(ds, configs, dataloader_type="train"):
    dataloader = ds.map(parse_data, num_parallel_calls=AUTOTUNE)

    if dataloader_type == "train":
        dataloader = dataloader.shuffle(configs["shuffle_buffer"])

    dataloader = dataloader.batch(configs["batch_size"]).prefetch(AUTOTUNE)

    return dataloader
trainloader = get_dataloader(train_ds, configs)
validloader = get_dataloader(valid_ds, configs, dataloader_type="valid")

モデル

def get_model(configs):
    backbone = tf.keras.applications.mobilenet_v2.MobileNetV2(
        weights="imagenet", include_top=False
    )
    backbone.trainable = False

    inputs = layers.Input(
        shape=(configs["image_size"], configs["image_size"], configs["image_channels"])
    )
    resize = layers.Resizing(32, 32)(inputs)
    neck = layers.Conv2D(3, (3, 3), padding="same")(resize)
    preprocess_input = tf.keras.applications.mobilenet.preprocess_input(neck)
    x = backbone(preprocess_input)
    x = layers.GlobalAveragePooling2D()(x)
    outputs = layers.Dense(configs["num_classes"], activation="softmax")(x)

    return models.Model(inputs=inputs, outputs=outputs)
tf.keras.backend.clear_session()
model = get_model(configs)
model.summary()

モデルのコンパイル

model.compile(
    optimizer="adam",
    loss="categorical_crossentropy",
    metrics=[
        "accuracy",
        tf.keras.metrics.TopKCategoricalAccuracy(k=5, name="top@5_accuracy"),
    ],
)

トレーニング

# W&B の run を初期化する
run = wandb.init(project="intro-keras", config=configs)

# あなたのモデルをトレーニングする
model.fit(
    trainloader,
    epochs=configs["epochs"],
    validation_data=validloader,
    callbacks=[
        WandbMetricsLogger(log_freq=10)
    ],  # ここで WandbMetricsLogger を使用することに注意
)

# W&B の run を終了する
run.finish()

8 - Keras テーブル

機械学習の実験管理、データセットバージョン管理、およびプロジェクトコラボレーションに Weights & Biases を使用します。

この Colabノートブックでは、WandbEvalCallback を紹介します。これは抽象的なコールバックで、モデル予測の可視化とデータセットの可視化に役立つコールバックを構築するために継承されます。

セットアップとインストール

まず、最新バージョンの Weights and Biases をインストールしましょう。その後、この Colab インスタンスを認証して W&B を利用できるようにします。

pip install -qq -U wandb
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import models
import tensorflow_datasets as tfds

# Weights and Biases に関連するインポート
import wandb
from wandb.integration.keras import WandbMetricsLogger
from wandb.integration.keras import WandbModelCheckpoint
from wandb.integration.keras import WandbEvalCallback

もしこれが初めての W&B の使用であるかまだログインしていない場合、wandb.login() を実行した後に表示されるリンクがサインアップ/ログインページに誘導します。無料アカウントへのサインアップは、数クリックで簡単です。

wandb.login()

ハイパーパラメーター

適切なコンフィグシステムの使用は、再現可能な機械学習のための推奨ベストプラクティスです。W&B を使用して、各実験のハイパーパラメーターを管理することができます。この Colab では、シンプルな Python の dict をコンフィグシステムとして使用します。

configs = dict(
    num_classes=10,
    shuffle_buffer=1024,
    batch_size=64,
    image_size=28,
    image_channels=1,
    earlystopping_patience=3,
    learning_rate=1e-3,
    epochs=10,
)

データセット

この Colab では、TensorFlow データセットカタログから CIFAR100 データセットを使用します。TensorFlow/Keras を使用して、シンプルな画像分類 パイプラインを構築することを目指します。

train_ds, valid_ds = tfds.load("fashion_mnist", split=["train", "test"])
AUTOTUNE = tf.data.AUTOTUNE

def parse_data(example):
    # 画像を取得
    image = example["image"]
    # image = tf.image.convert_image_dtype(image, dtype=tf.float32)

    # ラベルを取得
    label = example["label"]
    label = tf.one_hot(label, depth=configs["num_classes"])

    return image, label

def get_dataloader(ds, configs, dataloader_type="train"):
    dataloader = ds.map(parse_data, num_parallel_calls=AUTOTUNE)

    if dataloader_type=="train":
        dataloader = dataloader.shuffle(configs["shuffle_buffer"])
      
    dataloader = (
        dataloader
        .batch(configs["batch_size"])
        .prefetch(AUTOTUNE)
    )

    return dataloader
trainloader = get_dataloader(train_ds, configs)
validloader = get_dataloader(valid_ds, configs, dataloader_type="valid")

モデル

def get_model(configs):
    backbone = tf.keras.applications.mobilenet_v2.MobileNetV2(
        weights="imagenet", include_top=False
    )
    backbone.trainable = False

    inputs = layers.Input(
        shape=(configs["image_size"], configs["image_size"], configs["image_channels"])
    )
    resize = layers.Resizing(32, 32)(inputs)
    neck = layers.Conv2D(3, (3, 3), padding="same")(resize)
    preprocess_input = tf.keras.applications.mobilenet.preprocess_input(neck)
    x = backbone(preprocess_input)
    x = layers.GlobalAveragePooling2D()(x)
    outputs = layers.Dense(configs["num_classes"], activation="softmax")(x)

    return models.Model(inputs=inputs, outputs=outputs)
tf.keras.backend.clear_session()
model = get_model(configs)
model.summary()

モデルのコンパイル

model.compile(
    optimizer="adam",
    loss="categorical_crossentropy",
    metrics=[
        "accuracy",
        tf.keras.metrics.TopKCategoricalAccuracy(k=5, name="top@5_accuracy"),
    ],
)

WandbEvalCallback

WandbEvalCallback は主にモデル予測の可視化、そして二次的にはデータセットの可視化のための Keras コールバックを構築するための抽象基底クラスです。

これはデータセットやタスクに依存しない抽象コールバックです。これを使用するには、この基底コールバッククラスから継承し、add_ground_truthadd_model_prediction メソッドを実装します。

WandbEvalCallback は以下のような便利なメソッドを提供するユーティリティクラスです:

  • データと予測の wandb.Table インスタンスを作成、
  • wandb.Artifact としてデータと予測テーブルをログ、
  • on_train_begin にデータテーブルをログ、
  • on_epoch_end に予測テーブルをログ。

例として、画像分類タスクのために WandbClfEvalCallback を以下に実装しました。この例では:

  • W&B にバリデーションデータ (data_table) をログ、
  • 推論を行い、各エポックの終わりに W&B に予測 (pred_table) をログします。

メモリ使用量が削減される仕組み

on_train_begin メソッドが呼び出される時に data_table を W&B にログします。一度 W&B のアーティファクトとしてアップロードされると、このテーブルへの参照を取得できます。それはクラス変数 data_table_ref を使用してアクセスできます。data_table_ref は 2D リストで、self.data_table_ref[idx][n] のようにインデックス付けできます。ここで idx は行番号、n は列番号です。以下の例で使用方法を見てみましょう。

class WandbClfEvalCallback(WandbEvalCallback):
    def __init__(
        self, validloader, data_table_columns, pred_table_columns, num_samples=100
    ):
        super().__init__(data_table_columns, pred_table_columns)

        self.val_data = validloader.unbatch().take(num_samples)

    def add_ground_truth(self, logs=None):
        for idx, (image, label) in enumerate(self.val_data):
            self.data_table.add_data(idx, wandb.Image(image), np.argmax(label, axis=-1))

    def add_model_predictions(self, epoch, logs=None):
        # 予測を得る
        preds = self._inference()
        table_idxs = self.data_table_ref.get_index()

        for idx in table_idxs:
            pred = preds[idx]
            self.pred_table.add_data(
                epoch,
                self.data_table_ref.data[idx][0],
                self.data_table_ref.data[idx][1],
                self.data_table_ref.data[idx][2],
                pred,
            )

    def _inference(self):
        preds = []
        for image, label in self.val_data:
            pred = self.model(tf.expand_dims(image, axis=0))
            argmax_pred = tf.argmax(pred, axis=-1).numpy()[0]
            preds.append(argmax_pred)

        return preds

トレーニング

# W&B の run を初期化
run = wandb.init(project="intro-keras", config=configs)

# モデルをトレーニング
model.fit(
    trainloader,
    epochs=configs["epochs"],
    validation_data=validloader,
    callbacks=[
        WandbMetricsLogger(log_freq=10),
        WandbClfEvalCallback(
            validloader,
            data_table_columns=["idx", "image", "ground_truth"],
            pred_table_columns=["epoch", "idx", "image", "ground_truth", "prediction"],
        ),  # ここで WandbEvalCallback を使用
    ],
)

# W&B の run を終了
run.finish()

9 - Keras モデル

Weights & Biases を使用して機械学習の実験管理、データセットのバージョン管理、プロジェクトのコラボレーションを行いましょう。

この Colab ノートブックは WandbModelCheckpoint コールバックを紹介します。このコールバックを使用して、モデルのチェックポイントを Weights & Biases Artifacts にログします。

セットアップとインストール

まず、最新バージョンの Weights & Biases をインストールします。次に、この colab インスタンスを認証して W&B を使用します。

!pip install -qq -U wandb
import os
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import models
import tensorflow_datasets as tfds

# Weights & Biases に関連するインポート
import wandb
from wandb.integration.keras import WandbMetricsLogger
from wandb.integration.keras import WandbModelCheckpoint

もし初めて W&B を使用するか、ログインしていない場合、wandb.login() を実行した後に表示されるリンクをクリックするとサインアップ/ログインページに移動します。無料アカウント の登録は数回のクリックで簡単に行えます。

wandb.login()

ハイパーパラメーター

適切なコンフィグシステムの使用は、再現性のある機械学習のベストプラクティスとして推奨されます。各実験のハイパーパラメーターを W&B を使用して管理できます。この colab では、コンフィグシステムとしてシンプルな Python の dict を使用します。

configs = dict(
    num_classes = 10,
    shuffle_buffer = 1024,
    batch_size = 64,
    image_size = 28,
    image_channels = 1,
    earlystopping_patience = 3,
    learning_rate = 1e-3,
    epochs = 10
)

データセット

この colab では、 TensorFlow データセットカタログから CIFAR100 データセットを使用します。TensorFlow/Keras でシンプルな画像分類パイプラインを構築することを目指します。

train_ds, valid_ds = tfds.load('fashion_mnist', split=['train', 'test'])
AUTOTUNE = tf.data.AUTOTUNE

def parse_data(example):
    # 画像を取得
    image = example["image"]
    # image = tf.image.convert_image_dtype(image, dtype=tf.float32)

    # ラベルを取得
    label = example["label"]
    label = tf.one_hot(label, depth=configs["num_classes"])

    return image, label

def get_dataloader(ds, configs, dataloader_type="train"):
    dataloader = ds.map(parse_data, num_parallel_calls=AUTOTUNE)

    if dataloader_type=="train":
        dataloader = dataloader.shuffle(configs["shuffle_buffer"])
      
    dataloader = (
        dataloader
        .batch(configs["batch_size"])
        .prefetch(AUTOTUNE)
    )

    return dataloader
trainloader = get_dataloader(train_ds, configs)
validloader = get_dataloader(valid_ds, configs, dataloader_type="valid")

モデル

def get_model(configs):
    backbone = tf.keras.applications.mobilenet_v2.MobileNetV2(weights='imagenet', include_top=False)
    backbone.trainable = False

    inputs = layers.Input(shape=(configs["image_size"], configs["image_size"], configs["image_channels"]))
    resize = layers.Resizing(32, 32)(inputs)
    neck = layers.Conv2D(3, (3,3), padding="same")(resize)
    preprocess_input = tf.keras.applications.mobilenet.preprocess_input(neck)
    x = backbone(preprocess_input)
    x = layers.GlobalAveragePooling2D()(x)
    outputs = layers.Dense(configs["num_classes"], activation="softmax")(x)

    return models.Model(inputs=inputs, outputs=outputs)
tf.keras.backend.clear_session()
model = get_model(configs)
model.summary()

モデルのコンパイル

model.compile(
    optimizer = "adam",
    loss = "categorical_crossentropy",
    metrics = ["accuracy", tf.keras.metrics.TopKCategoricalAccuracy(k=5, name='top@5_accuracy')]
)

トレーニング

# W&B の run を初期化
run = wandb.init(
    project = "intro-keras",
    config = configs
)

# モデルをトレーニング
model.fit(
    trainloader,
    epochs = configs["epochs"],
    validation_data = validloader,
    callbacks = [
        WandbMetricsLogger(log_freq=10),
        WandbModelCheckpoint(filepath="models/") # ここで WandbModelCheckpoint を使用しています
    ]
)

# W&B の run を終了
run.finish()

10 - XGBoost Sweeps

Weights & Biases を使って機械学習の実験管理、データセットのバージョン管理、プロジェクトの協力を行いましょう。

ツリー型モデルから最良のパフォーマンスを引き出すには、適切なハイパーパラメーターを選択する必要があります。いくつの early_stopping_rounds が必要でしょうか? ツリーの max_depth はどのくらいにすべきですか?

高次元のハイパーパラメータ空間を検索して最適なモデルを見つけることは非常に困難です。ハイパーパラメーター探索は、モデルのバトルロイヤルを組織的かつ効率的に実施し、勝者を決定する方法を提供します。これにより、ハイパーパラメータの組み合わせを自動的に検索して、最も最適な値を見つけ出すことができます。

このチュートリアルでは、Weights & Biases を使用して XGBoost モデルで洗練されたハイパーパラメーター探索を 3 つの簡単なステップで実行する方法を紹介します。

興味を引くために、以下のプロットをチェックしてください:

sweeps_xgboost

Sweeps: 概要

Weights & Biases を使ったハイパーパラメーター探索の実行は非常に簡単です。以下の3つのシンプルなステップです:

  1. スイープを定義する: スイープを定義するためには、スイープを構成するパラメータ、使用する検索戦略、最適化するメトリクスを指定する辞書のようなオブジェクトを作成します。

  2. スイープを初期化する: 1 行のコードでスイープを初期化し、スイープの設定の辞書を渡します: sweep_id = wandb.sweep(sweep_config)

  3. スイープエージェントを実行する: これも 1 行のコードで、wandb.agent() を呼び出し、sweep_id とモデルアーキテクチャを定義してトレーニングする関数を渡します: wandb.agent(sweep_id, function=train)

これだけでハイパーパラメーター探索を実行することができます。

以下のノートブックでは、これらの 3 ステップを詳細に説明します。

ぜひこのノートブックをフォークして、パラメータを調整したり、独自のデータセットでモデルを試してみてください。

リソース

!pip install wandb -qU

import wandb
wandb.login()

1. スイープを定義する

Weights & Biases の Sweeps は、希望通りにスイープを設定するための強力なレバーを少ないコード行で提供します。スイープの設定は、辞書または YAML ファイルとして定義できます。

それらのいくつかを一緒に見ていきましょう:

  • メトリック: これは、スイープが最適化しようとしているメトリックです。メトリクスは name (トレーニングスクリプトでログされるべきメトリック名) と goal (maximizeminimize) を取ることができます。
  • 検索戦略: "method" キーを使用して指定されます。スイープでは、いくつかの異なる検索戦略をサポートしています。
  • グリッド検索: ハイパーパラメーター値のすべての組み合わせを反復します。
  • ランダム検索: ランダムに選ばれたハイパーパラメーター値の組み合わせを反復します。
  • ベイズ探索: ハイパーパラメーターをメトリクススコアの確率とマッピングする確率モデルを作成し、メトリクスを改善する高い確率のパラメータを選択します。ベイズ最適化の目的は、ハイパーパラメーター値の選択に時間をかけることですが、その代わりにより少ないハイパーパラメーター値を試すことを試みます。
  • パラメータ: ハイパーパラメータ名、離散値、範囲、または各反復で値を取り出す分布を含む辞書です。

詳細については、すべてのスイープ設定オプションのリストを参照してください。

sweep_config = {
    "method": "random", # try grid or random
    "metric": {
      "name": "accuracy",
      "goal": "maximize"   
    },
    "parameters": {
        "booster": {
            "values": ["gbtree","gblinear"]
        },
        "max_depth": {
            "values": [3, 6, 9, 12]
        },
        "learning_rate": {
            "values": [0.1, 0.05, 0.2]
        },
        "subsample": {
            "values": [1, 0.5, 0.3]
        }
    }
}

2. スイープを初期化する

wandb.sweep を呼び出すとスイープコントローラー、つまり parameters の設定を問い合わせるすべての者に提供し、metrics のパフォーマンスを wandb ログを介して返すことを期待する集中プロセスが開始されます。

sweep_id = wandb.sweep(sweep_config, project="XGBoost-sweeps")

トレーニングプロセスを定義する

スイープを実行する前に、モデルを作成してトレーニングする関数を定義する必要があります。 この関数は、ハイパーパラメーター値を取り込み、メトリクスを出力するものです。

また、スクリプト内に wandb を統合する必要があります。 主なコンポーネントは3つです:

  • wandb.init(): 新しい W&B run を初期化します。各 run はトレーニングスクリプトの単一の実行です。
  • wandb.config: すべてのハイパーパラメーターを設定 オブジェクトに保存します。これにより、私たちのアプリを使用して、ハイパーパラメーター値ごとに run をソートおよび比較することができます。
  • wandb.log(): 画像、ビデオ、オーディオファイル、HTML、プロット、またはポイントクラウドなどのメトリクスとカスタムオブジェクトをログします。

また、データをダウンロードする必要があります:

!wget https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv
# Pima Indians データセット用の XGBoost モデル
from numpy import loadtxt
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# データを読み込む
def train():
  config_defaults = {
    "booster": "gbtree",
    "max_depth": 3,
    "learning_rate": 0.1,
    "subsample": 1,
    "seed": 117,
    "test_size": 0.33,
  }

  wandb.init(config=config_defaults)  # スイープ中にデフォルトが上書きされる
  config = wandb.config

  # データを読み込み、予測変数とターゲットに分ける
  dataset = loadtxt("pima-indians-diabetes.data.csv", delimiter=",")
  X, Y = dataset[:, :8], dataset[:, 8]

  # データをトレインセットとテストセットに分割
  X_train, X_test, y_train, y_test = train_test_split(X, Y,
                                                      test_size=config.test_size,
                                                      random_state=config.seed)

  # トレインセットでモデルを適合させる
  model = XGBClassifier(booster=config.booster, max_depth=config.max_depth,
                        learning_rate=config.learning_rate, subsample=config.subsample)
  model.fit(X_train, y_train)

  # テストセットで予測を行う
  y_pred = model.predict(X_test)
  predictions = [round(value) for value in y_pred]

  # 予測を評価する
  accuracy = accuracy_score(y_test, predictions)
  print(f"Accuracy: {accuracy:.0%}")
  wandb.log({"accuracy": accuracy})

3. エージェントでスイープを実行する

次に、wandb.agent を呼び出してスイープを起動します。

wandb.agent は W&B にログインしているすべてのマシンで呼び出すことができ、

  • sweep_id
  • データセットと train 関数

があるので、そのマシンはスイープに参加します。

注意: random スイープはデフォルトでは永遠に実行され、新しいパラメータの組み合わせを試し続けます。しかし、それはアプリの UI からスイープをオフにするまでです。完了する run の合計 countagent に指定することで、この動作を防ぐことができます。

wandb.agent(sweep_id, train, count=25)

結果を可視化する

スイープが終了したら、結果を確認します。

Weights & Biases は、あなたのために便利なプロットをいくつか自動的に生成します。

パラレル座標プロット

このプロットは、ハイパーパラメーター値をモデルのメトリクスにマッピングします。最良のモデルパフォーマンスをもたらしたハイパーパラメーターの組み合わせに焦点を当てるのに役立ちます。

このプロットは、単純な線形モデルを学習者として使用するよりも、ツリーを学習者として使用する方がやや優れていることを示しているようです。

sweeps_xgboost

ハイパーパラメーターの重要度プロット

ハイパーパラメーターの重要度プロットは、メトリクスに最も大きな影響を与えたハイパーパラメーター値を示しています。

相関関係(線形予測子として扱う)と特徴量の重要性(結果に基づいてランダムフォレストをトレーニングした後) の両方を報告しますので、どのパラメータが最も大きな影響を与えたか、そしてその影響がどちらの方向であったかを確認できます。

このチャートを読むと、上記のパラレル座標チャートで気づいた傾向の定量的な確認が得られます。検証精度に最大の影響を与えたのは学習者の選択であり、gblinear 学習者は一般的に gbtree 学習者よりも劣っていました。

sweeps_xgboost

これらの可視化は、最も重要で、さらに探索する価値のあるパラメータ(とその値の範囲)に焦点を当てることにより、高価なハイパーパラメーターの最適化を実行する時間とリソースを節約するのに役立ちます。