From 1cb00f8fed78052aacbb9e0fac997b6ba0d44d2a Mon Sep 17 00:00:00 2001
From: Robert Craigie
Date: Mon, 16 Dec 2024 19:16:25 +0000
Subject: [PATCH] docs: add examples + guidance on Realtime API support

---
 README.md                             |  82 +++++++
 examples/realtime/audio_util.py       | 144 +++++++++++++
 examples/realtime/push_to_talk_app.py | 286 ++++++++++++++++++++++++++
 mypy.ini                              |   5 +-
 pyproject.toml                        |   5 +
 5 files changed, 521 insertions(+), 1 deletion(-)
 create mode 100644 examples/realtime/audio_util.py
 create mode 100755 examples/realtime/push_to_talk_app.py

diff --git a/README.md b/README.md
index cbcfdb4447..4c3ba87c97 100644
--- a/README.md
+++ b/README.md
@@ -258,6 +258,88 @@ We recommend that you always instantiate a client (e.g., with `client = OpenAI()
 - It's harder to mock for testing purposes
 - It's not possible to control cleanup of network connections
 
+## Realtime API beta
+
+The Realtime API enables you to build low-latency, multi-modal conversational experiences. It currently supports text and audio as both input and output, as well as [function calling](https://platform.openai.com/docs/guides/function-calling) through a WebSocket connection.
+
+Under the hood, the SDK uses the [`websockets`](https://websockets.readthedocs.io/en/stable/) library to manage connections.
+
+The Realtime API works through a combination of client-sent events and server-sent events. Clients can send events to do things like update session configuration or send text and audio inputs. Server events confirm when audio responses have completed, or when a text response from the model has been received. A full event reference can be found [here](https://platform.openai.com/docs/api-reference/realtime-client-events) and a guide can be found [here](https://platform.openai.com/docs/guides/realtime).
+
+A basic text-based example:
+
+```py
+import asyncio
+from openai import AsyncOpenAI
+
+async def main():
+    client = AsyncOpenAI()
+
+    async with client.beta.realtime.connect(model="gpt-4o-realtime-preview-2024-10-01") as connection:
+        await connection.session.update(session={'modalities': ['text']})
+
+        await connection.conversation.item.create(
+            item={
+                "type": "message",
+                "role": "user",
+                "content": [{"type": "input_text", "text": "Say hello!"}],
+            }
+        )
+        await connection.response.create()
+
+        async for event in connection:
+            if event.type == 'response.text.delta':
+                print(event.delta, flush=True, end="")
+
+            elif event.type == 'response.text.done':
+                print()
+
+            elif event.type == "response.done":
+                break
+
+asyncio.run(main())
+```
+
+However, the real magic of the Realtime API is handling audio inputs/outputs; see this [TUI script](https://github.com/stainless-sdks/openai-python/blob/robert/realtime-docs-preview/examples/realtime/push_to_talk_app.py) for a fully fledged example.
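+
+As a smaller sketch (not a complete program — it assumes the session has been configured for audio output, as in the push to talk app), you could collect the model's audio output into a single PCM16 buffer like this:
+
+```py
+import base64
+
+client = AsyncOpenAI()
+
+async with client.beta.realtime.connect(model="gpt-4o-realtime-preview-2024-10-01") as connection:
+    ...  # configure the session for audio output and create a response, as above
+
+    audio = bytearray()
+
+    async for event in connection:
+        if event.type == "response.audio.delta":
+            # `event.delta` is a base64-encoded chunk of audio (24kHz mono PCM16 here)
+            audio.extend(base64.b64decode(event.delta))
+
+        elif event.type == "response.done":
+            break
+```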
+
+### Realtime error handling
+
+Whenever an error occurs, the Realtime API will send an [`error` event](https://platform.openai.com/docs/guides/realtime/realtime-api-beta#handling-errors) and the connection will stay open and remain usable. This means you need to handle these events yourself, as *no errors are raised directly* by the SDK when an `error` event comes in.
+
+```py
+client = AsyncOpenAI()
+
+async with client.beta.realtime.connect(model="gpt-4o-realtime-preview-2024-10-01") as connection:
+    ...
+
+    async for event in connection:
+        if event.type == 'error':
+            print(event.error.type)
+            print(event.error.code)
+            print(event.error.event_id)
+            print(event.error.message)
+```
+
 ## Using types
 
 Nested request parameters are [TypedDicts](https://docs.python.org/3/library/typing.html#typing.TypedDict). Responses are [Pydantic models](https://docs.pydantic.dev) which also provide helper methods for things like:
 
diff --git a/examples/realtime/audio_util.py b/examples/realtime/audio_util.py
new file mode 100644
index 0000000000..b073cc45be
--- /dev/null
+++ b/examples/realtime/audio_util.py
@@ -0,0 +1,144 @@
+from __future__ import annotations
+
+import io
+import base64
+import asyncio
+import threading
+from typing import Callable, Awaitable
+
+import numpy as np
+import pyaudio
+import sounddevice as sd
+from pydub import AudioSegment
+
+from openai.resources.beta.realtime.realtime import AsyncRealtimeConnection
+
+CHUNK_LENGTH_S = 0.05  # 50ms
+SAMPLE_RATE = 24000
+FORMAT = pyaudio.paInt16
+CHANNELS = 1
+
+# pyright: reportUnknownMemberType=false, reportUnknownVariableType=false, reportUnknownArgumentType=false
+
+
+def audio_to_pcm16_base64(audio_bytes: bytes) -> bytes:
+    # load the audio file from the byte stream
+    audio = AudioSegment.from_file(io.BytesIO(audio_bytes))
+    print(f"Loaded audio: {audio.frame_rate=} {audio.channels=} {audio.sample_width=} {audio.frame_width=}")
+    # resample to 24kHz mono pcm16
+    pcm_audio = audio.set_frame_rate(SAMPLE_RATE).set_channels(CHANNELS).set_sample_width(2).raw_data
+    return pcm_audio
+
+
+class AudioPlayerAsync:
+    def __init__(self):
+        self.queue = []
+        self.lock = threading.Lock()
+        self.stream = sd.OutputStream(
+            callback=self.callback,
+            samplerate=SAMPLE_RATE,
+            channels=CHANNELS,
+            dtype=np.int16,
+            blocksize=int(CHUNK_LENGTH_S * SAMPLE_RATE),
+        )
+        self.playing = False
+        self._frame_count = 0
+
+    def callback(self, outdata, frames, time, status):  # noqa
+        with self.lock:
+            data = np.empty(0, dtype=np.int16)
+
+            # get next item from queue if there is still space in the buffer
+            while len(data) < frames and len(self.queue) > 0:
+                item = self.queue.pop(0)
+                frames_needed = frames - len(data)
+                data = np.concatenate((data, item[:frames_needed]))
+                if len(item) > frames_needed:
+                    self.queue.insert(0, item[frames_needed:])
+
+            self._frame_count += len(data)
+
+            # fill the rest of the frames with zeros if there is no more data
+            if len(data) < frames:
+                data = np.concatenate((data, np.zeros(frames - len(data), dtype=np.int16)))
+
+        outdata[:] = data.reshape(-1, 1)
+
+    def reset_frame_count(self):
+        self._frame_count = 0
+
+    def get_frame_count(self):
+        return self._frame_count
+
+    def add_data(self, data: bytes):
+        with self.lock:
+            # bytes is pcm16 single channel audio data, convert to numpy array
+            np_data = np.frombuffer(data, dtype=np.int16)
+            self.queue.append(np_data)
+            if not self.playing:
+                self.start()
+
+    def start(self):
+        self.playing = True
+        self.stream.start()
+
+    def stop(self):
+        self.playing = False
+        self.stream.stop()
+        with self.lock:
+            self.queue = []
+
+    def terminate(self):
+        self.stream.close()
+
+
+async def send_audio_worker_sounddevice(
+    connection: AsyncRealtimeConnection,
+    should_send: Callable[[], bool] | None = None,
+    start_send: Callable[[], Awaitable[None]] | None = None,
+):
+    sent_audio = False
+
+    device_info = sd.query_devices()
+    print(device_info)
+
+    read_size = int(SAMPLE_RATE * 0.02)
+
+    stream = sd.InputStream(
+        channels=CHANNELS,
+        samplerate=SAMPLE_RATE,
+        dtype="int16",
+    )
+    stream.start()
+
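+    # poll the microphone in ~20ms chunks, yielding to the event loop between
+    # reads so incoming server events can still be processed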
+    try:
+        while True:
+            if stream.read_available < read_size:
+                await asyncio.sleep(0)
+                continue
+
+            data, _ = stream.read(read_size)
+
+            if should_send is None or should_send():
+                if not sent_audio and start_send:
+                    await start_send()
+                await connection.send(
+                    {"type": "input_audio_buffer.append", "audio": base64.b64encode(data).decode("utf-8")}
+                )
+                sent_audio = True
+
+            elif sent_audio:
+                print("Done, triggering inference")
+                await connection.send({"type": "input_audio_buffer.commit"})
+                await connection.send({"type": "response.create", "response": {}})
+                sent_audio = False
+
+            await asyncio.sleep(0)
+
+    except KeyboardInterrupt:
+        pass
+    finally:
+        stream.stop()
+        stream.close()
diff --git a/examples/realtime/push_to_talk_app.py b/examples/realtime/push_to_talk_app.py
new file mode 100755
index 0000000000..d46945a8ed
--- /dev/null
+++ b/examples/realtime/push_to_talk_app.py
@@ -0,0 +1,286 @@
+#!/usr/bin/env uv run
+####################################################################
+# Sample TUI app with a push to talk interface to the Realtime API #
+# If you have `uv` installed and the `OPENAI_API_KEY`              #
+# environment variable set, you can run this example with just    #
+#                                                                  #
+#     `./examples/realtime/push_to_talk_app.py`                    #
+####################################################################
+#
+# /// script
+# requires-python = ">=3.9"
+# dependencies = [
+#     "textual",
+#     "numpy",
+#     "pyaudio",
+#     "pydub",
+#     "sounddevice",
+#     "openai[realtime]",
+# ]
+#
+# [tool.uv.sources]
+# openai = { path = "../../", editable = true }
+# ///
+from __future__ import annotations
+
+import base64
+import asyncio
+from typing import Any, cast
+from typing_extensions import override
+
+from textual import events
+from audio_util import CHANNELS, SAMPLE_RATE, AudioPlayerAsync
+from textual.app import App, ComposeResult
+from textual.widgets import Button, Static, RichLog
+from textual.reactive import reactive
+from textual.containers import Container
+
+from openai import AsyncOpenAI
+from openai.types.beta.realtime.session import Session
+from openai.resources.beta.realtime.realtime import AsyncRealtimeConnection
+
+
+class SessionDisplay(Static):
+    """A widget that shows the current session ID."""
+
+    session_id = reactive("")
+
+    @override
+    def render(self) -> str:
+        return f"Session ID: {self.session_id}" if self.session_id else "Connecting..."
+
+
+class AudioStatusIndicator(Static):
+    """A widget that shows the current audio recording status."""
+
+    is_recording = reactive(False)
+
+    @override
+    def render(self) -> str:
+        status = (
+            "🔴 Recording... (Press K to stop)" if self.is_recording else "⚪ Press K to start recording (Q to quit)"
+        )
+        return status
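+
+
+# This TUI runs two background workers: `handle_realtime_connection`, which
+# owns the realtime connection, and `send_mic_audio`, which streams microphone
+# audio while recording is toggled on (the K key).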
+class RealtimeApp(App[None]):
+    CSS = """
+    Screen {
+        background: #1a1b26; /* Dark blue-grey background */
+    }
+
+    Container {
+        border: double rgb(91, 164, 91);
+    }
+
+    Horizontal {
+        width: 100%;
+    }
+
+    #input-container {
+        height: 5; /* Explicit height for input container */
+        margin: 1 1;
+        padding: 1 2;
+    }
+
+    Input {
+        width: 80%;
+        height: 3; /* Explicit height for input */
+    }
+
+    Button {
+        width: 20%;
+        height: 3; /* Explicit height for button */
+    }
+
+    #bottom-pane {
+        width: 100%;
+        height: 82%; /* Reduced to make room for session display */
+        border: round rgb(205, 133, 63);
+        content-align: center middle;
+    }
+
+    #status-indicator {
+        height: 3;
+        content-align: center middle;
+        background: #2a2b36;
+        border: solid rgb(91, 164, 91);
+        margin: 1 1;
+    }
+
+    #session-display {
+        height: 3;
+        content-align: center middle;
+        background: #2a2b36;
+        border: solid rgb(91, 164, 91);
+        margin: 1 1;
+    }
+
+    Static {
+        color: white;
+    }
+    """
+
+    client: AsyncOpenAI
+    should_send_audio: asyncio.Event
+    audio_player: AudioPlayerAsync
+    last_audio_item_id: str | None
+    connection: AsyncRealtimeConnection | None
+    session: Session | None
+    connected: asyncio.Event
+
+    def __init__(self) -> None:
+        super().__init__()
+        self.connection = None
+        self.session = None
+        self.client = AsyncOpenAI()
+        self.audio_player = AudioPlayerAsync()
+        self.last_audio_item_id = None
+        self.should_send_audio = asyncio.Event()
+        self.connected = asyncio.Event()
+
+    @override
+    def compose(self) -> ComposeResult:
+        """Create child widgets for the app."""
+        with Container():
+            yield SessionDisplay(id="session-display")
+            yield AudioStatusIndicator(id="status-indicator")
+            yield RichLog(id="bottom-pane", wrap=True, highlight=True, markup=True)
+
+    async def on_mount(self) -> None:
+        self.run_worker(self.handle_realtime_connection())
+        self.run_worker(self.send_mic_audio())
+
+    async def handle_realtime_connection(self) -> None:
+        async with self.client.beta.realtime.connect(model="gpt-4o-realtime-preview-2024-10-01") as conn:
+            self.connection = conn
+            self.connected.set()
+
+            # note: this is the default and can be omitted
+            # if you want to manually handle VAD yourself, then set `'turn_detection': None`
+            await conn.session.update(session={"turn_detection": {"type": "server_vad"}})
+
+            acc_items: dict[str, Any] = {}
+
+            async for event in conn:
+                if event.type == "session.created":
+                    self.session = event.session
+                    session_display = self.query_one(SessionDisplay)
+                    assert event.session.id is not None
+                    session_display.session_id = event.session.id
+                    continue
+
+                if event.type == "session.updated":
+                    self.session = event.session
+                    continue
+
+                if event.type == "response.audio.delta":
+                    if event.item_id != self.last_audio_item_id:
+                        self.audio_player.reset_frame_count()
+                        self.last_audio_item_id = event.item_id
+
+                    bytes_data = base64.b64decode(event.delta)
+                    self.audio_player.add_data(bytes_data)
+                    continue
+
+                if event.type == "response.audio_transcript.delta":
+                    try:
+                        text = acc_items[event.item_id]
+                    except KeyError:
+                        acc_items[event.item_id] = event.delta
+                    else:
+                        acc_items[event.item_id] = text + event.delta
+
+                    # Clear and update the entire content because RichLog otherwise treats each delta as a new line
+                    bottom_pane = self.query_one("#bottom-pane", RichLog)
+                    bottom_pane.clear()
+                    bottom_pane.write(acc_items[event.item_id])
+                    continue
+
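+    # other workers may run before the connection exists, so they use this
+    # helper to await it rather than racing `handle_realtime_connection()`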
+    async def _get_connection(self) -> AsyncRealtimeConnection:
+        await self.connected.wait()
+        assert self.connection is not None
+        return self.connection
+
+    async def send_mic_audio(self) -> None:
+        import sounddevice as sd  # type: ignore
+
+        sent_audio = False
+
+        device_info = sd.query_devices()
+        print(device_info)
+
+        read_size = int(SAMPLE_RATE * 0.02)
+
+        stream = sd.InputStream(
+            channels=CHANNELS,
+            samplerate=SAMPLE_RATE,
+            dtype="int16",
+        )
+        stream.start()
+
+        status_indicator = self.query_one(AudioStatusIndicator)
+
+        try:
+            while True:
+                if stream.read_available < read_size:
+                    await asyncio.sleep(0)
+                    continue
+
+                await self.should_send_audio.wait()
+                status_indicator.is_recording = True
+
+                data, _ = stream.read(read_size)
+
+                connection = await self._get_connection()
+                if not sent_audio:
+                    asyncio.create_task(connection.send({"type": "response.cancel"}))
+                    sent_audio = True
+
+                await connection.input_audio_buffer.append(audio=base64.b64encode(cast(Any, data)).decode("utf-8"))
+
+                await asyncio.sleep(0)
+        except KeyboardInterrupt:
+            pass
+        finally:
+            stream.stop()
+            stream.close()
+
+    async def on_key(self, event: events.Key) -> None:
+        """Handle key press events."""
+        if event.key == "enter":
+            self.query_one(Button).press()
+            return
+
+        if event.key == "q":
+            self.exit()
+            return
+
+        if event.key == "k":
+            status_indicator = self.query_one(AudioStatusIndicator)
+            if status_indicator.is_recording:
+                self.should_send_audio.clear()
+                status_indicator.is_recording = False
+
+                if self.session and self.session.turn_detection is None:
+                    # The default in the API is that the model will automatically detect when the user has
+                    # stopped talking and then start responding itself.
+                    #
+                    # However, if we're in manual `turn_detection` mode then we need to
+                    # manually tell the model to commit the audio buffer and start responding.
+                    conn = await self._get_connection()
+                    await conn.input_audio_buffer.commit()
+                    await conn.response.create()
+            else:
+                self.should_send_audio.set()
+                status_indicator.is_recording = True
+
+
+if __name__ == "__main__":
+    app = RealtimeApp()
+    app.run()
diff --git a/mypy.ini b/mypy.ini
index 50e5add04b..1ea1fe909d 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -8,7 +8,10 @@ show_error_codes = True
 #
 # We also exclude our `tests` as mypy doesn't always infer
 # types correctly and Pyright will still catch any type errors.
-exclude = ^(src/openai/_files\.py|src/openai/_utils/_logs\.py|_dev/.*\.py|tests/.*)$
+
+# realtime examples use inline `uv` script dependencies
+# which means they can't be type checked
+exclude = ^(src/openai/_files\.py|_dev/.*\.py|tests/.*|src/openai/_utils/_logs\.py|examples/realtime/audio_util\.py|examples/realtime/push_to_talk_app\.py)$
 
 strict_equality = True
 implicit_reexport = True
diff --git a/pyproject.toml b/pyproject.toml
index f83aff6fee..8e78257e67 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -157,6 +157,11 @@ exclude = [
     "_dev",
     ".venv",
     ".nox",
+
+    # these examples use inline `uv` script dependencies
+    # which means they can't be type checked
+    "examples/realtime/audio_util.py",
+    "examples/realtime/push_to_talk_app.py"
 ]
 
 reportImplicitOverride = true