From 398be84bcfb207fe42c1d53277aa64fb1c6c4b13 Mon Sep 17 00:00:00 2001 From: Kenneth Yang Date: Fri, 3 Jan 2025 16:55:01 -0800 Subject: [PATCH 01/14] Add strict basedpyright --- pyproject.toml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d083bb0..a1c451b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,10 +61,13 @@ exclude = ["/.github", "/.idea"] python = "3.13.1" dependencies = [ "pyinstaller==6.11.1", + "basedpyright==1.23.1" ] [tool.hatch.envs.default.scripts] exe = "pyinstaller.exe ephys_link.spec -y -- -d && pyinstaller.exe ephys_link.spec -y" exe-clean = "pyinstaller.exe ephys_link.spec -y --clean" +check = "basedpyright" +check-watched = "basedpyright --watch" [tool.hatch.envs.docs] python = "3.13.1" @@ -83,5 +86,6 @@ build = "mkdocs build" [tool.ruff] unsafe-fixes = true -[tool.ruff.lint] -extend-ignore = ["DTZ005"] \ No newline at end of file +[tool.basedpyright] +include = ["src/ephys_link"] +strict = ["src/ephys_link"] From 4408b5d991ad90b7b263bf19183b955392b69719 Mon Sep 17 00:00:00 2001 From: Kenneth Yang Date: Fri, 3 Jan 2025 16:58:21 -0800 Subject: [PATCH 02/14] Start fixing --- src/ephys_link/utils/common.py | 2 +- src/ephys_link/utils/console.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/ephys_link/utils/common.py b/src/ephys_link/utils/common.py index 114177c..e85d28e 100644 --- a/src/ephys_link/utils/common.py +++ b/src/ephys_link/utils/common.py @@ -50,7 +50,7 @@ def check_for_updates() -> None: """Check for updates to the Ephys Link.""" try: response = get("https://api.github.com/repos/VirtualBrainLab/ephys-link/tags", timeout=10) - latest_version = response.json()[0]["name"] + latest_version = str(response.json()[0]["name"]) if parse(latest_version) > parse(__version__): print(f"Update available: {latest_version} !") print("Download at: https://github.com/VirtualBrainLab/ephys-link/releases/latest") diff --git a/src/ephys_link/utils/console.py b/src/ephys_link/utils/console.py index 23b64e9..be5bc28 100644 --- a/src/ephys_link/utils/console.py +++ b/src/ephys_link/utils/console.py @@ -7,11 +7,13 @@ """ from logging import DEBUG, ERROR, INFO, basicConfig, getLogger +from typing import final from rich.logging import RichHandler from rich.traceback import install +@final class Console: def __init__(self, *, enable_debug: bool) -> None: """Initialize console properties. @@ -33,7 +35,7 @@ def __init__(self, *, enable_debug: bool) -> None: self._log.setLevel(DEBUG if enable_debug else INFO) # Install Rich traceback. - install() + _=install() def debug_print(self, label: str, msg: str) -> None: """Print a debug message to the console. From 545584ebc632eddc4c114267c532e34122d025eb Mon Sep 17 00:00:00 2001 From: Kenneth Yang Date: Sat, 4 Jan 2025 17:34:33 -0800 Subject: [PATCH 03/14] Remove base binding constructor --- src/ephys_link/back_end/platform_handler.py | 3 ++- src/ephys_link/bindings/fake_binding.py | 3 +-- src/ephys_link/bindings/mpm_binding.py | 1 - src/ephys_link/bindings/ump_4_binding.py | 3 +-- src/ephys_link/utils/base_binding.py | 3 --- src/ephys_link/utils/common.py | 2 +- 6 files changed, 5 insertions(+), 10 deletions(-) diff --git a/src/ephys_link/back_end/platform_handler.py b/src/ephys_link/back_end/platform_handler.py index c3152ce..393b51b 100644 --- a/src/ephys_link/back_end/platform_handler.py +++ b/src/ephys_link/back_end/platform_handler.py @@ -25,6 +25,7 @@ ) from vbl_aquarium.models.unity import Vector4 +from ephys_link.bindings.mpm_binding import MPMBinding from ephys_link.utils.base_binding import BaseBinding from ephys_link.utils.common import get_bindings, vector4_to_array from ephys_link.utils.console import Console @@ -73,7 +74,7 @@ def _get_binding_instance(self, options: EphysLinkOptions) -> BaseBinding: if binding_cli_name == options.type: # Pass in HTTP port for Pathfinder MPM. if binding_cli_name == "pathfinder-mpm": - return binding_type(options.mpm_port) + return MPMBinding(options.mpm_port) # Otherwise just return the binding. return binding_type() diff --git a/src/ephys_link/bindings/fake_binding.py b/src/ephys_link/bindings/fake_binding.py index 01db180..d200f43 100644 --- a/src/ephys_link/bindings/fake_binding.py +++ b/src/ephys_link/bindings/fake_binding.py @@ -5,10 +5,9 @@ class FakeBinding(BaseBinding): - def __init__(self, *args, **kwargs) -> None: + def __init__(self) -> None: """Initialize fake manipulator infos.""" - super().__init__(*args, **kwargs) self._positions = [Vector4() for _ in range(8)] self._angles = [ Vector3(x=90, y=60, z=0), diff --git a/src/ephys_link/bindings/mpm_binding.py b/src/ephys_link/bindings/mpm_binding.py index 304e585..15053cf 100644 --- a/src/ephys_link/bindings/mpm_binding.py +++ b/src/ephys_link/bindings/mpm_binding.py @@ -78,7 +78,6 @@ def __init__(self, port: int = 8080, *args, **kwargs) -> None: Args: port: Port number for MPM HTTP server. """ - super().__init__(*args, **kwargs) self._url = f"http://localhost:{port}" self._movement_stopped = False diff --git a/src/ephys_link/bindings/ump_4_binding.py b/src/ephys_link/bindings/ump_4_binding.py index afa7060..a6de67c 100644 --- a/src/ephys_link/bindings/ump_4_binding.py +++ b/src/ephys_link/bindings/ump_4_binding.py @@ -22,11 +22,10 @@ class Ump4Binding(BaseBinding): """Bindings for UMP-4 platform""" - def __init__(self, *args, **kwargs) -> None: + def __init__(self) -> None: """Initialize UMP-4 bindings.""" # Establish connection to Sensapex API (exit if connection fails). - super().__init__(*args, **kwargs) UMP.set_library_path(RESOURCES_DIRECTORY) self._ump = UMP.get_ump() if self._ump is None: diff --git a/src/ephys_link/utils/base_binding.py b/src/ephys_link/utils/base_binding.py index c051c43..516751a 100644 --- a/src/ephys_link/utils/base_binding.py +++ b/src/ephys_link/utils/base_binding.py @@ -17,9 +17,6 @@ class BaseBinding(ABC): No need to catch exceptions as the [Platform Handler][ephys_link.back_end.platform_handler] will catch them. """ - def __init__(self, *args, **kwargs): - """Initialize the platform binding with any necessary arguments.""" - @staticmethod @abstractmethod def get_display_name() -> str: diff --git a/src/ephys_link/utils/common.py b/src/ephys_link/utils/common.py index e85d28e..ec22199 100644 --- a/src/ephys_link/utils/common.py +++ b/src/ephys_link/utils/common.py @@ -50,7 +50,7 @@ def check_for_updates() -> None: """Check for updates to the Ephys Link.""" try: response = get("https://api.github.com/repos/VirtualBrainLab/ephys-link/tags", timeout=10) - latest_version = str(response.json()[0]["name"]) + latest_version = str(response.json()[0]["name"]) # pyright: ignore [reportAny] if parse(latest_version) > parse(__version__): print(f"Update available: {latest_version} !") print("Download at: https://github.com/VirtualBrainLab/ephys-link/releases/latest") From 4e03716ab8402a1436784ec8447e0f99e91aab34 Mon Sep 17 00:00:00 2001 From: Kenneth Yang Date: Sat, 4 Jan 2025 18:21:11 -0800 Subject: [PATCH 04/14] WIP, working on typing sensapex --- src/ephys_link/back_end/platform_handler.py | 2 +- src/ephys_link/bindings/fake_binding.py | 4 +- src/ephys_link/bindings/mpm_binding.py | 4 +- src/ephys_link/bindings/ump_4_binding.py | 43 +- src/ephys_link/front_end/cli.py | 25 +- src/ephys_link/front_end/gui.py | 14 +- src/ephys_link/utils/base_binding.py | 2 +- src/ephys_link/utils/common.py | 16 +- src/ephys_link/utils/console.py | 2 +- typings/sensapex/__init__.pyi | 6 + typings/sensapex/accuracy_test.pyi | 55 +++ typings/sensapex/pressure_test.pyi | 21 + typings/sensapex/sensapex.pyi | 459 ++++++++++++++++++++ typings/sensapex/test.pyi | 13 + typings/sensapex/utils.pyi | 8 + 15 files changed, 627 insertions(+), 47 deletions(-) create mode 100644 typings/sensapex/__init__.pyi create mode 100644 typings/sensapex/accuracy_test.pyi create mode 100644 typings/sensapex/pressure_test.pyi create mode 100644 typings/sensapex/sensapex.pyi create mode 100644 typings/sensapex/test.pyi create mode 100644 typings/sensapex/utils.pyi diff --git a/src/ephys_link/back_end/platform_handler.py b/src/ephys_link/back_end/platform_handler.py index 393b51b..4b811f2 100644 --- a/src/ephys_link/back_end/platform_handler.py +++ b/src/ephys_link/back_end/platform_handler.py @@ -104,7 +104,7 @@ async def get_platform_info(self) -> PlatformInfo: name=self._bindings.get_display_name(), cli_name=self._bindings.get_cli_name(), axes_count=await self._bindings.get_axes_count(), - dimensions=await self._bindings.get_dimensions(), + dimensions=self._bindings.get_dimensions(), ) # Manipulator commands. diff --git a/src/ephys_link/bindings/fake_binding.py b/src/ephys_link/bindings/fake_binding.py index d200f43..2df2328 100644 --- a/src/ephys_link/bindings/fake_binding.py +++ b/src/ephys_link/bindings/fake_binding.py @@ -1,7 +1,7 @@ from vbl_aquarium.models.unity import Vector3, Vector4 from ephys_link.utils.base_binding import BaseBinding -from ephys_link.utils.common import array_to_vector4 +from ephys_link.utils.common import list_to_vector4 class FakeBinding(BaseBinding): @@ -35,7 +35,7 @@ async def get_axes_count(self) -> int: return 4 def get_dimensions(self) -> Vector4: - return array_to_vector4([20] * 4) + return list_to_vector4([20] * 4) async def get_position(self, manipulator_id: str) -> Vector4: return self._positions[int(manipulator_id)] diff --git a/src/ephys_link/bindings/mpm_binding.py b/src/ephys_link/bindings/mpm_binding.py index 15053cf..75be694 100644 --- a/src/ephys_link/bindings/mpm_binding.py +++ b/src/ephys_link/bindings/mpm_binding.py @@ -72,7 +72,7 @@ class MPMBinding(BaseBinding): COARSE_SPEED_THRESHOLD = 0.1 INSERTION_SPEED_LIMIT = 9_000 - def __init__(self, port: int = 8080, *args, **kwargs) -> None: + def __init__(self, port: int = 8080) -> None: """Initialize connection to MPM HTTP server. Args: @@ -95,7 +95,7 @@ async def get_manipulators(self) -> list[str]: async def get_axes_count(self) -> int: return 3 - async def get_dimensions(self) -> Vector4: + def get_dimensions(self) -> Vector4: return Vector4(x=15, y=15, z=15, w=15) async def get_position(self, manipulator_id: str) -> Vector4: diff --git a/src/ephys_link/bindings/ump_4_binding.py b/src/ephys_link/bindings/ump_4_binding.py index a6de67c..d1d16fe 100644 --- a/src/ephys_link/bindings/ump_4_binding.py +++ b/src/ephys_link/bindings/ump_4_binding.py @@ -4,14 +4,15 @@ """ from asyncio import get_running_loop +from typing import NoReturn, final, override from sensapex import UMP, SensapexDevice -from vbl_aquarium.models.unity import Vector3, Vector4 +from vbl_aquarium.models.unity import Vector4 from ephys_link.utils.base_binding import BaseBinding from ephys_link.utils.common import ( RESOURCES_DIRECTORY, - array_to_vector4, + list_to_vector4, scalar_mm_to_um, um_to_mm, vector4_to_array, @@ -19,6 +20,7 @@ ) +@final class Ump4Binding(BaseBinding): """Bindings for UMP-4 platform""" @@ -28,32 +30,35 @@ def __init__(self) -> None: # Establish connection to Sensapex API (exit if connection fails). UMP.set_library_path(RESOURCES_DIRECTORY) self._ump = UMP.get_ump() - if self._ump is None: - error_message = "Unable to connect to uMp" - raise ValueError(error_message) @staticmethod + @override def get_display_name() -> str: return "Sensapex uMp-4" @staticmethod + @override def get_cli_name() -> str: return "ump-4" + @override async def get_manipulators(self) -> list[str]: return list(map(str, self._ump.list_devices())) + @override async def get_axes_count(self) -> int: return 4 - async def get_dimensions(self) -> Vector4: + @override + def get_dimensions(self) -> Vector4: return Vector4(x=20, y=20, z=20, w=20) + @override async def get_position(self, manipulator_id: str) -> Vector4: - return um_to_mm(array_to_vector4(self._get_device(manipulator_id).get_pos(1))) + return um_to_mm(list_to_vector4(self._get_device(manipulator_id).get_pos(1))) - # noinspection PyTypeChecker - async def get_angles(self, _: str) -> Vector3: + @override + async def get_angles(self, manipulator_id: str) -> NoReturn: """uMp-4 does not support getting angles so raise an error. Raises: @@ -62,8 +67,8 @@ async def get_angles(self, _: str) -> Vector3: error_message = "UMP-4 does not support getting angles" raise AttributeError(error_message) - # noinspection PyTypeChecker - async def get_shank_count(self, _: str) -> int: + @override + async def get_shank_count(self, manipulator_id: str) -> NoReturn: """uMp-4 does not support getting shank count so raise an error. Raises: @@ -72,10 +77,11 @@ async def get_shank_count(self, _: str) -> int: error_message = "UMP-4 does not support getting shank count" raise AttributeError(error_message) + @override def get_movement_tolerance(self) -> float: return 0.001 - # noinspection DuplicatedCode + @override async def set_position(self, manipulator_id: str, position: Vector4, speed: float) -> Vector4: # Convert position to micrometers. target_position_um = vector_mm_to_um(position) @@ -86,15 +92,21 @@ async def set_position(self, manipulator_id: str, position: Vector4, speed: floa ) # Wait for movement to finish. - await get_running_loop().run_in_executor(None, movement.finished_event.wait, None) + _ = await get_running_loop().run_in_executor(None, movement.finished_event.wait, None) # Handle interrupted movement. if movement.interrupted: error_message = f"Manipulator {manipulator_id} interrupted: {movement.interrupt_reason}" raise RuntimeError(error_message) - return um_to_mm(array_to_vector4(movement.last_pos)) + # Handle empty end position. + if not movement.last_pos: + error_message = f"Manipulator {manipulator_id} did not reach target position" + raise RuntimeError(error_message) + + return um_to_mm(list_to_vector4(movement.last_pos)) + @override async def set_depth(self, manipulator_id: str, depth: float, speed: float) -> float: # Augment current position with depth. current_position = await self.get_position(manipulator_id) @@ -106,9 +118,11 @@ async def set_depth(self, manipulator_id: str, depth: float, speed: float) -> fl # Return the final depth. return float(final_platform_position.w) + @override async def stop(self, manipulator_id: str) -> None: self._get_device(manipulator_id).stop() + @override def platform_space_to_unified_space(self, platform_space: Vector4) -> Vector4: # unified <- platform # +x <- +y @@ -123,6 +137,7 @@ def platform_space_to_unified_space(self, platform_space: Vector4) -> Vector4: w=platform_space.w, ) + @override def unified_space_to_platform_space(self, unified_space: Vector4) -> Vector4: # platform <- unified # +x <- +z diff --git a/src/ephys_link/front_end/cli.py b/src/ephys_link/front_end/cli.py index a5808a4..ad42689 100644 --- a/src/ephys_link/front_end/cli.py +++ b/src/ephys_link/front_end/cli.py @@ -9,12 +9,14 @@ """ from argparse import ArgumentParser +from typing import final from vbl_aquarium.models.ephys_link import EphysLinkOptions from ephys_link.__about__ import __version__ as version +@final class CLI: """Command-line interface for the Electrophysiology Manipulator Link. @@ -25,22 +27,21 @@ def __init__(self) -> None: """Initialize CLI parser.""" self._parser = ArgumentParser( - description="Electrophysiology Manipulator Link:" - " a Socket.IO interface for manipulators in electrophysiology experiments.", + description="Electrophysiology Manipulator Link: a Socket.IO interface for manipulators in electrophysiology experiments.", prog="python -m ephys-link", ) - self._parser.add_argument( + _ = self._parser.add_argument( "-b", "--background", dest="background", action="store_true", help="Skip configuration window." ) - self._parser.add_argument( + _ = self._parser.add_argument( "-i", "--ignore-updates", dest="ignore_updates", action="store_true", help="Skip (ignore) checking for updates.", ) - self._parser.add_argument( + _ = self._parser.add_argument( "-t", "--type", type=str, @@ -48,21 +49,21 @@ def __init__(self) -> None: default="ump-4", help='Manipulator type (i.e. "ump-4", "pathfinder-mpm", "fake"). Default: "ump-4".', ) - self._parser.add_argument( + _ = self._parser.add_argument( "-d", "--debug", dest="debug", action="store_true", help="Enable debug mode.", ) - self._parser.add_argument( + _ = self._parser.add_argument( "-p", "--use-proxy", dest="use_proxy", action="store_true", help="Enable proxy mode.", ) - self._parser.add_argument( + _ = self._parser.add_argument( "-a", "--proxy-address", type=str, @@ -70,14 +71,14 @@ def __init__(self) -> None: dest="proxy_address", help="Proxy IP address.", ) - self._parser.add_argument( + _ = self._parser.add_argument( "--mpm-port", type=int, default=8080, dest="mpm_port", help="Port New Scale Pathfinder MPM's server is on. Default: 8080.", ) - self._parser.add_argument( + _ = self._parser.add_argument( "-s", "--serial", type=str, @@ -86,7 +87,7 @@ def __init__(self) -> None: nargs="?", help="Emergency stop serial port (i.e. COM3). Default: disables emergency stop.", ) - self._parser.add_argument( + _ = self._parser.add_argument( "-v", "--version", action="version", @@ -100,4 +101,4 @@ def parse_args(self) -> EphysLinkOptions: Returns: Parsed arguments """ - return EphysLinkOptions(**vars(self._parser.parse_args())) + return EphysLinkOptions(**vars(self._parser.parse_args())) # pyright: ignore [reportAny] diff --git a/src/ephys_link/front_end/gui.py b/src/ephys_link/front_end/gui.py index 6430973..1a2e3ae 100644 --- a/src/ephys_link/front_end/gui.py +++ b/src/ephys_link/front_end/gui.py @@ -14,6 +14,7 @@ from socket import gethostbyname, gethostname from sys import exit from tkinter import CENTER, RIGHT, BooleanVar, E, IntVar, StringVar, Tk, ttk +from typing import final from platformdirs import user_config_dir from vbl_aquarium.models.ephys_link import EphysLinkOptions @@ -27,6 +28,7 @@ OPTIONS_PATH = join(OPTIONS_DIR, OPTIONS_FILENAME) +@final class GUI: """Graphical User Interface for Ephys Link. @@ -44,7 +46,7 @@ def __init__(self) -> None: # Read options. if exists(OPTIONS_PATH): with open(OPTIONS_PATH) as options_file: - options = EphysLinkOptions(**load(options_file)) + options = EphysLinkOptions(**load(options_file)) # pyright: ignore [reportAny] # Load options into GUI variables. self._ignore_updates = BooleanVar(value=options.ignore_updates) @@ -87,7 +89,7 @@ def get_options(self) -> EphysLinkOptions: # Save options. makedirs(OPTIONS_DIR, exist_ok=True) with open(OPTIONS_PATH, "w+") as options_file: - options_file.write(options.model_dump_json()) + _ = options_file.write(options.model_dump_json()) # Return options return options @@ -99,10 +101,10 @@ def _build_gui(self) -> None: mainframe = ttk.Frame(self._root, padding=3) mainframe.grid(column=0, row=0, sticky="news") - self._root.columnconfigure(0, weight=1) - self._root.rowconfigure(0, weight=1) - mainframe.columnconfigure(0, weight=1) - mainframe.rowconfigure(0, weight=1) + _ = self._root.columnconfigure(0, weight=1) + _ = self._root.rowconfigure(0, weight=1) + _ = mainframe.columnconfigure(0, weight=1) + _ = mainframe.rowconfigure(0, weight=1) # Server serving settings. diff --git a/src/ephys_link/utils/base_binding.py b/src/ephys_link/utils/base_binding.py index 516751a..a30dad0 100644 --- a/src/ephys_link/utils/base_binding.py +++ b/src/ephys_link/utils/base_binding.py @@ -46,7 +46,7 @@ async def get_axes_count(self) -> int: """ @abstractmethod - async def get_dimensions(self) -> Vector4: + def get_dimensions(self) -> Vector4: """Get the dimensions of the manipulators on the current platform (mm). For 3-axis manipulators, copy the dimension of the axis parallel to the probe into w. diff --git a/src/ephys_link/utils/common.py b/src/ephys_link/utils/common.py index ec22199..b458ea3 100644 --- a/src/ephys_link/utils/common.py +++ b/src/ephys_link/utils/common.py @@ -50,7 +50,7 @@ def check_for_updates() -> None: """Check for updates to the Ephys Link.""" try: response = get("https://api.github.com/repos/VirtualBrainLab/ephys-link/tags", timeout=10) - latest_version = str(response.json()[0]["name"]) # pyright: ignore [reportAny] + latest_version = str(response.json()[0]["name"]) # pyright: ignore [reportAny] if parse(latest_version) > parse(__version__): print(f"Update available: {latest_version} !") print("Download at: https://github.com/VirtualBrainLab/ephys-link/releases/latest") @@ -132,17 +132,17 @@ def vector4_to_array(vector4: Vector4) -> list[float]: return [vector4.x, vector4.y, vector4.z, vector4.w] -def array_to_vector4(array: list[float]) -> Vector4: +def list_to_vector4(float_list: list[float | int]) -> Vector4: """Convert a list of floats to a [Vector4][vbl_aquarium.models.unity.Vector4]. Args: - array: List of floats. + float_list: List of floats. Returns: First four elements of the list as a Vector4 padded with zeros if necessary. """ - def get_element(this_array: list[float], index: int) -> float: + def get_element(this_array: list[float | int], index: int) -> float: """Safely get an element from an array. Return 0 if the index is out of bounds. @@ -160,8 +160,8 @@ def get_element(this_array: list[float], index: int) -> float: return 0.0 return Vector4( - x=get_element(array, 0), - y=get_element(array, 1), - z=get_element(array, 2), - w=get_element(array, 3), + x=get_element(float_list, 0), + y=get_element(float_list, 1), + z=get_element(float_list, 2), + w=get_element(float_list, 3), ) diff --git a/src/ephys_link/utils/console.py b/src/ephys_link/utils/console.py index be5bc28..80f0d70 100644 --- a/src/ephys_link/utils/console.py +++ b/src/ephys_link/utils/console.py @@ -35,7 +35,7 @@ def __init__(self, *, enable_debug: bool) -> None: self._log.setLevel(DEBUG if enable_debug else INFO) # Install Rich traceback. - _=install() + _ = install() def debug_print(self, label: str, msg: str) -> None: """Print a debug message to the console. diff --git a/typings/sensapex/__init__.pyi b/typings/sensapex/__init__.pyi new file mode 100644 index 0000000..705c3cd --- /dev/null +++ b/typings/sensapex/__init__.pyi @@ -0,0 +1,6 @@ +""" +This type stub file was generated by pyright. +""" + + +__version__ = ... diff --git a/typings/sensapex/accuracy_test.pyi b/typings/sensapex/accuracy_test.pyi new file mode 100644 index 0000000..b8d1ac5 --- /dev/null +++ b/typings/sensapex/accuracy_test.pyi @@ -0,0 +1,55 @@ +""" +This type stub file was generated by pyright. +""" + +import sys + +import numpy as np + +parser = ... +args = ... +ump = ... +if args.debug: + ... +devids = ... +devs = ... +dev = ... +app = ... +win = ... +plots = ... +errplots = ... +if args.linear: + linerrplots = ... +start = ... +pos = ... +tgt = ... +err = ... +closest = ... +linear_err = ... +bus = ... +mov = ... +times = ... +lastupdate = ... +def update(moving=...): # -> None: + ... + +def update_plots(): # -> None: + ... + +start_pos = ... +diffs = ... +errs = ... +positions = ... +n_axes = ... +if args.test_pos is None: + moves = ... + move_axes = ... + targets = ... +else: + test_pos = np.array(list(map(float, args.test_pos.split(",")))) + targets = np.zeros((args.iter, 3)) +speeds = ... +if args.retry_threshold is not None: + ... +if sys.flags.interactive == 0: + ... diff --git a/typings/sensapex/pressure_test.pyi b/typings/sensapex/pressure_test.pyi new file mode 100644 index 0000000..416da2a --- /dev/null +++ b/typings/sensapex/pressure_test.pyi @@ -0,0 +1,21 @@ +""" +This type stub file was generated by pyright. +""" + +import sys + +parser = ... +args = ... +ump = ... +if args.debug: + ... +devids = ... +devs = ... +dev = ... +app = ... +win = ... +plots = ... +err_plots = ... +err_curves = ... +if sys.flags.interactive == 0: + ... diff --git a/typings/sensapex/sensapex.pyi b/typings/sensapex/sensapex.pyi new file mode 100644 index 0000000..7359e5c --- /dev/null +++ b/typings/sensapex/sensapex.pyi @@ -0,0 +1,459 @@ +""" +This type stub file was generated by pyright. +""" + +import platform +import subprocess +import sys +import threading +from ctypes import Structure + +if sys.platform == "win32": + DUMPCAP = ... +else: + DUMPCAP = ... +SOCKET = ... +if sys.platform == "win32" and platform.architecture()[0] == "64bit": + SOCKET = ... +LIBUM_MAX_MANIPULATORS = ... +LIBUM_MAX_LOG_LINE_LENGTH = ... +LIBUM_DEF_TIMEOUT = ... +LIBUM_DEF_BCAST_ADDRESS = ... +LIBUM_DEVICE_SUBNET = ... +LIBUM_DEF_GROUP = ... +LIBUM_MAX_MESSAGE_SIZE = ... +LIBUM_ARG_UNDEF = ... +X_AXIS = ... +Y_AXIS = ... +Z_AXIS = ... +D_AXIS = ... +LIBUM_NO_ERROR = ... +LIBUM_OS_ERROR = ... +LIBUM_NOT_OPEN = ... +LIBUM_TIMEOUT = ... +LIBUM_INVALID_ARG = ... +LIBUM_INVALID_DEV = ... +LIBUM_INVALID_RESP = ... +class sockaddr_in(Structure): + _fields_ = ... + + +log_func_ptr = ... +class um_positions(Structure): + _fields_ = ... + + +class um_state(Structure): + _fields_ = ... + + +class MoveRequest: + """Class for coordinating and tracking moves. + """ + max_attempts = ... + def __init__(self, ump, dev, dest, speed, simultaneous=..., linear=..., max_acceleration=..., retry_threshold=...) -> None: + ... + + def interrupt(self, reason): # -> None: + ... + + def finish(self): # -> None: + ... + + def start(self): # -> None: + ... + + def is_in_progress(self): + ... + + def can_retry(self): # -> bool: + ... + + def is_close_enough(self): # -> numpy.bool[builtins.bool]: + ... + + def has_more_calls_to_make(self): # -> bool: + ... + + def make_next_call(self): # -> None: + ... + + + +class UMError(Exception): + def __init__(self, msg, errno, oserrno) -> None: + ... + + + +_timer_offset = ... +def timer(): # -> float: + ... + +class UMP: + """Wrapper for the Sensapex uMp API. + + All calls except get_ump are thread-safe. + """ + _pcap_proc: subprocess.Popen | None + _last_move: dict[int, MoveRequest] + _lib = ... + _lib_path = ... + _single = ... + _um_state = ... + _debug_at_cls = ... + _default_group = ... + _default_address = ... + @classmethod + def set_library_path(cls, path: str): # -> None: + ... + + @classmethod + def set_default_address(cls, address): # -> None: + ... + + @classmethod + def set_default_group(cls, group): # -> None: + ... + + @classmethod + def get_lib(cls): # -> WinDLL | CDLL: + ... + + @classmethod + def load_lib(cls): # -> WinDLL | CDLL: + ... + + @classmethod + def get_um_state_class(cls): # -> type[um_state]: + ... + + @classmethod + def get_ump(cls, address=..., group=..., start_poller=...) -> UMP: + """Return a singleton UM instance. + """ + + def __init__(self, address, group, start_poller=...) -> None: + ... + + def set_timeout(self, value: int): # -> None: + ... + + @classmethod + def set_debug_mode(cls, enabled: bool) -> None: + ... + + def create_debug_archive(self) -> str: + """Zip up the debug log and all pcap files for distribution to Sensapex.""" + + def get_device(self, dev_id, *args, **kwargs) -> SensapexDevice: + """ + + Returns + ------- + SensapexDevice + """ + + def sdk_version(self): # -> Any: + """Return version of UM SDK. + """ + + def list_devices(self, max_id=...): # -> list[Any]: + """Return a list of all connected device IDs. + """ + + def axis_count(self, dev): # -> Any | Literal[4]: + ... + + def set_axis_count(self, dev, count): # -> None: + ... + + def call(self, fn, *args, retries: int | None = ...): # -> Any: + ... + + def set_max_acceleration(self, dev, max_acc): # -> None: + ... + + def open(self, address, group): # -> None: + """Open the UM devices at the given address. + + The default address "169.254.255.255" should suffice in most situations. + """ + + def close(self): # -> None: + """Close the UM device. + """ + + @staticmethod + def is_positionable(dev_id): + ... + + def get_pos(self, dev, timeout=...): # -> list[float | int]: + """Return the absolute position of the specified device (in um). + + If *timeout* == 0, then the position is returned directly from cache + and not queried from the device. + """ + + def goto_pos(self, dev, dest, speed, simultaneous=..., linear=..., max_acceleration=...): # -> MoveRequest: + """Request the specified device to move to an absolute position (in um). + + Parameters + ---------- + dev : int + ID of device to move + dest : array-like + X,Y,Z,W coordinates to move to. Values may be NaN or omitted to leave + the axis unaffected. + speed : float + Manipulator speed in um/sec + simultaneous: bool + If True, then all axes begin moving at the same time + linear : bool + If True, then axis speeds are scaled to produce more linear movement, requires simultaneous + max_acceleration : int + Maximum acceleration in um/s^2 + + Returns + ------- + move_request : MoveRequest + Object that can be used to retrieve the status of this move at a later time. + """ + + def is_busy(self, dev): # -> Any | Literal[False]: + """Return True if the specified device is currently moving. + + Note: this should not be used to determine whether a move has completed; + use MoveRequest.finished or .finished_event as returned from goto_pos(). + """ + + def stop(self, dev): # -> None: + """Stop the specified manipulator. + """ + + def set_pressure(self, dev, channel, value): # -> Any: + ... + + def get_pressure(self, dev, channel): # -> float: + ... + + def measure_pressure(self, dev, channel): # -> float: + ... + + def set_valve(self, dev, channel, value): # -> Any: + ... + + def get_valve(self, dev, channel): # -> Any: + ... + + def set_custom_slow_speed(self, dev, enabled): # -> Any: + ... + + def get_custom_slow_speed(self, dev): # -> Any: + ... + + def get_um_param(self, dev, param): # -> c_int: + ... + + def set_um_param(self, dev, param, value): # -> Any: + ... + + def run_um_cmd(self, dev_id, cmd, *args): # -> None: + ... + + def restart_device(self, dev_id): # -> None: + ... + + def set_device_group(self, dev_id, group): # -> None: + ... + + def calibrate_zero_position(self, dev): # -> None: + ... + + def calibrate_load(self, dev): # -> None: + ... + + def calibrate_pressure(self, dev, channel, delay): # -> None: + ... + + def led_control(self, dev, off): # -> None: + ... + + def get_soft_start_state(self, dev): # -> Any: + ... + + def set_soft_start_state(self, dev, enabled): # -> Any: + ... + + def get_soft_start_value(self, dev): # -> c_int: + ... + + def set_soft_start_value(self, dev, value): # -> Any: + ... + + def set_retry_threshold(self, threshold): # -> None: + """ + If we miss any axis by too much, try again. + + Parameters + ---------- + threshold : float + Maximum allowable error in µm. + """ + + def recv_all(self): # -> None: + """Receive all queued position/status update packets and update any pending moves. + """ + + def track_device_ids(self, *dev_ids): # -> None: + ... + + def get_firmware_version(self, dev_id): # -> tuple[Any, ...]: + """Return the firmware version installed on a device. + """ + + def ping_device(self, dev_id): # -> None: + """Ping a device. + + Returns after ping is received, or raises an exception on timeout. + """ + + + +class SensapexDevice: + """UM wrapper for accessing a single sensapex device. + + Example: + + dev = SensapexDevice(1) # get handle to manipulator 1 + pos = dev.get_pos() + pos[0] += 10000 # add 10 um to x axis + dev.goto_pos(pos, speed=10) + """ + def __init__(self, dev_id: int, callback=..., n_axes=..., max_acceleration=..., is_stage=...) -> None: + ... + + def set_n_axes(self, n_axes): # -> None: + ... + + def n_axes(self): # -> Any | Literal[4]: + ... + + def set_max_acceleration(self, max_acceleration): # -> None: + ... + + def add_callback(self, callback): # -> None: + ... + + def get_pos(self, timeout=...): # -> list[float | int]: + ... + + def goto_pos(self, pos, speed, simultaneous=..., linear=..., max_acceleration=...): # -> MoveRequest: + ... + + @property + def is_stage(self): # -> Literal[False]: + ... + + def is_busy(self): # -> Any | Literal[False]: + ... + + def stop(self): # -> None: + ... + + def set_pressure(self, channel, value): # -> Any: + """ + Parameters + ---------- + channel : int + channel number + value : float + pressure in kPa + """ + + def get_pressure(self, channel): # -> float: + """ + Returns + ------- + float + expected pressure in kPa + """ + + def measure_pressure(self, channel): # -> float: + """ + Returns + ------- + float + actual pressure in kPa + """ + + def set_valve(self, channel, value): # -> Any: + ... + + def get_valve(self, channel): # -> Any: + ... + + def set_lens_position(self, pos, lift=..., dip=...): # -> Any: + ... + + def get_lens_position(self): # -> Any: + ... + + def set_custom_slow_speed(self, enabled): # -> Any: + ... + + def calibrate_zero_position(self): # -> None: + ... + + def calibrate_load(self): # -> None: + ... + + def calibrate_pressure(self, channel, delay=...): # -> None: + ... + + def set_led_enabled(self, on: bool): # -> None: + ... + + def get_soft_start_state(self): # -> Any: + ... + + def set_soft_start_state(self, enabled): # -> Any: + ... + + def get_soft_start_value(self): # -> int: + ... + + def set_soft_start_value(self, value): # -> Any: + ... + + + +class PollThread(threading.Thread): + """Thread to poll for all manipulator position changes. + + Running this thread ensures that calling get_pos will always return the most recent + values available. + + An optional callback function is called periodically with a list of + device IDs from which position updates have been received. + """ + def __init__(self, ump, interval=...) -> None: + ... + + def start(self): # -> None: + ... + + def stop(self): # -> None: + ... + + def add_callback(self, dev_id, callback): # -> None: + ... + + def remove_callback(self, dev_id, callback): # -> None: + ... + + def run(self): # -> None: + ... + + + diff --git a/typings/sensapex/test.pyi b/typings/sensapex/test.pyi new file mode 100644 index 0000000..d91d006 --- /dev/null +++ b/typings/sensapex/test.pyi @@ -0,0 +1,13 @@ +""" +This type stub file was generated by pyright. +""" + +parser = ... +args = ... +um = ... +devids = ... +devs = ... +def print_pos(timeout=...): # -> None: + ... + +t = ... diff --git a/typings/sensapex/utils.pyi b/typings/sensapex/utils.pyi new file mode 100644 index 0000000..716bf13 --- /dev/null +++ b/typings/sensapex/utils.pyi @@ -0,0 +1,8 @@ +""" +This type stub file was generated by pyright. +""" + +def bytes_str(s): # -> bytes: + ... + +packet_count_param = ... From a3edfe26d1c0701cc40d3176fcc3c023059ec811 Mon Sep 17 00:00:00 2001 From: Kenneth Yang Date: Sat, 4 Jan 2025 18:33:54 -0800 Subject: [PATCH 05/14] Don't do stubs, ignore unknown --- pyproject.toml | 3 +- src/ephys_link/bindings/ump_4_binding.py | 15 +- typings/sensapex/__init__.pyi | 6 - typings/sensapex/accuracy_test.pyi | 55 --- typings/sensapex/pressure_test.pyi | 21 -- typings/sensapex/sensapex.pyi | 459 ----------------------- typings/sensapex/test.pyi | 13 - typings/sensapex/utils.pyi | 8 - 8 files changed, 10 insertions(+), 570 deletions(-) delete mode 100644 typings/sensapex/__init__.pyi delete mode 100644 typings/sensapex/accuracy_test.pyi delete mode 100644 typings/sensapex/pressure_test.pyi delete mode 100644 typings/sensapex/sensapex.pyi delete mode 100644 typings/sensapex/test.pyi delete mode 100644 typings/sensapex/utils.pyi diff --git a/pyproject.toml b/pyproject.toml index a1c451b..cf5b3d4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -84,8 +84,9 @@ serve = "mkdocs serve" build = "mkdocs build" [tool.ruff] +exclude = ["typings"] unsafe-fixes = true [tool.basedpyright] include = ["src/ephys_link"] -strict = ["src/ephys_link"] +strict = ["src/ephys_link"] \ No newline at end of file diff --git a/src/ephys_link/bindings/ump_4_binding.py b/src/ephys_link/bindings/ump_4_binding.py index d1d16fe..cce1bb2 100644 --- a/src/ephys_link/bindings/ump_4_binding.py +++ b/src/ephys_link/bindings/ump_4_binding.py @@ -1,3 +1,4 @@ +# pyright: strict, reportMissingTypeStubs=false """Bindings for Sensapex uMp-4 platform. Usage: Instantiate Ump4Bindings to interact with the Sensapex uMp-4 platform. @@ -29,7 +30,7 @@ def __init__(self) -> None: # Establish connection to Sensapex API (exit if connection fails). UMP.set_library_path(RESOURCES_DIRECTORY) - self._ump = UMP.get_ump() + self._ump = UMP.get_ump() # pyright: ignore [reportUnknownMemberType] @staticmethod @override @@ -55,7 +56,7 @@ def get_dimensions(self) -> Vector4: @override async def get_position(self, manipulator_id: str) -> Vector4: - return um_to_mm(list_to_vector4(self._get_device(manipulator_id).get_pos(1))) + return um_to_mm(list_to_vector4(self._get_device(manipulator_id).get_pos(1))) # pyright: ignore [reportUnknownMemberType] @override async def get_angles(self, manipulator_id: str) -> NoReturn: @@ -87,7 +88,7 @@ async def set_position(self, manipulator_id: str, position: Vector4, speed: floa target_position_um = vector_mm_to_um(position) # Request movement. - movement = self._get_device(manipulator_id).goto_pos( + movement = self._get_device(manipulator_id).goto_pos( # pyright: ignore [reportUnknownMemberType] vector4_to_array(target_position_um), scalar_mm_to_um(speed) ) @@ -96,15 +97,15 @@ async def set_position(self, manipulator_id: str, position: Vector4, speed: floa # Handle interrupted movement. if movement.interrupted: - error_message = f"Manipulator {manipulator_id} interrupted: {movement.interrupt_reason}" + error_message = f"Manipulator {manipulator_id} interrupted: {movement.interrupt_reason}" # pyright: ignore [reportUnknownMemberType] raise RuntimeError(error_message) # Handle empty end position. - if not movement.last_pos: + if not movement.last_pos: # pyright: ignore [reportUnknownMemberType] error_message = f"Manipulator {manipulator_id} did not reach target position" raise RuntimeError(error_message) - return um_to_mm(list_to_vector4(movement.last_pos)) + return um_to_mm(list_to_vector4(movement.last_pos)) # pyright: ignore [reportArgumentType, reportUnknownMemberType] @override async def set_depth(self, manipulator_id: str, depth: float, speed: float) -> float: @@ -154,4 +155,4 @@ def unified_space_to_platform_space(self, unified_space: Vector4) -> Vector4: # Helper methods. def _get_device(self, manipulator_id: str) -> SensapexDevice: - return self._ump.get_device(int(manipulator_id)) + return self._ump.get_device(int(manipulator_id)) # pyright: ignore [reportUnknownMemberType] diff --git a/typings/sensapex/__init__.pyi b/typings/sensapex/__init__.pyi deleted file mode 100644 index 705c3cd..0000000 --- a/typings/sensapex/__init__.pyi +++ /dev/null @@ -1,6 +0,0 @@ -""" -This type stub file was generated by pyright. -""" - - -__version__ = ... diff --git a/typings/sensapex/accuracy_test.pyi b/typings/sensapex/accuracy_test.pyi deleted file mode 100644 index b8d1ac5..0000000 --- a/typings/sensapex/accuracy_test.pyi +++ /dev/null @@ -1,55 +0,0 @@ -""" -This type stub file was generated by pyright. -""" - -import sys - -import numpy as np - -parser = ... -args = ... -ump = ... -if args.debug: - ... -devids = ... -devs = ... -dev = ... -app = ... -win = ... -plots = ... -errplots = ... -if args.linear: - linerrplots = ... -start = ... -pos = ... -tgt = ... -err = ... -closest = ... -linear_err = ... -bus = ... -mov = ... -times = ... -lastupdate = ... -def update(moving=...): # -> None: - ... - -def update_plots(): # -> None: - ... - -start_pos = ... -diffs = ... -errs = ... -positions = ... -n_axes = ... -if args.test_pos is None: - moves = ... - move_axes = ... - targets = ... -else: - test_pos = np.array(list(map(float, args.test_pos.split(",")))) - targets = np.zeros((args.iter, 3)) -speeds = ... -if args.retry_threshold is not None: - ... -if sys.flags.interactive == 0: - ... diff --git a/typings/sensapex/pressure_test.pyi b/typings/sensapex/pressure_test.pyi deleted file mode 100644 index 416da2a..0000000 --- a/typings/sensapex/pressure_test.pyi +++ /dev/null @@ -1,21 +0,0 @@ -""" -This type stub file was generated by pyright. -""" - -import sys - -parser = ... -args = ... -ump = ... -if args.debug: - ... -devids = ... -devs = ... -dev = ... -app = ... -win = ... -plots = ... -err_plots = ... -err_curves = ... -if sys.flags.interactive == 0: - ... diff --git a/typings/sensapex/sensapex.pyi b/typings/sensapex/sensapex.pyi deleted file mode 100644 index 7359e5c..0000000 --- a/typings/sensapex/sensapex.pyi +++ /dev/null @@ -1,459 +0,0 @@ -""" -This type stub file was generated by pyright. -""" - -import platform -import subprocess -import sys -import threading -from ctypes import Structure - -if sys.platform == "win32": - DUMPCAP = ... -else: - DUMPCAP = ... -SOCKET = ... -if sys.platform == "win32" and platform.architecture()[0] == "64bit": - SOCKET = ... -LIBUM_MAX_MANIPULATORS = ... -LIBUM_MAX_LOG_LINE_LENGTH = ... -LIBUM_DEF_TIMEOUT = ... -LIBUM_DEF_BCAST_ADDRESS = ... -LIBUM_DEVICE_SUBNET = ... -LIBUM_DEF_GROUP = ... -LIBUM_MAX_MESSAGE_SIZE = ... -LIBUM_ARG_UNDEF = ... -X_AXIS = ... -Y_AXIS = ... -Z_AXIS = ... -D_AXIS = ... -LIBUM_NO_ERROR = ... -LIBUM_OS_ERROR = ... -LIBUM_NOT_OPEN = ... -LIBUM_TIMEOUT = ... -LIBUM_INVALID_ARG = ... -LIBUM_INVALID_DEV = ... -LIBUM_INVALID_RESP = ... -class sockaddr_in(Structure): - _fields_ = ... - - -log_func_ptr = ... -class um_positions(Structure): - _fields_ = ... - - -class um_state(Structure): - _fields_ = ... - - -class MoveRequest: - """Class for coordinating and tracking moves. - """ - max_attempts = ... - def __init__(self, ump, dev, dest, speed, simultaneous=..., linear=..., max_acceleration=..., retry_threshold=...) -> None: - ... - - def interrupt(self, reason): # -> None: - ... - - def finish(self): # -> None: - ... - - def start(self): # -> None: - ... - - def is_in_progress(self): - ... - - def can_retry(self): # -> bool: - ... - - def is_close_enough(self): # -> numpy.bool[builtins.bool]: - ... - - def has_more_calls_to_make(self): # -> bool: - ... - - def make_next_call(self): # -> None: - ... - - - -class UMError(Exception): - def __init__(self, msg, errno, oserrno) -> None: - ... - - - -_timer_offset = ... -def timer(): # -> float: - ... - -class UMP: - """Wrapper for the Sensapex uMp API. - - All calls except get_ump are thread-safe. - """ - _pcap_proc: subprocess.Popen | None - _last_move: dict[int, MoveRequest] - _lib = ... - _lib_path = ... - _single = ... - _um_state = ... - _debug_at_cls = ... - _default_group = ... - _default_address = ... - @classmethod - def set_library_path(cls, path: str): # -> None: - ... - - @classmethod - def set_default_address(cls, address): # -> None: - ... - - @classmethod - def set_default_group(cls, group): # -> None: - ... - - @classmethod - def get_lib(cls): # -> WinDLL | CDLL: - ... - - @classmethod - def load_lib(cls): # -> WinDLL | CDLL: - ... - - @classmethod - def get_um_state_class(cls): # -> type[um_state]: - ... - - @classmethod - def get_ump(cls, address=..., group=..., start_poller=...) -> UMP: - """Return a singleton UM instance. - """ - - def __init__(self, address, group, start_poller=...) -> None: - ... - - def set_timeout(self, value: int): # -> None: - ... - - @classmethod - def set_debug_mode(cls, enabled: bool) -> None: - ... - - def create_debug_archive(self) -> str: - """Zip up the debug log and all pcap files for distribution to Sensapex.""" - - def get_device(self, dev_id, *args, **kwargs) -> SensapexDevice: - """ - - Returns - ------- - SensapexDevice - """ - - def sdk_version(self): # -> Any: - """Return version of UM SDK. - """ - - def list_devices(self, max_id=...): # -> list[Any]: - """Return a list of all connected device IDs. - """ - - def axis_count(self, dev): # -> Any | Literal[4]: - ... - - def set_axis_count(self, dev, count): # -> None: - ... - - def call(self, fn, *args, retries: int | None = ...): # -> Any: - ... - - def set_max_acceleration(self, dev, max_acc): # -> None: - ... - - def open(self, address, group): # -> None: - """Open the UM devices at the given address. - - The default address "169.254.255.255" should suffice in most situations. - """ - - def close(self): # -> None: - """Close the UM device. - """ - - @staticmethod - def is_positionable(dev_id): - ... - - def get_pos(self, dev, timeout=...): # -> list[float | int]: - """Return the absolute position of the specified device (in um). - - If *timeout* == 0, then the position is returned directly from cache - and not queried from the device. - """ - - def goto_pos(self, dev, dest, speed, simultaneous=..., linear=..., max_acceleration=...): # -> MoveRequest: - """Request the specified device to move to an absolute position (in um). - - Parameters - ---------- - dev : int - ID of device to move - dest : array-like - X,Y,Z,W coordinates to move to. Values may be NaN or omitted to leave - the axis unaffected. - speed : float - Manipulator speed in um/sec - simultaneous: bool - If True, then all axes begin moving at the same time - linear : bool - If True, then axis speeds are scaled to produce more linear movement, requires simultaneous - max_acceleration : int - Maximum acceleration in um/s^2 - - Returns - ------- - move_request : MoveRequest - Object that can be used to retrieve the status of this move at a later time. - """ - - def is_busy(self, dev): # -> Any | Literal[False]: - """Return True if the specified device is currently moving. - - Note: this should not be used to determine whether a move has completed; - use MoveRequest.finished or .finished_event as returned from goto_pos(). - """ - - def stop(self, dev): # -> None: - """Stop the specified manipulator. - """ - - def set_pressure(self, dev, channel, value): # -> Any: - ... - - def get_pressure(self, dev, channel): # -> float: - ... - - def measure_pressure(self, dev, channel): # -> float: - ... - - def set_valve(self, dev, channel, value): # -> Any: - ... - - def get_valve(self, dev, channel): # -> Any: - ... - - def set_custom_slow_speed(self, dev, enabled): # -> Any: - ... - - def get_custom_slow_speed(self, dev): # -> Any: - ... - - def get_um_param(self, dev, param): # -> c_int: - ... - - def set_um_param(self, dev, param, value): # -> Any: - ... - - def run_um_cmd(self, dev_id, cmd, *args): # -> None: - ... - - def restart_device(self, dev_id): # -> None: - ... - - def set_device_group(self, dev_id, group): # -> None: - ... - - def calibrate_zero_position(self, dev): # -> None: - ... - - def calibrate_load(self, dev): # -> None: - ... - - def calibrate_pressure(self, dev, channel, delay): # -> None: - ... - - def led_control(self, dev, off): # -> None: - ... - - def get_soft_start_state(self, dev): # -> Any: - ... - - def set_soft_start_state(self, dev, enabled): # -> Any: - ... - - def get_soft_start_value(self, dev): # -> c_int: - ... - - def set_soft_start_value(self, dev, value): # -> Any: - ... - - def set_retry_threshold(self, threshold): # -> None: - """ - If we miss any axis by too much, try again. - - Parameters - ---------- - threshold : float - Maximum allowable error in µm. - """ - - def recv_all(self): # -> None: - """Receive all queued position/status update packets and update any pending moves. - """ - - def track_device_ids(self, *dev_ids): # -> None: - ... - - def get_firmware_version(self, dev_id): # -> tuple[Any, ...]: - """Return the firmware version installed on a device. - """ - - def ping_device(self, dev_id): # -> None: - """Ping a device. - - Returns after ping is received, or raises an exception on timeout. - """ - - - -class SensapexDevice: - """UM wrapper for accessing a single sensapex device. - - Example: - - dev = SensapexDevice(1) # get handle to manipulator 1 - pos = dev.get_pos() - pos[0] += 10000 # add 10 um to x axis - dev.goto_pos(pos, speed=10) - """ - def __init__(self, dev_id: int, callback=..., n_axes=..., max_acceleration=..., is_stage=...) -> None: - ... - - def set_n_axes(self, n_axes): # -> None: - ... - - def n_axes(self): # -> Any | Literal[4]: - ... - - def set_max_acceleration(self, max_acceleration): # -> None: - ... - - def add_callback(self, callback): # -> None: - ... - - def get_pos(self, timeout=...): # -> list[float | int]: - ... - - def goto_pos(self, pos, speed, simultaneous=..., linear=..., max_acceleration=...): # -> MoveRequest: - ... - - @property - def is_stage(self): # -> Literal[False]: - ... - - def is_busy(self): # -> Any | Literal[False]: - ... - - def stop(self): # -> None: - ... - - def set_pressure(self, channel, value): # -> Any: - """ - Parameters - ---------- - channel : int - channel number - value : float - pressure in kPa - """ - - def get_pressure(self, channel): # -> float: - """ - Returns - ------- - float - expected pressure in kPa - """ - - def measure_pressure(self, channel): # -> float: - """ - Returns - ------- - float - actual pressure in kPa - """ - - def set_valve(self, channel, value): # -> Any: - ... - - def get_valve(self, channel): # -> Any: - ... - - def set_lens_position(self, pos, lift=..., dip=...): # -> Any: - ... - - def get_lens_position(self): # -> Any: - ... - - def set_custom_slow_speed(self, enabled): # -> Any: - ... - - def calibrate_zero_position(self): # -> None: - ... - - def calibrate_load(self): # -> None: - ... - - def calibrate_pressure(self, channel, delay=...): # -> None: - ... - - def set_led_enabled(self, on: bool): # -> None: - ... - - def get_soft_start_state(self): # -> Any: - ... - - def set_soft_start_state(self, enabled): # -> Any: - ... - - def get_soft_start_value(self): # -> int: - ... - - def set_soft_start_value(self, value): # -> Any: - ... - - - -class PollThread(threading.Thread): - """Thread to poll for all manipulator position changes. - - Running this thread ensures that calling get_pos will always return the most recent - values available. - - An optional callback function is called periodically with a list of - device IDs from which position updates have been received. - """ - def __init__(self, ump, interval=...) -> None: - ... - - def start(self): # -> None: - ... - - def stop(self): # -> None: - ... - - def add_callback(self, dev_id, callback): # -> None: - ... - - def remove_callback(self, dev_id, callback): # -> None: - ... - - def run(self): # -> None: - ... - - - diff --git a/typings/sensapex/test.pyi b/typings/sensapex/test.pyi deleted file mode 100644 index d91d006..0000000 --- a/typings/sensapex/test.pyi +++ /dev/null @@ -1,13 +0,0 @@ -""" -This type stub file was generated by pyright. -""" - -parser = ... -args = ... -um = ... -devids = ... -devs = ... -def print_pos(timeout=...): # -> None: - ... - -t = ... diff --git a/typings/sensapex/utils.pyi b/typings/sensapex/utils.pyi deleted file mode 100644 index 716bf13..0000000 --- a/typings/sensapex/utils.pyi +++ /dev/null @@ -1,8 +0,0 @@ -""" -This type stub file was generated by pyright. -""" - -def bytes_str(s): # -> bytes: - ... - -packet_count_param = ... From e74a6c2a4c725f3fc7b836fc01096103f9faf81c Mon Sep 17 00:00:00 2001 From: Kenneth Yang Date: Sat, 4 Jan 2025 19:30:35 -0800 Subject: [PATCH 06/14] Allow Any for MPM --- src/ephys_link/bindings/fake_binding.py | 4 +-- src/ephys_link/bindings/mpm_binding.py | 34 ++++++++++++++++++++----- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/src/ephys_link/bindings/fake_binding.py b/src/ephys_link/bindings/fake_binding.py index 2df2328..99a3797 100644 --- a/src/ephys_link/bindings/fake_binding.py +++ b/src/ephys_link/bindings/fake_binding.py @@ -61,7 +61,7 @@ async def stop(self, _: str) -> None: pass def platform_space_to_unified_space(self, platform_space: Vector4) -> Vector4: - pass + return platform_space def unified_space_to_platform_space(self, unified_space: Vector4) -> Vector4: - pass + return unified_space diff --git a/src/ephys_link/bindings/mpm_binding.py b/src/ephys_link/bindings/mpm_binding.py index 75be694..e3efdbf 100644 --- a/src/ephys_link/bindings/mpm_binding.py +++ b/src/ephys_link/bindings/mpm_binding.py @@ -1,3 +1,4 @@ +# pyright: strict, reportAny=false """Bindings for New Scale Pathfinder MPM HTTP server platform. MPM works slightly differently than the other platforms since it operates in stereotactic coordinates. @@ -8,7 +9,7 @@ from asyncio import get_running_loop, sleep from json import dumps -from typing import Any +from typing import Any, final, override from requests import JSONDecodeError, get, put from vbl_aquarium.models.unity import Vector3, Vector4 @@ -17,6 +18,7 @@ from ephys_link.utils.common import scalar_mm_to_um, vector4_to_array +@final class MPMBinding(BaseBinding): """Bindings for New Scale Pathfinder MPM HTTP server platform.""" @@ -82,22 +84,28 @@ def __init__(self, port: int = 8080) -> None: self._movement_stopped = False @staticmethod + @override def get_display_name() -> str: return "Pathfinder MPM Control v2.8.8+" @staticmethod + @override def get_cli_name() -> str: return "pathfinder-mpm" + @override async def get_manipulators(self) -> list[str]: return [manipulator["Id"] for manipulator in (await self._query_data())["ProbeArray"]] + @override async def get_axes_count(self) -> int: return 3 + @override def get_dimensions(self) -> Vector4: return Vector4(x=15, y=15, z=15, w=15) + @override async def get_position(self, manipulator_id: str) -> Vector4: manipulator_data = await self._manipulator_data(manipulator_id) stage_z = manipulator_data["Stage_Z"] @@ -111,6 +119,7 @@ async def get_position(self, manipulator_id: str) -> Vector4: w=stage_z, ) + @override async def get_angles(self, manipulator_id: str) -> Vector3: manipulator_data = await self._manipulator_data(manipulator_id) @@ -123,12 +132,15 @@ async def get_angles(self, manipulator_id: str) -> Vector3: z=manipulator_data["ShankOrientation"], ) + @override async def get_shank_count(self, manipulator_id: str) -> int: return int((await self._manipulator_data(manipulator_id))["ShankCount"]) + @override def get_movement_tolerance(self) -> float: return 0.01 + @override async def set_position(self, manipulator_id: str, position: Vector4, speed: float) -> Vector4: # Keep track of the previous position to check if the manipulator stopped advancing. current_position = await self.get_position(manipulator_id) @@ -185,6 +197,7 @@ async def set_position(self, manipulator_id: str, position: Vector4, speed: floa # Return the final position. return await self.get_position(manipulator_id) + @override async def set_depth(self, manipulator_id: str, depth: float, speed: float) -> float: # Keep track of the previous depth to check if the manipulator stopped advancing unexpectedly. current_depth = (await self.get_position(manipulator_id)).w @@ -225,11 +238,16 @@ async def set_depth(self, manipulator_id: str, depth: float, speed: float) -> fl # Return the final depth. return float((await self.get_position(manipulator_id)).w) + @override async def stop(self, manipulator_id: str) -> None: - request = {"PutId": "ProbeStop", "Probe": self.VALID_MANIPULATOR_IDS.index(manipulator_id)} + request: dict[str, str | int | float] = { + "PutId": "ProbeStop", + "Probe": self.VALID_MANIPULATOR_IDS.index(manipulator_id), + } await self._put_request(request) self._movement_stopped = True + @override def platform_space_to_unified_space(self, platform_space: Vector4) -> Vector4: # unified <- platform # +x <- -x @@ -244,6 +262,7 @@ def platform_space_to_unified_space(self, platform_space: Vector4) -> Vector4: w=self.get_dimensions().w - platform_space.w, ) + @override def unified_space_to_platform_space(self, unified_space: Vector4) -> Vector4: # platform <- unified # +x <- -x @@ -259,8 +278,9 @@ def unified_space_to_platform_space(self, unified_space: Vector4) -> Vector4: ) # Helper functions. - async def _query_data(self) -> Any: + async def _query_data(self) -> dict[str, Any]: # pyright: ignore [reportExplicitAny] try: + # noinspection PyTypeChecker return (await get_running_loop().run_in_executor(None, get, self._url)).json() except ConnectionError as connectionError: error_message = f"Unable to connect to MPM HTTP server: {connectionError}" @@ -269,8 +289,8 @@ async def _query_data(self) -> Any: error_message = f"Unable to decode JSON response from MPM HTTP server: {jsonDecodeError}" raise ValueError(error_message) from jsonDecodeError - async def _manipulator_data(self, manipulator_id: str) -> Any: - probe_data = (await self._query_data())["ProbeArray"] + async def _manipulator_data(self, manipulator_id: str) -> dict[str, Any]: # pyright: ignore [reportExplicitAny] + probe_data: list[dict[str, Any]] = (await self._query_data())["ProbeArray"] # pyright: ignore [reportExplicitAny] for probe in probe_data: if probe["Id"] == manipulator_id: return probe @@ -279,8 +299,8 @@ async def _manipulator_data(self, manipulator_id: str) -> Any: error_message = f"Manipulator {manipulator_id} not found." raise ValueError(error_message) - async def _put_request(self, request: dict[str, Any]) -> None: - await get_running_loop().run_in_executor(None, put, self._url, dumps(request)) + async def _put_request(self, request: dict[str, Any]) -> None: # pyright: ignore [reportExplicitAny] + _ = await get_running_loop().run_in_executor(None, put, self._url, dumps(request)) def _is_vector_close(self, target: Vector4, current: Vector4) -> bool: return all(abs(axis) <= self.get_movement_tolerance() for axis in vector4_to_array(target - current)[:3]) From 625c8824e5f149cc7dd8cc69d0ba14ff817660e7 Mon Sep 17 00:00:00 2001 From: Kenneth Yang Date: Sat, 4 Jan 2025 19:33:09 -0800 Subject: [PATCH 07/14] Fixed fake --- src/ephys_link/bindings/fake_binding.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/src/ephys_link/bindings/fake_binding.py b/src/ephys_link/bindings/fake_binding.py index 99a3797..63f7975 100644 --- a/src/ephys_link/bindings/fake_binding.py +++ b/src/ephys_link/bindings/fake_binding.py @@ -1,9 +1,12 @@ +from typing import final, override + from vbl_aquarium.models.unity import Vector3, Vector4 from ephys_link.utils.base_binding import BaseBinding from ephys_link.utils.common import list_to_vector4 +@final class FakeBinding(BaseBinding): def __init__(self) -> None: """Initialize fake manipulator infos.""" @@ -21,47 +24,61 @@ def __init__(self) -> None: ] @staticmethod + @override def get_display_name() -> str: return "Fake Manipulator" @staticmethod + @override def get_cli_name() -> str: return "fake" + @override async def get_manipulators(self) -> list[str]: return list(map(str, range(8))) + @override async def get_axes_count(self) -> int: return 4 + @override def get_dimensions(self) -> Vector4: return list_to_vector4([20] * 4) + @override async def get_position(self, manipulator_id: str) -> Vector4: return self._positions[int(manipulator_id)] + @override async def get_angles(self, manipulator_id: str) -> Vector3: return self._angles[int(manipulator_id)] - async def get_shank_count(self, _: str) -> int: + @override + async def get_shank_count(self, manipulator_id: str) -> int: return 1 + @override def get_movement_tolerance(self) -> float: return 0.001 - async def set_position(self, manipulator_id: str, position: Vector4, _: float) -> Vector4: + @override + async def set_position(self, manipulator_id: str, position: Vector4, speed: float) -> Vector4: self._positions[int(manipulator_id)] = position return position - async def set_depth(self, manipulator_id: str, depth: float, _: float) -> float: + @override + async def set_depth(self, manipulator_id: str, depth: float, speed: float) -> float: self._positions[int(manipulator_id)].w = depth return depth - async def stop(self, _: str) -> None: + @override + async def stop(self, manipulator_id: str) -> None: pass + @override def platform_space_to_unified_space(self, platform_space: Vector4) -> Vector4: return platform_space + @override def unified_space_to_platform_space(self, unified_space: Vector4) -> Vector4: return unified_space From 54aa7e3cae3060ad494ef0db3fb4940b3f4c591f Mon Sep 17 00:00:00 2001 From: Kenneth Yang Date: Sat, 4 Jan 2025 19:55:59 -0800 Subject: [PATCH 08/14] Server --- src/ephys_link/back_end/server.py | 35 +++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/src/ephys_link/back_end/server.py b/src/ephys_link/back_end/server.py index 76dd739..067e327 100644 --- a/src/ephys_link/back_end/server.py +++ b/src/ephys_link/back_end/server.py @@ -1,3 +1,4 @@ +# pyright: strict, reportExplicitAny=false, reportMissingTypeStubs=false """Socket.IO Server. Responsible to managing the Socket.IO connection and events. @@ -15,7 +16,7 @@ from asyncio import get_event_loop, run from collections.abc import Callable, Coroutine from json import JSONDecodeError, dumps, loads -from typing import Any +from typing import Any, TypeVar, final from uuid import uuid4 from aiohttp.web import Application, run_app @@ -35,7 +36,12 @@ from ephys_link.utils.common import PORT, check_for_updates, server_preamble from ephys_link.utils.console import Console +# Server message generic types. +INPUT_TYPE = TypeVar("INPUT_TYPE", bound=VBLBaseModel) +OUTPUT_TYPE = TypeVar("OUTPUT_TYPE", bound=VBLBaseModel) + +@final class Server: def __init__(self, options: EphysLinkOptions, platform_handler: PlatformHandler, console: Console) -> None: """Initialize server fields based on options and platform handler. @@ -54,12 +60,18 @@ def __init__(self, options: EphysLinkOptions, platform_handler: PlatformHandler, # Initialize based on proxy usage. self._sio: AsyncServer | AsyncClient = AsyncClient() if self._options.use_proxy else AsyncServer() if not self._options.use_proxy: + # Exit if _sio is not a Server. + if not isinstance(self._sio, AsyncServer): + error = "Server not initialized." + self._console.critical_print(error) + raise TypeError(error) + self._app = Application() - self._sio.attach(self._app) + self._sio.attach(self._app) # pyright: ignore [reportUnknownMemberType] # Bind connection events. - self._sio.on("connect", self.connect) - self._sio.on("disconnect", self.disconnect) + _ = self._sio.on("connect", self.connect) # pyright: ignore [reportUnknownMemberType, reportUnknownVariableType] + _ = self._sio.on("disconnect", self.disconnect) # pyright: ignore [reportUnknownMemberType, reportUnknownVariableType] # Store connected client. self._client_sid: str = "" @@ -68,7 +80,7 @@ def __init__(self, options: EphysLinkOptions, platform_handler: PlatformHandler, self._pinpoint_id = str(uuid4())[:8] # Bind events. - self._sio.on("*", self.platform_event_handler) + _ = self._sio.on("*", self.platform_event_handler) # pyright: ignore [reportUnknownMemberType, reportUnknownVariableType] def launch(self) -> None: """Launch the server. @@ -94,8 +106,14 @@ def launch(self) -> None: self._console.info_print("PINPOINT ID", self._pinpoint_id) async def connect_proxy() -> None: + # Exit if _sio is not a proxy client. + if not isinstance(self._sio, AsyncClient): + error = "Proxy client not initialized." + self._console.critical_print(error) + raise TypeError(error) + # noinspection HttpUrlsUsage - await self._sio.connect(f"http://{self._options.proxy_address}:{PORT}") + await self._sio.connect(f"http://{self._options.proxy_address}:{PORT}") # pyright: ignore [reportUnknownMemberType] await self._sio.wait() run(connect_proxy()) @@ -136,8 +154,8 @@ async def _run_if_data_available( async def _run_if_data_parses( self, - function: Callable[[VBLBaseModel], Coroutine[Any, Any, VBLBaseModel]], - data_type: type[VBLBaseModel], + function: Callable[[INPUT_TYPE], Coroutine[Any, Any, OUTPUT_TYPE]], + data_type: type[INPUT_TYPE], event: str, data: tuple[tuple[Any], ...], ) -> str: @@ -203,7 +221,6 @@ async def disconnect(self, sid: str) -> None: else: self._console.error_print("DISCONNECTION", f"Client {sid} disconnected without being connected.") - # noinspection PyTypeChecker async def platform_event_handler(self, event: str, *args: tuple[Any]) -> str: """Handle events from the server. From 11e89e2c5bb208e35ad41e9521db3a8c1ce7b35b Mon Sep 17 00:00:00 2001 From: Kenneth Yang Date: Sat, 4 Jan 2025 19:56:25 -0800 Subject: [PATCH 09/14] Platform handler --- src/ephys_link/back_end/platform_handler.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ephys_link/back_end/platform_handler.py b/src/ephys_link/back_end/platform_handler.py index 4b811f2..4e136cb 100644 --- a/src/ephys_link/back_end/platform_handler.py +++ b/src/ephys_link/back_end/platform_handler.py @@ -8,6 +8,7 @@ Instantiate PlatformHandler with the platform type and call the desired command. """ +from typing import final from uuid import uuid4 from vbl_aquarium.models.ephys_link import ( @@ -31,6 +32,7 @@ from ephys_link.utils.console import Console +@final class PlatformHandler: """Handler for platform commands.""" From 5ae48a73843afd24fada16ef8ba7ecd7f51fb326 Mon Sep 17 00:00:00 2001 From: Kenneth Yang Date: Sat, 4 Jan 2025 20:08:29 -0800 Subject: [PATCH 10/14] Add static analysis to docs --- docs/development/adding_a_manipulator.md | 7 +++++++ docs/development/code_organization.md | 13 +++++++++++++ 2 files changed, 20 insertions(+) diff --git a/docs/development/adding_a_manipulator.md b/docs/development/adding_a_manipulator.md index a7038c9..20440ef 100644 --- a/docs/development/adding_a_manipulator.md +++ b/docs/development/adding_a_manipulator.md @@ -9,6 +9,7 @@ read [how the system works first](../home/how_it_works.md) before proceeding. 1. Fork the [Ephys Link repository](https://github.com/VirtualBrainLab/ephys-link). 2. Follow the instructions for [installing Ephys Link for development](index.md#installing-for-development) to get all the necessary dependencies and tools set up. In this case, you'll want to clone your fork. +3. (Optional) Familiarize yourself with the [repo's organization](code_organization.md). ## Create a Manipulator Binding @@ -62,6 +63,12 @@ Use [New Scale Pathfinder MPM's binding][ephys_link.bindings.mpm_binding] as an Once you've implemented your binding, you can test it by running Ephys Link using your binding `ephys_link -b -t `. You can interact with it using the [Socket.IO API](socketio_api.md) or Pinpoint. +## Code standards + +We use automatic static analyzers to check code quality. See +the [corresponding section in the code organization documentation](code_organization.md#static-analysis) for more +information. + ## Submit Your Changes When you're satisfied with your changes, submit a pull request to the main repository. We will review your changes and diff --git a/docs/development/code_organization.md b/docs/development/code_organization.md index 22b6dee..bb4a7c6 100644 --- a/docs/development/code_organization.md +++ b/docs/development/code_organization.md @@ -25,3 +25,16 @@ to return responses to the clients when ready. [`PlatformHandler`][ephys_link.back_end.platform_handler] is responsible for converting between the server API and the manipulator binding API. Because of this module, you don't have to worry about the details of the server API when writing a manipulator binding. + +## Static Analysis + +The project is strictly type checked using [`hatch fmt` (ruff)](https://hatch.pypa.io/1.9/config/static-analysis/) +and [basedpyright](https://docs.basedpyright.com/latest/). All PR's are checked against these tools. + +While they are very helpful in enforcing good code, they can be annoying when working with libraries that inherently +return `Any` (like HTTP requests) or are not strictly statically typed. In those situations we have added inline +comments to ignore specific checks. We try to only use this in scenarios where missing typing information came from +external sources and were out of developer control. We also do not make stubs. + +We encourage using the type checker as a tool to help strengthen your code and only apply inline comments to ignore +specific instances where external libraries cause errors. Do not use file-wide ignores. \ No newline at end of file From 3801ee94b44277a1792ac464bf9cb04b5c285415 Mon Sep 17 00:00:00 2001 From: Kenneth Yang Date: Sat, 4 Jan 2025 20:31:43 -0800 Subject: [PATCH 11/14] Remove most file ignores --- docs/development/code_organization.md | 6 ++++-- src/ephys_link/back_end/platform_handler.py | 19 +++++++++---------- src/ephys_link/back_end/server.py | 16 +++++++++------- src/ephys_link/bindings/mpm_binding.py | 15 +++++++-------- src/ephys_link/bindings/ump_4_binding.py | 3 +-- 5 files changed, 30 insertions(+), 29 deletions(-) diff --git a/docs/development/code_organization.md b/docs/development/code_organization.md index bb4a7c6..4647867 100644 --- a/docs/development/code_organization.md +++ b/docs/development/code_organization.md @@ -34,7 +34,9 @@ and [basedpyright](https://docs.basedpyright.com/latest/). All PR's are checked While they are very helpful in enforcing good code, they can be annoying when working with libraries that inherently return `Any` (like HTTP requests) or are not strictly statically typed. In those situations we have added inline comments to ignore specific checks. We try to only use this in scenarios where missing typing information came from -external sources and were out of developer control. We also do not make stubs. +external sources, and it is not possible to make local type hints. We also do not make stubs since they would be +challenging to maintain. We encourage using the type checker as a tool to help strengthen your code and only apply inline comments to ignore -specific instances where external libraries cause errors. Do not use file-wide ignores. \ No newline at end of file +specific instances where external libraries cause errors. Do not use file-wide ignores unless absolutely necessary (for +example, `common.py` bypasses ruff's "no `print`" check to print the server's startup). \ No newline at end of file diff --git a/src/ephys_link/back_end/platform_handler.py b/src/ephys_link/back_end/platform_handler.py index 4e136cb..7c38304 100644 --- a/src/ephys_link/back_end/platform_handler.py +++ b/src/ephys_link/back_end/platform_handler.py @@ -1,4 +1,3 @@ -# ruff: noqa: BLE001 """Manipulator platform handler. Responsible for performing the various manipulator commands. @@ -119,7 +118,7 @@ async def get_manipulators(self) -> GetManipulatorsResponse: """ try: manipulators = await self._bindings.get_manipulators() - except Exception as e: + except Exception as e: # noqa: BLE001 self._console.exception_error_print("Get Manipulators", e) return GetManipulatorsResponse(error=self._console.pretty_exception(e)) else: @@ -138,7 +137,7 @@ async def get_position(self, manipulator_id: str) -> PositionalResponse: unified_position = self._bindings.platform_space_to_unified_space( await self._bindings.get_position(manipulator_id) ) - except Exception as e: + except Exception as e: # noqa: BLE001 self._console.exception_error_print("Get Position", e) return PositionalResponse(error=str(e)) else: @@ -155,7 +154,7 @@ async def get_angles(self, manipulator_id: str) -> AngularResponse: """ try: angles = await self._bindings.get_angles(manipulator_id) - except Exception as e: + except Exception as e: # noqa: BLE001 self._console.exception_error_print("Get Angles", e) return AngularResponse(error=self._console.pretty_exception(e)) else: @@ -172,7 +171,7 @@ async def get_shank_count(self, manipulator_id: str) -> ShankCountResponse: """ try: shank_count = await self._bindings.get_shank_count(manipulator_id) - except Exception as e: + except Exception as e: # noqa: BLE001 self._console.exception_error_print("Get Shank Count", e) return ShankCountResponse(error=self._console.pretty_exception(e)) else: @@ -217,7 +216,7 @@ async def set_position(self, request: SetPositionRequest) -> PositionalResponse: ) self._console.error_print("Set Position", error_message) return PositionalResponse(error=error_message) - except Exception as e: + except Exception as e: # noqa: BLE001 self._console.exception_error_print("Set Position", e) return PositionalResponse(error=self._console.pretty_exception(e)) else: @@ -249,7 +248,7 @@ async def set_depth(self, request: SetDepthRequest) -> SetDepthResponse: ) self._console.error_print("Set Depth", error_message) return SetDepthResponse(error=error_message) - except Exception as e: + except Exception as e: # noqa: BLE001 self._console.exception_error_print("Set Depth", e) return SetDepthResponse(error=self._console.pretty_exception(e)) else: @@ -271,7 +270,7 @@ async def set_inside_brain(self, request: SetInsideBrainRequest) -> BooleanState self._inside_brain.add(request.manipulator_id) else: self._inside_brain.discard(request.manipulator_id) - except Exception as e: + except Exception as e: # noqa: BLE001 self._console.exception_error_print("Set Inside Brain", e) return BooleanStateResponse(error=self._console.pretty_exception(e)) else: @@ -288,7 +287,7 @@ async def stop(self, manipulator_id: str) -> str: """ try: await self._bindings.stop(manipulator_id) - except Exception as e: + except Exception as e: # noqa: BLE001 self._console.exception_error_print("Stop", e) return self._console.pretty_exception(e) else: @@ -303,7 +302,7 @@ async def stop_all(self) -> str: try: for manipulator_id in await self._bindings.get_manipulators(): await self._bindings.stop(manipulator_id) - except Exception as e: + except Exception as e: # noqa: BLE001 self._console.exception_error_print("Stop", e) return self._console.pretty_exception(e) else: diff --git a/src/ephys_link/back_end/server.py b/src/ephys_link/back_end/server.py index 067e327..67bef44 100644 --- a/src/ephys_link/back_end/server.py +++ b/src/ephys_link/back_end/server.py @@ -1,4 +1,3 @@ -# pyright: strict, reportExplicitAny=false, reportMissingTypeStubs=false """Socket.IO Server. Responsible to managing the Socket.IO connection and events. @@ -21,7 +20,7 @@ from aiohttp.web import Application, run_app from pydantic import ValidationError -from socketio import AsyncClient, AsyncServer +from socketio import AsyncClient, AsyncServer # pyright: ignore [reportMissingTypeStubs] from vbl_aquarium.models.ephys_link import ( EphysLinkOptions, SetDepthRequest, @@ -121,7 +120,7 @@ async def connect_proxy() -> None: run_app(self._app, port=PORT) # Helper functions. - def _malformed_request_response(self, request: str, data: tuple[tuple[Any], ...]) -> str: + def _malformed_request_response(self, request: str, data: tuple[tuple[Any], ...]) -> str: # pyright: ignore [reportExplicitAny] """Return a response for a malformed request. Args: @@ -135,7 +134,10 @@ def _malformed_request_response(self, request: str, data: tuple[tuple[Any], ...] return dumps({"error": "Malformed request."}) async def _run_if_data_available( - self, function: Callable[[str], Coroutine[Any, Any, VBLBaseModel]], event: str, data: tuple[tuple[Any], ...] + self, + function: Callable[[str], Coroutine[Any, Any, VBLBaseModel]], # pyright: ignore [reportExplicitAny] + event: str, + data: tuple[tuple[Any], ...], # pyright: ignore [reportExplicitAny] ) -> str: """Run a function if data is available. @@ -154,10 +156,10 @@ async def _run_if_data_available( async def _run_if_data_parses( self, - function: Callable[[INPUT_TYPE], Coroutine[Any, Any, OUTPUT_TYPE]], + function: Callable[[INPUT_TYPE], Coroutine[Any, Any, OUTPUT_TYPE]], # pyright: ignore [reportExplicitAny] data_type: type[INPUT_TYPE], event: str, - data: tuple[tuple[Any], ...], + data: tuple[tuple[Any], ...], # pyright: ignore [reportExplicitAny] ) -> str: """Run a function if data parses. @@ -221,7 +223,7 @@ async def disconnect(self, sid: str) -> None: else: self._console.error_print("DISCONNECTION", f"Client {sid} disconnected without being connected.") - async def platform_event_handler(self, event: str, *args: tuple[Any]) -> str: + async def platform_event_handler(self, event: str, *args: tuple[Any]) -> str: # pyright: ignore [reportExplicitAny] """Handle events from the server. Matches incoming events based on the Socket.IO API. diff --git a/src/ephys_link/bindings/mpm_binding.py b/src/ephys_link/bindings/mpm_binding.py index e3efdbf..bee0fac 100644 --- a/src/ephys_link/bindings/mpm_binding.py +++ b/src/ephys_link/bindings/mpm_binding.py @@ -1,4 +1,3 @@ -# pyright: strict, reportAny=false """Bindings for New Scale Pathfinder MPM HTTP server platform. MPM works slightly differently than the other platforms since it operates in stereotactic coordinates. @@ -95,7 +94,7 @@ def get_cli_name() -> str: @override async def get_manipulators(self) -> list[str]: - return [manipulator["Id"] for manipulator in (await self._query_data())["ProbeArray"]] + return [manipulator["Id"] for manipulator in (await self._query_data())["ProbeArray"]] # pyright: ignore [reportAny] @override async def get_axes_count(self) -> int: @@ -107,8 +106,8 @@ def get_dimensions(self) -> Vector4: @override async def get_position(self, manipulator_id: str) -> Vector4: - manipulator_data = await self._manipulator_data(manipulator_id) - stage_z = manipulator_data["Stage_Z"] + manipulator_data: dict[str, float] = await self._manipulator_data(manipulator_id) + stage_z: float = manipulator_data["Stage_Z"] await sleep(self.POLL_INTERVAL) # Wait for the stage to stabilize. @@ -121,10 +120,10 @@ async def get_position(self, manipulator_id: str) -> Vector4: @override async def get_angles(self, manipulator_id: str) -> Vector3: - manipulator_data = await self._manipulator_data(manipulator_id) + manipulator_data: dict[str, float] = await self._manipulator_data(manipulator_id) # Apply PosteriorAngle to Polar to get the correct angle. - adjusted_polar = manipulator_data["Polar"] - (await self._query_data())["PosteriorAngle"] + adjusted_polar: int = manipulator_data["Polar"] - (await self._query_data())["PosteriorAngle"] return Vector3( x=adjusted_polar if adjusted_polar > 0 else 360 + adjusted_polar, @@ -134,7 +133,7 @@ async def get_angles(self, manipulator_id: str) -> Vector3: @override async def get_shank_count(self, manipulator_id: str) -> int: - return int((await self._manipulator_data(manipulator_id))["ShankCount"]) + return int((await self._manipulator_data(manipulator_id))["ShankCount"]) # pyright: ignore [reportAny] @override def get_movement_tolerance(self) -> float: @@ -281,7 +280,7 @@ def unified_space_to_platform_space(self, unified_space: Vector4) -> Vector4: async def _query_data(self) -> dict[str, Any]: # pyright: ignore [reportExplicitAny] try: # noinspection PyTypeChecker - return (await get_running_loop().run_in_executor(None, get, self._url)).json() + return (await get_running_loop().run_in_executor(None, get, self._url)).json() # pyright: ignore [reportAny] except ConnectionError as connectionError: error_message = f"Unable to connect to MPM HTTP server: {connectionError}" raise RuntimeError(error_message) from connectionError diff --git a/src/ephys_link/bindings/ump_4_binding.py b/src/ephys_link/bindings/ump_4_binding.py index cce1bb2..2d73cb7 100644 --- a/src/ephys_link/bindings/ump_4_binding.py +++ b/src/ephys_link/bindings/ump_4_binding.py @@ -1,4 +1,3 @@ -# pyright: strict, reportMissingTypeStubs=false """Bindings for Sensapex uMp-4 platform. Usage: Instantiate Ump4Bindings to interact with the Sensapex uMp-4 platform. @@ -7,7 +6,7 @@ from asyncio import get_running_loop from typing import NoReturn, final, override -from sensapex import UMP, SensapexDevice +from sensapex import UMP, SensapexDevice # pyright: ignore [reportMissingTypeStubs] from vbl_aquarium.models.unity import Vector4 from ephys_link.utils.base_binding import BaseBinding From 5141345a05bd93349f767fa4866350dd75360fd8 Mon Sep 17 00:00:00 2001 From: Kenneth Yang Date: Sat, 4 Jan 2025 20:57:36 -0800 Subject: [PATCH 12/14] Remove all file ignores, split common --- docs/development/code_organization.md | 9 +- src/ephys_link/__main__.py | 21 +-- src/ephys_link/back_end/platform_handler.py | 3 +- src/ephys_link/back_end/server.py | 8 +- src/ephys_link/bindings/fake_binding.py | 2 +- src/ephys_link/bindings/mpm_binding.py | 2 +- src/ephys_link/bindings/ump_4_binding.py | 4 +- src/ephys_link/front_end/gui.py | 2 +- src/ephys_link/utils/common.py | 167 -------------------- src/ephys_link/utils/constants.py | 23 +++ src/ephys_link/utils/converters.py | 86 ++++++++++ src/ephys_link/utils/startup.py | 61 +++++++ 12 files changed, 193 insertions(+), 195 deletions(-) delete mode 100644 src/ephys_link/utils/common.py create mode 100644 src/ephys_link/utils/constants.py create mode 100644 src/ephys_link/utils/converters.py create mode 100644 src/ephys_link/utils/startup.py diff --git a/docs/development/code_organization.md b/docs/development/code_organization.md index 4647867..c9f229c 100644 --- a/docs/development/code_organization.md +++ b/docs/development/code_organization.md @@ -29,14 +29,13 @@ writing a manipulator binding. ## Static Analysis The project is strictly type checked using [`hatch fmt` (ruff)](https://hatch.pypa.io/1.9/config/static-analysis/) -and [basedpyright](https://docs.basedpyright.com/latest/). All PR's are checked against these tools. +and [basedpyright](https://docs.basedpyright.com/latest/). All PRs are checked against these tools. While they are very helpful in enforcing good code, they can be annoying when working with libraries that inherently return `Any` (like HTTP requests) or are not strictly statically typed. In those situations we have added inline comments to ignore specific checks. We try to only use this in scenarios where missing typing information came from -external sources, and it is not possible to make local type hints. We also do not make stubs since they would be -challenging to maintain. +external sources, and it is not possible to make local type hints. Do not use file-wide ignores under any circumstances. +We also do not make stubs since they would be challenging to maintain. We encourage using the type checker as a tool to help strengthen your code and only apply inline comments to ignore -specific instances where external libraries cause errors. Do not use file-wide ignores unless absolutely necessary (for -example, `common.py` bypasses ruff's "no `print`" check to print the server's startup). \ No newline at end of file +specific instances where external libraries cause errors. \ No newline at end of file diff --git a/src/ephys_link/__main__.py b/src/ephys_link/__main__.py index e8dc81f..cac50c7 100644 --- a/src/ephys_link/__main__.py +++ b/src/ephys_link/__main__.py @@ -15,17 +15,14 @@ from ephys_link.front_end.cli import CLI from ephys_link.front_end.gui import GUI from ephys_link.utils.console import Console +from ephys_link.utils.startup import check_for_updates, preamble def main() -> None: - """Ephys Link entry point. + """Ephys Link entry point.""" - 1. Get options via CLI or GUI. - 2. Instantiate the Console and make it globally accessible. - 3. Instantiate the Platform Handler with the appropriate platform bindings. - 4. Instantiate the Emergency Stop service. - 5. Start the server. - """ + # 0. Print the startup preamble. + preamble() # 1. Get options via CLI or GUI (if no CLI options are provided). options = CLI().parse_args() if len(argv) > 1 else GUI().get_options() @@ -33,12 +30,16 @@ def main() -> None: # 2. Instantiate the Console and make it globally accessible. console = Console(enable_debug=options.debug) - # 3. Instantiate the Platform Handler with the appropriate platform bindings. + # 3. Check for updates if not disabled. + if not options.ignore_updates: + check_for_updates(console) + + # 4. Instantiate the Platform Handler with the appropriate platform bindings. platform_handler = PlatformHandler(options, console) - # 4. Instantiate the Emergency Stop service. + # 5. Instantiate the Emergency Stop service. - # 5. Start the server. + # 6. Start the server. Server(options, platform_handler, console).launch() diff --git a/src/ephys_link/back_end/platform_handler.py b/src/ephys_link/back_end/platform_handler.py index 7c38304..02123b0 100644 --- a/src/ephys_link/back_end/platform_handler.py +++ b/src/ephys_link/back_end/platform_handler.py @@ -27,8 +27,9 @@ from ephys_link.bindings.mpm_binding import MPMBinding from ephys_link.utils.base_binding import BaseBinding -from ephys_link.utils.common import get_bindings, vector4_to_array from ephys_link.utils.console import Console +from ephys_link.utils.converters import vector4_to_array +from ephys_link.utils.startup import get_bindings @final diff --git a/src/ephys_link/back_end/server.py b/src/ephys_link/back_end/server.py index 67bef44..90f271e 100644 --- a/src/ephys_link/back_end/server.py +++ b/src/ephys_link/back_end/server.py @@ -32,8 +32,8 @@ from ephys_link.__about__ import __version__ from ephys_link.back_end.platform_handler import PlatformHandler -from ephys_link.utils.common import PORT, check_for_updates, server_preamble from ephys_link.utils.console import Console +from ephys_link.utils.constants import PORT # Server message generic types. INPUT_TYPE = TypeVar("INPUT_TYPE", bound=VBLBaseModel) @@ -86,12 +86,6 @@ def launch(self) -> None: Based on the options, either connect to a proxy or launch the server locally. """ - # Preamble. - server_preamble() - - # Check for updates. - if not self._options.ignore_updates: - check_for_updates() # List platform and available manipulators. self._console.info_print("PLATFORM", self._platform_handler.get_display_name()) diff --git a/src/ephys_link/bindings/fake_binding.py b/src/ephys_link/bindings/fake_binding.py index 63f7975..f59fadd 100644 --- a/src/ephys_link/bindings/fake_binding.py +++ b/src/ephys_link/bindings/fake_binding.py @@ -3,7 +3,7 @@ from vbl_aquarium.models.unity import Vector3, Vector4 from ephys_link.utils.base_binding import BaseBinding -from ephys_link.utils.common import list_to_vector4 +from ephys_link.utils.converters import list_to_vector4 @final diff --git a/src/ephys_link/bindings/mpm_binding.py b/src/ephys_link/bindings/mpm_binding.py index bee0fac..082fdba 100644 --- a/src/ephys_link/bindings/mpm_binding.py +++ b/src/ephys_link/bindings/mpm_binding.py @@ -14,7 +14,7 @@ from vbl_aquarium.models.unity import Vector3, Vector4 from ephys_link.utils.base_binding import BaseBinding -from ephys_link.utils.common import scalar_mm_to_um, vector4_to_array +from ephys_link.utils.converters import scalar_mm_to_um, vector4_to_array @final diff --git a/src/ephys_link/bindings/ump_4_binding.py b/src/ephys_link/bindings/ump_4_binding.py index 2d73cb7..9146df1 100644 --- a/src/ephys_link/bindings/ump_4_binding.py +++ b/src/ephys_link/bindings/ump_4_binding.py @@ -10,8 +10,8 @@ from vbl_aquarium.models.unity import Vector4 from ephys_link.utils.base_binding import BaseBinding -from ephys_link.utils.common import ( - RESOURCES_DIRECTORY, +from ephys_link.utils.constants import RESOURCES_DIRECTORY +from ephys_link.utils.converters import ( list_to_vector4, scalar_mm_to_um, um_to_mm, diff --git a/src/ephys_link/front_end/gui.py b/src/ephys_link/front_end/gui.py index 1a2e3ae..1e54e7d 100644 --- a/src/ephys_link/front_end/gui.py +++ b/src/ephys_link/front_end/gui.py @@ -20,7 +20,7 @@ from vbl_aquarium.models.ephys_link import EphysLinkOptions from ephys_link.__about__ import __version__ as version -from ephys_link.utils.common import get_binding_display_to_cli_name +from ephys_link.utils.startup import get_binding_display_to_cli_name # Define options path. OPTIONS_DIR = join(user_config_dir(), "VBL", "Ephys Link") diff --git a/src/ephys_link/utils/common.py b/src/ephys_link/utils/common.py deleted file mode 100644 index b458ea3..0000000 --- a/src/ephys_link/utils/common.py +++ /dev/null @@ -1,167 +0,0 @@ -# ruff: noqa: T201 -"""Commonly used utility functions and constants.""" - -from importlib import import_module -from inspect import getmembers, isclass -from os.path import abspath, dirname, join -from pkgutil import iter_modules - -from packaging.version import parse -from requests import ConnectionError, ConnectTimeout, get -from vbl_aquarium.models.unity import Vector4 - -from ephys_link.__about__ import __version__ -from ephys_link.utils.base_binding import BaseBinding - -# Ephys Link ASCII. -ASCII = r""" - ______ _ _ _ _ -| ____| | | | | (_) | | -| |__ _ __ | |__ _ _ ___ | | _ _ __ | | __ -| __| | '_ \| '_ \| | | / __| | | | | '_ \| |/ / -| |____| |_) | | | | |_| \__ \ | |____| | | | | < -|______| .__/|_| |_|\__, |___/ |______|_|_| |_|_|\_\ - | | __/ | - |_| |___/ -""" - -# Absolute path to the resource folder. -PACKAGE_DIRECTORY = dirname(dirname(abspath(__file__))) -RESOURCES_DIRECTORY = join(PACKAGE_DIRECTORY, "resources") -BINDINGS_DIRECTORY = join(PACKAGE_DIRECTORY, "bindings") - -# Ephys Link Port -PORT = 3000 - - -# Server startup. -def server_preamble() -> None: - """Print the server startup preamble.""" - print(ASCII) - print(__version__) - print() - print("This is the Ephys Link server window.") - print("You may safely leave it running in the background.") - print("To stop it, close this window or press CTRL + Pause/Break.") - print() - - -def check_for_updates() -> None: - """Check for updates to the Ephys Link.""" - try: - response = get("https://api.github.com/repos/VirtualBrainLab/ephys-link/tags", timeout=10) - latest_version = str(response.json()[0]["name"]) # pyright: ignore [reportAny] - if parse(latest_version) > parse(__version__): - print(f"Update available: {latest_version} !") - print("Download at: https://github.com/VirtualBrainLab/ephys-link/releases/latest") - except (ConnectionError, ConnectTimeout): - print("Unable to check for updates. Ignore updates or use the the -i flag to disable checks.\n") - - -def get_bindings() -> list[type[BaseBinding]]: - """Get all binding classes from the bindings directory. - - Returns: - List of binding classes. - """ - return [ - binding_type - for module in iter_modules([BINDINGS_DIRECTORY]) - for _, binding_type in getmembers(import_module(f"ephys_link.bindings.{module.name}"), isclass) - if issubclass(binding_type, BaseBinding) and binding_type != BaseBinding - ] - - -def get_binding_display_to_cli_name() -> dict[str, str]: - """Get mapping of display to CLI option names of the available platform bindings. - - Returns: - Dictionary of platform binding display name to CLI option name. - """ - return {binding_type.get_display_name(): binding_type.get_cli_name() for binding_type in get_bindings()} - - -# Unit conversions - - -def scalar_mm_to_um(mm: float) -> float: - """Convert scalar values of millimeters to micrometers. - - Args: - mm: Scalar value in millimeters. - - Returns: - Scalar value in micrometers. - """ - return mm * 1_000 - - -def vector_mm_to_um(mm: Vector4) -> Vector4: - """Convert vector values of millimeters to micrometers. - - Args: - mm: Vector in millimeters. - - Returns: - Vector in micrometers. - """ - return mm * 1_000 - - -def um_to_mm(um: Vector4) -> Vector4: - """Convert micrometers to millimeters. - - Args: - um: Length in micrometers. - - Returns: - Length in millimeters. - """ - return um / 1_000 - - -def vector4_to_array(vector4: Vector4) -> list[float]: - """Convert a [Vector4][vbl_aquarium.models.unity.Vector4] to a list of floats. - - Args: - vector4: [Vector4][vbl_aquarium.models.unity.Vector4] to convert. - - Returns: - List of floats. - """ - return [vector4.x, vector4.y, vector4.z, vector4.w] - - -def list_to_vector4(float_list: list[float | int]) -> Vector4: - """Convert a list of floats to a [Vector4][vbl_aquarium.models.unity.Vector4]. - - Args: - float_list: List of floats. - - Returns: - First four elements of the list as a Vector4 padded with zeros if necessary. - """ - - def get_element(this_array: list[float | int], index: int) -> float: - """Safely get an element from an array. - - Return 0 if the index is out of bounds. - - Args: - this_array: Array to get the element from. - index: Index to get. - - Returns: - Element at the index or 0 if the index is out of bounds. - """ - try: - return this_array[index] - except IndexError: - return 0.0 - - return Vector4( - x=get_element(float_list, 0), - y=get_element(float_list, 1), - z=get_element(float_list, 2), - w=get_element(float_list, 3), - ) diff --git a/src/ephys_link/utils/constants.py b/src/ephys_link/utils/constants.py new file mode 100644 index 0000000..198ea22 --- /dev/null +++ b/src/ephys_link/utils/constants.py @@ -0,0 +1,23 @@ +"""Globally accessible constants""" + +from os.path import abspath, dirname, join + +# Ephys Link ASCII. +ASCII = r""" + ______ _ _ _ _ +| ____| | | | | (_) | | +| |__ _ __ | |__ _ _ ___ | | _ _ __ | | __ +| __| | '_ \| '_ \| | | / __| | | | | '_ \| |/ / +| |____| |_) | | | | |_| \__ \ | |____| | | | | < +|______| .__/|_| |_|\__, |___/ |______|_|_| |_|_|\_\ + | | __/ | + |_| |___/ +""" + +# Absolute path to the resource folder. +PACKAGE_DIRECTORY = dirname(dirname(abspath(__file__))) +RESOURCES_DIRECTORY = join(PACKAGE_DIRECTORY, "resources") +BINDINGS_DIRECTORY = join(PACKAGE_DIRECTORY, "bindings") + +# Ephys Link Port +PORT = 3000 diff --git a/src/ephys_link/utils/converters.py b/src/ephys_link/utils/converters.py new file mode 100644 index 0000000..947bb43 --- /dev/null +++ b/src/ephys_link/utils/converters.py @@ -0,0 +1,86 @@ +"""Commonly used conversion functions.""" + +from vbl_aquarium.models.unity import Vector4 + + +def scalar_mm_to_um(mm: float) -> float: + """Convert scalar values of millimeters to micrometers. + + Args: + mm: Scalar value in millimeters. + + Returns: + Scalar value in micrometers. + """ + return mm * 1_000 + + +def vector_mm_to_um(mm: Vector4) -> Vector4: + """Convert vector values of millimeters to micrometers. + + Args: + mm: Vector in millimeters. + + Returns: + Vector in micrometers. + """ + return mm * 1_000 + + +def um_to_mm(um: Vector4) -> Vector4: + """Convert micrometers to millimeters. + + Args: + um: Length in micrometers. + + Returns: + Length in millimeters. + """ + return um / 1_000 + + +def vector4_to_array(vector4: Vector4) -> list[float]: + """Convert a [Vector4][vbl_aquarium.models.unity.Vector4] to a list of floats. + + Args: + vector4: [Vector4][vbl_aquarium.models.unity.Vector4] to convert. + + Returns: + List of floats. + """ + return [vector4.x, vector4.y, vector4.z, vector4.w] + + +def list_to_vector4(float_list: list[float | int]) -> Vector4: + """Convert a list of floats to a [Vector4][vbl_aquarium.models.unity.Vector4]. + + Args: + float_list: List of floats. + + Returns: + First four elements of the list as a Vector4 padded with zeros if necessary. + """ + + def get_element(this_array: list[float | int], index: int) -> float: + """Safely get an element from an array. + + Return 0 if the index is out of bounds. + + Args: + this_array: Array to get the element from. + index: Index to get. + + Returns: + Element at the index or 0 if the index is out of bounds. + """ + try: + return this_array[index] + except IndexError: + return 0.0 + + return Vector4( + x=get_element(float_list, 0), + y=get_element(float_list, 1), + z=get_element(float_list, 2), + w=get_element(float_list, 3), + ) diff --git a/src/ephys_link/utils/startup.py b/src/ephys_link/utils/startup.py new file mode 100644 index 0000000..c716370 --- /dev/null +++ b/src/ephys_link/utils/startup.py @@ -0,0 +1,61 @@ +"""Program startup helper functions.""" + +from importlib import import_module +from inspect import getmembers, isclass +from pkgutil import iter_modules + +from packaging.version import parse +from requests import ConnectionError, ConnectTimeout, get + +from ephys_link.__about__ import __version__ +from ephys_link.utils.base_binding import BaseBinding +from ephys_link.utils.console import Console +from ephys_link.utils.constants import ASCII, BINDINGS_DIRECTORY + + +def preamble() -> None: + """Print the server startup preamble.""" + print(ASCII) # noqa: T201 + print(__version__) # noqa: T201 + print() # noqa: T201 + print("This is the Ephys Link server window.") # noqa: T201 + print("You may safely leave it running in the background.") # noqa: T201 + print("To stop it, close this window or press CTRL + Pause/Break.") # noqa: T201 + print() # noqa: T201 + + +def check_for_updates(console: Console) -> None: + """Check for updates to the Ephys Link.""" + try: + response = get("https://api.github.com/repos/VirtualBrainLab/ephys-link/tags", timeout=10) + latest_version = str(response.json()[0]["name"]) # pyright: ignore [reportAny] + if parse(latest_version) > parse(__version__): + console.critical_print(f"Update available: {latest_version} (current: {__version__})") + console.critical_print("Download at: https://github.com/VirtualBrainLab/ephys-link/releases/latest") + except (ConnectionError, ConnectTimeout): + console.error_print( + "UPDATE", "Unable to check for updates. Ignore updates or use the the -i flag to disable checks.\n" + ) + + +def get_bindings() -> list[type[BaseBinding]]: + """Get all binding classes from the bindings directory. + + Returns: + List of binding classes. + """ + return [ + binding_type + for module in iter_modules([BINDINGS_DIRECTORY]) + for _, binding_type in getmembers(import_module(f"ephys_link.bindings.{module.name}"), isclass) + if issubclass(binding_type, BaseBinding) and binding_type != BaseBinding + ] + + +def get_binding_display_to_cli_name() -> dict[str, str]: + """Get mapping of display to CLI option names of the available platform bindings. + + Returns: + Dictionary of platform binding display name to CLI option name. + """ + return {binding_type.get_display_name(): binding_type.get_cli_name() for binding_type in get_bindings()} From 3a59ff266e059091c5de101c5c3eae111336c78a Mon Sep 17 00:00:00 2001 From: Kenneth Yang Date: Sat, 4 Jan 2025 21:05:52 -0800 Subject: [PATCH 13/14] Add argument documentation --- src/ephys_link/utils/startup.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/ephys_link/utils/startup.py b/src/ephys_link/utils/startup.py index c716370..db3fd45 100644 --- a/src/ephys_link/utils/startup.py +++ b/src/ephys_link/utils/startup.py @@ -25,7 +25,11 @@ def preamble() -> None: def check_for_updates(console: Console) -> None: - """Check for updates to the Ephys Link.""" + """Check for updates to the Ephys Link. + + Args: + console: Console instance for printing messages. + """ try: response = get("https://api.github.com/repos/VirtualBrainLab/ephys-link/tags", timeout=10) latest_version = str(response.json()[0]["name"]) # pyright: ignore [reportAny] From befc8ce5361fb14c7c1cf898f4796a3cc959f312 Mon Sep 17 00:00:00 2001 From: Kenneth Yang Date: Sat, 4 Jan 2025 21:06:36 -0800 Subject: [PATCH 14/14] Reformat --- src/ephys_link/utils/startup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ephys_link/utils/startup.py b/src/ephys_link/utils/startup.py index db3fd45..f00f518 100644 --- a/src/ephys_link/utils/startup.py +++ b/src/ephys_link/utils/startup.py @@ -26,7 +26,7 @@ def preamble() -> None: def check_for_updates(console: Console) -> None: """Check for updates to the Ephys Link. - + Args: console: Console instance for printing messages. """