Skip to content

Commit

Permalink
Merge pull request #58 from pipecat-ai/mb/types
Browse files Browse the repository at this point in the history
Make FlowManager inputs more explicit, docstrings linting
  • Loading branch information
markbackman authored Dec 7, 2024
2 parents 8816ed0 + ba350b2 commit 38ed003
Show file tree
Hide file tree
Showing 22 changed files with 444 additions and 77 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ All notable changes to **Pipecat Flows** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.0.8] - 2024-12-07

### Changed

- Improved type safety in FlowManager by requiring keyword arguments for initialization
- Enhanced error messages for LLM service type validation

## [0.0.7] - 2024-12-06

### Added
Expand Down
4 changes: 3 additions & 1 deletion examples/dynamic/insurance_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,9 @@ async def main():
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

# Initialize flow manager with transition callback
flow_manager = FlowManager(task, llm, tts, transition_callback=handle_insurance_transition)
flow_manager = FlowManager(
task=task, llm=llm, tts=tts, transition_callback=handle_insurance_transition
)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
Expand Down
4 changes: 3 additions & 1 deletion examples/dynamic/insurance_gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,9 @@ async def main():
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

# Initialize flow manager with transition callback
flow_manager = FlowManager(task, llm, tts, transition_callback=handle_insurance_transition)
flow_manager = FlowManager(
task=task, llm=llm, tts=tts, transition_callback=handle_insurance_transition
)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
Expand Down
4 changes: 3 additions & 1 deletion examples/dynamic/insurance_openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,9 @@ async def main():
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

# Initialize flow manager with transition callback
flow_manager = FlowManager(task, llm, tts, transition_callback=handle_insurance_transition)
flow_manager = FlowManager(
task=task, llm=llm, tts=tts, transition_callback=handle_insurance_transition
)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
Expand Down
2 changes: 1 addition & 1 deletion examples/static/food_ordering.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ async def main():
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

# Initialize flow manager in static mode
flow_manager = FlowManager(task, llm, tts, flow_config=flow_config)
flow_manager = FlowManager(task=task, llm=llm, tts=tts, flow_config=flow_config)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
Expand Down
2 changes: 1 addition & 1 deletion examples/static/movie_explorer_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ async def main():
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

# Initialize flow manager
flow_manager = FlowManager(task, llm, tts, flow_config)
flow_manager = FlowManager(task=task, llm=llm, tts=tts, flow_config=flow_config)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
Expand Down
2 changes: 1 addition & 1 deletion examples/static/movie_explorer_gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ async def main():
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

# Initialize flow manager
flow_manager = FlowManager(task, llm, tts, flow_config)
flow_manager = FlowManager(task=task, llm=llm, tts=tts, flow_config=flow_config)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
Expand Down
2 changes: 1 addition & 1 deletion examples/static/movie_explorer_openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ async def main():
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

# Initialize flow manager
flow_manager = FlowManager(task, llm, tts, flow_config)
flow_manager = FlowManager(task=task, llm=llm, tts=tts, flow_config=flow_config)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
Expand Down
2 changes: 1 addition & 1 deletion examples/static/patient_intake.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ async def main():
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

# Initialize flow manager with LLM
flow_manager = FlowManager(task, llm, tts, flow_config)
flow_manager = FlowManager(task=task, llm=llm, tts=tts, flow_config=flow_config)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
Expand Down
2 changes: 1 addition & 1 deletion examples/static/restaurant_reservation.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ async def main():
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

# Initialize flow manager with LLM
flow_manager = FlowManager(task, llm, tts, flow_config)
flow_manager = FlowManager(task=task, llm=llm, tts=tts, flow_config=flow_config)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
Expand Down
2 changes: 1 addition & 1 deletion examples/static/travel_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ async def main():
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

# Initialize flow manager with LLM
flow_manager = FlowManager(task, llm, tts, flow_config)
flow_manager = FlowManager(task=task, llm=llm, tts=tts, flow_config=flow_config)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
Expand Down
15 changes: 13 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "pipecat-ai-flows"
version = "0.0.7"
version = "0.0.8"
description = "Conversation Flow management for Pipecat AI applications"
license = { text = "BSD 2-Clause License" }
readme = "README.md"
Expand Down Expand Up @@ -33,4 +33,15 @@ testpaths = ["tests"]
asyncio_mode = "auto"

[tool.ruff]
line-length = 100
line-length = 100

select = [
"D", # Docstring rules
]
ignore = ["D212"]
exclude = [
"examples"
]

[tool.ruff.pydocstyle]
convention = "google"
2 changes: 1 addition & 1 deletion src/pipecat_flows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""
Pipecat Flows
Pipecat Flows.
This package provides a framework for building structured conversations in Pipecat.
The FlowManager can handle both static and dynamic conversation flows:
Expand Down
17 changes: 17 additions & 0 deletions src/pipecat_flows/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,23 @@
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Action management system for conversation flows.
This module provides the ActionManager class which handles execution of actions
during conversation state transitions. It supports:
- Built-in actions (TTS, conversation ending)
- Custom action registration
- Synchronous and asynchronous handlers
- Pre and post-transition actions
- Error handling and validation
Actions are used to perform side effects during conversations, such as:
- Text-to-speech output
- Database updates
- External API calls
- Custom integrations
"""

import asyncio
from typing import Any, Callable, Dict, List, Optional

Expand Down
149 changes: 139 additions & 10 deletions src/pipecat_flows/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,20 @@
# SPDX-License-Identifier: BSD 2-Clause License
#

"""LLM provider adapters for normalizing function and message formats.
This module provides adapters that normalize interactions between different
LLM providers (OpenAI, Anthropic, Gemini). It handles:
- Function name extraction
- Argument parsing
- Message content formatting
- Provider-specific schema conversion
The adapter system allows the flow manager to work with different LLM
providers while maintaining a consistent internal format (based on OpenAI's
function calling convention).
"""

from abc import ABC, abstractmethod
from typing import Any, Dict, List

Expand Down Expand Up @@ -48,36 +62,112 @@ def format_functions(self, functions: List[Dict[str, Any]]) -> List[Dict[str, An


class OpenAIAdapter(LLMAdapter):
"""Format adapter for OpenAI."""
"""Format adapter for OpenAI.
Handles OpenAI's function calling format, which is used as the default format
in the flow system.
"""

def get_function_name(self, function_def: Dict[str, Any]) -> str:
"""Extract function name from OpenAI function definition.
Args:
function_def: OpenAI-formatted function definition dictionary
Returns:
Function name from the definition
"""
return function_def["function"]["name"]

def get_function_args(self, function_call: Dict[str, Any]) -> dict:
"""Extract arguments from OpenAI function call.
Args:
function_call: OpenAI-formatted function call dictionary
Returns:
Dictionary of function arguments, empty if none provided
"""
return function_call.get("arguments", {})

def get_message_content(self, message: Dict[str, Any]) -> str:
"""Extract content from OpenAI message format.
Args:
message: OpenAI-formatted message dictionary
Returns:
Message content as string
"""
return message["content"]

def format_functions(self, functions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
return functions # OpenAI format is our default
"""Format functions for OpenAI use.
Args:
functions: List of function definitions
Returns:
Functions in OpenAI format (unchanged as this is our default format)
"""
return functions


class AnthropicAdapter(LLMAdapter):
"""Format adapter for Anthropic."""
"""Format adapter for Anthropic.
Handles Anthropic's native function format, converting between OpenAI's format
and Anthropic's as needed.
"""

def get_function_name(self, function_def: Dict[str, Any]) -> str:
"""Extract function name from Anthropic function definition.
Args:
function_def: Anthropic-formatted function definition dictionary
Returns:
Function name from the definition
"""
return function_def["name"]

def get_function_args(self, function_call: Dict[str, Any]) -> dict:
"""Extract arguments from Anthropic function call.
Args:
function_call: Anthropic-formatted function call dictionary
Returns:
Dictionary of function arguments, empty if none provided
"""
return function_call.get("arguments", {})

def get_message_content(self, message: Dict[str, Any]) -> str:
"""Extract content from Anthropic message format.
Handles both string content and structured content arrays.
Args:
message: Anthropic-formatted message dictionary
Returns:
Message content as string, concatenated if from multiple parts
"""
if isinstance(message.get("content"), list):
return " ".join(item["text"] for item in message["content"] if item["type"] == "text")
return message.get("content", "")

def format_functions(self, functions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Format functions for Anthropic use.
Converts from OpenAI format to Anthropic's native function format if needed.
Args:
functions: List of function definitions in OpenAI format
Returns:
Functions converted to Anthropic's format
"""
formatted = []
for func in functions:
if "function" in func:
Expand All @@ -96,28 +186,61 @@ def format_functions(self, functions: List[Dict[str, Any]]) -> List[Dict[str, An


class GeminiAdapter(LLMAdapter):
"""Format adapter for Google's Gemini."""
"""Format adapter for Google's Gemini.
Handles Gemini's function declarations format, converting between OpenAI's format
and Gemini's as needed.
"""

def get_function_name(self, function_def: Dict[str, Any]) -> str:
"""Extract function name from provider-specific function definition."""
"""Extract function name from Gemini function definition.
Args:
function_def: Gemini-formatted function definition dictionary
Returns:
Function name from the first declaration, or empty string if none found
"""
logger.debug(f"Getting function name from: {function_def}")
if "function_declarations" in function_def:
declarations = function_def["function_declarations"]
if declarations and isinstance(declarations, list):
# Return name of current function being processed
return declarations[0]["name"]
return ""

def get_function_args(self, function_call: Dict[str, Any]) -> dict:
"""Extract function arguments from provider-specific function call."""
"""Extract arguments from Gemini function call.
Args:
function_call: Gemini-formatted function call dictionary
Returns:
Dictionary of function arguments, empty if none provided
"""
return function_call.get("args", {})

def get_message_content(self, message: Dict[str, Any]) -> str:
"""Extract message content from provider-specific format."""
"""Extract content from Gemini message format.
Args:
message: Gemini-formatted message dictionary
Returns:
Message content as string
"""
return message["content"]

def format_functions(self, functions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Format functions for provider-specific use."""
"""Format functions for Gemini use.
Converts from OpenAI format to Gemini's function declarations format.
Args:
functions: List of function definitions in OpenAI format
Returns:
Functions converted to Gemini's format with declarations wrapper
"""
all_declarations = []
for func in functions:
if "function_declarations" in func:
Expand Down Expand Up @@ -158,4 +281,10 @@ def create_adapter(llm) -> LLMAdapter:
return AnthropicAdapter()
elif isinstance(llm, GoogleLLMService):
return GeminiAdapter()
raise ValueError(f"Unsupported LLM type: {type(llm)}")
raise ValueError(
f"Unsupported LLM type: {type(llm)}\n"
"Must provide one of:\n"
"- OpenAILLMService\n"
"- AnthropicLLMService\n"
"- GoogleLLMService"
)
Loading

0 comments on commit 38ed003

Please sign in to comment.