diff --git a/ovos_hivemind_solver/__init__.py b/ovos_hivemind_solver/__init__.py index 13f7706..1627f3c 100644 --- a/ovos_hivemind_solver/__init__.py +++ b/ovos_hivemind_solver/__init__.py @@ -1,8 +1,8 @@ -from hivemind_bus_client import HiveMessageBusClient, HiveMessage, \ - HiveMessageType +from hivemind_bus_client import HiveMessageBusClient from ovos_bus_client.message import Message from ovos_plugin_manager.templates.solvers import QuestionSolver from ovos_utils.log import LOG +from threading import Event class HiveMindSolver(QuestionSolver): @@ -13,6 +13,8 @@ def __init__(self, config=None): config = config or {} super().__init__(config) self.hm = None + self._response = Event() + self._responses = [] if self.config.get("autoconnect"): self.connect() @@ -25,15 +27,46 @@ def connect(self): """ self.hm = HiveMessageBusClient(useragent="ovos-hivemind-solver") self.hm.run_in_thread() + self.hm.on_mycroft("speak", self._receive_answer) + # NOTE: this message not yet introduced in ovos-core + self.hm.on_mycroft("ovos.utterance.handled", self._end_of_response) + + ############## + # HACK - waiting for https://github.com/OpenVoiceOS/ovos-core/pull/478 + # TODO - new bus message in ovos-core + # to unambiguosly identify end of utterance parsing + # currently these are 4 possible outcomes for any request + # does not account for OCP pipeline requests + self.hm.on_mycroft("ovos.utterance.cancelled", + self._end_of_response) + self.hm.on_mycroft("complete_intent_failure", + self._end_of_response) + self.hm.on_mycroft("mycroft.skill.handler.complete", + self._end_of_response) + self.hm.on_mycroft( "skill.converse.response", + self._end_of_response) + + def _end_of_response(self, message): + if message.type == "skill.converse.response": + if not message.data.get("result"): + return + self._response.set() + + def _receive_answer(self, message): + utt = message.data["utterance"] + self._responses.append(utt) # abstract Solver methods def get_data(self, query, context=None): return {"answer": self.get_spoken_answer(query, context)} - def get_spoken_answer(self, query, context=None): + def get_spoken_answer(self, query, context=None, timeout=10): if self.hm is None: LOG.error("not connected to HiveMind") return + self._response.clear() + self._responses = [] + context = context or {} if "session" in context: lang = context["session"]["lang"] @@ -41,14 +74,11 @@ def get_spoken_answer(self, query, context=None): lang = context.get("lang") or self.config.get("lang", "en-us") mycroft_msg = Message("recognizer_loop:utterance", {"utterances": [query], "lang": lang}) - msg = HiveMessage(HiveMessageType.BUS, mycroft_msg) - response = self.hm.wait_for_payload_response( - message=msg, - reply_type=HiveMessageType.BUS, - payload_type="speak", - timeout=20) - if response: - return response.payload.data["utterance"] + self.hm.emit_mycroft(mycroft_msg) + self._response.wait(timeout=timeout) + if self._responses: + # merge multiple speak messages into one + return "\n".join(self._responses) return None # let next solver attempt