diff --git a/command_line_assistant/commands/history.py b/command_line_assistant/commands/history.py index fef2fd7..60622fe 100644 --- a/command_line_assistant/commands/history.py +++ b/command_line_assistant/commands/history.py @@ -1,8 +1,9 @@ import logging from argparse import Namespace -from pathlib import Path -from command_line_assistant.history import handle_history_write +from dasbus.error import DBusError + +from command_line_assistant.dbus.constants import HISTORY_IDENTIFIER from command_line_assistant.utils.cli import BaseCLICommand, SubParsersAction logger = logging.getLogger(__name__) @@ -14,10 +15,15 @@ def __init__(self, clear: bool) -> None: super().__init__() def run(self) -> None: + proxy = HISTORY_IDENTIFIER.get_proxy() + if self._clear: - logger.info("Clearing history of conversation") - # TODO(r0x0d): Rewrite this. - handle_history_write(Path("/tmp/test_history.json"), [], "") + try: + logger.info("Cleaning the history.") + proxy.ClearHistory() + except DBusError as e: + logger.info("Failed to clean the history: %s", e) + raise e def register_subcommand(parser: SubParsersAction): diff --git a/command_line_assistant/commands/query.py b/command_line_assistant/commands/query.py index d03bbf5..0a708cd 100644 --- a/command_line_assistant/commands/query.py +++ b/command_line_assistant/commands/query.py @@ -2,8 +2,8 @@ from dasbus.error import DBusError -from command_line_assistant.dbus.constants import SERVICE_IDENTIFIER -from command_line_assistant.dbus.definitions import MessageInput, MessageOutput +from command_line_assistant.dbus.constants import QUERY_IDENTIFIER +from command_line_assistant.dbus.structures import Message from command_line_assistant.rendering.decorators.colors import ColorDecorator from command_line_assistant.rendering.decorators.text import ( EmojiDecorator, @@ -68,16 +68,15 @@ def __init__(self, query_string: str) -> None: super().__init__() def run(self) -> None: - proxy = SERVICE_IDENTIFIER.get_proxy() - - input_query = MessageInput() + proxy = QUERY_IDENTIFIER.get_proxy() + input_query = Message() input_query.message = self._query output = "Nothing to see here..." try: with self._spinner_renderer: - proxy.ProcessQuery(MessageInput.to_structure(input_query)) - output = MessageOutput.from_structure(proxy.RetrieveAnswer).message + proxy.ProcessQuery(input_query.to_structure(input_query)) + output = Message.from_structure(proxy.RetrieveAnswer).message self._legal_renderer.render(LEGAL_NOTICE) self._text_renderer.render(output) diff --git a/command_line_assistant/daemon/http/query.py b/command_line_assistant/daemon/http/query.py index 13b2899..3776466 100644 --- a/command_line_assistant/daemon/http/query.py +++ b/command_line_assistant/daemon/http/query.py @@ -19,7 +19,7 @@ def submit(query: str, config: Config) -> str: query = handle_caret(query, config) # NOTE: Add more query handling here - query_endpoint = f"{config.backend.endpoint}/infer" + query_endpoint = f"{config.backend.endpoint}/v1/query" payload = {"query": query} try: @@ -33,7 +33,7 @@ def submit(query: str, config: Config) -> str: response.raise_for_status() data = response.json() - return data.get("answer", "") + return data.get("response", "") except RequestException as e: logger.error("Failed to get response from AI: %s", e) raise diff --git a/command_line_assistant/dbus/constants.py b/command_line_assistant/dbus/constants.py index 62b2bf0..c28fdc9 100644 --- a/command_line_assistant/dbus/constants.py +++ b/command_line_assistant/dbus/constants.py @@ -8,7 +8,14 @@ SERVICE_NAMESPACE = ("com", "redhat", "lightspeed") +QUERY_NAMESAPCE = (*SERVICE_NAMESPACE, "query") +HISTORY_NAMESPACE = (*SERVICE_NAMESPACE, "history") + # Define the service identifier for a query -SERVICE_IDENTIFIER = DBusServiceIdentifier( - namespace=SERVICE_NAMESPACE, message_bus=SESSION_BUS +QUERY_IDENTIFIER = DBusServiceIdentifier( + namespace=QUERY_NAMESAPCE, message_bus=SESSION_BUS +) +# Define the service identifier for a history +HISTORY_IDENTIFIER = DBusServiceIdentifier( + namespace=HISTORY_NAMESPACE, message_bus=SESSION_BUS ) diff --git a/command_line_assistant/dbus/context.py b/command_line_assistant/dbus/context.py new file mode 100644 index 0000000..1dbfc9b --- /dev/null +++ b/command_line_assistant/dbus/context.py @@ -0,0 +1,46 @@ +from typing import Optional + +from dasbus.signal import Signal + +from command_line_assistant.config import Config +from command_line_assistant.dbus.structures import Message + + +class BaseContext: + def __init__(self, config: Config) -> None: + self._config = config + + @property + def config(self) -> Config: + """Return the configuration from this context.""" + return self._config + + +class QueryContext(BaseContext): + """This is the process context that will handle anything query related""" + + def __init__(self, config: Config) -> None: + self._input_query: Optional[Message] = None + self._query_changed = Signal() + super().__init__(config) + + @property + def query(self) -> Optional[Message]: + """Make it accessible publicly""" + return self._input_query + + @property + def query_changed(self) -> Signal: + return self._query_changed + + def process_query(self, input_query: Message) -> None: + """Emit the signal that the query has changed""" + self._input_query = input_query + self._query_changed.emit() + + +class HistoryContext(BaseContext): + """This is the process context that will handle anything query related""" + + def __init__(self, config: Config) -> None: + super().__init__(config) diff --git a/command_line_assistant/dbus/definitions.py b/command_line_assistant/dbus/definitions.py deleted file mode 100644 index b13209e..0000000 --- a/command_line_assistant/dbus/definitions.py +++ /dev/null @@ -1,97 +0,0 @@ -from typing import Optional - -from dasbus.server.interface import dbus_interface -from dasbus.server.property import emits_properties_changed -from dasbus.server.template import InterfaceTemplate -from dasbus.signal import Signal -from dasbus.structure import DBusData -from dasbus.typing import Str, Structure - -from command_line_assistant.config import Config -from command_line_assistant.daemon.http.query import submit -from command_line_assistant.dbus.constants import SERVICE_IDENTIFIER - - -class MessageInput(DBusData): - """The message input from received from the client""" - - def __init__(self) -> None: - self._message: Str = "" - super().__init__() - - @property - def message(self) -> Str: - return self._message - - @message.setter - def message(self, value: Str) -> None: - self._message = value - - -class MessageOutput(DBusData): - """The message output that will be sent to the client""" - - def __init__(self) -> None: - self._message: Str = "" - super().__init__() - - @property - def message(self) -> Str: - return self._message - - @message.setter - def message(self, value: Str) -> None: - self._message = value - - -@dbus_interface(SERVICE_IDENTIFIER.interface_name) -class QueryInterface(InterfaceTemplate): - """The DBus interface of a query.""" - - def connect_signals(self) -> None: - """Connect the signals.""" - # Watch for property changes based on the query_changed method. - self.watch_property("RetrieveAnswer", self.implementation.query_changed) - - @property - def RetrieveAnswer(self) -> Structure: - """This method is mainly called by the client to retrieve it's answer.""" - output = MessageOutput() - llm_response = submit( - self.implementation.query.message, self.implementation.config - ) - output.message = llm_response - return output.to_structure(output) - - @emits_properties_changed - def ProcessQuery(self, query: Structure) -> None: - """Process the given query.""" - self.implementation.process_query(MessageInput.from_structure(query)) - - -class ProcessContext: - """This is the process context that will handle anything query related""" - - def __init__(self, config: Config) -> None: - self._config = config - self._input_query: Optional[MessageInput] = None - self._query_changed = Signal() - - @property - def config(self) -> Config: - """Return the configuration from this context.""" - return self._config - - @property - def query(self) -> Optional[MessageInput]: - """Make it accessible publicly""" - return self._input_query - - @property - def query_changed(self) -> Signal: - return self._query_changed - - def process_query(self, input_query: MessageInput) -> None: - """Emit the signal that the query has changed""" - self._input_query = input_query - self._query_changed.emit() diff --git a/command_line_assistant/dbus/interfaces.py b/command_line_assistant/dbus/interfaces.py new file mode 100644 index 0000000..e11b2a3 --- /dev/null +++ b/command_line_assistant/dbus/interfaces.py @@ -0,0 +1,50 @@ +from dasbus.server.interface import dbus_interface +from dasbus.server.property import emits_properties_changed +from dasbus.server.template import InterfaceTemplate +from dasbus.typing import Structure + +from command_line_assistant.daemon.http.query import submit +from command_line_assistant.dbus.constants import HISTORY_IDENTIFIER, QUERY_IDENTIFIER +from command_line_assistant.dbus.structures import ( + HistoryEntry, + Message, +) +from command_line_assistant.history import handle_history_read, handle_history_write + + +@dbus_interface(QUERY_IDENTIFIER.interface_name) +class QueryInterface(InterfaceTemplate): + """The DBus interface of a query.""" + + def connect_signals(self) -> None: + """Connect the signals.""" + # Watch for property changes based on the query_changed method. + self.watch_property("RetrieveAnswer", self.implementation.query_changed) + + @property + def RetrieveAnswer(self) -> Structure: + """This method is mainly called by the client to retrieve it's answer.""" + output = Message() + llm_response = submit( + self.implementation.query.message, self.implementation.config + ) + print("llm_response", llm_response) + output.message = llm_response + return Message.to_structure(output) + + @emits_properties_changed + def ProcessQuery(self, query: Structure) -> None: + """Process the given query.""" + self.implementation.process_query(Message.from_structure(query)) + + +@dbus_interface(HISTORY_IDENTIFIER.interface_name) +class HistoryInterface(InterfaceTemplate): + @property + def GetHistory(self) -> Structure: + history = HistoryEntry() + history.entries = handle_history_read(self.implementation.config) + return history.to_structure(history) + + def ClearHistory(self) -> None: + handle_history_write(self.implementation.config.history.file, [], "") diff --git a/command_line_assistant/dbus/server.py b/command_line_assistant/dbus/server.py index 3afd1eb..7a6ead8 100644 --- a/command_line_assistant/dbus/server.py +++ b/command_line_assistant/dbus/server.py @@ -4,8 +4,13 @@ from dasbus.loop import EventLoop from command_line_assistant.config import Config -from command_line_assistant.dbus.constants import SERVICE_IDENTIFIER, SESSION_BUS -from command_line_assistant.dbus.definitions import ProcessContext, QueryInterface +from command_line_assistant.dbus.constants import ( + HISTORY_IDENTIFIER, + QUERY_IDENTIFIER, + SESSION_BUS, +) +from command_line_assistant.dbus.context import HistoryContext, QueryContext +from command_line_assistant.dbus.interfaces import HistoryInterface, QueryInterface logger = logging.getLogger(__name__) @@ -14,16 +19,21 @@ def serve(config: Config): """Start the daemon.""" logger.info("Starting clad!") try: - process_context = ProcessContext(config=config) SESSION_BUS.publish_object( - SERVICE_IDENTIFIER.object_path, QueryInterface(process_context) + QUERY_IDENTIFIER.object_path, QueryInterface(QueryContext(config)) + ) + SESSION_BUS.publish_object( + HISTORY_IDENTIFIER.object_path, HistoryInterface(HistoryContext(config)) ) # The flag DBUS_NAME_FLAG_REPLACE_EXISTING is needed during development # so ew can replace the existing bus. # TODO(r0x0d): See what to do with it later. SESSION_BUS.register_service( - SERVICE_IDENTIFIER.service_name, flags=DBUS_NAME_FLAG_REPLACE_EXISTING + QUERY_IDENTIFIER.service_name, flags=DBUS_NAME_FLAG_REPLACE_EXISTING + ) + SESSION_BUS.register_service( + HISTORY_IDENTIFIER.service_name, flags=DBUS_NAME_FLAG_REPLACE_EXISTING ) loop = EventLoop() loop.run() diff --git a/command_line_assistant/dbus/structures.py b/command_line_assistant/dbus/structures.py new file mode 100644 index 0000000..ec16f27 --- /dev/null +++ b/command_line_assistant/dbus/structures.py @@ -0,0 +1,32 @@ +from dasbus.structure import DBusData +from dasbus.typing import Str + + +class Message(DBusData): + """Base class for message input and output""" + + def __init__(self) -> None: + self._message: Str = "" + super().__init__() + + @property + def message(self) -> Str: + return self._message + + @message.setter + def message(self, value: Str) -> None: + self._message = value + + +class HistoryEntry(DBusData): + def __init__(self) -> None: + self._entries: list[str] = [] + super().__init__() + + @property + def entries(self) -> list[str]: + return self._entries + + @entries.setter + def entries(self, value: list[str]) -> None: + self._entries = value diff --git a/data/development/config/command_line_assistant/config.toml b/data/development/config/command_line_assistant/config.toml index d26f06a..cac8767 100644 --- a/data/development/config/command_line_assistant/config.toml +++ b/data/development/config/command_line_assistant/config.toml @@ -13,7 +13,7 @@ file = "~/.local/share/command-line-assistant/command-line-assistant_history.jso max_size = 100 [backend] -endpoint = "https://rlsrag-rhel-lightspeed--runtime-int.apps.int.spoke.preprod.us-east-1.aws.paas.redhat.com" +endpoint = "https://rcs-rhel-lightspeed--runtime-int.apps.int.spoke.preprod.us-east-1.aws.paas.redhat.com" [backend.auth] cert_file = "data/development/certificate/fake-certificate.pem" diff --git a/tests/commands/test_history.py b/tests/commands/test_history.py index 795a0f5..9489a78 100644 --- a/tests/commands/test_history.py +++ b/tests/commands/test_history.py @@ -1,63 +1,81 @@ -from argparse import ArgumentParser, Namespace -from pathlib import Path -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest -from command_line_assistant.commands import history from command_line_assistant.commands.history import ( HistoryCommand, _command_factory, + register_subcommand, ) +from command_line_assistant.dbus.constants import HISTORY_IDENTIFIER @pytest.fixture -def history_command(): - """Fixture to create a HistoryCommand instance""" - return HistoryCommand(clear=True) +def mock_get_proxy(mock_proxy): + """Mock the get_proxy method.""" + with patch.object(HISTORY_IDENTIFIER, "get_proxy", return_value=mock_proxy): + yield mock_proxy -@pytest.fixture -def history_command_no_clear(): - """Fixture to create a HistoryCommand instance""" - return HistoryCommand(clear=False) +def test_history_command_clear_success(mock_get_proxy): + """Test successful history clear operation.""" + cmd = HistoryCommand(clear=True) + cmd.run() + + mock_get_proxy.ClearHistory.assert_called_once() + + +def test_history_command_clear_failure(mock_get_proxy, caplog): + """Test failed history clear operation.""" + from dasbus.error import DBusError + + # Configure mock to raise DBusError + mock_get_proxy.ClearHistory.side_effect = DBusError("Failed to clear history") + cmd = HistoryCommand(clear=True) -class TestHistoryCommand: - def test_init(self, history_command): - """Test HistoryCommand initialization""" - assert history_command._clear is True + with pytest.raises(DBusError): + cmd.run() - @patch("command_line_assistant.commands.history.handle_history_write") - def test_run_with_clear(self, mock_history_write, history_command): - """Test run() method when clear=True""" - history_command.run() - mock_history_write.assert_called_once_with( - Path("/tmp/test_history.json"), [], "" - ) + assert "Failed to clean the history" in caplog.text - @patch("command_line_assistant.commands.history.handle_history_write") - def test_run_without_clear(self, mock_history_write, history_command_no_clear): - """Test run() method when clear=False""" - history_command_no_clear.run() - mock_history_write.assert_not_called() + +def test_history_command_no_clear(mock_get_proxy): + """Test history command without clear flag.""" + cmd = HistoryCommand(clear=False) + cmd.run() + + mock_get_proxy.ClearHistory.assert_not_called() def test_register_subcommand(): - """Test register_subcommand function""" - parser = ArgumentParser() - sub_parser = parser.add_subparsers() + """Test registration of history subcommand.""" + parser = MagicMock() + subparser = MagicMock() + parser.add_parser.return_value = subparser + + register_subcommand(parser) + + # Verify parser configuration + parser.add_parser.assert_called_once_with( + "history", help="Manage conversation history" + ) - history.register_subcommand(sub_parser) + # Verify arguments added to subparser + subparser.add_argument.assert_called_once_with( + "--clear", action="store_true", help="Clear the history." + ) - parser.parse_args(["history", "--clear"]) + # Verify defaults set + assert hasattr(subparser, "set_defaults") def test_command_factory(): - """Test _command_factory function""" + """Test the command factory creates correct command instance.""" + from argparse import Namespace args = Namespace(clear=True) - command = _command_factory(args) + cmd = _command_factory(args) - assert isinstance(command, HistoryCommand) - assert command._clear is True + assert isinstance(cmd, HistoryCommand) + assert cmd._clear is True diff --git a/tests/commands/test_query.py b/tests/commands/test_query.py index 6eb3024..c280175 100644 --- a/tests/commands/test_query.py +++ b/tests/commands/test_query.py @@ -1,27 +1,26 @@ from argparse import ArgumentParser, Namespace -from unittest.mock import MagicMock, patch +from unittest.mock import patch import pytest from command_line_assistant.commands.query import QueryCommand, register_subcommand -from command_line_assistant.dbus.definitions import MessageInput, MessageOutput +from command_line_assistant.dbus.structures import Message # Mock the entire DBus service/constants module @pytest.fixture(autouse=True) -def mock_dbus_service(): +def mock_dbus_service(mock_proxy): """Fixture to mock DBus service and automatically use it for all tests""" with patch( - "command_line_assistant.commands.query.SERVICE_IDENTIFIER" + "command_line_assistant.commands.query.QUERY_IDENTIFIER" ) as mock_service: # Create a mock proxy that will be returned by get_proxy() - mock_proxy = MagicMock() mock_service.get_proxy.return_value = mock_proxy # Setup default mock response - mock_output = MessageOutput() + mock_output = Message() mock_output.message = "default mock response" - mock_proxy.RetrieveAnswer = MessageOutput.to_structure(mock_output) + mock_proxy.RetrieveAnswer = Message.to_structure(mock_output) yield mock_proxy @@ -46,19 +45,19 @@ def test_query_command_initialization(): def test_query_command_run(mock_dbus_service, test_input, expected_output, capsys): """Test QueryCommand run method with different inputs""" # Setup mock response for this specific test - mock_output = MessageOutput() + mock_output = Message() mock_output.message = expected_output - mock_dbus_service.RetrieveAnswer = MessageOutput.to_structure(mock_output) + mock_dbus_service.RetrieveAnswer = Message.to_structure(mock_output) # Create and run command command = QueryCommand(test_input) command.run() # Verify ProcessQuery was called with correct input - expected_input = MessageInput() + expected_input = Message() expected_input.message = test_input mock_dbus_service.ProcessQuery.assert_called_once_with( - MessageInput.to_structure(expected_input) + Message.to_structure(expected_input) ) # Verify output was printed @@ -69,9 +68,9 @@ def test_query_command_run(mock_dbus_service, test_input, expected_output, capsy def test_query_command_empty_response(mock_dbus_service, capsys): """Test QueryCommand handling empty response""" # Setup empty response - mock_output = MessageOutput() + mock_output = Message() mock_output.message = "" - mock_dbus_service.RetrieveAnswer = MessageOutput.to_structure(mock_output) + mock_dbus_service.RetrieveAnswer = Message.to_structure(mock_output) command = QueryCommand("test query") command.run() @@ -142,17 +141,17 @@ def test_query_with_special_characters(mock_dbus_service, capsys): special_query = "test!@#$%^&*()_+ query" expected_response = "response with special chars !@#$" - mock_output = MessageOutput() + mock_output = Message() mock_output.message = expected_response - mock_dbus_service.RetrieveAnswer = MessageOutput.to_structure(mock_output) + mock_dbus_service.RetrieveAnswer = Message.to_structure(mock_output) command = QueryCommand(special_query) command.run() - expected_input = MessageInput() + expected_input = Message() expected_input.message = special_query mock_dbus_service.ProcessQuery.assert_called_once_with( - MessageInput.to_structure(expected_input) + Message.to_structure(expected_input) ) captured = capsys.readouterr() diff --git a/tests/conftest.py b/tests/conftest.py index 9d536b7..2d99afb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,6 @@ import logging from pathlib import Path +from unittest.mock import MagicMock import pytest @@ -51,3 +52,16 @@ def mock_config(tmp_path): ), logging=LoggingSchema(level="debug"), ) + + +@pytest.fixture +def mock_dbus_session(): + """Create a mock DBus session.""" + return MagicMock() + + +@pytest.fixture +def mock_proxy(): + """Create a mock proxy for testing.""" + proxy = MagicMock() + return proxy diff --git a/tests/daemon/http/test_query.py b/tests/daemon/http/test_query.py index 11a7361..c7e5cfd 100644 --- a/tests/daemon/http/test_query.py +++ b/tests/daemon/http/test_query.py @@ -10,9 +10,9 @@ @responses.activate def test_handle_query(): responses.post( - url="http://localhost/infer", + url="http://localhost/v1/query", json={ - "answer": "test", + "response": "test", }, ) @@ -30,12 +30,12 @@ def test_handle_query(): @responses.activate def test_handle_query_raising_status(): responses.post( - url="http://localhost/infer", + url="http://localhost/v1/query", status=404, ) config = Config( backend=BackendSchema( - endpoint="http://localhost/infer", auth=AuthSchema(verify_ssl=False) + endpoint="http://localhost/v1/query", auth=AuthSchema(verify_ssl=False) ) ) with pytest.raises(requests.exceptions.RequestException): @@ -44,7 +44,7 @@ def test_handle_query_raising_status(): @responses.activate def test_disable_ssl_verification(caplog): - responses.post(url="https://localhost/infer", json={"answer": "yeah, test!"}) + responses.post(url="https://localhost/v1/query", json={"response": "yeah, test!"}) config = Config( backend=BackendSchema( diff --git a/tests/daemon/test_server.py b/tests/daemon/test_server.py deleted file mode 100644 index f040068..0000000 --- a/tests/daemon/test_server.py +++ /dev/null @@ -1,16 +0,0 @@ -from unittest import mock - -from command_line_assistant.config import Config -from command_line_assistant.dbus import server - - -def test_serve(monkeypatch): - event_loop_mock = mock.Mock() - session_bus_mock = mock.Mock() - monkeypatch.setattr(server, "EventLoop", event_loop_mock) - monkeypatch.setattr(server, "SESSION_BUS", session_bus_mock) - config = Config() - - server.serve(config) - - assert event_loop_mock.call_count == 1 diff --git a/tests/dbus/test_context.py b/tests/dbus/test_context.py new file mode 100644 index 0000000..c5ef8ca --- /dev/null +++ b/tests/dbus/test_context.py @@ -0,0 +1,60 @@ +import pytest + +from command_line_assistant.config import Config +from command_line_assistant.dbus.context import ( + BaseContext, + HistoryContext, + QueryContext, +) +from command_line_assistant.dbus.structures import Message + + +@pytest.fixture +def config(): + return Config() + + +@pytest.fixture +def base_context(config): + return BaseContext(config) + + +@pytest.fixture +def query_context(config): + return QueryContext(config) + + +@pytest.fixture +def history_context(config): + return HistoryContext(config) + + +def test_base_context_config_property(config): + context = BaseContext(config) + assert context.config == config + + +def test_query_context_initial_state(query_context): + assert query_context.query is None + assert query_context.query_changed is not None + + +def test_query_context_process_query(query_context): + message_obj = Message() + message_obj.message = "test query" + signal_emitted = False + + def on_signal(): + nonlocal signal_emitted + signal_emitted = True + + query_context.query_changed.connect(on_signal) + query_context.process_query(message_obj) + + assert query_context.query == message_obj + assert signal_emitted is True + + +def test_history_context_inheritance(history_context, config): + assert isinstance(history_context, BaseContext) + assert history_context.config == config diff --git a/tests/dbus/test_interfaces.py b/tests/dbus/test_interfaces.py new file mode 100644 index 0000000..86805cd --- /dev/null +++ b/tests/dbus/test_interfaces.py @@ -0,0 +1,124 @@ +from unittest.mock import Mock, patch + +import pytest + +from command_line_assistant.config import Config +from command_line_assistant.dbus.interfaces import ( + HistoryInterface, + Message, + QueryInterface, +) +from command_line_assistant.dbus.structures import HistoryEntry + + +@pytest.fixture +def mock_implementation(): + impl = Mock() + impl.config = Mock(spec=Config) + impl.config.history = Mock() + impl.query = Mock() + return impl + + +@pytest.fixture +def query_interface(mock_implementation): + interface = QueryInterface(mock_implementation) + interface.watch_property = Mock() + return interface + + +class TestQueryInterface: + def test_connect_signals(self, query_interface, mock_implementation): + query_interface.connect_signals() + query_interface.watch_property.assert_called_once_with( + "RetrieveAnswer", mock_implementation.query_changed + ) + + @patch("command_line_assistant.dbus.interfaces.submit") + def test_retrieve_answer_success( + self, mock_submit, query_interface, mock_implementation + ): + mock_submit.return_value = "test response" + mock_implementation.query.message = "test query" + + result = query_interface.RetrieveAnswer + + mock_submit.assert_called_once_with("test query", mock_implementation.config) + message = Message.from_structure(result) + assert message.message == "test response" + + @patch("command_line_assistant.dbus.interfaces.submit") + def test_retrieve_answer_empty_query( + self, mock_submit, query_interface, mock_implementation + ): + mock_submit.return_value = "" + mock_implementation.query.message = "" + + result = query_interface.RetrieveAnswer + + mock_submit.assert_called_once_with("", mock_implementation.config) + message = Message.from_structure(result) + assert message.message == "" + + def test_process_query(self, query_interface, mock_implementation): + test_query = Message() + test_query.message = "test message" + query_structure = Message.to_structure(test_query) + + query_interface.ProcessQuery(query_structure) + + mock_implementation.process_query.assert_called_once() + processed_message = mock_implementation.process_query.call_args[0][0] + assert isinstance(processed_message, Message) + assert processed_message.message == "test message" + + +class TestHistoryInterface: + @pytest.fixture + def history_interface(self, mock_implementation): + return HistoryInterface(mock_implementation) + + @patch("command_line_assistant.dbus.interfaces.handle_history_read") + def test_get_history_success( + self, mock_history_read, history_interface, mock_implementation + ): + test_entries = ["entry1", "entry2"] + mock_history_read.return_value = test_entries + + result = history_interface.GetHistory + + mock_history_read.assert_called_once_with(mock_implementation.config) + history = HistoryEntry.from_structure(result) + assert history.entries == test_entries + + @patch("command_line_assistant.dbus.interfaces.handle_history_read") + def test_get_history_empty( + self, mock_history_read, history_interface, mock_implementation + ): + mock_history_read.return_value = [] + + result = history_interface.GetHistory + + mock_history_read.assert_called_once_with(mock_implementation.config) + history = HistoryEntry.from_structure(result) + assert history.entries == [] + + @patch("command_line_assistant.dbus.interfaces.handle_history_write") + def test_clear_history( + self, mock_history_write, history_interface, mock_implementation + ): + history_interface.ClearHistory() + + mock_history_write.assert_called_once_with( + mock_implementation.config.history.file, [], "" + ) + + @patch("command_line_assistant.dbus.interfaces.handle_history_write") + def test_clear_history_with_nonexistent_file( + self, mock_history_write, history_interface, mock_implementation + ): + mock_implementation.config.history.file = "/nonexistent/path" + + history_interface.ClearHistory() + + mock_history_write.assert_called_once_with("/nonexistent/path", [], "") diff --git a/tests/dbus/test_server.py b/tests/dbus/test_server.py new file mode 100644 index 0000000..5b5e96c --- /dev/null +++ b/tests/dbus/test_server.py @@ -0,0 +1,70 @@ +from unittest import mock + +from dasbus.constants import DBUS_NAME_FLAG_REPLACE_EXISTING + +from command_line_assistant.config import Config +from command_line_assistant.dbus import server + + +def test_serve(monkeypatch): + event_loop_mock = mock.Mock() + session_bus_mock = mock.Mock() + monkeypatch.setattr(server, "EventLoop", event_loop_mock) + monkeypatch.setattr(server, "SESSION_BUS", session_bus_mock) + config = Config() + + server.serve(config) + + assert event_loop_mock.call_count == 1 + + +def test_serve_registers_services(monkeypatch): + event_loop_mock = mock.Mock() + session_bus_mock = mock.Mock() + monkeypatch.setattr(server, "EventLoop", event_loop_mock) + monkeypatch.setattr(server, "SESSION_BUS", session_bus_mock) + config = Config() + + server.serve(config) + + assert session_bus_mock.publish_object.call_count == 2 + assert session_bus_mock.register_service.call_count == 2 + assert ( + session_bus_mock.register_service.call_args_list[0][1]["flags"] + == DBUS_NAME_FLAG_REPLACE_EXISTING + ) + assert ( + session_bus_mock.register_service.call_args_list[1][1]["flags"] + == DBUS_NAME_FLAG_REPLACE_EXISTING + ) + + +def test_serve_cleanup_on_exception(monkeypatch): + event_loop_mock = mock.Mock() + event_loop_mock.return_value.run.side_effect = Exception("Test error") + session_bus_mock = mock.Mock() + monkeypatch.setattr(server, "EventLoop", event_loop_mock) + monkeypatch.setattr(server, "SESSION_BUS", session_bus_mock) + config = Config() + + try: + server.serve(config) + except Exception: + pass + + session_bus_mock.disconnect.assert_called_once() + + +def test_serve_creates_interfaces(monkeypatch): + event_loop_mock = mock.Mock() + session_bus_mock = mock.Mock() + monkeypatch.setattr(server, "EventLoop", event_loop_mock) + monkeypatch.setattr(server, "SESSION_BUS", session_bus_mock) + config = Config() + + server.serve(config) + + publish_calls = session_bus_mock.publish_object.call_args_list + assert len(publish_calls) == 2 + assert isinstance(publish_calls[0][0][1], server.QueryInterface) + assert isinstance(publish_calls[1][0][1], server.HistoryInterface) diff --git a/tests/dbus/test_structures.py b/tests/dbus/test_structures.py new file mode 100644 index 0000000..da6a2ff --- /dev/null +++ b/tests/dbus/test_structures.py @@ -0,0 +1,43 @@ +from command_line_assistant.dbus.structures import HistoryEntry, Message + + +def test_message_init(): + message = Message() + assert message.message == "" + + +def test_message_setter(): + message = Message() + message.message = "test message" + assert message.message == "test message" + + +def test_history_entry_init(): + history = HistoryEntry() + assert history.entries == [] + + +def test_history_entry_setter(): + history = HistoryEntry() + test_entries = ["entry1", "entry2", "entry3"] + history.entries = test_entries + assert history.entries == test_entries + + +def test_history_entry_empty_list(): + history = HistoryEntry() + history.entries = [] + assert history.entries == [] + + +def test_message_empty_string(): + message = Message() + message.message = "" + assert message.message == "" + + +def test_history_entry_single_entry(): + history = HistoryEntry() + history.entries = ["single entry"] + assert len(history.entries) == 1 + assert history.entries[0] == "single entry"