From 8467194dff7878d1af9720b8f7dbf263a9c77cc3 Mon Sep 17 00:00:00 2001
From: aoife cassidy
Date: Tue, 23 Jul 2024 20:04:08 -0700
Subject: [PATCH] dev: fix examples (#496)

---
 examples/minimal_worker.py                    |  9 ++---
 examples/simple-color/agent.py                | 10 +++---
 examples/speech-to-text/deepgram_stt.py       | 11 +++----
 examples/text-to-speech/elevenlabs_tts.py     | 26 ++++++---------
 examples/text-to-speech/openai_tts.py         | 15 ++++-----
 .../text-to-speech/sync_tts_transcription.py  | 33 ++++++-----------
 examples/voice-assistant/function_calling.py  | 11 +++----
 .../voice-assistant/simple-rag/assistant.py   | 12 +++----
 8 files changed, 48 insertions(+), 79 deletions(-)

diff --git a/examples/minimal_worker.py b/examples/minimal_worker.py
index f56e46d75..e4ecc5219 100644
--- a/examples/minimal_worker.py
+++ b/examples/minimal_worker.py
@@ -1,6 +1,6 @@
 import logging
 
-from livekit.agents import JobContext, JobRequest, WorkerOptions, cli
+from livekit.agents import JobContext, WorkerOptions, cli
 
 
 async def entrypoint(ctx: JobContext):
@@ -9,10 +9,5 @@ async def entrypoint(ctx: JobContext):
     # Add your agent logic here!
 
 
-async def request_fnc(req: JobRequest) -> None:
-    logging.info("received request %s", req)
-    await req.accept(entrypoint)
-
-
 if __name__ == "__main__":
-    cli.run_app(WorkerOptions(request_fnc))
+    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
diff --git a/examples/simple-color/agent.py b/examples/simple-color/agent.py
index 7b9519640..50251bce6 100644
--- a/examples/simple-color/agent.py
+++ b/examples/simple-color/agent.py
@@ -2,7 +2,7 @@
 import logging
 
 from livekit import rtc
-from livekit.agents import JobContext, JobRequest, WorkerOptions, cli
+from livekit.agents import JobContext, WorkerOptions, cli
 
 WIDTH = 640
 HEIGHT = 480
@@ -12,6 +12,8 @@
 
 
 async def entrypoint(job: JobContext):
+    await job.connect()
+
     room = job.room
     source = rtc.VideoSource(WIDTH, HEIGHT)
     track = rtc.LocalVideoTrack.create_video_track("single-color", source)
@@ -31,9 +33,5 @@
     asyncio.create_task(_draw_color())
 
 
-async def request_fnc(req: JobRequest) -> None:
-    await req.accept(entrypoint)
-
-
 if __name__ == "__main__":
-    cli.run_app(WorkerOptions(request_fnc=request_fnc))
+    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
diff --git a/examples/speech-to-text/deepgram_stt.py b/examples/speech-to-text/deepgram_stt.py
index 53dd09405..b1a7a3ccb 100644
--- a/examples/speech-to-text/deepgram_stt.py
+++ b/examples/speech-to-text/deepgram_stt.py
@@ -1,10 +1,9 @@
 import asyncio
 import logging
 
-from livekit import agents, rtc
+from livekit import rtc
 from livekit.agents import (
     JobContext,
-    JobRequest,
     WorkerOptions,
     cli,
     stt,
@@ -45,6 +44,8 @@ async def transcribe_track(participant: rtc.RemoteParticipant, track: rtc.Track)
         async for ev in audio_stream:
             stt_stream.push_frame(ev.frame)
 
+    await job.connect(auto_subscribe="audio_only")
+
     @job.room.on("track_subscribed")
     def on_track_subscribed(
         track: rtc.Track,
@@ -55,9 +56,5 @@ def on_track_subscribed(
         tasks.append(asyncio.create_task(transcribe_track(participant, track)))
 
 
-async def request_fnc(req: JobRequest) -> None:
-    await req.accept(entrypoint, auto_subscribe=agents.AutoSubscribe.AUDIO_ONLY)
-
-
 if __name__ == "__main__":
-    cli.run_app(WorkerOptions(request_fnc=request_fnc))
+    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
diff --git a/examples/text-to-speech/elevenlabs_tts.py b/examples/text-to-speech/elevenlabs_tts.py
index 4d6abdac0..375389be1 100644
--- a/examples/text-to-speech/elevenlabs_tts.py
+++ b/examples/text-to-speech/elevenlabs_tts.py
@@ -3,7 +3,7 @@
 from typing import Optional
 
 from livekit import rtc
-from livekit.agents import JobContext, JobRequest, WorkerOptions, cli, tts
+from livekit.agents import JobContext, WorkerOptions, cli
 from livekit.plugins import elevenlabs
 
 
@@ -46,17 +46,19 @@ async def entrypoint(job: JobContext):
     track = rtc.LocalAudioTrack.create_audio_track("agent-mic", source)
     options = rtc.TrackPublishOptions()
     options.source = rtc.TrackSource.SOURCE_MICROPHONE
+
+    await job.connect()
     await job.room.local_participant.publish_track(track, options)
 
     await asyncio.sleep(1)
     logging.info('Saying "Bonjour, comment allez-vous?"')
     async for output in tts_11labs.synthesize("Bonjour, comment allez-vous?"):
-        await source.capture_frame(output.data)
+        await source.capture_frame(output.frame)
 
     await asyncio.sleep(1)
     logging.info('Saying "Au revoir."')
     async for output in tts_11labs.synthesize("Au revoir."):
-        await source.capture_frame(output.data)
+        await source.capture_frame(output.frame)
 
     await asyncio.sleep(1)
     streamed_text = (
@@ -69,31 +71,23 @@
     ):  # split into chunk just for the demonstration
         stream.push_text(chunk)
 
-    stream.mark_segment_end()
+    stream.flush()
+    stream.end_input()
 
     playout_q = asyncio.Queue[Optional[rtc.AudioFrame]]()
 
     async def _synth_task():
         async for ev in stream:
-            if ev.type != tts.SynthesisEventType.AUDIO:
-                continue
-            assert ev.audio is not None
-
-            playout_q.put_nowait(ev.audio.data)
+            playout_q.put_nowait(ev.frame)
 
         playout_q.put_nowait(None)
 
     synth_task = asyncio.create_task(_synth_task())
     playout_task = asyncio.create_task(_playout_task(playout_q, source))
 
-    await stream.aclose(wait=True)
     await asyncio.gather(synth_task, playout_task)
-
-
-async def request_fnc(req: JobRequest) -> None:
-    logging.info("received request %s", req)
-    await req.accept(entrypoint)
+    await stream.aclose()
 
 
 if __name__ == "__main__":
-    cli.run_app(WorkerOptions(request_fnc=request_fnc))
+    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
diff --git a/examples/text-to-speech/openai_tts.py b/examples/text-to-speech/openai_tts.py
index e61963c06..d65c8b4dc 100644
--- a/examples/text-to-speech/openai_tts.py
+++ b/examples/text-to-speech/openai_tts.py
@@ -2,7 +2,7 @@
 import logging
 
 from livekit import rtc
-from livekit.agents import JobContext, JobRequest, WorkerOptions, cli
+from livekit.agents import JobContext, WorkerOptions, cli
 from livekit.plugins import openai
 
 
@@ -15,23 +15,20 @@ async def entrypoint(job: JobContext):
     track = rtc.LocalAudioTrack.create_audio_track("agent-mic", source)
     options = rtc.TrackPublishOptions()
     options.source = rtc.TrackSource.SOURCE_MICROPHONE
+
+    await job.connect()
     await job.room.local_participant.publish_track(track, options)
 
     await asyncio.sleep(1)
     logging.info('Saying "Hello!"')
     async for output in tts.synthesize("Hello!"):
-        await source.capture_frame(output.data)
+        await source.capture_frame(output.frame)
 
     await asyncio.sleep(1)
     logging.info('Saying "Goodbye."')
     async for output in tts.synthesize("Goodbye."):
-        await source.capture_frame(output.data)
-
-
-async def request_fnc(req: JobRequest) -> None:
-    logging.info("received request %s", req)
-    await req.accept(entrypoint)
+        await source.capture_frame(output.frame)
 
 
 if __name__ == "__main__":
-    cli.run_app(WorkerOptions(request_fnc=request_fnc))
+    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
diff --git a/examples/text-to-speech/sync_tts_transcription.py b/examples/text-to-speech/sync_tts_transcription.py
index 31d803480..0c54181ef 100644
--- a/examples/text-to-speech/sync_tts_transcription.py
+++ b/examples/text-to-speech/sync_tts_transcription.py
@@ -5,7 +5,6 @@
 from livekit import rtc
 from livekit.agents import (
     JobContext,
-    JobRequest,
     WorkerOptions,
     cli,
     transcription,
@@ -58,7 +57,7 @@ async def _eg_streamed_tts_stream(
         tts_stream.push_text(chunk)
         tts_forwarder.push_text(chunk)
 
-    tts_stream.mark_segment_end()
+    tts_stream.flush()
     tts_forwarder.mark_text_segment_end()
 
     second_streamed_text = "This is another segment that will be streamed"
@@ -67,25 +66,23 @@ async def _eg_streamed_tts_stream(
         tts_stream.push_text(chunk)
         tts_forwarder.push_text(chunk)
 
-    tts_stream.mark_segment_end()
+    tts_stream.flush()
+    tts_stream.end_input()
     tts_forwarder.mark_text_segment_end()
 
     playout_q = asyncio.Queue[Optional[rtc.AudioFrame]]()
 
     async def _synth_task():
         async for ev in tts_stream:
-            if ev.type != tts.SynthesisEventType.AUDIO:
-                continue
-            assert ev.audio is not None
-            playout_q.put_nowait(ev.audio.data)
+            playout_q.put_nowait(ev.frame)
 
         playout_q.put_nowait(None)
 
     synth_task = asyncio.create_task(_synth_task())
     playout_task = asyncio.create_task(_playout_task(playout_q, source))
 
-    await tts_stream.aclose(wait=True)
     await asyncio.gather(synth_task, playout_task)
+    await tts_stream.aclose()
 
     await tts_forwarder.aclose()
 
@@ -107,8 +104,8 @@ async def _eg_single_segment(
     playout_task = asyncio.create_task(_playout_task(playout_q, source))
 
     async for output in tts_11labs.synthesize(text):
-        tts_forwarder.push_audio(output.data)
-        playout_q.put_nowait(output.data)
+        tts_forwarder.push_audio(output.frame)
+        playout_q.put_nowait(output.frame)
 
     tts_forwarder.mark_audio_segment_end()
     playout_q.put_nowait(None)
@@ -122,7 +119,7 @@ async def _eg_deferred_playout(
 ):
     """example with deferred playout (We have a synthesized audio before starting to play it)"""
     tts_forwarder = transcription.TTSSegmentsForwarder(
-        room=ctx.room, participant=ctx.room.local_participant, auto_playout=False
+        room=ctx.room, participant=ctx.room.local_participant
     )
 
     text = "Hello world, this is a single segment with deferred playout"
@@ -133,8 +130,8 @@ async def _eg_deferred_playout(
     playout_q = asyncio.Queue[Optional[rtc.AudioFrame]]()
 
     async for output in tts_11labs.synthesize(text):
-        tts_forwarder.push_audio(output.data)
-        playout_q.put_nowait(output.data)
+        tts_forwarder.push_audio(output.frame)
+        playout_q.put_nowait(output.frame)
 
     tts_forwarder.mark_audio_segment_end()
 
@@ -145,6 +142,7 @@ async def _eg_deferred_playout(
     playout_task = asyncio.create_task(_playout_task(playout_q, source))
     playout_q.put_nowait(None)
 
+    tts_forwarder.segment_playout_finished()
     await playout_task
 
     await tts_forwarder.aclose()
@@ -158,6 +156,8 @@ async def entrypoint(ctx: JobContext):
     source = rtc.AudioSource(tts_11labs.sample_rate, tts_11labs.num_channels)
     track = rtc.LocalAudioTrack.create_audio_track("agent-mic", source)
     options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
+
+    await ctx.connect()
     await ctx.room.local_participant.publish_track(track, options)
 
     # start the transcription examples
@@ -169,10 +169,5 @@ async def entrypoint(ctx: JobContext):
     await _eg_deferred_playout(ctx, tts_11labs, source)
 
 
-async def request_fnc(req: JobRequest) -> None:
-    logging.info("received request %s", req)
-    await req.accept(entrypoint)
-
-
 if __name__ == "__main__":
-    cli.run_app(WorkerOptions(request_fnc=request_fnc))
+    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
diff --git a/examples/voice-assistant/function_calling.py b/examples/voice-assistant/function_calling.py
index 64ae7c954..0f0fe81ed 100644
--- a/examples/voice-assistant/function_calling.py
+++ b/examples/voice-assistant/function_calling.py
@@ -3,7 +3,7 @@
 import logging
 from typing import Annotated
 
-from livekit.agents import JobContext, JobRequest, WorkerOptions, cli, llm
+from livekit.agents import JobContext, WorkerOptions, cli, llm
 from livekit.agents.voice_assistant import VoiceAssistant
 from livekit.plugins import deepgram, openai, silero
 
@@ -95,6 +95,8 @@ async def _will_synthesize_assistant_reply(
         will_synthesize_assistant_reply=_will_synthesize_assistant_reply,
     )
 
+    await ctx.connect()
+
     # Start the assistant. This will automatically publish a microphone track and listen to the first participant
     # it finds in the current room. If you need to specify a particular participant, use the participant parameter.
     assistant.start(ctx.room)
@@ -103,10 +105,5 @@ async def _will_synthesize_assistant_reply(
     await assistant.say("Hey, how can I help you today?")
 
 
-async def request_fnc(req: JobRequest) -> None:
-    logging.info("received request %s", req)
-    await req.accept(entrypoint)
-
-
 if __name__ == "__main__":
-    cli.run_app(WorkerOptions(request_fnc))
+    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
diff --git a/examples/voice-assistant/simple-rag/assistant.py b/examples/voice-assistant/simple-rag/assistant.py
index 2bcf9e88e..fd89e6a44 100644
--- a/examples/voice-assistant/simple-rag/assistant.py
+++ b/examples/voice-assistant/simple-rag/assistant.py
@@ -1,8 +1,7 @@
 import asyncio
-import logging
 import pickle
 
-from livekit.agents import JobContext, JobRequest, WorkerOptions, cli, llm
+from livekit.agents import JobContext, WorkerOptions, cli, llm
 from livekit.agents.voice_assistant import VoiceAssistant
 from livekit.plugins import deepgram, openai, rag, silero
 
@@ -51,16 +50,13 @@ async def _will_synthesize_assistant_answer(
         will_synthesize_assistant_reply=_will_synthesize_assistant_answer,
         plotting=True,
     )
+
+    await ctx.connect()
     assistant.start(ctx.room)
 
     await asyncio.sleep(1)
     await assistant.say("Hey, how can I help you today?", allow_interruptions=True)
 
 
-async def request_fnc(req: JobRequest) -> None:
-    logging.info("received request %s", req)
-    await req.accept(entrypoint)
-
-
 if __name__ == "__main__":
-    cli.run_app(WorkerOptions(request_fnc))
+    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
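
After this patch, the examples share one worker shape: a plain entrypoint
coroutine registered through WorkerOptions(entrypoint_fnc=...), which awaits
JobContext.connect() itself (passing auto_subscribe where it matters) instead
of being accepted through the removed request_fnc/req.accept() flow. A minimal
sketch of the resulting pattern, assembled from the post-patch examples above;
the log message and placeholder comment are illustrative, not part of the
patch:

    import logging

    from livekit.agents import JobContext, WorkerOptions, cli


    async def entrypoint(ctx: JobContext):
        # connect to the room before using ctx.room (publishing tracks,
        # attaching event handlers, etc.)
        await ctx.connect()
        logging.info("agent connected")
        # Add your agent logic here!


    if __name__ == "__main__":
        cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))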