メインコンテンツへスキップ
W&B Weave の Threads を使用すると、LLM アプリケーション内の複数ターンにわたる会話をトラッキングして分析できます。Threads では、関連する Call を共通の thread_id の下にグループ化するため、セッション全体を可視化し、ターンをまたいで会話レベルのメトリクスをトラッキングできます。スレッドはプログラムで作成でき、Weave UI で可視化できます。 Threads を使い始めるには、次の手順に従ってください。
  1. まず、Threads の基本を確認します。
  2. コードサンプルを試します。一般的な使用パターンと実際のユースケースを示しています。

ユースケース

Threads は、次のような対象を整理・分析したい場合に役立ちます。
  • 複数ターンにわたる会話
  • セッションベースのワークフロー
  • 関連する一連の操作
Threads を使用すると、コンテキストごとに Calls をグループ化できるため、複数の ターン にまたがってシステムがどのように応答するかを把握しやすくなります。たとえば、単一のユーザーセッション、エージェント の一連の意思決定、あるいはインフラストラクチャー層とビジネスロジック層にまたがる複雑なリクエストをトラッキングできます。 application を threads と ターン で構造化することで、より整理されたメトリクスを得られ、Weave UI での可視性も向上します。低レベルの Op を個別にすべて見るのではなく、重要な高レベルの step に集中できます。

定義

Thread

Thread は、共通の会話コンテキストを共有する関連する Calls を、論理的にまとめたものです。Thread には次の特徴があります。
  • 一意の thread_id を持つ
  • 1 つ以上の turn を含む
  • Calls をまたいでコンテキストを維持する
  • 完結したユーザーセッションまたはインタラクションフローを表す

Turn

Turn は Thread 内の高レベルな操作で、UI ではスレッドビュー内の個々の行として表示されます。各 Turn には、次の特徴があります。
  • 会話またはワークフローにおける 1 つの論理的な step を表します
  • スレッドコンテキストの直接の子要素であり、ネストされた下位レベルの Call を含む場合があります (スレッドレベルの統計には表示されません) 。

Call

Call とは、アプリケーション内で実行される @weave.op でデコレートされた関数呼び出しのことです。
  • Turn Call は、新しいターンを開始するトップレベルの操作です。
  • Nested Call は、ターン内のより下位の操作です。

トレース

トレース には、単一のoperationにおける完全なCallスタックが記録されます。スレッドは、同じ論理的な会話またはセッションに属するトレースをまとめるものです。言い換えると、スレッドは複数のターンで構成され、それぞれが会話の一部を表します。トレースの詳細については、トレースの概要を参照してください。

UIの概要

Weave のプロジェクトのサイドバーで Threads を選択して、Threads list view を開きます。
Weave サイドバーの Threads アイコン

Threads list view

  • プロジェクト内の最近のThreadsを一覧表示します
  • 列には、ターン数、開始時刻、最終更新時刻があります
  • 行をクリックすると、詳細ドロワーが開きます
Threads list view

スレッド詳細ドロワー

  • 任意の行をクリックすると、その行の詳細ドロワーが開きます。
  • スレッド内のすべてのターンを表示します。
  • ターンは開始順に表示されます (継続時間や終了時刻ではなく、開始時刻に基づきます) 。
  • Callレベルのメタデータ (レイテンシ、inputs、outputs) が含まれます。
  • ログされていれば、メッセージ内容や構造化データを表示することもできます。
  • ターンの実行全体を表示するには、スレッド詳細ドロワーからそのターンを開きます。これにより、そのターン中に発生したすべてのネストされた操作を詳しく確認できます。
  • ターンに LLM calls から抽出されたメッセージが含まれている場合、それらはチャットペインに表示されます。これらのメッセージは通常、サポート対象のインテグレーションによる calls (たとえば openai.ChatCompletion.create) に由来し、表示するには特定の条件を満たす必要があります。詳細は チャットビューの挙動 を参照してください。

チャットビューの挙動

チャットペインには、各ターンで行われた LLM Calls から抽出された構造化メッセージデータが表示されます。このビューでは、やり取りを会話形式で表示できます。
構造化された LLM メッセージを表示する Threads のチャットペイン

どのようなものがメッセージとして扱われますか

Weave は、1 つのターン内にある Calls のうち、LLM プロバイダーとの直接的なやり取り (たとえば、プロンプトを送信してレスポンスを受け取る処理) を表すものからメッセージを抽出します。ほかの Calls の内側にさらにネストされていない Calls だけが、メッセージとして表示されます。これにより、中間 step や集約された内部ロジックが重複して表示されるのを防げます。 通常、メッセージを生成するのは、自動的にパッチが適用されたサードパーティ製 SDK です。たとえば、次のようなものがあります。
  • openai.ChatCompletion.create
  • anthropic.Anthropic.completion

メッセージがない場合

ターンでメッセージが1件も出力されない場合、チャットペイン にはそのターンの空のメッセージセクションが表示されます。同じスレッド内の別のターンのメッセージは、引き続き チャットペイン に表示される場合があります。

Turn と chat の連動

  • ターンをクリックすると、チャットペインがそのターンのメッセージ位置までスクロールします (ピン留めの動作) 。
  • チャットペインをスクロールすると、ターンリストで対応するターンがハイライト表示されます。
項目をクリックすると、そのターンのトレース全体を開けます。 左上に戻るボタンが表示され、スレッド詳細ビューに戻れます。Weave は、この画面遷移の前後で UI の状態 (スクロール位置など) を保持しません。
スレッドビューに戻るための戻るボタンがある Threads 詳細ドロワー

SDK の使用

以下のセクションでは、Weave SDK を使用してプログラムからスレッドを作成および管理する方法を説明します。このセクションの各例では、アプリケーション内でターンとスレッドを整理するための異なる方法を示します。ほとんどの例では、スタブ関数内に独自の LLM call やシステムの挙動を実装する必要があります。
  • セッションまたは会話をトラッキングするには、weave.thread() コンテキストマネージャを使用します。
  • 論理的な処理に @weave.op を付けて、ターンまたはネストされた Call としてトラッキングします。
  • thread_id を渡すと、Weave はそのブロック内のすべての処理を同じスレッドにまとめます。thread_id を省略すると、Weave が一意の ID を自動生成します。
weave.thread() の戻り値は、thread_id プロパティを持つ ThreadContext オブジェクトです。これをログしたり、再利用したり、他のシステムに渡したりできます。 ネストされた weave.thread() コンテキストは、同じ thread_id を再利用しない限り、常に新しいスレッドを開始します。子コンテキストを終了しても、親コンテキストが中断されたり上書きされたりすることはありません。これにより、アプリのロジックに応じて、分岐したスレッド構造や階層的なスレッドオーケストレーションが可能になります。

基本的なスレッド作成

次のコード例では、weave.thread() を使用して、1 つ以上の処理を共通の thread_id でグループ化する方法を示します。これは、アプリケーションでスレッドを使い始めるためのわかりやすい方法です。
import weave

@weave.op
def say_hello(name: str) -> str:
    return f"Hello, {name}!"

# 新しいスレッドコンテキストを開始する
with weave.thread() as thread_ctx:
    print(f"Thread ID: {thread_ctx.thread_id}")
    say_hello("Bill Nye the Science Guy")

手動エージェントループの実装

この例では、@weave.op デコレーターと weave.thread() のコンテキスト管理を使って、会話型エージェントを手動で定義する方法を示します。process_user_message を呼び出すたびに、スレッド内に新しいターンが作成されます。独自のエージェントループを構築し、コンテキストやネストの扱いを完全に制御したい場合は、このパターンを使用できます。 短時間のやり取りには自動生成されたスレッド ID を使用するか、カスタムのセッション ID (user_session_123 など) を渡して、セッションをまたいでスレッドのコンテキストを保持できます。
import weave

class ConversationAgent:
    @weave.op
    def process_user_message(self, message: str) -> str:
        """
        TURN-LEVEL OPERATION: This represents one conversation turn.
        Only this function will be counted in thread statistics.
        """
        # ユーザーメッセージを保存する
        # ネストされたCallを通じてAIレスポンスを生成する
        response = self._generate_response(message)
        # アシスタントのレスポンスを保存する
        return response

    @weave.op
    def _generate_response(self, message: str) -> str:
        """NESTED CALL: Implementation details, not counted in thread stats."""
        context = self._retrieve_context(message)     # 別のネストされたCall
        intent = self._classify_intent(message)       # 別のネストされたCall
        response = self._call_llm(message, context)   # LLM Call(ネスト)
        return self._format_response(response)        # 最後のネストされたCall

    @weave.op
    def _retrieve_context(self, message: str) -> str:
        # Vector DB ルックアップ、ナレッジベースクエリなど
        return "retrieved_context"

    @weave.op
    def _classify_intent(self, message: str) -> str:
        # インテント分類ロジック
        return "general_inquiry"

    @weave.op
    def _call_llm(self, message: str, context: str) -> str:
        # OpenAI/Anthropic などの API 呼び出し
        return "llm_response"

    @weave.op
    def _format_response(self, response: str) -> str:
        # レスポンスのフォーマットロジック
        return f"Formatted: {response}"

# 使用方法: スレッドコンテキストは自動的に確立される
agent = ConversationAgent()

# スレッドコンテキストを確立する - process_user_message の各呼び出しがターンになる
with weave.thread() as thread_ctx:  # thread_id を自動生成
    print(f"Thread ID: {thread_ctx.thread_id}")

    # process_user_message の各呼び出しで 1 ターン + 複数のネストされたCallが作成される
    agent.process_user_message("Hello, help with setup")           # ターン 1
    agent.process_user_message("What languages do you recommend?") # ターン 2
    agent.process_user_message("Explain Python vs JavaScript")     # ターン 3

# 結果: 3 ターンのスレッド、合計約 15〜20 件のCall(ネストを含む)

# 代替方法: セッショントラッキングに明示的な thread_id を使用する
session_id = "user_session_123"
with weave.thread(session_id) as thread_ctx:
    print(f"Session Thread ID: {thread_ctx.thread_id}")  # "user_session_123"

    agent.process_user_message("Continue our previous conversation")  # このセッションのターン 1
    agent.process_user_message("Can you summarize what we discussed?") # このセッションのターン 2

Call 深度が不均衡な手動エージェント

この例では、スレッドコンテキストの適用方法によって、ターンを Call スタック内の異なる深度で定義できることを示します。サンプルでは 2 つのプロバイダ (OpenAI と Anthropic) を使用しており、それぞれターン境界に到達するまでの Call 深度が異なります。 すべてのターンは同じ thread_id を共有しますが、ターン境界はプロバイダのロジックに応じてスタック内の異なるレベルに現れます。これは、バックエンドごとに Call を異なる方法でトレースする必要がありつつ、それらを同じスレッドにまとめたい場合に便利です。
import weave
import random
import asyncio

class OpenAIProvider:
    """OpenAI branch: 2 levels deep call chain to turn boundary"""

    @weave.op
    def route_to_openai(self, user_input: str, thread_id: str) -> str:
        """Level 1: Route and prepare OpenAI request"""
        # 入力の検証、ルーティングロジック、基本的な前処理
        print(f"  L1: Routing to OpenAI for: {user_input}")

        # ここがターン境界 - スレッドコンテキストでラップする
        with weave.thread(thread_id):
            # レベル2を直接呼び出す - これによりコールチェーンの深さが生まれる
            return self.execute_openai_call(user_input)

    @weave.op
    def execute_openai_call(self, user_input: str) -> str:
        """Level 2: TURN BOUNDARY - Execute OpenAI API call"""
        print(f"    L2: Executing OpenAI API call")
        response = f"OpenAI GPT-4 response: {user_input}"
        return response


class AnthropicProvider:
    """Anthropic branch: 3 levels deep call chain to turn boundary"""

    @weave.op
    def route_to_anthropic(self, user_input: str, thread_id: str) -> str:
        """Level 1: Route and prepare Anthropic request"""
        # 入力の検証、ルーティングロジック、プロバイダーの選択
        print(f"  L1: Routing to Anthropic for: {user_input}")

        # レベル2を呼び出す - これによりコールチェーンの深さが生まれる
        return self.authenticate_anthropic(user_input, thread_id)

    @weave.op
    def authenticate_anthropic(self, user_input: str, thread_id: str) -> str:
        """Level 2: Handle Anthropic authentication and setup"""
        print(f"    L2: Authenticating with Anthropic")

        # 認証、レート制限、セッション管理
        auth_token = "anthropic_key_xyz_authenticated"

         # ここがターン境界 - レベル3でスレッドコンテキストでラップする
        with weave.thread(thread_id):
            # レベル3を呼び出す - コールチェーンをさらにネストする
            return self.execute_anthropic_call(user_input, auth_token)

    @weave.op
    def execute_anthropic_call(self, user_input: str, auth_token: str) -> str:
        """Level 3: TURN BOUNDARY - Execute Anthropic API call"""
        print(f"      L3: Executing Anthropic API call with auth")
        response = f"Anthropic Claude response (auth: {auth_token[:15]}...): {user_input}"
        return response


class MultiProviderAgent:
    """Main agent that routes between providers with different call chain depths"""

    def __init__(self):
        self.openai_provider = OpenAIProvider()
        self.anthropic_provider = AnthropicProvider()

    def handle_conversation_turn(self, user_input: str, thread_id: str) -> str:
        """
        Route to different providers with imbalanced call chain depths.
        Thread context is applied at different nesting levels in each chain.
        """
        # デモ用にプロバイダーをランダムに選択する
        use_openai = random.choice([True, False])

        if use_openai:
            print(f"Choosing OpenAI (2-level call chain)")
            # OpenAI: レベル1 → レベル2(ターン境界)
            response = self.openai_provider.route_to_openai(user_input, thread_id)
            return f"[OpenAI Branch] {response}"
        else:
            print(f"Choosing Anthropic (3-level call chain)")
            # Anthropic: レベル1 → レベル2 → レベル3(ターン境界)
            response = self.anthropic_provider.route_to_anthropic(user_input, thread_id)
            return f"[Anthropic Branch] {response}"


async def main():
    agent = MultiProviderAgent()
    conversation_id = "nested_depth_conversation_999"

    # 異なるコールチェーンの深さを持つマルチターン会話
    conversation_turns = [
        "What's deep learning?",
        "Explain neural network backpropagation",
        "How do attention mechanisms work?",
        "What's the transformer architecture?",
        "Compare CNNs vs RNNs"
    ]

    print(f"Starting conversation: {conversation_id}")

    for i, user_input in enumerate(conversation_turns, 1):
        print(f"\\n--- Turn {i} ---")
        print(f"User: {user_input}")

        # 異なるコールチェーンの深さにわたって同じthread_idを使用する
        response = agent.handle_conversation_turn(user_input, conversation_id)
        print(f"Agent: {response}")

if __name__ == "__main__":
    asyncio.run(main())

# 期待される結果: 5ターンを持つ単一スレッド
# - OpenAI のターン: コールチェーンのレベル2にスレッドコンテキスト
#   コールスタック: route_to_openai() → execute_openai_call() ← ここにスレッドコンテキスト
# - Anthropic のターン: コールチェーンのレベル3にスレッドコンテキスト
#   コールスタック: route_to_anthropic() → authenticate_anthropic() → execute_anthropic_call() ← ここにスレッドコンテキスト
# - すべてのターンが thread_id を共有: "nested_depth_conversation_999"
# - ターン境界は異なるコールスタックの深さでマークされる
# - コールチェーン内のサポート操作はターンではなくネストされた Call としてトラッキングされる

以前のセッションを再開する

以前に開始したセッションを再開し、同じスレッドに Calls を追加し続けなければならない場合があります。一方で、既存のセッションを再開できず、代わりに新しいスレッドを開始しなければならない場合もあります。 スレッド再開をオプションとして実装する場合は、thread_id パラメーターを None のままにしないでください。そうすると、スレッドのグループ化が無効になります。代わりに、必ず有効なスレッド ID を指定してください。新しいスレッドを作成するには、generate_id() のような関数を使用して一意の ID を生成してください。 thread_id が指定されていない場合、Weave の内部実装によってランダムな UUID v7 が自動的に生成されます。この動作は独自の generate_id() 関数で再現することも、任意の一意な文字列値を使用することもできます。
import weave
import uuidv7
import argparse

def generate_id():
    """Generate a unique thread ID using UUID v7."""
    return str(uuidv7.uuidv7())

@weave.op
def load_history(session_id):
    """Load conversation history for the given session."""
    # ここに実装を記述してください
    return []

# セッション再開のためのコマンドライン引数を解析する
parser = argparse.ArgumentParser()
parser.add_argument("--session-id", help="Existing session ID to resume")
args = parser.parse_args()

# スレッド ID を決定する: 既存のセッションを再開するか、新しいセッションを作成する
if args.session_id:
    thread_id = args.session_id
    print(f"Resuming session: {thread_id}")
else:
    thread_id = generate_id()
    print(f"Starting new session: {thread_id}")

# Call をトラッキングするためのスレッドコンテキストを確立する
with weave.thread(thread_id) as thread_ctx:
    # 会話履歴を読み込むか初期化する
    history = load_history(thread_id)
    print(f"Active thread ID: {thread_ctx.thread_id}")

    # ここにアプリケーションのロジックを記述してください。

ネストされたスレッド

この例では、複数の連携したスレッドを使って複雑なアプリケーションをどのように構成するかを示します。 各レイヤーはそれぞれ独自のスレッドコンテキストで実行されるため、責務を明確に分離できます。親アプリケーションのスレッドは、共有の ThreadContext を使ってスレッド ID を設定し、これらのレイヤーを連携させます。システムの異なる部分を個別に分析または監視しつつ、同じセッションにひも付けたい場合は、このパターンを使用してください。
import weave
from contextlib import contextmanager
from typing import Dict

# ネストされたスレッドを調整するためのグローバルスレッドコンテキスト
class ThreadContext:
    def __init__(self):
        self.app_thread_id = None
        self.infra_thread_id = None
        self.logic_thread_id = None

    def setup_for_request(self, request_id: str):
        self.app_thread_id = f"app_{request_id}"
        self.infra_thread_id = f"{self.app_thread_id}_infra"
        self.logic_thread_id = f"{self.app_thread_id}_logic"

# グローバルインスタンス
thread_ctx = ThreadContext()

class InfrastructureLayer:
    """Handles all infrastructure operations in a dedicated thread"""

    @weave.op
    def authenticate_user(self, user_id: str) -> Dict:
        # 認証ロジック...
        return {"user_id": user_id, "authenticated": True}

    @weave.op
    def call_payment_gateway(self, amount: float) -> Dict:
        # 支払い処理...
        return {"status": "approved", "amount": amount}

    @weave.op
    def update_inventory(self, product_id: str, quantity: int) -> Dict:
        # 在庫管理...
        return {"product_id": product_id, "updated": True}

    def execute_operations(self, user_id: str, order_data: Dict) -> Dict:
        """Execute all infrastructure operations in dedicated thread context"""
        with weave.thread(thread_ctx.infra_thread_id):
            auth_result = self.authenticate_user(user_id)
            payment_result = self.call_payment_gateway(order_data["amount"])
            inventory_result = self.update_inventory(order_data["product_id"], order_data["quantity"])

            return {
                "auth": auth_result,
                "payment": payment_result,
                "inventory": inventory_result
            }


class BusinessLogicLayer:
    """Handles business logic in a dedicated thread"""

    @weave.op
    def validate_order(self, order_data: Dict) -> Dict:
        # 検証ロジック...
        return {"valid": True}

    @weave.op
    def calculate_pricing(self, order_data: Dict) -> Dict:
        # 価格計算...
        return {"total": order_data["amount"], "tax": order_data["amount"] * 0.08}

    @weave.op
    def apply_business_rules(self, order_data: Dict) -> Dict:
        # ビジネスルール...
        return {"rules_applied": ["standard_processing"], "priority": "normal"}

    def execute_logic(self, order_data: Dict) -> Dict:
        """Execute all business logic in dedicated thread context"""
        with weave.thread(thread_ctx.logic_thread_id):
            validation = self.validate_order(order_data)
            pricing = self.calculate_pricing(order_data)
            rules = self.apply_business_rules(order_data)

            return {"validation": validation, "pricing": pricing, "rules": rules}


class OrderProcessingApp:
    """Main application orchestrator"""

    def __init__(self):
        self.infra = InfrastructureLayer()
        self.business = BusinessLogicLayer()

    @weave.op
    def process_order(self, user_id: str, order_data: Dict) -> Dict:
        """Main order processing - becomes a turn in the app thread"""

        # ネストされた操作をそれぞれの専用スレッドで実行する
        infra_results = self.infra.execute_operations(user_id, order_data)
        logic_results = self.business.execute_logic(order_data)

        # 最終オーケストレーション
        return {
            "order_id": f"order_12345",
            "status": "completed",
            "infra_results": infra_results,
            "logic_results": logic_results
        }


# グローバルスレッドコンテキストの調整を使った使用例
def handle_order_request(request_id: str, user_id: str, order_data: Dict):
    # このリクエスト用のスレッドコンテキストをセットアップする
    thread_ctx.setup_for_request(request_id)

    # アプリスレッドコンテキストで実行する
    with weave.thread(thread_ctx.app_thread_id):
        app = OrderProcessingApp()
        result = app.process_order(user_id, order_data)
        return result

# 使用例
order_result = handle_order_request(
    request_id="req_789",
    user_id="user_001",
    order_data={"product_id": "laptop", "quantity": 1, "amount": 1299.99}
)

# 想定されるスレッド構造:
#
# App Thread: app_req_789
# └── Turn: process_order() ← メインオーケストレーション
#
# Infra Thread: app_req_789_infra
# ├── Turn: authenticate_user() ← インフラストラクチャー操作 1
# ├── Turn: call_payment_gateway() ← インフラストラクチャー操作 2
# └── Turn: update_inventory() ← インフラストラクチャー操作 3
#
# Logic Thread: app_req_789_logic
# ├── Turn: validate_order() ← ビジネスロジック操作 1
# ├── Turn: calculate_pricing() ← ビジネスロジック操作 2
# └── Turn: apply_business_rules() ← ビジネスロジック操作 3
#
# メリット:
# - スレッド間での関心の明確な分離
# - スレッド ID のパラメーター受け渡しが不要
# - アプリ/インフラ/ロジック層の独立したモニタリング
# - スレッドコンテキストによるグローバルな調整

API 仕様

以下のセクションでは、Threads クエリエンドポイント、そのリクエスト/レスポンスのスキーマ、およびスレッドデータをプログラムから取得する際に使用できる一般的なクエリパターンについて説明します。

エンドポイント

エンドポイント: POST /threads/query

リクエストスキーマ

class ThreadsQueryReq:
    project_id: str
    limit: Optional[int] = None
    offset: Optional[int] = None
    sort_by: Optional[list[SortBy]] = None  # サポートされるフィールド: thread_id, turn_count, start_time, last_updated
    sortable_datetime_after: Optional[datetime] = None   # グラニュール最適化によるスレッドのフィルター
    sortable_datetime_before: Optional[datetime] = None  # グラニュール最適化によるスレッドのフィルター

レスポンススキーマ

class ThreadSchema:
    thread_id: str           # スレッドの一意の ID
    turn_count: int          # このスレッド内の turn Call の数
    start_time: datetime     # このスレッド内の turn Call の最も早い開始時刻
    last_updated: datetime   # このスレッド内の turn Call の最も遅い終了時刻

class ThreadsQueryRes:
    threads: List[ThreadSchema]

最近更新されたアクティブなスレッドをクエリする

この例では、直近で更新されたスレッドを50件取得します。my-project は実際のプロジェクト ID に置き換えてください。
# 最近更新されたアクティブなスレッドを取得する
response = client.threads_query(ThreadsQueryReq(
    project_id="my-project",
    sort_by=[SortBy(field="last_updated", direction="desc")],
    limit=50
))

for thread in response.threads:
    print(f"Thread {thread.thread_id}: {thread.turn_count} turns, last active {thread.last_updated}")

アクティビティレベル別にスレッドをクエリする

この例では、ターン数が多い順に、最もアクティブなスレッド20件を取得します。
# 最もアクティブなスレッドを取得する(ターン数が多い順)
response = client.threads_query(ThreadsQueryReq(
    project_id="my-project",
    sort_by=[SortBy(field="turn_count", direction="desc")],
    limit=20
))

直近のスレッドのみをクエリする

この例では、過去24時間以内に開始されたスレッドを返します。timedeltadays の値を調整すると、期間を変更できます。
from datetime import datetime, timedelta

# 過去24時間以内に開始されたスレッドを取得する
yesterday = datetime.now() - timedelta(days=1)
response = client.threads_query(ThreadsQueryReq(
    project_id="my-project",
    sortable_datetime_after=yesterday,
    sort_by=[SortBy(field="start_time", direction="desc")]
))