> ## Documentation Index
> Fetch the complete documentation index at: https://docs.wandb.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# OpenAI Realtime API

> Weave を使用して、OpenAI Realtime API への呼び出しを自動的にトレースします。

Weave の [OpenAI Realtime API](https://developers.openai.com/api/docs/guides/realtime/) とのインテグレーションにより、アプリケーションで発生する音声対音声のやり取りをリアルタイムで自動的にトレースできます。これを使用すると、エージェントとユーザーの会話をキャプチャして、エージェントのパフォーマンスを確認・評価できます。

このガイドでは、アプリケーションで Realtime API のトレースを有効にする方法を説明し、ローカルで実行できる 2 つのエンドツーエンドの音声アシスタントの例を紹介します。

<div id="integrate-realtime-traces">
  ## リアルタイムトレースを統合する
</div>

このセクションでは、あらゆるアプリケーションで OpenAI Realtime API の Weave トレースを有効にするために必要な最小限のコードを示します。

Weave は OpenAI Realtime API に自動的にパッチを適用するため、アプリケーションでのオーディオインタラクションのキャプチャを開始するには、数行のコードを追加するだけです。以下のコードでは、Weave と Realtime API のインテグレーションをインポートします。

```python lines {2,5} theme={null}
import weave
from weave.integrations import patch_openai_realtime

weave.init("your-team-name/your-project-name")
patch_openai_realtime()

# アプリケーションのロジック
```

コードにimportして実行すると、Weave はユーザーと OpenAI Realtime API 間のやり取りを自動的にトレースします。

<div id="run-a-realtime-voice-assistant-with-the-openai-agents-sdk">
  ### OpenAI Agents SDK を使用してリアルタイム音声アシスタントを実行する
</div>

この例では、マイクの音声を OpenAI の Realtime API にストリーミングし、AI の音声による応答をローカルマシンのスピーカーで再生するリアルタイム音声アシスタントを実行します。このアプリケーションは `RealtimeAgent` と `RealtimeRunner` を使用する OpenAI Agents SDK を利用し、`patch_openai_realtime()` を適用して Weave のトレースを有効にします。Realtime セッションの管理を上位レベルの Agents SDK に任せたい場合は、この方法を使用します。

例を実行するには、次の手順を実行します：

1. Python 環境を起動して、次のライブラリをインストールします:

   <Tabs>
     <Tab title="uv">
       ```bash theme={null}
       uv add weave openai-agents websockets pyaudio numpy
       ```
     </Tab>

     <Tab title="pip">
       ```bash theme={null}
       pip install weave openai-agents websockets pyaudio numpy
       ```
     </Tab>
   </Tabs>

2. `weave_voice_assistant.py` という名前のファイルを作成し、次のコードを追加します。

   ハイライトされた行は、アプリケーション内での Weave のインテグレーションを示しています。残りのコードは、基本的な音声アシスタントアプリを作成するものです。

   <Accordion title="weave_voice_assistant.py">
     ```python lines {8,11,14,51-55} theme={null}
     import argparse
     import asyncio
     import queue
     import sys
     import termios
     import threading
     import tty
     import weave
     import pyaudio
     import numpy as np
     from weave.integrations import patch_openai_realtime
     from agents.realtime import RealtimeAgent, RealtimeRunner

     DEFAULT_WEAVE_PROJECT = "<your-team-name/your-project-name>"

     FORMAT = pyaudio.paInt16
     RATE = 24000  # Required by the OpenAI Realtime API.
     CHUNK = 1024
     MAX_INPUT_CHANNELS = 1
     MAX_OUTPUT_CHANNELS = 1

     INP_DEV_IDX = None
     OUT_DEV_IDX = None

     # Weave プロジェクト名とオーディオデバイス選択のための CLI 引数を解析します。
     def parse_args():
         parser = argparse.ArgumentParser(description="Realtime agent with Weave logging")
         parser.add_argument(
             "--weave-project",
             default=DEFAULT_WEAVE_PROJECT,
             help=f"Weave project name (default: {DEFAULT_WEAVE_PROJECT})",
             dest="weave_project"
         )
         parser.add_argument(
             "--input-device",
             type=int,
             default=None,
             help="PyAudio input (mic) device index. Defaults to system default. Run mic_detect.py to list devices.",
             dest="input_device"
         )
         parser.add_argument(
             "--output-device",
             type=int,
             default=None,
             help="PyAudio output (speaker) device index. Defaults to system default. Run mic_detect.py to list devices.",
             dest="output_device"
         )
         return parser.parse_args()


     # Weave を初期化し、トレース用に OpenAI Realtime API にパッチを適用します。
     def init_weave(project_name: str | None = None) -> None:
         name = project_name or DEFAULT_WEAVE_PROJECT
         weave.init(name)
         patch_openai_realtime()  # Enables automatic tracing of Realtime API sessions.


     mic_enabled = True

     # 't' キーを監視してマイクのオン/オフを切り替えます。デーモンスレッドで実行されます。
     def start_keylistener():
         global mic_enabled
         fd = sys.stdin.fileno()
         old_settings = termios.tcgetattr(fd)
         try:
             tty.setcbreak(fd)
             while True:
                 ch = sys.stdin.read(1)
                 if ch.lower() == 't':
                     mic_enabled = not mic_enabled
                     state = "ON" if mic_enabled else "OFF"
                     print(f"\n🎙  Mic {state} (press t to toggle)")
                 elif ch == '\x03':  # Ctrl-C
                     break
         finally:
             termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)


     # バックグラウンドスレッドでオーディオキューを消費し、スピーカーに書き込みます。
     def play_audio(output_stream: pyaudio.Stream, audio_output_queue: queue.Queue):
         while True:
             data = audio_output_queue.get()
             if data is None:
                 break
             output_stream.write(data)


     # オーディオストリームを開き、Realtime セッションを開始して、送受信ループを実行します。
     async def main(*, input_device_index: int | None = None, output_device_index: int | None = None):
         p = pyaudio.PyAudio()

         if input_device_index is None:
             input_device_index = int(p.get_default_input_device_info()['index'])
         if output_device_index is None:
             output_device_index = int(p.get_default_output_device_info()['index'])

         # pyaudio エラーを避けるため、チャンネル数をデバイスの上限に制限します。
         input_info = p.get_device_info_by_index(input_device_index)
         output_info = p.get_device_info_by_index(output_device_index)

         input_channels = min(int(input_info['maxInputChannels']), MAX_INPUT_CHANNELS)
         output_channels = min(int(output_info['maxOutputChannels']), MAX_OUTPUT_CHANNELS)

         mic = p.open(
             format=FORMAT,
             channels=input_channels,
             rate=RATE,
             input=True,
             output=False,
             frames_per_buffer=CHUNK,
             input_device_index=input_device_index,
             start=False,
         )
         speaker = p.open(
             format=FORMAT,
             channels=output_channels,
             rate=RATE,
             input=False,
             output=True,
             frames_per_buffer=CHUNK,
             output_device_index=output_device_index,
             start=False,
         )
         mic.start_stream()
         speaker.start_stream()

         # 割り込み時に再生中のオーディオをフラッシュできるよう、キューを介してバッファリングします。
         audio_output_queue = queue.Queue()
         threading.Thread(
             target=play_audio, args=(speaker, audio_output_queue), daemon=True
         ).start()

         s_agent = RealtimeAgent(
             name="Speech Assistant",
             instructions="You are a tool using AI. Use tools to accomplish a task whenever possible"
         )

         s_runner = RealtimeRunner(s_agent, config={
             "model_settings": {
                 "model_name": "gpt-realtime",
                 "modalities": ["audio"],
                 "output_modalities": ["audio"],
                 "input_audio_format": "pcm16",
                 "output_audio_format": "pcm16",
                 "speed": 1.2,
                 "turn_detection": {
                     "prefix_padding_ms": 100,
                     "silence_duration_ms": 100,
                     "type": "server_vad",
                     "interrupt_response": True,
                     "create_response": True,
                 },
             }
         })
         print("--- Session Active (Speak into mic) ---")
         print("🎙  Mic ON (press t to toggle)")

         threading.Thread(target=start_keylistener, daemon=True).start()

         async with await s_runner.run() as session:
             # マイク入力を Realtime API にストリーミングし、ミュート時は無音を送信します。
             async def send_mic_audio():
                 silence = b'\x00' * CHUNK * 2  # サンプルあたり 2 バイト（16 ビット PCM）。
                 try:
                     while True:
                         raw_data = mic.read(CHUNK, exception_on_overflow=False)

                         if mic_enabled:
                             audio_data = np.frombuffer(raw_data, dtype=np.int16).astype(np.float64)
                             rms = np.sqrt(np.mean(audio_data**2))
                             meter = int(min(rms / 50, 50))
                             print(f"Mic Level: {'█' * meter}{' ' * (50-meter)} | 🎙 ON ", end="\r")
                             await session.send_audio(raw_data)
                         else:
                             print(f"Mic Level: {' ' * 50} | 🎙 OFF", end="\r")
                             await session.send_audio(silence)

                         await asyncio.sleep(0)  # 読み取りの合間にイベントループへ制御を返します。
                 except Exception:
                     pass

             # セッションからイベントを受信し、オーディオをスピーカーに転送します。
             async def handle_events():
                 async for event in session:
                     if event.type == "audio":
                         audio_output_queue.put(event.audio.data)
                     elif event.type == "audio_interrupted":
                         # キューに溜まった AI のオーディオをフラッシュして、ユーザーの発話に被らないようにします。
                         while not audio_output_queue.empty():
                             try:
                                 audio_output_queue.get_nowait()
                             except queue.Empty:
                                 break

             mic_task = asyncio.create_task(send_mic_audio())
             try:
                 await handle_events()
             finally:
                 mic_task.cancel()

         # クリーンアップ
         audio_output_queue.put(None)  # 再生スレッドに終了を通知します。
         mic.close()
         speaker.close()
         p.terminate()

     if __name__ == "__main__":
         args = parse_args()
         init_weave(args.weave_project)
         fd = sys.stdin.fileno()
         old_settings = termios.tcgetattr(fd)
         try:
             asyncio.run(main(input_device_index=args.input_device, output_device_index=args.output_device))
         finally:
             termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
     ```
   </Accordion>

3. `DEFAULT_WEAVE_PROJECT` の値をチーム名とプロジェクト名に更新します。

4. `OPENAI_API_KEY` 環境変数を設定します。

5. 音声アシスタントを起動します:
   ```bash theme={null}
   python weave_voice_assistant.py
   ```

アシスタントが起動したら、キーボードの `T` キーを押してマイクのミュートを切り替えます。アシスタントは、発話のターン管理と割り込みの処理に、サーバー側の音声アクティビティ検出を使用します。

アシスタントに話しかけると、Weave がセッションのオーディオを含むトレースを記録し、それを Weave UI で確認できます。

<div id="run-a-realtime-voice-assistant-with-websockets">
  ### WebSockets を使用してリアルタイム音声アシスタントを実行する
</div>

次の例では、WebSockets 経由で OpenAI Realtime API に直接接続します。マイクのオーディオを API にストリーミングし、音声応答を再生し、ツール呼び出し (天気の検索、数式の評価、コードの実行、ファイルの書き込み) をサポートします。Weave は `weave.init()` と `patch_openai_realtime()` を使用してセッションをトレースします。Agents SDK に依存せずに Realtime セッションを完全に制御したい場合は、この方法を使用してください。

例を実行するには、次の手順を完了してください:

1. Python 環境を起動し、次のライブラリをインストールしてください。

   <Tabs>
     <Tab title="uv">
       ```bash theme={null}
       uv add weave websockets pyaudio numpy
       ```
     </Tab>

     <Tab title="pip">
       ```bash theme={null}
       pip install weave websockets pyaudio numpy
       ```
     </Tab>
   </Tabs>

2. `tool_definitions.py` という名前のファイルを作成し、次のツール定義を追加します。メインのアプリケーションはこのモジュールをインポートします。

   <Accordion title="tool_definitions.py">
     ```python lines theme={null}
     import json
     import subprocess
     import tempfile
     from pathlib import Path
     import weave


     # @function_tool
     @weave.op
     def get_weather(city: str) -> str:
         """指定した都市の現在の天気を取得します。

         引数:
             city: 天気を取得する都市名。
         """
         return json.dumps({"city": city, "temperature": "72°F", "condition": "sunny"})


     @weave.op
     def calculate(expression: str) -> str:
         """数式を評価して結果を返します。

         引数:
             expression: 評価する数式（例: '2 + 2'）。
         """
         try:
             result = eval(expression)
             return str(result)
         except Exception as e:
             return f"Error: {e}"


     @weave.op
     def run_python_code(code: str) -> str:
         """Python スクリプトを書き出して実行し、その stdout/stderr を返します。

         引数:
             code: 実行する Python ソースコード。
         """
         with tempfile.NamedTemporaryFile(
             mode="w", suffix=".py", dir=tempfile.gettempdir(), delete=False
         ) as f:
             f.write(code)
             script_path = Path(f.name)

         try:
             result = subprocess.run(
                 ["python", str(script_path)],
                 capture_output=True,
                 text=True,
                 timeout=30,
             )
             output = result.stdout
             if result.stderr:
                 output += f"\nSTDERR:\n{result.stderr}"
             if result.returncode != 0:
                 output += f"\n(exit code {result.returncode})"
             return output or "(no output)"
         except subprocess.TimeoutExpired:
             return "Error: script timed out after 30 seconds."
         finally:
             script_path.unlink(missing_ok=True)


     @weave.op
     async def write_file(file_path: str, content: str) -> str:
         """ディスク上のファイルにコンテンツを書き込みます。

         引数:
             file_path: ファイルの書き込み先パス。
             content: ファイルに書き込むコンテンツ。
         """
         try:
             path = Path(file_path)
             path.parent.mkdir(parents=True, exist_ok=True)
             path.write_text(content)
             return f"Wrote {len(content)} bytes to {file_path}"
         except Exception as e:
             return f"Error writing file: {e}"
     ```
   </Accordion>

3. 同じディレクトリに `weave_ws_voice_assistant.py` というファイルを作成し、次のコードを追加します。

   <Accordion title="weave_ws_voice_assistant.py">
     ```python theme={null}
     import asyncio
     import base64
     import json
     import os
     import queue
     import threading
     from typing import Any, Callable

     import numpy as np
     import pyaudio
     import websockets

     import weave
     weave.init("<your-team-name/your-project-name>")
     from weave.integrations import patch_openai_realtime
     patch_openai_realtime()

     from tool_definitions import (
         calculate,
         get_weather,
         run_python_code,
         write_file,
     )

     # オーディオフォーマット（Realtime API では PCM16 を使用する必要があります）。
     FORMAT = pyaudio.paInt16
     RATE = 24000
     CHUNK = 1024
     MAX_INPUT_CHANNELS = 2
     MAX_OUTPUT_CHANNELS = 2

     OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
     REALTIME_URL = "wss://api.openai.com/v1/realtime?model=gpt-realtime"

     DEBUG_WRITE_LOG = False

     # ツール名 -> 関数呼び出しディスパッチ用の callable のマッピング。
     TOOL_REGISTRY: dict[str, Callable[..., Any]] = {
         "get_weather": get_weather,
         "calculate": calculate,
         "run_python_code": run_python_code,
         "write_file": write_file,
     }

     # Realtime API セッション設定用の生のツール定義。
     TOOL_DEFINITIONS = [
         {
             "type": "function",
             "name": "get_weather",
             "description": "Get the current weather for a city.",
             "parameters": {
                 "type": "object",
                 "properties": {
                     "city": {
                         "type": "string",
                         "description": "The city name to get weather for.",
                     }
                 },
                 "required": ["city"],
             },
         },
         {
             "type": "function",
             "name": "calculate",
             "description": "Evaluate a math expression and return the result.",
             "parameters": {
                 "type": "object",
                 "properties": {
                     "expression": {
                         "type": "string",
                         "description": "A math expression to evaluate, e.g. '2 + 2'.",
                     }
                 },
                 "required": ["expression"],
             },
         },
         {
             "type": "function",
             "name": "run_python_code",
             "description": "Write and execute a Python script, returning its stdout/stderr.",
             "parameters": {
                 "type": "object",
                 "properties": {
                     "code": {
                         "type": "string",
                         "description": "The Python source code to execute.",
                     }
                 },
                 "required": ["code"],
             },
         },
         {
             "type": "function",
             "name": "write_file",
             "description": "Write content to a file on disk.",
             "parameters": {
                 "type": "object",
                 "properties": {
                     "file_path": {
                         "type": "string",
                         "description": "The path to write the file to.",
                     },
                     "content": {
                         "type": "string",
                         "description": "The content to write into the file.",
                     },
                 },
                 "required": ["file_path", "content"],
             },
         },
     ]


     async def send_event(ws, event: dict) -> None:
         await ws.send(json.dumps(event))


     async def configure_session(ws) -> None:
         event = {
             "type": "session.update",
             "session": {
                 "type": "realtime",
                 "model": "gpt-realtime",
                 "output_modalities": ["audio"],
                 "instructions": (
                     "You are a helpful AI assistant with access to tools. "
                     "Use tools to accomplish tasks whenever possible. "
                     "Speak clearly and briefly."
                 ),
                 "tools": TOOL_DEFINITIONS,
                 "tool_choice": "auto",
                 "audio": {
                     "input": {
                         "format": {"type": "audio/pcm", "rate": 24000},
                         "transcription": {"model": "gpt-4o-transcribe"},
                         "turn_detection": {
                             "type": "server_vad",
                             "threshold": 0.5,
                             "prefix_padding_ms": 300,
                             "silence_duration_ms": 500,
                         },
                     },
                     "output": {
                         "format": {"type": "audio/pcm", "rate": 24000},
                     },
                 },
             },
         }
         await send_event(ws, event)
         print("Session configured.")


     async def handle_function_call(ws, call_id: str, name: str, arguments: str) -> None:
         if not name:
             raise Exception("Did not get a function name")

         print(f"\n[Function Call] {name}({arguments})")
         tool_fn = TOOL_REGISTRY.get(name)
         if tool_fn is None:
             result = json.dumps({"error": f"Unknown function: {name}"})
         else:
             try:
                 args = json.loads(arguments)
                 result = tool_fn(**args)
                 if asyncio.iscoroutine(result):
                     result = await result
             except Exception as e:
                 result = json.dumps({"error": str(e)})

         print(f"[Function Result] {result}")

         # 関数呼び出しの出力をモデルに送り返す。
         await send_event(ws, {
             "type": "conversation.item.create",
             "item": {
                 "type": "function_call_output",
                 "call_id": call_id,
                 "output": result if isinstance(result, str) else json.dumps(result),
             },
         })

         # モデルが関数の結果を反映できるよう、新しいレスポンスをトリガーする。
         await send_event(ws, {"type": "response.create"})


     def play_audio(output_stream: pyaudio.Stream, audio_output_queue: queue.Queue):
         """pyaudio の write() はサウンドカードがサンプルを消費するまでブロックするため、
         別スレッドで実行します。再生を非同期イベントループから切り離すことで、
         処理中の書き込みの完了を待たずに割り込み時にキューをフラッシュできます。"""
         while True:
             data = audio_output_queue.get()
             if data is None:
                 break
             output_stream.write(data)


     async def send_mic_audio(ws, mic) -> None:
         try:
             while True:
                 raw_data = mic.read(CHUNK, exception_on_overflow=False)

                 # ビジュアルボリュームメーター。
                 audio_data = np.frombuffer(raw_data, dtype=np.int16).astype(np.float64)
                 rms = np.sqrt(np.mean(audio_data**2))
                 meter = int(min(rms / 50, 50))
                 print(f"Mic Level: {'█' * meter}{' ' * (50 - meter)} |", end="\r")

                 # Base64 エンコードしてオーディオチャンクを送信する。
                 b64_audio = base64.b64encode(raw_data).decode("utf-8")
                 await send_event(ws, {
                     "type": "input_audio_buffer.append",
                     "audio": b64_audio,
                 })

                 await asyncio.sleep(0)
         except asyncio.CancelledError:
             pass


     async def receive_events(ws, audio_output_queue: queue.Queue) -> None:
         # デルタイベントをまたいで関数呼び出しの引数を蓄積する。
         pending_calls: dict[str, dict] = {}
         async for raw_message in ws:
             if DEBUG_WRITE_LOG:
                 with open("data.jsonl", "a", encoding="utf-8") as f:
                     f.write(json.dumps(raw_message) + "\n")

             event = json.loads(raw_message)
             event_type = event.get("type", "")

             if event_type == "session.created":
                 print(raw_message)

             elif event_type == "session.updated":
                 print(raw_message)

             elif event_type == "error":
                 print(f"\n[Error] {event}")

             elif event_type == "input_audio_buffer.speech_started":
                 # キューに溜まった AI オーディオをフラッシュして、ユーザーの発話に被らないようにする。
                 while not audio_output_queue.empty():
                     try:
                         audio_output_queue.get_nowait()
                     except queue.Empty:
                         break

             elif event_type == "input_audio_buffer.speech_stopped":
                 pass

             elif event_type == "input_audio_buffer.committed":
                 pass

             elif event_type == "response.created":
                 pass

             elif event_type == "response.output_text.delta":
                 pass

             elif event_type == "response.output_text.done":
                 pass

             # オーディオ出力デルタ - 再生用にキューに追加する。
             elif event_type == "response.output_audio.delta":
                 audio_bytes = base64.b64decode(event.get("delta", ""))
                 audio_output_queue.put(audio_bytes)

             elif event_type == "response.output_audio_transcript.delta":
                 pass

             elif event_type == "response.output_audio_transcript.done":
                 pass

             # 関数呼び出し開始 - 保留中の呼び出しを初期化する。
             elif event_type == "response.output_item.added":
                 item = event.get("item", {})
                 if item.get("type") == "function_call" and item.get("status") == "in_progress":
                     item_id = item.get("id", "")
                     pending_calls[item_id] = {
                         "call_id": item.get("call_id", ""),
                         "name": item.get("name", ""),
                         "arguments": "",
                     }
                     print(f"\n[Function Call Started] {item.get('name', '')}")

             # 関数呼び出しの引数デルタ - 蓄積する。
             elif event_type == "response.function_call_arguments.delta":
                 item_id = event.get("item_id", "")
                 if item_id in pending_calls:
                     pending_calls[item_id]["arguments"] += event.get("delta", "")

             elif event_type == "response.function_call_arguments.done":
                 item_id = event.get("item_id", "")
                 call_info = pending_calls.pop(item_id, None)
                 if call_info is None:
                     # Fallback: use data directly from the done event.
                     call_info = {
                         "call_id": event.get("call_id"),
                         "name": event.get("name"),
                         "arguments": event.get("arguments"),
                     }
                 try:
                     await handle_function_call(
                         ws,
                         call_info["call_id"],
                         call_info["name"],
                         call_info["arguments"],
                     )
                 except Exception as e:
                     print(f"Failed to call function for message {call_info}: error - {e}")

             elif event_type == "response.done":
                 pass

             elif event_type == "rate_limits.updated":
                 pass

             else:
                 print(f"\n[Event: {event_type}]")


     async def main():
         if not OPENAI_API_KEY:
             print("Error: OPENAI_API_KEY environment variable not set")
             return

         p = pyaudio.PyAudio()

         input_device_index = int(p.get_default_input_device_info()['index'])
         output_device_index = int(p.get_default_output_device_info()['index'])

         # Channel count must match the device's capabilities or pyaudio errors upon open.
         input_info = p.get_device_info_by_index(input_device_index)
         output_info = p.get_device_info_by_index(output_device_index)
         input_channels = min(int(input_info['maxInputChannels']), 1)
         output_channels = min(int(output_info['maxOutputChannels']), 1)

         mic = p.open(
             format=FORMAT,
             channels=input_channels,
             rate=RATE,
             input=True,
             output=False,
             frames_per_buffer=CHUNK,
             input_device_index=input_device_index,
             start=False,
         )
         speaker = p.open(
             format=FORMAT,
             channels=output_channels,
             rate=RATE,
             input=False,
             output=True,
             frames_per_buffer=CHUNK,
             output_device_index=output_device_index,
             start=False,
         )
         mic.start_stream()
         speaker.start_stream()

         # オーディオをキュー経由にすることで、ユーザーが割り込んだときにフラッシュできます。
         # スピーカーに直接書き込むと、再生中のオーディオをキャンセルできなくなります。
         audio_output_queue = queue.Queue()
         threading.Thread(
             target=play_audio, args=(speaker, audio_output_queue), daemon=True
         ).start()

         headers = {
             "Authorization": f"Bearer {OPENAI_API_KEY}",
         }

         print("Connecting to OpenAI Realtime API...")

         async with websockets.connect(
             REALTIME_URL,
             additional_headers=headers,
         ) as ws:
             print("Connected! Configuring session...")
             await configure_session(ws)

             print("--- Session Active (Speak into mic) ---")

             mic_task = asyncio.create_task(send_mic_audio(ws, mic))
             try:
                 await receive_events(ws, audio_output_queue)
             finally:
                 mic_task.cancel()
                 try:
                     await mic_task
                 except asyncio.CancelledError:
                     pass

         # Cleanup
         audio_output_queue.put(None)  # Signal playback thread to exit.
         mic.close()
         speaker.close()
         p.terminate()
         print("\nSession ended.")


     if __name__ == "__main__":
         asyncio.run(main())
     ```
   </Accordion>

4. `weave.init()` の呼び出しを、チーム名とプロジェクト名に合わせて更新します。

5. `OPENAI_API_KEY` 環境変数を設定してください。

6. 音声アシスタントを起動します:

   ```bash theme={null}
   python weave_ws_voice_assistant.py
   ```

アシスタントの実行中は、キーボードの `T` キーを押してマイクのミュートを切り替えます。アシスタントは、発話のターン管理と割り込みの処理に、サーバー側の音声アクティビティ検出を使用します。

アシスタントに話しかけると、Weave がセッションのオーディオを含むトレースを記録し、それを Weave UI で確認できます。
