Passer au contenu principal
L’intégration de Weave avec l’API Realtime d’OpenAI vous permet de tracer automatiquement les interactions vocales de votre application en temps réel. Vous pouvez l’utiliser pour capturer les conversations entre vos agents et les utilisateurs afin d’examiner et d’évaluer les performances des agents. Ce guide vous montre comment activer le traçage de l’API Realtime dans votre application et vous présente deux exemples complets d’assistants vocaux que vous pouvez exécuter localement.

Intégrer les traces en temps réel

Cette section présente le code minimal nécessaire pour activer le tracing Weave pour l’API Realtime d’OpenAI dans toute application. Weave s’intègre automatiquement à l’API Realtime d’OpenAI et ne nécessite que quelques lignes de code supplémentaires pour commencer à capturer les interactions audio de votre application. Le code suivant importe Weave et l’intégration à l’API Realtime d’OpenAI :
import weave
from weave.integrations import patch_openai_realtime

weave.init("your-team-name/your-project-name")
patch_openai_realtime()

# La logique de votre application
Une fois Weave importé dans votre code et exécuté, Weave trace automatiquement les interactions entre l’utilisateur et l’API Realtime d’OpenAI.

Exécutez un assistant vocal en temps réel avec le SDK OpenAI Agents

Cet exemple exécute un assistant vocal en temps réel qui transmet en continu l’audio du microphone à l’API Realtime d’OpenAI et lit les réponses orales de l’IA via le haut-parleur de votre machine locale. L’application utilise le SDK OpenAI Agents avec RealtimeAgent et RealtimeRunner, et active le tracing Weave via le patch patch_openai_realtime(). Utilisez cette approche si vous souhaitez que le SDK Agents de plus haut niveau gère la session Realtime à votre place. Pour exécuter l’exemple, suivez les étapes ci-dessous :
  1. Démarrez votre environnement Python et installez les bibliothèques suivantes :
    uv add weave openai-agents websockets pyaudio numpy
    
  2. Créez un fichier nommé weave_voice_assistant.py et ajoutez-y le code suivant. Les lignes surlignées montrent l’intégration de Weave dans l’application. Le reste du code crée l’application de base de l’assistant vocal.
    import argparse
    import asyncio
    import queue
    import sys
    import termios
    import threading
    import tty
    import weave
    import pyaudio
    import numpy as np
    from weave.integrations import patch_openai_realtime
    from agents.realtime import RealtimeAgent, RealtimeRunner
    
    DEFAULT_WEAVE_PROJECT = "<your-team-name/your-project-name>"
    
    FORMAT = pyaudio.paInt16
    RATE = 24000  # Required by the OpenAI Realtime API.
    CHUNK = 1024
    MAX_INPUT_CHANNELS = 1
    MAX_OUTPUT_CHANNELS = 1
    
    INP_DEV_IDX = None
    OUT_DEV_IDX = None
    
    # Analyse les arguments CLI pour le nom du projet Weave et la sélection du périphérique audio.
    def parse_args():
        parser = argparse.ArgumentParser(description="Realtime agent with Weave logging")
        parser.add_argument(
            "--weave-project",
            default=DEFAULT_WEAVE_PROJECT,
            help=f"Weave project name (default: {DEFAULT_WEAVE_PROJECT})",
            dest="weave_project"
        )
        parser.add_argument(
            "--input-device",
            type=int,
            default=None,
            help="PyAudio input (mic) device index. Defaults to system default. Run mic_detect.py to list devices.",
            dest="input_device"
        )
        parser.add_argument(
            "--output-device",
            type=int,
            default=None,
            help="PyAudio output (speaker) device index. Defaults to system default. Run mic_detect.py to list devices.",
            dest="output_device"
        )
        return parser.parse_args()
    
    
    # Initialise Weave et applique le patch à l'API Realtime d'OpenAI pour le tracing.
    def init_weave(project_name: str | None = None) -> None:
        name = project_name or DEFAULT_WEAVE_PROJECT
        weave.init(name)
        patch_openai_realtime()  # Enables automatic tracing of Realtime API sessions.
    
    
    mic_enabled = True
    
    # Écoute la touche 't' pour activer/désactiver le micro. S'exécute dans un thread démon.
    def start_keylistener():
        global mic_enabled
        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)
        try:
            tty.setcbreak(fd)
            while True:
                ch = sys.stdin.read(1)
                if ch.lower() == 't':
                    mic_enabled = not mic_enabled
                    state = "ON" if mic_enabled else "OFF"
                    print(f"\n🎙  Mic {state} (press t to toggle)")
                elif ch == '\x03':  # Ctrl-C
                    break
        finally:
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
    
    
    # Vide la file d'attente audio et écrit vers le haut-parleur dans un thread en arrière-plan.
    def play_audio(output_stream: pyaudio.Stream, audio_output_queue: queue.Queue):
        while True:
            data = audio_output_queue.get()
            if data is None:
                break
            output_stream.write(data)
    
    
    # Ouvre les flux audio, démarre une session Realtime et exécute la boucle d'envoi/réception.
    async def main(*, input_device_index: int | None = None, output_device_index: int | None = None):
        p = pyaudio.PyAudio()
    
        if input_device_index is None:
            input_device_index = int(p.get_default_input_device_info()['index'])
        if output_device_index is None:
            output_device_index = int(p.get_default_output_device_info()['index'])
    
        # Limite le nombre de canaux aux capacités du périphérique pour éviter les erreurs pyaudio.
        input_info = p.get_device_info_by_index(input_device_index)
        output_info = p.get_device_info_by_index(output_device_index)
    
        input_channels = min(int(input_info['maxInputChannels']), MAX_INPUT_CHANNELS)
        output_channels = min(int(output_info['maxOutputChannels']), MAX_OUTPUT_CHANNELS)
    
        mic = p.open(
            format=FORMAT,
            channels=input_channels,
            rate=RATE,
            input=True,
            output=False,
            frames_per_buffer=CHUNK,
            input_device_index=input_device_index,
            start=False,
        )
        speaker = p.open(
            format=FORMAT,
            channels=output_channels,
            rate=RATE,
            input=False,
            output=True,
            frames_per_buffer=CHUNK,
            output_device_index=output_device_index,
            start=False,
        )
        mic.start_stream()
        speaker.start_stream()
    
        # Met l'audio en mémoire tampon via une file d'attente afin que la lecture en cours puisse être interrompue proprement.
        audio_output_queue = queue.Queue()
        threading.Thread(
            target=play_audio, args=(speaker, audio_output_queue), daemon=True
        ).start()
    
        s_agent = RealtimeAgent(
            name="Speech Assistant",
            instructions="You are a tool using AI. Use tools to accomplish a task whenever possible"
        )
    
        s_runner = RealtimeRunner(s_agent, config={
            "model_settings": {
                "model_name": "gpt-realtime",
                "modalities": ["audio"],
                "output_modalities": ["audio"],
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16",
                "speed": 1.2,
                "turn_detection": {
                    "prefix_padding_ms": 100,
                    "silence_duration_ms": 100,
                    "type": "server_vad",
                    "interrupt_response": True,
                    "create_response": True,
                },
            }
        })
        print("--- Session Active (Speak into mic) ---")
        print("🎙  Mic ON (press t to toggle)")
    
        threading.Thread(target=start_keylistener, daemon=True).start()
    
        async with await s_runner.run() as session:
            # Diffuse l'entrée micro vers l'API Realtime en envoyant du silence lorsque le micro est coupé.
            async def send_mic_audio():
                silence = b'\x00' * CHUNK * 2  # 2 bytes per sample (16-bit PCM).
                try:
                    while True:
                        raw_data = mic.read(CHUNK, exception_on_overflow=False)
    
                        if mic_enabled:
                            audio_data = np.frombuffer(raw_data, dtype=np.int16).astype(np.float64)
                            rms = np.sqrt(np.mean(audio_data**2))
                            meter = int(min(rms / 50, 50))
                            print(f"Mic Level: {'█' * meter}{' ' * (50-meter)} | 🎙 ON ", end="\r")
                            await session.send_audio(raw_data)
                        else:
                            print(f"Mic Level: {' ' * 50} | 🎙 OFF", end="\r")
                            await session.send_audio(silence)
    
                        await asyncio.sleep(0)  # Cède la main à la boucle d'événements entre les lectures.
                except Exception:
                    pass
    
            # Reçoit les événements de la session et achemine l'audio vers le haut-parleur.
            async def handle_events():
                async for event in session:
                    if event.type == "audio":
                        audio_output_queue.put(event.audio.data)
                    elif event.type == "audio_interrupted":
                        # Vide l'audio IA en attente pour éviter qu'il ne couvre la voix de l'utilisateur.
                        while not audio_output_queue.empty():
                            try:
                                audio_output_queue.get_nowait()
                            except queue.Empty:
                                break
    
            mic_task = asyncio.create_task(send_mic_audio())
            try:
                await handle_events()
            finally:
                mic_task.cancel()
    
        # Nettoyage
        audio_output_queue.put(None)  # Signale au thread de lecture de se terminer.
        mic.close()
        speaker.close()
        p.terminate()
    
    if __name__ == "__main__":
        args = parse_args()
        init_weave(args.weave_project)
        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)
        try:
            asyncio.run(main(input_device_index=args.input_device, output_device_index=args.output_device))
        finally:
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
    
  3. Mettez à jour la valeur de DEFAULT_WEAVE_PROJECT avec le nom de votre équipe et celui de votre projet.
  4. Définissez la variable d’environnement OPENAI_API_KEY.
  5. Lancez l’assistant vocal :
    python weave_voice_assistant.py
    
Une fois que l’assistant est en cours d’exécution, appuyez sur la touche T de votre clavier pour couper ou réactiver le micro. L’assistant utilise la détection de l’activité vocale côté serveur pour gérer la prise de parole à tour de rôle et les interruptions. Lorsque vous parlez à l’assistant, Weave capture des traces que vous pouvez explorer dans la Weave UI, y compris l’audio de la session.

Exécuter un assistant vocal en temps réel avec WebSockets

L’exemple suivant se connecte directement à l’OpenAI Realtime API via WebSockets. Il transmet l’audio du microphone à l’API, lit les réponses vocales et prend en charge l’appel d’outils (recherches météo, évaluation mathématique, exécution de code et écriture de fichiers). Weave trace la session à l’aide de weave.init() et patch_openai_realtime(). Utilisez cette approche lorsque vous souhaitez un contrôle total sur la session Realtime sans dépendre du SDK Agents. Pour exécuter l’exemple, effectuez les étapes suivantes :
  1. Lancez votre environnement Python et installez les bibliothèques suivantes :
    uv add weave websockets pyaudio numpy
    
  2. Créez un fichier intitulé tool_definitions.py et ajoutez-y les définitions d’outil suivantes. L’application principale importe ce module.
    import json
    import subprocess
    import tempfile
    from pathlib import Path
    import weave
    
    
    # @function_tool
    @weave.op
    def get_weather(city: str) -> str:
        """Obtenir la météo actuelle d'une ville.
    
        Arguments:
            city: Le nom de la ville pour laquelle obtenir la météo.
        """
        return json.dumps({"city": city, "temperature": "72°F", "condition": "sunny"})
    
    
    @weave.op
    def calculate(expression: str) -> str:
        """Évaluer une expression mathématique et renvoyer le résultat.
    
        Arguments:
            expression: Une expression mathématique à évaluer, par ex. '2 + 2'.
        """
        try:
            result = eval(expression)
            return str(result)
        except Exception as e:
            return f"Error: {e}"
    
    
    @weave.op
    def run_python_code(code: str) -> str:
        """Écrire et exécuter un script Python, puis renvoyer sa sortie stdout/stderr.
    
        Arguments:
            code: Le code source Python à exécuter.
        """
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".py", dir=tempfile.gettempdir(), delete=False
        ) as f:
            f.write(code)
            script_path = Path(f.name)
    
        try:
            result = subprocess.run(
                ["python", str(script_path)],
                capture_output=True,
                text=True,
                timeout=30,
            )
            output = result.stdout
            if result.stderr:
                output += f"\nSTDERR:\n{result.stderr}"
            if result.returncode != 0:
                output += f"\n(exit code {result.returncode})"
            return output or "(no output)"
        except subprocess.TimeoutExpired:
            return "Error: script timed out after 30 seconds."
        finally:
            script_path.unlink(missing_ok=True)
    
    
    @weave.op
    async def write_file(file_path: str, content: str) -> str:
        """Écrire du contenu dans un fichier sur disque.
    
        Arguments:
            file_path: Le chemin du fichier dans lequel écrire.
            content: Le contenu à écrire dans le fichier.
        """
        try:
            path = Path(file_path)
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(content)
            return f"Wrote {len(content)} bytes to {file_path}"
        except Exception as e:
            return f"Error writing file: {e}"
    
  3. Créez un fichier nommé weave_ws_voice_assistant.py dans le même répertoire, puis ajoutez-y le code suivant.
    import asyncio
    import base64
    import json
    import os
    import queue
    import threading
    from typing import Any, Callable
    
    import numpy as np
    import pyaudio
    import websockets
    
    import weave
    weave.init("<your-team-name/your-project-name>")
    from weave.integrations import patch_openai_realtime
    patch_openai_realtime()
    
    from tool_definitions import (
        calculate,
        get_weather,
        run_python_code,
        write_file,
    )
    
    # Audio format (must be PCM16 for the Realtime API).
    FORMAT = pyaudio.paInt16
    RATE = 24000
    CHUNK = 1024
    MAX_INPUT_CHANNELS = 2
    MAX_OUTPUT_CHANNELS = 2
    
    OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
    REALTIME_URL = "wss://api.openai.com/v1/realtime?model=gpt-realtime"
    
    DEBUG_WRITE_LOG = False
    
    # Map tool name -> callable for function call dispatch.
    TOOL_REGISTRY: dict[str, Callable[..., Any]] = {
        "get_weather": get_weather,
        "calculate": calculate,
        "run_python_code": run_python_code,
        "write_file": write_file,
    }
    
    # Raw tool definitions for the Realtime API session config.
    TOOL_DEFINITIONS = [
        {
            "type": "function",
            "name": "get_weather",
            "description": "Get the current weather for a city.",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {
                        "type": "string",
                        "description": "The city name to get weather for.",
                    }
                },
                "required": ["city"],
            },
        },
        {
            "type": "function",
            "name": "calculate",
            "description": "Evaluate a math expression and return the result.",
            "parameters": {
                "type": "object",
                "properties": {
                    "expression": {
                        "type": "string",
                        "description": "A math expression to evaluate, e.g. '2 + 2'.",
                    }
                },
                "required": ["expression"],
            },
        },
        {
            "type": "function",
            "name": "run_python_code",
            "description": "Write and execute a Python script, returning its stdout/stderr.",
            "parameters": {
                "type": "object",
                "properties": {
                    "code": {
                        "type": "string",
                        "description": "The Python source code to execute.",
                    }
                },
                "required": ["code"],
            },
        },
        {
            "type": "function",
            "name": "write_file",
            "description": "Write content to a file on disk.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "The path to write the file to.",
                    },
                    "content": {
                        "type": "string",
                        "description": "The content to write into the file.",
                    },
                },
                "required": ["file_path", "content"],
            },
        },
    ]
    
    
    async def send_event(ws, event: dict) -> None:
        await ws.send(json.dumps(event))
    
    
    async def configure_session(ws) -> None:
        event = {
            "type": "session.update",
            "session": {
                "type": "realtime",
                "model": "gpt-realtime",
                "output_modalities": ["audio"],
                "instructions": (
                    "You are a helpful AI assistant with access to tools. "
                    "Use tools to accomplish tasks whenever possible. "
                    "Speak clearly and briefly."
                ),
                "tools": TOOL_DEFINITIONS,
                "tool_choice": "auto",
                "audio": {
                    "input": {
                        "format": {"type": "audio/pcm", "rate": 24000},
                        "transcription": {"model": "gpt-4o-transcribe"},
                        "turn_detection": {
                            "type": "server_vad",
                            "threshold": 0.5,
                            "prefix_padding_ms": 300,
                            "silence_duration_ms": 500,
                        },
                    },
                    "output": {
                        "format": {"type": "audio/pcm", "rate": 24000},
                    },
                },
            },
        }
        await send_event(ws, event)
        print("Session configured.")
    
    
    async def handle_function_call(ws, call_id: str, name: str, arguments: str) -> None:
        if not name:
            raise Exception("Did not get a function name")
    
        print(f"\n[Function Call] {name}({arguments})")
        tool_fn = TOOL_REGISTRY.get(name)
        if tool_fn is None:
            result = json.dumps({"error": f"Unknown function: {name}"})
        else:
            try:
                args = json.loads(arguments)
                result = tool_fn(**args)
                if asyncio.iscoroutine(result):
                    result = await result
            except Exception as e:
                result = json.dumps({"error": str(e)})
    
        print(f"[Function Result] {result}")
    
        # Renvoyer le résultat de l'appel de fonction au modèle.
        await send_event(ws, {
            "type": "conversation.item.create",
            "item": {
                "type": "function_call_output",
                "call_id": call_id,
                "output": result if isinstance(result, str) else json.dumps(result),
            },
        })
    
        # Déclencher une nouvelle réponse pour que le modèle intègre le résultat de la fonction.
        await send_event(ws, {"type": "response.create"})
    
    
    def play_audio(output_stream: pyaudio.Stream, audio_output_queue: queue.Queue):
        """S'exécute dans un thread séparé car la méthode write() de pyaudio bloque jusqu'à ce que
        la carte son consomme les échantillons. Découpler la lecture de la boucle d'événements async
        permet de vider la file d'attente lors d'une interruption sans attendre la fin des écritures en cours."""
        while True:
            data = audio_output_queue.get()
            if data is None:
                break
            output_stream.write(data)
    
    
    async def send_mic_audio(ws, mic) -> None:
        try:
            while True:
                raw_data = mic.read(CHUNK, exception_on_overflow=False)
    
                # Indicateur visuel du volume.
                audio_data = np.frombuffer(raw_data, dtype=np.int16).astype(np.float64)
                rms = np.sqrt(np.mean(audio_data**2))
                meter = int(min(rms / 50, 50))
                print(f"Mic Level: {'█' * meter}{' ' * (50 - meter)} |", end="\r")
    
                # Encoder en Base64 et envoyer le fragment audio.
                b64_audio = base64.b64encode(raw_data).decode("utf-8")
                await send_event(ws, {
                    "type": "input_audio_buffer.append",
                    "audio": b64_audio,
                })
    
                await asyncio.sleep(0)
        except asyncio.CancelledError:
            pass
    
    
    async def receive_events(ws, audio_output_queue: queue.Queue) -> None:
        # Accumuler les arguments des appels de fonction à travers les événements delta.
        pending_calls: dict[str, dict] = {}
        async for raw_message in ws:
            if DEBUG_WRITE_LOG:
                with open("data.jsonl", "a", encoding="utf-8") as f:
                    f.write(json.dumps(raw_message) + "\n")
    
            event = json.loads(raw_message)
            event_type = event.get("type", "")
    
            if event_type == "session.created":
                print(raw_message)
    
            elif event_type == "session.updated":
                print(raw_message)
    
            elif event_type == "error":
                print(f"\n[Error] {event}")
    
            elif event_type == "input_audio_buffer.speech_started":
                # Vider la file audio de l'IA pour qu'elle ne parle pas par-dessus l'utilisateur.
                while not audio_output_queue.empty():
                    try:
                        audio_output_queue.get_nowait()
                    except queue.Empty:
                        break
    
            elif event_type == "input_audio_buffer.speech_stopped":
                pass
    
            elif event_type == "input_audio_buffer.committed":
                pass
    
            elif event_type == "response.created":
                pass
    
            elif event_type == "response.output_text.delta":
                pass
    
            elif event_type == "response.output_text.done":
                pass
    
            # Audio output deltas - queue for playback.
            elif event_type == "response.output_audio.delta":
                audio_bytes = base64.b64decode(event.get("delta", ""))
                audio_output_queue.put(audio_bytes)
    
            elif event_type == "response.output_audio_transcript.delta":
                pass
    
            elif event_type == "response.output_audio_transcript.done":
                pass
    
            # Function call started - initialize pending call.
            elif event_type == "response.output_item.added":
                item = event.get("item", {})
                if item.get("type") == "function_call" and item.get("status") == "in_progress":
                    item_id = item.get("id", "")
                    pending_calls[item_id] = {
                        "call_id": item.get("call_id", ""),
                        "name": item.get("name", ""),
                        "arguments": "",
                    }
                    print(f"\n[Function Call Started] {item.get('name', '')}")
    
            # Function call argument deltas - accumulate.
            elif event_type == "response.function_call_arguments.delta":
                item_id = event.get("item_id", "")
                if item_id in pending_calls:
                    pending_calls[item_id]["arguments"] += event.get("delta", "")
    
            elif event_type == "response.function_call_arguments.done":
                item_id = event.get("item_id", "")
                call_info = pending_calls.pop(item_id, None)
                if call_info is None:
                    # Repli : utiliser les données directement depuis l'événement done.
                    call_info = {
                        "call_id": event.get("call_id"),
                        "name": event.get("name"),
                        "arguments": event.get("arguments"),
                    }
                try:
                    await handle_function_call(
                        ws,
                        call_info["call_id"],
                        call_info["name"],
                        call_info["arguments"],
                    )
                except Exception as e:
                    print(f"Échec de l'appel de fonction pour le message {call_info} : erreur - {e}")
    
            elif event_type == "response.done":
                pass
    
            elif event_type == "rate_limits.updated":
                pass
    
            else:
                print(f"\n[Event: {event_type}]")
    
    
    async def main():
        if not OPENAI_API_KEY:
            print("Erreur : la variable d'environnement OPENAI_API_KEY n'est pas définie")
            return
    
        p = pyaudio.PyAudio()
    
        input_device_index = int(p.get_default_input_device_info()['index'])
        output_device_index = int(p.get_default_output_device_info()['index'])
    
        # Le nombre de canaux doit correspondre aux capacités de l'appareil, sinon pyaudio génère une erreur à l'ouverture.
        input_info = p.get_device_info_by_index(input_device_index)
        output_info = p.get_device_info_by_index(output_device_index)
        input_channels = min(int(input_info['maxInputChannels']), 1)
        output_channels = min(int(output_info['maxOutputChannels']), 1)
    
        mic = p.open(
            format=FORMAT,
            channels=input_channels,
            rate=RATE,
            input=True,
            output=False,
            frames_per_buffer=CHUNK,
            input_device_index=input_device_index,
            start=False,
        )
        speaker = p.open(
            format=FORMAT,
            channels=output_channels,
            rate=RATE,
            input=False,
            output=True,
            frames_per_buffer=CHUNK,
            output_device_index=output_device_index,
            start=False,
        )
        mic.start_stream()
        speaker.start_stream()
    
        # L'audio transite par une file d'attente afin de pouvoir la vider lorsque l'utilisateur interrompt la lecture.
        # Écrire directement sur le haut-parleur rend impossible l'annulation de l'audio en cours de lecture.
        audio_output_queue = queue.Queue()
        threading.Thread(
            target=play_audio, args=(speaker, audio_output_queue), daemon=True
        ).start()
    
        headers = {
            "Authorization": f"Bearer {OPENAI_API_KEY}",
        }
    
        print("Connexion à l'OpenAI Realtime API...")
    
        async with websockets.connect(
            REALTIME_URL,
            additional_headers=headers,
        ) as ws:
            print("Connecté ! Configuration de la session...")
            await configure_session(ws)
    
            print("--- Session active (parlez dans le micro) ---")
    
            mic_task = asyncio.create_task(send_mic_audio(ws, mic))
            try:
                await receive_events(ws, audio_output_queue)
            finally:
                mic_task.cancel()
                try:
                    await mic_task
                except asyncio.CancelledError:
                    pass
    
        # Nettoyage
        audio_output_queue.put(None)  # Signal au thread de lecture pour qu'il se termine.
        mic.close()
        speaker.close()
        p.terminate()
        print("\nSession terminée.")
    
    
    if __name__ == "__main__":
        asyncio.run(main())
    
  4. Mettez à jour l’appel à weave.init() avec le nom de votre équipe et de votre projet.
  5. Définissez la variable d’environnement OPENAI_API_KEY.
  6. Lancez l’assistant vocal :
    python weave_ws_voice_assistant.py
    
Une fois que l’assistant est en cours d’exécution, appuyez sur la touche T de votre clavier pour couper ou réactiver le micro. L’assistant utilise la détection de l’activité vocale côté serveur pour gérer la prise de parole à tour de rôle et les interruptions. Lorsque vous parlez à l’assistant, Weave capture des traces que vous pouvez explorer dans la Weave UI, y compris l’audio de la session.