diff --git a/autogen/agentchat/conversable_agent.py b/autogen/agentchat/conversable_agent.py
index 59e7fd21cc9f..1a3ec3bae373 100644
--- a/autogen/agentchat/conversable_agent.py
+++ b/autogen/agentchat/conversable_agent.py
@@ -9,7 +9,15 @@
 from typing import Any, Awaitable, Callable, Dict, List, Literal, Optional, Tuple, Type, TypeVar, Union
 
 from .. import OpenAIWrapper
-from ..code_utils import DEFAULT_MODEL, UNKNOWN, content_str, execute_code, extract_code, infer_lang
+from ..code_utils import (
+    DEFAULT_MODEL,
+    UNKNOWN,
+    content_str,
+    execute_code,
+    execute_code_from_work_dir,
+    extract_code,
+    infer_lang,
+)
 from ..function_utils import get_function_schema, load_basemodels_if_needed, serialize_to_str
 from .agent import Agent
 from .._pydantic import model_dump
@@ -1469,16 +1477,16 @@ def execute_function(self, func_call, verbose: bool = False) -> Tuple[bool, Dict
         func_name = func_call.get("name", "")
         func = self._function_map.get(func_name, None)
 
+        input_string = self._format_json_str(func_call.get("arguments", "{}"))
+        try:
+            arguments = json.loads(input_string)
+        except json.JSONDecodeError as e:
+            arguments = None
+            content = f"Error: {e}\n Your arguments should follow JSON format."
+
         is_exec_success = False
         if func is not None:
             # Extract arguments from a json-like string and put it into a dict.
-            input_string = self._format_json_str(func_call.get("arguments", "{}"))
-            try:
-                arguments = json.loads(input_string)
-            except json.JSONDecodeError as e:
-                arguments = None
-                content = f"Error: {e}\n You argument should follow json format."
-
             # Try to execute the function
             if arguments is not None:
                 print(
@@ -1490,8 +1498,21 @@ def execute_function(self, func_call, verbose: bool = False) -> Tuple[bool, Dict
                     is_exec_success = True
                 except Exception as e:
                     content = f"Error: {e}"
+        elif func_name == "python":
+            # The model sometimes emits a pseudo-function named "python" whose
+            # arguments are raw code; run it as a code block instead.
+            exitcode, content = self.execute_code_blocks([("python", func_call.get("arguments", "{}"))])
+            is_exec_success = exitcode == 0
         else:
-            content = f"Error: Function {func_name} not found."
+            try:
+                content = execute_code_from_work_dir(
+                    self._code_execution_config["work_dir"] + "/skills.py", func_name, arguments
+                )
+                is_exec_success = True
+            except KeyError:
+                content = f"Error: Function {func_name} not found."
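+            # Any other failure while running the skill is reported back to the model.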
+            except Exception as e:
+                content = f"Error: {e}"
 
         if verbose:
             print(
diff --git a/autogen/code_utils.py b/autogen/code_utils.py
index d8a936704a62..caf1d85fc5a4 100644
--- a/autogen/code_utils.py
+++ b/autogen/code_utils.py
@@ -8,7 +8,7 @@
 import time
 from concurrent.futures import ThreadPoolExecutor, TimeoutError
 from hashlib import md5
-from typing import Callable, Dict, List, Optional, Tuple, Union
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
 
 from autogen import oai
 
@@ -636,3 +636,16 @@ def implement(
 #         cost += metrics["gen_cost"]
 #         if metrics["succeed_assertions"] or i == len(configs) - 1:
 #             return responses[metrics["index_selected"]], cost, i
+
+
+def execute_code_from_work_dir(file_path: str, func_name: str, arguments: Dict[str, Any]) -> Any:
+    """Execute the module at `file_path` and call `func_name` with `arguments`.
+
+    Raises KeyError if the module does not define `func_name`.
+    """
+    with open(file_path, "r") as file:
+        code = compile(file.read(), file_path, "exec")
+    # Run the skills file in its own namespace so its imports and helpers
+    # do not leak into (or read from) this module's globals.
+    namespace: Dict[str, Any] = {}
+    exec(code, namespace)
+
+    func = namespace[func_name]
+    return func(**arguments)
diff --git a/samples/apps/autogen-studio/autogenstudio/workflowmanager.py b/samples/apps/autogen-studio/autogenstudio/workflowmanager.py
index a28bc9a7dd34..1d5939f18543 100644
--- a/samples/apps/autogen-studio/autogenstudio/workflowmanager.py
+++ b/samples/apps/autogen-studio/autogenstudio/workflowmanager.py
@@ -102,7 +102,8 @@ def sanitize_agent_spec(self, agent_spec: AgentFlowSpec) -> AgentFlowSpec:
         """
 
         agent_spec.config.is_termination_msg = agent_spec.config.is_termination_msg or (
-            lambda x: "TERMINATE" in x.get("content", "").rstrip()
+            lambda x: x.get("content", "")
+            and "TERMINATE" in x.get("content", "").rstrip()
         )
         skills_prompt = ""
         if agent_spec.skills:
@@ -139,10 +140,18 @@ def load(self, agent_spec: AgentFlowSpec) -> autogen.Agent:
         agent_spec = self.sanitize_agent_spec(agent_spec)
         if agent_spec.type == "assistant":
             agent = autogen.AssistantAgent(**asdict(agent_spec.config))
-            agent.register_reply([autogen.Agent, None], reply_func=self.process_reply, config={"callback": None})
+            agent.register_reply(
+                [autogen.Agent, None],
+                reply_func=self.process_reply,
+                config={"callback": None},
+            )
         elif agent_spec.type == "userproxy":
             agent = autogen.UserProxyAgent(**asdict(agent_spec.config))
-            agent.register_reply([autogen.Agent, None], reply_func=self.process_reply, config={"callback": None})
+            agent.register_reply(
+                [autogen.Agent, None],
+                reply_func=self.process_reply,
+                config={"callback": None},
+            )
         else:
             raise ValueError(f"Unknown agent type: {agent_spec.type}")
         return agent
diff --git a/samples/apps/fireworks-studio/.gitignore b/samples/apps/fireworks-studio/.gitignore
new file mode 100644
index 000000000000..b43ab02b5e27
--- /dev/null
+++ b/samples/apps/fireworks-studio/.gitignore
@@ -0,0 +1,25 @@
+database.sqlite
+.cache/*
+firestudio/web/files/user/*
+firestudio/web/files/ui/*
+OAI_CONFIG_LIST
+scratch/
+firestudio/web/workdir/*
+firestudio/web/ui/*
+firestudio/web/skills/user/*
+.release.sh
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+.virtual_documents/*
diff --git a/samples/apps/fireworks-studio/.virtual_documents/notebooks/tutorial.ipynb b/samples/apps/fireworks-studio/.virtual_documents/notebooks/tutorial.ipynb
new file mode 100644
index 000000000000..efc6d53440fe
--- /dev/null
+++ b/samples/apps/fireworks-studio/.virtual_documents/notebooks/tutorial.ipynb
@@ -0,0 +1,28 @@
+
+
+
+import json
+
+from firestudio import AgentWorkFlowConfig, AutoGenWorkFlowManager
+import uuid
+import os
+
+work_dir = f"/tmp/{uuid.uuid4()}"
+if not os.path.exists(work_dir):
+    os.makedirs(work_dir)
+
+# Load an agent specification from JSON
+agent_spec = json.load(open("agent_spec.json"))
+
+# Create an AutoGen workflow configuration from the agent specification
+agent_work_flow_config = AgentWorkFlowConfig(**agent_spec)
+
+# Create a workflow from the configuration
+agent_work_flow = AutoGenWorkFlowManager(agent_work_flow_config, work_dir=work_dir)
+
+# Run the workflow on a task
+task_query = "Compare the stock price of nvidia and supermicro over the past 1 year and plot a chart with their prices."
+agent_work_flow.run(message=task_query, clear_history=True)
+
+
+
diff --git a/samples/apps/fireworks-studio/MANIFEST.in b/samples/apps/fireworks-studio/MANIFEST.in
new file mode 100644
index 000000000000..3f995a446473
--- /dev/null
+++ b/samples/apps/fireworks-studio/MANIFEST.in
@@ -0,0 +1,5 @@
+recursive-include firestudio/web/ui *
+recursive-exclude notebooks *
+recursive-exclude frontend *
+recursive-exclude docs *
+recursive-exclude tests *
diff --git a/samples/apps/fireworks-studio/README.md b/samples/apps/fireworks-studio/README.md
new file mode 100644
index 000000000000..e6c0b2139c65
--- /dev/null
+++ b/samples/apps/fireworks-studio/README.md
@@ -0,0 +1,124 @@
+# AutoGen Studio
+[![PyPI version](https://badge.fury.io/py/autogenstudio.svg)](https://badge.fury.io/py/autogenstudio)
+[![Downloads](https://static.pepy.tech/badge/autogenstudio/week)](https://pepy.tech/project/autogenstudio)
+
+![ARA](./docs/ara_stockprices.png)
+
+AutoGen Studio is an AutoGen-powered AI app (user interface) to help you rapidly prototype AI agents, enhance them with skills, compose them into workflows, and interact with them to accomplish tasks. It is built on top of the [AutoGen](https://microsoft.github.io/autogen) framework, which is a toolkit for building AI agents.
+
+Code for AutoGen Studio is on GitHub at [microsoft/autogen](https://github.com/microsoft/autogen/tree/main/samples/apps/autogen-studio)
+
+> **Note**: AutoGen Studio is meant to help you rapidly prototype multi-agent workflows and demonstrate an example of end user interfaces built with AutoGen. It is not meant to be a production-ready app.
+
+### Capabilities / Roadmap
+
+Some of the capabilities supported by the app frontend include the following:
+
+- [x] Build / Configure agents (currently supports two agent workflows based on `UserProxyAgent` and `AssistantAgent`), modify their configuration (e.g., skills, temperature, model, agent system message, etc.) and compose them into workflows.
+- [x] Chat with agent workflows and specify tasks.
+- [x] View agent messages and output files in the UI from agent runs.
+- [x] Add interaction sessions to a gallery.
+- [ ] Support for more complex agent workflows (e.g., `GroupChat` workflows).
+- [ ] Improved user experience (e.g., streaming intermediate model output, better summarization of agent responses, etc.).
+
+Project Structure:
+
+- _autogenstudio/_ code for the backend classes and web api (FastAPI)
+- _frontend/_ code for the webui, built with Gatsby and TailwindCSS
+
+### Installation
+
+1. **Install from PyPi**
+
+   We recommend using a virtual environment (e.g., conda) to avoid conflicts with existing Python packages. With Python 3.10 or newer active in your virtual environment, use pip to install AutoGen Studio:
+
+   ```bash
+   pip install autogenstudio
+   ```
+
+2. **Install from Source**
+
+   > Note: This approach requires some familiarity with building interfaces in React.
+
+   If you prefer to install from source, ensure you have Python 3.10+ and Node.js (version above 14.15.0) installed. Here's how to get started:
+
+   - Clone the AutoGen Studio repository and install its Python dependencies:
+
+     ```bash
+     pip install -e .
+     ```
+
+   - Navigate to the `samples/apps/autogen-studio/frontend` directory, install dependencies, and build the UI:
+
+     ```bash
+     npm install -g gatsby-cli
+     npm install --global yarn
+     cd frontend
+     yarn install
+     yarn build
+     ```
+
+On Windows, you may need alternative commands to build the frontend:
+
+```bash
+
+gatsby clean && rmdir /s /q ..\\autogenstudio\\web\\ui && (set \"PREFIX_PATH_VALUE=\" || ver>nul) && gatsby build --prefix-paths && xcopy /E /I /Y public ..\\autogenstudio\\web\\ui
+
+```
+
+### Running the Application
+
+Once installed, run the web UI by entering the following in your terminal:
+
+```bash
+autogenstudio ui --port 8081
+```
+
+This will start the application on the specified port. Open your web browser and go to `http://localhost:8081/` to begin using AutoGen Studio.
+
+Now that you have AutoGen Studio installed and running, you are ready to explore its capabilities, including defining and modifying agent workflows, interacting with agents and sessions, and expanding agent skills.
+
+## Capabilities
+
+AutoGen Studio is built around a few high-level concepts.
+
+**Agent Workflow**: An agent workflow is a specification of a set of agents that can work together to accomplish a task. The simplest version of this is a setup with two agents – a user proxy agent (that represents a user, i.e., it executes code and reports the result) and an assistant that can address task requests (e.g., generating plans, writing code, evaluating responses, proposing error recovery steps, etc.). A more complex flow could be a group chat where even more agents work towards a solution.
+
+**Session**: A session refers to a period of continuous interaction or engagement with an agent workflow, typically characterized by a sequence of activities or operations aimed at achieving specific objectives. It includes the agent workflow configuration and the interactions between the user and the agents. A session can be “published” to a “gallery”.
+
+**Skills**: Skills are functions (e.g., Python functions) that describe how to solve a task. In general, a good skill has a descriptive name (e.g., `generate_images`), extensive docstrings and good defaults (e.g., writing out files to disk for persistence and reuse). You can add new skills to the AutoGen Studio app via the provided UI. At inference time, these skills are made available to the assistant agent as they address your tasks.
+
+AutoGen Studio comes with four example skills: `generate_and_save_images`, `show_image`, `find_papers_arxiv`, and `get_price`. The default skills, agents and workflows are based on the [dbdefaults.json](firestudio/utils/dbdefaults.json) file which is used to initialize the database.
+
+## Example Usage
+
+Consider the following query.
+
+```
+Plot a chart of NVDA and TESLA stock price YTD. Save the result to a file named nvda_tesla.png
+```
+
+The agent workflow responds by _writing and executing code_ to create a Python program that generates the chart from the stock prices.
+
+> Note that there may be multiple turns between the `AssistantAgent` and the `UserProxyAgent` to produce and execute the code needed to complete the task.
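+
+For illustration, the generated program might look roughly like the following sketch (assuming the agents pick `yfinance` and `matplotlib`; the exact code produced will vary from run to run):
+
+```python
+import matplotlib
+
+matplotlib.use("Agg")  # headless backend so the chart can be saved without a display
+import matplotlib.pyplot as plt
+import yfinance as yf
+
+# Year-to-date daily closing prices for both tickers
+closes = yf.download(["NVDA", "TSLA"], period="ytd")["Close"]
+
+closes.plot(title="NVDA vs. TSLA closing price, YTD")
+plt.ylabel("Price (USD)")
+plt.savefig("nvda_tesla.png")
+```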
+
+![ARA](./docs/ara_stockprices.png)
+
+> Note: You can also view the debug console, which logs useful information about how the agents are interacting in the background.
+
+
+
+## FAQ
+
+**Q: Where can I adjust the default skills, agent and workflow configurations?**
+A: You can modify agent configurations directly from the UI or by editing the [dbdefaults.json](firestudio/utils/dbdefaults.json) file which is used to initialize the database.
+
+**Q: If I want to reset the entire conversation with an agent, how do I go about it?**
+A: To reset your conversation history, you can delete the `database.sqlite` file. If you need to clear user-specific data, remove the relevant `autogenstudio/web/files/user/` folder.
+
+**Q: Is it possible to view the output and messages generated by the agents during interactions?**
+A: Yes, you can view the generated messages in the debug console of the web UI, providing insights into the agent interactions. Alternatively, you can inspect the `database.sqlite` file for a comprehensive record of messages.
+
+## Acknowledgements
+
+AutoGen Studio is based on the [AutoGen](https://microsoft.github.io/autogen) project. It was adapted from a research prototype built in October 2023 (original credits: Gagan Bansal, Adam Fourney, Victor Dibia, Piali Choudhury, Saleema Amershi, Ahmed Awadallah, Chi Wang).
diff --git a/samples/apps/fireworks-studio/docs/ara_stockprices.png b/samples/apps/fireworks-studio/docs/ara_stockprices.png
new file mode 100644
index 000000000000..fafb830ef1a1
Binary files /dev/null and b/samples/apps/fireworks-studio/docs/ara_stockprices.png differ
diff --git a/samples/apps/fireworks-studio/firestudio/__init__.py b/samples/apps/fireworks-studio/firestudio/__init__.py
new file mode 100644
index 000000000000..784328a73bd5
--- /dev/null
+++ b/samples/apps/fireworks-studio/firestudio/__init__.py
@@ -0,0 +1,3 @@
+from .chatmanager import *
+from .workflowmanager import *
+from .datamodel import *
diff --git a/samples/apps/fireworks-studio/firestudio/chatmanager.py b/samples/apps/fireworks-studio/firestudio/chatmanager.py
new file mode 100644
index 000000000000..c263fd719978
--- /dev/null
+++ b/samples/apps/fireworks-studio/firestudio/chatmanager.py
@@ -0,0 +1,107 @@
+from copy import deepcopy
+import json
+import time
+from typing import Any, Dict, List
+from .datamodel import AgentWorkFlowConfig, Message
+from .utils import (
+    extract_successful_code_blocks,
+    get_default_agent_config,
+    get_modified_files,
+)
+from .workflowmanager import AutoGenWorkFlowManager
+import os
+
+
+class AutoGenChatManager:
+    def __init__(self) -> None:
+        pass
+
+    def chat(
+        self,
+        message: Message,
+        history_list: List[Message],
+        agent_flow_config: AgentWorkFlowConfig = None,
+        **kwargs,
+    ) -> List[Message]:
+        _work_dir_prefix = kwargs.get("work_dir", None)
+        output_message: List[Message] = []
+
+        parent_flow_config = deepcopy(agent_flow_config)
+        if parent_flow_config is not None and not isinstance(parent_flow_config.receiver, list):
+            parent_flow_config.receiver = [parent_flow_config.receiver]
+
+        num_receivers = 1 if parent_flow_config is None else len(parent_flow_config.receiver)
+        for idx in range(num_receivers):
+            work_dir = os.path.join(_work_dir_prefix, str(idx))
+            scratch_dir = os.path.join(work_dir, "scratch")
+            os.makedirs(scratch_dir, exist_ok=True)
+
+            # If no flow config is provided, fall back to the default.
+            if parent_flow_config is None:
+                flow_config = get_default_agent_config(scratch_dir)
+            else:
+                flow_config = deepcopy(parent_flow_config)
+                flow_config.receiver = flow_config.receiver[idx]
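+            # Keep only the prior messages addressed to this receiver so each
+            # workflow replays its own conversation thread.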
+            receiver_name = flow_config.receiver.config.name
+            history = [
+                message
+                for message in history_list
+                if message.receiver_name == receiver_name
+            ]
+
+            flow = AutoGenWorkFlowManager(
+                config=flow_config, history=history, work_dir=scratch_dir
+            )
+            message_text = message.content.strip()
+
+            start_time = time.time()
+
+            metadata = {}
+            flow.run(message=f"{message_text}", clear_history=False)
+
+            metadata["messages"] = flow.agent_history
+
+            output = ""
+
+            if flow_config.summary_method == "last":
+                successful_code_blocks = extract_successful_code_blocks(
+                    flow.agent_history
+                )
+                last_message = flow.agent_history[-1]["message"]["content"]
+                successful_code_blocks = "\n\n".join(successful_code_blocks)
+                output = (
+                    (last_message + "\n" + successful_code_blocks)
+                    if successful_code_blocks
+                    else last_message
+                )
+            elif flow_config.summary_method == "llm":
+                output = ""
+            elif flow_config.summary_method == "none":
+                output = ""
+
+            metadata["code"] = ""
+            end_time = time.time()
+            metadata["time"] = end_time - start_time
+            modified_files = get_modified_files(
+                start_time, end_time, scratch_dir, dest_dir=work_dir
+            )
+            metadata["files"] = modified_files
+
+            print("Modified files: ", len(modified_files))
+
+            output_message.append(
+                Message(
+                    user_id=message.user_id,
+                    root_msg_id=message.root_msg_id,
+                    role="assistant",
+                    content=output,
+                    receiver_name=receiver_name,
+                    metadata=json.dumps(metadata),
+                    session_id=message.session_id,
+                )
+            )
+
+        return output_message
diff --git a/samples/apps/fireworks-studio/firestudio/cli.py b/samples/apps/fireworks-studio/firestudio/cli.py
new file mode 100644
index 000000000000..64d639f6033a
--- /dev/null
+++ b/samples/apps/fireworks-studio/firestudio/cli.py
@@ -0,0 +1,48 @@
+import os
+from typing_extensions import Annotated
+import typer
+import uvicorn
+
+from .version import VERSION
+
+app = typer.Typer()
+
+
+@app.command()
+def ui(
+    host: str = "127.0.0.1",
+    port: int = 8081,
+    workers: int = 1,
+    reload: Annotated[bool, typer.Option("--reload")] = False,
+    docs: bool = False,
+):
+    """
+    Launch the Fire Studio UI. Pass in parameters host, port, workers, and reload to override the default values.
+    """
+
+    os.environ["AUTOGENUI_API_DOCS"] = str(docs)
+
+    uvicorn.run(
+        "firestudio.web.app:app",
+        host=host,
+        port=port,
+        workers=workers,
+        reload=reload,
+    )
+
+
+@app.command()
+def version():
+    """
+    Print the version of the Fire Studio UI CLI.
+ """ + + typer.echo(f"Fire Studio UI CLI version: {VERSION}") + + +def run(): + app() + + +if __name__ == "__main__": + app() diff --git a/samples/apps/fireworks-studio/firestudio/datamodel.py b/samples/apps/fireworks-studio/firestudio/datamodel.py new file mode 100644 index 000000000000..dbd9d37c49c2 --- /dev/null +++ b/samples/apps/fireworks-studio/firestudio/datamodel.py @@ -0,0 +1,225 @@ +import uuid +from datetime import datetime +from typing import Any, Callable, Dict, List, Literal, Optional, Union +from pydantic.dataclasses import dataclass +from dataclasses import asdict, field + + +@dataclass +class Message(object): + user_id: str + role: str + content: str + receiver_name: str + root_msg_id: Optional[str] = None + msg_id: Optional[str] = None + timestamp: Optional[str] = None + personalize: Optional[bool] = False + ra: Optional[str] = None + code: Optional[str] = None + metadata: Optional[Any] = None + session_id: Optional[str] = None + + def __post_init__(self): + if self.msg_id is None: + self.msg_id = str(uuid.uuid4()) + if self.timestamp is None: + self.timestamp = datetime.now().isoformat() + + def dict(self): + result = asdict(self) + return result + + +@dataclass +class Skill(object): + title: str + file_name: str + content: str + id: Optional[str] = None + description: Optional[str] = None + timestamp: Optional[str] = None + user_id: Optional[str] = None + + def __post_init__(self): + if self.id is None: + self.id = str(uuid.uuid4()) + if self.timestamp is None: + self.timestamp = datetime.now().isoformat() + if self.user_id is None: + self.user_id = "default" + + def dict(self): + result = asdict(self) + return result + + +# web api data models + + +# autogenflow data models +@dataclass +class ModelConfig: + """Data model for Model Config item in LLMConfig for AutoGen""" + + model: str + api_key: Optional[str] = None + base_url: Optional[str] = None + api_type: Optional[str] = None + api_version: Optional[str] = None + + +@dataclass +class LLMConfig: + """Data model for LLM Config for AutoGen""" + + config_list: List[Any] = field(default_factory=List) + temperature: float = 0 + cache_seed: Optional[Union[int, None]] = None + timeout: Optional[int] = None + tools: Optional[List[Dict[str, Any]]] = None + + +@dataclass +class AgentConfig: + """Data model for Agent Config for AutoGen""" + + name: str + llm_config: Optional[Union[LLMConfig, bool]] = False + human_input_mode: str = "NEVER" + max_consecutive_auto_reply: int = 10 + system_message: Optional[str] = None + is_termination_msg: Optional[Union[bool, str, Callable]] = None + code_execution_config: Optional[Union[bool, str, Dict[str, Any]]] = None + + def dict(self): + result = asdict(self) + if isinstance(result["llm_config"], LLMConfig): + result["llm_config"] = result["llm_config"].dict() + return result + + +@dataclass +class AgentFlowSpec: + """Data model to help flow load agents from config""" + + type: Literal["assistant", "userproxy", "groupchat"] + config: AgentConfig = field(default_factory=AgentConfig) + id: Optional[str] = None + timestamp: Optional[str] = None + user_id: Optional[str] = None + skills: Optional[Union[None, List[Skill]]] = None + description: Optional[str] = None + + def __post_init__(self): + if self.timestamp is None: + self.timestamp = datetime.now().isoformat() + if self.id is None: + self.id = str(uuid.uuid4()) + if self.user_id is None: + self.user_id = "default" + + def dict(self): + result = asdict(self) + return result + + +@dataclass +class AgentWorkFlowConfig: + """Data model for 
+
+    name: str
+    description: str
+    sender: AgentFlowSpec
+    receiver: Union[AgentFlowSpec, List[AgentFlowSpec]]
+    type: Literal["default", "groupchat"] = "default"
+    id: Optional[str] = None
+    user_id: Optional[str] = None
+    timestamp: Optional[str] = None
+    # How the agent message summary is generated.
+    # last: only the last message is used; none: no summary; llm: use an LLM to generate the summary.
+    summary_method: Optional[Literal["last", "none", "llm"]] = "last"
+
+    def __post_init__(self):
+        if self.id is None:
+            self.id = str(uuid.uuid4())
+        if self.user_id is None:
+            self.user_id = "default"
+        if self.timestamp is None:
+            self.timestamp = datetime.now().isoformat()
+
+    def dict(self):
+        result = asdict(self)
+        result["sender"] = self.sender.dict()
+        if isinstance(self.receiver, list):
+            result["receiver"] = [r.dict() for r in self.receiver]
+        else:
+            result["receiver"] = self.receiver.dict()
+        return result
+
+
+@dataclass
+class Session(object):
+    """Data model for AutoGen Chat Session"""
+
+    user_id: str
+    id: Optional[str] = None
+    timestamp: Optional[str] = None
+    flow_config: Optional[AgentWorkFlowConfig] = None
+
+    def __post_init__(self):
+        if self.timestamp is None:
+            self.timestamp = datetime.now().isoformat()
+        if self.id is None:
+            self.id = str(uuid.uuid4())
+
+    def dict(self):
+        result = asdict(self)
+        result["flow_config"] = (
+            self.flow_config.dict() if self.flow_config is not None else None
+        )
+        return result
+
+
+@dataclass
+class Gallery(object):
+    """Data model for Gallery Item"""
+
+    session: Session
+    messages: List[Message]
+    tags: List[str]
+    id: Optional[str] = None
+    timestamp: Optional[str] = None
+
+    def __post_init__(self):
+        if self.timestamp is None:
+            self.timestamp = datetime.now().isoformat()
+        if self.id is None:
+            self.id = str(uuid.uuid4())
+
+    def dict(self):
+        result = asdict(self)
+        return result
+
+
+@dataclass
+class ChatWebRequestModel(object):
+    """Data model for Chat Web Request for Web End"""
+
+    message: Message
+    flow_config: AgentWorkFlowConfig
+
+
+@dataclass
+class DeleteMessageWebRequestModel(object):
+    user_id: str
+    msg_id: str
+    session_id: Optional[str] = None
+
+
+@dataclass
+class DBWebRequestModel(object):
+    user_id: str
+    msg_id: Optional[str] = None
+    session: Optional[Session] = None
+    skill: Optional[Skill] = None
+    tags: Optional[List[str]] = None
+    agent: Optional[AgentFlowSpec] = None
+    workflow: Optional[AgentWorkFlowConfig] = None
diff --git a/samples/apps/fireworks-studio/firestudio/utils/__init__.py b/samples/apps/fireworks-studio/firestudio/utils/__init__.py
new file mode 100644
index 000000000000..f37b0b0486a2
--- /dev/null
+++ b/samples/apps/fireworks-studio/firestudio/utils/__init__.py
@@ -0,0 +1,2 @@
+from .dbutils import *
+from .utils import *
diff --git a/samples/apps/fireworks-studio/firestudio/utils/dbdefaults.json b/samples/apps/fireworks-studio/firestudio/utils/dbdefaults.json
new file mode 100644
index 000000000000..da288dd7b184
--- /dev/null
+++ b/samples/apps/fireworks-studio/firestudio/utils/dbdefaults.json
@@ -0,0 +1,263 @@
+{
+  "agents": [
+    {
+      "type": "userproxy",
+      "description": "A user proxy agent that executes code.",
+      "config": {
+        "name": "userproxy",
+        "human_input_mode": "NEVER",
+        "max_consecutive_auto_reply": 5,
+        "system_message": "",
+        "llm_config": false,
+        "code_execution_config": {
+          "work_dir": null,
+          "use_docker": false
+        }
+      }
+    },
+    {
+      "type": "assistant",
+      "description": "A primary assistant agent that writes plans and uses tools to solve tasks.",
+      "skills": [
+        {
+          "title": "generate_and_save_images",
"file_name": "generate_and_save_images.py", + "content": "\nfrom typing import List\nimport uuid\nimport requests # to perform HTTP requests\nfrom pathlib import Path\nfrom typing_extensions import Annotated\nfrom openai import OpenAI\nfrom firestudio.utils.utils import schema_recorder\n\n@schema_recorder(description=\"Function to paint, draw or illustrate images based on the users query or request. Generates images from a given query using OpenAI's DALL-E model and saves them to disk. Use the code below anytime there is a request to create an image\")\ndef generate_and_save_images(query: Annotated[str, \"A natural language description of the image to be generated.\"], image_size: Annotated[str, \"The size of the image to be generated. default is '1024x1024'\"] = \"1024x1024\") -> List[str]:\n client = OpenAI() # Initialize the OpenAI client\n response = client.images.generate(model=\"dall-e-3\", prompt=query, n=1, size=image_size) # Generate images\n\n # List to store the file names of saved images\n saved_files = []\n\n # Check if the response is successful\n if response.data:\n for image_data in response.data:\n # Generate a random UUID as the file name\n file_name = \"/tmp/\" + str(uuid.uuid4()) + \".png\" # Assuming the image is a PNG\n file_path = Path(file_name)\n\n img_url = image_data.url\n img_response = requests.get(img_url)\n if img_response.status_code == 200:\n # Write the binary content to a file\n with open(file_path, \"wb\") as img_file:\n img_file.write(img_response.content)\n print(f\"Image saved to {file_path}\")\n saved_files.append(str(file_path))\n else:\n print(f\"Failed to download the image from {img_url}\")\n else:\n print(\"No image data found in the response!\")\n\n # Return the list of saved files\n return saved_files\n\n\n# Example usage of the function:\n# generate_and_save_images(\"A cute baby sea otter\")\n" + }, + { + "title": "show_image", + "file_name": "show_image.py", + "content": "\nfrom typing import List\nimport uuid\nimport requests # to perform HTTP requests\nfrom pathlib import Path\nfrom typing_extensions import Annotated\nfrom firestudio.utils.utils import schema_recorder\nimport cv2\nfrom matplotlib import pyplot as plt\n\n@schema_recorder(description=\"A function that is capable for displaying an image given path to a image file in png or jpg or jpeg.\")\ndef show_image(path: Annotated[str, \"The path to the image file that needs to be displayed\"]) -> str:\n img = cv2.imread(path,-1)\n plt.imshow(img)\n plt.axis(\"off\")\n plt.show()\n return \"\"\n" + }, + { + "title": "find_papers_arxiv", + "description": "Function ability to find the papers on arxiv", + "file_name": "find_papers_arxiv.py", + "content": "\nimport os\nimport re\nimport json\nimport hashlib\nimport arxiv\nfrom firestudio.utils.utils import schema_recorder\nfrom typing_extensions import Annotated\n\n\n@schema_recorder(description=\"Searches arXiv for the given query using the arXiv API, then returns the search results. This is a helper function. In most cases, callers will want to use 'find_relevant_papers( query, max_results )' instead.\")\ndef search_arxiv(query: Annotated[str, \"The search query\"], max_results: Annotated[Optional[int], \"The maximum number of search results to return. Defaults to 10\"]=10) -> Annotated[List[Dict[str, Any]], \"A list of dictionaries. 
Each dictionary contains fields such as 'title', 'authors', 'summary', and 'pdf_url'\"]:\n # Example:\n # >>> results = search_arxiv(\"attention is all you need\")\n # >>> print(results)\n\n key = hashlib.md5((\"search_arxiv(\" + str(max_results) + \")\" + query).encode(\"utf-8\")).hexdigest()\n # Create the cache if it doesn't exist\n cache_dir = \".cache\"\n if not os.path.isdir(cache_dir):\n os.mkdir(cache_dir)\n\n fname = os.path.join(cache_dir, key + \".cache\")\n\n # Cache hit\n if os.path.isfile(fname):\n fh = open(fname, \"r\", encoding=\"utf-8\")\n data = json.loads(fh.read())\n fh.close()\n return data\n\n # Normalize the query, removing operator keywords\n query = re.sub(r\"[^\\s\\w]\", \" \", query.lower())\n query = re.sub(r\"\\s(and|or|not)\\s\", \" \", \" \" + query + \" \")\n query = re.sub(r\"[^\\s\\w]\", \" \", query.lower())\n query = re.sub(r\"\\s+\", \" \", query).strip()\n\n search = arxiv.Search(query=query, max_results=max_results, sort_by=arxiv.SortCriterion.Relevance)\n\n jresults = list()\n for result in search.results():\n r = dict()\n r[\"entry_id\"] = result.entry_id\n r[\"updated\"] = str(result.updated)\n r[\"published\"] = str(result.published)\n r[\"title\"] = result.title\n r[\"authors\"] = [str(a) for a in result.authors]\n r[\"summary\"] = result.summary\n r[\"comment\"] = result.comment\n r[\"journal_ref\"] = result.journal_ref\n r[\"doi\"] = result.doi\n r[\"primary_category\"] = result.primary_category\n r[\"categories\"] = result.categories\n r[\"links\"] = [str(link) for link in result.links]\n r[\"pdf_url\"] = result.pdf_url\n jresults.append(r)\n\n if len(jresults) > max_results:\n jresults = jresults[0:max_results]\n\n # Save to cache\n fh = open(fname, \"w\")\n fh.write(json.dumps(jresults))\n fh.close()\n return jresults\n" + }, + { + "title": "get_price", + "description": "Get price history for a stock ticker", + "file_name": "get_price.py", + "content": "\nimport yfinance as yf\nfrom firestudio.utils.utils import schema_recorder\nfrom typing_extensions import Annotated\nimport uuid\nfrom pathlib import Path\n\n@schema_recorder(description=\"Helper function to obtain stock price history of a company over specified period. The price information is written to a file and the path of the file is returned. The file is csv and contains following columns - Date,Open,High,Low,Close,Volume,Dividends,Stock Splits\")\ndef get_prices(ticker: Annotated[str, \"Stock ticker for a company\"], period: Annotated[str, \"data period to download (Either Use period parameter or use start and end) Valid periods are: 1d, 5d, 1mo, 3mo, 6mo, 1y, 2y, 5y, 10y, ytd, max\"]) -> Annotated[str, \"File which contains the price of the a ticker, each price in a new line\"]:\n # Generate a random UUID as the file name\n file_name = \"/tmp/\" + str(uuid.uuid4()) + \".csv\" \n file_path = Path(file_name)\n\n tk = yf.Ticker(ticker=ticker)\n prices = tk.history(period=period)\n\n with open(file_path, \"w\") as f:\n prices.to_csv(f) \n\n return file_name\n" + } + ], + "config": { + "name": "primary_assistant", + "llm_config": { + "config_list": [ + { + "model": "gpt-4-1106-preview" + } + ], + "temperature": 0.1, + "timeout": 600, + "cache_seed": null + }, + "human_input_mode": "NEVER", + "max_consecutive_auto_reply": 8, + "system_message": "You are a helpful assistant that can use available functions when needed to solve problems. At each point, do your best to determine if the user's request has been addressed. IF THE REQUEST HAS NOT BEEN ADDRESSED, RESPOND WITH CODE TO ADDRESS IT. 
IF A FAILURE OCCURRED (e.g., due to a missing library) AND SOME ADDITIONAL CODE WAS WRITTEN (e.g. code to install the library), ENSURE THAT THE ORIGINAL CODE TO ADDRESS THE TASK STILL GETS EXECUTED. If the request HAS been addressed, respond with a summary of the result. The summary must be written as a coherent helpful response to the user request e.g. 'Sure, here is result to your request ' or 'The tallest mountain in Africa is ..' etc. The summary MUST end with the word TERMINATE. If the user request is pleasantry or greeting, you should respond with a pleasantry or greeting and TERMINATE." + } + } + ], + "skills": [ + { + "title": "generate_and_save_images", + "file_name": "generate_and_save_images.py", + "content": "\nfrom typing import List\nimport uuid\nimport requests # to perform HTTP requests\nfrom pathlib import Path\nfrom typing_extensions import Annotated\nfrom openai import OpenAI\nfrom firestudio.utils.utils import schema_recorder\n\n@schema_recorder(description=\"Function to paint, draw or illustrate images based on the users query or request. Generates images from a given query using OpenAI's DALL-E model and saves them to disk. Use the code below anytime there is a request to create an image\")\ndef generate_and_save_images(query: Annotated[str, \"A natural language description of the image to be generated.\"], image_size: Annotated[str, \"The size of the image to be generated. default is '1024x1024'\"] = \"1024x1024\") -> List[str]:\n client = OpenAI() # Initialize the OpenAI client\n response = client.images.generate(model=\"dall-e-3\", prompt=query, n=1, size=image_size) # Generate images\n\n # List to store the file names of saved images\n saved_files = []\n\n # Check if the response is successful\n if response.data:\n for image_data in response.data:\n # Generate a random UUID as the file name\n file_name = \"/tmp/\" + str(uuid.uuid4()) + \".png\" # Assuming the image is a PNG\n file_path = Path(file_name)\n\n img_url = image_data.url\n img_response = requests.get(img_url)\n if img_response.status_code == 200:\n # Write the binary content to a file\n with open(file_path, \"wb\") as img_file:\n img_file.write(img_response.content)\n print(f\"Image saved to {file_path}\")\n saved_files.append(str(file_path))\n else:\n print(f\"Failed to download the image from {img_url}\")\n else:\n print(\"No image data found in the response!\")\n\n # Return the list of saved files\n return saved_files\n\n\n# Example usage of the function:\n# generate_and_save_images(\"A cute baby sea otter\")\n" + }, + { + "title": "show_image", + "file_name": "show_image.py", + "content": "\nfrom typing import List\nimport uuid\nimport requests # to perform HTTP requests\nfrom pathlib import Path\nfrom typing_extensions import Annotated\nfrom firestudio.utils.utils import schema_recorder\nimport cv2\nfrom matplotlib import pyplot as plt\n\n@schema_recorder(description=\"A function that is capable for displaying an image given path to a image file in png or jpg or jpeg.\")\ndef show_image(path: Annotated[str, \"The path to the image file that needs to be displayed\"]) -> str:\n img = cv2.imread(path,-1)\n plt.imshow(img)\n plt.axis(\"off\")\n plt.show()\n return \"\"\n" + }, + { + "title": "find_papers_arxiv", + "description": "Function ability to find the papers on arxiv", + "file_name": "find_papers_arxiv.py", + "content": "\nimport os\nimport re\nimport json\nimport hashlib\nimport arxiv\nfrom firestudio.utils.utils import schema_recorder\nfrom typing_extensions import 
Annotated\n\n\n@schema_recorder(description=\"Searches arXiv for the given query using the arXiv API, then returns the search results. This is a helper function. In most cases, callers will want to use 'find_relevant_papers( query, max_results )' instead.\")\ndef search_arxiv(query: Annotated[str, \"The search query\"], max_results: Annotated[Optional[int], \"The maximum number of search results to return. Defaults to 10\"]=10) -> Annotated[List[Dict[str, Any]], \"A list of dictionaries. Each dictionary contains fields such as 'title', 'authors', 'summary', and 'pdf_url'\"]:\n # Example:\n # >>> results = search_arxiv(\"attention is all you need\")\n # >>> print(results)\n\n key = hashlib.md5((\"search_arxiv(\" + str(max_results) + \")\" + query).encode(\"utf-8\")).hexdigest()\n # Create the cache if it doesn't exist\n cache_dir = \".cache\"\n if not os.path.isdir(cache_dir):\n os.mkdir(cache_dir)\n\n fname = os.path.join(cache_dir, key + \".cache\")\n\n # Cache hit\n if os.path.isfile(fname):\n fh = open(fname, \"r\", encoding=\"utf-8\")\n data = json.loads(fh.read())\n fh.close()\n return data\n\n # Normalize the query, removing operator keywords\n query = re.sub(r\"[^\\s\\w]\", \" \", query.lower())\n query = re.sub(r\"\\s(and|or|not)\\s\", \" \", \" \" + query + \" \")\n query = re.sub(r\"[^\\s\\w]\", \" \", query.lower())\n query = re.sub(r\"\\s+\", \" \", query).strip()\n\n search = arxiv.Search(query=query, max_results=max_results, sort_by=arxiv.SortCriterion.Relevance)\n\n jresults = list()\n for result in search.results():\n r = dict()\n r[\"entry_id\"] = result.entry_id\n r[\"updated\"] = str(result.updated)\n r[\"published\"] = str(result.published)\n r[\"title\"] = result.title\n r[\"authors\"] = [str(a) for a in result.authors]\n r[\"summary\"] = result.summary\n r[\"comment\"] = result.comment\n r[\"journal_ref\"] = result.journal_ref\n r[\"doi\"] = result.doi\n r[\"primary_category\"] = result.primary_category\n r[\"categories\"] = result.categories\n r[\"links\"] = [str(link) for link in result.links]\n r[\"pdf_url\"] = result.pdf_url\n jresults.append(r)\n\n if len(jresults) > max_results:\n jresults = jresults[0:max_results]\n\n # Save to cache\n fh = open(fname, \"w\")\n fh.write(json.dumps(jresults))\n fh.close()\n return jresults\n" + }, + { + "title": "get_price", + "description": "Get price history for a stock ticker", + "file_name": "get_price.py", + "content": "\nimport yfinance as yf\nfrom firestudio.utils.utils import schema_recorder\nfrom typing_extensions import Annotated\nimport uuid\nfrom pathlib import Path\n\n@schema_recorder(description=\"Helper function to obtain stock price history of a company over specified period. The price information is written to a file and the path of the file is returned. 
The file is csv and contains following columns - Date,Open,High,Low,Close,Volume,Dividends,Stock Splits\")\ndef get_prices(ticker: Annotated[str, \"Stock ticker for a company\"], period: Annotated[str, \"data period to download (Either Use period parameter or use start and end) Valid periods are: 1d, 5d, 1mo, 3mo, 6mo, 1y, 2y, 5y, 10y, ytd, max\"]) -> Annotated[str, \"File which contains the price of the a ticker, each price in a new line\"]:\n # Generate a random UUID as the file name\n file_name = \"/tmp/\" + str(uuid.uuid4()) + \".csv\" \n file_path = Path(file_name)\n\n tk = yf.Ticker(ticker=ticker)\n prices = tk.history(period=period)\n\n with open(file_path, \"w\") as f:\n prices.to_csv(f) \n\n return file_name\n" + } + ], + "workflows": [ + { + "name": "General Agent Workflow", + "description": "This workflow is used for general purpose tasks.", + "sender": { + "type": "userproxy", + "config": { + "name": "userproxy", + "human_input_mode": "NEVER", + "max_consecutive_auto_reply": 10, + "system_message": "", + "llm_config": false, + "code_execution_config": { + "work_dir": null, + "use_docker": false + } + } + }, + "receiver": [ + { + "type": "assistant", + "description": "Default assistant to generate plans and write code to solve tasks.", + "skills": [ + { + "title": "generate_and_save_images", + "file_name": "generate_and_save_images.py", + "content": "\nfrom typing import List\nimport uuid\nimport requests # to perform HTTP requests\nfrom pathlib import Path\nfrom typing_extensions import Annotated\nfrom openai import OpenAI\nfrom firestudio.utils.utils import schema_recorder\n\n@schema_recorder(description=\"Function to paint, draw or illustrate images based on the users query or request. Generates images from a given query using OpenAI's DALL-E model and saves them to disk. Use the code below anytime there is a request to create an image\")\ndef generate_and_save_images(query: Annotated[str, \"A natural language description of the image to be generated.\"], image_size: Annotated[str, \"The size of the image to be generated. 
default is '1024x1024'\"] = \"1024x1024\") -> List[str]:\n client = OpenAI() # Initialize the OpenAI client\n response = client.images.generate(model=\"dall-e-3\", prompt=query, n=1, size=image_size) # Generate images\n\n # List to store the file names of saved images\n saved_files = []\n\n # Check if the response is successful\n if response.data:\n for image_data in response.data:\n # Generate a random UUID as the file name\n file_name = \"/tmp/\" + str(uuid.uuid4()) + \".png\" # Assuming the image is a PNG\n file_path = Path(file_name)\n\n img_url = image_data.url\n img_response = requests.get(img_url)\n if img_response.status_code == 200:\n # Write the binary content to a file\n with open(file_path, \"wb\") as img_file:\n img_file.write(img_response.content)\n print(f\"Image saved to {file_path}\")\n saved_files.append(str(file_path))\n else:\n print(f\"Failed to download the image from {img_url}\")\n else:\n print(\"No image data found in the response!\")\n\n # Return the list of saved files\n return saved_files\n\n\n# Example usage of the function:\n# generate_and_save_images(\"A cute baby sea otter\")\n" + }, + { + "title": "show_image", + "file_name": "show_image.py", + "content": "\nfrom typing import List\nimport uuid\nimport requests # to perform HTTP requests\nfrom pathlib import Path\nfrom typing_extensions import Annotated\nfrom firestudio.utils.utils import schema_recorder\nimport cv2\nfrom matplotlib import pyplot as plt\n\n@schema_recorder(description=\"A function that is capable for displaying an image given path to a image file in png or jpg or jpeg.\")\ndef show_image(path: Annotated[str, \"The path to the image file that needs to be displayed\"]) -> str:\n img = cv2.imread(path,-1)\n plt.imshow(img)\n plt.axis(\"off\")\n plt.show()\n return \"\"\n" + }, + { + "title": "find_papers_arxiv", + "description": "Function ability to find the papers on arxiv", + "file_name": "find_papers_arxiv.py", + "content": "\nimport os\nimport re\nimport json\nimport hashlib\nimport arxiv\nfrom firestudio.utils.utils import schema_recorder\nfrom typing_extensions import Annotated\n\n\n@schema_recorder(description=\"Searches arXiv for the given query using the arXiv API, then returns the search results. This is a helper function. In most cases, callers will want to use 'find_relevant_papers( query, max_results )' instead.\")\ndef search_arxiv(query: Annotated[str, \"The search query\"], max_results: Annotated[Optional[int], \"The maximum number of search results to return. Defaults to 10\"]=10) -> Annotated[List[Dict[str, Any]], \"A list of dictionaries. 
Each dictionary contains fields such as 'title', 'authors', 'summary', and 'pdf_url'\"]:\n # Example:\n # >>> results = search_arxiv(\"attention is all you need\")\n # >>> print(results)\n\n key = hashlib.md5((\"search_arxiv(\" + str(max_results) + \")\" + query).encode(\"utf-8\")).hexdigest()\n # Create the cache if it doesn't exist\n cache_dir = \".cache\"\n if not os.path.isdir(cache_dir):\n os.mkdir(cache_dir)\n\n fname = os.path.join(cache_dir, key + \".cache\")\n\n # Cache hit\n if os.path.isfile(fname):\n fh = open(fname, \"r\", encoding=\"utf-8\")\n data = json.loads(fh.read())\n fh.close()\n return data\n\n # Normalize the query, removing operator keywords\n query = re.sub(r\"[^\\s\\w]\", \" \", query.lower())\n query = re.sub(r\"\\s(and|or|not)\\s\", \" \", \" \" + query + \" \")\n query = re.sub(r\"[^\\s\\w]\", \" \", query.lower())\n query = re.sub(r\"\\s+\", \" \", query).strip()\n\n search = arxiv.Search(query=query, max_results=max_results, sort_by=arxiv.SortCriterion.Relevance)\n\n jresults = list()\n for result in search.results():\n r = dict()\n r[\"entry_id\"] = result.entry_id\n r[\"updated\"] = str(result.updated)\n r[\"published\"] = str(result.published)\n r[\"title\"] = result.title\n r[\"authors\"] = [str(a) for a in result.authors]\n r[\"summary\"] = result.summary\n r[\"comment\"] = result.comment\n r[\"journal_ref\"] = result.journal_ref\n r[\"doi\"] = result.doi\n r[\"primary_category\"] = result.primary_category\n r[\"categories\"] = result.categories\n r[\"links\"] = [str(link) for link in result.links]\n r[\"pdf_url\"] = result.pdf_url\n jresults.append(r)\n\n if len(jresults) > max_results:\n jresults = jresults[0:max_results]\n\n # Save to cache\n fh = open(fname, \"w\")\n fh.write(json.dumps(jresults))\n fh.close()\n return jresults\n" + }, + { + "title": "get_price", + "description": "Get price history for a stock ticker", + "file_name": "get_price.py", + "content": "\nimport yfinance as yf\nfrom firestudio.utils.utils import schema_recorder\nfrom typing_extensions import Annotated\nimport uuid\nfrom pathlib import Path\n\n@schema_recorder(description=\"Helper function to obtain stock price history of a company over specified period. The price information is written to a file and the path of the file is returned. The file is csv and contains following columns - Date,Open,High,Low,Close,Volume,Dividends,Stock Splits\")\ndef get_prices(ticker: Annotated[str, \"Stock ticker for a company\"], period: Annotated[str, \"data period to download (Either Use period parameter or use start and end) Valid periods are: 1d, 5d, 1mo, 3mo, 6mo, 1y, 2y, 5y, 10y, ytd, max\"]) -> Annotated[str, \"File which contains the price of the a ticker, each price in a new line\"]:\n # Generate a random UUID as the file name\n file_name = \"/tmp/\" + str(uuid.uuid4()) + \".csv\" \n file_path = Path(file_name)\n\n tk = yf.Ticker(ticker=ticker)\n prices = tk.history(period=period)\n\n with open(file_path, \"w\") as f:\n prices.to_csv(f) \n\n return file_name\n" + } + ], + "config": { + "name": "primary_assistant", + "llm_config": { + "config_list": [ + { + "model": "gpt-4-1106-preview" + } + ], + "temperature": 0.1, + "timeout": 600, + "cache_seed": null + }, + "human_input_mode": "NEVER", + "max_consecutive_auto_reply": 15, + "system_message": "You are a helpful assistant that can use available functions when needed to solve problems. At each point, do your best to determine if the user's request has been addressed. 
IF THE REQUEST HAS NOT BEEN ADDRESSED, RESPOND WITH CODE TO ADDRESS IT. IF A FAILURE OCCURRED (e.g., due to a missing library) AND SOME ADDITIONAL CODE WAS WRITTEN (e.g. code to install the library), ENSURE THAT THE ORIGINAL CODE TO ADDRESS THE TASK STILL GETS EXECUTED. If the request HAS been addressed, respond with a summary of the result. The summary must be written as a coherent helpful response to the user request e.g. 'Sure, here is result to your request ' or 'The tallest mountain in Africa is ..' etc. The summary MUST end with the word TERMINATE. If the user request is pleasantry or greeting, you should respond with a pleasantry or greeting and TERMINATE." + } + }, + { + "type": "assistant", + "description": "Default assistant to generate plans and write code to solve tasks.", + "skills": [ + { + "title": "generate_and_save_images", + "file_name": "generate_and_save_images.py", + "content": "\nfrom typing import List\nimport uuid\nimport requests # to perform HTTP requests\nfrom pathlib import Path\nfrom typing_extensions import Annotated\nfrom openai import OpenAI\nfrom firestudio.utils.utils import schema_recorder\n\n@schema_recorder(description=\"Function to paint, draw or illustrate images based on the users query or request. Generates images from a given query using OpenAI's DALL-E model and saves them to disk. Use the code below anytime there is a request to create an image\")\ndef generate_and_save_images(query: Annotated[str, \"A natural language description of the image to be generated.\"], image_size: Annotated[str, \"The size of the image to be generated. default is '1024x1024'\"] = \"1024x1024\") -> List[str]:\n client = OpenAI() # Initialize the OpenAI client\n response = client.images.generate(model=\"dall-e-3\", prompt=query, n=1, size=image_size) # Generate images\n\n # List to store the file names of saved images\n saved_files = []\n\n # Check if the response is successful\n if response.data:\n for image_data in response.data:\n # Generate a random UUID as the file name\n file_name = \"/tmp/\" + str(uuid.uuid4()) + \".png\" # Assuming the image is a PNG\n file_path = Path(file_name)\n\n img_url = image_data.url\n img_response = requests.get(img_url)\n if img_response.status_code == 200:\n # Write the binary content to a file\n with open(file_path, \"wb\") as img_file:\n img_file.write(img_response.content)\n print(f\"Image saved to {file_path}\")\n saved_files.append(str(file_path))\n else:\n print(f\"Failed to download the image from {img_url}\")\n else:\n print(\"No image data found in the response!\")\n\n # Return the list of saved files\n return saved_files\n\n\n# Example usage of the function:\n# generate_and_save_images(\"A cute baby sea otter\")\n" + }, + { + "title": "show_image", + "file_name": "show_image.py", + "content": "\nfrom typing import List\nimport uuid\nimport requests # to perform HTTP requests\nfrom pathlib import Path\nfrom typing_extensions import Annotated\nfrom firestudio.utils.utils import schema_recorder\nimport cv2\nfrom matplotlib import pyplot as plt\n\n@schema_recorder(description=\"A function that is capable for displaying an image given path to a image file in png or jpg or jpeg.\")\ndef show_image(path: Annotated[str, \"The path to the image file that needs to be displayed\"]) -> str:\n img = cv2.imread(path,-1)\n plt.imshow(img)\n plt.axis(\"off\")\n plt.show()\n return \"\"\n" + }, + { + "title": "find_papers_arxiv", + "description": "Function ability to find the papers on arxiv", + "file_name": "find_papers_arxiv.py", + 
"content": "\nimport os\nimport re\nimport json\nimport hashlib\nimport arxiv\nfrom firestudio.utils.utils import schema_recorder\nfrom typing_extensions import Annotated\n\n\n@schema_recorder(description=\"Searches arXiv for the given query using the arXiv API, then returns the search results. This is a helper function. In most cases, callers will want to use 'find_relevant_papers( query, max_results )' instead.\")\ndef search_arxiv(query: Annotated[str, \"The search query\"], max_results: Annotated[Optional[int], \"The maximum number of search results to return. Defaults to 10\"]=10) -> Annotated[List[Dict[str, Any]], \"A list of dictionaries. Each dictionary contains fields such as 'title', 'authors', 'summary', and 'pdf_url'\"]:\n # Example:\n # >>> results = search_arxiv(\"attention is all you need\")\n # >>> print(results)\n\n key = hashlib.md5((\"search_arxiv(\" + str(max_results) + \")\" + query).encode(\"utf-8\")).hexdigest()\n # Create the cache if it doesn't exist\n cache_dir = \".cache\"\n if not os.path.isdir(cache_dir):\n os.mkdir(cache_dir)\n\n fname = os.path.join(cache_dir, key + \".cache\")\n\n # Cache hit\n if os.path.isfile(fname):\n fh = open(fname, \"r\", encoding=\"utf-8\")\n data = json.loads(fh.read())\n fh.close()\n return data\n\n # Normalize the query, removing operator keywords\n query = re.sub(r\"[^\\s\\w]\", \" \", query.lower())\n query = re.sub(r\"\\s(and|or|not)\\s\", \" \", \" \" + query + \" \")\n query = re.sub(r\"[^\\s\\w]\", \" \", query.lower())\n query = re.sub(r\"\\s+\", \" \", query).strip()\n\n search = arxiv.Search(query=query, max_results=max_results, sort_by=arxiv.SortCriterion.Relevance)\n\n jresults = list()\n for result in search.results():\n r = dict()\n r[\"entry_id\"] = result.entry_id\n r[\"updated\"] = str(result.updated)\n r[\"published\"] = str(result.published)\n r[\"title\"] = result.title\n r[\"authors\"] = [str(a) for a in result.authors]\n r[\"summary\"] = result.summary\n r[\"comment\"] = result.comment\n r[\"journal_ref\"] = result.journal_ref\n r[\"doi\"] = result.doi\n r[\"primary_category\"] = result.primary_category\n r[\"categories\"] = result.categories\n r[\"links\"] = [str(link) for link in result.links]\n r[\"pdf_url\"] = result.pdf_url\n jresults.append(r)\n\n if len(jresults) > max_results:\n jresults = jresults[0:max_results]\n\n # Save to cache\n fh = open(fname, \"w\")\n fh.write(json.dumps(jresults))\n fh.close()\n return jresults\n" + }, + { + "title": "get_price", + "description": "Get price history for a stock ticker", + "file_name": "get_price.py", + "content": "\nimport yfinance as yf\nfrom firestudio.utils.utils import schema_recorder\nfrom typing_extensions import Annotated\nimport uuid\nfrom pathlib import Path\n\n@schema_recorder(description=\"Helper function to obtain stock price history of a company over specified period. The price information is written to a file and the path of the file is returned. 
The file is csv and contains following columns - Date,Open,High,Low,Close,Volume,Dividends,Stock Splits\")\ndef get_prices(ticker: Annotated[str, \"Stock ticker for a company\"], period: Annotated[str, \"data period to download (Either Use period parameter or use start and end) Valid periods are: 1d, 5d, 1mo, 3mo, 6mo, 1y, 2y, 5y, 10y, ytd, max\"]) -> Annotated[str, \"File which contains the price of the a ticker, each price in a new line\"]:\n # Generate a random UUID as the file name\n file_name = \"/tmp/\" + str(uuid.uuid4()) + \".csv\" \n file_path = Path(file_name)\n\n tk = yf.Ticker(ticker=ticker)\n prices = tk.history(period=period)\n\n with open(file_path, \"w\") as f:\n prices.to_csv(f) \n\n return file_name\n" + } + ], + "config": { + "name": "secondary_assistant", + "llm_config": { + "config_list": [ + { + "model": "gpt-4-1106-preview" + } + ], + "temperature": 0.1, + "timeout": 600, + "cache_seed": null + }, + "human_input_mode": "NEVER", + "max_consecutive_auto_reply": 15, + "system_message": "You are a helpful assistant that can use available functions when needed to solve problems. At each point, do your best to determine if the user's request has been addressed. IF THE REQUEST HAS NOT BEEN ADDRESSED, RESPOND WITH CODE TO ADDRESS IT. IF A FAILURE OCCURRED (e.g., due to a missing library) AND SOME ADDITIONAL CODE WAS WRITTEN (e.g. code to install the library), ENSURE THAT THE ORIGINAL CODE TO ADDRESS THE TASK STILL GETS EXECUTED. If the request HAS been addressed, respond with a summary of the result. The summary must be written as a coherent helpful response to the user request e.g. 'Sure, here is result to your request ' or 'The tallest mountain in Africa is ..' etc. The summary MUST end with the word TERMINATE. If the user request is pleasantry or greeting, you should respond with a pleasantry or greeting and TERMINATE." + } + } + ], + "type": "default" + }, + { + "name": "FW General Agent Workflow", + "description": "This workflow is used for general purpose tasks.", + "sender": { + "type": "userproxy", + "config": { + "name": "userproxy", + "human_input_mode": "NEVER", + "max_consecutive_auto_reply": 10, + "system_message": "", + "llm_config": false, + "code_execution_config": { + "work_dir": null, + "use_docker": false + } + } + }, + "receiver": { + "type": "assistant", + "description": "Default assistant to generate plans and write code to solve tasks.", + "skills": [ + { + "title": "generate_and_save_images", + "file_name": "generate_and_save_images.py", + "content": "\nfrom typing import List\nimport uuid\nimport requests # to perform HTTP requests\nfrom pathlib import Path\nfrom typing_extensions import Annotated\nfrom openai import OpenAI\nfrom firestudio.utils.utils import schema_recorder\n\n@schema_recorder(description=\"Function to paint, draw or illustrate images based on the users query or request. Generates images from a given query using OpenAI's DALL-E model and saves them to disk. Use the code below anytime there is a request to create an image\")\ndef generate_and_save_images(query: Annotated[str, \"A natural language description of the image to be generated.\"], image_size: Annotated[str, \"The size of the image to be generated. 
default is '1024x1024'\"] = \"1024x1024\") -> List[str]:\n client = OpenAI() # Initialize the OpenAI client\n response = client.images.generate(model=\"dall-e-3\", prompt=query, n=1, size=image_size) # Generate images\n\n # List to store the file names of saved images\n saved_files = []\n\n # Check if the response is successful\n if response.data:\n for image_data in response.data:\n # Generate a random UUID as the file name\n file_name = \"/tmp/\" + str(uuid.uuid4()) + \".png\" # Assuming the image is a PNG\n file_path = Path(file_name)\n\n img_url = image_data.url\n img_response = requests.get(img_url)\n if img_response.status_code == 200:\n # Write the binary content to a file\n with open(file_path, \"wb\") as img_file:\n img_file.write(img_response.content)\n print(f\"Image saved to {file_path}\")\n saved_files.append(str(file_path))\n else:\n print(f\"Failed to download the image from {img_url}\")\n else:\n print(\"No image data found in the response!\")\n\n # Return the list of saved files\n return saved_files\n\n\n# Example usage of the function:\n# generate_and_save_images(\"A cute baby sea otter\")\n" + }, + { + "title": "show_image", + "file_name": "show_image.py", + "content": "\nfrom typing import List\nimport uuid\nimport requests # to perform HTTP requests\nfrom pathlib import Path\nfrom typing_extensions import Annotated\nfrom firestudio.utils.utils import schema_recorder\nimport cv2\nfrom matplotlib import pyplot as plt\n\n@schema_recorder(description=\"A function that is capable of displaying an image, given a path to an image file in png, jpg, or jpeg format.\")\ndef show_image(path: Annotated[str, \"The path to the image file that needs to be displayed\"]) -> str:\n img = cv2.imread(path, -1)\n plt.imshow(img)\n plt.axis(\"off\")\n plt.show()\n return \"\"\n" + }, + { + "title": "find_papers_arxiv", + "description": "Function with the ability to find papers on arxiv", + "file_name": "find_papers_arxiv.py", + "content": "\nimport os\nimport re\nimport json\nimport hashlib\nimport arxiv\nfrom typing import Any, Dict, List, Optional\nfrom firestudio.utils.utils import schema_recorder\nfrom typing_extensions import Annotated\n\n\n@schema_recorder(description=\"Searches arXiv for the given query using the arXiv API, then returns the search results. This is a helper function. In most cases, callers will want to use 'find_relevant_papers( query, max_results )' instead.\")\ndef search_arxiv(query: Annotated[str, \"The search query\"], max_results: Annotated[Optional[int], \"The maximum number of search results to return. Defaults to 10\"]=10) -> Annotated[List[Dict[str, Any]], \"A list of dictionaries. 
Each dictionary contains fields such as 'title', 'authors', 'summary', and 'pdf_url'\"]:\n # Example:\n # >>> results = search_arxiv(\"attention is all you need\")\n # >>> print(results)\n\n key = hashlib.md5((\"search_arxiv(\" + str(max_results) + \")\" + query).encode(\"utf-8\")).hexdigest()\n # Create the cache if it doesn't exist\n cache_dir = \".cache\"\n if not os.path.isdir(cache_dir):\n os.mkdir(cache_dir)\n\n fname = os.path.join(cache_dir, key + \".cache\")\n\n # Cache hit\n if os.path.isfile(fname):\n fh = open(fname, \"r\", encoding=\"utf-8\")\n data = json.loads(fh.read())\n fh.close()\n return data\n\n # Normalize the query, removing operator keywords\n query = re.sub(r\"[^\\s\\w]\", \" \", query.lower())\n query = re.sub(r\"\\s(and|or|not)\\s\", \" \", \" \" + query + \" \")\n query = re.sub(r\"[^\\s\\w]\", \" \", query.lower())\n query = re.sub(r\"\\s+\", \" \", query).strip()\n\n search = arxiv.Search(query=query, max_results=max_results, sort_by=arxiv.SortCriterion.Relevance)\n\n jresults = list()\n for result in search.results():\n r = dict()\n r[\"entry_id\"] = result.entry_id\n r[\"updated\"] = str(result.updated)\n r[\"published\"] = str(result.published)\n r[\"title\"] = result.title\n r[\"authors\"] = [str(a) for a in result.authors]\n r[\"summary\"] = result.summary\n r[\"comment\"] = result.comment\n r[\"journal_ref\"] = result.journal_ref\n r[\"doi\"] = result.doi\n r[\"primary_category\"] = result.primary_category\n r[\"categories\"] = result.categories\n r[\"links\"] = [str(link) for link in result.links]\n r[\"pdf_url\"] = result.pdf_url\n jresults.append(r)\n\n if len(jresults) > max_results:\n jresults = jresults[0:max_results]\n\n # Save to cache\n fh = open(fname, \"w\")\n fh.write(json.dumps(jresults))\n fh.close()\n return jresults\n" + }, + { + "title": "get_price", + "description": "Get price history for a stock ticker", + "file_name": "get_price.py", + "content": "\nimport yfinance as yf\nfrom firestudio.utils.utils import schema_recorder\nfrom typing_extensions import Annotated\nimport uuid\nfrom pathlib import Path\n\n@schema_recorder(description=\"Helper function to obtain stock price history of a company over a specified period. The price information is written to a file and the path of the file is returned. The file is csv and contains the following columns - Date,Open,High,Low,Close,Volume,Dividends,Stock Splits\")\ndef get_prices(ticker: Annotated[str, \"Stock ticker for a company\"], period: Annotated[str, \"data period to download (either use the period parameter or use start and end) Valid periods are: 1d, 5d, 1mo, 3mo, 6mo, 1y, 2y, 5y, 10y, ytd, max\"]) -> Annotated[str, \"File which contains the price of a ticker, each price on a new line\"]:\n # Generate a random UUID as the file name\n file_name = \"/tmp/\" + str(uuid.uuid4()) + \".csv\" \n file_path = Path(file_name)\n\n tk = yf.Ticker(ticker=ticker)\n prices = tk.history(period=period)\n\n with open(file_path, \"w\") as f:\n prices.to_csv(f) \n\n return file_name\n" + } + ], + "config": { + "name": "primary_assistant", + "llm_config": { + "config_list": [ + { + "model": "accounts/fireworks/models/firefunction-v1", + "api_key": "YOUR_FIREWORKS_API_KEY", + "base_url": "https://api.fireworks.ai/inference/v1" + } + ], + "temperature": 0.1, + "timeout": 600, + "cache_seed": null + }, + "human_input_mode": "NEVER", + "max_consecutive_auto_reply": 15, + "system_message": "You are a helpful assistant that can use available functions when needed to solve problems. 
At each point, do your best to determine if the user's request has been addressed. IF THE REQUEST HAS NOT BEEN ADDRESSED, RESPOND WITH CODE TO ADDRESS IT. IF A FAILURE OCCURRED (e.g., due to a missing library) AND SOME ADDITIONAL CODE WAS WRITTEN (e.g. code to install the library), ENSURE THAT THE ORIGINAL CODE TO ADDRESS THE TASK STILL GETS EXECUTED. If the request HAS been addressed, respond with a summary of the result. The summary must be written as a coherent helpful response to the user request e.g. 'Sure, here is the result to your request' or 'The tallest mountain in Africa is ...' etc. The summary MUST end with the word TERMINATE. If the user request is a pleasantry or greeting, you should respond with a pleasantry or greeting and TERMINATE." + } + } + ], + "type": "default" + } + ] +} \ No newline at end of file diff --git a/samples/apps/fireworks-studio/firestudio/utils/dbutils.py b/samples/apps/fireworks-studio/firestudio/utils/dbutils.py new file mode 100644 index 000000000000..50df22e5e9f2 --- /dev/null +++ b/samples/apps/fireworks-studio/firestudio/utils/dbutils.py @@ -0,0 +1,745 @@ +import json +import logging +import sqlite3 +import threading +import os +from typing import Any, List, Dict, Optional, Tuple +from ..datamodel import ( + AgentFlowSpec, + AgentWorkFlowConfig, + Gallery, + Message, + Session, + Skill, +) + + +MESSAGES_TABLE_SQL = """ + CREATE TABLE IF NOT EXISTS messages ( + user_id TEXT NOT NULL, + session_id TEXT, + root_msg_id TEXT NOT NULL, + msg_id TEXT, + role TEXT NOT NULL, + content TEXT NOT NULL, + received_name TEXT NOT NULL, + metadata TEXT, + timestamp DATETIME, + UNIQUE (user_id, root_msg_id, msg_id) + ) + """ + +SESSIONS_TABLE_SQL = """ + CREATE TABLE IF NOT EXISTS sessions ( + id TEXT NOT NULL, + user_id TEXT NOT NULL, + timestamp DATETIME NOT NULL, + flow_config TEXT, + UNIQUE (user_id, id) + ) + """ + +SKILLS_TABLE_SQL = """ + CREATE TABLE IF NOT EXISTS skills ( + id TEXT NOT NULL, + user_id TEXT NOT NULL, + timestamp DATETIME NOT NULL, + content TEXT, + title TEXT, + file_name TEXT, + UNIQUE (id, user_id) + ) + """ + +AGENTS_TABLE_SQL = """ + CREATE TABLE IF NOT EXISTS agents ( + id TEXT NOT NULL, + user_id TEXT NOT NULL, + timestamp DATETIME NOT NULL, + config TEXT, + type TEXT, + skills TEXT, + description TEXT, + UNIQUE (id, user_id) + ) + """ + +WORKFLOWS_TABLE_SQL = """ + CREATE TABLE IF NOT EXISTS workflows ( + id TEXT NOT NULL, + user_id TEXT NOT NULL, + timestamp DATETIME NOT NULL, + sender TEXT, + receiver TEXT, + type TEXT, + name TEXT, + description TEXT, + summary_method TEXT, + UNIQUE (id, user_id) + ) + """ + +GALLERY_TABLE_SQL = """ + CREATE TABLE IF NOT EXISTS gallery ( + id TEXT NOT NULL, + session TEXT, + messages TEXT, + tags TEXT, + timestamp DATETIME NOT NULL, + UNIQUE (id) + ) + """ + + +lock = threading.Lock() +logger = logging.getLogger() + + +class DBManager: + """ + A database manager class that handles the creation and interaction with an SQLite database. + """ + + def __init__(self, path: str = "database.sqlite", **kwargs: Any) -> None: + """ + Initializes the DBManager object, creates a database if it does not exist, and establishes a connection. + + Args: + path (str): The file path to the SQLite database file. + **kwargs: Additional keyword arguments to pass to the sqlite3.connect method. 
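+
+        Example (illustrative):
+            dbmanager = DBManager(path="database.sqlite")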
+ """ + + self.path = path + # check if the database exists, if not create it + # self.reset_db() + if not os.path.exists(self.path): + logger.info("Creating database") + self.init_db(path=self.path, **kwargs) + + try: + self.conn = sqlite3.connect(self.path, check_same_thread=False, **kwargs) + self.cursor = self.conn.cursor() + except Exception as e: + logger.error("Error connecting to database: %s", e) + raise e + + def reset_db(self): + """ + Reset the database by deleting the database file and creating a new one. + """ + print("resetting db") + if os.path.exists(self.path): + os.remove(self.path) + self.init_db(path=self.path) + + def init_db(self, path: str = "database.sqlite", **kwargs: Any) -> None: + """ + Initializes the database by creating necessary tables. + + Args: + path (str): The file path to the SQLite database file. + **kwargs: Additional keyword arguments to pass to the sqlite3.connect method. + """ + # Connect to the database (or create a new one if it doesn't exist) + self.conn = sqlite3.connect(path, check_same_thread=False, **kwargs) + self.cursor = self.conn.cursor() + + # Create the table with the specified columns, appropriate data types, and a UNIQUE constraint on (root_msg_id, msg_id) + self.cursor.execute(MESSAGES_TABLE_SQL) + + # Create a sessions table + self.cursor.execute(SESSIONS_TABLE_SQL) + + # Create a skills + self.cursor.execute(SKILLS_TABLE_SQL) + + # Create a gallery table + self.cursor.execute(GALLERY_TABLE_SQL) + + # Create a agents table + self.cursor.execute(AGENTS_TABLE_SQL) + + # Create a workflows table + self.cursor.execute(WORKFLOWS_TABLE_SQL) + + # init skills table with content of defaultskills.json in current directory + current_dir = os.path.dirname(os.path.realpath(__file__)) + with open( + os.path.join(current_dir, "dbdefaults.json"), "r", encoding="utf-8" + ) as json_file: + data = json.load(json_file) + skills = data["skills"] + agents = data["agents"] + for skill in skills: + skill = Skill(**skill) + + self.cursor.execute( + "INSERT INTO skills (id, user_id, timestamp, content, title, file_name) VALUES (?, ?, ?, ?, ?, ?)", + ( + skill.id, + "default", + skill.timestamp, + skill.content, + skill.title, + skill.file_name, + ), + ) + for agent in agents: + agent = AgentFlowSpec(**agent) + agent.skills = ( + [skill.dict() for skill in agent.skills] if agent.skills else None + ) + self.cursor.execute( + "INSERT INTO agents (id, user_id, timestamp, config, type, skills, description) VALUES (?, ?, ?, ?, ?, ?, ?)", + ( + agent.id, + "default", + agent.timestamp, + json.dumps(agent.config.dict()), + agent.type, + json.dumps(agent.skills), + agent.description, + ), + ) + + for workflow in data["workflows"]: + workflow = AgentWorkFlowConfig(**workflow) + self.cursor.execute( + "INSERT INTO workflows (id, user_id, timestamp, sender, receiver, type, name, description, summary_method) VALUES (?, ?, ?, ?, ?, ?, ?, ?,?)", + ( + workflow.id, + "default", + workflow.timestamp, + json.dumps(workflow.sender.dict()), + json.dumps(workflow.receiver.dict()) + if not isinstance(workflow.receiver, list) + else json.dumps( + [receiver.dict() for receiver in workflow.receiver] + ), + workflow.type, + workflow.name, + workflow.description, + workflow.summary_method, + ), + ) + + # Commit the changes and close the connection + self.conn.commit() + + def query( + self, query: str, args: Tuple = (), return_json: bool = False + ) -> List[Dict[str, Any]]: + """ + Executes a given SQL query and returns the results. 
+ + Args: + query (str): The SQL query to execute. + args (Tuple): The arguments to pass to the SQL query. + return_json (bool): If True, the results will be returned as a list of dictionaries. + + Returns: + List[Dict[str, Any]]: The result of the SQL query. + """ + try: + with lock: + self.cursor.execute(query, args) + result = self.cursor.fetchall() + self.commit() + if return_json: + result = [ + dict(zip([key[0] for key in self.cursor.description], row)) + for row in result + ] + return result + except Exception as e: + logger.error( + "Error running query with query %s and args %s: %s", query, args, e + ) + raise e + + def commit(self) -> None: + """ + Commits the current transaction to the database. + """ + self.conn.commit() + + def close(self) -> None: + """ + Closes the database connection. + """ + self.conn.close() + + +def create_message(message: Message, dbmanager: DBManager) -> None: + """ + Save a message in the database using the provided database manager. + + :param message: The Message object containing message data + :param dbmanager: The DBManager instance used to interact with the database + """ + query = "INSERT INTO messages (user_id, root_msg_id, msg_id, role, content, metadata, timestamp, session_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?)" + args = ( + message.user_id, + message.root_msg_id, + message.msg_id, + message.role, + message.content, + message.metadata, + message.timestamp, + message.session_id, + ) + dbmanager.query(query=query, args=args) + + +def get_messages(user_id: str, session_id: str, dbmanager: DBManager) -> List[dict]: + """ + Load messages for a specific user and session from the database, sorted by timestamp. + + :param user_id: The ID of the user whose messages are to be loaded + :param session_id: The ID of the session whose messages are to be loaded + :param dbmanager: The DBManager instance to interact with the database + + :return: A list of dictionaries, each representing a message + """ + query = "SELECT * FROM messages WHERE user_id = ? AND session_id = ?" + args = (user_id, session_id) + result = dbmanager.query(query=query, args=args, return_json=True) + # Sort by timestamp ascending + result = sorted(result, key=lambda k: k["timestamp"], reverse=False) + return result + + +def get_sessions(user_id: str, dbmanager: DBManager) -> List[dict]: + """ + Load sessions for a specific user from the database, sorted by timestamp. + + :param user_id: The ID of the user whose sessions are to be loaded + :param dbmanager: The DBManager instance to interact with the database + :return: A list of dictionaries, each representing a session + """ + query = "SELECT * FROM sessions WHERE user_id = ?" + args = (user_id,) + result = dbmanager.query(query=query, args=args, return_json=True) + # Sort by timestamp ascending + result = sorted(result, key=lambda k: k["timestamp"], reverse=True) + for row in result: + row["flow_config"] = json.loads(row["flow_config"]) + return result + + +def create_session(user_id: str, session: Session, dbmanager: DBManager) -> List[dict]: + """ + Create a new session for a specific user in the database. 
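+    :param session: The Session object to be stored; its flow_config is serialized to JSON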
+ + :param user_id: The ID of the user whose session is to be created + :param dbmanager: The DBManager instance to interact with the database + :return: A list of dictionaries, each representing a session + """ + query = ( + "INSERT INTO sessions (user_id, id, timestamp, flow_config) VALUES (?, ?, ?,?)" + ) + args = ( + session.user_id, + session.id, + session.timestamp, + json.dumps(session.flow_config.dict()), + ) + dbmanager.query(query=query, args=args) + sessions = get_sessions(user_id=user_id, dbmanager=dbmanager) + + return sessions + + +def delete_session(session: Session, dbmanager: DBManager) -> List[dict]: + """ + Delete a specific session and all messages for that session in the database. + + :param session: The Session object containing session data + :param dbmanager: The DBManager instance to interact with the database + :return: A list of the remaining sessions + """ + + query = "DELETE FROM sessions WHERE id = ?" + args = (session.id,) + dbmanager.query(query=query, args=args) + + query = "DELETE FROM messages WHERE session_id = ?" + args = (session.id,) + dbmanager.query(query=query, args=args) + + return get_sessions(user_id=session.user_id, dbmanager=dbmanager) + + +def create_gallery( + session: Session, dbmanager: DBManager, tags: List[str] = [] +) -> Gallery: + """ + Publish a session to the gallery table in the database. Fetches the session messages first, then saves session and messages object to the gallery database table. + :param session: The Session object containing session data + :param dbmanager: The DBManager instance used to interact with the database + :param tags: A list of tags to associate with the session + :return: A gallery object containing the session and messages objects + """ + + messages = get_messages( + user_id=session.user_id, session_id=session.id, dbmanager=dbmanager + ) + gallery_item = Gallery(session=session, messages=messages, tags=tags) + query = "INSERT INTO gallery (id, session, messages, tags, timestamp) VALUES (?, ?, ?, ?,?)" + args = ( + gallery_item.id, + json.dumps(gallery_item.session.dict()), + json.dumps([message.dict() for message in gallery_item.messages]), + json.dumps(gallery_item.tags), + gallery_item.timestamp, + ) + dbmanager.query(query=query, args=args) + return gallery_item + + +def get_gallery(gallery_id, dbmanager: DBManager) -> List[Gallery]: + """ + Load gallery items from the database, sorted by timestamp. If gallery_id is provided, only the gallery item with the matching gallery_id will be returned. + + :param gallery_id: The ID of the gallery item to be loaded + :param dbmanager: The DBManager instance to interact with the database + :return: A list of Gallery objects + """ + + if gallery_id: + query = "SELECT * FROM gallery WHERE id = ?" + args = (gallery_id,) + else: + query = "SELECT * FROM gallery" + args = () + result = dbmanager.query(query=query, args=args, return_json=True) + # Sort by timestamp ascending + result = sorted(result, key=lambda k: k["timestamp"], reverse=True) + gallery = [] + for row in result: + gallery_item = Gallery( + id=row["id"], + session=Session(**json.loads(row["session"])), + messages=[Message(**message) for message in json.loads(row["messages"])], + tags=json.loads(row["tags"]), + timestamp=row["timestamp"], + ) + gallery.append(gallery_item) + return gallery + + +def get_skills(user_id: str, dbmanager: DBManager) -> List[Skill]: + """ + Load skills from the database, sorted by timestamp. Load skills where id = user_id or user_id = default. 
+ + :param user_id: The ID of the user whose skills are to be loaded + :param dbmanager: The DBManager instance to interact with the database + :return: A list of Skill objects + """ + + query = "SELECT * FROM skills WHERE user_id = ? OR user_id = ?" + args = (user_id, "default") + result = dbmanager.query(query=query, args=args, return_json=True) + # Sort by timestamp ascending + result = sorted(result, key=lambda k: k["timestamp"], reverse=True) + skills = [] + for row in result: + skill = Skill(**row) + skills.append(skill) + return skills + + +def upsert_skill(skill: Skill, dbmanager: DBManager) -> List[Skill]: + """ + Insert or update a skill for a specific user in the database. + + If the skill with the given ID already exists, it will be updated with the new data. + Otherwise, a new skill will be created. + + :param skill: The Skill object containing skill data + :param dbmanager: The DBManager instance to interact with the database + :return: A list of dictionaries, each representing a skill + """ + + existing_skill = get_item_by_field("skills", "id", skill.id, dbmanager) + + if existing_skill: + updated_data = { + "user_id": skill.user_id, + "timestamp": skill.timestamp, + "content": skill.content, + "title": skill.title, + "file_name": skill.file_name, + } + update_item("skills", skill.id, updated_data, dbmanager) + else: + query = "INSERT INTO skills (id, user_id, timestamp, content, title, file_name) VALUES (?, ?, ?, ?, ?, ?)" + args = ( + skill.id, + skill.user_id, + skill.timestamp, + skill.content, + skill.title, + skill.file_name, + ) + dbmanager.query(query=query, args=args) + + skills = get_skills(user_id=skill.user_id, dbmanager=dbmanager) + + return skills + + +def delete_skill(skill: Skill, dbmanager: DBManager) -> List[Skill]: + """ + Delete a skill for a specific user in the database. + + :param skill: The Skill object containing skill data + :param dbmanager: The DBManager instance to interact with the database + :return: A list of dictionaries, each representing a skill + """ + # delete where id = skill.id and user_id = skill.user_id + query = "DELETE FROM skills WHERE id = ? AND user_id = ?" + args = (skill.id, skill.user_id) + dbmanager.query(query=query, args=args) + + return get_skills(user_id=skill.user_id, dbmanager=dbmanager) + + +def delete_message( + user_id: str, + msg_id: str, + session_id: str, + dbmanager: DBManager, + delete_all: bool = False, +) -> List[dict]: + """ + Delete a specific message or all messages for a user and session from the database. + + :param user_id: The ID of the user whose messages are to be deleted + :param msg_id: The ID of the specific message to be deleted (ignored if delete_all is True) + :param session_id: The ID of the session whose messages are to be deleted + :param dbmanager: The DBManager instance to interact with the database + :param delete_all: If True, all messages for the user will be deleted + :return: A list of the remaining messages if not all were deleted, otherwise an empty list + """ + + if delete_all: + query = "DELETE FROM messages WHERE user_id = ? AND session_id = ?" + args = (user_id, session_id) + dbmanager.query(query=query, args=args) + return [] + else: + query = ( + "DELETE FROM messages WHERE user_id = ? AND msg_id = ? AND session_id = ?" 
+ ) + args = (user_id, msg_id, session_id) + dbmanager.query(query=query, args=args) + messages = get_messages( + user_id=user_id, session_id=session_id, dbmanager=dbmanager + ) + return messages + + +def get_agents(user_id: str, dbmanager: DBManager) -> List[AgentFlowSpec]: + """ + Load agents from the database, sorted by timestamp. Load agents where id = user_id or user_id = default. + + :param user_id: The ID of the user whose agents are to be loaded + :param dbmanager: The DBManager instance to interact with the database + :return: A list of AgentFlowSpec objects + """ + + query = "SELECT * FROM agents WHERE user_id = ? OR user_id = ?" + args = (user_id, "default") + result = dbmanager.query(query=query, args=args, return_json=True) + # Sort by timestamp ascending + result = sorted(result, key=lambda k: k["timestamp"], reverse=True) + agents = [] + for row in result: + row["config"] = json.loads(row["config"]) + row["skills"] = json.loads(row["skills"] or "[]") + agent = AgentFlowSpec(**row) + agents.append(agent) + return agents + + +def upsert_agent( + agent_flow_spec: AgentFlowSpec, dbmanager: DBManager +) -> List[Dict[str, Any]]: + """ + Insert or update an agent for a specific user in the database. + + If the agent with the given ID already exists, it will be updated with the new data. + Otherwise, a new agent will be created. + + :param agent_flow_spec: The AgentFlowSpec object containing agent configuration + :param dbmanager: The DBManager instance to interact with the database + :return: A list of dictionaries, each representing an agent after insertion or update + """ + + existing_agent = get_item_by_field("agents", "id", agent_flow_spec.id, dbmanager) + + if existing_agent: + updated_data = { + "user_id": agent_flow_spec.user_id, + "timestamp": agent_flow_spec.timestamp, + "config": json.dumps(agent_flow_spec.config.dict()), + "type": agent_flow_spec.type, + "description": agent_flow_spec.description, + "skills": json.dumps( + [x.dict() for x in agent_flow_spec.skills] + if agent_flow_spec.skills + else [] + ), + } + update_item("agents", agent_flow_spec.id, updated_data, dbmanager) + else: + query = "INSERT INTO agents (id, user_id, timestamp, config, type, description, skills) VALUES (?, ?, ?, ?, ?, ?, ?)" + config_json = json.dumps(agent_flow_spec.config.dict()) + args = ( + agent_flow_spec.id, + agent_flow_spec.user_id, + agent_flow_spec.timestamp, + config_json, + agent_flow_spec.type, + agent_flow_spec.description, + json.dumps( + [x.dict() for x in agent_flow_spec.skills] + if agent_flow_spec.skills + else [] + ), + ) + dbmanager.query(query=query, args=args) + + agents = get_agents(user_id=agent_flow_spec.user_id, dbmanager=dbmanager) + return agents + + +def delete_agent(agent: AgentFlowSpec, dbmanager: DBManager) -> List[Dict[str, Any]]: + """ + Delete an agent for a specific user from the database. + + :param agent: The AgentFlowSpec object containing agent configuration + :param dbmanager: The DBManager instance to interact with the database + :return: A list of dictionaries, each representing an agent after deletion + """ + + # delete based on agent.id and agent.user_id + query = "DELETE FROM agents WHERE id = ? AND user_id = ?" + args = (agent.id, agent.user_id) + dbmanager.query(query=query, args=args) + + return get_agents(user_id=agent.user_id, dbmanager=dbmanager) + + +def get_item_by_field( + table: str, field: str, value: Any, dbmanager: DBManager +) -> Optional[Dict[str, Any]]: + query = f"SELECT * FROM {table} WHERE {field} = ?" 
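+    # NOTE: `table` and `field` are interpolated directly into the SQL string, so they
+    # must only come from trusted, hard-coded callers; `value` itself is parameterized.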
+ args = (value,) + result = dbmanager.query(query=query, args=args) + return result[0] if result else None + + +def update_item( + table: str, item_id: str, updated_data: Dict[str, Any], dbmanager: DBManager +) -> None: + set_clause = ", ".join([f"{key} = ?" for key in updated_data.keys()]) + query = f"UPDATE {table} SET {set_clause} WHERE id = ?" + args = (*updated_data.values(), item_id) + dbmanager.query(query=query, args=args) + + +def get_workflows(user_id: str, dbmanager: DBManager) -> List[Dict[str, Any]]: + """ + Load workflows for a specific user from the database, sorted by timestamp. + + :param user_id: The ID of the user whose workflows are to be loaded + :param dbmanager: The DBManager instance to interact with the database + :return: A list of dictionaries, each representing a workflow + """ + query = "SELECT * FROM workflows WHERE user_id = ? OR user_id = ?" + args = (user_id, "default") + result = dbmanager.query(query=query, args=args, return_json=True) + # Sort by timestamp ascending + result = sorted(result, key=lambda k: k["timestamp"], reverse=True) + workflows = [] + for row in result: + row["sender"] = json.loads(row["sender"]) + row["receiver"] = json.loads(row["receiver"]) + workflow = AgentWorkFlowConfig(**row) + workflows.append(workflow) + return workflows + + +def upsert_workflow( + workflow: AgentWorkFlowConfig, dbmanager: DBManager +) -> List[Dict[str, Any]]: + """ + Insert or update a workflow for a specific user in the database. + + If the workflow with the given ID already exists, it will be updated with the new data. + Otherwise, a new workflow will be created. + + :param workflow: The AgentWorkFlowConfig object containing workflow data + :param dbmanager: The DBManager instance to interact with the database + :return: A list of dictionaries, each representing a workflow after insertion or update + """ + existing_workflow = get_item_by_field("workflows", "id", workflow.id, dbmanager) + + if existing_workflow: + updated_data = { + "user_id": workflow.user_id, + "timestamp": workflow.timestamp, + "sender": json.dumps(workflow.sender.dict()), + "receiver": json.dumps( + [receiver.dict() for receiver in workflow.receiver] + if isinstance(workflow.receiver, list) + else workflow.receiver.dict() + ), + "type": workflow.type, + "name": workflow.name, + "description": workflow.description, + "summary_method": workflow.summary_method, + } + update_item("workflows", workflow.id, updated_data, dbmanager) + else: + query = "INSERT INTO workflows (id, user_id, timestamp, sender, receiver, type, name, description, summary_method) VALUES (?, ?, ?, ?, ?, ?, ?, ?,?)" + args = ( + workflow.id, + workflow.user_id, + workflow.timestamp, + json.dumps(workflow.sender.dict()), + json.dumps( + [receiver.dict() for receiver in workflow.receiver] + if isinstance(workflow.receiver, list) + else workflow.receiver.dict() + ), + workflow.type, + workflow.name, + workflow.description, + workflow.summary_method, + ) + dbmanager.query(query=query, args=args) + + return get_workflows(user_id=workflow.user_id, dbmanager=dbmanager) + + +def delete_workflow( + workflow: AgentWorkFlowConfig, dbmanager: DBManager +) -> List[Dict[str, Any]]: + """ + Delete a workflow for a specific user from the database. If the workflow does not exist, do nothing. 
+ + :param workflow: The AgentWorkFlowConfig object containing workflow data + :param dbmanager: The DBManager instance to interact with the database + :return: A list of dictionaries, each representing a workflow after deletion + """ + + # delete where workflow.id =id and workflow.user_id = user_id + + query = "DELETE FROM workflows WHERE id = ? AND user_id = ?" + args = (workflow.id, workflow.user_id) + dbmanager.query(query=query, args=args) + + return get_workflows(user_id=workflow.user_id, dbmanager=dbmanager) diff --git a/samples/apps/fireworks-studio/firestudio/utils/utils.py b/samples/apps/fireworks-studio/firestudio/utils/utils.py new file mode 100644 index 000000000000..12c65a79bf6f --- /dev/null +++ b/samples/apps/fireworks-studio/firestudio/utils/utils.py @@ -0,0 +1,441 @@ +import ast +import base64 +import hashlib +from typing import Any, Callable, List, Dict, Optional, Tuple, TypeVar, Union +import os +import shutil +import re +import autogen +from autogen.function_utils import get_function_schema +from ..datamodel import ( + AgentConfig, + AgentFlowSpec, + AgentWorkFlowConfig, + LLMConfig, + Skill, +) + +F = TypeVar("F", bound=Callable[..., Any]) + +PARSED_SCHEMA: Optional[Dict[str, Any]] = None +FN_INSTANCE: Optional[F] = None + + +def md5_hash(text: str) -> str: + """ + Compute the MD5 hash of a given text. + + :param text: The string to hash + :return: The MD5 hash of the text + """ + return hashlib.md5(text.encode()).hexdigest() + + +def clear_folder(folder_path: str) -> None: + """ + Clear the contents of a folder. + + :param folder_path: The path to the folder to clear. + """ + for file in os.listdir(folder_path): + file_path = os.path.join(folder_path, file) + if os.path.isfile(file_path): + os.remove(file_path) + elif os.path.isdir(file_path): + shutil.rmtree(file_path) + + +def get_file_type(file_path: str) -> str: + """ + + + Get file type determined by the file extension. If the file extension is not + recognized, 'unknown' will be used as the file type. + + :param file_path: The path to the file to be serialized. + :return: A string containing the file type. + """ + + # Extended list of file extensions for code and text files + CODE_EXTENSIONS = { + ".py", + ".js", + ".jsx", + ".java", + ".c", + ".cpp", + ".cs", + ".ts", + ".tsx", + ".html", + ".css", + ".scss", + ".less", + ".json", + ".xml", + ".yaml", + ".yml", + ".md", + ".rst", + ".tex", + ".sh", + ".bat", + ".ps1", + ".php", + ".rb", + ".go", + ".swift", + ".kt", + ".hs", + ".scala", + ".lua", + ".pl", + ".sql", + ".config", + } + + # Supported image extensions + IMAGE_EXTENSIONS = { + ".png", + ".jpg", + ".jpeg", + ".gif", + ".bmp", + ".tiff", + ".svg", + ".webp", + } + # Supported (web) video extensions + VIDEO_EXTENSIONS = {".mp4", ".webm", ".ogg", ".mov", ".avi", ".wmv"} + + # Supported PDF extension + PDF_EXTENSION = ".pdf" + + # Determine the file extension + _, file_extension = os.path.splitext(file_path) + + # Determine the file type based on the extension + if file_extension in CODE_EXTENSIONS: + file_type = "code" + elif file_extension in IMAGE_EXTENSIONS: + file_type = "image" + elif file_extension == PDF_EXTENSION: + file_type = "pdf" + elif file_extension in VIDEO_EXTENSIONS: + file_type = "video" + else: + file_type = "unknown" + + return file_type + + +def serialize_file(file_path: str) -> Tuple[str, str]: + """ + Reads a file from a given file path, base64 encodes its content, + and returns the base64 encoded string along with the file type. 
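+    The content is read in binary mode and encoded with base64.b64encode.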
+ + The file type is determined by the file extension. If the file extension is not + recognized, 'unknown' will be used as the file type. + + :param file_path: The path to the file to be serialized. + :return: A tuple containing the base64 encoded string of the file and the file type. + """ + + file_type = get_file_type(file_path) + + # Read the file and encode its contents + try: + with open(file_path, "rb") as file: + file_content = file.read() + base64_encoded_content = base64.b64encode(file_content).decode("utf-8") + except Exception as e: + raise IOError(f"An error occurred while reading the file: {e}") from e + + return base64_encoded_content, file_type + + +def get_modified_files( + start_timestamp: float, end_timestamp: float, source_dir: str, dest_dir: str +) -> List[Dict[str, str]]: + """ + Copy files from source_dir that were modified within a specified timestamp range + to dest_dir, renaming files if they already exist there. The function excludes + files with certain file extensions and names. + + :param start_timestamp: The start timestamp to filter modified files. + :param end_timestamp: The end timestamp to filter modified files. + :param source_dir: The directory to search for modified files. + :param dest_dir: The destination directory to copy modified files to. + + :return: A list of dictionaries with details of file paths in dest_dir that were modified and copied over. + Dictionary format: {path: "", name: "", extension: ""} + Files with extensions "__pycache__", "*.pyc", "__init__.py", and "*.cache" + are ignored. + """ + modified_files = [] + ignore_extensions = {".pyc", ".cache"} + ignore_files = {"__pycache__", "__init__.py"} + + for root, dirs, files in os.walk(source_dir): + # Excluding the directory "__pycache__" if present + dirs[:] = [d for d in dirs if d not in ignore_files] + + for file in files: + file_path = os.path.join(root, file) + file_ext = os.path.splitext(file)[1] + file_name = os.path.basename(file) + + if file_ext in ignore_extensions or file_name in ignore_files: + continue + + file_mtime = os.path.getmtime(file_path) + if start_timestamp < file_mtime < end_timestamp: + dest_file_path = os.path.join(dest_dir, file) + copy_idx = 1 + while os.path.exists(dest_file_path): + base, extension = os.path.splitext(file) + # Handling potential name conflicts by appending a number + dest_file_path = os.path.join( + dest_dir, f"{base}_{copy_idx}{extension}" + ) + copy_idx += 1 + + # Copying the modified file to the destination directory + shutil.copy2(file_path, dest_file_path) + + # Extract user id from the dest_dir and file path + uid = dest_dir.split("/")[-1] + relative_file_path = os.path.relpath(dest_file_path, start=dest_dir) + file_type = get_file_type(dest_file_path) + file_dict = { + "path": f"files/user/{uid}/{relative_file_path}", + "name": file_name, + "extension": file_ext.replace(".", ""), + "type": file_type, + } + modified_files.append(file_dict) + # sort by extension + modified_files.sort(key=lambda x: x["extension"]) + return modified_files + + +def init_webserver_folders(root_file_path: str) -> Dict[str, str]: + """ + Initialize folders needed for a web server, such as static file directories + and user-specific data directories. 
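+    Creates files/, files/user/, ui/ and workdir/ under the given root; the calls are
+    idempotent because every folder is created with exist_ok=True.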
+ + :param root_file_path: The root directory where webserver folders will be created + :return: A dictionary with the path of each created folder + """ + files_static_root = os.path.join(root_file_path, "files/") + static_folder_root = os.path.join(root_file_path, "ui") + workdir_root = os.path.join(root_file_path, "workdir") + + os.makedirs(files_static_root, exist_ok=True) + os.makedirs(os.path.join(files_static_root, "user"), exist_ok=True) + os.makedirs(static_folder_root, exist_ok=True) + os.makedirs(workdir_root, exist_ok=True) + + folders = { + "files_static_root": files_static_root, + "static_folder_root": static_folder_root, + "workdir_root": workdir_root, + } + return folders + + +def save_skills(prompt: str, work_dir: str) -> None: + # check if work_dir exists + if not os.path.exists(work_dir): + os.makedirs(work_dir) + + # check if skills.py exists; if it does, append to the file, else create a new file and write to it + + if os.path.exists(os.path.join(work_dir, "skills.py")): + with open(os.path.join(work_dir, "skills.py"), "a", encoding="utf-8") as f: + f.write(prompt) + else: + with open(os.path.join(work_dir, "skills.py"), "w", encoding="utf-8") as f: + f.write(prompt) + + +def generate_prompt_from_skill(skill: Skill) -> str: + return f""" + ##### Begin of {skill.title} ##### + + {skill.content} + + #### End of {skill.title} #### + """ + + +def get_prompt_and_tools_from_skills( + skills: List[Skill], work_dir: str +) -> Tuple[str, List[Dict[str, Any]], List[F]]: + global PARSED_SCHEMA + tools: List[Dict[str, Any]] = [] + fn_instance: List[F] = [] + for skill in skills: + # Set parsed schema to None + PARSED_SCHEMA = None + exec(skill.content) + if PARSED_SCHEMA is not None: + tools.append(PARSED_SCHEMA) + assert ( + FN_INSTANCE is not None + ), f"FN_INSTANCE is none for parsed schema {PARSED_SCHEMA}" + fn_instance.append(FN_INSTANCE) + + # Save every function to work_dir for execution later + prompt = get_skills_from_prompt(skills, work_dir) + + return prompt, tools, fn_instance + + +def get_skills_from_prompt(skills: List[Skill], work_dir: str) -> str: + """ + Create a prompt with the content of all skills and write the skills to a file named skills.py in the work_dir. + + :param skills: A list of skills + :param work_dir: The directory where skills.py is written + :return: A string containing the content of all skills + """ + + instruction = """ + +While solving the task you may use the functions below, which will be available in a file called skills.py. +To use a function from skills.py in code, IMPORT THE FUNCTION FROM skills.py and then use the function. +If you need to install python packages, write shell code to +install via pip and use the --quiet option. + + """ + prompt = "" # filename: skills.py + for skill in skills: + prompt += generate_prompt_from_skill(skill) + save_skills(prompt, work_dir) + + return instruction + prompt + + +def delete_files_in_folder(folders: Union[str, List[str]]) -> None: + """ + Delete all files and directories in the specified folders. 
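+    Example (illustrative):
+        delete_files_in_folder(["/tmp/workdir", "/tmp/scratch"])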
+ + :param folders: A list of folders or a single folder string + """ + + if isinstance(folders, str): + folders = [folders] + + for folder in folders: + # Check if the folder exists + if not os.path.isdir(folder): + print(f"The folder {folder} does not exist.") + continue + + # List all the entries in the directory + for entry in os.listdir(folder): + # Get the full path + path = os.path.join(folder, entry) + try: + if os.path.isfile(path) or os.path.islink(path): + # Remove the file or link + os.remove(path) + elif os.path.isdir(path): + # Remove the directory and all its content + shutil.rmtree(path) + except Exception as e: + # Print the error message and skip + print(f"Failed to delete {path}. Reason: {e}") + + +def get_default_agent_config(work_dir: str) -> AgentWorkFlowConfig: + """ + Get a default agent flow config . + """ + + llm_config = LLMConfig( + config_list=[{"model": "gpt-4"}], + temperature=0, + ) + + USER_PROXY_INSTRUCTIONS = """If the request has been addressed sufficiently, summarize the answer and end with the word TERMINATE. Otherwise, ask a follow-up question. + """ + + userproxy_spec = AgentFlowSpec( + type="userproxy", + config=AgentConfig( + name="user_proxy", + human_input_mode="NEVER", + system_message=USER_PROXY_INSTRUCTIONS, + code_execution_config={ + "work_dir": work_dir, + "use_docker": False, + }, + max_consecutive_auto_reply=10, + llm_config=llm_config, + is_termination_msg=lambda x: x.get("content", "") + .rstrip() + .endswith("TERMINATE"), + ), + ) + + assistant_spec = AgentFlowSpec( + type="assistant", + config=AgentConfig( + name="primary_assistant", + system_message=autogen.AssistantAgent.DEFAULT_SYSTEM_MESSAGE, + llm_config=llm_config, + ), + ) + + flow_config = AgentWorkFlowConfig( + name="default", + sender=userproxy_spec, + receiver=assistant_spec, + type="default", + description="Default agent flow config", + ) + + return flow_config + + +def extract_successful_code_blocks(messages: List[Dict[str, str]]) -> List[str]: + """ + Parses through a list of messages containing code blocks and execution statuses, + returning the array of code blocks that executed successfully and retains + the backticks for Markdown rendering. + + Parameters: + messages (List[Dict[str, str]]): A list of message dictionaries containing 'content' and 'role' keys. + + Returns: + List[str]: A list containing the code blocks that were successfully executed, including backticks. + """ + successful_code_blocks = [] + # Regex pattern to capture code blocks enclosed in triple backticks. 
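+    # The non-greedy [\s\S]*? spans newlines and stops at the first closing fence,
+    # so multiple blocks in a single message are captured separately.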
+ code_block_regex = r"```[\s\S]*?```" + + for i, row in enumerate(messages): + message = row["message"] + if message["role"] == "user" and "execution succeeded" in message["content"]: + if i > 0 and messages[i - 1]["message"]["role"] == "assistant": + prev_content = messages[i - 1]["message"]["content"] + # Find all matches for code blocks + code_blocks = re.findall(code_block_regex, prev_content) + # Add the code blocks with backticks + successful_code_blocks.extend(code_blocks) + + return successful_code_blocks + + +# TODO Think about moving this to layer above +def schema_recorder(*, description: Optional[str] = None) -> Callable[[F], F]: + def _decorator(func: F) -> F: + global PARSED_SCHEMA + global FN_INSTANCE + # TODO handle failure case scenario + PARSED_SCHEMA = get_function_schema(func, description=description) + FN_INSTANCE = func + return func + + return _decorator diff --git a/samples/apps/fireworks-studio/firestudio/version.py b/samples/apps/fireworks-studio/firestudio/version.py new file mode 100644 index 000000000000..a01681a79e40 --- /dev/null +++ b/samples/apps/fireworks-studio/firestudio/version.py @@ -0,0 +1,3 @@ +VERSION = "0.0.18a" +__version__ = VERSION +APP_NAME = "firestudio" diff --git a/samples/apps/fireworks-studio/firestudio/web/__init__.py b/samples/apps/fireworks-studio/firestudio/web/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/samples/apps/fireworks-studio/firestudio/web/app.py b/samples/apps/fireworks-studio/firestudio/web/app.py new file mode 100644 index 000000000000..56614e95c8b4 --- /dev/null +++ b/samples/apps/fireworks-studio/firestudio/web/app.py @@ -0,0 +1,433 @@ +import json +import os +import traceback +from typing import Any, Dict, List +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from fastapi.staticfiles import StaticFiles +from fastapi import HTTPException + + +from ..datamodel import ( + ChatWebRequestModel, + DBWebRequestModel, + DeleteMessageWebRequestModel, + Message, + Session, +) +from ..utils import md5_hash, init_webserver_folders, DBManager, dbutils + +from ..chatmanager import AutoGenChatManager + + +app = FastAPI() + + +# allow cross origin requests for testing on localhost:800* ports only +app.add_middleware( + CORSMiddleware, + allow_origins=[ + "http://localhost:8000", + "http://127.0.0.1:8000", + "http://localhost:8001", + "http://localhost:8081", + "http://localhost:8082", + ], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +root_file_path = os.path.dirname(os.path.abspath(__file__)) +# init folders skills, workdir, static, files etc +folders = init_webserver_folders(root_file_path) + +api = FastAPI(root_path="/api") +# mount an api route such that the main route serves the ui and the /api +app.mount("/api", api) + +app.mount( + "/", StaticFiles(directory=folders["static_folder_root"], html=True), name="ui" +) +api.mount( + "/files", + StaticFiles(directory=folders["files_static_root"], html=True), + name="files", +) + + +db_path = os.path.join(root_file_path, "database.sqlite") +dbmanager = DBManager(path=db_path) # manage database operations +chatmanager = AutoGenChatManager() # manage calls to autogen + + +@api.post("/messages") +async def add_message(req: ChatWebRequestModel): + message = Message(**req.message.dict()) + user_history: List[Message] = dbutils.get_messages( + user_id=message.user_id, session_id=req.message.session_id, dbmanager=dbmanager + ) + + # save incoming message to db + 
dbutils.create_message(message=message, dbmanager=dbmanager) + user_dir = os.path.join( + folders["files_static_root"], "user", md5_hash(message.user_id) + ) + os.makedirs(user_dir, exist_ok=True) + + try: + response_messages: List[Message] = chatmanager.chat( + message=message, + history_list=user_history, + agent_flow_config=req.flow_config, + work_dir=user_dir, + ) + + # save assistant response to db + for response_message in response_messages: + dbutils.create_message(message=response_message, dbmanager=dbmanager) + response = { + "status": True, + "message": { + response_message.receiver_name: response_message.content + for response_message in response_messages + }, + "metadata": { + response_message.receiver_name: json.loads(response_message.metadata) + for response_message in response_messages + }, + } + return response + except Exception as ex_error: + print(traceback.format_exc()) + return { + "status": False, + "message": "Error occurred while processing message: " + str(ex_error), + } + + +@api.get("/messages") +async def get_messages(user_id: str = None, session_id: str = None): + if user_id is None: + raise HTTPException(status_code=400, detail="user_id is required") + try: + user_history = dbutils.get_messages( + user_id=user_id, session_id=session_id, dbmanager=dbmanager + ) + + return { + "status": True, + "data": user_history, + "message": "Messages retrieved successfully", + } + except Exception as ex_error: + print(ex_error) + return { + "status": False, + "message": "Error occurred while retrieving messages: " + str(ex_error), + } + + +@api.get("/gallery") +async def get_gallery_items(gallery_id: str = None): + try: + gallery = dbutils.get_gallery(gallery_id=gallery_id, dbmanager=dbmanager) + return { + "status": True, + "data": gallery, + "message": "Gallery items retrieved successfully", + } + except Exception as ex_error: + print(ex_error) + return { + "status": False, + "message": "Error occurred while retrieving gallery items: " + str(ex_error), + } + + +@api.get("/sessions") +async def get_user_sessions(user_id: str = None): + """Return a list of all sessions for a user""" + if user_id is None: + raise HTTPException(status_code=400, detail="user_id is required") + + try: + user_sessions = dbutils.get_sessions(user_id=user_id, dbmanager=dbmanager) + + return { + "status": True, + "data": user_sessions, + "message": "Sessions retrieved successfully", + } + except Exception as ex_error: + print(ex_error) + return { + "status": False, + "message": "Error occurred while retrieving sessions: " + str(ex_error), + } + + +@api.post("/sessions") +async def create_user_session(req: DBWebRequestModel): + """Create a new session for a user""" + + try: + session = Session( + user_id=req.session.user_id, flow_config=req.session.flow_config + ) + user_sessions = dbutils.create_session( + user_id=req.user_id, session=session, dbmanager=dbmanager + ) + return { + "status": True, + "message": "Session created successfully", + "data": user_sessions, + } + except Exception as ex_error: + print(traceback.format_exc()) + return { + "status": False, + "message": "Error occurred while creating session: " + str(ex_error), + } + + +@api.post("/sessions/publish") +async def publish_user_session_to_gallery(req: DBWebRequestModel): + """Publish a session to the gallery""" + + try: + gallery_item = dbutils.create_gallery( + req.session, tags=req.tags, dbmanager=dbmanager + ) + return { + "status": True, + "message": "Session successfully published", + "data": 
gallery_item, + } + except Exception as ex_error: + print(traceback.format_exc()) + return { + "status": False, + "message": "Error occurred while publishing session: " + str(ex_error), + } + + +@api.delete("/sessions/delete") +async def delete_user_session(req: DBWebRequestModel): + """Delete a session for a user""" + + try: + sessions = dbutils.delete_session(session=req.session, dbmanager=dbmanager) + return { + "status": True, + "message": "Session deleted successfully", + "data": sessions, + } + except Exception as ex_error: + print(traceback.format_exc()) + return { + "status": False, + "message": "Error occurred while deleting session: " + str(ex_error), + } + + +@api.post("/messages/delete") +async def remove_message(req: DeleteMessageWebRequestModel): + """Delete a message from the database""" + + try: + messages = dbutils.delete_message( + user_id=req.user_id, + msg_id=req.msg_id, + session_id=req.session_id, + dbmanager=dbmanager, + ) + return { + "status": True, + "message": "Message deleted successfully", + "data": messages, + } + except Exception as ex_error: + print(ex_error) + return { + "status": False, + "message": "Error occurred while deleting message: " + str(ex_error), + } + + +@api.get("/skills") +async def get_user_skills(user_id: str): + try: + skills = dbutils.get_skills(user_id, dbmanager=dbmanager) + + return { + "status": True, + "message": "Skills retrieved successfully", + "data": skills, + } + except Exception as ex_error: + print(ex_error) + return { + "status": False, + "message": "Error occurred while retrieving skills: " + str(ex_error), + } + + +@api.post("/skills") +async def create_user_skills(req: DBWebRequestModel): + try: + skills = dbutils.upsert_skill(skill=req.skill, dbmanager=dbmanager) + + return { + "status": True, + "message": "Skill created successfully", + "data": skills, + } + + except Exception as ex_error: + print(ex_error) + return { + "status": False, + "message": "Error occurred while creating skills: " + str(ex_error), + } + + +@api.delete("/skills/delete") +async def delete_user_skills(req: DBWebRequestModel): + """Delete a skill for a user""" + + try: + skills = dbutils.delete_skill(req.skill, dbmanager=dbmanager) + + return { + "status": True, + "message": "Skill deleted successfully", + "data": skills, + } + + except Exception as ex_error: + print(ex_error) + return { + "status": False, + "message": "Error occurred while deleting skill: " + str(ex_error), + } + + +@api.get("/agents") +async def get_user_agents(user_id: str): + try: + agents = dbutils.get_agents(user_id, dbmanager=dbmanager) + + return { + "status": True, + "message": "Agents retrieved successfully", + "data": agents, + } + except Exception as ex_error: + print(ex_error) + return { + "status": False, + "message": "Error occurred while retrieving agents: " + str(ex_error), + } + + +@api.post("/agents") +async def create_user_agents(req: DBWebRequestModel): + """Create a new agent for a user""" + + try: + agents = dbutils.upsert_agent(agent_flow_spec=req.agent, dbmanager=dbmanager) + + return { + "status": True, + "message": "Agent created successfully", + "data": agents, + } + + except Exception as ex_error: + print(traceback.format_exc()) + return { + "status": False, + "message": "Error occurred while creating agent: " + str(ex_error), + } + + +@api.delete("/agents/delete") +async def delete_user_agent(req: DBWebRequestModel): + """Delete an agent for a user""" + + try: + agents = dbutils.delete_agent(agent=req.agent, dbmanager=dbmanager) + + return { 
"status": True, + "message": "Agent deleted successfully", + "data": agents, + } + + except Exception as ex_error: + print(traceback.format_exc()) + return { + "status": False, + "message": "Error occurred while deleting agent: " + str(ex_error), + } + + +@api.get("/workflows") +async def get_user_workflows(user_id: str): + try: + workflows = dbutils.get_workflows(user_id, dbmanager=dbmanager) + + return { + "status": True, + "message": "Workflows retrieved successfully", + "data": workflows, + } + except Exception as ex_error: + print(ex_error) + return { + "status": False, + "message": "Error occurred while retrieving workflows: " + str(ex_error), + } + + +@api.post("/workflows") +async def create_user_workflow(req: DBWebRequestModel): + """Create a new workflow for a user""" + + try: + workflow = dbutils.upsert_workflow(workflow=req.workflow, dbmanager=dbmanager) + return { + "status": True, + "message": "Workflow created successfully", + "data": workflow, + } + + except Exception as ex_error: + print(ex_error) + return { + "status": False, + "message": "Error occurred while creating workflow: " + str(ex_error), + } + + +@api.delete("/workflows/delete") +async def delete_user_workflow(req: DBWebRequestModel): + """Delete a workflow for a user""" + + try: + workflow = dbutils.delete_workflow(workflow=req.workflow, dbmanager=dbmanager) + return { + "status": True, + "message": "Workflow deleted successfully", + "data": workflow, + } + + except Exception as ex_error: + print(ex_error) + return { + "status": False, + "message": "Error occurred while deleting workflow: " + str(ex_error), + } diff --git a/samples/apps/fireworks-studio/firestudio/workflowmanager.py b/samples/apps/fireworks-studio/firestudio/workflowmanager.py new file mode 100644 index 000000000000..90507158cf99 --- /dev/null +++ b/samples/apps/fireworks-studio/firestudio/workflowmanager.py @@ -0,0 +1,181 @@ +from dataclasses import asdict +import autogen +from .datamodel import AgentFlowSpec, AgentWorkFlowConfig, Message +from .utils import get_prompt_and_tools_from_skills, clear_folder +from datetime import datetime +from typing import Any, Callable, List, Dict, Optional, Tuple, TypeVar, Union + + +F = TypeVar("F", bound=Callable[..., Any]) + + +class AutoGenWorkFlowManager: + """ + AutoGenWorkFlowManager class to load agents from a provided configuration and run a chat between them + """ + + def __init__( + self, + config: AgentWorkFlowConfig, + history: Optional[List[Message]] = None, + work_dir: str = None, + clear_work_dir: bool = True, + ) -> None: + """ + Initializes the AutoGenFlow with agents specified in the config and optional + message history. + + Args: + config: The configuration settings for the sender and receiver agents. + history: An optional list of previous messages to populate the agents' history. 
+ + """ + self.work_dir = work_dir or "work_dir" + if clear_work_dir: + clear_folder(self.work_dir) + + self.receiver = self.load(config.receiver) + self.sender = self.load(config.sender) + + self.agent_history = [] + + if history: + self.populate_history(history) + + def process_reply(self, recipient, messages, sender, config): + if "callback" in config and config["callback"] is not None: + callback = config["callback"] + callback(sender, recipient, messages[-1]) + iteration = { + "sender": sender.name, + "recipient": recipient.name, + "message": messages[-1], + "timestamp": datetime.now().isoformat(), + } + self.agent_history.append(iteration) + return False, None + + def _sanitize_history_message(self, message: str) -> str: + """ + Sanitizes the message e.g. remove references to execution completed + + Args: + message: The message to be sanitized. + + Returns: + The sanitized message. + """ + to_replace = ["execution succeeded", "exitcode"] + for replace in to_replace: + message = message.replace(replace, "") + return message + + def populate_history(self, history: List[Message]) -> None: + """ + Populates the agent message history from the provided list of messages. + + Args: + history: A list of messages to populate the agents' history. + """ + for msg in history: + if isinstance(msg, dict): + msg = Message(**msg) + if msg.role == "user": + self.sender.send( + msg.content, + self.receiver, + request_reply=False, + ) + elif msg.role == "assistant": + self.receiver.send( + msg.content, + self.sender, + request_reply=False, + ) + + def sanitize_agent_spec(self, agent_spec: AgentFlowSpec) -> AgentFlowSpec: + """ + Sanitizes the agent spec by setting loading defaults + + Args: + config: The agent configuration to be sanitized. + agent_type: The type of the agent. + + Returns: + The sanitized agent configuration. + """ + + agent_spec.config.is_termination_msg = agent_spec.config.is_termination_msg or ( + lambda x: x.get("content", "") + and "TERMINATE" in x.get("content", "").rstrip() + ) + skills_prompt = "" + if agent_spec.skills: + # Get both the skills prompt and the tools available to the model + # For functions that are annotated properly we get tools, for remaining + # we get the skills_prompt + skills_prompt, tools, functions = get_prompt_and_tools_from_skills( + agent_spec.skills, self.work_dir + ) + agent_spec.config.llm_config.tools = tools + + if agent_spec.type == "userproxy": + code_execution_config = agent_spec.config.code_execution_config or {} + code_execution_config["work_dir"] = self.work_dir + agent_spec.config.code_execution_config = code_execution_config + + if agent_spec.type == "assistant": + agent_spec.config.system_message = ( + autogen.AssistantAgent.DEFAULT_SYSTEM_MESSAGE + + "\n\n" + + agent_spec.config.system_message + + "\n\n" + + skills_prompt + ) + + return agent_spec + + def load(self, agent_spec: AgentFlowSpec) -> autogen.Agent: + """ + Loads an agent based on the provided agent specification. + + Args: + agent_spec: The specification of the agent to be loaded. + + Returns: + An instance of the loaded agent. 
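+
+        Raises:
+            ValueError: If the agent type is neither "assistant" nor "userproxy".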
+ """ + agent: autogen.Agent + agent_spec = self.sanitize_agent_spec(agent_spec) + if agent_spec.type == "assistant": + agent = autogen.AssistantAgent(**asdict(agent_spec.config)) + agent.register_reply( + [autogen.Agent, None], + reply_func=self.process_reply, + config={"callback": None}, + ) + elif agent_spec.type == "userproxy": + agent = autogen.UserProxyAgent(**asdict(agent_spec.config)) + agent.register_reply( + [autogen.Agent, None], + reply_func=self.process_reply, + config={"callback": None}, + ) + else: + raise ValueError(f"Unknown agent type: {agent_spec.type}") + return agent + + def run(self, message: str, clear_history: bool = False) -> None: + """ + Initiates a chat between the sender and receiver agents with an initial message + and an option to clear the history. + + Args: + message: The initial message to start the chat. + clear_history: If set to True, clears the chat history before initiating. + """ + self.sender.initiate_chat( + self.receiver, + message=message, + clear_history=clear_history, + ) diff --git a/samples/apps/fireworks-studio/frontend/.env.default b/samples/apps/fireworks-studio/frontend/.env.default new file mode 100644 index 000000000000..da3ebffaa289 --- /dev/null +++ b/samples/apps/fireworks-studio/frontend/.env.default @@ -0,0 +1,5 @@ + # use this for .env.development assuming your backend is running on port 8081 +GATSBY_API_URL=http://127.0.0.1:8081/api + +# use this .env.production assuming your backend is running on same port as frontend. Remember toremove these comments. +GATSBY_API_URL=/api diff --git a/samples/apps/fireworks-studio/frontend/.gitignore b/samples/apps/fireworks-studio/frontend/.gitignore new file mode 100644 index 000000000000..8a0ea868f24b --- /dev/null +++ b/samples/apps/fireworks-studio/frontend/.gitignore @@ -0,0 +1,8 @@ +node_modules/ +.cache/ +public/ + +.env.development +.env.production + +yarn.lock diff --git a/samples/apps/fireworks-studio/frontend/LICENSE b/samples/apps/fireworks-studio/frontend/LICENSE new file mode 100644 index 000000000000..16ab6489c8ed --- /dev/null +++ b/samples/apps/fireworks-studio/frontend/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2022 Victor Dibia + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/samples/apps/fireworks-studio/frontend/README.md b/samples/apps/fireworks-studio/frontend/README.md
new file mode 100644
index 000000000000..7af58ee311ec
--- /dev/null
+++ b/samples/apps/fireworks-studio/frontend/README.md
@@ -0,0 +1,30 @@
+## 🚀 Running UI in Dev Mode
+
+Run the UI in dev mode (make changes and see them reflected in the browser with hot reloading):
+
+- npm install
+- npm run start
+
+This should start the server on port 8000.
+
+## Design Elements
+
+- **Gatsby**: The app is created in Gatsby. A guide on bootstrapping a Gatsby app can be found here - https://www.gatsbyjs.com/docs/quick-start/.
+  This provides an overview of the project file structure, including the functionality of files like `gatsby-config.js`, `gatsby-node.js`, `gatsby-browser.js` and `gatsby-ssr.js`.
+- **TailwindCSS**: The app uses TailwindCSS for styling. A guide on using TailwindCSS with Gatsby can be found here - https://tailwindcss.com/docs/guides/gatsby. This will explain the functionality in `tailwind.config.js` and `postcss.config.js`.
+
+## Modifying the UI, Adding Pages
+
+The core of the app can be found in the `src` folder. To add pages, add a new folder in `src/pages` and add an `index.tsx` file. This will be the entry point for the page. For example, to add a route in the app like `/about`, add a folder `about` in `src/pages` and add an `index.tsx` file. You can follow the content style in `src/pages/index.tsx` to add content to the page.
+
+Core logic for each component should be written in the `src/components` folder and then imported in pages as needed.
+
+## Connecting the Frontend to the Backend
+
+The frontend makes requests to the backend API and expects it at `/api` on localhost port 8081.
+
+## Setting Env Variables for the UI
+
+- Please look at `.env.default`.
+- Make a copy of this file and name it `.env.development`.
+- Set the values for the variables in this file.
diff --git a/samples/apps/fireworks-studio/frontend/gatsby-browser.js b/samples/apps/fireworks-studio/frontend/gatsby-browser.js
new file mode 100644
index 000000000000..b28e798f0d41
--- /dev/null
+++ b/samples/apps/fireworks-studio/frontend/gatsby-browser.js
@@ -0,0 +1,6 @@
+import "antd/dist/reset.css";
+import "./src/styles/global.css";
+
+import AuthProvider from "./src/hooks/provider";
+
+export const wrapRootElement = AuthProvider;
diff --git a/samples/apps/fireworks-studio/frontend/gatsby-config.ts b/samples/apps/fireworks-studio/frontend/gatsby-config.ts
new file mode 100644
index 000000000000..047412b52470
--- /dev/null
+++ b/samples/apps/fireworks-studio/frontend/gatsby-config.ts
@@ -0,0 +1,52 @@
+import type { GatsbyConfig } from "gatsby";
+
+require("dotenv").config({
+  path: `.env.${process.env.NODE_ENV}`,
+});
+
+const config: GatsbyConfig = {
+  pathPrefix: `${process.env.PREFIX_PATH_VALUE}`,
+  siteMetadata: {
+    title: `AutoGen Studio`,
+    description: `Build Multi-Agent Apps`,
+    siteUrl: `http://tbd.place`,
+  },
+  flags: {
+    LAZY_IMAGES: true,
+    FAST_DEV: true,
+    DEV_SSR: false,
+  },
+  plugins: [
+    "gatsby-plugin-sass",
+    "gatsby-plugin-image",
+    "gatsby-plugin-sitemap",
+    "gatsby-plugin-postcss",
+    {
+      resolve: "gatsby-plugin-manifest",
+      options: {
+        icon: "src/images/icon.png",
+      },
+    },
+    "gatsby-plugin-mdx",
+    "gatsby-plugin-sharp",
+    "gatsby-transformer-sharp",
+    {
+      resolve: "gatsby-source-filesystem",
+      options: {
+        name: "images",
+        path: "./src/images/",
+      },
+      __key: "images",
+    },
+    {
+      resolve: "gatsby-source-filesystem",
+      options: {
+        name: "pages",
+        path: "./src/pages/",
+      },
+      __key: "pages",
+    },
+  ],
+};
+
+export default config;
diff --git a/samples/apps/fireworks-studio/frontend/gatsby-ssr.tsx b/samples/apps/fireworks-studio/frontend/gatsby-ssr.tsx
new file mode 100644
index 000000000000..7601c31d03ca
--- /dev/null
+++ b/samples/apps/fireworks-studio/frontend/gatsby-ssr.tsx
@@ -0,0 +1,16 @@
+import React from "react";
+
+// Runs on the client before hydration to apply the persisted dark-mode
+// preference, avoiding a flash of the wrong theme on first paint.
+const codeToRunOnClient = `(function() {
+  try {
+    var mode = localStorage.getItem('darkmode');
+    document.getElementsByTagName("html")[0].className = mode === 'dark' ? 'dark' : 'light';
+  } catch (e) {}
+})();`;
+
+export const onRenderBody = ({ setHeadComponents }) =>
+  setHeadComponents([
+