diff --git a/README.md b/README.md
index 9542674..0c64139 100644
--- a/README.md
+++ b/README.md
@@ -25,9 +25,26 @@ MQ:
   port: <MQ port>
   server: <MQ server address>
   users:
-    <LLM MQ service_name>:
+    <LLM MQ service_name>:
       user: <MQ username>
       password: <MQ password>
 LLM_<LLM NAME>:
-  num_parallel_processes: <integer > 0>
+  num_parallel_processes: <integer > 0>
 ```
+
+## Enabling Chatbot personas
+An LLM may be configured to connect to a `/chatbots` vhost and participate in
+discussions as described in the [chatbots project](https://github.com/NeonGeckoCom/chatbot-core).
+One LLM may define multiple personas to participate as:
+```yaml
+llm_bots:
+  <LLM Name>:
+    - name: Assistant
+      description: You are a personal assistant who responds in 40 words or less
+    - name: Author
+      description: You are an author and expert in literary history
+    - name: Student
+      description: You are a graduate student working in the field of artificial intelligence
+      enabled: False
+```
+> `LLM Name` is defined in the property `NeonLLMMQConnector.name`
diff --git a/neon_llm_core/chatbot.py b/neon_llm_core/chatbot.py
new file mode 100644
index 0000000..72499d3
--- /dev/null
+++ b/neon_llm_core/chatbot.py
@@ -0,0 +1,163 @@
+# NEON AI (TM) SOFTWARE, Software Development Kit & Application Development System
+# All trademark and other rights reserved by their respective owners
+# Copyright 2008-2021 Neongecko.com Inc.
+# BSD-3
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+# 1. Redistributions of source code must retain the above copyright notice,
+#    this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright notice,
+#    this list of conditions and the following disclaimer in the documentation
+#    and/or other materials provided with the distribution.
+# 3. Neither the name of the copyright holder nor the names of its
+#    contributors may be used to endorse or promote products derived from this
+#    software without specific prior written permission.
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+from typing import List
+from chatbot_core.v2 import ChatBot
+from neon_mq_connector.utils.client_utils import send_mq_request
+from ovos_utils.log import LOG
+
+from neon_llm_core.config import LLMMQConfig
+
+
+class LLMBot(ChatBot):
+
+    def __init__(self, *args, **kwargs):
+        ChatBot.__init__(self, *args, **kwargs)
+        self.bot_type = "submind"
+        self.base_llm = kwargs.get("llm_name")  # chat_gpt, fastchat, etc.
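+        # `persona` is one entry from the `llm_bots` configuration described
+        # in the README, e.g. {"name": "Assistant", "description": "..."}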
+        self.persona = kwargs.get("persona")
+        self.mq_queue_config = self.get_llm_mq_config(self.base_llm)
+        LOG.info(f'Initialised config for llm={self.base_llm}|'
+                 f'persona={self._bot_id}')
+        self.prompt_id_to_shout = dict()
+
+    @property
+    def contextual_api_supported(self):
+        return True
+
+    def ask_chatbot(self, user: str, shout: str, timestamp: str,
+                    context: dict = None) -> str:
+        """
+        Handles an incoming shout into the current conversation
+        :param user: user associated with shout
+        :param shout: text shouted by user
+        :param timestamp: formatted timestamp of shout
+        :param context: message context
+        """
+        prompt_id = context.get('prompt_id')
+        if prompt_id:
+            self.prompt_id_to_shout[prompt_id] = shout
+        LOG.debug(f"Getting response to {shout}")
+        response = self._get_llm_api_response(
+            shout=shout).get("response", "I have nothing to say here...")
+        return response
+
+    def ask_discusser(self, options: dict, context: dict = None) -> str:
+        """
+        Provides one discussion response based on the given options
+
+        :param options: proposed responses (botname: response)
+        :param context: message context
+        """
+        options = {k: v for k, v in options.items() if k != self.service_name}
+        prompt_sentence = self.prompt_id_to_shout.get(context['prompt_id'], '')
+        LOG.info(f'prompt_sentence={prompt_sentence}, options={options}')
+        opinion = self._get_llm_api_opinion(prompt=prompt_sentence,
+                                            options=options).get('opinion', '')
+        return opinion
+
+    def ask_appraiser(self, options: dict, context: dict = None) -> str:
+        """
+        Selects one of the responses to a prompt and casts a vote in the conversation.
+        :param options: proposed responses (botname: response)
+        :param context: message context
+        """
+        if options:
+            options = {k: v for k, v in options.items()
+                       if k != self.service_name}
+            bots = list(options)
+            bot_responses = list(options.values())
+            LOG.info(f'bots={bots}, answers={bot_responses}')
+            prompt = self.prompt_id_to_shout.pop(context['prompt_id'], '')
+            answer_data = self._get_llm_api_choice(prompt=prompt,
+                                                   responses=bot_responses)
+            LOG.info(f'Received answer_data={answer_data}')
+            sorted_answer_indexes = answer_data.get('sorted_answer_indexes')
+            if sorted_answer_indexes:
+                return bots[sorted_answer_indexes[0]]
+        return "abstain"
+
+    def _get_llm_api_response(self, shout: str) -> dict:
+        """
+        Requests LLM API for response on provided shout
+        :param shout: provided shout string
+        :returns: response string from LLM API
+        """
+        queue = self.mq_queue_config.ask_response_queue
+        LOG.info(f"Sending to {self.mq_queue_config.vhost}/{queue}")
+        try:
+            return send_mq_request(vhost=self.mq_queue_config.vhost,
+                                   request_data={"query": shout,
+                                                 "history": [],
+                                                 "persona": self.persona},
+                                   target_queue=queue,
+                                   response_queue=f"{queue}.response")
+        except Exception as e:
+            LOG.exception(f"Failed to get response on "
+                          f"{self.mq_queue_config.vhost}/"
+                          f"{self.mq_queue_config.ask_response_queue}: "
+                          f"{e}")
+            return dict()
+
+    def _get_llm_api_opinion(self, prompt: str, options: dict) -> dict:
+        """
+        Requests LLM API for opinion on provided submind responses
+        :param prompt: incoming prompt text
+        :param options: proposed responses (botname: response)
+        :returns: response data from LLM API
+        """
+        queue = self.mq_queue_config.ask_discusser_queue
+        return send_mq_request(vhost=self.mq_queue_config.vhost,
+                               request_data={"query": prompt,
+                                             "options": options,
+                                             "persona": self.persona},
+                               target_queue=queue,
+                               response_queue=f"{queue}.response")
+
+    def _get_llm_api_choice(self, prompt: str,
+                             responses: List[str]) -> dict:
+        """
+        Requests LLM API for choice among provided message list
+        :param prompt: incoming prompt text
+        :param responses: list of answers to select from
+        :returns: response data from LLM API
+        """
+        queue = self.mq_queue_config.ask_appraiser_queue
+        return send_mq_request(vhost=self.mq_queue_config.vhost,
+                               request_data={"query": prompt,
+                                             "responses": responses,
+                                             "persona": self.persona},
+                               target_queue=queue,
+                               response_queue=f"{queue}.response")
+
+    @staticmethod
+    def get_llm_mq_config(llm_name: str) -> LLMMQConfig:
+        """
+        Get MQ queue names that the LLM service has access to. These are
+        LLM-oriented, not bot/persona-oriented.
+        """
+        return LLMMQConfig(ask_response_queue=f"{llm_name}_input",
+                           ask_appraiser_queue=f"{llm_name}_score_input",
+                           ask_discusser_queue=f"{llm_name}_discussion_input")
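Note: the `LLMBot` added above is constructed by `NeonLLMMQConnector` once per enabled persona (see the `rmq.py` changes below). A minimal, illustrative sketch of that wiring is shown here; the LLM name, persona text, MQ host, and credentials are placeholder assumptions, not values defined by this change:

```python
# Illustrative only: mirrors the LLMBot construction done in rmq.py for each
# enabled persona. All literal values below are placeholder assumptions.
from neon_llm_core.chatbot import LLMBot

persona = {"name": "Assistant",
           "description": "You are a personal assistant who responds "
                          "in 40 words or less"}
config = {"MQ": {"server": "mq.example.net",  # assumed MQ host
                 "port": 5672,
                 "users": {persona["name"]: {"user": "<mq_user>",
                                             "password": "<mq_password>"}}}}

bot = LLMBot(llm_name="chat_gpt", service_name=persona["name"],
             persona=persona, config=config, vhost="/chatbots")
bot.run()  # joins the /chatbots vhost and starts handling shouts
```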
diff --git a/neon_llm_core/config.py b/neon_llm_core/config.py
index 2787463..a4c2e51 100644
--- a/neon_llm_core/config.py
+++ b/neon_llm_core/config.py
@@ -26,6 +26,7 @@
 
 import json
+from dataclasses import dataclass
 from os.path import join, dirname, isfile
 
 from ovos_utils.log import LOG
 from ovos_config.config import Configuration
@@ -48,3 +49,11 @@ def load_config() -> dict:
         with open(default_config_path) as f:
             config = json.load(f)
     return config
+
+
+@dataclass
+class LLMMQConfig:
+    ask_response_queue: str
+    ask_appraiser_queue: str
+    ask_discusser_queue: str
+    vhost: str = '/llm'
diff --git a/neon_llm_core/llm.py b/neon_llm_core/llm.py
index 390569c..a4c9244 100644
--- a/neon_llm_core/llm.py
+++ b/neon_llm_core/llm.py
@@ -31,10 +31,21 @@ class NeonLLM(ABC):
 
     mq_to_llm_role = {}
 
-    def __init__(self, config):
+    def __init__(self, config: dict):
+        """
+        @param config: Dict LLM configuration for this specific LLM
+        """
+        self._llm_config = config
         self._tokenizer = None
         self._model = None
 
+    @property
+    def llm_config(self):
+        """
+        Get the configuration for this LLM instance
+        """
+        return self._llm_config
+
     @property
     @abstractmethod
     def tokenizer(self):
@@ -80,9 +91,11 @@ def get_sorted_answer_indexes(self, question: str, answers: List[str], persona:
 
     @abstractmethod
     def _call_model(self, prompt: str) -> str:
         """
-        Wrapper for Model generation logic
-        :param prompt: Input text sequence
-        :returns: Output text sequence generated by model
+        Wrapper for Model generation logic. This method may be called
+        asynchronously, so it is up to the extending class to use locks or
+        queue inputs as necessary.
+        :param prompt: Input text sequence
+        :returns: Output text sequence generated by model
         """
         pass
@@ -106,5 +119,6 @@ def convert_role(cls, role: str) -> str:
         """ Maps MQ role to LLM's internal domain """
         matching_llm_role = cls.mq_to_llm_role.get(role)
         if not matching_llm_role:
-            raise ValueError(f"role={role} is undefined, supported are: {list(cls.mq_to_llm_role)}")
-        return matching_llm_role
\ No newline at end of file
+            raise ValueError(f"role={role} is undefined, supported are: "
+                             f"{list(cls.mq_to_llm_role)}")
+        return matching_llm_role
diff --git a/neon_llm_core/rmq.py b/neon_llm_core/rmq.py
index 6c76ffb..c042542 100644
--- a/neon_llm_core/rmq.py
+++ b/neon_llm_core/rmq.py
@@ -23,7 +23,9 @@
 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
 from abc import abstractmethod, ABC
+from threading import Thread
 
 from neon_mq_connector.connector import MQConnector
 from neon_mq_connector.utils.rabbit_utils import create_mq_callback
@@ -37,22 +39,39 @@ class NeonLLMMQConnector(MQConnector, ABC):
     """
     Module for processing MQ requests to Fast Chat LLM
     """
-
-    opinion_prompt = ""
-
 
     def __init__(self):
         self.service_name = f'neon_llm_{self.name}'
         self.ovos_config = load_config()
-        mq_config = self.ovos_config.get("MQ", None)
+        mq_config = self.ovos_config.get("MQ", dict())
         super().__init__(config=mq_config, service_name=self.service_name)
         self.vhost = "/llm"
        self.register_consumers()
         self._model = None
+        self._bots = list()
+
+        if self.ovos_config.get("llm_bots", {}).get(self.name):
+            from neon_llm_core.chatbot import LLMBot
+            LOG.info(f"Chatbot(s) configured for: {self.name}")
+            for persona in self.ovos_config['llm_bots'][self.name]:
+                # Spawn a service for each persona to support @user requests
+                if not persona.get('enabled', True):
+                    LOG.warning(f"Persona disabled: {persona['name']}")
+                    continue
+                # Get a configured username to use for LLM submind connections
+                if mq_config.get("users", {}).get("neon_llm_submind"):
+                    self.ovos_config["MQ"]["users"][persona['name']] = \
+                        mq_config['users']['neon_llm_submind']
+                bot = LLMBot(llm_name=self.name, service_name=persona['name'],
+                             persona=persona, config=self.ovos_config,
+                             vhost="/chatbots")
+                bot.run()
+                LOG.info(f"Started chatbot: {bot.service_name}")
+                self._bots.append(bot)
 
     def register_consumers(self):
-        for idx in range(self.model_config["num_parallel_processes"]):
+        for idx in range(self.model_config.get("num_parallel_processes", 1)):
             self.register_consumer(name=f"neon_llm_{self.name}_ask_{idx}",
                                    vhost=self.vhost,
                                    queue=self.queue_ask,
@@ -76,7 +95,10 @@ def name(self):
 
     @property
     def model_config(self):
-        return self.ovos_config.get(f"LLM_{self.name.upper()}", None)
+        if f"LLM_{self.name.upper()}" not in self.ovos_config:
+            LOG.warning(f"No config for {self.name} found in "
+                        f"{list(self.ovos_config.keys())}")
+        return self.ovos_config.get(f"LLM_{self.name.upper()}", dict())
 
     @property
     def queue_ask(self):
@@ -98,25 +120,34 @@ def model(self) -> NeonLLM:
 
     @create_mq_callback()
     def handle_request(self, body: dict):
         """
-            Handles ask requests from MQ to LLM
-            :param body: request body (dict)
+        Handles ask requests from MQ to LLM
+        :param body: request body (dict)
         """
-        message_id = body["message_id"]
-        routing_key = body["routing_key"]
+        # Handle this asynchronously so multiple subminds can be handled
+        # concurrently
+        Thread(target=self._handle_request_async, args=(body,),
+               daemon=True).start()
 
-        query = body["query"]
-        history = body["history"]
-        persona = body.get("persona",{})
+    def _handle_request_async(self, request: dict):
+        message_id = request["message_id"]
+        routing_key = request["routing_key"]
+
+        query = request["query"]
+        history = request["history"]
+        persona = request.get("persona", {})
         try:
-            response = self.model.ask(message=query, chat_history=history, persona=persona)
+            response = self.model.ask(message=query, chat_history=history,
+                                      persona=persona)
         except ValueError as err:
             LOG.error(f'ValueError={err}')
-            response = 'Sorry, but I cannot respond to your message at the moment, please try again later'
+            response = ('Sorry, but I cannot respond to your message at the '
+                        'moment, please try again later')
         api_response = {
             "message_id": message_id,
             "response": response
         }
+        LOG.info(f"Sending response: {response}")
         self.send_message(request_data=api_response,
                           queue=routing_key)
         LOG.info(f"Handled ask request for message_id={message_id}")
@@ -132,13 +163,14 @@ def handle_score_request(self, body: dict):
 
         query = body["query"]
         responses = body["responses"]
-        persona = body.get("persona",{})
+        persona = body.get("persona", {})
 
         if not responses:
             sorted_answer_indexes = []
         else:
             try:
-                sorted_answer_indexes = self.model.get_sorted_answer_indexes(question=query, answers=responses, persona=persona)
+                sorted_answer_indexes = self.model.get_sorted_answer_indexes(
+                    question=query, answers=responses, persona=persona)
             except ValueError as err:
                 LOG.error(f'ValueError={err}')
                 sorted_answer_indexes = []
@@ -161,22 +193,24 @@ def handle_opinion_request(self, body: dict):
 
         query = body["query"]
         options = body["options"]
-        persona = body.get("persona",{})
+        persona = body.get("persona", {})
         responses = list(options.values())
 
         if not responses:
             opinion = "Sorry, but I got no options to choose from."
         else:
             try:
-                sorted_answer_indexes = self.model.get_sorted_answer_indexes(question=query, answers=responses, persona=persona)
-                best_respondent_nick, best_response = list(options.items())[sorted_answer_indexes[0]]
-                opinion = self._ask_model_for_opinion(respondent_nick=best_respondent_nick,
-                                                      question=query,
-                                                      answer=best_response,
-                                                      persona=persona)
+                sorted_answer_indexes = self.model.get_sorted_answer_indexes(
+                    question=query, answers=responses, persona=persona)
+                best_respondent_nick, best_response = list(options.items())[
+                    sorted_answer_indexes[0]]
+                opinion = self._ask_model_for_opinion(
+                    respondent_nick=best_respondent_nick,
+                    question=query, answer=best_response, persona=persona)
             except ValueError as err:
                 LOG.error(f'ValueError={err}')
-                opinion = "Sorry, but I experienced an issue trying to make up an opinion on this topic"
+                opinion = ("Sorry, but I experienced an issue trying to form "
+                           "an opinion on this topic")
 
         api_response = {
             "message_id": message_id,
@@ -187,15 +221,24 @@
                           queue=routing_key)
         LOG.info(f"Handled ask request for message_id={message_id}")
 
-    def _ask_model_for_opinion(self, respondent_nick: str, question: str, answer: str, persona: dict) -> str:
+    def _ask_model_for_opinion(self, respondent_nick: str, question: str,
+                               answer: str, persona: dict) -> str:
         prompt = self.compose_opinion_prompt(respondent_nick=respondent_nick,
                                              question=question,
                                              answer=answer)
-        opinion = self.model.ask(message=prompt, chat_history=[], persona=persona)
+        opinion = self.model.ask(message=prompt, chat_history=[],
+                                 persona=persona)
         LOG.info(f'Received LLM opinion={opinion}, prompt={prompt}')
         return opinion
 
     @staticmethod
     @abstractmethod
-    def compose_opinion_prompt(respondent_nick: str, question: str, answer: str) -> str:
+    def compose_opinion_prompt(respondent_nick: str, question: str,
+                               answer: str) -> str:
+        """
+        Format a response into a prompt to evaluate another submind's response
+        @param respondent_nick: Name of submind providing a response
+        @param question: Prompt being responded to
+        @param answer: respondent's response to the question
+        """
         pass
diff --git a/requirements/chatbots.txt b/requirements/chatbots.txt
new file mode 100644
index 0000000..72f84a3
--- /dev/null
+++ b/requirements/chatbots.txt
@@ -0,0 +1 @@
+neon-chatbot-core
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 47b3c05..5a36578 100644
--- a/setup.py
+++ b/setup.py
@@ -78,6 +78,7 @@ def get_requirements(requirements_filename: str):
     license='BSD-3.0',
     packages=setuptools.find_packages(),
     install_requires=get_requirements("requirements.txt"),
+    extras_require={"chatbots": get_requirements("chatbots.txt")},
     zip_safe=True,
     classifiers=[
         'Intended Audience :: Developers',
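For reference, a downstream service built on this core would subclass `NeonLLMMQConnector` roughly as sketched below. The `my_llm` name and `MyLLM` model class are hypothetical placeholders; `name` determines the `LLM_MY_LLM` config section, the `llm_bots` key for personas, and the `my_llm_input`/`my_llm_score_input`/`my_llm_discussion_input` queues used by `LLMBot.get_llm_mq_config`:

```python
# Sketch of a hypothetical downstream connector; MyLLM stands in for a
# concrete NeonLLM implementation and is not defined in this repository.
from neon_llm_core.rmq import NeonLLMMQConnector


class MyLLMMQConnector(NeonLLMMQConnector):

    @property
    def name(self):
        # Also the key read from `llm_bots` and the `LLM_MY_LLM` config section
        return "my_llm"

    @property
    def model(self):
        # Lazily build the service's NeonLLM implementation (assumed class)
        if self._model is None:
            self._model = MyLLM(self.model_config)
        return self._model

    @staticmethod
    def compose_opinion_prompt(respondent_nick: str, question: str,
                               answer: str) -> str:
        # One possible wording; each service defines its own prompt here
        return (f'Why is the answer "{answer}" from {respondent_nick} '
                f'the best response to "{question}"?')
```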