これは対話型ノートブックです。ローカルで実行するか、以下のリンクを使用できます。
オーディオデータで Weave を使う方法: OpenAI の例

セットアップ
openai) とWeave (weave) の依存関係、および APIキー管理用の依存関係である set-env をインストールします。
%%capture
!pip install openai
!pip install weave
!pip install set-env-colab-kaggle-dotenv -q # 環境変数用
python
%%capture
# openai の bug を修正するための一時的な回避策:
# TypeError: Client.__init__() got an unexpected keyword argument 'proxies'
# 参照: https://community.openai.com/t/error-with-openai-1-56-0-client-init-got-an-unexpected-keyword-argument-proxies/1040332/15
!pip install "httpx<0.28"
google.colab.userdata の代替となる set_env を使用します。使用方法については、こちらを参照してください。
# 環境変数を設定する。
from set_env import set_env
_ = set_env("OPENAI_API_KEY")
_ = set_env("WANDB_API_KEY")
import base64
import os
import time
import wave
import numpy as np
from IPython.display import display
from openai import OpenAI
import weave
オーディオのストリーミングと保存の例
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
weave.init("openai-audio-chat")
op) を追加します。
ここでは、関数 prompt_endpont_and_log_trace を定義します。この関数は、主に 3 つの step で構成されています。
-
テキストとオーディオの入力と出力をサポートする
GPT 4o Audio Previewモデルを使用して、completion オブジェクトを作成します。- モデルに、アクセントを変えながらゆっくり 13 まで数えるよう指示します。
- completion を “stream” に設定します。
- ストリーミングされたデータをチャンクごとに書き込むための新しい出力ファイルを開きます。
- Weave がトレースにオーディオデータをログするように、オーディオファイルの開いたファイルハンドラを返します。
SAMPLE_RATE = 22050
@weave.op()
def prompt_endpoint_and_log_trace(system_prompt=None, user_prompt=None):
if not system_prompt:
system_prompt = "You're the fastest counter in the world"
if not user_prompt:
user_prompt = "Count to 13 super super slow, enunciate each number with a dramatic flair, changing up accents as you go along. British, French, German, Spanish, etc."
# オーディオモダリティを使用してOpenAI APIにリクエスト
completion = client.chat.completions.create(
model="gpt-4o-audio-preview",
modalities=["text", "audio"],
audio={"voice": "fable", "format": "pcm16"},
stream=True,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
)
# 書き込み用にwaveファイルを開く
with wave.open("./output.wav", "wb") as wav_file:
wav_file.setnchannels(1) # モノラル
wav_file.setsampwidth(2) # 16ビット
wav_file.setframerate(SAMPLE_RATE) # サンプルレート(必要に応じて調整)
# APIからストリーミングされたチャンクを順次書き込む
for chunk in completion:
if (
hasattr(chunk, "choices")
and chunk.choices is not None
and len(chunk.choices) > 0
and hasattr(chunk.choices[0].delta, "audio")
and chunk.choices[0].delta.audio.get("data") is not None
):
# base64オーディオデータをデコード
audio_data = base64.b64decode(chunk.choices[0].delta.audio.get("data"))
# 現在のチャンクをwaveファイルに書き込む
wav_file.writeframes(audio_data)
# Weave opにファイルを返す
return wave.open("output.wav", "rb")
テスト
from IPython.display import Audio
# オーディオストリームを書き込む関数を呼び出す
prompt_endpoint_and_log_trace(
system_prompt="You're the fastest counter in the world",
user_prompt="Count to 13 super super slow, enunciate each number with a dramatic flair, changing up accents as you go along. British, French, German, Spanish, etc.",
)
# 更新されたオーディオストリームを表示する
display(Audio("output.wav", rate=SAMPLE_RATE, autoplay=True))
高度な使用方法: Weave での Realtime Audio API

- マイクの設定 のセルを確認してください
- Google Colab の実行環境には制約があるため、これはお使いのマシン上で Jupyter Notebook として実行する必要があります。ブラウザでは実行できません。
- MacOS では、Pyaudio を動作させるために Brew 経由で
portaudioをインストールする必要があります (こちらを参照) 。
- MacOS では、Pyaudio を動作させるために Brew 経由で
enable_audio_playbackトグルを有効にすると、アシスタントが出力した音声が再生されます。エコー検出の実装は非常に複雑なため、これを有効にする場合はヘッドホンが必要です。
必要な環境の設定
%%capture
!pip install numpy==2.0
!pip install weave
!pip install pyaudio # Macの場合、先に`brew install portaudio`でportaudioをインストールする必要があります
!pip install websocket-client
!pip install set-env-colab-kaggle-dotenv -q # 環境変数用
!pip install resampy
python
import io
import json
import os
import threading
from typing import Optional
import pyaudio
import resampy
import websocket
from set_env import set_env
import weave
python
# 環境変数を設定します。
# 使用方法については https://pypi.org/project/set-env-colab-kaggle-dotenv/ を参照してください。
_ = set_env("OPENAI_API_KEY")
_ = set_env("WANDB_API_KEY")
マイクの設定
INPUT_DEVICE_INDEX と OUTPUT_DEVICE_INDEX を設定します。入力デバイスには少なくとも 1 つの入力チャネルがあり、出力デバイスには少なくとも 1 つの出力チャネルがあります。
# 次のセルを設定するために pyaudio からデバイスリストを取得する
p = pyaudio.PyAudio()
devices_data = {i: p.get_device_info_by_index(i) for i in range(p.get_device_count())}
for i, device in devices_data.items():
print(
f"Found device @{i}: {device['name']} with sample rate: {device['defaultSampleRate']} and input channels: {device['maxInputChannels']} and output channels: {device['maxOutputChannels']}"
)
python
INPUT_DEVICE_INDEX = 3 # @param # 上記のデバイスリストから選択する。デバイスの入力チャンネル数が 0 より多いことを確認すること。
OUTPUT_DEVICE_INDEX = 12 # @param # 上記のデバイスリストから選択する。デバイスの出力チャンネル数が 0 より多いことを確認すること。
enable_audio_playback = True # @param {type:"boolean"} # アシスタントのオーディオ再生をオンにする。ヘッドフォンが必要。
# オーディオ録音およびストリーミングのパラメーター
INPUT_DEVICE_CHANNELS = devices_data[INPUT_DEVICE_INDEX][
"maxInputChannels"
] # 上記のデバイスリストより
SAMPLE_RATE = int(
devices_data[INPUT_DEVICE_INDEX]["defaultSampleRate"]
) # 上記のデバイスリストより
CHUNK = int(SAMPLE_RATE / 10) # フレームあたりのサンプル数
SAMPLE_WIDTH = p.get_sample_size(pyaudio.paInt16) # フォーマットのフレームあたりのサンプル数
CHUNK_DURATION = 0.3 # OAI API に送信する 1 チャンクあたりのオーディオ秒数
OAI_SAMPLE_RATE = (
24000 # OAI のサンプルレートは 24kHz。アシスタントのオーディオを再生または保存する際に必要
)
OUTPUT_DEVICE_CHANNELS = 1 # モノラル出力の場合は 1 に設定する
OpenAI Realtime API スキーマ実装
OpenAI Realtime API の Pydantic スキーマ
from enum import Enum
from typing import Any, Literal, Union
from pydantic import BaseModel, Field, ValidationError
class BaseEvent(BaseModel):
type: Union["ClientEventTypes", "ServerEventTypes"]
event_id: Optional[str] = None # すべてのイベントに対してオプションフィールドとして event_id を追加
# def model_dump_json(self, *args, **kwargs):
# # None でないフィールドのみを含める
# return super().model_dump_json(*args, exclude_none=True, **kwargs)
class ChatMessage(BaseModel):
role: Literal["user", "assistant"]
content: str
timestamp: float
""" CLIENT EVENTS """
class ClientEventTypes(str, Enum):
SESSION_UPDATE = "session.update"
CONVERSATION_ITEM_CREATE = "conversation.item.create"
CONVERSATION_ITEM_TRUNCATE = "conversation.item.truncate"
CONVERSATION_ITEM_DELETE = "conversation.item.delete"
RESPONSE_CREATE = "response.create"
RESPONSE_CANCEL = "response.cancel"
INPUT_AUDIO_BUFFER_APPEND = "input_audio_buffer.append"
INPUT_AUDIO_BUFFER_COMMIT = "input_audio_buffer.commit"
INPUT_AUDIO_BUFFER_CLEAR = "input_audio_buffer.clear"
ERROR = "error"
#### Session Update
class TurnDetection(BaseModel):
type: Literal["server_vad"]
threshold: float = Field(..., ge=0.0, le=1.0)
prefix_padding_ms: int
silence_duration_ms: int
class InputAudioTranscription(BaseModel):
model: Optional[str] = None
class ToolParameterProperty(BaseModel):
type: str
class ToolParameter(BaseModel):
type: str
properties: dict[str, ToolParameterProperty]
required: list[str]
class Tool(BaseModel):
type: Literal["function", "code_interpreter", "file_search"]
name: Optional[str] = None
description: Optional[str] = None
parameters: Optional[ToolParameter] = None
class Session(BaseModel):
modalities: Optional[list[str]] = None
instructions: Optional[str] = None
voice: Optional[str] = None
input_audio_format: Optional[str] = None
output_audio_format: Optional[str] = None
input_audio_transcription: Optional[InputAudioTranscription] = None
turn_detection: Optional[TurnDetection] = None
tools: Optional[list[Tool]] = None
tool_choice: Optional[str] = None
temperature: Optional[float] = None
max_output_tokens: Optional[int] = None
class SessionUpdate(BaseEvent):
type: Literal[ClientEventTypes.SESSION_UPDATE] = ClientEventTypes.SESSION_UPDATE
session: Session
#### Audio Buffers
class InputAudioBufferAppend(BaseEvent):
type: Literal[ClientEventTypes.INPUT_AUDIO_BUFFER_APPEND] = (
ClientEventTypes.INPUT_AUDIO_BUFFER_APPEND
)
audio: str
class InputAudioBufferCommit(BaseEvent):
type: Literal[ClientEventTypes.INPUT_AUDIO_BUFFER_COMMIT] = (
ClientEventTypes.INPUT_AUDIO_BUFFER_COMMIT
)
class InputAudioBufferClear(BaseEvent):
type: Literal[ClientEventTypes.INPUT_AUDIO_BUFFER_CLEAR] = (
ClientEventTypes.INPUT_AUDIO_BUFFER_CLEAR
)
#### Messages
class MessageContent(BaseModel):
type: Literal["input_audio"]
audio: str
class ConversationItemContent(BaseModel):
type: Literal["input_text", "input_audio", "text", "audio"]
text: Optional[str] = None
audio: Optional[str] = None
transcript: Optional[str] = None
class FunctionCallContent(BaseModel):
call_id: str
name: str
arguments: str
class FunctionCallOutputContent(BaseModel):
output: str
class ConversationItem(BaseModel):
id: Optional[str] = None
type: Literal["message", "function_call", "function_call_output"]
status: Optional[Literal["completed", "in_progress", "incomplete"]] = None
role: Literal["user", "assistant", "system"]
content: list[
Union[ConversationItemContent, FunctionCallContent, FunctionCallOutputContent]
]
call_id: Optional[str] = None
name: Optional[str] = None
arguments: Optional[str] = None
output: Optional[str] = None
class ConversationItemCreate(BaseEvent):
type: Literal[ClientEventTypes.CONVERSATION_ITEM_CREATE] = (
ClientEventTypes.CONVERSATION_ITEM_CREATE
)
item: ConversationItem
class ConversationItemTruncate(BaseEvent):
type: Literal[ClientEventTypes.CONVERSATION_ITEM_TRUNCATE] = (
ClientEventTypes.CONVERSATION_ITEM_TRUNCATE
)
item_id: str
content_index: int
audio_end_ms: int
class ConversationItemDelete(BaseEvent):
type: Literal[ClientEventTypes.CONVERSATION_ITEM_DELETE] = (
ClientEventTypes.CONVERSATION_ITEM_DELETE
)
item_id: str
#### Responses
class ResponseCreate(BaseEvent):
type: Literal[ClientEventTypes.RESPONSE_CREATE] = ClientEventTypes.RESPONSE_CREATE
class ResponseCancel(BaseEvent):
type: Literal[ClientEventTypes.RESPONSE_CANCEL] = ClientEventTypes.RESPONSE_CANCEL
# すべてのイベントタイプを含むように Event ユニオンを更新
ClientEvent = Union[
SessionUpdate,
InputAudioBufferAppend,
InputAudioBufferCommit,
InputAudioBufferClear,
ConversationItemCreate,
ConversationItemTruncate,
ConversationItemDelete,
ResponseCreate,
ResponseCancel,
]
""" SERVER EVENTS """
class ServerEventTypes(str, Enum):
ERROR = "error"
RESPONSE_AUDIO_TRANSCRIPT_DONE = "response.audio_transcript.done"
RESPONSE_AUDIO_TRANSCRIPT_DELTA = "response.audio_transcript.delta"
RESPONSE_AUDIO_DELTA = "response.audio.delta"
SESSION_CREATED = "session.created"
SESSION_UPDATED = "session.updated"
CONVERSATION_CREATED = "conversation.created"
INPUT_AUDIO_BUFFER_COMMITTED = "input_audio_buffer.committed"
INPUT_AUDIO_BUFFER_CLEARED = "input_audio_buffer.cleared"
INPUT_AUDIO_BUFFER_SPEECH_STARTED = "input_audio_buffer.speech_started"
INPUT_AUDIO_BUFFER_SPEECH_STOPPED = "input_audio_buffer.speech_stopped"
CONVERSATION_ITEM_CREATED = "conversation.item.created"
CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED = (
"conversation.item.input_audio_transcription.completed"
)
CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_FAILED = (
"conversation.item.input_audio_transcription.failed"
)
CONVERSATION_ITEM_TRUNCATED = "conversation.item.truncated"
CONVERSATION_ITEM_DELETED = "conversation.item.deleted"
RESPONSE_CREATED = "response.created"
RESPONSE_DONE = "response.done"
RESPONSE_OUTPUT_ITEM_ADDED = "response.output_item.added"
RESPONSE_OUTPUT_ITEM_DONE = "response.output_item.done"
RESPONSE_CONTENT_PART_ADDED = "response.content_part.added"
RESPONSE_CONTENT_PART_DONE = "response.content_part.done"
RESPONSE_TEXT_DELTA = "response.text.delta"
RESPONSE_TEXT_DONE = "response.text.done"
RESPONSE_AUDIO_DONE = "response.audio.done"
RESPONSE_FUNCTION_CALL_ARGUMENTS_DELTA = "response.function_call_arguments.delta"
RESPONSE_FUNCTION_CALL_ARGUMENTS_DONE = "response.function_call_arguments.done"
RATE_LIMITS_UPDATED = "rate_limits.updated"
#### Errors
class ErrorDetails(BaseModel):
type: Optional[str] = None
code: Optional[str] = None
message: Optional[str] = None
param: Optional[str] = None
class ErrorEvent(BaseEvent):
type: Literal[ServerEventTypes.ERROR] = ServerEventTypes.ERROR
error: ErrorDetails
#### Session
class SessionCreated(BaseEvent):
type: Literal[ServerEventTypes.SESSION_CREATED] = ServerEventTypes.SESSION_CREATED
session: Session
class SessionUpdated(BaseEvent):
type: Literal[ServerEventTypes.SESSION_UPDATED] = ServerEventTypes.SESSION_UPDATED
session: Session
#### Conversation
class Conversation(BaseModel):
id: str
object: Literal["realtime.conversation"]
class ConversationCreated(BaseEvent):
type: Literal[ServerEventTypes.CONVERSATION_CREATED] = (
ServerEventTypes.CONVERSATION_CREATED
)
conversation: Conversation
class ConversationItemCreated(BaseEvent):
type: Literal[ServerEventTypes.CONVERSATION_ITEM_CREATED] = (
ServerEventTypes.CONVERSATION_ITEM_CREATED
)
previous_item_id: Optional[str] = None
item: ConversationItem
class ConversationItemInputAudioTranscriptionCompleted(BaseEvent):
type: Literal[
ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED
] = ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED
item_id: str
content_index: int
transcript: str
class ConversationItemInputAudioTranscriptionFailed(BaseEvent):
type: Literal[
ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_FAILED
] = ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_FAILED
item_id: str
content_index: int
error: dict[str, Any]
class ConversationItemTruncated(BaseEvent):
type: Literal[ServerEventTypes.CONVERSATION_ITEM_TRUNCATED] = (
ServerEventTypes.CONVERSATION_ITEM_TRUNCATED
)
item_id: str
content_index: int
audio_end_ms: int
class ConversationItemDeleted(BaseEvent):
type: Literal[ServerEventTypes.CONVERSATION_ITEM_DELETED] = (
ServerEventTypes.CONVERSATION_ITEM_DELETED
)
item_id: str
#### レスポンス
class ResponseUsage(BaseModel):
total_tokens: int
input_tokens: int
output_tokens: int
input_token_details: Optional[dict[str, int]] = None
output_token_details: Optional[dict[str, int]] = None
class ResponseOutput(BaseModel):
id: str
object: Literal["realtime.item"]
type: str
status: str
role: str
content: list[dict[str, Any]]
class ResponseContentPart(BaseModel):
type: str
text: Optional[str] = None
class ResponseOutputItemContent(BaseModel):
type: str
text: Optional[str] = None
class ResponseStatusDetails(BaseModel):
type: str
reason: str
class ResponseOutputItem(BaseModel):
id: str
object: Literal["realtime.item"]
type: str
status: str
role: str
content: list[ResponseOutputItemContent]
class Response(BaseModel):
id: str
object: Literal["realtime.response"]
status: str
status_details: Optional[ResponseStatusDetails] = None
output: list[ResponseOutput]
usage: Optional[ResponseUsage]
class ResponseCreated(BaseEvent):
type: Literal[ServerEventTypes.RESPONSE_CREATED] = ServerEventTypes.RESPONSE_CREATED
response: Response
class ResponseDone(BaseEvent):
type: Literal[ServerEventTypes.RESPONSE_DONE] = ServerEventTypes.RESPONSE_DONE
response: Response
class ResponseOutputItemAdded(BaseEvent):
type: Literal[ServerEventTypes.RESPONSE_OUTPUT_ITEM_ADDED] = (
ServerEventTypes.RESPONSE_OUTPUT_ITEM_ADDED
)
response_id: str
output_index: int
item: ResponseOutputItem
class ResponseOutputItemDone(BaseEvent):
type: Literal[ServerEventTypes.RESPONSE_OUTPUT_ITEM_DONE] = (
ServerEventTypes.RESPONSE_OUTPUT_ITEM_DONE
)
response_id: str
output_index: int
item: ResponseOutputItem
class ResponseContentPartAdded(BaseEvent):
type: Literal[ServerEventTypes.RESPONSE_CONTENT_PART_ADDED] = (
ServerEventTypes.RESPONSE_CONTENT_PART_ADDED
)
response_id: str
item_id: str
output_index: int
content_index: int
part: ResponseContentPart
class ResponseContentPartDone(BaseEvent):
type: Literal[ServerEventTypes.RESPONSE_CONTENT_PART_DONE] = (
ServerEventTypes.RESPONSE_CONTENT_PART_DONE
)
response_id: str
item_id: str
output_index: int
content_index: int
part: ResponseContentPart
#### レスポンステキスト
class ResponseTextDelta(BaseEvent):
type: Literal[ServerEventTypes.RESPONSE_TEXT_DELTA] = (
ServerEventTypes.RESPONSE_TEXT_DELTA
)
response_id: str
item_id: str
output_index: int
content_index: int
delta: str
class ResponseTextDone(BaseEvent):
type: Literal[ServerEventTypes.RESPONSE_TEXT_DONE] = (
ServerEventTypes.RESPONSE_TEXT_DONE
)
response_id: str
item_id: str
output_index: int
content_index: int
text: str
#### レスポンスオーディオ
class ResponseAudioTranscriptDone(BaseEvent):
type: Literal[ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DONE] = (
ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DONE
)
transcript: str
class ResponseAudioTranscriptDelta(BaseEvent):
type: Literal[ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DELTA] = (
ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DELTA
)
delta: str
class ResponseAudioDelta(BaseEvent):
type: Literal[ServerEventTypes.RESPONSE_AUDIO_DELTA] = (
ServerEventTypes.RESPONSE_AUDIO_DELTA
)
response_id: str
item_id: str
delta: str
class ResponseAudioDone(BaseEvent):
type: Literal[ServerEventTypes.RESPONSE_AUDIO_DONE] = (
ServerEventTypes.RESPONSE_AUDIO_DONE
)
response_id: str
item_id: str
output_index: int
content_index: int
class InputAudioBufferCommitted(BaseEvent):
type: Literal[ServerEventTypes.INPUT_AUDIO_BUFFER_COMMITTED] = (
ServerEventTypes.INPUT_AUDIO_BUFFER_COMMITTED
)
previous_item_id: Optional[str] = None
item_id: Optional[str] = None
event_id: Optional[str] = None
class InputAudioBufferCleared(BaseEvent):
type: Literal[ServerEventTypes.INPUT_AUDIO_BUFFER_CLEARED] = (
ServerEventTypes.INPUT_AUDIO_BUFFER_CLEARED
)
class InputAudioBufferSpeechStarted(BaseEvent):
type: Literal[ServerEventTypes.INPUT_AUDIO_BUFFER_SPEECH_STARTED] = (
ServerEventTypes.INPUT_AUDIO_BUFFER_SPEECH_STARTED
)
audio_start_ms: int
item_id: str
class InputAudioBufferSpeechStopped(BaseEvent):
type: Literal[ServerEventTypes.INPUT_AUDIO_BUFFER_SPEECH_STOPPED] = (
ServerEventTypes.INPUT_AUDIO_BUFFER_SPEECH_STOPPED
)
audio_end_ms: int
item_id: str
#### 関数呼び出し
class ResponseFunctionCallArgumentsDelta(BaseEvent):
type: Literal[ServerEventTypes.RESPONSE_FUNCTION_CALL_ARGUMENTS_DELTA] = (
ServerEventTypes.RESPONSE_FUNCTION_CALL_ARGUMENTS_DELTA
)
response_id: str
item_id: str
output_index: int
call_id: str
delta: str
class ResponseFunctionCallArgumentsDone(BaseEvent):
type: Literal[ServerEventTypes.RESPONSE_FUNCTION_CALL_ARGUMENTS_DONE] = (
ServerEventTypes.RESPONSE_FUNCTION_CALL_ARGUMENTS_DONE
)
response_id: str
item_id: str
output_index: int
call_id: str
arguments: str
#### レート制限
class RateLimit(BaseModel):
name: str
limit: int
remaining: int
reset_seconds: float
class RateLimitsUpdated(BaseEvent):
type: Literal[ServerEventTypes.RATE_LIMITS_UPDATED] = (
ServerEventTypes.RATE_LIMITS_UPDATED
)
rate_limits: list[RateLimit]
ServerEvent = Union[
ErrorEvent,
ConversationCreated,
ResponseAudioTranscriptDone,
ResponseAudioTranscriptDelta,
ResponseAudioDelta,
ResponseCreated,
ResponseDone,
ResponseOutputItemAdded,
ResponseOutputItemDone,
ResponseContentPartAdded,
ResponseContentPartDone,
ResponseTextDelta,
ResponseTextDone,
ResponseAudioDone,
ConversationItemInputAudioTranscriptionCompleted,
SessionCreated,
SessionUpdated,
InputAudioBufferCleared,
InputAudioBufferSpeechStarted,
InputAudioBufferSpeechStopped,
ConversationItemCreated,
ConversationItemInputAudioTranscriptionFailed,
ConversationItemTruncated,
ConversationItemDeleted,
RateLimitsUpdated,
]
EVENT_TYPE_TO_MODEL = {
ServerEventTypes.ERROR: ErrorEvent,
ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DONE: ResponseAudioTranscriptDone,
ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DELTA: ResponseAudioTranscriptDelta,
ServerEventTypes.RESPONSE_AUDIO_DELTA: ResponseAudioDelta,
ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED: ConversationItemInputAudioTranscriptionCompleted,
ServerEventTypes.SESSION_CREATED: SessionCreated,
ServerEventTypes.SESSION_UPDATED: SessionUpdated,
ServerEventTypes.CONVERSATION_CREATED: ConversationCreated,
ServerEventTypes.INPUT_AUDIO_BUFFER_COMMITTED: InputAudioBufferCommitted,
ServerEventTypes.INPUT_AUDIO_BUFFER_CLEARED: InputAudioBufferCleared,
ServerEventTypes.INPUT_AUDIO_BUFFER_SPEECH_STARTED: InputAudioBufferSpeechStarted,
ServerEventTypes.INPUT_AUDIO_BUFFER_SPEECH_STOPPED: InputAudioBufferSpeechStopped,
ServerEventTypes.CONVERSATION_ITEM_CREATED: ConversationItemCreated,
ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_FAILED: ConversationItemInputAudioTranscriptionFailed,
ServerEventTypes.CONVERSATION_ITEM_TRUNCATED: ConversationItemTruncated,
ServerEventTypes.CONVERSATION_ITEM_DELETED: ConversationItemDeleted,
ServerEventTypes.RESPONSE_CREATED: ResponseCreated,
ServerEventTypes.RESPONSE_DONE: ResponseDone,
ServerEventTypes.RESPONSE_OUTPUT_ITEM_ADDED: ResponseOutputItemAdded,
ServerEventTypes.RESPONSE_OUTPUT_ITEM_DONE: ResponseOutputItemDone,
ServerEventTypes.RESPONSE_CONTENT_PART_ADDED: ResponseContentPartAdded,
ServerEventTypes.RESPONSE_CONTENT_PART_DONE: ResponseContentPartDone,
ServerEventTypes.RESPONSE_TEXT_DELTA: ResponseTextDelta,
ServerEventTypes.RESPONSE_TEXT_DONE: ResponseTextDone,
ServerEventTypes.RESPONSE_AUDIO_DONE: ResponseAudioDone,
ServerEventTypes.RATE_LIMITS_UPDATED: RateLimitsUpdated,
}
def parse_server_event(event_data: dict) -> ServerEvent:
event_type = event_data.get("type")
if not event_type:
raise ValueError("イベントデータに 'type' フィールドがありません")
model_class = EVENT_TYPE_TO_MODEL.get(event_type)
if not model_class:
raise ValueError(f"不明なイベントタイプ: {event_type}")
try:
return model_class(**event_data)
except ValidationError as e:
raise ValueError(f"タイプ {event_type} のイベントの解析に失敗しました: {str(e)}") from e
オーディオストリームライター (ディスクおよびメモリ内への書き込み)
class StreamingWavWriter:
"""オーディオの整数またはバイト配列チャンクをWAVファイルに書き込む。"""
wav_file = None
buffer = None
in_memory = False
def __init__(
self,
filename=None,
channels=INPUT_DEVICE_CHANNELS,
sample_width=SAMPLE_WIDTH,
framerate=SAMPLE_RATE,
):
self.in_memory = filename is None
if self.in_memory:
self.buffer = io.BytesIO()
self.wav_file = wave.open(self.buffer, "wb")
else:
self.wav_file = wave.open(filename, "wb")
self.wav_file.setnchannels(channels)
self.wav_file.setsampwidth(sample_width)
self.wav_file.setframerate(framerate)
def append_int16_chunk(self, int16_data):
if int16_data is not None:
self.wav_file.writeframes(
int16_data.tobytes()
if isinstance(int16_data, np.ndarray)
else int16_data
)
def close(self):
self.wav_file.close()
def get_wav_buffer(self):
assert self.in_memory, "バッファはストリームがメモリ内にある場合のみ利用可能です。"
return self.buffer
Realtime オーディオモデル
- init: ローカルバッファ (入力オーディオ) とストリーム (assistant の再生ストリーム、ユーザーオーディオのディスク書き込みストリーム) を初期化し、Realtime API への接続を開きます。
-
receive_messages_thread: API からのメッセージ受信を処理するスレッドです。主に次の 4 種類のイベントを処理します。 - RESPONSE_AUDIO_TRANSCRIPT_DONE:
サーバーは assistant の応答が完了したことを示し、その文字起こしを返します。
- CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED: サーバーはユーザーのオーディオの文字起こしが完了したことを示し、その文字起こし結果を送信します。これを Weave にログし、ユーザー向けに表示します。
- RESPONSE_AUDIO_DELTA: サーバーは assistant の応答オーディオの新しいチャンクを送信します。これを response ID に対応する進行中の応答データに追加し、再生用の出力ストリームにも追加します。
- RESPONSE_DONE: サーバーは assistant の応答が完了したことを示します。応答に関連付けられたすべてのオーディオチャンクと文字起こしを取得し、これらを Weave にログします。
class RTAudioModel(weave.Model):
"""ログ記録用のWhisperユーザー文字起こしを使用したリアルタイムe2eオーディオOpenAIモデルインタラクションのモデルクラス。"""
realtime_model_name: str = "gpt-4o-realtime-preview-2024-10-01" # リアルタイムe2eオーディオ専用モデルインタラクション
stop_event: Optional[threading.Event] = threading.Event() # モデルを停止するイベント
ws: Optional[websocket.WebSocket] = None # OpenAI通信用WebSocket
user_wav_writer: Optional[StreamingWavWriter] = (
None # ユーザー出力をファイルに書き込むストリーム
)
input_audio_buffer: Optional[np.ndarray] = None # ユーザーオーディオチャンク用バッファ
assistant_outputs: dict[str, StreamingWavWriter] = (
None # Weaveに送信するためにまとめたアシスタント出力
)
playback_stream: Optional[pyaudio.Stream] = (
None # アシスタントの応答を再生するための再生ストリーム
)
def __init__(self):
super().__init__()
self.stop_event.clear()
self.user_wav_writer = StreamingWavWriter(
filename="user_audio.wav", framerate=SAMPLE_RATE
)
self.input_audio_buffer = np.array([], dtype=np.int16)
self.ws = websocket.WebSocket()
self.assistant_outputs = {}
# 有効な場合、アシスタントオーディオ再生ストリームを開く
if enable_audio_playback:
self.playback_stream = pyaudio.PyAudio().open(
format=pyaudio.paInt16,
channels=OUTPUT_DEVICE_CHANNELS,
rate=OAI_SAMPLE_RATE,
output=True,
output_device_index=OUTPUT_DEVICE_INDEX,
)
# WebSocketに接続
try:
self.ws.connect(
f"wss://api.openai.com/v1/realtime?model={self.realtime_model_name}",
header={
"Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
"OpenAI-Beta": "realtime=v1",
},
)
# 設定メッセージを送信
config_event = SessionUpdate(
session=Session(
modalities=["text", "audio"], # 使用するモダリティ
input_audio_transcription=InputAudioTranscription(
model="whisper-1"
), # 文字起こし用whisper-1
turn_detection=TurnDetection(
type="server_vad",
threshold=0.3,
prefix_padding_ms=300,
silence_duration_ms=600,
), # 無音検出用サーバーVAD
)
)
self.ws.send(config_event.model_dump_json(exclude_none=True))
self.log_ws_message(config_event.model_dump_json(exclude_none=True), "Sent")
# リスナーを起動
websocket_thread = threading.Thread(target=self.receive_messages_thread)
websocket_thread.daemon = True
websocket_thread.start()
except Exception as e:
print(f"WebSocket接続エラー: {e}")
##### Weaveインテグレーションとメッセージハンドラー #####
def handle_assistant_response_audio_delta(self, data: ResponseAudioDelta):
if data.response_id not in self.assistant_outputs:
self.assistant_outputs[data.response_id] = StreamingWavWriter(
framerate=OAI_SAMPLE_RATE
)
data_bytes = base64.b64decode(data.delta)
self.assistant_outputs[data.response_id].append_int16_chunk(data_bytes)
if enable_audio_playback:
self.playback_stream.write(data_bytes)
return {"assistant_audio": data_bytes}
@weave.op()
def handle_assistant_response_done(self, data: ResponseDone):
wave_file_stream = self.assistant_outputs[data.response.id]
wave_file_stream.close()
wave_file_stream.buffer.seek(0)
weave_payload = {
"assistant_audio": wave.open(wave_file_stream.get_wav_buffer(), "rb"),
"assistant_transcript": data.response.output[0]
.content[0]
.get("transcript", "文字起こしを利用できません。"),
}
return weave_payload
@weave.op()
def handle_user_transcription_done(
self, data: ConversationItemInputAudioTranscriptionCompleted
):
return {"user_transcript": data.transcript}
##### メッセージ受信と送信 #####
def receive_messages_thread(self):
while not self.stop_event.is_set():
try:
data = json.loads(self.ws.recv())
self.log_ws_message(json.dumps(data, indent=2))
parsed_event = parse_server_event(data)
if parsed_event.type == ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DONE:
print("アシスタント: ", parsed_event.transcript)
elif (
parsed_event.type
== ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED
):
print("ユーザー: ", parsed_event.transcript)
self.handle_user_transcription_done(parsed_event)
elif parsed_event.type == ServerEventTypes.RESPONSE_AUDIO_DELTA:
self.handle_assistant_response_audio_delta(parsed_event)
elif parsed_event.type == ServerEventTypes.RESPONSE_DONE:
self.handle_assistant_response_done(parsed_event)
elif parsed_event.type == ServerEventTypes.ERROR:
print(
f"\nサーバーエラー: {parsed_event.error.model_dump_json(exclude_none=True)}"
)
except websocket.WebSocketConnectionClosedException:
print("\nWebSocket接続が閉じられました")
break
except json.JSONDecodeError:
continue
except Exception as e:
print(f"\nreceive_messagesでエラーが発生しました: {e}")
break
def send_audio(self, audio_chunk):
if self.ws and self.ws.connected:
self.input_audio_buffer = np.append(
self.input_audio_buffer, np.frombuffer(audio_chunk, dtype=np.int16)
)
if len(self.input_audio_buffer) >= SAMPLE_RATE * CHUNK_DURATION:
try:
# OAIサンプルレートにリサンプリング
resampled_audio = (
resampy.resample(
self.input_audio_buffer, SAMPLE_RATE, OAI_SAMPLE_RATE
)
if SAMPLE_RATE != OAI_SAMPLE_RATE
else self.input_audio_buffer
)
# OAI APIにオーディオチャンクを送信
audio_event = InputAudioBufferAppend(
audio=base64.b64encode(
resampled_audio.astype(np.int16).tobytes()
).decode("utf-8") # オーディオ配列をb64バイトに変換
)
self.ws.send(audio_event.model_dump_json(exclude_none=True))
self.log_ws_message(
audio_event.model_dump_json(exclude_none=True), "Sent"
)
finally:
self.user_wav_writer.append_int16_chunk(self.input_audio_buffer)
# オーディオバッファをクリア
self.input_audio_buffer = np.array([], dtype=np.int16)
else:
print("オーディオ送信エラー: WebSocketが初期化されていません。")
##### 汎用ユーティリティ関数 #####
def log_ws_message(self, message, direction="受信"):
with open("websocket_log.txt", "a") as log_file:
log_file.write(
f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {direction}: {message}\n"
)
def stop(self):
self.stop_event.set()
if self.ws:
self.ws.close()
self.user_wav_writer.close()
オーディオレコーダー
send_audio methodに接続されたハンドラー付きのpyaudio入力ストリームを使用します。このストリームはメインスレッドに返されるため、プログラムの終了時に安全に停止できます。
# オーディオキャプチャストリーム
def record_audio(realtime_model: RTAudioModel) -> pyaudio.Stream:
"""Pyaudioの入力ストリームをセットアップし、RTAudioModelをストリーミングデータのコールバックとして使用する。"""
def audio_callback(in_data, frame_count, time_info, status):
realtime_model.send_audio(in_data)
return (None, pyaudio.paContinue)
p = pyaudio.PyAudio()
stream = p.open(
format=pyaudio.paInt16,
channels=INPUT_DEVICE_CHANNELS,
rate=SAMPLE_RATE,
input=True,
input_device_index=INPUT_DEVICE_INDEX,
frames_per_buffer=CHUNK,
stream_callback=audio_callback,
)
stream.start_stream()
print("録音を開始しました。パーソナルアシスタントに話しかけてください...")
return stream
メインスレッド (実行してください!)
weave.init(project_name="realtime-oai-audio-testing")
realtime_model = RTAudioModel()
if realtime_model.ws and realtime_model.ws.connected:
recording_stream: pyaudio.Stream = record_audio(realtime_model)
try:
while not realtime_model.stop_event.is_set():
time.sleep(1)
except KeyboardInterrupt:
pass
except Exception as e:
print(f"メインループでエラーが発生しました: {e}")
import traceback
traceback.print_exc()
finally:
print("終了中...")
realtime_model.stop()
if recording_stream and recording_stream.is_active():
recording_stream.stop_stream()
recording_stream.close()
else:
print(
"WebSocket接続に失敗しました。APIキーとインターネット接続を確認してください。"
)