diff --git a/pepys_admin/maintenance/dialogs/progress_dialog.py b/pepys_admin/maintenance/dialogs/progress_dialog.py index f70fcf41f..a76001452 100644 --- a/pepys_admin/maintenance/dialogs/progress_dialog.py +++ b/pepys_admin/maintenance/dialogs/progress_dialog.py @@ -66,6 +66,7 @@ async def coroutine(): is_cancelled=self.is_cancelled, ), ) + get_app().progress_bar_finished.fire() self.future.set_result(result) except Exception as e: try: diff --git a/pepys_admin/maintenance/gui.py b/pepys_admin/maintenance/gui.py index 2e41b5231..5629e82bc 100644 --- a/pepys_admin/maintenance/gui.py +++ b/pepys_admin/maintenance/gui.py @@ -29,6 +29,7 @@ from prompt_toolkit.layout.menus import CompletionsMenu from prompt_toolkit.lexers.pygments import PygmentsLexer from prompt_toolkit.styles import Style +from prompt_toolkit.utils import Event from prompt_toolkit.widgets.base import Border, Label from pygments.lexers.sql import SqlLexer from sqlalchemy.orm import joinedload, undefer @@ -141,6 +142,8 @@ def __init__(self, data_store=None): ) self.app.dropdown_opened = False + self.app.progress_bar_finished = Event(self.app) + self.app.preview_table_updated = Event(self.app) def init_ui_components(self): """Initialise all of the UI components, controls, containers and widgets""" @@ -381,6 +384,7 @@ def filter_column_data(self, column_data): def run_query(self): """Runs the query as defined by the FilterWidget, and displays the result in the preview table.""" + logger.debug("Running query") if self.current_table_object is None: return @@ -460,6 +464,11 @@ def run_query(self): # the selected items label self.preview_table.current_values = [] self.update_selected_items_label() + app.invalidate() + + logger.debug("Firing preview updated") + logger.debug(f"{len(self.table_objects)=}") + self.app.preview_table_updated.fire() def get_table_data(self): return self.table_data @@ -582,6 +591,7 @@ def on_filter_widget_change(self, value): widget is sensible about this and only raises this event if there has actually been a change in the output of the filters property. That means we can run a query each time this is called, and the query shouldn't get run more often than is needed.""" + logger.debug("on filter widget change") # Convert the filter object to a SQL string to display in the Complete Query tab if value != []: filter_query = filter_widget_output_to_query( diff --git a/pepys_admin/maintenance/widgets/filter_widget.py b/pepys_admin/maintenance/widgets/filter_widget.py index 0ad87ff0f..ab051577e 100644 --- a/pepys_admin/maintenance/widgets/filter_widget.py +++ b/pepys_admin/maintenance/widgets/filter_widget.py @@ -15,6 +15,7 @@ ) CONTEXTUAL_HELP_STRING = "# Second panel: Build filters (F3)" +from loguru import logger class FilterWidget: @@ -104,6 +105,8 @@ def set_contextual_help(self, widget, text): def trigger_on_change(self, event=None): """Triggers the on_change_handler, if it is defined""" + logger.debug("trigger on change") + logger.debug(f"{self.filters=}") if self.on_change_handler is not None: if not list_deep_equals(self.filters, self.last_filters_output): # Only call event there is a difference from last time we called @@ -202,6 +205,7 @@ def filters(self): for entry_or_boolean in entries_and_booleans: strings = entry_or_boolean.get_string_values() + logger.debug(f"{strings=}") if strings[0] == self.column_prompt: # The column dropdown is still at the default value continue diff --git a/tests/gui_tests/test_gui_interactively.py b/tests/gui_tests/test_gui_interactively.py index 30f326b04..2a7cb16bf 100644 --- a/tests/gui_tests/test_gui_interactively.py +++ b/tests/gui_tests/test_gui_interactively.py @@ -9,157 +9,273 @@ from pepys_admin.maintenance.dialogs.help_dialog import HelpDialog from pepys_admin.maintenance.gui import MaintenanceGUI -# @asynccontextmanager -# async def create_app_and_pipe(datastore, show_output=False): -# inp = create_pipe_input() -# params = {"input": inp} -# if not show_output: -# params["output"] = DummyOutput() -# with create_app_session(**params): -# # Create our app -# gui = MaintenanceGUI(datastore) - -# app_task = asyncio.create_task(gui.app.run_async()) -# await asyncio.sleep(2) -# yield (inp, gui) +class InteractiveGUITest: + def __init__(self, datastore, show_output=False, delay=None): + self.datastore = datastore + self.show_output = show_output + self.delay = delay -# gui.app.exit() # or: app_task.cancel() - -# await app_task - - -# async def send_text_with_delay(inp, text, delay=0.5): -# # Just a key by itself -# if isinstance(text, Keys): -# char = REVERSE_ANSI_SEQUENCES[text] -# inp.send_text(char) -# await asyncio.sleep(delay) -# # A string or a list of keys -# for char in text: -# if isinstance(char, Keys): -# char = REVERSE_ANSI_SEQUENCES[char] -# inp.send_text(char) -# await asyncio.sleep(delay) + async def __aenter__(self): + input = DummyInput() + if self.show_output: + output = None + else: + output = DummyOutput() + self.cas = create_app_session(output=output, input=input) + self.cas.__enter__() -# # async def test_select_platform_type(test_datastore): -# # # Setup for our database access -# # async with create_app_and_pipe(test_datastore) as (inp, gui): -# # await send_text_with_delay(inp, "PlatformTy\r", 0.5) + self.gui = MaintenanceGUI(self.datastore) -# # # Check state here. -# # assert gui.current_table_object == test_datastore.db_classes.PlatformType + self.task = asyncio.create_task(self.gui.app.run_async()) + print("Created task") + self.ready_event = asyncio.Event() + self.gui.app.after_render += lambda _: self.ready_event.set() + await self.ready_event.wait() + self.ready_event.clear() + print("Ready") -# # # First entry is header, so we check 2nd entry -# # assert isinstance(gui.table_objects[1], test_datastore.db_classes.PlatformType) -# # assert gui.table_data[1] == ["Naval - aircraft"] -# # # 19 entries plus a header -# # assert len(gui.table_data) == 20 + self.progress_bar_event = asyncio.Event() + self.gui.app.progress_bar_finished += lambda _: self.progress_bar_event.set() + self.preview_updated = asyncio.Event() + self.gui.app.preview_table_updated += lambda _: self.preview_updated.set() -async def test_open_help_dialog(test_datastore): - # Test application in a dummy session. - input = DummyInput() - output = DummyOutput() - # output = None + return self - with create_app_session(output=output, input=input): - gui = MaintenanceGUI(test_datastore) + async def wait_for_progress_bar(self): + self.progress_bar_event.clear() + await self.progress_bar_event.wait() + self.progress_bar_event.clear() - # Run the application. - # We run it by scheduling the run_async coroutine in the current event - # loop. - task = asyncio.create_task(gui.app.run_async()) + await self.wait_for_redraw() - ready_event = asyncio.Event() - gui.app.after_render += lambda _: ready_event.set() - await ready_event.wait() - ready_event.clear() - print("Initial ready") + async def wait_for_redraw(self): + await self.ready_event.wait() + self.ready_event.clear() - # Send F1 to open the help dialog - gui.app.key_processor.feed(KeyPress(Keys.F1)) - gui.app.key_processor.process_keys() + async def wait_for_preview_table_update(self): + self.preview_updated.clear() + await self.preview_updated.wait() + self.preview_updated.clear() - await ready_event.wait() - ready_event.clear() - print("Ready after F1") + async def send_keys(self, keys): + if isinstance(keys, Keys): + keys = [keys] - # Check the help dialog has opened - assert isinstance(gui.current_dialog, HelpDialog) + for key in keys: + print(f"Sending {key}") + # print(f"{key}: {self.gui.app.layout.current_window.content.text()}") + # print("----") + self.gui.app.key_processor.feed(KeyPress(key)) + self.gui.app.key_processor.process_keys() - # Send Tab and Enter to exit the help dialog - gui.app.key_processor.feed(KeyPress(Keys.Tab, "\t")) - gui.app.key_processor.feed(KeyPress(Keys.ControlM, "\r")) - gui.app.key_processor.process_keys() + await self.wait_for_redraw() - # ready_event = asyncio.Event() - # gui.app.after_render += lambda _: ready_event.set() - await ready_event.wait() - ready_event.clear() - print("Ready after dialog exit") + if self.delay is not None: + await asyncio.sleep(self.delay) - # Check no dialog is open - assert gui.current_dialog is None + async def __aexit__(self, exc_type, exc_value, tb): + self.gui.app.exit() - # Send ESC to open the 'Do you want to exit?' dialog - gui.app.key_processor.feed(KeyPress(Keys.Escape)) - gui.app.key_processor.process_keys() + await self.task - # ready_event = asyncio.Event() - # gui.app.after_render += lambda _: ready_event.set() - await ready_event.wait() - ready_event.clear() + self.cas.__exit__(None, None, None) - gui.app.exit() + if exc_type is not None: + return False + else: + return True - # # Send Enter to say yes - # gui.app.key_processor.feed(KeyPress(Keys.ControlM, "\r")) - # gui.app.key_processor.process_keys() - # Wait for the application to properly terminate. - await task +async def test_new_open_help_dialog(test_datastore): + async with InteractiveGUITest(test_datastore, show_output=False) as gui_test: + await gui_test.send_keys([Keys.F1]) + assert isinstance(gui_test.gui.current_dialog, HelpDialog) -async def test_select_platform_type(test_datastore): - # Test application in a dummy session. - input = DummyInput() - output = DummyOutput() - # output = None + await gui_test.send_keys([Keys.Tab, Keys.ControlM]) - with create_app_session(output=output, input=input): - gui = MaintenanceGUI(test_datastore) + assert gui_test.gui.current_dialog is None - task = asyncio.create_task(gui.app.run_async()) - ready_event = asyncio.Event() - gui.app.after_render += lambda _: ready_event.set() - await ready_event.wait() - ready_event.clear() +async def test_new_select_platform_type(test_datastore): + async with InteractiveGUITest(test_datastore) as gui_test: + await gui_test.send_keys("PlatformTy") + await gui_test.send_keys([Keys.Enter]) - # Type 'PlatformTy' - which filters a dropdown box on every keypress - for letter in "PlatformTy": - gui.app.key_processor.feed(KeyPress(letter)) - gui.app.key_processor.process_keys() + assert gui_test.gui.current_table_object == test_datastore.db_classes.PlatformType - await ready_event.wait() - ready_event.clear() + await gui_test.wait_for_progress_bar() - # Send Enter to select the 'PlatformTypes' entry - gui.app.key_processor.feed(KeyPress(Keys.ControlM, "\r")) - gui.app.key_processor.process_keys() + # First entry is header, so we check 2nd entry + assert isinstance(gui_test.gui.table_objects[1], test_datastore.db_classes.PlatformType) + assert gui_test.gui.table_data[1] == ["Naval - aircraft"] + # 19 entries plus a header + assert len(gui_test.gui.table_data) == 20 - await ready_event.wait() - ready_event.clear() - assert gui.current_table_object == test_datastore.db_classes.PlatformType +async def test_tabbing(test_datastore): + async with InteractiveGUITest(test_datastore, show_output=True, delay=0.5) as gui_test: + await gui_test.send_keys("Plat") + await gui_test.send_keys([Keys.Enter]) - # Send Enter to say yes - gui.app.exit() + await gui_test.send_keys([Keys.Tab]) + await gui_test.send_keys([Keys.Tab]) + await gui_test.send_keys([Keys.Tab]) - # Wait for the application to properly terminate. - await task + +async def test_filtering(test_datastore): + async with InteractiveGUITest(test_datastore, show_output=True) as gui_test: + await gui_test.send_keys("Plat") + await gui_test.send_keys(Keys.Enter) + + # await gui_test.wait_for_progress_bar() + + await gui_test.send_keys( + [ + Keys.Tab, + Keys.Enter, + Keys.Down, + Keys.Enter, + Keys.Tab, + Keys.Tab, + Keys.Enter, + Keys.Down, + Keys.Enter, + ] + ) + await asyncio.sleep(2) + # await gui_test.wait_for_preview_table_update() + + assert len(gui_test.gui.table_data) == 2 + + # async with create_app_and_pipe(test_datastore, show_output=True) as (inp, gui): + # await send_text_with_delay(inp, "Plat\r", 0.5) + # await asyncio.sleep(1) + # await send_text_with_delay( + # inp, ["\t", Keys.Down, Keys.Down, "\r", "\t", "\t", Keys.Down, "\r"] + # ) + + # assert len(gui.table_data) == 2 + + +# async def test_open_help_dialog(test_datastore): +# # Test application in a dummy session. +# input = DummyInput() +# output = DummyOutput() +# # output = None + +# with create_app_session(output=output, input=input): +# gui = MaintenanceGUI(test_datastore) + +# # Run the application. +# # We run it by scheduling the run_async coroutine in the current event +# # loop. +# task = asyncio.create_task(gui.app.run_async()) + +# ready_event = asyncio.Event() +# gui.app.after_render += lambda _: ready_event.set() +# await ready_event.wait() +# ready_event.clear() +# print("Initial ready") + +# # Send F1 to open the help dialog +# gui.app.key_processor.feed(KeyPress(Keys.F1)) +# gui.app.key_processor.process_keys() + +# await ready_event.wait() +# ready_event.clear() +# print("Ready after F1") + +# # Check the help dialog has opened +# assert isinstance(gui.current_dialog, HelpDialog) + +# # Send Tab and Enter to exit the help dialog +# gui.app.key_processor.feed(KeyPress(Keys.Tab, "\t")) +# gui.app.key_processor.feed(KeyPress(Keys.ControlM, "\r")) +# gui.app.key_processor.process_keys() + +# # ready_event = asyncio.Event() +# # gui.app.after_render += lambda _: ready_event.set() +# await ready_event.wait() +# ready_event.clear() +# print("Ready after dialog exit") + +# # Check no dialog is open +# assert gui.current_dialog is None + +# # Send ESC to open the 'Do you want to exit?' dialog +# gui.app.key_processor.feed(KeyPress(Keys.Escape)) +# gui.app.key_processor.process_keys() + +# # ready_event = asyncio.Event() +# # gui.app.after_render += lambda _: ready_event.set() +# await ready_event.wait() +# ready_event.clear() + +# gui.app.exit() + +# # Send Enter to say yes +# # gui.app.key_processor.feed(KeyPress(Keys.ControlM, "\r")) +# # gui.app.key_processor.process_keys() + +# # Wait for the application to properly terminate. +# await task + + +# async def test_select_platform_type(test_datastore): +# # Test application in a dummy session. +# input = DummyInput() +# output = DummyOutput() +# # output = None + +# with create_app_session(output=output, input=input): +# gui = MaintenanceGUI(test_datastore) + +# task = asyncio.create_task(gui.app.run_async()) + +# ready_event = asyncio.Event() +# gui.app.after_render += lambda _: ready_event.set() +# await ready_event.wait() +# ready_event.clear() + +# # Type 'PlatformTy' - which filters a dropdown box on every keypress +# for letter in "PlatformTy": +# gui.app.key_processor.feed(KeyPress(letter)) +# gui.app.key_processor.process_keys() + +# await ready_event.wait() +# ready_event.clear() + +# # Send Enter to select the 'PlatformTypes' entry +# gui.app.key_processor.feed(KeyPress(Keys.ControlM, "\r")) +# gui.app.key_processor.process_keys() + +# await ready_event.wait() +# ready_event.clear() + +# assert gui.current_table_object == test_datastore.db_classes.PlatformType + +# progress_bar_event = asyncio.Event() +# gui.app.progress_bar_finished += lambda _: progress_bar_event.set() +# await progress_bar_event.wait() +# progress_bar_event.clear() + +# await ready_event.wait() +# ready_event.clear() + +# # First entry is header, so we check 2nd entry +# assert isinstance(gui.table_objects[1], test_datastore.db_classes.PlatformType) +# assert gui.table_data[1] == ["Naval - aircraft"] +# # 19 entries plus a header +# assert len(gui.table_data) == 20 + +# gui.app.exit() + +# # Wait for the application to properly terminate. +# await task # async def test_show_help(test_datastore):