diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c1b0b8..e3e155f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,25 @@ All notable changes to **Pipecat Flows** will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.0.7] - 2024-12-06 + +### Added + +- New `transition_to` field for static flows + - Combines function handlers with state transitions + - Supports all LLM providers (OpenAI, Anthropic, Gemini) + - Static examples updated to use this new transition + +### Changed + +- Static flow transitions now use `transition_to` instead of matching function names + - Before: Function name had to match target node name + - After: Function explicitly declares target via `transition_to` + +### Fixed + +- Duplicate LLM responses during transitions + ## [0.0.6] - 2024-12-02 ### Added diff --git a/README.md b/README.md index 00a6c0e..2704bdd 100644 --- a/README.md +++ b/README.md @@ -31,9 +31,9 @@ If you're starting fresh: pip install pipecat-ai-flows # Install Pipecat with specific LLM provider options: -pip install "pipecat-ai[daily,openai,deepgram]" # For OpenAI -pip install "pipecat-ai[daily,anthropic,deepgram]" # For Anthropic -pip install "pipecat-ai[daily,google,deepgram]" # For Google +pip install "pipecat-ai[daily,openai,deepgram,cartesia]" # For OpenAI +pip install "pipecat-ai[daily,anthropic,deepgram,cartesia]" # For Anthropic +pip install "pipecat-ai[daily,google,deepgram,cartesia]" # For Google ``` ## Quick Start @@ -93,14 +93,15 @@ Functions come in two types: "type": "function", "function": { "name": "select_size", - "handler": select_size_handler, # Required for node functions + "handler": select_size_handler, "description": "Select pizza size", "parameters": { "type": "object", "properties": { "size": {"type": "string", "enum": ["small", "medium", "large"]} } - } + }, + "transition_to": "next_node" # Optional: Specify next node } } ``` @@ -111,13 +112,21 @@ Functions come in two types: { "type": "function", "function": { - "name": "next_node", # Must match a node name + "name": "next_step", "description": "Move to next state", - "parameters": {"type": "object", "properties": {}} + "parameters": {"type": "object", "properties": {}}, + "transition_to": "target_node" # Required: Specify target node } } ``` +Functions can: + +- Have a handler (for data processing) +- Have a transition_to (for state changes) +- Have both (process data and transition) +- Have neither (end node functions) + #### Actions Actions execute during state transitions: @@ -183,7 +192,16 @@ flow_config = { "nodes": { "greeting": { "messages": [...], - "functions": [...] + "functions": [{ + "type": "function", + "function": { + "name": "collect_name", + "description": "Record user's name", + "parameters": {...}, + "handler": collect_name_handler, # Specify handler + "transition_to": "next_step" # Specify transition + } + }] } } } @@ -250,16 +268,16 @@ To run these examples: Install Pipecat with required options for examples: ```bash - pip install "pipecat-ai[daily,openai,deepgram,silero,examples]" + pip install "pipecat-ai[daily,openai,deepgram,cartesia,silero,examples]" ``` If you're running Google or Anthropic examples, you will need to update the installed options. For example: ```bash # Install Google Gemini - pip install "pipecat-ai[daily,google,deepgram,silero,examples]" + pip install "pipecat-ai[daily,google,deepgram,cartesia,silero,examples]" # Install Anthropic - pip install "pipecat-ai[daily,anthropic,deepgram,silero,examples]" + pip install "pipecat-ai[daily,anthropic,deepgram,cartesia,silero,examples]" ``` 3. **Configuration**: @@ -273,6 +291,7 @@ To run these examples: Add your API keys and configuration: - DEEPGRAM_API_KEY + - CARTESIA_API_KEY - OPENAI_API_KEY - ANTHROPIC_API_KEY - GOOGLE_API_KEY @@ -409,7 +428,6 @@ Open the page in your browser: http://localhost:5173. The `editor/examples/` directory contains sample flow configurations: - `food_ordering.json` -- `movie_booking.json` - `movie_explorer.py` - `patient_intake.json` - `restaurant_reservation.json` diff --git a/editor/examples/food_ordering.json b/editor/examples/food_ordering.json index 923d0d8..dbb57cb 100644 --- a/editor/examples/food_ordering.json +++ b/editor/examples/food_ordering.json @@ -13,22 +13,24 @@ "type": "function", "function": { "name": "choose_pizza", - "description": "User wants to order pizza", + "description": "User wants to order pizza. Let's get that order started.", "parameters": { "type": "object", "properties": {} - } + }, + "transition_to": "choose_pizza" } }, { "type": "function", "function": { "name": "choose_sushi", - "description": "User wants to order sushi", + "description": "User wants to order sushi. Let's get that order started.", "parameters": { "type": "object", "properties": {} - } + }, + "transition_to": "choose_sushi" } } ] @@ -37,15 +39,16 @@ "messages": [ { "role": "system", - "content": "You are handling a pizza order. Use the available functions:\n - Use select_pizza_size when the user specifies a size (can be used multiple times if they change their mind or want to order multiple pizzas)\n - Use the end function ONLY when the user confirms they are done with their order\n\nAfter each size selection, confirm the selection and ask if they want to change it or complete their order. Only use the end function after the user confirms they are satisfied with their order.\n\nStart off by acknowledging the user's choice. Once they've chosen a size, ask if they'd like anything else. Remember to be friendly and casual." + "content": "You are handling a pizza order. Use the available functions:\n\n- Use select_pizza_order when the user specifies both size AND type\n\n- Use confirm_order when the user confirms they are satisfied with their selection\n\nPricing:\n\n- Small: $10\n\n- Medium: $15\n\n- Large: $20\n\nAfter selection, confirm both the size and type, state the price, and ask if they want to confirm their order. Remember to be friendly and casual." } ], "functions": [ { "type": "function", "function": { - "name": "select_pizza_size", - "description": "Record the selected pizza size", + "name": "select_pizza_order", + "handler": "select_pizza_order", + "description": "Record the pizza order details", "parameters": { "type": "object", "properties": { @@ -53,44 +56,45 @@ "type": "string", "enum": ["small", "medium", "large"], "description": "Size of the pizza" + }, + "type": { + "type": "string", + "enum": ["pepperoni", "cheese", "supreme", "vegetarian"], + "description": "Type of pizza" } }, - "required": ["size"] + "required": ["size", "type"] } } }, { "type": "function", "function": { - "name": "end", - "description": "Complete the order (use only after user confirms)", + "name": "confirm_order", + "description": "Proceed to order confirmation", "parameters": { "type": "object", "properties": {} - } + }, + "transition_to": "confirm" } } - ], - "pre_actions": [ - { - "type": "tts_say", - "text": "Ok, let me help you with your pizza order..." - } ] }, "choose_sushi": { "messages": [ { "role": "system", - "content": "You are handling a sushi order. Use the available functions:\n - Use select_roll_count when the user specifies how many rolls (can be used multiple times if they change their mind or if they want to order multiple sushi rolls)\n - Use the end function ONLY when the user confirms they are done with their order\n\nAfter each roll count selection, confirm the count and ask if they want to change it or complete their order. Only use the end function after the user confirms they are satisfied with their order.\n\nStart off by acknowledging the user's choice. Once they've chosen a size, ask if they'd like anything else. Remember to be friendly and casual." + "content": "You are handling a sushi order. Use the available functions:\n\n- Use select_sushi_order when the user specifies both count AND type\n\n- Use confirm_order when the user confirms they are satisfied with their selection\n\nPricing:\n\n- $8 per roll\n\nAfter selection, confirm both the count and type, state the price, and ask if they want to confirm their order. Remember to be friendly and casual." } ], "functions": [ { "type": "function", "function": { - "name": "select_roll_count", - "description": "Record the number of sushi rolls", + "name": "select_sushi_order", + "handler": "select_sushi_order", + "description": "Record the sushi order details", "parameters": { "type": "object", "properties": { @@ -99,28 +103,50 @@ "minimum": 1, "maximum": 10, "description": "Number of rolls to order" + }, + "type": { + "type": "string", + "enum": ["california", "spicy tuna", "rainbow", "dragon"], + "description": "Type of sushi roll" } }, - "required": ["count"] + "required": ["count", "type"] } } }, { "type": "function", "function": { - "name": "end", - "description": "Complete the order (use only after user confirms)", + "name": "confirm_order", + "description": "Proceed to order confirmation", "parameters": { "type": "object", "properties": {} - } + }, + "transition_to": "confirm" } } + ] + }, + "confirm": { + "messages": [ + { + "role": "system", + "content": "Read back the complete order details to the user and ask for final confirmation. Use the available functions:\n\n- Use complete_order when the user confirms\n\n- Use revise_order if they want to change something\n\nBe friendly and clear when reading back the order details." + } ], - "pre_actions": [ + "functions": [ { - "type": "tts_say", - "text": "Ok, let me help you with your sushi order..." + "type": "function", + "function": { + "name": "complete_order", + "description": "User confirms the order is correct", + "parameters": { + "type": "object", + "properties": {} + }, + "transition_to": "end" + } } ] }, @@ -128,16 +154,10 @@ "messages": [ { "role": "system", - "content": "The order is complete. Thank the user and end the conversation." + "content": "Concisely end the conversation—1-3 words is appropriate. Just say 'Bye' or something similarly short." } ], "functions": [], - "pre_actions": [ - { - "type": "tts_say", - "text": "Thank you for your order! Goodbye!" - } - ], "post_actions": [ { "type": "end_conversation" diff --git a/editor/examples/movie_explorer.json b/editor/examples/movie_explorer.json index 9990b8a..a2c7401 100644 --- a/editor/examples/movie_explorer.json +++ b/editor/examples/movie_explorer.json @@ -5,45 +5,43 @@ "messages": [ { "role": "system", - "content": "You are a helpful movie expert. Start by greeting the user and asking if they'd like to know what movies are currently playing in theaters. Use get_movies to fetch the current movies when they're interested." + "content": "You are a helpful movie expert. Start by greeting the user and asking if they'd like to know about movies currently in theaters or upcoming releases. Wait for their choice before using either get_current_movies or get_upcoming_movies." } ], "functions": [ { "type": "function", "function": { - "name": "get_movies", - "description": "Fetch currently playing movies", + "name": "get_current_movies", + "handler": "get_movies", + "description": "Fetch movies currently playing in theaters", "parameters": { "type": "object", "properties": {} - } + }, + "transition_to": "explore_movie" } }, { "type": "function", "function": { - "name": "explore_movie", - "description": "Move to movie exploration", + "name": "get_upcoming_movies", + "handler": "get_upcoming_movies", + "description": "Fetch movies coming soon to theaters", "parameters": { "type": "object", "properties": {} - } + }, + "transition_to": "explore_movie" } } - ], - "pre_actions": [ - { - "type": "tts_say", - "text": "Welcome! I can tell you about movies currently playing in theaters." - } ] }, "explore_movie": { "messages": [ { "role": "system", - "content": "Help the user learn more about movies. Use get_movie_details when they express interest in a specific movie - this will show details including cast, runtime, and rating. After showing details, you can use get_similar_movies if they want recommendations. Ask if they'd like to explore another movie (use explore_movie) or end the conversation." + "content": "Help the user learn more about movies. You can:\n\n- Use get_movie_details when they express interest in a specific movie\n\n- Use get_similar_movies to show recommendations\n\n- Use get_current_movies to see what's playing now\n\n- Use get_upcoming_movies to see what's coming soon\n\n- Use end_conversation when they're done exploring\n\nAfter showing details or recommendations, ask if they'd like to explore another movie or end the conversation." } ], "functions": [ @@ -51,6 +49,7 @@ "type": "function", "function": { "name": "get_movie_details", + "handler": "get_movie_details", "description": "Get details about a specific movie including cast", "parameters": { "type": "object", @@ -68,6 +67,7 @@ "type": "function", "function": { "name": "get_similar_movies", + "handler": "get_similar_movies", "description": "Get similar movies as recommendations", "parameters": { "type": "object", @@ -84,8 +84,9 @@ { "type": "function", "function": { - "name": "explore_movie", - "description": "Return to current movies list", + "name": "get_current_movies", + "handler": "get_movies", + "description": "Show current movies in theaters", "parameters": { "type": "object", "properties": {} @@ -95,13 +96,26 @@ { "type": "function", "function": { - "name": "end", - "description": "End the conversation", + "name": "get_upcoming_movies", + "handler": "get_upcoming_movies", + "description": "Show movies coming soon", "parameters": { "type": "object", "properties": {} } } + }, + { + "type": "function", + "function": { + "name": "end_conversation", + "description": "End the conversation", + "parameters": { + "type": "object", + "properties": {} + }, + "transition_to": "end" + } } ] }, @@ -109,16 +123,10 @@ "messages": [ { "role": "system", - "content": "Thank the user and end the conversation." + "content": "Thank the user warmly and mention they can return anytime to discover more movies." } ], "functions": [], - "pre_actions": [ - { - "type": "tts_say", - "text": "Thanks for exploring movies with me! Goodbye!" - } - ], "post_actions": [ { "type": "end_conversation" diff --git a/editor/examples/patient_intake.json b/editor/examples/patient_intake.json index f00c990..7c7e026 100644 --- a/editor/examples/patient_intake.json +++ b/editor/examples/patient_intake.json @@ -5,7 +5,7 @@ "messages": [ { "role": "system", - "content": "Start by introducing yourself to Chad Bailey, then ask for their date of birth, including the year. Once they provide their birthday, use verify_birthday to check it. If the birthday is correct (1983-01-01), use get_prescriptions to proceed." + "content": "Start by introducing yourself to Chad Bailey, then ask for their date of birth, including the year. Once they provide their birthday, use verify_birthday to check it. If verified (1983-01-01), proceed to prescriptions." } ], "functions": [ @@ -13,7 +13,8 @@ "type": "function", "function": { "name": "verify_birthday", - "description": "Verify the user has provided their correct birthday", + "handler": "verify_birthday", + "description": "Verify the user has provided their correct birthday. Once confirmed, the next step is to recording the user's prescriptions.", "parameters": { "type": "object", "properties": { @@ -23,25 +24,9 @@ } }, "required": ["birthday"] - } + }, + "transition_to": "get_prescriptions" } - }, - { - "type": "function", - "function": { - "name": "get_prescriptions", - "description": "Proceed to collecting prescriptions and ask the user what medications they're taking", - "parameters": { - "type": "object", - "properties": {} - } - } - } - ], - "pre_actions": [ - { - "type": "tts_say", - "text": "Hello, I'm Jessica from Tri-County Health Services." } ] }, @@ -49,7 +34,7 @@ "messages": [ { "role": "system", - "content": "This step is for collecting a user's prescription. Ask them what presceriptions they're taking, including the dosage. Use the available functions:\n - Use record_prescriptions when the user lists their medications (must include both name and dosage)\n - Use get_allergies once all prescriptions are recorded\n\nAsk them what prescriptions they're currently taking, making sure to get both medication names and dosages. After they provide their prescriptions (or confirm they have none), acknowledge their response and proceed to allergies" + "content": "This step is for collecting prescriptions. Ask them what prescriptions they're taking, including the dosage. After recording prescriptions (or confirming none), proceed to allergies." } ], "functions": [ @@ -57,7 +42,8 @@ "type": "function", "function": { "name": "record_prescriptions", - "description": "Record the user's prescriptions", + "handler": "record_prescriptions", + "description": "Record the user's prescriptions. Once confirmed, the next step is to collect allergy information.", "parameters": { "type": "object", "properties": { @@ -80,18 +66,8 @@ } }, "required": ["prescriptions"] - } - } - }, - { - "type": "function", - "function": { - "name": "get_allergies", - "description": "Proceed to collecting allergies and ask the user if they have any allergies", - "parameters": { - "type": "object", - "properties": {} - } + }, + "transition_to": "get_allergies" } } ] @@ -100,7 +76,7 @@ "messages": [ { "role": "system", - "content": "You are collecting allergy information. Use the available functions:\n - Use record_allergies when the user lists their allergies (or confirms they have none)\n - Use get_conditions once allergies are recorded\n\nAsk them about any allergies they have. After they list their allergies (or confirm they have none), acknowledge their response and ask about any medical conditions they have." + "content": "Collect allergy information. Ask about any allergies they have. After recording allergies (or confirming none), proceed to medical conditions." } ], "functions": [ @@ -108,7 +84,8 @@ "type": "function", "function": { "name": "record_allergies", - "description": "Record the user's allergies", + "handler": "record_allergies", + "description": "Record the user's allergies. Once confirmed, then next step is to collect medical conditions.", "parameters": { "type": "object", "properties": { @@ -127,18 +104,8 @@ } }, "required": ["allergies"] - } - } - }, - { - "type": "function", - "function": { - "name": "get_conditions", - "description": "Proceed to collecting medical conditions and ask the user if they have any medical conditions", - "parameters": { - "type": "object", - "properties": {} - } + }, + "transition_to": "get_conditions" } } ] @@ -147,7 +114,7 @@ "messages": [ { "role": "system", - "content": "You are collecting medical condition information. Use the available functions:\n - Use record_conditions when the user lists their conditions (or confirms they have none)\n - Use get_visit_reasons once conditions are recorded\n\nAsk them about any medical conditions they have. After they list their conditions (or confirm they have none), acknowledge their response and ask about the reason for their visit today." + "content": "Collect medical condition information. Ask about any medical conditions they have. After recording conditions (or confirming none), proceed to visit reasons." } ], "functions": [ @@ -155,7 +122,8 @@ "type": "function", "function": { "name": "record_conditions", - "description": "Record the user's medical conditions", + "handler": "record_conditions", + "description": "Record the user's medical conditions. Once confirmed, the next step is to collect visit reasons.", "parameters": { "type": "object", "properties": { @@ -174,18 +142,8 @@ } }, "required": ["conditions"] - } - } - }, - { - "type": "function", - "function": { - "name": "get_visit_reasons", - "description": "Proceed to collecting visit reasons and ask the user why they're visiting", - "parameters": { - "type": "object", - "properties": {} - } + }, + "transition_to": "get_visit_reasons" } } ] @@ -194,7 +152,7 @@ "messages": [ { "role": "system", - "content": "You are collecting information about the reason for their visit. Use the available functions:\n - Use record_visit_reasons when they explain their reasons\n - Use verify_information once reasons are recorded\n\nAsk them what brings them to the doctor today. After they explain their reasons, acknowledge their response and let them know you'll review all the information they've provided." + "content": "Collect information about the reason for their visit. Ask what brings them to the doctor today. After recording their reasons, proceed to verification." } ], "functions": [ @@ -202,7 +160,8 @@ "type": "function", "function": { "name": "record_visit_reasons", - "description": "Record the reasons for their visit", + "handler": "record_visit_reasons", + "description": "Record the reasons for their visit. Once confirmed, the next step is to verify all information.", "parameters": { "type": "object", "properties": { @@ -221,79 +180,66 @@ } }, "required": ["visit_reasons"] - } - } - }, - { - "type": "function", - "function": { - "name": "verify_information", - "description": "Proceed to information verification, repeat the information back to the user, and have them verify", - "parameters": { - "type": "object", - "properties": {} - } + }, + "transition_to": "verify" } } ] }, - "verify_information": { + "verify": { "messages": [ { "role": "system", - "content": "Review all collected information. Summarize their prescriptions, allergies, conditions, and visit reasons. Use the available functions:\n - Use get_prescriptions if they need to make any changes\n - Use confirm_intake if they confirm everything is correct\n\nBe thorough in reviewing all details and ask for their explicit confirmation." + "content": "Review all collected information with the patient. Follow these steps:\n\n1. Summarize their prescriptions, allergies, conditions, and visit reasons\n\n2. Ask if everything is correct\n\n3. Use the appropriate function based on their response\n\nBe thorough in reviewing all details and wait for explicit confirmation." } ], "functions": [ { "type": "function", "function": { - "name": "get_prescriptions", + "name": "revise_information", "description": "Return to prescriptions to revise information", "parameters": { "type": "object", "properties": {} - } + }, + "transition_to": "get_prescriptions" } }, { "type": "function", "function": { - "name": "confirm_intake", - "description": "Confirm the intake information and proceed", + "name": "confirm_information", + "description": "Proceed with confirmed information", "parameters": { "type": "object", "properties": {} - } + }, + "transition_to": "confirm" } } ] }, - "confirm_intake": { + "confirm": { "messages": [ { "role": "system", - "content": "The intake information is confirmed. Thank them for providing their information, let them know what to expect next, and use end to complete the conversation." + "content": "Once confirmed, thank them, then use the complete_intake function to end the conversation." } ], "functions": [ { "type": "function", "function": { - "name": "end", - "description": "End the conversation", + "name": "complete_intake", + "description": "Complete the intake process", "parameters": { "type": "object", "properties": {} - } + }, + "transition_to": "end" } } - ], - "pre_actions": [ - { - "type": "tts_say", - "text": "Perfect! I've recorded all your information for your visit." - } ] }, "end": { @@ -304,12 +250,6 @@ } ], "functions": [], - "pre_actions": [ - { - "type": "tts_say", - "text": "Thank you for providing your information! We'll see you soon for your visit." - } - ], "post_actions": [ { "type": "end_conversation" diff --git a/editor/examples/restaurant_reservation.json b/editor/examples/restaurant_reservation.json index cf66291..724ad4b 100644 --- a/editor/examples/restaurant_reservation.json +++ b/editor/examples/restaurant_reservation.json @@ -13,6 +13,7 @@ "type": "function", "function": { "name": "record_party_size", + "handler": "record_party_size", "description": "Record the number of people in the party", "parameters": { "type": "object", @@ -24,18 +25,8 @@ } }, "required": ["size"] - } - } - }, - { - "type": "function", - "function": { - "name": "get_time", - "description": "Proceed to time selection", - "parameters": { - "type": "object", - "properties": {} - } + }, + "transition_to": "get_time" } } ] @@ -44,7 +35,7 @@ "messages": [ { "role": "system", - "content": "Ask what time they'd like to dine. Restaurant is open 5 PM to 10 PM." + "content": "Ask what time they'd like to dine. Restaurant is open 5 PM to 10 PM. After they provide a time, confirm it's within operating hours before recording. Use 24-hour format for internal recording (e.g., 17:00 for 5 PM)." } ], "functions": [ @@ -52,28 +43,20 @@ "type": "function", "function": { "name": "record_time", + "handler": "record_time", "description": "Record the requested time", "parameters": { "type": "object", "properties": { "time": { "type": "string", - "pattern": "^([0-1][0-9]|2[0-3]):[0-5][0-9]$" + "pattern": "^(17|18|19|20|21|22):([0-5][0-9])$", + "description": "Reservation time in 24-hour format (17:00-22:00)" } }, "required": ["time"] - } - } - }, - { - "type": "function", - "function": { - "name": "confirm", - "description": "Proceed to confirmation", - "parameters": { - "type": "object", - "properties": {} - } + }, + "transition_to": "confirm" } } ] @@ -94,7 +77,8 @@ "parameters": { "type": "object", "properties": {} - } + }, + "transition_to": "end" } } ] diff --git a/editor/examples/travel_planner.json b/editor/examples/travel_planner.json index 423074a..7b566db 100644 --- a/editor/examples/travel_planner.json +++ b/editor/examples/travel_planner.json @@ -17,7 +17,8 @@ "parameters": { "type": "object", "properties": {} - } + }, + "transition_to": "choose_beach" } }, { @@ -28,22 +29,17 @@ "parameters": { "type": "object", "properties": {} - } + }, + "transition_to": "choose_mountain" } } - ], - "pre_actions": [ - { - "type": "tts_say", - "text": "Welcome to Dream Vacations! I'll help you plan your perfect getaway." - } ] }, "choose_beach": { "messages": [ { "role": "system", - "content": "You are handling beach vacation planning. Use the available functions:\n - Use select_destination when the user chooses their preferred beach location\n - Use get_dates once they've selected a destination\n\nAvailable beach destinations are: 'Maui', 'Cancun', or 'Maldives'. After they choose, confirm their selection and proceed to dates. Be enthusiastic and paint a picture of each destination." + "content": "You are handling beach vacation planning. Use the available functions:\n - Use select_destination when the user chooses their preferred beach location\n - After destination is selected, dates will be collected automatically\n\nAvailable beach destinations are: 'Maui', 'Cancun', or 'Maldives'. After they choose, confirm their selection. Be enthusiastic and paint a picture of each destination." } ], "functions": [ @@ -51,6 +47,7 @@ "type": "function", "function": { "name": "select_destination", + "handler": "select_destination", "description": "Record the selected beach destination", "parameters": { "type": "object", @@ -62,33 +59,17 @@ } }, "required": ["destination"] - } - } - }, - { - "type": "function", - "function": { - "name": "get_dates", - "description": "Proceed to date selection", - "parameters": { - "type": "object", - "properties": {} - } + }, + "transition_to": "get_dates" } } - ], - "pre_actions": [ - { - "type": "tts_say", - "text": "Let's find your perfect beach paradise..." - } ] }, "choose_mountain": { "messages": [ { "role": "system", - "content": "You are handling mountain retreat planning. Use the available functions:\n - Use select_destination when the user chooses their preferred mountain location\n - Use get_dates once they've selected a destination\n\nAvailable mountain destinations are: 'Swiss Alps', 'Rocky Mountains', or 'Himalayas'. After they choose, confirm their selection and proceed to dates. Be enthusiastic and paint a picture of each destination." + "content": "You are handling mountain retreat planning. Use the available functions:\n - Use select_destination when the user chooses their preferred mountain location\n - After destination is selected, dates will be collected automatically\n\nAvailable mountain destinations are: 'Swiss Alps', 'Rocky Mountains', or 'Himalayas'. After they choose, confirm their selection. Be enthusiastic and paint a picture of each destination." } ], "functions": [ @@ -96,6 +77,7 @@ "type": "function", "function": { "name": "select_destination", + "handler": "select_destination", "description": "Record the selected mountain destination", "parameters": { "type": "object", @@ -107,25 +89,9 @@ } }, "required": ["destination"] - } + }, + "transition_to": "get_dates" } - }, - { - "type": "function", - "function": { - "name": "get_dates", - "description": "Proceed to date selection", - "parameters": { - "type": "object", - "properties": {} - } - } - } - ], - "pre_actions": [ - { - "type": "tts_say", - "text": "Let's find your perfect mountain getaway..." } ] }, @@ -133,7 +99,7 @@ "messages": [ { "role": "system", - "content": "Handle travel date selection. Use the available functions:\n - Use record_dates when the user specifies their travel dates (can be used multiple times if they change their mind)\n - Use get_activities once dates are confirmed\n\nAsk for their preferred travel dates within the next 6 months. After recording dates, confirm the selection and proceed to activities." + "content": "Handle travel date selection. Use the available functions:\n - Use record_dates when the user specifies their travel dates (can be used multiple times if they change their mind)\n - After dates are recorded, activities will be collected automatically\n\nAsk for their preferred travel dates within the next 6 months. After recording dates, confirm the selection." } ], "functions": [ @@ -141,6 +107,7 @@ "type": "function", "function": { "name": "record_dates", + "handler": "record_dates", "description": "Record the selected travel dates", "parameters": { "type": "object", @@ -157,18 +124,8 @@ } }, "required": ["check_in", "check_out"] - } - } - }, - { - "type": "function", - "function": { - "name": "get_activities", - "description": "Proceed to activity selection", - "parameters": { - "type": "object", - "properties": {} - } + }, + "transition_to": "get_activities" } } ] @@ -177,7 +134,7 @@ "messages": [ { "role": "system", - "content": "Handle activity preferences. Use the available functions:\n - Use record_activities to save their activity preferences\n - Use verify_itinerary once activities are selected\n\nFor beach destinations, suggest: snorkeling, surfing, sunset cruise\nFor mountain destinations, suggest: hiking, skiing, mountain biking\n\nAfter they choose, confirm their selections and proceed to verification." + "content": "Handle activity preferences. Use the available functions:\n - Use record_activities to save their activity preferences\n - After activities are recorded, verification will happen automatically\n\nFor beach destinations, suggest: snorkeling, surfing, sunset cruise\nFor mountain destinations, suggest: hiking, skiing, mountain biking\n\nAfter they choose, confirm their selections." } ], "functions": [ @@ -185,6 +142,7 @@ "type": "function", "function": { "name": "record_activities", + "handler": "record_activities", "description": "Record selected activities", "parameters": { "type": "object", @@ -200,18 +158,8 @@ } }, "required": ["activities"] - } - } - }, - { - "type": "function", - "function": { - "name": "verify_itinerary", - "description": "Proceed to itinerary verification", - "parameters": { - "type": "object", - "properties": {} - } + }, + "transition_to": "verify_itinerary" } } ] @@ -220,19 +168,20 @@ "messages": [ { "role": "system", - "content": "Review the complete itinerary with the user. Summarize their destination, dates, and chosen activities. Use the available functions:\n - Use get_dates if they want to make changes\n - Use confirm_booking if they're happy with everything\n\nBe thorough in reviewing all details and ask for their confirmation." + "content": "Review the complete itinerary with the user. Summarize their destination, dates, and chosen activities. Use revise_plan to make changes or confirm_booking if they're happy. Be thorough in reviewing all details and ask for their confirmation." } ], "functions": [ { "type": "function", "function": { - "name": "get_dates", + "name": "revise_plan", "description": "Return to date selection to revise the plan", "parameters": { "type": "object", "properties": {} - } + }, + "transition_to": "get_dates" } }, { @@ -243,7 +192,8 @@ "parameters": { "type": "object", "properties": {} - } + }, + "transition_to": "confirm_booking" } } ] @@ -264,7 +214,8 @@ "parameters": { "type": "object", "properties": {} - } + }, + "transition_to": "end" } } ], diff --git a/editor/js/editor/sidePanel.js b/editor/js/editor/sidePanel.js index a810a92..85e7e0f 100644 --- a/editor/js/editor/sidePanel.js +++ b/editor/js/editor/sidePanel.js @@ -307,7 +307,30 @@ export class SidePanel { (error) => { console.error("Invalid function JSON:", error); }, - (json) => json.type === "function" && json.function, // Validator for functions + // Updated validator to allow transition_to + (json) => { + // Basic structure check + if (json.type !== "function" || !json.function) { + return false; + } + + const func = json.function; + + // Must have name and parameters + if (!func.name || !func.parameters) { + return false; + } + + // transition_to is optional but must be a string if present + if ( + func.transition_to !== undefined && + typeof func.transition_to !== "string" + ) { + return false; + } + + return true; + }, ); // Store editor instances for use in updatePanel diff --git a/editor/js/nodes/functionNode.js b/editor/js/nodes/functionNode.js index 94f6c36..3360140 100644 --- a/editor/js/nodes/functionNode.js +++ b/editor/js/nodes/functionNode.js @@ -201,6 +201,18 @@ export class PipecatFunctionNode extends LGraphNode { ctx.fillText("Has Parameters ⚙️", padding, y + 25); } + // Optional: Draw transition indicator + if (this.properties.function.transition_to) { + ctx.fillStyle = "#666"; + ctx.font = "11px Arial"; + ctx.fillText( + `→ ${this.properties.function.transition_to}`, + padding, + y + (hasParameters ? 45 : 25), + ); + y += 20; // Increase y for the new line + } + // Adjust node height const desiredHeight = y + (hasParameters ? 45 : 25); if (Math.abs(this.size[1] - desiredHeight) > 10) { diff --git a/editor/js/utils/export.js b/editor/js/utils/export.js index d4f78a2..889946b 100644 --- a/editor/js/utils/export.js +++ b/editor/js/utils/export.js @@ -45,21 +45,49 @@ export function generateFlowConfig(graphInstance) { if (!targetNode) return; if (targetNode.constructor.name === "PipecatFunctionNode") { - functions.push({ + // Create base function configuration + const funcConfig = { type: "function", - function: targetNode.properties.function, - }); + function: { ...targetNode.properties.function }, + }; + + // Find where this function connects to (if anywhere) + const functionTargets = targetNode.outputs[0].links || []; + if (functionTargets.length > 0) { + // Look through all connections to find the actual target node + // (skipping merge nodes) + for (const targetLinkId of functionTargets) { + const targetLink = graphInstance.links[targetLinkId]; + if (!targetLink) continue; + + const nextNode = nodes.find((n) => n.id === targetLink.target_id); + if (!nextNode) continue; + + // If it connects to a merge node, follow through to final target + if (nextNode.constructor.name === "PipecatMergeNode") { + const mergeOutput = nextNode.outputs[0].links?.[0]; + if (!mergeOutput) continue; + + const mergeLink = graphInstance.links[mergeOutput]; + if (!mergeLink) continue; + + const finalNode = nodes.find( + (n) => n.id === mergeLink.target_id, + ); + if (finalNode) { + funcConfig.function.transition_to = finalNode.title; + break; // Use first valid target found + } + } else { + // Direct connection to target node + funcConfig.function.transition_to = nextNode.title; + break; // Use first valid target found + } + } + } + + functions.push(funcConfig); } else if (targetNode.constructor.name === "PipecatMergeNode") { - // Find where this merge node connects to - const mergeOutput = targetNode.outputs[0].links?.[0]; - if (!mergeOutput) return; - - const mergeLink = graphInstance.links[mergeOutput]; - if (!mergeLink) return; - - const finalNode = nodes.find((n) => n.id === mergeLink.target_id); - if (!finalNode) return; - // Find all functions that connect to this merge node const connectedFunctions = nodes.filter( (n) => @@ -70,11 +98,24 @@ export function generateFlowConfig(graphInstance) { }), ); - // Add all functions with their correct target + // Find the final target of the merge node + const mergeOutput = targetNode.outputs[0].links?.[0]; + if (!mergeOutput) return; + + const mergeLink = graphInstance.links[mergeOutput]; + if (!mergeLink) return; + + const finalNode = nodes.find((n) => n.id === mergeLink.target_id); + if (!finalNode) return; + + // Add all functions with their transition to the final target connectedFunctions.forEach((funcNode) => { functions.push({ type: "function", - function: funcNode.properties.function, + function: { + ...funcNode.properties.function, + transition_to: finalNode.title, + }, }); }); } diff --git a/editor/js/utils/import.js b/editor/js/utils/import.js index 9ed09c6..973ddaf 100644 --- a/editor/js/utils/import.js +++ b/editor/js/utils/import.js @@ -83,29 +83,31 @@ export function createFlowFromConfig(graph, flowConfig) { Object.entries(flowConfig.nodes).forEach(([sourceNodeId, nodeConfig]) => { if (nodeConfig.functions) { nodeConfig.functions.forEach((funcConfig) => { - const targetName = funcConfig.function.name; const functionNode = new PipecatFunctionNode(); - functionNode.properties.function = funcConfig.function; - functionNode.properties.isNodeFunction = !nodes[targetName]; + functionNode.properties.function = { ...funcConfig.function }; graph.add(functionNode); // Add function node to dagre graph - const funcNodeId = `func_${sourceNodeId}_${targetName}_${Math.random().toString(36).substr(2, 9)}`; + const funcNodeId = `func_${sourceNodeId}_${functionNode.properties.function.name}`; g.setNode(funcNodeId, { width: functionNode.size[0], height: functionNode.size[1], node: functionNode, }); - // Connect source to function node in dagre + // Connect source to function node g.setEdge(sourceNodeId, funcNodeId); - // Store for later LiteGraph connection + // If has transition_to, connect to target node + if (funcConfig.function.transition_to) { + g.setEdge(funcNodeId, funcConfig.function.transition_to); + } + functionNodes.set(functionNode, { source: nodes[sourceNodeId].node, - target: nodes[targetName]?.node, - targetName: targetName, + target: nodes[funcConfig.function.transition_to]?.node, + targetName: funcConfig.function.transition_to, funcNodeId: funcNodeId, }); }); diff --git a/editor/js/utils/validation.js b/editor/js/utils/validation.js index d7f45fa..5604e14 100644 --- a/editor/js/utils/validation.js +++ b/editor/js/utils/validation.js @@ -44,6 +44,7 @@ export class FlowValidator { this._validateInitialNode(); this._validateNodeReferences(); this._validateNodeContents(); + this._validateTransitions(); return this.errors; } @@ -73,10 +74,8 @@ export class FlowValidator { for (const node of Object.values(this.flow.nodes)) { const func = node.functions?.find((f) => f.function.name === funcName); if (func) { - // Node functions are those that: - // 1. Have parameters with properties (collecting data) - // 2. Have required fields (must collect specific data) - // 3. Have constraints (enum, min/max, etc.) + // Node functions are those that have a handler (indicated by parameters) + // Edge functions are those that have transition_to const params = func.function.parameters; const hasProperties = Object.keys(params.properties || {}).length > 0; const hasRequired = @@ -88,6 +87,8 @@ export class FlowValidator { prop.maximum !== undefined, ); + // Function is a node function if it has parameters + // Edge functions should only have transition_to return hasProperties && (hasRequired || hasConstraints); } } @@ -101,13 +102,28 @@ export class FlowValidator { _validateNodeReferences() { Object.entries(this.flow.nodes).forEach(([nodeId, node]) => { if (node.functions) { - const functionNames = node.functions - .map((func) => func.function?.name) - .filter(Boolean); + node.functions.forEach((func) => { + // Get the transition target from transition_to property + const transitionTo = func.function?.transition_to; + const hasHandler = func.function?.handler; - functionNames.forEach((funcName) => { + // If there's a transition_to, validate it points to a valid node + if (transitionTo && !this.flow.nodes[transitionTo]) { + this.errors.push( + `Node '${nodeId}' has function '${func.function.name}' with invalid transition_to: '${transitionTo}'`, + ); + } + + // Skip validation for functions that: + // - have parameters (node functions) + // - have a handler + // - have a transition_to + // - are end functions + const funcName = func.function?.name; if ( !this.isNodeFunction(funcName) && + !hasHandler && + !transitionTo && funcName !== "end" && !this.flow.nodes[funcName] ) { @@ -120,6 +136,21 @@ export class FlowValidator { }); } + _validateTransitions() { + Object.entries(this.flow.nodes).forEach(([nodeId, node]) => { + if (node.functions) { + node.functions.forEach((func) => { + const transition_to = func.function.transition_to; + if (transition_to && !this.flow.nodes[transition_to]) { + this.errors.push( + `Node '${nodeId}' has function '${func.function.name}' with invalid transition_to: '${transition_to}'`, + ); + } + }); + } + }); + } + /** * Validates node contents * @private diff --git a/env.example b/env.example index 335957c..7c620d4 100644 --- a/env.example +++ b/env.example @@ -1,4 +1,5 @@ DEEPGRAM_API_KEY= +CARTESIA_API_KEY= OPENAI_API_KEY= ANTHROPIC_API_KEY= GOOGLE_API_KEY= diff --git a/examples/dynamic/insurance_anthropic.py b/examples/dynamic/insurance_anthropic.py index 8792351..67e5d66 100644 --- a/examples/dynamic/insurance_anthropic.py +++ b/examples/dynamic/insurance_anthropic.py @@ -180,9 +180,6 @@ def create_initial_node() -> NodeConfig: }, } ], - "pre_actions": [ - {"type": "tts_say", "text": "Welcome! Let's find the right insurance coverage for you."} - ], } @@ -214,7 +211,6 @@ def create_marital_status_node() -> NodeConfig: }, } ], - "pre_actions": [{"type": "tts_say", "text": "Now, I'll need to know your marital status."}], } @@ -319,9 +315,13 @@ def create_end_node() -> NodeConfig: ], } ], - "functions": [], - "pre_actions": [ - {"type": "tts_say", "text": "Thank you for getting a quote with us today!"} + "functions": [ + # Add a dummy function to satisfy Anthropic's requirement + { + "name": "end_conversation", + "description": "End the conversation", + "input_schema": {"type": "object", "properties": {}}, + } ], "post_actions": [{"type": "end_conversation"}], } diff --git a/examples/static/food_ordering.py b/examples/static/food_ordering.py index e9e7e09..db91d91 100644 --- a/examples/static/food_ordering.py +++ b/examples/static/food_ordering.py @@ -17,15 +17,16 @@ from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService +from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.deepgram import DeepgramSTTService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat_flows import FlowArgs, FlowConfig, FlowManager, FlowResult + sys.path.append(str(Path(__file__).parent.parent)) from runner import configure -from pipecat_flows import FlowArgs, FlowConfig, FlowManager, FlowResult - load_dotenv(override=True) logger.remove(0) @@ -33,54 +34,78 @@ # Flow Configuration - Food ordering # -# This configuration defines a simple food ordering system with the following states: +# This configuration defines a food ordering system with the following states: # # 1. start # - Initial state where user chooses between pizza or sushi -# - Functions: choose_pizza, choose_sushi -# - Transitions to: choose_pizza or choose_sushi +# - Functions: +# * choose_pizza (transitions to choose_pizza) +# * choose_sushi (transitions to choose_sushi) # # 2. choose_pizza -# - Handles pizza size selection and order confirmation +# - Handles pizza order details # - Functions: -# * select_pizza_size (node function, can be called multiple times) -# * end (transitions to end node after order confirmation) -# - Pre-action: Immediate TTS acknowledgment +# * select_pizza_order (node function with size and type) +# * confirm_order (transitions to confirm) +# - Pricing: +# * Small: $10 +# * Medium: $15 +# * Large: $20 # # 3. choose_sushi -# - Handles sushi roll count selection and order confirmation +# - Handles sushi order details # - Functions: -# * select_roll_count (node function, can be called multiple times) -# * end (transitions to end node after order confirmation) -# - Pre-action: Immediate TTS acknowledgment +# * select_sushi_order (node function with count and type) +# * confirm_order (transitions to confirm) +# - Pricing: +# * $8 per roll # -# 4. end +# 4. confirm +# - Reviews order details with the user +# - Functions: +# * complete_order (transitions to end) +# +# 5. end # - Final state that closes the conversation # - No functions available -# - Pre-action: Farewell message # - Post-action: Ends conversation # Type definitions -class PizzaSizeResult(FlowResult): +class PizzaOrderResult(FlowResult): size: str + type: str + price: float -class RollCountResult(FlowResult): +class SushiOrderResult(FlowResult): count: int + type: str + price: float # Function handlers -async def select_pizza_size(args: FlowArgs) -> PizzaSizeResult: - """Handle pizza size selection.""" +async def select_pizza_order(args: FlowArgs) -> PizzaOrderResult: + """Handle pizza size and type selection.""" size = args["size"] - return {"size": size} + pizza_type = args["type"] + + # Simple pricing + base_price = {"small": 10.00, "medium": 15.00, "large": 20.00} + price = base_price[size] + + return {"size": size, "type": pizza_type, "price": price} -async def select_roll_count(args: FlowArgs) -> RollCountResult: - """Handle sushi roll count selection.""" +async def select_sushi_order(args: FlowArgs) -> SushiOrderResult: + """Handle sushi roll count and type selection.""" count = args["count"] - return {"count": count} + roll_type = args["type"] + + # Simple pricing: $8 per roll + price = count * 8.00 + + return {"count": count, "type": roll_type, "price": price} flow_config: FlowConfig = { @@ -98,16 +123,18 @@ async def select_roll_count(args: FlowArgs) -> RollCountResult: "type": "function", "function": { "name": "choose_pizza", - "description": "User wants to order pizza", + "description": "User wants to order pizza. Let's get that order started.", "parameters": {"type": "object", "properties": {}}, + "transition_to": "choose_pizza", }, }, { "type": "function", "function": { "name": "choose_sushi", - "description": "User wants to order sushi", + "description": "User wants to order sushi. Let's get that order started.", "parameters": {"type": "object", "properties": {}}, + "transition_to": "choose_sushi", }, }, ], @@ -116,16 +143,25 @@ async def select_roll_count(args: FlowArgs) -> RollCountResult: "messages": [ { "role": "system", - "content": "You are handling a pizza order. Use the available functions:\n - Use select_pizza_size when the user specifies a size (can be used multiple times if they change their mind or want to order multiple pizzas)\n - Use the end function ONLY when the user confirms they are done with their order\n\nAfter each size selection, confirm the selection and ask if they want to change it or complete their order. Only use the end function after the user confirms they are satisfied with their order.\n\nStart off by acknowledging the user's choice. Once they've chosen a size, ask if they'd like anything else. Remember to be friendly and casual.", + "content": """You are handling a pizza order. Use the available functions: +- Use select_pizza_order when the user specifies both size AND type +- Use confirm_order when the user confirms they are satisfied with their selection + +Pricing: +- Small: $10 +- Medium: $15 +- Large: $20 + +After selection, confirm both the size and type, state the price, and ask if they want to confirm their order. Remember to be friendly and casual.""", } ], "functions": [ { "type": "function", "function": { - "name": "select_pizza_size", - "handler": select_pizza_size, - "description": "Record the selected pizza size", + "name": "select_pizza_order", + "handler": select_pizza_order, + "description": "Record the pizza order details", "parameters": { "type": "object", "properties": { @@ -133,39 +169,49 @@ async def select_roll_count(args: FlowArgs) -> RollCountResult: "type": "string", "enum": ["small", "medium", "large"], "description": "Size of the pizza", - } + }, + "type": { + "type": "string", + "enum": ["pepperoni", "cheese", "supreme", "vegetarian"], + "description": "Type of pizza", + }, }, - "required": ["size"], + "required": ["size", "type"], }, }, }, { "type": "function", "function": { - "name": "end", - "description": "Complete the order (use only after user confirms)", + "name": "confirm_order", + "description": "Proceed to order confirmation", "parameters": {"type": "object", "properties": {}}, + "transition_to": "confirm", }, }, ], - "pre_actions": [ - {"type": "tts_say", "text": "Ok, let me help you with your pizza order..."} - ], }, "choose_sushi": { "messages": [ { "role": "system", - "content": "You are handling a sushi order. Use the available functions:\n - Use select_roll_count when the user specifies how many rolls (can be used multiple times if they change their mind or if they want to order multiple sushi rolls)\n - Use the end function ONLY when the user confirms they are done with their order\n\nAfter each roll count selection, confirm the count and ask if they want to change it or complete their order. Only use the end function after the user confirms they are satisfied with their order.\n\nStart off by acknowledging the user's choice. Once they've chosen a size, ask if they'd like anything else. Remember to be friendly and casual.", + "content": """You are handling a sushi order. Use the available functions: +- Use select_sushi_order when the user specifies both count AND type +- Use confirm_order when the user confirms they are satisfied with their selection + +Pricing: +- $8 per roll + +After selection, confirm both the count and type, state the price, and ask if they want to confirm their order. Remember to be friendly and casual.""", } ], "functions": [ { "type": "function", "function": { - "name": "select_roll_count", - "handler": select_roll_count, - "description": "Record the number of sushi rolls", + "name": "select_sushi_order", + "handler": select_sushi_order, + "description": "Record the sushi order details", "parameters": { "type": "object", "properties": { @@ -174,34 +220,59 @@ async def select_roll_count(args: FlowArgs) -> RollCountResult: "minimum": 1, "maximum": 10, "description": "Number of rolls to order", - } + }, + "type": { + "type": "string", + "enum": ["california", "spicy tuna", "rainbow", "dragon"], + "description": "Type of sushi roll", + }, }, - "required": ["count"], + "required": ["count", "type"], }, }, }, { "type": "function", "function": { - "name": "end", - "description": "Complete the order (use only after user confirms)", + "name": "confirm_order", + "description": "Proceed to order confirmation", "parameters": {"type": "object", "properties": {}}, + "transition_to": "confirm", }, }, ], - "pre_actions": [ - {"type": "tts_say", "text": "Ok, let me help you with your sushi order..."} + }, + "confirm": { + "messages": [ + { + "role": "system", + "content": """Read back the complete order details to the user and ask for final confirmation. Use the available functions: +- Use complete_order when the user confirms +- Use revise_order if they want to change something + +Be friendly and clear when reading back the order details.""", + } + ], + "functions": [ + { + "type": "function", + "function": { + "name": "complete_order", + "description": "User confirms the order is correct", + "parameters": {"type": "object", "properties": {}}, + "transition_to": "end", + }, + }, ], }, "end": { "messages": [ { "role": "system", - "content": "The order is complete. Thank the user and end the conversation.", + "content": "Concisely end the conversation—1-3 words is appropriate. Just say 'Bye' or something similarly short.", } ], "functions": [], - "pre_actions": [{"type": "tts_say", "text": "Thank you for your order! Goodbye!"}], "post_actions": [{"type": "end_conversation"}], }, }, @@ -227,14 +298,17 @@ async def main(): ) stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en") + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="820a3788-2b37-4d21-847a-b65d8a68c99a", # Salesman + ) llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") # Create initial context messages = [ { "role": "system", - "content": "You are an order-taking assistant. You must ALWAYS use the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Avoid outputting special characters and emojis.", + "content": "You are an order-taking assistant. You must ALWAYS use the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis.", } ] diff --git a/examples/static/movie_explorer_anthropic.py b/examples/static/movie_explorer_anthropic.py index d236205..50d1c37 100644 --- a/examples/static/movie_explorer_anthropic.py +++ b/examples/static/movie_explorer_anthropic.py @@ -12,7 +12,7 @@ # - Edge functions for state transitions (explore_movie, greeting, end) # # The flow allows users to: -# 1. See what movies are currently playing +# 1. See what movies are currently playing or coming soon # 2. Get detailed information about specific movies (including cast) # 3. Find similar movies as recommendations # @@ -37,8 +37,10 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.services.anthropic import AnthropicLLMService -from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService +from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.deepgram import DeepgramSTTService from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.utils.text.markdown_text_filter import MarkdownTextFilter sys.path.append(str(Path(__file__).parent.parent)) from runner import configure @@ -122,6 +124,33 @@ async def fetch_current_movies(self, session: aiohttp.ClientSession) -> List[Mov for movie in data["results"][:5] ] + async def fetch_upcoming_movies(self, session: aiohttp.ClientSession) -> List[MovieBasic]: + """Fetch upcoming movies from TMDB. + + Returns top 5 upcoming movies with basic information. + """ + url = f"{self.base_url}/movie/upcoming" + params = {"api_key": self.api_key, "language": "en-US", "page": 1} + + async with session.get(url, params=params) as response: + if response.status != 200: + logger.error(f"TMDB API Error: {response.status}") + raise ValueError(f"API returned status {response.status}") + + data = await response.json() + if "results" not in data: + logger.error(f"Unexpected API response: {data}") + raise ValueError("Invalid API response format") + + return [ + { + "id": movie["id"], + "title": movie["title"], + "overview": movie["overview"][:100] + "...", + } + for movie in data["results"][:5] + ] + async def fetch_movie_credits(self, session: aiohttp.ClientSession, movie_id: int) -> List[str]: """Fetch top cast members for a movie. @@ -227,6 +256,19 @@ async def get_movies() -> Union[MoviesResult, ErrorResult]: return ErrorResult(error="Failed to fetch movies") +async def get_upcoming_movies() -> Union[MoviesResult, ErrorResult]: + """Handler for fetching upcoming movies.""" + logger.debug("Calling TMDB API: get_upcoming_movies") + async with aiohttp.ClientSession() as session: + try: + movies = await tmdb_api.fetch_upcoming_movies(session) + logger.debug(f"TMDB API Response: {movies}") + return MoviesResult(movies=movies) + except Exception as e: + logger.error(f"TMDB API Error: {e}") + return ErrorResult(error="Failed to fetch upcoming movies") + + async def get_movie_details(args: FlowArgs) -> Union[MovieDetailsResult, ErrorResult]: """Handler for fetching movie details including cast.""" movie_id = args["movie_id"] @@ -266,30 +308,27 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error "content": [ { "type": "text", - "text": "You are a helpful movie expert. Start by greeting the user and asking if they'd like to know what movies are currently playing in theaters. Use get_movies to fetch the current movies when they're interested.", + "text": "You are a helpful movie expert. Start by greeting the user and asking if they'd like to know about movies currently in theaters or upcoming releases. Wait for their choice before using either get_current_movies or get_upcoming_movies.", } ], } ], "functions": [ { - "name": "get_movies", + "name": "get_current_movies", "handler": get_movies, - "description": "Fetch currently playing movies", + "description": "Fetch movies currently playing in theaters", "input_schema": {"type": "object", "properties": {}}, + "transition_to": "explore_movie", }, { - "name": "explore_movie", - "description": "Move to movie exploration", + "name": "get_upcoming_movies", + "handler": get_upcoming_movies, + "description": "Fetch movies coming soon to theaters", "input_schema": {"type": "object", "properties": {}}, + "transition_to": "explore_movie", }, ], - "pre_actions": [ - { - "type": "tts_say", - "text": "Welcome! I can tell you about movies currently playing in theaters.", - } - ], }, "explore_movie": { "messages": [ @@ -298,7 +337,14 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error "content": [ { "type": "text", - "text": "Help the user learn more about movies. Use get_movie_details when they express interest in a specific movie - this will show details including cast, runtime, and rating. After showing details, you can use get_similar_movies if they want recommendations. Ask if they'd like to explore another movie (use explore_movie) or end the conversation.", + "text": """Help the user learn more about movies. You can: +- Use get_movie_details when they express interest in a specific movie +- Use get_similar_movies to show recommendations +- Use get_current_movies to see what's playing now +- Use get_upcoming_movies to see what's coming soon +- Use end_conversation when they're done exploring + +After showing details or recommendations, ask if they'd like to explore another movie or end the conversation.""", } ], } @@ -329,14 +375,22 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error }, }, { - "name": "explore_movie", - "description": "Return to current movies list", + "name": "get_current_movies", + "handler": get_movies, + "description": "Show current movies in theaters", "input_schema": {"type": "object", "properties": {}}, }, { - "name": "end", + "name": "get_upcoming_movies", + "handler": get_upcoming_movies, + "description": "Show movies coming soon", + "input_schema": {"type": "object", "properties": {}}, + }, + { + "name": "end_conversation", "description": "End the conversation", "input_schema": {"type": "object", "properties": {}}, + "transition_to": "end", }, ], }, @@ -345,13 +399,20 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error { "role": "user", "content": [ - {"type": "text", "text": "Thank the user and end the conversation."} + { + "type": "text", + "text": "Thank the user warmly and mention they can return anytime to discover more movies.", + } ], } ], - "functions": [], - "pre_actions": [ - {"type": "tts_say", "text": "Thanks for exploring movies with me! Goodbye!"} + "functions": [ + # Add a dummy function to satisfy Anthropic's requirement + { + "name": "end_conversation", + "description": "End the conversation", + "input_schema": {"type": "object", "properties": {}}, + } ], "post_actions": [{"type": "end_conversation"}], }, @@ -377,7 +438,11 @@ async def main(): ) stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en") + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="c45bc5ec-dc68-4feb-8829-6e6b2748095d", # Movieman + text_filter=MarkdownTextFilter(), + ) llm = AnthropicLLMService( api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-latest" ) diff --git a/examples/static/movie_explorer_gemini.py b/examples/static/movie_explorer_gemini.py index 3726782..e8e0631 100644 --- a/examples/static/movie_explorer_gemini.py +++ b/examples/static/movie_explorer_gemini.py @@ -12,7 +12,7 @@ # - Edge functions for state transitions (explore_movie, greeting, end) # # The flow allows users to: -# 1. See what movies are currently playing +# 1. See what movies are currently playing or coming soon # 2. Get detailed information about specific movies (including cast) # 3. Find similar movies as recommendations # @@ -36,9 +36,11 @@ from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService +from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.deepgram import DeepgramSTTService from pipecat.services.google import GoogleLLMService from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.utils.text.markdown_text_filter import MarkdownTextFilter sys.path.append(str(Path(__file__).parent.parent)) from runner import configure @@ -122,6 +124,33 @@ async def fetch_current_movies(self, session: aiohttp.ClientSession) -> List[Mov for movie in data["results"][:5] ] + async def fetch_upcoming_movies(self, session: aiohttp.ClientSession) -> List[MovieBasic]: + """Fetch upcoming movies from TMDB. + + Returns top 5 upcoming movies with basic information. + """ + url = f"{self.base_url}/movie/upcoming" + params = {"api_key": self.api_key, "language": "en-US", "page": 1} + + async with session.get(url, params=params) as response: + if response.status != 200: + logger.error(f"TMDB API Error: {response.status}") + raise ValueError(f"API returned status {response.status}") + + data = await response.json() + if "results" not in data: + logger.error(f"Unexpected API response: {data}") + raise ValueError("Invalid API response format") + + return [ + { + "id": movie["id"], + "title": movie["title"], + "overview": movie["overview"][:100] + "...", + } + for movie in data["results"][:5] + ] + async def fetch_movie_credits(self, session: aiohttp.ClientSession, movie_id: int) -> List[str]: """Fetch top cast members for a movie. @@ -225,6 +254,19 @@ async def get_movies() -> Union[MoviesResult, ErrorResult]: return ErrorResult(error="Failed to fetch movies") +async def get_upcoming_movies() -> Union[MoviesResult, ErrorResult]: + """Handler for fetching upcoming movies.""" + logger.debug("Calling TMDB API: get_upcoming_movies") + async with aiohttp.ClientSession() as session: + try: + movies = await tmdb_api.fetch_upcoming_movies(session) + logger.debug(f"TMDB API Response: {movies}") + return MoviesResult(movies=movies) + except Exception as e: + logger.error(f"TMDB API Error: {e}") + return ErrorResult(error="Failed to fetch upcoming movies") + + async def get_movie_details(args: FlowArgs) -> Union[MovieDetailsResult, ErrorResult]: """Handler for fetching movie details including cast.""" movie_id = args["movie_id"] @@ -261,33 +303,48 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error "messages": [ { "role": "system", - "content": "You are a helpful movie expert. Start by greeting the user and asking if they'd like to know what movies are currently playing in theaters. Use get_movies to fetch the current movies when they're interested. Then move to explore_movie to help them learn more about a specific movie.", + "content": "You are a helpful movie expert. Start by greeting the user and asking if they'd like to know about movies currently in theaters or upcoming releases. Wait for their choice before using either get_current_movies or get_upcoming_movies.", } ], "functions": [ { "function_declarations": [ { - "name": "get_movies", + "name": "get_current_movies", "handler": get_movies, - "description": "Fetch currently playing movies", + "description": "Fetch movies currently playing in theaters", + "parameters": { + "type": "object", + "properties": {}, + }, + "transition_to": "explore_movie", + }, + { + "name": "get_upcoming_movies", + "handler": get_upcoming_movies, + "description": "Fetch movies coming soon to theaters", + "parameters": { + "type": "object", + "properties": {}, + }, + "transition_to": "explore_movie", }, - {"name": "explore_movie", "description": "Move to movie exploration"}, ] } ], - "pre_actions": [ - { - "type": "tts_say", - "text": "Welcome! I can tell you about movies currently playing in theaters.", - } - ], }, "explore_movie": { "messages": [ { "role": "system", - "content": "Help the user learn more about movies. Use get_movie_details when they express interest in a specific movie - this will show details including cast, runtime, and rating. After showing details, you can use get_similar_movies if they want recommendations. Ask if they'd like to explore another movie (use explore_movie) or end the conversation (use end) if they're finished.", + "content": """Help the user learn more about movies. You can: +- Use get_movie_details when they express interest in a specific movie +- Use get_similar_movies to show recommendations +- Use get_current_movies to see what's playing now +- Use get_upcoming_movies to see what's coming soon +- Use end_conversation when they're done exploring + +After showing details or recommendations, ask if they'd like to explore another movie or end the conversation.""", } ], "functions": [ @@ -317,18 +374,45 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error "required": ["movie_id"], }, }, - {"name": "explore_movie", "description": "Return to current movies list"}, - {"name": "end", "description": "End the conversation"}, + { + "name": "get_current_movies", + "handler": get_movies, + "description": "Show current movies in theaters", + "parameters": { + "type": "object", + "properties": {}, + }, + }, + { + "name": "get_upcoming_movies", + "handler": get_upcoming_movies, + "description": "Show movies coming soon", + "parameters": { + "type": "object", + "properties": {}, + }, + }, + { + "name": "end_conversation", + "description": "End the conversation", + "parameters": { + "type": "object", + "properties": {}, + }, + "transition_to": "end", + }, ] } ], }, "end": { - "messages": [{"role": "system", "content": "Thank the user and end the conversation."}], - "functions": [], - "pre_actions": [ - {"type": "tts_say", "text": "Thanks for exploring movies with me! Goodbye!"} + "messages": [ + { + "role": "system", + "content": "Thank the user warmly and mention they can return anytime to discover more movies.", + } ], + "functions": [], "post_actions": [{"type": "end_conversation"}], }, }, @@ -353,7 +437,11 @@ async def main(): ) stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en") + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="c45bc5ec-dc68-4feb-8829-6e6b2748095d", # Movieman + text_filter=MarkdownTextFilter(), + ) llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-1.5-flash-latest") # Get initial tools diff --git a/examples/static/movie_explorer_openai.py b/examples/static/movie_explorer_openai.py index 7254e9a..4e9830e 100644 --- a/examples/static/movie_explorer_openai.py +++ b/examples/static/movie_explorer_openai.py @@ -12,7 +12,7 @@ # - Edge functions for state transitions (explore_movie, greeting, end) # # The flow allows users to: -# 1. See what movies are currently playing +# 1. See what movies are currently playing or coming soon # 2. Get detailed information about specific movies (including cast) # 3. Find similar movies as recommendations # @@ -36,9 +36,11 @@ from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService +from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.deepgram import DeepgramSTTService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.utils.text.markdown_text_filter import MarkdownTextFilter sys.path.append(str(Path(__file__).parent.parent)) from runner import configure @@ -122,6 +124,33 @@ async def fetch_current_movies(self, session: aiohttp.ClientSession) -> List[Mov for movie in data["results"][:5] ] + async def fetch_upcoming_movies(self, session: aiohttp.ClientSession) -> List[MovieBasic]: + """Fetch upcoming movies from TMDB. + + Returns top 5 upcoming movies with basic information. + """ + url = f"{self.base_url}/movie/upcoming" + params = {"api_key": self.api_key, "language": "en-US", "page": 1} + + async with session.get(url, params=params) as response: + if response.status != 200: + logger.error(f"TMDB API Error: {response.status}") + raise ValueError(f"API returned status {response.status}") + + data = await response.json() + if "results" not in data: + logger.error(f"Unexpected API response: {data}") + raise ValueError("Invalid API response format") + + return [ + { + "id": movie["id"], + "title": movie["title"], + "overview": movie["overview"][:100] + "...", + } + for movie in data["results"][:5] + ] + async def fetch_movie_credits(self, session: aiohttp.ClientSession, movie_id: int) -> List[str]: """Fetch top cast members for a movie. @@ -222,7 +251,20 @@ async def get_movies() -> Union[MoviesResult, ErrorResult]: return MoviesResult(movies=movies) except Exception as e: logger.error(f"TMDB API Error: {e}") - return ErrorResult(error="Failed to fetch movies") + return ErrorResult(status="error", error="Failed to fetch movies") + + +async def get_upcoming_movies() -> Union[MoviesResult, ErrorResult]: + """Handler for fetching upcoming movies.""" + logger.debug("Calling TMDB API: get_upcoming_movies") + async with aiohttp.ClientSession() as session: + try: + movies = await tmdb_api.fetch_upcoming_movies(session) + logger.debug(f"TMDB API Response: {movies}") + return MoviesResult(movies=movies) + except Exception as e: + logger.error(f"TMDB API Error: {e}") + return ErrorResult(status="error", error="Failed to fetch upcoming movies") async def get_movie_details(args: FlowArgs) -> Union[MovieDetailsResult, ErrorResult]: @@ -236,7 +278,9 @@ async def get_movie_details(args: FlowArgs) -> Union[MovieDetailsResult, ErrorRe return MovieDetailsResult(**details) except Exception as e: logger.error(f"TMDB API Error: {e}") - return ErrorResult(error=f"Failed to fetch details for movie {movie_id}") + return ErrorResult( + status="error", error=f"Failed to fetch details for movie {movie_id}" + ) async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, ErrorResult]: @@ -250,7 +294,9 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error return SimilarMoviesResult(movies=similar) except Exception as e: logger.error(f"TMDB API Error: {e}") - return ErrorResult(error=f"Failed to fetch similar movies for {movie_id}") + return ErrorResult( + status="error", error=f"Failed to fetch similar movies for {movie_id}" + ) # Flow configuration @@ -261,40 +307,44 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error "messages": [ { "role": "system", - "content": "You are a helpful movie expert. Start by greeting the user and asking if they'd like to know what movies are currently playing in theaters. Use get_movies to fetch the current movies when they're interested.", + "content": "You are a helpful movie expert. Start by greeting the user and asking if they'd like to know about movies currently in theaters or upcoming releases. Wait for their choice before using either get_current_movies or get_upcoming_movies.", } ], "functions": [ { "type": "function", "function": { - "name": "get_movies", + "name": "get_current_movies", "handler": get_movies, - "description": "Fetch currently playing movies", + "description": "Fetch movies currently playing in theaters", "parameters": {"type": "object", "properties": {}}, + "transition_to": "explore_movie", }, }, { "type": "function", "function": { - "name": "explore_movie", - "description": "Move to movie exploration", + "name": "get_upcoming_movies", + "handler": get_upcoming_movies, + "description": "Fetch movies coming soon to theaters", "parameters": {"type": "object", "properties": {}}, + "transition_to": "explore_movie", }, }, ], - "pre_actions": [ - { - "type": "tts_say", - "text": "Welcome! I can tell you about movies currently playing in theaters.", - } - ], }, "explore_movie": { "messages": [ { "role": "system", - "content": "Help the user learn more about movies. Use get_movie_details when they express interest in a specific movie - this will show details including cast, runtime, and rating. After showing details, you can use get_similar_movies if they want recommendations. Ask if they'd like to explore another movie (use explore_movie) or end the conversation.", + "content": """Help the user learn more about movies. You can: +- Use get_movie_details when they express interest in a specific movie +- Use get_similar_movies to show recommendations +- Use get_current_movies to see what's playing now +- Use get_upcoming_movies to see what's coming soon +- Use end_conversation when they're done exploring + +After showing details or recommendations, ask if they'd like to explore another movie or end the conversation.""", } ], "functions": [ @@ -331,27 +381,40 @@ async def get_similar_movies(args: FlowArgs) -> Union[SimilarMoviesResult, Error { "type": "function", "function": { - "name": "explore_movie", - "description": "Return to current movies list", + "name": "get_current_movies", + "handler": get_movies, + "description": "Show current movies in theaters", + "parameters": {"type": "object", "properties": {}}, + }, + }, + { + "type": "function", + "function": { + "name": "get_upcoming_movies", + "handler": get_upcoming_movies, + "description": "Show movies coming soon", "parameters": {"type": "object", "properties": {}}, }, }, { "type": "function", "function": { - "name": "end", + "name": "end_conversation", "description": "End the conversation", "parameters": {"type": "object", "properties": {}}, + "transition_to": "end", }, }, ], }, "end": { - "messages": [{"role": "system", "content": "Thank the user and end the conversation."}], - "functions": [], - "pre_actions": [ - {"type": "tts_say", "text": "Thanks for exploring movies with me! Goodbye!"} + "messages": [ + { + "role": "system", + "content": "Thank the user warmly and mention they can return anytime to discover more movies.", + } ], + "functions": [], "post_actions": [{"type": "end_conversation"}], }, }, @@ -376,7 +439,11 @@ async def main(): ) stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en") + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="c45bc5ec-dc68-4feb-8829-6e6b2748095d", # Movieman + text_filter=MarkdownTextFilter(), + ) llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") # Get initial tools from the first node diff --git a/examples/static/patient_intake.py b/examples/static/patient_intake.py index ab9d32c..2098249 100644 --- a/examples/static/patient_intake.py +++ b/examples/static/patient_intake.py @@ -18,7 +18,8 @@ from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService +from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.deepgram import DeepgramSTTService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -169,7 +170,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: "messages": [ { "role": "system", - "content": "Start by introducing yourself to Chad Bailey, then ask for their date of birth, including the year. Once they provide their birthday, use verify_birthday to check it. If the birthday is correct (1983-01-01), use get_prescriptions to proceed.", + "content": "Start by introducing yourself to Chad Bailey, then ask for their date of birth, including the year. Once they provide their birthday, use verify_birthday to check it. If verified (1983-01-01), proceed to prescriptions.", } ], "functions": [ @@ -178,7 +179,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: "function": { "name": "verify_birthday", "handler": verify_birthday, - "description": "Verify the user has provided their correct birthday", + "description": "Verify the user has provided their correct birthday. Once confirmed, the next step is to recording the user's prescriptions.", "parameters": { "type": "object", "properties": { @@ -189,14 +190,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: }, "required": ["birthday"], }, - }, - }, - { - "type": "function", - "function": { - "name": "get_prescriptions", - "description": "Proceed to collecting prescriptions and ask the user what medications they're taking", - "parameters": {"type": "object", "properties": {}}, + "transition_to": "get_prescriptions", }, }, ], @@ -205,7 +199,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: "messages": [ { "role": "system", - "content": "This step is for collecting a user's prescription. Ask them what presceriptions they're taking, including the dosage. Use the available functions:\n - Use record_prescriptions when the user lists their medications (must include both name and dosage)\n - Use get_allergies once all prescriptions are recorded\n\nAsk them what prescriptions they're currently taking, making sure to get both medication names and dosages. After they provide their prescriptions (or confirm they have none), acknowledge their response and proceed to allergies", + "content": "This step is for collecting prescriptions. Ask them what prescriptions they're taking, including the dosage. After recording prescriptions (or confirming none), proceed to allergies.", } ], "functions": [ @@ -214,7 +208,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: "function": { "name": "record_prescriptions", "handler": record_prescriptions, - "description": "Record the user's prescriptions", + "description": "Record the user's prescriptions. Once confirmed, the next step is to collect allergy information.", "parameters": { "type": "object", "properties": { @@ -238,14 +232,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: }, "required": ["prescriptions"], }, - }, - }, - { - "type": "function", - "function": { - "name": "get_allergies", - "description": "Proceed to collecting allergies and ask the user if they have any allergies", - "parameters": {"type": "object", "properties": {}}, + "transition_to": "get_allergies", }, }, ], @@ -254,7 +241,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: "messages": [ { "role": "system", - "content": "You are collecting allergy information. Use the available functions:\n - Use record_allergies when the user lists their allergies (or confirms they have none)\n - Use get_conditions once allergies are recorded\n\nAsk them about any allergies they have. After they list their allergies (or confirm they have none), acknowledge their response and ask about any medical conditions they have.", + "content": "Collect allergy information. Ask about any allergies they have. After recording allergies (or confirming none), proceed to medical conditions.", } ], "functions": [ @@ -263,7 +250,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: "function": { "name": "record_allergies", "handler": record_allergies, - "description": "Record the user's allergies", + "description": "Record the user's allergies. Once confirmed, then next step is to collect medical conditions.", "parameters": { "type": "object", "properties": { @@ -275,7 +262,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: "name": { "type": "string", "description": "What the user is allergic to", - } + }, }, "required": ["name"], }, @@ -283,14 +270,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: }, "required": ["allergies"], }, - }, - }, - { - "type": "function", - "function": { - "name": "get_conditions", - "description": "Proceed to collecting medical conditions and ask the user if they have any medical conditions", - "parameters": {"type": "object", "properties": {}}, + "transition_to": "get_conditions", }, }, ], @@ -299,7 +279,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: "messages": [ { "role": "system", - "content": "You are collecting medical condition information. Use the available functions:\n - Use record_conditions when the user lists their conditions (or confirms they have none)\n - Use get_visit_reasons once conditions are recorded\n\nAsk them about any medical conditions they have. After they list their conditions (or confirm they have none), acknowledge their response and ask about the reason for their visit today.", + "content": "Collect medical condition information. Ask about any medical conditions they have. After recording conditions (or confirming none), proceed to visit reasons.", } ], "functions": [ @@ -308,7 +288,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: "function": { "name": "record_conditions", "handler": record_conditions, - "description": "Record the user's medical conditions", + "description": "Record the user's medical conditions. Once confirmed, the next step is to collect visit reasons.", "parameters": { "type": "object", "properties": { @@ -320,7 +300,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: "name": { "type": "string", "description": "The user's medical condition", - } + }, }, "required": ["name"], }, @@ -328,14 +308,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: }, "required": ["conditions"], }, - }, - }, - { - "type": "function", - "function": { - "name": "get_visit_reasons", - "description": "Proceed to collecting visit reasons and ask the user why they're visiting", - "parameters": {"type": "object", "properties": {}}, + "transition_to": "get_visit_reasons", }, }, ], @@ -344,7 +317,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: "messages": [ { "role": "system", - "content": "You are collecting information about the reason for their visit. Use the available functions:\n - Use record_visit_reasons when they explain their reasons\n - Use verify_information once reasons are recorded\n\nAsk them what brings them to the doctor today. After they explain their reasons, acknowledge their response and let them know you'll review all the information they've provided.", + "content": "Collect information about the reason for their visit. Ask what brings them to the doctor today. After recording their reasons, proceed to verification.", } ], "functions": [ @@ -353,7 +326,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: "function": { "name": "record_visit_reasons", "handler": record_visit_reasons, - "description": "Record the reasons for their visit", + "description": "Record the reasons for their visit. Once confirmed, the next step is to verify all information.", "parameters": { "type": "object", "properties": { @@ -365,7 +338,7 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: "name": { "type": "string", "description": "The user's reason for visiting", - } + }, }, "required": ["name"], }, @@ -373,66 +346,61 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: }, "required": ["visit_reasons"], }, - }, - }, - { - "type": "function", - "function": { - "name": "verify_information", - "description": "Proceed to information verification, repeat the information back to the user, and have them verify", - "parameters": {"type": "object", "properties": {}}, + "transition_to": "verify", }, }, ], }, - "verify_information": { + "verify": { "messages": [ { "role": "system", - "content": "Review all collected information. Summarize their prescriptions, allergies, conditions, and visit reasons. Use the available functions:\n - Use get_prescriptions if they need to make any changes\n - Use confirm_intake if they confirm everything is correct\n\nBe thorough in reviewing all details and ask for their explicit confirmation.", + "content": """Review all collected information with the patient. Follow these steps: +1. Summarize their prescriptions, allergies, conditions, and visit reasons +2. Ask if everything is correct +3. Use the appropriate function based on their response + +Be thorough in reviewing all details and wait for explicit confirmation.""", } ], "functions": [ { "type": "function", "function": { - "name": "get_prescriptions", + "name": "revise_information", "description": "Return to prescriptions to revise information", "parameters": {"type": "object", "properties": {}}, + "transition_to": "get_prescriptions", }, }, { "type": "function", "function": { - "name": "confirm_intake", - "description": "Confirm the intake information and proceed", + "name": "confirm_information", + "description": "Proceed with confirmed information", "parameters": {"type": "object", "properties": {}}, + "transition_to": "confirm", }, }, ], }, - "confirm_intake": { + "confirm": { "messages": [ { "role": "system", - "content": "The intake information is confirmed. Thank them for providing their information, let them know what to expect next, and use end to complete the conversation.", + "content": "Once confirmed, thank them, then use the complete_intake function to end the conversation.", } ], "functions": [ { "type": "function", "function": { - "name": "end", - "description": "End the conversation", + "name": "complete_intake", + "description": "Complete the intake process", "parameters": {"type": "object", "properties": {}}, + "transition_to": "end", }, - } - ], - "pre_actions": [ - { - "type": "tts_say", - "text": "Perfect! I've recorded all your information for your visit.", - } + }, ], }, "end": { @@ -443,12 +411,6 @@ async def record_visit_reasons(args: FlowArgs) -> VisitReasonRecordResult: } ], "functions": [], - "pre_actions": [ - { - "type": "tts_say", - "text": "Thank you for providing your information! We'll see you soon for your visit.", - } - ], "post_actions": [{"type": "end_conversation"}], }, }, @@ -473,7 +435,10 @@ async def main(): ) stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-asteria-en") + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady + ) llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") # Get initial tools from the first node diff --git a/examples/static/restaurant_reservation.py b/examples/static/restaurant_reservation.py index 5e3fa3c..d38f6cf 100644 --- a/examples/static/restaurant_reservation.py +++ b/examples/static/restaurant_reservation.py @@ -17,7 +17,8 @@ from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService +from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.deepgram import DeepgramSTTService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -114,14 +115,7 @@ async def record_time(args: FlowArgs) -> FlowResult: }, "required": ["size"], }, - }, - }, - { - "type": "function", - "function": { - "name": "get_time", - "description": "Proceed to time selection", - "parameters": {"type": "object", "properties": {}}, + "transition_to": "get_time", }, }, ], @@ -130,7 +124,7 @@ async def record_time(args: FlowArgs) -> FlowResult: "messages": [ { "role": "system", - "content": "Ask what time they'd like to dine. Restaurant is open 5 PM to 10 PM.", + "content": "Ask what time they'd like to dine. Restaurant is open 5 PM to 10 PM. After they provide a time, confirm it's within operating hours before recording. Use 24-hour format for internal recording (e.g., 17:00 for 5 PM).", } ], "functions": [ @@ -145,19 +139,13 @@ async def record_time(args: FlowArgs) -> FlowResult: "properties": { "time": { "type": "string", - "pattern": "^([0-1][0-9]|2[0-3]):[0-5][0-9]$", + "pattern": "^(17|18|19|20|21|22):([0-5][0-9])$", + "description": "Reservation time in 24-hour format (17:00-22:00)", } }, "required": ["time"], }, - }, - }, - { - "type": "function", - "function": { - "name": "confirm", - "description": "Proceed to confirmation", - "parameters": {"type": "object", "properties": {}}, + "transition_to": "confirm", }, }, ], @@ -176,6 +164,7 @@ async def record_time(args: FlowArgs) -> FlowResult: "name": "end", "description": "End the conversation", "parameters": {"type": "object", "properties": {}}, + "transition_to": "end", }, } ], @@ -206,7 +195,10 @@ async def main(): ) stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en") + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady + ) llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") # Get initial tools from the first node diff --git a/examples/static/travel_planner.py b/examples/static/travel_planner.py index 5888e9e..0ed9fb0 100644 --- a/examples/static/travel_planner.py +++ b/examples/static/travel_planner.py @@ -18,9 +18,11 @@ from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService +from pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.deepgram import DeepgramSTTService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.utils.text.markdown_text_filter import MarkdownTextFilter sys.path.append(str(Path(__file__).parent.parent)) from runner import configure @@ -132,6 +134,7 @@ async def record_activities(args: FlowArgs) -> ActivitiesResult: "name": "choose_beach", "description": "User wants to plan a beach vacation", "parameters": {"type": "object", "properties": {}}, + "transition_to": "choose_beach", }, }, { @@ -140,21 +143,16 @@ async def record_activities(args: FlowArgs) -> ActivitiesResult: "name": "choose_mountain", "description": "User wants to plan a mountain retreat", "parameters": {"type": "object", "properties": {}}, + "transition_to": "choose_mountain", }, }, ], - "pre_actions": [ - { - "type": "tts_say", - "text": "Welcome to Dream Vacations! I'll help you plan your perfect getaway.", - } - ], }, "choose_beach": { "messages": [ { "role": "system", - "content": "You are handling beach vacation planning. Use the available functions:\n - Use select_destination when the user chooses their preferred beach location\n - Use get_dates once they've selected a destination\n\nAvailable beach destinations are: 'Maui', 'Cancun', or 'Maldives'. After they choose, confirm their selection and proceed to dates. Be enthusiastic and paint a picture of each destination.", + "content": "You are handling beach vacation planning. Use the available functions:\n - Use select_destination when the user chooses their preferred beach location\n - After destination is selected, dates will be collected automatically\n\nAvailable beach destinations are: 'Maui', 'Cancun', or 'Maldives'. After they choose, confirm their selection. Be enthusiastic and paint a picture of each destination.", } ], "functions": [ @@ -175,26 +173,16 @@ async def record_activities(args: FlowArgs) -> ActivitiesResult: }, "required": ["destination"], }, + "transition_to": "get_dates", }, }, - { - "type": "function", - "function": { - "name": "get_dates", - "description": "Proceed to date selection", - "parameters": {"type": "object", "properties": {}}, - }, - }, - ], - "pre_actions": [ - {"type": "tts_say", "text": "Let's find your perfect beach paradise..."} ], }, "choose_mountain": { "messages": [ { "role": "system", - "content": "You are handling mountain retreat planning. Use the available functions:\n - Use select_destination when the user chooses their preferred mountain location\n - Use get_dates once they've selected a destination\n\nAvailable mountain destinations are: 'Swiss Alps', 'Rocky Mountains', or 'Himalayas'. After they choose, confirm their selection and proceed to dates. Be enthusiastic and paint a picture of each destination.", + "content": "You are handling mountain retreat planning. Use the available functions:\n - Use select_destination when the user chooses their preferred mountain location\n - After destination is selected, dates will be collected automatically\n\nAvailable mountain destinations are: 'Swiss Alps', 'Rocky Mountains', or 'Himalayas'. After they choose, confirm their selection. Be enthusiastic and paint a picture of each destination.", } ], "functions": [ @@ -215,26 +203,16 @@ async def record_activities(args: FlowArgs) -> ActivitiesResult: }, "required": ["destination"], }, + "transition_to": "get_dates", }, }, - { - "type": "function", - "function": { - "name": "get_dates", - "description": "Proceed to date selection", - "parameters": {"type": "object", "properties": {}}, - }, - }, - ], - "pre_actions": [ - {"type": "tts_say", "text": "Let's find your perfect mountain getaway..."} ], }, "get_dates": { "messages": [ { "role": "system", - "content": "Handle travel date selection. Use the available functions:\n - Use record_dates when the user specifies their travel dates (can be used multiple times if they change their mind)\n - Use get_activities once dates are confirmed\n\nAsk for their preferred travel dates within the next 6 months. After recording dates, confirm the selection and proceed to activities.", + "content": "Handle travel date selection. Use the available functions:\n - Use record_dates when the user specifies their travel dates (can be used multiple times if they change their mind)\n - After dates are recorded, activities will be collected automatically\n\nAsk for their preferred travel dates within the next 6 months. After recording dates, confirm the selection.", } ], "functions": [ @@ -260,14 +238,7 @@ async def record_activities(args: FlowArgs) -> ActivitiesResult: }, "required": ["check_in", "check_out"], }, - }, - }, - { - "type": "function", - "function": { - "name": "get_activities", - "description": "Proceed to activity selection", - "parameters": {"type": "object", "properties": {}}, + "transition_to": "get_activities", }, }, ], @@ -276,7 +247,7 @@ async def record_activities(args: FlowArgs) -> ActivitiesResult: "messages": [ { "role": "system", - "content": "Handle activity preferences. Use the available functions:\n - Use record_activities to save their activity preferences\n - Use verify_itinerary once activities are selected\n\nFor beach destinations, suggest: snorkeling, surfing, sunset cruise\nFor mountain destinations, suggest: hiking, skiing, mountain biking\n\nAfter they choose, confirm their selections and proceed to verification.", + "content": "Handle activity preferences. Use the available functions:\n - Use record_activities to save their activity preferences\n - After activities are recorded, verification will happen automatically\n\nFor beach destinations, suggest: snorkeling, surfing, sunset cruise\nFor mountain destinations, suggest: hiking, skiing, mountain biking\n\nAfter they choose, confirm their selections.", } ], "functions": [ @@ -299,14 +270,7 @@ async def record_activities(args: FlowArgs) -> ActivitiesResult: }, "required": ["activities"], }, - }, - }, - { - "type": "function", - "function": { - "name": "verify_itinerary", - "description": "Proceed to itinerary verification", - "parameters": {"type": "object", "properties": {}}, + "transition_to": "verify_itinerary", }, }, ], @@ -315,16 +279,17 @@ async def record_activities(args: FlowArgs) -> ActivitiesResult: "messages": [ { "role": "system", - "content": "Review the complete itinerary with the user. Summarize their destination, dates, and chosen activities. Use the available functions:\n - Use get_dates if they want to make changes\n - Use confirm_booking if they're happy with everything\n\nBe thorough in reviewing all details and ask for their confirmation.", + "content": "Review the complete itinerary with the user. Summarize their destination, dates, and chosen activities. Use revise_plan to make changes or confirm_booking if they're happy. Be thorough in reviewing all details and ask for their confirmation.", } ], "functions": [ { "type": "function", "function": { - "name": "get_dates", + "name": "revise_plan", "description": "Return to date selection to revise the plan", "parameters": {"type": "object", "properties": {}}, + "transition_to": "get_dates", }, }, { @@ -333,6 +298,7 @@ async def record_activities(args: FlowArgs) -> ActivitiesResult: "name": "confirm_booking", "description": "Confirm the booking and proceed to end", "parameters": {"type": "object", "properties": {}}, + "transition_to": "confirm_booking", }, }, ], @@ -351,6 +317,7 @@ async def record_activities(args: FlowArgs) -> ActivitiesResult: "name": "end", "description": "End the conversation", "parameters": {"type": "object", "properties": {}}, + "transition_to": "end", }, } ], @@ -390,7 +357,11 @@ async def main(): ) stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en") + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady + text_filter=MarkdownTextFilter(), + ) llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") # Get initial tools from the first node @@ -400,7 +371,7 @@ async def main(): messages = [ { "role": "system", - "content": "You are a travel planning assistant. You must ALWAYS use one of the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Avoid outputting special characters and emojis.", + "content": "You are a travel planning assistant with Summit & Sand Getaways. You must ALWAYS use one of the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Avoid outputting special characters and emojis.", } ] diff --git a/pyproject.toml b/pyproject.toml index d43548e..641840c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "pipecat-ai-flows" -version = "0.0.6" +version = "0.0.7" description = "Conversation Flow management for Pipecat AI applications" license = { text = "BSD 2-Clause License" } readme = "README.md" diff --git a/src/pipecat_flows/__init__.py b/src/pipecat_flows/__init__.py index 0d926bc..d467a5a 100644 --- a/src/pipecat_flows/__init__.py +++ b/src/pipecat_flows/__init__.py @@ -30,7 +30,8 @@ async def collect_name(args: FlowArgs) -> FlowResult: "name": "collect_name", "handler": collect_name, "description": "...", - "parameters": {...} + "parameters": {...}, + "transition_to": "next_step" } }] } diff --git a/src/pipecat_flows/manager.py b/src/pipecat_flows/manager.py index 476bdf8..949e8ce 100644 --- a/src/pipecat_flows/manager.py +++ b/src/pipecat_flows/manager.py @@ -201,15 +201,32 @@ async def _call_handler(self, handler: Callable, args: FlowArgs) -> FlowResult: return await handler(args) return await handler() - async def _create_transition_func(self, name: str, handler: Optional[Callable]) -> Callable: + async def _handle_static_transition( + self, + function_name: str, + args: Dict[str, Any], + flow_manager: "FlowManager", + ) -> None: + """Handle transitions for static flows.""" + if function_name in self.nodes: + logger.debug(f"Static transition to node: {function_name}") + await self.set_node(function_name, self.nodes[function_name]) + + async def _create_transition_func( + self, name: str, handler: Optional[Callable], transition_to: Optional[str] + ) -> Callable: """Create a transition function for the given name and handler. + This method creates a function that will be called when the LLM invokes a tool. + It handles both data processing (via handlers) and state transitions. + Args: - name: Function name - handler: Optional handler for node functions + name: Name of the function being registered + handler: Optional function to process data (for node functions) + transition_to: Optional node to transition to after processing Returns: - Callable: Transition function that handles both node and edge functions + Callable: Async function that handles the tool invocation """ async def transition_func( @@ -220,21 +237,35 @@ async def transition_func( context: Any, result_callback: Callable, ) -> None: + """Inner function that handles the actual tool invocation. + + Args: + function_name: Name of the called function + tool_call_id: Unique identifier for this tool call + args: Arguments passed to the function + llm: LLM service instance + context: Current conversation context + result_callback: Function to call with results + """ try: if handler: - # Node function with handler + # Execute handler for node functions (e.g., data processing) result = await self._call_handler(handler, args) await result_callback(result) logger.debug(f"Handler completed for {name}") else: - # Edge function without handler + # Acknowledge edge functions without handlers await result_callback({"status": "acknowledged"}) logger.debug(f"Edge function called: {name}") - # Execute transition callback if provided - if self.transition_callback: - logger.debug(f"Executing transition for {name}") + # Handle transitions based on flow type + if transition_to and self.nodes: # Static flow with transition_to + logger.debug(f"Static transition via transition_to: {transition_to}") + await self._handle_static_transition(transition_to, args, self) + elif self.transition_callback and not self.nodes: # Dynamic flow + logger.debug(f"Executing dynamic transition for {name}") await self.transition_callback(function_name, args, self) + except Exception as e: logger.error(f"Error in transition function {name}: {str(e)}") error_result = {"status": "error", "error": str(e)} @@ -243,18 +274,18 @@ async def transition_func( return transition_func async def _register_function( - self, name: str, handler: Optional[Callable], new_functions: Set[str] + self, + name: str, + handler: Optional[Callable], + transition_to: Optional[str], + new_functions: Set[str], ) -> None: - """Register a function with the LLM if not already registered. - - Args: - name: Function name - handler: Optional function handler - new_functions: Set to track newly registered functions - """ + """Register a function with the LLM if not already registered.""" if name not in self.current_functions: try: - self.llm.register_function(name, await self._create_transition_func(name, handler)) + self.llm.register_function( + name, await self._create_transition_func(name, handler, transition_to) + ) new_functions.add(name) logger.debug(f"Registered function: {name}") except Exception as e: @@ -276,50 +307,19 @@ def _remove_handlers(self, tool_config: Dict[str, Any]) -> None: if "handler" in decl: del decl["handler"] - async def set_node(self, node_id: str, node_config: NodeConfig) -> None: - """Set up a new conversation node and transition to it. - - This method handles the complete node transition process: - 1. Validates the node configuration - 2. Executes pre-transition actions - 3. Registers node functions with the LLM - 4. Updates the LLM context with new messages - 5. Executes post-transition actions - 6. Updates internal state tracking - - Args: - node_id: Unique identifier for the new node - node_config: Complete node configuration including: - messages (List[dict]): LLM context messages - functions (List[dict]): Available functions for this node - pre_actions (Optional[List[dict]]): Actions to execute before transition - post_actions (Optional[List[dict]]): Actions to execute after transition - - Example: - node_config = { - "messages": [ - {"role": "system", "content": "You are collecting user info"} - ], - "functions": [{ - "type": "function", - "function": { - "name": "save_info", - "handler": save_handler, - "description": "Save user information", - "parameters": {...} - } - }], - "pre_actions": [ - {"type": "tts_say", "text": "Processing..."} - ] - } - await flow_manager.set_node("collect_info", node_config) + def _remove_transition_info(self, tool_config: Dict[str, Any]) -> None: + """Remove transition information from tool configuration.""" + if "function" in tool_config and "transition_to" in tool_config["function"]: + del tool_config["function"]["transition_to"] + elif "transition_to" in tool_config: + del tool_config["transition_to"] + elif "function_declarations" in tool_config: + for decl in tool_config["function_declarations"]: + if "transition_to" in decl: + del decl["transition_to"] - Raises: - FlowError: If node setup or transition fails - FlowTransitionError: If manager isn't initialized - ValueError: If node configuration is invalid - """ + async def set_node(self, node_id: str, node_config: NodeConfig) -> None: + """Set up a new conversation node and transition to it.""" if not self.initialized: raise FlowTransitionError(f"{self.__class__.__name__} must be initialized first") @@ -339,23 +339,27 @@ async def set_node(self, node_id: str, node_config: NodeConfig) -> None: for declaration in func_config["function_declarations"]: name = declaration["name"] handler = declaration.get("handler") + transition_to = declaration.get("transition_to") logger.debug(f"Processing function: {name}") - await self._register_function(name, handler, new_functions) + await self._register_function(name, handler, transition_to, new_functions) else: name = self.adapter.get_function_name(func_config) logger.debug(f"Processing function: {name}") - handler = None + # Extract handler and transition info based on format if "function" in func_config: handler = func_config["function"].get("handler") - elif "handler" in func_config: + transition_to = func_config["function"].get("transition_to") + else: handler = func_config.get("handler") + transition_to = func_config.get("transition_to") - await self._register_function(name, handler, new_functions) + await self._register_function(name, handler, transition_to, new_functions) - # Create tool config + # Create tool config (after removing handler and transition_to) tool_config = copy.deepcopy(func_config) self._remove_handlers(tool_config) + self._remove_transition_info(tool_config) tools.append(tool_config) # Let adapter format tools for provider @@ -379,26 +383,6 @@ async def set_node(self, node_id: str, node_config: NodeConfig) -> None: logger.error(f"Error setting node {node_id}: {str(e)}") raise FlowError(f"Failed to set node {node_id}: {str(e)}") from e - async def _handle_static_transition( - self, - function_name: str, - args: Dict[str, Any], - flow_manager: "FlowManager", - ) -> None: - """Handle transitions for static flows. - - In static flows, transitions occur when a function name matches - a node name in the configuration. - - Args: - function_name: Name of the called function - args: Arguments passed to the function - flow_manager: Reference to this instance - """ - if function_name in self.nodes: - logger.debug(f"Static transition to node: {function_name}") - await self.set_node(function_name, self.nodes[function_name]) - async def _update_llm_context(self, messages: List[dict], functions: List[dict]) -> None: """Update LLM context with new messages and functions. @@ -429,7 +413,23 @@ async def _execute_actions( await self.action_manager.execute_actions(post_actions) def _validate_node_config(self, node_id: str, config: NodeConfig) -> None: - """Validate node configuration structure.""" + """Validate the configuration of a conversation node. + + This method ensures that: + 1. Required fields (messages, functions) are present + 2. Functions have valid configurations based on their type: + - Node functions must have either a handler or transition_to + - Edge functions (matching node names) are allowed without handlers + 3. Function configurations match the LLM provider's format + + Args: + node_id: Identifier for the node being validated + config: Complete node configuration to validate + + Raises: + ValueError: If configuration is invalid + """ + # Check required fields if "messages" not in config: raise ValueError(f"Node '{node_id}' missing required 'messages' field") if "functions" not in config: @@ -437,23 +437,40 @@ def _validate_node_config(self, node_id: str, config: NodeConfig) -> None: # Validate each function configuration for func in config["functions"]: - # Get function name based on provider format try: name = self.adapter.get_function_name(func) except KeyError: raise ValueError(f"Function in node '{node_id}' missing name field") - # Node functions (not matching node names) require handlers - if name not in self.nodes: - # Check for handler in all formats - has_handler = ( - ("function" in func and "handler" in func["function"]) # OpenAI format - or "handler" in func # Anthropic format - or ( # Gemini format - "function_declarations" in func - and func["function_declarations"] - and "handler" in func["function_declarations"][0] - ) + # Skip validation for edge functions (matching node names) + if name in self.nodes: + continue + + # Check for handler in provider-specific formats + has_handler = ( + ("function" in func and "handler" in func["function"]) # OpenAI format + or "handler" in func # Anthropic format + or ( # Gemini format + "function_declarations" in func + and func["function_declarations"] + and "handler" in func["function_declarations"][0] + ) + ) + + # Check for transition_to in provider-specific formats + has_transition = ( + ("function" in func and "transition_to" in func["function"]) + or "transition_to" in func + or ( + "function_declarations" in func + and func["function_declarations"] + and "transition_to" in func["function_declarations"][0] + ) + ) + + # Warn if function has neither handler nor transition_to + # End nodes may have neither, so just warn rather than error + if not has_handler and not has_transition: + logger.warning( + f"Function '{name}' in node '{node_id}' has neither handler nor transition_to" ) - if not has_handler: - raise ValueError(f"Node function '{name}' in node '{node_id}' missing handler") diff --git a/src/pipecat_flows/types.py b/src/pipecat_flows/types.py index cd90c32..8246eb6 100644 --- a/src/pipecat_flows/types.py +++ b/src/pipecat_flows/types.py @@ -34,14 +34,23 @@ class FlowResult(TypedDict, total=False): """ -class NodeConfig(TypedDict, total=False): +class NodeConfigRequired(TypedDict): + """Required fields for node configuration.""" + + messages: List[dict] + functions: List[dict] + + +class NodeConfig(NodeConfigRequired, total=False): """Configuration for a single node in the flow. - Attributes: + Required fields: messages: List of message dicts in provider-specific format functions: List of function definitions in provider-specific format - pre_actions: Optional list of actions to execute before LLM inference - post_actions: Optional list of actions to execute after LLM inference + + Optional fields: + pre_actions: Actions to execute before LLM inference + post_actions: Actions to execute after LLM inference Example: { @@ -77,8 +86,6 @@ class NodeConfig(TypedDict, total=False): } """ - messages: List[dict] - functions: List[dict] pre_actions: List[Dict[str, Any]] post_actions: List[Dict[str, Any]] diff --git a/tests/test_manager.py b/tests/test_manager.py index 7318455..6b1f6c5 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -282,11 +282,12 @@ async def test_transition_func_error_handling(self): flow_manager = FlowManager(self.mock_task, self.mock_llm) await flow_manager.initialize([]) - # Test handler that raises an error async def error_handler(args): raise ValueError("Test error") - transition_func = await flow_manager._create_transition_func("test", error_handler) + transition_func = await flow_manager._create_transition_func( + "test", error_handler, transition_to=None + ) # Mock result callback callback_called = False @@ -316,23 +317,35 @@ async def test_node_validation_edge_cases(self): await flow_manager.set_node("test", invalid_config) self.assertIn("missing name field", str(context.exception)) - # Test node function without handler + # Test node function without handler or transition_to invalid_config = { "messages": [], "functions": [ { "type": "function", "function": { - "name": "test_func", # Not a node name, should require handler + "name": "test_func", "description": "Test", "parameters": {}, }, } ], } - with self.assertRaises(FlowError) as context: + + # Mock loguru.logger.warning to capture the warning + warning_message = None + + def capture_warning(msg, *args, **kwargs): + nonlocal warning_message + warning_message = msg + + with patch("loguru.logger.warning", side_effect=capture_warning): await flow_manager.set_node("test", invalid_config) - self.assertIn("missing handler", str(context.exception)) + self.assertIsNotNone(warning_message) + self.assertIn( + "Function 'test_func' in node 'test' has neither handler nor transition_to", + warning_message, + ) async def test_pre_post_actions(self): """Test pre and post actions in set_node.""" @@ -364,13 +377,13 @@ async def failing_transition(function_name, args, flow_manager): ) await flow_manager.initialize([]) - # Create and execute transition function - transition_func = await flow_manager._create_transition_func("test", None) + transition_func = await flow_manager._create_transition_func( + "test", None, transition_to=None + ) async def result_callback(result): pass - # Should not raise error even if transition callback fails await transition_func("test", "id", {}, None, None, result_callback) async def test_register_function_error_handling(self): @@ -381,8 +394,9 @@ async def test_register_function_error_handling(self): # Mock LLM to raise error on register_function flow_manager.llm.register_function.side_effect = Exception("Registration error") + new_functions = set() with self.assertRaises(FlowError): - await flow_manager._register_function("test", None, set()) + await flow_manager._register_function("test", None, None, new_functions) async def test_action_execution_error_handling(self): """Test error handling in action execution.""" @@ -426,7 +440,6 @@ async def test_handler_callback_completion(self): flow_manager = FlowManager(self.mock_task, self.mock_llm) await flow_manager.initialize([]) - # Create a handler that returns a result async def test_handler(args): return {"status": "success", "data": "test"} @@ -437,8 +450,9 @@ async def result_callback(result): callback_called = True self.assertEqual(result["status"], "success") - # Create and execute transition function - transition_func = await flow_manager._create_transition_func("test", test_handler) + transition_func = await flow_manager._create_transition_func( + "test", test_handler, transition_to=None + ) await transition_func("test", "id", {}, None, None, result_callback) self.assertTrue(callback_called)