メインコンテンツへスキップ
AutoGen は、AI エージェントやアプリケーションを構築するための Microsoft 製フレームワークです。複雑なマルチエージェントシステムを簡単に構築でき、対話型 AI 向けのコンポーネント (AgentChat) 、マルチエージェントのコア機能 (Core) 、外部サービスとのインテグレーション (Extensions) を提供します。さらに、AutoGen にはノーコードでエージェントをプロトタイピングできる Studio も用意されています。詳しくは、AutoGen 公式ドキュメントをご覧ください。
このガイドは、AutoGen の基本的な理解があることを前提としています。
Weave は AutoGen と統合されており、マルチエージェントアプリケーションの実行をトレースして可視化できます。Weave を初期化するだけで、autogen_agentchatautogen_coreautogen_ext 内のやり取りを自動的にトラッキングできます。このガイドでは、AutoGen と Weave を組み合わせた使い方を、さまざまな例を通して説明します。

前提条件

始める前に、AutoGen と Weave がインストールされていることを確認してください。また、使用する予定の LLM プロバイダ (例: OpenAI、Anthropic) 向けの SDK も必要です。
pip install autogen_agentchat "autogen_ext[openai,anthropic]" weave 
APIキーを環境変数として設定します:
import os

os.environ["OPENAI_API_KEY"] = "<your-openai-api-key>"
os.environ["ANTHROPIC_API_KEY"] = "<your-anthropic-api-key>"

基本的なセットアップ

トレースの取得を開始するには、スクリプトの先頭で Weave を初期化します。
import weave
weave.init("autogen-demo")

シンプルなモデルクライアントをトレースする

Weave では、AutoGen 内でモデルクライアントに直接行われる call / calls をトレースできます。

クライアントの create call / calls をトレースする

この例では、OpenAIChatCompletionClient に対する単一の call / calls をトレースする方法を示します。
import asyncio
from autogen_core.models import UserMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient
# from autogen_ext.models.anthropic import AnthropicChatCompletionClient

async def simple_client_call(model_name = "gpt-4o"):
    model_client = OpenAIChatCompletionClient(
        model=model_name,
    )
    # または、Anthropic や他のモデルクライアントを使用できます
    # model_client = AnthropicChatCompletionClient(
        # model="claude-3-haiku-20240307"
    # )
    response = await model_client.create(
        [UserMessage(content="Hello, how are you?", source="user")]
    )
    print(response)

asyncio.run(simple_client_call())

autogen-simple-client.png

ストリーミングでクライアントの create call / calls をトレースする

Weave はストリーミングされる応答のトレースもサポートします。

async def simple_client_call_stream(model_name = "gpt-4o"):
    openai_model_client = OpenAIChatCompletionClient(model=model_name)
    async for item in openai_model_client.create_stream(
        [UserMessage(content="Hello, how are you?", source="user")]
    ):
      print(item, flush=True, end="")

asyncio.run(simple_client_call_stream())

autogen-streaming-client.png

Weave はキャッシュされた call / calls を記録します

AutoGen の ChatCompletionCache を使用でき、Weave はこれらのやり取りをトレースして、応答がキャッシュから返されたものか、新しい call / calls によるものかを表示します。

from autogen_ext.models.cache import ChatCompletionCache

async def run_cache_client(model_name = "gpt-4o"):
      openai_model_client = OpenAIChatCompletionClient(model=model_name)
      cache_client = ChatCompletionCache(openai_model_client,)

      response = await cache_client.create(
          [UserMessage(content="Hello, how are you?", source="user")]
      )
      print(response)  # OpenAIからの応答を出力するはず
      response = await cache_client.create(
          [UserMessage(content="Hello, how are you?", source="user")]
      )
      print(response)  # キャッシュされた応答を出力するはず

asyncio.run(run_cache_client())

autogen-cached-client.png

ツールcall / callsを行うエージェントのトレース

Weave はエージェントとそのツールの使用状況をトレースし、エージェントがどのようにツールを選択して実行しているかを可視化します。
from autogen_agentchat.agents import AssistantAgent

async def get_weather(city: str) -> str:
    return f"The weather in {city} is 73 degrees and Sunny."

async def run_agent_with_tools(model_name = "gpt-4o"):
    model_client = OpenAIChatCompletionClient(model=model_name)

    agent = AssistantAgent(
        name="weather_agent",
        model_client=model_client,
        tools=[get_weather],
        system_message="You are a helpful assistant.",
        reflect_on_tool_use=True,
    )
    # コンソールへのストリーミング出力の場合:
    # await Console(agent.run_stream(task="What is the weather in New York?"))
    res = await agent.run(task="What is the weather in New York?")
    print(res)
    await model_client.close()

asyncio.run(run_agent_with_tools())

autogen-agent-tools.png

GroupChat のトレース - ラウンドロビン

Weave は RoundRobinGroupChat などのグループチャット内のやり取りをトレースするため、エージェント間の会話の流れを追跡できます。

from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat

# グループチャット全体をトレースするためにweave opをここに追加する
# 完全に任意だが、使用することを強く推奨する

@weave.op
async def run_round_robin_group_chat(model_name="gpt-4o"):
    model_client = OpenAIChatCompletionClient(model=model_name)

    primary_agent = AssistantAgent(
        "primary",
        model_client=model_client,
        system_message="You are a helpful AI assistant.",
    )

    critic_agent = AssistantAgent(
        "critic",
        model_client=model_client,
        system_message="Provide constructive feedback. Respond with 'APPROVE' to when your feedbacks are addressed.",
    )

    text_termination = TextMentionTermination("APPROVE")

    team = RoundRobinGroupChat(
        [primary_agent, critic_agent], termination_condition=text_termination
    )
    await team.reset()
    # コンソールへのストリーミング出力の場合:
    # await Console(team.run_stream(task="Write a short poem about the fall season."))
    result = await team.run(task="Write a short poem about the fall season.")
    print(result)
    await model_client.close()


asyncio.run(run_round_robin_group_chat())

round_robin_group_chat.png

メモリのトレース

AutoGen’s メモリコンポーネントは Weave でトレースできます。@weave.op() を使用すると、可読性を高めるために、メモリ操作を単一のトレースにまとめることができます。

from autogen_core.memory import ListMemory, MemoryContent, MemoryMimeType

# メモリのadd call / callsとget call / callsを単一のトレースにまとめてトレースするために
# weave opをここに追加しています
# 完全に任意ですが、使用することを強くお勧めします

@weave.op
async def run_memory_agent(model_name="gpt-4o"):
    user_memory = ListMemory()

    await user_memory.add(
        MemoryContent(
            content="The weather should be in metric units",
            mime_type=MemoryMimeType.TEXT,
        )
    )

    await user_memory.add(
        MemoryContent(
            content="Meal recipe must be vegan", mime_type=MemoryMimeType.TEXT
        )
    )

    async def get_weather(city: str, units: str = "imperial") -> str:
        if units == "imperial":
            return f"The weather in {city} is 73 °F and Sunny."
        elif units == "metric":
            return f"The weather in {city} is 23 °C and Sunny."
        else:
            return f"Sorry, I don't know the weather in {city}."

    model_client = OpenAIChatCompletionClient(model=model_name)
    assistant_agent = AssistantAgent(
        name="assistant_agent",
        model_client=model_client,
        tools=[get_weather],
        memory=[user_memory],
    )

    # コンソールへのストリーミング出力の場合:
    # stream = assistant_agent.run_stream(task="What is the weather in New York?")
    # await Console(stream)
    result = await assistant_agent.run(task="What is the weather in New York?")
    print(result)
    await model_client.close()


asyncio.run(run_memory_agent())

autogen-memory.png

RAG ワークフローのトレース

ChromaDBVectorMemory のようなメモリシステムを使ったドキュメントのインデックス作成や検索を含む Retrieval Augmented Generation (RAG) ワークフローは、トレースできます。RAG プロセスに @weave.op() を付けると、処理の流れ全体を可視化しやすくなります。
この RAG の例では chromadb が必要です。pip install chromadb でインストールしてください。
# !pip install -q chromadb 
# 環境に chromadb がインストールされていることを確認してください: `pip install chromadb`

import re
from typing import List
import os
from pathlib import Path

import aiofiles
import aiohttp

from autogen_core.memory import Memory, MemoryContent, MemoryMimeType
from autogen_ext.memory.chromadb import (
    ChromaDBVectorMemory,
    PersistentChromaDBVectorMemoryConfig,
)

class SimpleDocumentIndexer:
    def __init__(self, memory: Memory, chunk_size: int = 1500) -> None:
        self.memory = memory
        self.chunk_size = chunk_size

    async def _fetch_content(self, source: str) -> str:
        if source.startswith(("http://", "https://")):
            async with aiohttp.ClientSession() as session:
                async with session.get(source) as response:
                    return await response.text()
        else:
            async with aiofiles.open(source, "r", encoding="utf-8") as f:
                return await f.read()

    def _strip_html(self, text: str) -> str:
        text = re.sub(r"<[^>]*>", " ", text)
        text = re.sub(r"\\s+", " ", text)
        return text.strip()

    def _split_text(self, text: str) -> List[str]:
        chunks: list[str] = []
        for i in range(0, len(text), self.chunk_size):
            chunk = text[i : i + self.chunk_size]
            chunks.append(chunk.strip())
        return chunks

    async def index_documents(self, sources: List[str]) -> int:
        total_chunks = 0
        for source in sources:
            try:
                content = await self._fetch_content(source)
                if "<" in content and ">" in content:
                    content = self._strip_html(content)
                chunks = self._split_text(content)
                for i, chunk in enumerate(chunks):
                    await self.memory.add(
                        MemoryContent(
                            content=chunk,
                            mime_type=MemoryMimeType.TEXT,
                            metadata={"source": source, "chunk_index": i},
                        )
                    )
                total_chunks += len(chunks)
            except Exception as e:
                print(f"{source} のインデックス中にエラーが発生しました: {str(e)}")
        return total_chunks

@weave.op
async def run_rag_agent(model_name="gpt-4o"):
    rag_memory = ChromaDBVectorMemory(
        config=PersistentChromaDBVectorMemoryConfig(
            collection_name="autogen_docs",
            persistence_path=os.path.join(str(Path.home()), ".chromadb_autogen_weave"),
            k=3,
            score_threshold=0.4,
        )
    )
    # await rag_memory.clear() # 既存のメモリをクリアする場合はコメントアウトを解除してください

    async def index_autogen_docs() -> None:
        indexer = SimpleDocumentIndexer(memory=rag_memory)
        sources = [
            "https://raw.githubusercontent.com/microsoft/autogen/main/README.md",
            "https://microsoft.github.io/autogen/dev/user-guide/agentchat-user-guide/tutorial/agents.html",
        ]
        chunks: int = await indexer.index_documents(sources)
        print(f"{len(sources)} 件の AutoGen ドキュメントから {chunks} チャンクをインデックスしました")
    
    # コレクションが空の場合、または再インデックスしたい場合のみ実行してください
    # デモ目的では、毎回インデックスするか、インデックス済みかどうかを確認することができます。
    # この例では run のたびにインデックスを試みます。チェックの追加を検討してください。
    await index_autogen_docs()

    model_client = OpenAIChatCompletionClient(model=model_name)
    rag_assistant = AssistantAgent(
        name="rag_assistant",
        model_client=model_client,
        memory=[rag_memory],
    )
    
    # コンソールへのストリーミング出力を行う場合:
    # stream = rag_assistant.run_stream(task="What is AgentChat?")
    # await Console(stream)
    result = await rag_assistant.run(task="What is AgentChat?")
    print(result)

    await rag_memory.close()
    await model_client.close()

asyncio.run(run_rag_agent())
autogen-rag.png

エージェントランタイムのトレース

Weave では、SingleThreadedAgentRuntime のような AutoGen の agent runtime 内の操作をトレースできます。ランタイムの実行関数を @weave.op() でラップすると、関連するトレースをグループ化できます。
from dataclasses import dataclass
from typing import Callable

from autogen_core import (
    DefaultTopicId,
    MessageContext,
    RoutedAgent,
    default_subscription,
    message_handler,
    AgentId,
    SingleThreadedAgentRuntime
)

@dataclass
class Message:
    content: int

@default_subscription
class Modifier(RoutedAgent):
    def __init__(self, modify_val: Callable[[int], int]) -> None:
        super().__init__("A modifier agent.")
        self._modify_val = modify_val

    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> None:
        val = self._modify_val(message.content)
        print(f"{'-'*80}\\nModifier:\\nModified {message.content} to {val}")
        await self.publish_message(Message(content=val), DefaultTopicId())

@default_subscription
class Checker(RoutedAgent):
    def __init__(self, run_until: Callable[[int], bool]) -> None:
        super().__init__("A checker agent.")
        self._run_until = run_until

    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> None:
        if not self._run_until(message.content):
            print(f"{'-'*80}\\nChecker:\\n{message.content} passed the check, continue.")
            await self.publish_message(
                Message(content=message.content), DefaultTopicId()
            )
        else:
            print(f"{'-'*80}\\nChecker:\\n{message.content} failed the check, stopping.")

# エージェントランタイム全体のcall / callsを単一のtraceとして追跡するために
# ここにweave opを追加する
# 完全にoptionalだが、使用することを強く推奨する

@weave.op
async def run_agent_runtime() -> None:
    runtime = SingleThreadedAgentRuntime()

    await Modifier.register(
        runtime,
        "modifier",
        lambda: Modifier(modify_val=lambda x: x - 1),
    )

    await Checker.register(
        runtime,
        "checker",
        lambda: Checker(run_until=lambda x: x <= 1),
    )

    runtime.start()
    await runtime.send_message(Message(content=3), AgentId("checker", "default"))
    await runtime.stop_when_idle()

asyncio.run(run_agent_runtime())

autogen-runtime.png

ワークフローのトレース (逐次)

エージェント間のやり取りの順序を定義する、複雑なエージェントワークフローをトレースできます。@weave.op() を使用すると、ワークフロー全体の高レベルなトレースを提供できます。
from autogen_core import TopicId, type_subscription
from autogen_core.models import ChatCompletionClient, SystemMessage, UserMessage

@dataclass
class WorkflowMessage:
    content: str

concept_extractor_topic_type = "ConceptExtractorAgent"
writer_topic_type = "WriterAgent"
format_proof_topic_type = "FormatProofAgent"
user_topic_type = "User"

@type_subscription(topic_type=concept_extractor_topic_type)
class ConceptExtractorAgent(RoutedAgent):
    def __init__(self, model_client: ChatCompletionClient) -> None:
        super().__init__("コンセプト抽出エージェント。")
        self._system_message = SystemMessage(
            content=(
                "あなたはマーケティングアナリストです。プロダクトの説明文をもとに、以下を特定してください:\n"
                "- 主な機能\n"
                "- ターゲットオーディエンス\n"
                "- 独自の強み\n\n"
            )
        )
        self._model_client = model_client

    @message_handler
    async def handle_user_description(self, message: WorkflowMessage, ctx: MessageContext) -> None:
        prompt = f"プロダクトの説明: {message.content}"
        llm_result = await self._model_client.create(
            messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],
            cancellation_token=ctx.cancellation_token,
        )
        response = llm_result.content
        assert isinstance(response, str)
        print(f"{'-'*80}\\n{self.id.type}:\\n{response}")
        await self.publish_message(
            WorkflowMessage(response), topic_id=TopicId(writer_topic_type, source=self.id.key)
        )

@type_subscription(topic_type=writer_topic_type)
class WriterAgent(RoutedAgent):
    def __init__(self, model_client: ChatCompletionClient) -> None:
        super().__init__("ライターエージェント。")
        self._system_message = SystemMessage(
            content=(
                "あなたはマーケティングコピーライターです。機能、ターゲットオーディエンス、独自の強みを説明するテキストをもとに、"
                "これらのポイントを強調した説得力のあるマーケティングコピー(ニュースレターのセクションなど)を作成してください。"
                "出力は短く(約150語)、コピーのみを1つのテキストブロックとして出力してください。"
            )
        )
        self._model_client = model_client
    
    @message_handler
    async def handle_intermediate_text(self, message: WorkflowMessage, ctx: MessageContext) -> None:
        prompt = f"以下はプロダクトに関する情報です:\\n\\n{message.content}"
        llm_result = await self._model_client.create(
            messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],
            cancellation_token=ctx.cancellation_token,
        )
        response = llm_result.content
        assert isinstance(response, str)
        print(f"{'-'*80}\\n{self.id.type}:\\n{response}")
        await self.publish_message(
            WorkflowMessage(response), topic_id=TopicId(format_proof_topic_type, source=self.id.key)
        )

@type_subscription(topic_type=format_proof_topic_type)
class FormatProofAgent(RoutedAgent):
    def __init__(self, model_client: ChatCompletionClient) -> None:
        super().__init__("フォーマット&校正エージェント。")
        self._system_message = SystemMessage(
            content=(
                "あなたは編集者です。下書きのコピーをもとに、文法を修正し、明瞭さを向上させ、一貫したトーンを確保し、"
                "フォーマットを整えて洗練させてください。改善した最終コピーを1つのテキストブロックとして出力してください。"
            )
        )
        self._model_client = model_client

    @message_handler
    async def handle_intermediate_text(self, message: WorkflowMessage, ctx: MessageContext) -> None:
        prompt = f"下書きコピー:\\n{message.content}."
        llm_result = await self._model_client.create(
            messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],
            cancellation_token=ctx.cancellation_token,
        )
        response = llm_result.content
        assert isinstance(response, str)
        print(f"{'-'*80}\\n{self.id.type}:\\n{response}")
        await self.publish_message(
            WorkflowMessage(response), topic_id=TopicId(user_topic_type, source=self.id.key)
        )

@type_subscription(topic_type=user_topic_type)
class UserAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("最終コピーをユーザーに出力するユーザーエージェント。")

    @message_handler
    async def handle_final_copy(self, message: WorkflowMessage, ctx: MessageContext) -> None:
        print(f"\\n{'-'*80}\\n{self.id.type} が最終コピーを受信しました:\\n{message.content}")

# エージェントワークフロー全体を単一のトレースで追跡するために
# ここに weave op を追加しています
# 省略可能ですが、使用することを強くお勧めします

@weave.op(call_display_name="逐次エージェントワークフロー")
async def run_agent_workflow(model_name="gpt-4o"):
    model_client = OpenAIChatCompletionClient(model=model_name)
    runtime = SingleThreadedAgentRuntime()

    await ConceptExtractorAgent.register(runtime, type=concept_extractor_topic_type, factory=lambda: ConceptExtractorAgent(model_client=model_client))
    await WriterAgent.register(runtime, type=writer_topic_type, factory=lambda: WriterAgent(model_client=model_client))
    await FormatProofAgent.register(runtime, type=format_proof_topic_type, factory=lambda: FormatProofAgent(model_client=model_client))
    await UserAgent.register(runtime, type=user_topic_type, factory=lambda: UserAgent())

    runtime.start()
    await runtime.publish_message(
        WorkflowMessage(
            content="飲み物を24時間冷たく保つ、環境に優しいステンレス製ウォーターボトル"
        ),
        topic_id=TopicId(concept_extractor_topic_type, source="default"),
    )
    await runtime.stop_when_idle()
    await model_client.close()

asyncio.run(run_agent_workflow())
autogen-sequential-workflow.png

コードエグゼキュータのトレース

Docker 必須 この例では Docker を使ってコードを実行するため、すべての環境 (例: Colab 上で直接実行する場合) で動作するとは限りません。試す場合は、Docker がローカルで実行中であることを確認してください。
Weave は、AutoGen エージェントによるコードの生成と実行をトレースします。

import tempfile
from autogen_core import DefaultTopicId
from autogen_core.code_executor import CodeBlock, CodeExecutor
from autogen_core.models import (
    AssistantMessage,
    ChatCompletionClient,
    LLMMessage,
    SystemMessage,
    UserMessage,
)
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor


@dataclass
class CodeGenMessage:
    content: str

@default_subscription
class Assistant(RoutedAgent):
    def __init__(self, model_client: ChatCompletionClient) -> None:
        super().__init__("An assistant agent.")
        self._model_client = model_client
        self._chat_history: List[LLMMessage] = [
           SystemMessage(
                content="""Write Python script in markdown block, and it will be executed.
Always save figures to file in the current directory. Do not use plt.show(). All code required to complete this task must be contained within a single response.""",
            )
        ]

    @message_handler
    async def handle_message(self, message: CodeGenMessage, ctx: MessageContext) -> None:
        self._chat_history.append(UserMessage(content=message.content, source="user"))
        result = await self._model_client.create(self._chat_history)
        print(f"\\n{'-'*80}\\nAssistant:\\n{result.content}")
        self._chat_history.append(AssistantMessage(content=result.content, source="assistant"))
        await self.publish_message(CodeGenMessage(content=result.content), DefaultTopicId())

def extract_markdown_code_blocks(markdown_text: str) -> List[CodeBlock]:
    pattern = re.compile(r"```(?:\\s*([\\w\\+\\-]+))?\\n([\\s\\S]*?)```")
    matches = pattern.findall(markdown_text)
    code_blocks: List[CodeBlock] = []
    for match in matches:
        language = match[0].strip() if match[0] else ""
        code_content = match[1]
        code_blocks.append(CodeBlock(code=code_content, language=language))
    return code_blocks

@default_subscription
class Executor(RoutedAgent):
    def __init__(self, code_executor: CodeExecutor) -> None:
        super().__init__("An executor agent.")
        self._code_executor = code_executor

    @message_handler
    async def handle_message(self, message: CodeGenMessage, ctx: MessageContext) -> None:
        code_blocks = extract_markdown_code_blocks(message.content)
        if code_blocks:
            result = await self._code_executor.execute_code_blocks(
                code_blocks, cancellation_token=ctx.cancellation_token
            )
            print(f"\\n{'-'*80}\\nExecutor:\\n{result.output}")
            await self.publish_message(CodeGenMessage(content=result.output), DefaultTopicId())

# コード生成ワークフロー全体を単一のトレースで追跡するために
# ここにweave opを追加しています
# 任意ですが、使用することを強くお勧めします

@weave.op(call_display_name="CodeGen Agent Workflow")
async def run_codegen(model_name="gpt-4o"): # モデルを更新
    work_dir = tempfile.mkdtemp()
    runtime = SingleThreadedAgentRuntime()

    # この例を実行する前にDockerが起動していることを確認してください
    try:
        async with DockerCommandLineCodeExecutor(work_dir=work_dir) as executor:
            model_client = OpenAIChatCompletionClient(model=model_name)
            await Assistant.register(runtime, "assistant", lambda: Assistant(model_client=model_client))
            await Executor.register(runtime, "executor", lambda: Executor(executor))

            runtime.start()
            await runtime.publish_message(
                CodeGenMessage(content="Create a plot of NVDA vs TSLA stock returns YTD from 2024-01-01."),
                DefaultTopicId(),
            )
            await runtime.stop_when_idle()
            await model_client.close()
    except Exception as e:
        print(f"Dockerコード実行例を実行できませんでした: {e}")
        print("Dockerがインストールされ、起動していることを確認してください。")
    finally:
        import shutil
        shutil.rmtree(work_dir)


asyncio.run(run_codegen())
autogen-codegen.png

詳しくはこちら

このガイドは、Weave と AutoGen を統合するための出発点です。Weave UI では、エージェントのやり取り、モデルのcall / calls、ツールの使用状況に関する詳細なトレースを確認できます。