diff --git a/.github/workflows/code-style.yaml b/.github/workflows/code-style.yaml deleted file mode 100644 index 499b8a5b5..000000000 --- a/.github/workflows/code-style.yaml +++ /dev/null @@ -1,49 +0,0 @@ -name: code-style -concurrency: - group: ${{ github.workflow }}-${{ github.event.number }}-${{ github.event.ref }} - cancel-in-progress: true -on: # yamllint disable-line rule:truthy - pull_request: - push: - branches: [main] - workflow_dispatch: - -jobs: - style: - timeout-minutes: 10 - runs-on: ubuntu-latest - steps: - - name: Checkout repository - uses: actions/checkout@v4 - - name: Setup Python 3.9 - uses: actions/setup-python@v5 - with: - python-version: '3.9' - architecture: 'x64' - - name: Install dependencies - run: | - python -m pip install --progress-bar off --upgrade pip setuptools wheel - python -m pip install --progress-bar off .[style] - - name: Run Ruff - run: ruff check . - - name: Run isort - uses: isort/isort-action@master - - name: Run black - uses: psf/black@stable - with: - options: "--check --verbose" - - name: Run codespell - uses: codespell-project/actions-codespell@master - with: - check_filenames: true - check_hidden: true - skip: './.git,./build,./.pytest_cache,./doc/_static/logos*' - ignore_words_file: ./.codespellignore - - name: Run pydocstyle - run: pydocstyle . - - name: Run bibclean - run: bibclean-check doc/references.bib - - name: Run toml-sort - run: toml-sort pyproject.toml --check - - name: Run yamllint - run: yamllint .github -c .yamllint.yaml --strict diff --git a/.github/workflows/doc.yaml b/.github/workflows/doc.yaml index 094c48e23..787d5029c 100644 --- a/.github/workflows/doc.yaml +++ b/.github/workflows/doc.yaml @@ -10,7 +10,7 @@ on: # yamllint disable-line rule:truthy jobs: build: - timeout-minutes: 10 + timeout-minutes: 30 runs-on: ubuntu-latest defaults: run: @@ -36,7 +36,11 @@ jobs: - name: Display system information run: mne_lsl-sys_info --developer - name: Build doc - run: make -C doc html + uses: nick-fields/retry@v2 + with: + timeout_minutes: 10 + max_attempts: 3 + command: make -C doc html - name: Prune sphinx environment run: rm -R ./doc/_build/html/.doctrees - name: Upload documentation diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml index 20554e113..54e728873 100644 --- a/.github/workflows/publish.yaml +++ b/.github/workflows/publish.yaml @@ -23,9 +23,11 @@ jobs: - name: Install dependencies run: | python -m pip install --progress-bar off --upgrade pip setuptools wheel - python -m pip install --progress-bar off .[build] + python -m pip install --progress-bar off -e .[build,stubs] - name: Display system information run: mne_lsl-sys_info --developer + - name: Generate stub files + run: python tools/stubgen.py - name: Build and publish env: TWINE_USERNAME: __token__ diff --git a/.github/workflows/pytest.yaml b/.github/workflows/pytest.yaml index 4f912b233..2a604afac 100644 --- a/.github/workflows/pytest.yaml +++ b/.github/workflows/pytest.yaml @@ -54,7 +54,11 @@ jobs: - name: Display system information run: mne_lsl-sys_info --developer - name: Run pytest - run: pytest mne_lsl --cov=mne_lsl --cov-report=xml --cov-config=pyproject.toml -s + uses: nick-fields/retry@v2 + with: + timeout_minutes: 10 + max_attempts: 3 + command: pytest mne_lsl --cov=mne_lsl --cov-report=xml --cov-config=pyproject.toml -s - name: Upload to codecov uses: codecov/codecov-action@v3 with: @@ -98,7 +102,11 @@ jobs: - name: Display system information run: mne_lsl-sys_info --developer - name: Run pytest - run: pytest mne_lsl 
--cov=mne_lsl --cov-report=xml --cov-config=pyproject.toml + uses: nick-fields/retry@v2 + with: + timeout_minutes: 10 + max_attempts: 3 + command: pytest mne_lsl --cov=mne_lsl --cov-report=xml --cov-config=pyproject.toml - name: Upload to codecov uses: codecov/codecov-action@v3 with: diff --git a/.github/workflows/stubs.yaml b/.github/workflows/stubs.yaml new file mode 100644 index 000000000..78642ddfb --- /dev/null +++ b/.github/workflows/stubs.yaml @@ -0,0 +1,41 @@ +name: stubs +concurrency: + group: ${{ github.workflow }}-${{ github.event.number }}-${{ github.event.ref }} + cancel-in-progress: true +on: # yamllint disable-line rule:truthy + schedule: + - cron: '0 3 * * *' + workflow_dispatch: + +jobs: + generate: + timeout-minutes: 10 + runs-on: ubuntu-latest + defaults: + run: + shell: bash + steps: + - name: Checkout repository + uses: actions/checkout@v4 + - name: Setup Python 3.9 + uses: actions/setup-python@v5 + with: + python-version: 3.9 + architecture: 'x64' + - name: Install package + run: | + python -m pip install --progress-bar off --upgrade pip setuptools + python -m pip install --progress-bar off -e .[stubs] + - name: Display system information + run: mne_lsl-sys_info --developer + - name: Generate stub files + run: python tools/stubgen.py + - name: Push stub files + run: | + git config --global user.name 'github-actions[bot]' + git config --global user.email 'github-actions[bot]@users.noreply.github.com' + if [ -n "$(git status --porcelain)" ]; then + git add . + git commit -m "deploy stub files [ci skip]" + git push + fi diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 5c6d6d2b8..74a2a9da2 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,3 @@ -ci: - skip: [codespell, pydocstyle, yamllint] - repos: - repo: https://github.com/pycqa/isort rev: 5.13.2 @@ -8,25 +5,23 @@ repos: - id: isort files: mne_lsl - - repo: https://github.com/psf/black-pre-commit-mirror - rev: 23.12.0 - hooks: - - id: black - args: [--quiet] - files: mne_lsl - - repo: https://github.com/astral-sh/ruff-pre-commit rev: v0.1.8 hooks: - id: ruff - args: [--fix, --exit-non-zero-on-fix] + name: ruff linter + args: [--fix] + files: mne_lsl + - id: ruff-format + name: ruff formatter files: mne_lsl - repo: https://github.com/codespell-project/codespell rev: v2.2.6 hooks: - id: codespell - args: [--check-filenames, --ignore-words=.codespellignore, --skip=*.svg] + args: [--write-changes] + additional_dependencies: [tomli] - repo: https://github.com/pycqa/pydocstyle rev: 6.3.0 diff --git a/README.md b/README.md index 620d98ea6..7b9bb788c 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ +[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff) [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) [![Imports: isort](https://img.shields.io/badge/%20imports-isort-%231674b1?style=flat&labelColor=ef8336)](https://pycqa.github.io/isort/) [![codecov](https://codecov.io/gh/mne-tools/mne-lsl/graph/badge.svg?token=Xoeh6T13qi)](https://codecov.io/gh/mne-tools/mne-lsl) diff --git a/mne_lsl/__init__.pyi b/mne_lsl/__init__.pyi new file mode 100644 index 000000000..676715b78 --- /dev/null +++ b/mne_lsl/__init__.pyi @@ -0,0 +1,5 @@ +from ._version import __version__ as __version__ +from .utils.config import sys_info as sys_info +from .utils.logs import add_file_handler as add_file_handler +from .utils.logs import logger as 
logger +from .utils.logs import set_log_level as set_log_level diff --git a/mne_lsl/_typing.pyi b/mne_lsl/_typing.pyi new file mode 100644 index 000000000..4bf0456d0 --- /dev/null +++ b/mne_lsl/_typing.pyi @@ -0,0 +1,5 @@ +from numpy.typing import DTypeLike as DTypeLike + +ScalarFloatType: tuple[DTypeLike, ...] +ScalarIntType: tuple[DTypeLike, ...] +ScalarType: tuple[DTypeLike, ...] diff --git a/mne_lsl/datasets/__init__.pyi b/mne_lsl/datasets/__init__.pyi new file mode 100644 index 000000000..e69de29bb diff --git a/mne_lsl/datasets/_fetch.pyi b/mne_lsl/datasets/_fetch.pyi new file mode 100644 index 000000000..fe86a6967 --- /dev/null +++ b/mne_lsl/datasets/_fetch.pyi @@ -0,0 +1,26 @@ +from pathlib import Path +from typing import Union + +from ..utils._checks import ensure_path as ensure_path +from ..utils._path import walk as walk +from ..utils.logs import logger as logger + +def fetch_dataset(path: Path, base_url: str, registry: Union[str, Path]) -> Path: + """Fetch a dataset from the remote. + + Parameters + ---------- + path : str | Path + Local path where the dataset should be cloned. + base_url : str + Base URL for the remote data sources. All requests will be made relative to this + URL. If the URL does not end in a '/', a trailing '/' will be added + automatically. + registry : str | Path + Path to the txt file containing the registry. + + Returns + ------- + path : Path + Absolute path to the local clone of the dataset. + """ diff --git a/mne_lsl/datasets/sample.py b/mne_lsl/datasets/sample.py index 79160934c..3e38e6f87 100644 --- a/mne_lsl/datasets/sample.py +++ b/mne_lsl/datasets/sample.py @@ -13,7 +13,7 @@ if TYPE_CHECKING: from typing import Optional, Union -_REGISTRY = files("mne_lsl.datasets") / "sample-registry.txt" +_REGISTRY: Path = files("mne_lsl.datasets") / "sample-registry.txt" def _make_registry( diff --git a/mne_lsl/datasets/sample.pyi b/mne_lsl/datasets/sample.pyi new file mode 100644 index 000000000..3dcf8710c --- /dev/null +++ b/mne_lsl/datasets/sample.pyi @@ -0,0 +1,29 @@ +from pathlib import Path +from typing import Optional, Union + +from ..utils._checks import ensure_path as ensure_path +from ._fetch import fetch_dataset as fetch_dataset + +_REGISTRY: Path + +def _make_registry( + folder: Union[str, Path], output: Optional[Union[str, Path]] = ... +) -> None: + """Create the registry file for the sample dataset. + + Parameters + ---------- + folder : path-like + Path to the sample dataset. + output : path-like + Path to the output registry file. + """ + +def data_path() -> Path: + """Return the path to the sample dataset, downloaded if needed. + + Returns + ------- + path : Path + Path to the sample dataset, by default in ``"~/mne_data/MNE-LSL"``. + """ diff --git a/mne_lsl/datasets/testing.py b/mne_lsl/datasets/testing.py index 19c32f77e..20ddd6f4f 100644 --- a/mne_lsl/datasets/testing.py +++ b/mne_lsl/datasets/testing.py @@ -13,7 +13,7 @@ if TYPE_CHECKING: from typing import Optional, Union -_REGISTRY = files("mne_lsl.datasets") / "testing-registry.txt" +_REGISTRY: Path = files("mne_lsl.datasets") / "testing-registry.txt" def _make_registry( @@ -25,7 +25,7 @@ def _make_registry( ---------- folder : path-like Path to the sample dataset. - output : str | Path + output : path-like Path to the output registry file. 
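A quick usage sketch of the dataset helpers stubbed above (names taken from the stubs; the download location follows the documented default): calling data_path() fetches the registry-listed files on first use and returns the local clone.

    from mne_lsl.datasets import sample, testing

    sample_path = sample.data_path()    # defaults to ~/mne_data/MNE-LSL, downloaded if missing
    testing_path = testing.data_path()  # the testing dataset follows the same pattern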
""" folder = ensure_path(folder, must_exist=True) diff --git a/mne_lsl/datasets/testing.pyi b/mne_lsl/datasets/testing.pyi new file mode 100644 index 000000000..dd32a19b2 --- /dev/null +++ b/mne_lsl/datasets/testing.pyi @@ -0,0 +1,29 @@ +from pathlib import Path +from typing import Optional, Union + +from ..utils._checks import ensure_path as ensure_path +from ._fetch import fetch_dataset as fetch_dataset + +_REGISTRY: Path + +def _make_registry( + folder: Union[str, Path], output: Optional[Union[str, Path]] = ... +) -> None: + """Create the registry file for the sample dataset. + + Parameters + ---------- + folder : path-like + Path to the sample dataset. + output : path-like + Path to the output registry file. + """ + +def data_path() -> Path: + """Return the path to the sample dataset, downloaded if needed. + + Returns + ------- + path : Path + Path to the sample dataset, by default in ``"~/mne_data/mne_lsl"``. + """ diff --git a/mne_lsl/lsl/__init__.pyi b/mne_lsl/lsl/__init__.pyi new file mode 100644 index 000000000..4279ad3a0 --- /dev/null +++ b/mne_lsl/lsl/__init__.pyi @@ -0,0 +1,7 @@ +from .functions import library_version as library_version +from .functions import local_clock as local_clock +from .functions import protocol_version as protocol_version +from .functions import resolve_streams as resolve_streams +from .stream_info import StreamInfo as StreamInfo +from .stream_inlet import StreamInlet as StreamInlet +from .stream_outlet import StreamOutlet as StreamOutlet diff --git a/mne_lsl/lsl/_utils.pyi b/mne_lsl/lsl/_utils.pyi new file mode 100644 index 000000000..261fec709 --- /dev/null +++ b/mne_lsl/lsl/_utils.pyi @@ -0,0 +1,131 @@ +from typing import Optional + +from _typeshed import Incomplete + +from .load_liblsl import lib as lib + +class XMLElement: + """A lightweight XML element tree modeling the .desc() field of StreamInfo. + + Has a name and can have multiple named children or have text content as value; + attributes are omitted. Insider note: The interface is modeled after a subset of + pugixml's node type and is compatible with it. + """ + + e: Incomplete + + def __init__(self, handle) -> None: + """Construct a new XML element from existing handle.""" + + def first_child(self): + """Get the first child of the element.""" + + def last_child(self): + """Get the last child of the element.""" + + def child(self, name): + """Get a child with a specified name.""" + + def next_sibling(self, name: Incomplete | None = ...): + """Get the next sibling in the children list of the parent node. + + If a name is provided, the next sibling with the given name is returned. + """ + + def previous_sibling(self, name: Incomplete | None = ...): + """Get the previous sibling in the children list of the parent node. + + If a name is provided, the previous sibling with the given name is returned. + """ + + def parent(self): + """Get the parent node.""" + + def empty(self): + """True if this node is empty.""" + + def is_text(self): + """True if this node is a text body (instead of an XML element). + + True both for plain char data and CData. + """ + + def name(self): + """Name of the element.""" + + def value(self): + """Value of the element.""" + + def child_value(self, name: Incomplete | None = ...): + """Get child value (value of the first child that is text). + + If a name is provided, then the value of the first child with the given name is + returned. 
+ """ + + def append_child_value(self, name, value): + """Append a child node with a given name, which has a (nameless) plain-text + child with the given text value. + """ + + def prepend_child_value(self, name, value): + """Prepend a child node with a given name, which has a (nameless) plain-text + child with the given text value. + """ + + def set_child_value(self, name, value): + """Set the text value of the (nameless) plain-text child of a named + child node. + """ + + def set_name(self, name): + """Set the element's name. + + Return False if the node is empty. + """ + + def set_value(self, value): + """Set the element's value. + + Return False if the node is empty. + """ + + def append_child(self, name): + """Append a child element with the specified name.""" + + def prepend_child(self, name): + """Prepend a child element with the specified name.""" + + def append_copy(self, elem): + """Append a copy of the specified element as a child.""" + + def prepend_copy(self, elem): + """Prepend a copy of the specified element as a child.""" + + def remove_child(self, rhs) -> None: + """Remove a given child element, specified by name or as element.""" + +class LostError(RuntimeError): ... +class InvalidArgumentError(RuntimeError): ... +class InternalError(RuntimeError): ... + +def handle_error(errcode) -> None: + """Error handler function. + + Translates an error code into an exception. + """ + +def free_char_p_array_memory(char_p_array) -> None: ... +def check_timeout(timeout: Optional[float]) -> float: + """Check that the provided timeout is valid. + + Parameters + ---------- + timeout : float | None + Timeout (in seconds) or None to disable timeout. + + Returns + ------- + timeout : float + Timeout (in seconds). If None was provided, a very large float is returned. + """ diff --git a/mne_lsl/lsl/constants.py b/mne_lsl/lsl/constants.py index 579b5cd34..36fe7582f 100644 --- a/mne_lsl/lsl/constants.py +++ b/mne_lsl/lsl/constants.py @@ -127,7 +127,7 @@ def push_sample_int64(*_): # noqa: D103 # --------------------- # Post processing flags # --------------------- -post_processing_flags = { +post_processing_flags: dict[str, int] = { "clocksync": 1, "dejitter": 2, "monotize": 4, diff --git a/mne_lsl/lsl/functions.pyi b/mne_lsl/lsl/functions.pyi new file mode 100644 index 000000000..fc91f82eb --- /dev/null +++ b/mne_lsl/lsl/functions.pyi @@ -0,0 +1,83 @@ +from typing import Optional + +from ..utils._checks import check_type as check_type +from ..utils._checks import ensure_int as ensure_int +from .load_liblsl import lib as lib +from .stream_info import _BaseStreamInfo as _BaseStreamInfo + +def library_version() -> int: + """Version of the binary LSL library. + + Returns + ------- + version : int + Version of the binary LSL library. + The major version is ``version // 100``. + The minor version is ``version % 100``. + """ + +def protocol_version() -> int: + """Version of the LSL protocol. + + Returns + ------- + version : int + Version of the binary LSL library. + The major version is ``version // 100``. + The minor version is ``version % 100``. + + Notes + ----- + Clients with different minor versions are protocol-compatible with each other, while + clients with different major versions will refuse to work together. + """ + +def local_clock() -> float: + """Obtain a local system timestamp in seconds. + + Returns + ------- + time : int + Local timestamp in seconds. 
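As a minimal sketch of the version convention documented in these stubs, the integer returned by library_version() splits into major and minor parts exactly as the docstrings describe:

    from mne_lsl.lsl import library_version, local_clock, protocol_version

    version = library_version()
    major, minor = version // 100, version % 100   # e.g. 116 -> liblsl 1.16
    print(f"liblsl {major}.{minor}, protocol {protocol_version()}")
    print(f"local clock: {local_clock():.3f} s")   # timestamp base shared by inlets and outlets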
+ """ + +def resolve_streams( + timeout: float = ..., + name: Optional[str] = ..., + stype: Optional[str] = ..., + source_id: Optional[str] = ..., + minimum: int = ..., +) -> list[_BaseStreamInfo]: + """Resolve streams on the network. + + This function returns all currently available streams from any outlet on the + network. The network is usually the subnet specified at the local router, but may + also include a group of machines visible to each other via multicast packets (given + that the network supports it), or list of hostnames. These details may optionally be + customized by the experimenter in a configuration file (see Network Connectivity in + the LSL wiki). + + Parameters + ---------- + timeout : float + Timeout (in seconds) of the operation. If this is too short (e.g. + ``< 0.5 seconds``) only a subset (or none) of the outlets that are present on + the network may be returned. + name : str | None + Restrict the selected streams to this name. + stype : str | None + Restrict the selected stream to this type. + source_id : str | None + Restrict the selected stream to this source ID. + minimum : int + Minimum number of stream to return where restricting the selection. As soon as + this minimum is hit, the search will end. Only works if at least one of the 3 + identifiers ``name``, ``stype`` or ``source_id`` is not ``None``. + + Returns + ------- + sinfos : list + List of :class:`~mne_lsl.lsl.StreamInfo` objects found on the network. While a + :class:`~mne_lsl.lsl.StreamInfo` is not bound to an Inlet, the description field + remains empty. + """ diff --git a/mne_lsl/lsl/load_liblsl.py b/mne_lsl/lsl/load_liblsl.py index 83d08c092..2ac9e5423 100644 --- a/mne_lsl/lsl/load_liblsl.py +++ b/mne_lsl/lsl/load_liblsl.py @@ -26,23 +26,23 @@ # Minimum liblsl version. The major version is given by version // 100 # and the minor version is given by version % 100. -_VERSION_MIN = 115 +_VERSION_MIN: int = 115 # liblsl objects created with the same protocol version are inter-compatible. -_VERSION_PROTOCOL = 110 -_PLATFORM = platform.system().lower().strip() -_PLATFORM_SUFFIXES = { +_VERSION_PROTOCOL: int = 110 +_PLATFORM: str = platform.system().lower().strip() +_PLATFORM_SUFFIXES: dict[str, str] = { "windows": ".dll", "darwin": ".dylib", "linux": ".so", } # variables which should be kept in sync with liblsl release -_SUPPORTED_DISTRO = { +_SUPPORTED_DISTRO: dict[str, tuple[str, ...]] = { # TODO: check if liblsl bookworm works as expected with mne-lsl "debian": ("12",), "ubuntu": ("18.04", "20.04", "22.04"), } # generic error message -_ERROR_MSG = ( +_ERROR_MSG: str = ( "Please visit liblsl library github page (https://github.com/sccn/liblsl) and " "install a release in the system directories or provide its path in the " "environment variable MNE_LSL_LIB or PYLSL_LIB." diff --git a/mne_lsl/lsl/load_liblsl.pyi b/mne_lsl/lsl/load_liblsl.pyi new file mode 100644 index 000000000..622eae873 --- /dev/null +++ b/mne_lsl/lsl/load_liblsl.pyi @@ -0,0 +1,98 @@ +from ctypes import CDLL +from pathlib import Path +from typing import Optional, Union + +from _typeshed import Incomplete +from pooch import Pooch + +from .. 
import __version__ as __version__ +from ..utils._path import walk as walk +from ..utils.logs import logger as logger + +_VERSION_MIN: int +_VERSION_PROTOCOL: int +_PLATFORM: str +_PLATFORM_SUFFIXES: dict[str, str] +_SUPPORTED_DISTRO: dict[str, tuple[str, ...]] +_ERROR_MSG: str + +def load_liblsl() -> CDLL: + """Load the binary LSL library on the system.""" + +def _find_liblsl() -> Optional[CDLL]: + """Search for liblsl in the environment variable and in the system folders. + + Returns + ------- + lib : CDLL | None + Loaded binary LSL library. None if not found. + """ + +def _fetch_liblsl(folder: Path = ...) -> Optional[CDLL]: + """Fetch liblsl on the release page. + + Parameters + ---------- + folder : Path + Folder where the fetched liblsl is stored. + + Returns + ------- + lib : CDLL | None + Loaded binary LSL library. None if not found for this platform. + """ + +def _pooch_processor_liblsl(fname: str, action: str, pooch: Pooch) -> str: + """Processor of the pooch-downloaded liblsl. + + Parameters + ---------- + fname : str + The full path of the file in the local data storage. + action : str + Either: + * "download" (file doesn't exist and will be downloaded) + * "update" (file is outdated and will be downloaded) + * "fetch" (file exists and is updated so no download is necessary) + pooch : Pooch + The instance of the Pooch class that is calling this function. + + Returns + ------- + fname : str + The full path to the file in the local data storage. + """ + +def _attempt_load_liblsl(libpath: Union[str, Path]) -> tuple[str, Optional[int]]: + """Try loading a binary LSL library. + + Parameters + ---------- + libpath : Path + Path to the binary LSL library. + + Returns + ------- + libpath : str + Path to the binary LSL library, converted to string for the given OS. + version : int + Version of the binary LSL library. + The major version is version // 100. + The minor version is version % 100. + """ + +def _set_types(lib: CDLL) -> CDLL: + """Set the argument and return types for the different liblsl functions. + + Parameters + ---------- + lib : CDLL + Loaded binary LSL library. + + Returns + ------- + lib : CDLL + Loaded binary LSL library with the return types set. + """ + +lib: Incomplete diff --git a/mne_lsl/lsl/stream_info.pyi b/mne_lsl/lsl/stream_info.pyi new file mode 100644 index 000000000..1198769d5 --- /dev/null +++ b/mne_lsl/lsl/stream_info.pyi @@ -0,0 +1,404 @@ +from typing import Any, Optional, Union + +from _typeshed import Incomplete +from mne import Info, Projection +from mne.io._digitization import DigPoint +from numpy.typing import DTypeLike as DTypeLike +from numpy.typing import NDArray as NDArray + +from .._typing import ScalarIntType as ScalarIntType +from ..utils._checks import check_type as check_type +from ..utils._checks import check_value as check_value +from ..utils._checks import ensure_int as ensure_int +from ..utils.logs import logger as logger +from ..utils.meas_info import create_info as create_info +from ._utils import XMLElement as XMLElement +from .constants import fmt2idx as fmt2idx +from .constants import fmt2numpy as fmt2numpy +from .constants import idx2fmt as idx2fmt +from .constants import numpy2fmt as numpy2fmt +from .constants import string2fmt as string2fmt +from .load_liblsl import lib as lib + +_MAPPING_LSL: Incomplete +_LOC_NAMES: Incomplete + +class _BaseStreamInfo: + """Base Stream information object, storing the declaration of a stream. 
+ + A StreamInfo contains the following information: + + * Core information (name, number of channels, sampling frequency, channel format, + ...) + * Optional metadata about the stream content (channel labels, measurement units, + ...) + * Hosting information (uID, hostname, ...) if bound to an inlet or outlet + """ + + _obj: Incomplete + _dtype: Incomplete + + def __init__(self, obj) -> None: ... + def __del__(self) -> None: + """Destroy a `~mne_lsl.lsl.StreamInfo`.""" + + def __eq__(self, other: Any) -> bool: + """Equality == method.""" + + def __ne__(self, other: Any) -> bool: + """Inequality != method.""" + + def __hash__(self) -> int: + """Determine a hash from the properties.""" + + def __repr__(self) -> str: + """Representation of the Info.""" + + @property + def dtype(self) -> Union[str, DTypeLike]: + """Channel format of a stream. + + All channels in a stream have the same format. + + :type: :class:`~numpy.dtype` | ``"string"`` + """ + + @property + def name(self) -> str: + """Name of the stream. + + The name of the stream is defined by the application creating the LSL outlet. + Streams with identical names can coexist, at the cost of ambiguity for the + recording application and/or the experimenter. + + :type: :class:`str` + """ + + @property + def n_channels(self) -> int: + """Number of channels. + + A stream must have at least one channel. The number of channels remains constant + for all samples. + + :type: :class:`int` + """ + + @property + def sfreq(self) -> float: + """Sampling rate of the stream, according to the source (in Hz). + + If a stream is irregularly sampled, the sampling rate is set to ``0``. + + :type: :class:`float` + """ + + @property + def source_id(self) -> str: + """Unique identifier of the stream's source. + + The unique source (or device) identifier is an optional piece of information + that, if available, allows endpoints (such as the recording program) to + re-acquire a stream automatically once if it came back online. + + :type: :class:`str` + """ + + @property + def stype(self) -> str: + """Type of the stream. + + The content type is a short string, such as ``"EEG"``, ``"Gaze"``, ... which + describes the content carried by the channel. If a stream contains mixed + content, this value should be an empty string and the type should be stored in + the description of individual channels. + + :type: :class:`str` + """ + + @property + def created_at(self) -> float: + """Timestamp at which the stream was created. + + This is the time stamps at which the stream was first created, as determined by + :func:`mne_lsl.lsl.local_clock` on the providing machine. + + :type: :class:`float` + """ + + @property + def hostname(self) -> str: + """Hostname of the providing machine. + + :type: :class:`str` + """ + + @property + def session_id(self) -> str: + """Session ID for the given stream. + + The session ID is an optional human-assigned identifier of the recording + session. While it is rarely used, it can be used to prevent concurrent recording + activities on the same sub-network (e.g., in multiple experiment areas) from + seeing each other's streams (can be assigned in a configuration file read by + liblsl, see also Network Connectivity in the LSL wiki). + + :type: :class:`str` + """ + + @property + def uid(self) -> str: + """Unique ID of the :class:`~mne_lsl.lsl.StreamOutlet` instance. + + This ID is guaranteed to be different across multiple instantiations of the same + :class:`~mne_lsl.lsl.StreamOutlet`, e.g. after a re-start. 
+ + :type: :class:`str` + """ + + @property + def protocol_version(self) -> int: + """Version of the LSL protocol. + + The major version is ``version // 100``. + The minor version is ``version % 100``. + + :type: :class:`int` + """ + + @property + def as_xml(self) -> str: + """Retrieve the entire stream_info in XML format. + + This yields an XML document (in string form) whose top-level element is + ````. The info element contains one element for each field of the + :class:`~mne_lsl.lsl.StreamInfo` class, including: + + * the core elements ``name``, ``type`` (eq. ``stype``), ``channel_count`` + (eq. ``n_channels``), ``nominal_srate`` (eq. ``sfreq``), ``channel_format`` + (eq. ``dtype``), ``source_id`` + * the misc elements ``version``, ``created_at``, ``uid``, ``session_id``, + ``v4address``, ``v4data_port``, ``v4service_port``, ``v6address``, + ``v6data_port``, ``v6service_port`` + * the extended description element ``desc`` with user-defined sub-elements. + + :type: :class:`str` + """ + + @property + def desc(self) -> XMLElement: + """Extended description of the stream. + + It is highly recommended that at least the channel labels are described here. + See code examples on the LSL wiki. Other information, such as amplifier + settings, measurement units if deviating from defaults, setup information, + subject information, etc.. can be specified here, as well. Meta-data + recommendations follow the `XDF file format project`_. + + Important: if you use a stream content type for which meta-data recommendations + exist, please try to lay out your meta-data in agreement with these + recommendations for compatibility with other applications. + + .. _XDF file format project: https://github.com/sccn/xdf/wiki/Meta-Data + """ + + def get_channel_info(self) -> Info: + """Get the FIFF measurement :class:`~mne.Info` in the description. + + Returns + ------- + info : Info + :class:`~mne.Info` containing the measurement information. + """ + + def get_channel_names(self) -> Optional[list[str]]: + """Get the channel names in the description. + + Returns + ------- + ch_names : list of str or ``None`` | None + List of channel names, matching the number of total channels. + If ``None``, the channel names are not set. + + .. warning:: + + If a list of str and ``None`` are returned, some of the channel names + are missing. This is not expected and could occur if the XML tree in + the ``desc`` property is tempered with outside of the defined getter and + setter. + """ + + def get_channel_types(self) -> Optional[list[str]]: + """Get the channel types in the description. + + Returns + ------- + ch_types : list of str or ``None`` | None + List of channel types, matching the number of total channels. + If ``None``, the channel types are not set. + + .. warning:: + + If a list of str and ``None`` are returned, some of the channel types + are missing. This is not expected and could occur if the XML tree in + the ``desc`` property is tempered with outside of the defined getter and + setter. + """ + + def get_channel_units(self) -> Optional[list[str]]: + """Get the channel units in the description. + + Returns + ------- + ch_units : list of str or ``None`` | None + List of channel units, matching the number of total channels. + If ``None``, the channel units are not set. + + .. warning:: + + If a list of str and ``None`` are returned, some of the channel units + are missing. This is not expected and could occur if the XML tree in + the ``desc`` property is tempered with outside of the defined getter and + setter. 
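A short sketch of the metadata getters documented above, assuming an existing StreamInfo named sinfo (for instance returned by StreamInlet.get_sinfo()); each getter returns None when the corresponding entry was never written to the description:

    ch_names = sinfo.get_channel_names()   # list of str, or None if the names were never set
    ch_types = sinfo.get_channel_types()
    ch_units = sinfo.get_channel_units()
    info = sinfo.get_channel_info()        # full mne.Info rebuilt from the description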
+ """ + + def _get_channel_info(self, name: str) -> Optional[list[str]]: + """Get the 'channel/name' element in the XML tree.""" + + def _get_channel_projectors(self) -> list[Projection]: + """Get the SSP vectors in the XML tree.""" + + def _get_digitization(self) -> list[DigPoint]: + """Get the digitization in the XML tree.""" + + def set_channel_info(self, info: Info) -> None: + """Set the channel info from a FIFF measurement :class:`~mne.Info`. + + Parameters + ---------- + info : Info + :class:`~mne.Info` containing the measurement information. + """ + + def set_channel_names(self, ch_names: Union[list[str], tuple[str]]) -> None: + """Set the channel names in the description. Existing labels are overwritten. + + Parameters + ---------- + ch_names : list of str + List of channel names, matching the number of total channels. + """ + + def set_channel_types(self, ch_types: Union[str, list[str]]) -> None: + """Set the channel types in the description. Existing types are overwritten. + + The types are given as human readable strings, e.g. ``'eeg'``. + + Parameters + ---------- + ch_types : list of str | str + List of channel types, matching the number of total channels. + If a single :class:`str` is provided, the type is applied to all channels. + """ + + def set_channel_units( + self, ch_units: Union[str, list[str], int, list[int], NDArray[None]] + ) -> None: + """Set the channel units in the description. Existing units are overwritten. + + The units are given as human readable strings, e.g. ``'microvolts'``, or as + multiplication factor, e.g. ``-6`` for ``1e-6`` thus converting e.g. Volts to + microvolts. + + Parameters + ---------- + ch_units : list of str | list of int | array of int | str | int + List of channel units, matching the number of total channels. + If a single :class:`str` or :class:`int` is provided, the unit is applied to + all channels. + + Notes + ----- + Some channel types doch_units not have a unit. The :class:`str` ``none`` or the + :class:`int` 0 should be used to denote this channel unit, corresponding to + ``FIFF_UNITM_NONE`` in MNE-Python. + """ + + def _set_channel_info(self, ch_infos: list[str], name: str) -> None: + """Set the 'channel/name' element in the XML tree.""" + + def _set_channel_projectors(self, projs: list[Projection]) -> None: + """Set the SSP projector.""" + + def _set_digitization(self, dig_points: list[DigPoint]) -> None: + """Set the digitization points.""" + + @staticmethod + def _add_first_node(desc: XMLElement, name: str) -> XMLElement: + """Add the first node in the description and return it.""" + + @staticmethod + def _prune_description_node(node: XMLElement, parent: XMLElement) -> None: + """Prune a node and remove outdated entries.""" + + @staticmethod + def _set_description_node(node: XMLElement, mapping: dict[str, Any]) -> None: + """Set the key: value child(s) of a node.""" + + @staticmethod + def _get_fiff_int_named( + value: Optional[str], name: str, mapping: dict[int, int] + ) -> Optional[int]: + """Try to retrieve the FIFF integer code from the str representation.""" + +class StreamInfo(_BaseStreamInfo): + """Base Stream information object, storing the declaration of a stream. + + A StreamInfo contains the following information: + + * Core information (name, number of channels, sampling frequency, channel format, + ...). + * Optional metadata about the stream content (channel labels, measurement units, + ...). + * Hosting information (uID, hostname, ...) 
if bound to a + :class:`~mne_lsl.lsl.StreamInlet` or :class:`~mne_lsl.lsl.StreamOutlet`. + + Parameters + ---------- + name : str + Name of the stream. This field can not be empty. + stype : str + Content type of the stream, e.g. ``"EEG"`` or ``"Gaze"``. If a stream contains + mixed content, this value should be empty and the description of each channel + should include its type. + n_channels : int ``≥ 1`` + Also called ``channel_count``, represents the number of channels per sample. + This number stays constant for the lifetime of the stream. + sfreq : float ``≥ 0`` + Also called ``nominal_srate``, represents the sampling rate (in Hz) as + advertised by the data source. If the sampling rate is irregular (e.g. for a + trigger stream), the sampling rate is set to ``0``. + dtype : str | dtype + Format of each channel. If your channels have different formats, consider + supplying multiple streams or use the largest type that can hold them all. + One of ``('string', 'float32', 'float64', 'int8', 'int16', 'int32')``. + ``'int64'`` is partially supported. Can also be the equivalent numpy type, e.g. + ``np.int8``. + source_id : str + A unique identifier of the device or source of the data. If not empty, this + information improves the system robustness since it allows recipients to recover + from failure by finding a stream with the same ``source_id`` on the network. + """ + + def __init__( + self, + name: str, + stype: str, + n_channels: int, + sfreq: float, + dtype: str, + source_id: str, + ) -> None: ... + @staticmethod + def _dtype2idxfmt(dtype: Union[str, int, DTypeLike]) -> int: + """Convert a string format to its LSL integer value.""" diff --git a/mne_lsl/lsl/stream_inlet.pyi b/mne_lsl/lsl/stream_inlet.pyi new file mode 100644 index 000000000..17d542f5c --- /dev/null +++ b/mne_lsl/lsl/stream_inlet.pyi @@ -0,0 +1,306 @@ +from typing import Optional, Sequence, Union + +import numpy as np +from _typeshed import Incomplete +from numpy.typing import DTypeLike as DTypeLike +from numpy.typing import NDArray as NDArray + +from .._typing import ScalarType as ScalarType +from ..utils._checks import check_type as check_type +from ..utils._checks import check_value as check_value +from ..utils._checks import ensure_int as ensure_int +from ..utils._docs import copy_doc as copy_doc +from ..utils.logs import logger as logger +from ._utils import check_timeout as check_timeout +from ._utils import free_char_p_array_memory as free_char_p_array_memory +from ._utils import handle_error as handle_error +from .constants import fmt2numpy as fmt2numpy +from .constants import fmt2pull_chunk as fmt2pull_chunk +from .constants import fmt2pull_sample as fmt2pull_sample +from .constants import post_processing_flags as post_processing_flags +from .load_liblsl import lib as lib +from .stream_info import _BaseStreamInfo as _BaseStreamInfo + +class StreamInlet: + """An inlet to retrieve data and metadata on the network. + + Parameters + ---------- + sinfo : StreamInfo + Description of the stream to connect to. + chunk_size : int ``≥ 1`` | ``0`` + The desired chunk granularity in samples. By default, the ``chunk_size`` defined + by the sender (outlet) is used. + max_buffered : int ``≥ 0`` + The maximum amount of data to buffer in the Outlet. The number of samples + buffered is ``max_buffered * 100`` if the sampling rate is irregular, else it's + ``max_buffered`` seconds. 
+ recover : bool + Attempt to silently recover lost streams that are recoverable (requires a + ``source_id`` to be specified in the :class:`~mne_lsl.lsl.StreamInfo`). + processing_flags : sequence of str | ``'all'`` | None + Set the post-processing options. By default, post-processing is disabled. Any + combination of the processing flags is valid. The available flags are: + + * ``'clocksync'``: Automatic clock synchronization, equivalent to + manually adding the estimated + :meth:`~mne_lsl.lsl.StreamInlet.time_correction`. + * ``'dejitter'``: Remove jitter on the received timestamps with a + smoothing algorithm. + * ``'monotize'``: Force the timestamps to be monotically ascending. + This option should not be enable if ``'dejitter'`` is not enabled. + * ``'threadsafe'``: Post-processing is thread-safe, thus the same + inlet can be read from multiple threads. + """ + + _lock: Incomplete + _dtype: Incomplete + _name: Incomplete + _n_channels: Incomplete + _sfreq: Incomplete + _stype: Incomplete + _do_pull_sample: Incomplete + _do_pull_chunk: Incomplete + _buffer_data: Incomplete + _buffer_ts: Incomplete + _stream_is_open: bool + + def __init__( + self, + sinfo: _BaseStreamInfo, + chunk_size: int = ..., + max_buffered: float = ..., + recover: bool = ..., + processing_flags: Optional[Union[str, Sequence[str]]] = ..., + ) -> None: ... + @property + def _obj(self): ... + __obj: Incomplete + + @_obj.setter + def _obj(self, obj) -> None: ... + def __del__(self) -> None: + """Destroy a :class:`~mne_lsl.lsl.StreamInlet`. + + The inlet will automatically disconnect. + """ + + def open_stream(self, timeout: Optional[float] = ...) -> None: + """Subscribe to a data stream. + + All samples pushed in at the other end from this moment onwards will be queued + and eventually be delivered in response to + :meth:`~mne_lsl.lsl.StreamInlet.pull_sample` or + :meth:`~mne_lsl.lsl.StreamInlet.pull_chunk` calls. Pulling a sample without + subscribing to the stream with this method is permitted (the stream will be + opened implicitly). + + Parameters + ---------- + timeout : float | None + Optional timeout (in seconds) of the operation. By default, timeout is + disabled. + + Notes + ----- + Opening a stream is a non-blocking operation. Thus, samples pushed on an outlet + while the stream is not yet open will be missed. + """ + + def close_stream(self) -> None: + """Drop the current data stream. + + All samples that are still buffered or in flight will be dropped and + transmission and buffering of data for this inlet will be stopped. This method + is used if an application stops being interested in data from a source + (temporarily or not) but keeps the outlet alive, to not waste unnecessary system + and network resources. + + .. warning:: + + At the moment, ``liblsl`` is released in version 1.16. Closing and + re-opening a stream does not work and new samples pushed to the outlet do + not arrive at the inlet. c.f. this + `github issue `_. + """ + + def time_correction(self, timeout: Optional[float] = ...) -> float: + """Retrieve an estimated time correction offset for the given stream. + + The first call to this function takes several milliseconds until a reliable + first estimate is obtained. Subsequent calls are instantaneous (and rely on + periodic background updates). The precision of these estimates should be below + 1 ms (empirically within +/-0.2 ms). + + Parameters + ---------- + timeout : float | None + Optional timeout (in seconds) of the operation. By default, timeout is + disabled. 
+ + Returns + ------- + time_correction : float + Current estimate of the time correction. This number needs to be added to a + timestamp that was remotely generated via ``local_clock()`` to map it into + the :func:`~mne_lsl.lsl.local_clock` domain of the client machine. + """ + + def pull_sample( + self, timeout: Optional[float] = ... + ) -> tuple[Union[list[str], NDArray[None]], Optional[float]]: + """Pull a single sample from the inlet. + + Parameters + ---------- + timeout : float | None + Optional timeout (in seconds) of the operation. None correspond to a very + large value, effectively disabling the timeout. ``0.`` makes this function + non-blocking even if no sample is available. See notes for additional + details. + + Returns + ------- + sample : list of str | array of shape (n_channels,) + If the channel format is ``'string``, returns a list of values for each + channel. Else, returns a numpy array of shape ``(n_channels,)``. + timestamp : float | None + Acquisition timestamp on the remote machine. To map the timestamp to the + local clock of the client machine, add the estimated time correction return + by :meth:`~mne_lsl.lsl.StreamInlet.time_correction`. None if no sample was + retrieved. + + Notes + ----- + Note that if ``timeout`` is reached and no sample is available, an empty + ``sample`` is returned and ``timestamp`` is set to None. + """ + + def pull_chunk( + self, timeout: Optional[float] = ..., max_samples: int = ... + ) -> tuple[Union[list[list[str]], NDArray[None]], NDArray[np.float64]]: + """Pull a chunk of samples from the inlet. + + Parameters + ---------- + timeout : float | None + Optional timeout (in seconds) of the operation. None correspond to a very + large value, effectively disabling the timeout. ``0.`` makes this function + non-blocking even if no sample is available. See notes for additional + details. + max_samples : int + Maximum number of samples to return. The function is blocking until this + number of samples is available or until ``timeout`` is reached. See notes + for additional details. + + Returns + ------- + samples : list of list of str | array of shape (n_samples, n_channels) + If the channel format is ``'string'``, returns a list of list of values for + each channel and sample. Each sublist represents an entire channel. Else, + returns a numpy array of shape ``(n_samples, n_channels)``. + timestamps : array of shape (n_samples,) + Acquisition timestamp on the remote machine. To map the timestamp to the + local clock of the client machine, add the estimated time correction return + by :meth:`~mne_lsl.lsl.StreamInlet.time_correction`. + + Notes + ----- + The argument ``timeout`` and ``max_samples`` control the blocking behavior of + the pull operation. If the number of available sample is inferior to + ``n_samples``, the pull operation is blocking until ``timeout`` is reached. + Thus, to return all the available samples at a given time, regardless of the + number of samples requested, ``timeout`` must be set to ``0``. + + Note that if ``timeout`` is reached and no sample is available, empty + ``samples`` and ``timestamps`` arrays are returned. + """ + + def flush(self) -> int: + """Drop all queued and not-yet pulled samples. + + Returns + ------- + n_dropped : int + Number of dropped samples. + """ + + @property + def dtype(self) -> Union[str, DTypeLike]: + """Channel format of a stream. + + All channels in a stream have the same format. 
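A hedged end-to-end sketch of the resolve, subscribe and pull workflow described above; the stream name "my-stream" is a placeholder and a matching outlet is assumed to be running on the network:

    from mne_lsl.lsl import StreamInlet, resolve_streams

    sinfos = resolve_streams(timeout=2.0, name="my-stream", minimum=1)
    inlet = StreamInlet(sinfos[0], processing_flags=("clocksync", "dejitter"))
    inlet.open_stream(timeout=5.0)                # subscribe before pulling
    sample, ts = inlet.pull_sample(timeout=1.0)   # ts is None if no sample arrived in time
    chunk, tss = inlet.pull_chunk(timeout=0.0)    # timeout=0 returns whatever is already buffered
    offset = inlet.time_correction(timeout=2.0)   # add to remote timestamps to map onto local_clock()
    inlet.close_stream()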
+ + :type: :class:`~numpy.dtype` | ``"string"`` + """ + + @property + def n_channels(self) -> int: + """Number of channels. + + A stream must have at least one channel. The number of channels remains constant + for all samples. + + :type: :class:`int` + """ + + @property + def name(self) -> str: + """Name of the stream. + + The name of the stream is defined by the application creating the LSL outlet. + Streams with identical names can coexist, at the cost of ambiguity for the + recording application and/or the experimenter. + + :type: :class:`str` + """ + + @property + def sfreq(self) -> float: + """Sampling rate of the stream, according to the source (in Hz). + + If a stream is irregularly sampled, the sampling rate is set to ``0``. + + :type: :class:`float` + """ + + @property + def stype(self) -> str: + """Type of the stream. + + The content type is a short string, such as ``"EEG"``, ``"Gaze"``, ... which + describes the content carried by the channel. If a stream contains mixed + content, this value should be an empty string and the type should be stored in + the description of individual channels. + + :type: :class:`str` + """ + + @property + def samples_available(self) -> int: + """Number of available samples on the :class:`~mne_lsl.lsl.StreamOutlet`. + + :type: :class:`int` + """ + + @property + def was_clock_reset(self) -> bool: + """True if the clock was potentially reset since the last call. + + :type: :class:`bool` + """ + + def get_sinfo(self, timeout: Optional[float] = ...) -> _BaseStreamInfo: + """:class:`~mne_lsl.lsl.StreamInfo` corresponding to this Inlet. + + Parameters + ---------- + timeout : float | None + Optional timeout (in seconds) of the operation. By default, timeout is + disabled. + + Returns + ------- + sinfo : StreamInfo + Description of the stream connected to the inlet. + """ diff --git a/mne_lsl/lsl/stream_outlet.pyi b/mne_lsl/lsl/stream_outlet.pyi new file mode 100644 index 000000000..245c929e2 --- /dev/null +++ b/mne_lsl/lsl/stream_outlet.pyi @@ -0,0 +1,209 @@ +from typing import Optional, Union + +from _typeshed import Incomplete +from numpy.typing import DTypeLike as DTypeLike +from numpy.typing import NDArray as NDArray + +from .._typing import ScalarFloatType as ScalarFloatType +from .._typing import ScalarType as ScalarType +from ..utils._checks import check_type as check_type +from ..utils._checks import ensure_int as ensure_int +from ..utils._docs import copy_doc as copy_doc +from ..utils.logs import logger as logger +from ._utils import check_timeout as check_timeout +from ._utils import handle_error as handle_error +from .constants import fmt2numpy as fmt2numpy +from .constants import fmt2push_chunk as fmt2push_chunk +from .constants import fmt2push_chunk_n as fmt2push_chunk_n +from .constants import fmt2push_sample as fmt2push_sample +from .load_liblsl import lib as lib +from .stream_info import _BaseStreamInfo as _BaseStreamInfo + +class StreamOutlet: + """An outlet to share data and metadata on the network. + + Parameters + ---------- + sinfo : StreamInfo + The :class:`~mne_lsl.lsl.StreamInfo` object describing the stream. Stays + constant over the lifetime of the outlet. + chunk_size : int ``≥ 1`` + The desired chunk granularity in samples. By default, each push operation yields + one chunk. A :class:`~mne_lsl.lsl.StreamInlet` can override this setting. + max_buffered : float ``≥ 0`` + The maximum amount of data to buffer in the Outlet. 
The number of samples + buffered is ``max_buffered * 100`` if the sampling rate is irregular, else it's + ``max_buffered`` seconds. + """ + + _lock: Incomplete + _dtype: Incomplete + _name: Incomplete + _n_channels: Incomplete + _sfreq: Incomplete + _stype: Incomplete + _do_push_sample: Incomplete + _do_push_chunk: Incomplete + _do_push_chunk_n: Incomplete + _buffer_sample: Incomplete + + def __init__( + self, sinfo: _BaseStreamInfo, chunk_size: int = ..., max_buffered: float = ... + ) -> None: ... + @property + def _obj(self): ... + __obj: Incomplete + + @_obj.setter + def _obj(self, obj) -> None: ... + def __del__(self) -> None: + """Destroy a :class:`~mne_lsl.lsl.StreamOutlet`. + + The outlet will no longer be discoverable after destruction and all connected + inlets will stop delivering data. + """ + + def push_sample( + self, + x: Union[list[str], NDArray[None]], + timestamp: float = ..., + pushThrough: bool = ..., + ) -> None: + """Push a sample into the :class:`~mne_lsl.lsl.StreamOutlet`. + + Parameters + ---------- + x : list | array of shape (n_channels,) + Sample to push, with one element for each channel. If strings are + transmitted, a list is required. If numericals are transmitted, a numpy + array is required. + timestamp : float + The acquisition timestamp of the sample, in agreement with + :func:`mne_lsl.lsl.local_clock`. The default, ``0``, uses the current time. + pushThrough : bool + If True, push the sample through to the receivers instead of buffering it + with subsequent samples. Note that the ``chunk_size`` defined when creating + a :class:`~mne_lsl.lsl.StreamOutlet` takes precedence over the + ``pushThrough`` flag. + """ + + def push_chunk( + self, + x: Union[list[list[str]], NDArray[None]], + timestamp: Optional[Union[float, NDArray[None]]] = ..., + pushThrough: bool = ..., + ) -> None: + """Push a chunk of samples into the :class:`~mne_lsl.lsl.StreamOutlet`. + + Parameters + ---------- + x : list of list | array of shape (n_samples, n_channels) + Samples to push, with one element for each channel at every time point. If + strings are transmitted, a list of sublist containing ``(n_channels,)`` is + required. If numericals are transmitted, a numpy array of shape + ``(n_samples, n_channels)`` is required. + timestamp : float | array of shape (n_samples,) | None + If a float, the acquisition timestamp of the last sample, in agreement with + :func:`mne_lsl.lsl.local_clock`. ``None`` (default) uses the current time. + If an array, the acquisition timestamp of each sample, in agreement with + :func:`mne_lsl.lsl.local_clock`. + pushThrough : bool + If True, push the sample through to the receivers instead of buffering it + with subsequent samples. Note that the ``chunk_size`` defined when creating + a :class:`~mne_lsl.lsl.StreamOutlet` takes precedence over the + ``pushThrough`` flag. + """ + + def wait_for_consumers(self, timeout: Optional[float]) -> bool: + """Wait (block) until at least one :class:`~mne_lsl.lsl.StreamInlet` connects. + + Parameters + ---------- + timeout : float + Timeout duration in seconds. + + Returns + ------- + success : bool + True if the wait was successful, False if the ``timeout`` expired. + + Notes + ----- + This function does not filter the search for :class:`mne_lsl.lsl.StreamInlet`. + Any application inlet will be recognized. + """ + + @property + def dtype(self) -> Union[str, DTypeLike]: + """Channel format of a stream. + + All channels in a stream have the same format. 
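A complementary sketch for the outlet side documented above: build the description, declare the channel metadata, then push data; channel names and values are illustrative only:

    import numpy as np
    from mne_lsl.lsl import StreamInfo, StreamOutlet

    sinfo = StreamInfo("my-stream", "EEG", 4, 100.0, "float32", "example-source-id")
    sinfo.set_channel_names(["Fz", "Cz", "Pz", "Oz"])
    sinfo.set_channel_types("eeg")            # a single str is applied to every channel
    sinfo.set_channel_units("microvolts")
    outlet = StreamOutlet(sinfo, chunk_size=1)
    outlet.push_sample(np.zeros(4, dtype=np.float32))        # shape (n_channels,)
    outlet.push_chunk(np.zeros((10, 4), dtype=np.float32))   # shape (n_samples, n_channels)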
+ + :type: :class:`~numpy.dtype` | ``"string"`` + """ + + @property + def n_channels(self) -> int: + """Number of channels. + + A stream must have at least one channel. The number of channels remains constant + for all samples. + + :type: :class:`int` + """ + + @property + def name(self) -> str: + """Name of the stream. + + The name of the stream is defined by the application creating the LSL outlet. + Streams with identical names can coexist, at the cost of ambiguity for the + recording application and/or the experimenter. + + :type: :class:`str` + """ + + @property + def sfreq(self) -> float: + """Sampling rate of the stream, according to the source (in Hz). + + If a stream is irregularly sampled, the sampling rate is set to ``0``. + + :type: :class:`float` + """ + + @property + def stype(self) -> str: + """Type of the stream. + + The content type is a short string, such as ``"EEG"``, ``"Gaze"``, ... which + describes the content carried by the channel. If a stream contains mixed + content, this value should be an empty string and the type should be stored in + the description of individual channels. + + :type: :class:`str` + """ + + @property + def has_consumers(self) -> bool: + """True if at least one :class:`~mne_lsl.lsl.StreamInlet` is connected. + + While it does not hurt, there is technically no reason to push samples if there + is no one connected. + + :type: :class:`bool` + + Notes + ----- + This function does not filter the search for :class:`mne_lsl.lsl.StreamInlet`. + Any application inlet will be recognized. + """ + + def get_sinfo(self) -> _BaseStreamInfo: + """:class:`~mne_lsl.lsl.StreamInfo` corresponding to this Outlet. + + Returns + ------- + sinfo : StreamInfo + Description of the stream connected to the outlet. + """ diff --git a/mne_lsl/player/__init__.pyi b/mne_lsl/player/__init__.pyi new file mode 100644 index 000000000..3fe6bf751 --- /dev/null +++ b/mne_lsl/player/__init__.pyi @@ -0,0 +1 @@ +from .player_lsl import PlayerLSL as PlayerLSL diff --git a/mne_lsl/player/_base.py b/mne_lsl/player/_base.py index 297d9bda2..5e3e8118f 100644 --- a/mne_lsl/player/_base.py +++ b/mne_lsl/player/_base.py @@ -27,7 +27,7 @@ if TYPE_CHECKING: from datetime import datetime from pathlib import Path - from typing import Callable, Optional, Union + from typing import Any, Callable, Optional, Union from mne import Info @@ -309,7 +309,7 @@ def __enter__(self): self.start() return self - def __exit__(self, exc_type, exc_value, exc_tracebac): + def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any): """Context manager exit point.""" if self._streaming_thread is not None: # might have called stop manually self.stop() diff --git a/mne_lsl/player/_base.pyi b/mne_lsl/player/_base.pyi new file mode 100644 index 000000000..f568b4f1a --- /dev/null +++ b/mne_lsl/player/_base.pyi @@ -0,0 +1,303 @@ +from abc import ABC, abstractmethod +from datetime import datetime as datetime +from pathlib import Path +from typing import Any, Callable, Optional, Union + +from _typeshed import Incomplete +from mne import Info +from mne.channels.channels import SetChannelsMixin +from mne.io.meas_info import ContainsMixin + +from ..utils._checks import check_type as check_type +from ..utils._checks import ensure_int as ensure_int +from ..utils._checks import ensure_path as ensure_path +from ..utils._docs import fill_doc as fill_doc +from ..utils.meas_info import _set_channel_units as _set_channel_units + +class BasePlayer(ABC, ContainsMixin, SetChannelsMixin): + """Class for creating a mock real-time 
stream. + + Parameters + ---------- + fname : path-like + Path to the file to re-play as a mock real-time stream. MNE-Python must be able + to load the file with :func:`mne.io.read_raw`. + chunk_size : int ``≥ 1`` + Number of samples pushed at once on the mock real-time stream. + + Notes + ----- + The file re-played is loaded in memory. Thus, large files are not recommended. Once + the end-of-file is reached, the player loops back to the beginning which can lead to + a small discontinuity in the data stream. + """ + + _fname: Incomplete + _chunk_size: Incomplete + _raw: Incomplete + + @abstractmethod + def __init__(self, fname: Union[str, Path], chunk_size: int = ...): ... + def anonymize( + self, + daysback: Optional[int] = ..., + keep_his: bool = ..., + *, + verbose: Optional[Union[bool, str, int]] = ..., + ) -> None: + """Anonymize the measurement information in-place. + + Parameters + ---------- + daysback : int | None + Number of days to subtract from all dates. + If ``None`` (default), the acquisition date, ``info['meas_date']``, + will be set to ``January 1ˢᵗ, 2000``. This parameter is ignored if + ``info['meas_date']`` is ``None`` (i.e., no acquisition date has been set). + keep_his : bool + If ``True``, ``his_id`` of ``subject_info`` will **not** be overwritten. + Defaults to ``False``. + + .. warning:: This could mean that ``info`` is not fully + anonymized. Use with caution. + verbose : int | str | bool | None + Sets the verbosity level. The verbosity increases gradually between + ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. + If None is provided, the verbosity is set to ``"WARNING"``. + If a bool is provided, the verbosity is set to ``"WARNING"`` for False and + to ``"INFO"`` for True. + + Notes + ----- + Removes potentially identifying information if it exists in ``info``. + Specifically for each of the following we use: + + - meas_date, file_id, meas_id + A default value, or as specified by ``daysback``. + - subject_info + Default values, except for 'birthday' which is adjusted + to maintain the subject age. + - experimenter, proj_name, description + Default strings. + - utc_offset + ``None``. + - proj_id + Zeros. + - proc_history + Dates use the ``meas_date`` logic, and experimenter a default string. + - helium_info, device_info + Dates use the ``meas_date`` logic, meta info uses defaults. + + If ``info['meas_date']`` is ``None``, it will remain ``None`` during processing + the above fields. + + Operates in place. + """ + + def get_channel_units( + self, picks: Incomplete | None = ..., only_data_chs: bool = ... + ) -> list[tuple[int, int]]: + """Get a list of channel unit for each channel. + + Parameters + ---------- + picks : str | array-like | slice | None + Channels to include. Slices and lists of integers will be interpreted as + channel indices. In lists, channel *type* strings (e.g., ``['meg', + 'eeg']``) will pick channels of those types, channel *name* strings (e.g., + ``['MEG0111', 'MEG2623']`` will pick the given channels. Can also be the + string values "all" to pick all channels, or "data" to pick :term:`data + channels`. None (default) will pick all channels. Note that channels in + ``info['bads']`` *will be included* if their names or indices are + explicitly provided. + only_data_chs : bool + Whether to ignore non-data channels. Default is ``False``. + + Returns + ------- + channel_units : list of tuple of shape (2,) + A list of 2-element tuples. The first element contains the unit FIFF code + and its associated name, e.g. 
``107 (FIFF_UNIT_V)`` for Volts. The second + element contains the unit multiplication factor, e.g. ``-6 (FIFF_UNITM_MU)`` + for micro (corresponds to ``1e-6``). + """ + + @abstractmethod + def rename_channels( + self, + mapping: Union[dict[str, str], Callable], + allow_duplicates: bool = ..., + *, + verbose: Optional[Union[bool, str, int]] = ..., + ) -> None: + """Rename channels. + + Parameters + ---------- + mapping : dict | callable + A dictionary mapping the old channel to a new channel name e.g. + ``{'EEG061' : 'EEG161'}``. Can also be a callable function that takes and + returns a string. + allow_duplicates : bool + If True (default False), allow duplicates, which will automatically be + renamed with ``-N`` at the end. + verbose : int | str | bool | None + Sets the verbosity level. The verbosity increases gradually between + ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. + If None is provided, the verbosity is set to ``"WARNING"``. + If a bool is provided, the verbosity is set to ``"WARNING"`` for False and + to ``"INFO"`` for True. + """ + + @abstractmethod + def start(self) -> None: + """Start streaming data.""" + + @abstractmethod + def set_channel_types( + self, + mapping: dict[str, str], + *, + on_unit_change: str = ..., + verbose: Optional[Union[bool, str, int]] = ..., + ) -> None: + """Define the sensor type of channels. + + If the new channel type changes the unit type, e.g. from ``T/m`` to ``V``, the + unit multiplication factor is reset to ``0``. Use + ``Player.set_channel_units`` to change the multiplication factor, e.g. from + ``0`` to ``-6`` to change from Volts to microvolts. + + Parameters + ---------- + mapping : dict + A dictionary mapping a channel to a sensor type (str), e.g., + ``{'EEG061': 'eog'}`` or ``{'EEG061': 'eog', 'TRIGGER': 'stim'}``. + on_unit_change : ``'raise'`` | ``'warn'`` | ``'ignore'`` + What to do if the measurement unit of a channel is changed automatically to + match the new sensor type. + + .. versionadded:: MNE 1.4 + verbose : int | str | bool | None + Sets the verbosity level. The verbosity increases gradually between + ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. + If None is provided, the verbosity is set to ``"WARNING"``. + If a bool is provided, the verbosity is set to ``"WARNING"`` for False and + to ``"INFO"`` for True. + """ + + @abstractmethod + def set_channel_units(self, mapping: dict[str, Union[str, int]]) -> None: + """Define the channel unit multiplication factor. + + By convention, MNE stores data in SI units. But systems often stream in non-SI + units. For instance, EEG amplifiers often stream in microvolts. Thus, to mock a + stream from an MNE-compatible file, the data might need to be scale to match + the unit of the system to mock. This function will both change the unit + multiplication factor and rescale the associated data. + + The unit itself is defined by the sensor type. Change the channel type in the + ``raw`` recording with :meth:`mne.io.Raw.set_channel_types` before providing the + recording to the player. + + Parameters + ---------- + mapping : dict + A dictionary mapping a channel to a unit, e.g. ``{'EEG061': 'microvolts'}``. + The unit can be given as a human-readable string or as a unit multiplication + factor, e.g. ``-6`` for microvolts corresponding to ``1e-6``. + + Notes + ----- + If the human-readable unit of your channel is not yet supported by MNE-LSL, + please contact the developers on GitHub to add your units to the known set. 
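+
+        Examples
+        --------
+        A minimal sketch, assuming ``player`` is a :class:`~mne_lsl.player.PlayerLSL`
+        replaying a recording that contains EEG channels named ``'Fz'`` and ``'Cz'``
+        (hypothetical channel names used for illustration). The human-readable string
+        and the unit multiplication factor are equivalent ways to declare microvolts::
+
+            >>> player.set_channel_units({"Fz": "microvolts"})  # doctest: +SKIP
+            >>> player.set_channel_units({"Cz": -6})  # doctest: +SKIP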
+ """ + + def set_meas_date( + self, meas_date: Optional[Union[datetime, float, tuple[float, float]]] + ) -> None: + """Set the measurement start date. + + Parameters + ---------- + meas_date : datetime | float | tuple | None + The new measurement date. + If datetime object, it must be timezone-aware and in UTC. + A tuple of (seconds, microseconds) or float (alias for + ``(meas_date, 0)``) can also be passed and a datetime + object will be automatically created. If None, will remove + the time reference. + + See Also + -------- + anonymize + """ + _interrupt: bool + + @abstractmethod + def stop(self) -> None: + """Stop streaming data on the mock real-time stream.""" + + def _check_not_started(self, name: str): + """Check that the player is not started before calling the function 'name'.""" + + @abstractmethod + def _stream(self) -> None: + """Push a chunk of data from the raw object to the real-time stream. + + Don't use raw.get_data but indexing which is faster. + + >>> [In] %timeit raw[:, 0:16][0] + >>> 19 µs ± 50.3 ns per loo + >>> [In] %timeit raw.get_data(start=0, stop=16) + >>> 1.3 ms ± 1.01 µs per loop + >>> [In] %timeit np.ascontiguousarray(raw[:, 0:16][0].T) + >>> 23.7 µs ± 183 ns per loop + """ + _start_idx: int + _streaming_delay: Incomplete + _streaming_thread: Incomplete + + def _reset_variables(self) -> None: + """Reset variables for streaming.""" + + def __del__(self) -> None: + """Delete the player.""" + + def __enter__(self): + """Context manager entry point.""" + + def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any): + """Context manager exit point.""" + + @staticmethod + def __repr__(self) -> str: + """Representation of the instance.""" + + @property + def ch_names(self) -> list[str]: + """Name of the channels. + + :type: :class:`list` of :class:`str` + """ + + @property + def chunk_size(self) -> int: + """Number of samples in a chunk. + + :type: :class:`int` + """ + + @property + def fname(self) -> Path: + """Path to file played. + + :type: :class:`~pathlib.Path` + """ + + @property + def info(self) -> Info: + """Info of the real-time stream. 
+ + :type: :class:`~mne.Info` + """ diff --git a/mne_lsl/player/player_lsl.py b/mne_lsl/player/player_lsl.py index 1a14cb9a4..4a8fe94a3 100644 --- a/mne_lsl/player/player_lsl.py +++ b/mne_lsl/player/player_lsl.py @@ -65,7 +65,7 @@ def rename_channels( mapping: Union[dict[str, str], Callable], allow_duplicates: bool = False, *, - verbose=None, + verbose: Optional[Union[bool, str, int]] = None, ) -> None: super().rename_channels(mapping, allow_duplicates) self._sinfo.set_channel_names(self.info["ch_names"]) @@ -89,7 +89,11 @@ def start(self) -> None: @copy_doc(BasePlayer.set_channel_types) def set_channel_types( - self, mapping: dict[str, str], *, on_unit_change: str = "warn", verbose=None + self, + mapping: dict[str, str], + *, + on_unit_change: str = "warn", + verbose: Optional[Union[bool, str, int]] = None, ) -> None: super().set_channel_types( mapping, on_unit_change=on_unit_change, verbose=verbose diff --git a/mne_lsl/player/player_lsl.pyi b/mne_lsl/player/player_lsl.pyi new file mode 100644 index 000000000..88683028e --- /dev/null +++ b/mne_lsl/player/player_lsl.pyi @@ -0,0 +1,164 @@ +from pathlib import Path as Path +from typing import Callable, Optional, Union + +from _typeshed import Incomplete + +from ..lsl import StreamInfo as StreamInfo +from ..lsl import StreamOutlet as StreamOutlet +from ..lsl import local_clock as local_clock +from ..utils._checks import check_type as check_type +from ..utils._docs import copy_doc as copy_doc +from ..utils.logs import logger as logger +from ._base import BasePlayer as BasePlayer + +class PlayerLSL(BasePlayer): + """Class for creating a mock LSL stream. + + Parameters + ---------- + fname : path-like + Path to the file to re-play as a mock LSL stream. MNE-Python must be able to + load the file with :func:`mne.io.read_raw`. + chunk_size : int ``≥ 1`` + Number of samples pushed at once on the :class:`~mne_lsl.lsl.StreamOutlet`. + If these chunks are too small then the thread-based timing might not work + properly. + name : str | None + Name of the mock LSL stream. If ``None``, the name ``MNE-LSL-Player`` is used. + + Notes + ----- + The file re-played is loaded in memory. Thus, large files are not recommended. Once + the end-of-file is reached, the player loops back to the beginning which can lead to + a small discontinuity in the data stream. + """ + + _name: Incomplete + _sinfo: Incomplete + + def __init__( + self, fname: Union[str, Path], chunk_size: int = ..., name: Optional[str] = ... + ) -> None: ... + def rename_channels( + self, + mapping: Union[dict[str, str], Callable], + allow_duplicates: bool = ..., + *, + verbose: Optional[Union[bool, str, int]] = ..., + ) -> None: + """Rename channels. + + Parameters + ---------- + mapping : dict | callable + A dictionary mapping the old channel to a new channel name e.g. + ``{'EEG061' : 'EEG161'}``. Can also be a callable function that takes and + returns a string. + allow_duplicates : bool + If True (default False), allow duplicates, which will automatically be + renamed with ``-N`` at the end. + verbose : int | str | bool | None + Sets the verbosity level. The verbosity increases gradually between + ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. + If None is provided, the verbosity is set to ``"WARNING"``. + If a bool is provided, the verbosity is set to ``"WARNING"`` for False and + to ``"INFO"`` for True. 
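+
+        Examples
+        --------
+        A short sketch, assuming the replayed file contains a channel named
+        ``'EEG 001'`` (hypothetical name); either a dict or a callable can be used::
+
+            >>> player.rename_channels({"EEG 001": "Fp1"})  # doctest: +SKIP
+            >>> player.rename_channels(lambda name: name.strip("."))  # doctest: +SKIP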
+ """ + _outlet: Incomplete + _streaming_delay: Incomplete + _streaming_thread: Incomplete + _target_timestamp: Incomplete + + def start(self) -> None: + """Start streaming data on the LSL :class:`~mne_lsl.lsl.StreamOutlet`.""" + + def set_channel_types( + self, + mapping: dict[str, str], + *, + on_unit_change: str = ..., + verbose: Optional[Union[bool, str, int]] = ..., + ) -> None: + """Define the sensor type of channels. + + If the new channel type changes the unit type, e.g. from ``T/m`` to ``V``, the + unit multiplication factor is reset to ``0``. Use + ``Player.set_channel_units`` to change the multiplication factor, e.g. from + ``0`` to ``-6`` to change from Volts to microvolts. + + Parameters + ---------- + mapping : dict + A dictionary mapping a channel to a sensor type (str), e.g., + ``{'EEG061': 'eog'}`` or ``{'EEG061': 'eog', 'TRIGGER': 'stim'}``. + on_unit_change : ``'raise'`` | ``'warn'`` | ``'ignore'`` + What to do if the measurement unit of a channel is changed automatically to + match the new sensor type. + + .. versionadded:: MNE 1.4 + verbose : int | str | bool | None + Sets the verbosity level. The verbosity increases gradually between + ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. + If None is provided, the verbosity is set to ``"WARNING"``. + If a bool is provided, the verbosity is set to ``"WARNING"`` for False and + to ``"INFO"`` for True. + """ + + def set_channel_units(self, mapping: dict[str, Union[str, int]]) -> None: + """Define the channel unit multiplication factor. + + By convention, MNE stores data in SI units. But systems often stream in non-SI + units. For instance, EEG amplifiers often stream in microvolts. Thus, to mock a + stream from an MNE-compatible file, the data might need to be scale to match + the unit of the system to mock. This function will both change the unit + multiplication factor and rescale the associated data. + + The unit itself is defined by the sensor type. Change the channel type in the + ``raw`` recording with :meth:`mne.io.Raw.set_channel_types` before providing the + recording to the player. + + Parameters + ---------- + mapping : dict + A dictionary mapping a channel to a unit, e.g. ``{'EEG061': 'microvolts'}``. + The unit can be given as a human-readable string or as a unit multiplication + factor, e.g. ``-6`` for microvolts corresponding to ``1e-6``. + + Notes + ----- + If the human-readable unit of your channel is not yet supported by MNE-LSL, + please contact the developers on GitHub to add your units to the known set. + """ + + def stop(self) -> None: + """Stop streaming data on the LSL :class:`~mne_lsl.lsl.StreamOutlet`.""" + _start_idx: Incomplete + + def _stream(self) -> None: + """Push a chunk of data from the raw object to the real-time stream. + + Don't use raw.get_data but indexing which is faster. + + >>> [In] %timeit raw[:, 0:16][0] + >>> 19 µs ± 50.3 ns per loo + >>> [In] %timeit raw.get_data(start=0, stop=16) + >>> 1.3 ms ± 1.01 µs per loop + >>> [In] %timeit np.ascontiguousarray(raw[:, 0:16][0].T) + >>> 23.7 µs ± 183 ns per loop + """ + + def _reset_variables(self) -> None: + """Reset variables for streaming.""" + + def __del__(self) -> None: + """Delete the player and destroy the :class:`~mne_lsl.lsl.StreamOutlet`.""" + + def __repr__(self) -> str: + """Representation of the instance.""" + + @property + def name(self) -> str: + """Name of the LSL stream. 
+ + :type: :class:`str` + """ diff --git a/mne_lsl/stream/__init__.pyi b/mne_lsl/stream/__init__.pyi new file mode 100644 index 000000000..1b4ec25df --- /dev/null +++ b/mne_lsl/stream/__init__.pyi @@ -0,0 +1 @@ +from .stream_lsl import StreamLSL as StreamLSL diff --git a/mne_lsl/stream/_base.pyi b/mne_lsl/stream/_base.pyi new file mode 100644 index 000000000..ad81fd44f --- /dev/null +++ b/mne_lsl/stream/_base.pyi @@ -0,0 +1,609 @@ +from abc import ABC, abstractmethod +from collections.abc import Generator +from datetime import datetime as datetime +from typing import Callable, Optional, Union + +import numpy as np +from _typeshed import Incomplete +from mne import Info +from mne.channels import DigMontage as DigMontage +from mne.channels.channels import SetChannelsMixin +from mne.io.meas_info import ContainsMixin +from numpy.typing import DTypeLike as DTypeLike +from numpy.typing import NDArray + +from .._typing import ScalarIntType as ScalarIntType +from .._typing import ScalarType as ScalarType +from ..utils._checks import check_type as check_type +from ..utils._checks import check_value as check_value +from ..utils._docs import copy_doc as copy_doc +from ..utils._docs import fill_doc as fill_doc +from ..utils.logs import logger as logger +from ..utils.meas_info import _HUMAN_UNITS as _HUMAN_UNITS +from ..utils.meas_info import _set_channel_units as _set_channel_units + +class BaseStream(ABC, ContainsMixin, SetChannelsMixin): + """Stream object representing a single real-time stream. + + Parameters + ---------- + bufsize : float | int + Size of the buffer keeping track of the data received from the stream. If + the stream sampling rate ``sfreq`` is regular, ``bufsize`` is expressed in + seconds. The buffer will hold the last ``bufsize * sfreq`` samples (ceiled). + If the stream sampling rate ``sfreq`` is irregular, ``bufsize`` is + expressed in samples. The buffer will hold the last ``bufsize`` samples. + """ + + _bufsize: Incomplete + + @abstractmethod + def __init__(self, bufsize: float): ... + def __contains__(self, ch_type: str) -> bool: + """Check channel type membership. + + Parameters + ---------- + ch_type : str + Channel type to check for. Can be e.g. ``'meg'``, ``'eeg'``, + ``'stim'``, etc. + + Returns + ------- + in : bool + Whether or not the instance contains the given channel type. + + Examples + -------- + Channel type membership can be tested as:: + + >>> 'meg' in inst # doctest: +SKIP + True + >>> 'seeg' in inst # doctest: +SKIP + False + + """ + + def __del__(self) -> None: + """Try to disconnect the stream when deleting the object.""" + + @abstractmethod + def __repr__(self) -> str: + """Representation of the instance.""" + _buffer: Incomplete + + def add_reference_channels( + self, + ref_channels: Union[str, list[str], tuple[str]], + ref_units: Optional[ + Union[str, int, list[Union[str, int]], tuple[Union[str, int]]] + ] = ..., + ) -> None: + """Add EEG reference channels to data that consists of all zeros. + + Adds EEG reference channels that are not part of the streamed data. This is + useful when you need to re-reference your data to different channels. These + added channels will consist of all zeros. + + Parameters + ---------- + ref_channels : str | list of str + Name of the electrode(s) which served as the reference in the + recording. If a name is provided, a corresponding channel is added + and its data is set to 0. This is useful for later re-referencing. 
+ ref_units : str | int | list of str | list of int | None + The unit or unit multiplication factor of the reference channels. The unit + can be given as a human-readable string or as a unit multiplication factor, + e.g. ``-6`` for microvolts corresponding to ``1e-6``. + If not provided, the added EEG reference channel has a unit multiplication + factor set to ``0`` which corresponds to Volts. Use + ``Stream.set_channel_units`` to change the unit multiplication factor. + """ + + def anonymize( + self, + daysback: Optional[int] = ..., + keep_his: bool = ..., + *, + verbose: Optional[Union[bool, str, int]] = ..., + ) -> None: + """Anonymize the measurement information in-place. + + Parameters + ---------- + daysback : int | None + Number of days to subtract from all dates. + If ``None`` (default), the acquisition date, ``info['meas_date']``, + will be set to ``January 1ˢᵗ, 2000``. This parameter is ignored if + ``info['meas_date']`` is ``None`` (i.e., no acquisition date has been set). + keep_his : bool + If ``True``, ``his_id`` of ``subject_info`` will **not** be overwritten. + Defaults to ``False``. + + .. warning:: This could mean that ``info`` is not fully + anonymized. Use with caution. + verbose : int | str | bool | None + Sets the verbosity level. The verbosity increases gradually between + ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. + If None is provided, the verbosity is set to ``"WARNING"``. + If a bool is provided, the verbosity is set to ``"WARNING"`` for False and + to ``"INFO"`` for True. + + Notes + ----- + Removes potentially identifying information if it exists in ``info``. + Specifically for each of the following we use: + + - meas_date, file_id, meas_id + A default value, or as specified by ``daysback``. + - subject_info + Default values, except for 'birthday' which is adjusted + to maintain the subject age. + - experimenter, proj_name, description + Default strings. + - utc_offset + ``None``. + - proj_id + Zeros. + - proc_history + Dates use the ``meas_date`` logic, and experimenter a default string. + - helium_info, device_info + Dates use the ``meas_date`` logic, meta info uses defaults. + + If ``info['meas_date']`` is ``None``, it will remain ``None`` during processing + the above fields. + + Operates in place. + """ + _acquisition_delay: Incomplete + _n_new_samples: int + + @abstractmethod + def connect(self, acquisition_delay: float) -> None: + """Connect to the stream and initiate data collection in the buffer. + + Parameters + ---------- + acquisition_delay : float + Delay in seconds between 2 acquisition during which chunks of data are + pulled from the connected device. + """ + _interrupt: bool + + @abstractmethod + def disconnect(self) -> None: + """Disconnect from the LSL stream and interrupt data collection.""" + + def drop_channels(self, ch_names: Union[str, list[str], tuple[str]]) -> None: + """Drop channel(s). + + Parameters + ---------- + ch_names : str | list of str + Name or list of names of channels to remove. + + See Also + -------- + pick + """ + + def filter(self) -> None: + """Filter the stream. Not implemented.""" + + def get_channel_types( + self, + picks: Incomplete | None = ..., + unique: bool = ..., + only_data_chs: bool = ..., + ) -> list[str]: + """Get a list of channel type for each channel. + + Parameters + ---------- + picks : str | array-like | slice | None + Channels to include. Slices and lists of integers will be interpreted as + channel indices. 
In lists, channel *type* strings (e.g., ``['meg', + 'eeg']``) will pick channels of those types, channel *name* strings (e.g., + ``['MEG0111', 'MEG2623']`` will pick the given channels. Can also be the + string values "all" to pick all channels, or "data" to pick :term:`data + channels`. None (default) will pick all channels. Note that channels in + ``info['bads']`` *will be included* if their names or indices are + explicitly provided. + unique : bool + Whether to return only unique channel types. Default is ``False``. + only_data_chs : bool + Whether to ignore non-data channels. Default is ``False``. + + Returns + ------- + channel_types : list + The channel types. + """ + + def get_channel_units( + self, picks: Incomplete | None = ..., only_data_chs: bool = ... + ) -> list[tuple[int, int]]: + """Get a list of channel unit for each channel. + + Parameters + ---------- + picks : str | array-like | slice | None + Channels to include. Slices and lists of integers will be interpreted as + channel indices. In lists, channel *type* strings (e.g., ``['meg', + 'eeg']``) will pick channels of those types, channel *name* strings (e.g., + ``['MEG0111', 'MEG2623']`` will pick the given channels. Can also be the + string values "all" to pick all channels, or "data" to pick :term:`data + channels`. None (default) will pick all channels. Note that channels in + ``info['bads']`` *will be included* if their names or indices are + explicitly provided. + only_data_chs : bool + Whether to ignore non-data channels. Default is ``False``. + + Returns + ------- + channel_units : list of tuple of shape (2,) + A list of 2-element tuples. The first element contains the unit FIFF code + and its associated name, e.g. ``107 (FIFF_UNIT_V)`` for Volts. The second + element contains the unit multiplication factor, e.g. ``-6 (FIFF_UNITM_MU)`` + for micro (corresponds to ``1e-6``). + """ + + def get_data( + self, + winsize: Optional[float] = ..., + picks: Optional[Union[str, list[str], list[int], NDArray[None]]] = ..., + ) -> tuple[NDArray[None], NDArray[np.float64]]: + """Retrieve the latest data from the buffer. + + Parameters + ---------- + winsize : float | int | None + Size of the window of data to view. If the stream sampling rate ``sfreq`` is + regular, ``winsize`` is expressed in seconds. The window will view the last + ``winsize * sfreq`` samples (ceiled) from the buffer. If the stream sampling + sampling rate ``sfreq`` is irregular, ``winsize`` is expressed in samples. + The window will view the last ``winsize`` samples. If ``None``, the entire + buffer is returned. + picks : str | array-like | slice | None + Channels to include. Slices and lists of integers will be interpreted as + channel indices. In lists, channel *type* strings (e.g., ``['meg', + 'eeg']``) will pick channels of those types, channel *name* strings (e.g., + ``['MEG0111', 'MEG2623']`` will pick the given channels. Can also be the + string values "all" to pick all channels, or "data" to pick :term:`data + channels`. None (default) will pick all channels. Note that channels in + ``info['bads']`` *will be included* if their names or indices are + explicitly provided. + + Returns + ------- + data : array of shape (n_channels, n_samples) + Data in the given window. + timestamps : array of shape (n_samples,) + Timestamps in the given window. + + Notes + ----- + The number of newly available samples stored in the property ``n_new_samples`` + is reset at every function call, even if all channels were not selected with + the argument ``picks``. 
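+
+        Examples
+        --------
+        A minimal sketch, assuming ``stream`` is a connected
+        :class:`~mne_lsl.stream.StreamLSL` with a regular sampling rate; the last 2
+        seconds of the EEG channels are retrieved with::
+
+            >>> data, ts = stream.get_data(winsize=2, picks="eeg")  # doctest: +SKIP
+            >>> data.shape  # (n_channels, ceil(2 * sfreq))  # doctest: +SKIP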
+ """ + + def get_montage(self) -> Optional[DigMontage]: + """Get a DigMontage from instance. + + Returns + ------- + + montage : None | str | DigMontage + A montage containing channel positions. If a string or + :class:`~mne.channels.DigMontage` is + specified, the existing channel information will be updated with the + channel positions from the montage. Valid strings are the names of the + built-in montages that ship with MNE-Python; you can list those via + :func:`mne.channels.get_builtin_montages`. + If ``None`` (default), the channel positions will be removed from the + :class:`~mne.Info`. + """ + + def plot(self) -> None: + """Open a real-time stream viewer. Not implemented.""" + + def pick(self, picks, exclude=...) -> None: + """Pick a subset of channels. + + Parameters + ---------- + picks : str | array-like | slice | None + Channels to include. Slices and lists of integers will be interpreted as + channel indices. In lists, channel *type* strings (e.g., ``['meg', + 'eeg']``) will pick channels of those types, channel *name* strings (e.g., + ``['MEG0111', 'MEG2623']`` will pick the given channels. Can also be the + string values "all" to pick all channels, or "data" to pick :term:`data + channels`. None (default) will pick all channels. Note that channels in + ``info['bads']`` *will be included* if their names or indices are + explicitly provided. + exclude : str | list of str + Set of channels to exclude, only used when picking is based on types, e.g. + ``exclude='bads'`` when ``picks="meg"``. + + See Also + -------- + drop_channels + + Notes + ----- + Contrary to MNE-Python, re-ordering channels is not supported in ``MNE-LSL``. + Thus, if explicit channel names are provided in ``picks``, they are sorted to + match the order of existing channel names. + """ + + def record(self) -> None: + """Record the stream data to disk. Not implemented.""" + + def rename_channels( + self, + mapping: Union[dict[str, str], Callable], + allow_duplicates: bool = ..., + *, + verbose: Optional[Union[bool, str, int]] = ..., + ) -> None: + """Rename channels. + + Parameters + ---------- + mapping : dict | callable + A dictionary mapping the old channel to a new channel name e.g. + ``{'EEG061' : 'EEG161'}``. Can also be a callable function that takes and + returns a string. + allow_duplicates : bool + If True (default False), allow duplicates, which will automatically be + renamed with ``-N`` at the end. + verbose : int | str | bool | None + Sets the verbosity level. The verbosity increases gradually between + ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. + If None is provided, the verbosity is set to ``"WARNING"``. + If a bool is provided, the verbosity is set to ``"WARNING"`` for False and + to ``"INFO"`` for True. + """ + + def set_bipolar_reference(self) -> None: + """Set a bipolar reference. Not implemented.""" + + def set_channel_types( + self, + mapping: dict[str, str], + *, + on_unit_change: str = ..., + verbose: Optional[Union[bool, str, int]] = ..., + ) -> None: + """Define the sensor type of channels. + + If the new channel type changes the unit type, e.g. from ``T/m`` to ``V``, the + unit multiplication factor is reset to ``0``. Use + ``Stream.set_channel_units`` to change the multiplication factor, e.g. from + ``0`` to ``-6`` to change from Volts to microvolts. + + Parameters + ---------- + mapping : dict + A dictionary mapping a channel to a sensor type (str), e.g., + ``{'EEG061': 'eog'}`` or ``{'EEG061': 'eog', 'TRIGGER': 'stim'}``. 
+ on_unit_change : ``'raise'`` | ``'warn'`` | ``'ignore'`` + What to do if the measurement unit of a channel is changed automatically to + match the new sensor type. + + .. versionadded:: MNE 1.4 + verbose : int | str | bool | None + Sets the verbosity level. The verbosity increases gradually between + ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. + If None is provided, the verbosity is set to ``"WARNING"``. + If a bool is provided, the verbosity is set to ``"WARNING"`` for False and + to ``"INFO"`` for True. + """ + + def set_channel_units(self, mapping: dict[str, Union[str, int]]) -> None: + """Define the channel unit multiplication factor. + + The unit itself is defined by the sensor type. Use + ``Stream.set_channel_types`` to change the channel type, e.g. from planar + gradiometers in ``T/m`` to EEG in ``V``. + + Parameters + ---------- + mapping : dict + A dictionary mapping a channel to a unit, e.g. ``{'EEG061': 'microvolts'}``. + The unit can be given as a human-readable string or as a unit multiplication + factor, e.g. ``-6`` for microvolts corresponding to ``1e-6``. + + Notes + ----- + If the human-readable unit of your channel is not yet supported by MNE-LSL, + please contact the developers on GitHub to add your units to the known set. + """ + _ref_channels: Incomplete + _ref_from: Incomplete + + def set_eeg_reference( + self, + ref_channels: Union[str, list[str], tuple[str]], + ch_type: Union[str, list[str], tuple[str]] = ..., + ) -> None: + """Specify which reference to use for EEG-like data. + + Use this function to explicitly specify the desired reference for EEG-like + channels. This can be either an existing electrode or a new virtual channel + added with ``Stream.add_reference_channels``. This function will re-reference + the data in the ringbuffer according to the desired reference. + + Parameters + ---------- + ref_channels : str | list of str + Name(s) of the channel(s) used to construct the reference. Can also be set + to ``'average'`` to apply a common average reference. + ch_type : str | list of str + The name of the channel type to apply the reference to. Valid channel types + are ``'eeg'``, ``'ecog'``, ``'seeg'``, ``'dbs'``. + """ + + def set_meas_date( + self, meas_date: Optional[Union[datetime, float, tuple[float]]] + ) -> None: + """Set the measurement start date. + + Parameters + ---------- + meas_date : datetime | float | tuple | None + The new measurement date. + If datetime object, it must be timezone-aware and in UTC. + A tuple of (seconds, microseconds) or float (alias for + ``(meas_date, 0)``) can also be passed and a datetime + object will be automatically created. If None, will remove + the time reference. + + See Also + -------- + anonymize + """ + + def set_montage( + self, + montage: Optional[Union[str, DigMontage]], + match_case: bool = ..., + match_alias: Union[bool, dict[str, str]] = ..., + on_missing: str = ..., + *, + verbose: Optional[Union[bool, str, int]] = ..., + ) -> None: + """Set EEG/sEEG/ECoG/DBS/fNIRS channel positions and digitization points. + + Parameters + ---------- + montage : None | str | DigMontage + A montage containing channel positions. If a string or + :class:`~mne.channels.DigMontage` is + specified, the existing channel information will be updated with the + channel positions from the montage. Valid strings are the names of the + built-in montages that ship with MNE-Python; you can list those via + :func:`mne.channels.get_builtin_montages`. 
+ If ``None`` (default), the channel positions will be removed from the + :class:`~mne.Info`. + match_case : bool + If True (default), channel name matching will be case sensitive. + + .. versionadded:: MNE 0.20 + match_alias : bool | dict + Whether to use a lookup table to match unrecognized channel location names + to their known aliases. If True, uses the mapping in + ``mne.io.constants.CHANNEL_LOC_ALIASES``. If a :class:`dict` is passed, it + will be used instead, and should map from non-standard channel names to + names in the specified ``montage``. Default is ``False``. + + .. versionadded:: MNE 0.23 + on_missing : 'raise' | 'warn' | 'ignore' + Can be ``'raise'`` (default) to raise an error, ``'warn'`` to emit a + warning, or ``'ignore'`` to ignore when channels have missing coordinates. + + .. versionadded:: MNE 0.20.1 + verbose : int | str | bool | None + Sets the verbosity level. The verbosity increases gradually between + ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. + If None is provided, the verbosity is set to ``"WARNING"``. + If a bool is provided, the verbosity is set to ``"WARNING"`` for False and + to ``"INFO"`` for True. + + See Also + -------- + mne.channels.make_standard_montage + mne.channels.make_dig_montage + mne.channels.read_custom_montage + + Notes + ----- + .. warning:: + + Only EEG/sEEG/ECoG/DBS/fNIRS channels can have their positions set using a + montage. Other channel types (e.g., MEG channels) should have their + positions defined properly using their data reading functions. + """ + + @staticmethod + def _acquire(self) -> None: + """Update function pulling new samples in the buffer at a regular interval.""" + + def _check_connected(self, name: str): + """Check that the stream is connected before calling the function 'name'.""" + + def _check_connected_and_regular_sampling(self, name: str): + """Check that the stream has a regular sampling rate.""" + _acquisition_thread: Incomplete + + def _create_acquisition_thread(self, delay: float) -> None: + """Create and start the daemonic acquisition thread. + + Parameters + ---------- + delay : float + Delay after which the thread will call the acquire function. + """ + + def _interrupt_acquisition(self) -> Generator[None, None, None]: + """Context manager interrupting the acquisition thread.""" + _info: Incomplete + _picks_inlet: Incomplete + + def _pick(self, picks: NDArray[None]) -> None: + """Interrupt acquisition and apply the channel selection.""" + _added_channels: Incomplete + _timestamps: Incomplete + + @abstractmethod + def _reset_variables(self) -> None: + """Reset variables define after connection.""" + + @property + def compensation_grade(self) -> Optional[int]: + """The current gradient compensation grade. + + :type: :class:`int` | None + """ + + @property + def ch_names(self) -> list[str]: + """Name of the channels. + + :type: :class:`list` of :class:`str` + """ + + @property + def connected(self) -> bool: + """Connection status of the stream. + + :type: :class:`bool` + """ + + @property + def dtype(self) -> Optional[DTypeLike]: + """Channel format of the stream.""" + + @property + def info(self) -> Info: + """Info of the LSL stream. + + :type: :class:`~mne.Info` + """ + + @property + def n_buffer(self) -> int: + """Number of samples that can be stored in the buffer. + + :type: :class:`int` + """ + + @property + def n_new_samples(self) -> int: + """Number of new samples available in the buffer. + + The number of new samples is reset at every ``Stream.get_data`` call. 
+
+        :type: :class:`int`
+        """
diff --git a/mne_lsl/stream/stream_lsl.pyi b/mne_lsl/stream/stream_lsl.pyi
new file mode 100644
index 000000000..f0ad30cff
--- /dev/null
+++ b/mne_lsl/stream/stream_lsl.pyi
@@ -0,0 +1,147 @@
+from typing import Optional, Sequence, Union
+
+from _typeshed import Incomplete
+
+from mne_lsl.lsl.stream_info import _BaseStreamInfo as _BaseStreamInfo
+
+from ..lsl import StreamInlet as StreamInlet
+from ..lsl import resolve_streams as resolve_streams
+from ..lsl.constants import fmt2numpy as fmt2numpy
+from ..utils._checks import check_type as check_type
+from ..utils._docs import fill_doc as fill_doc
+from ..utils.logs import logger as logger
+from ._base import BaseStream as BaseStream
+
+class StreamLSL(BaseStream):
+    """Stream object representing a single LSL stream.
+
+    Parameters
+    ----------
+    bufsize : float | int
+        Size of the buffer keeping track of the data received from the stream. If
+        the stream sampling rate ``sfreq`` is regular, ``bufsize`` is expressed in
+        seconds. The buffer will hold the last ``bufsize * sfreq`` samples (ceiled).
+        If the stream sampling rate ``sfreq`` is irregular, ``bufsize`` is
+        expressed in samples. The buffer will hold the last ``bufsize`` samples.
+    name : str
+        Name of the LSL stream.
+    stype : str
+        Type of the LSL stream.
+    source_id : str
+        ID of the source of the LSL stream.
+
+    Notes
+    -----
+    The 3 arguments ``name``, ``stype``, and ``source_id`` must uniquely identify an
+    LSL stream. If this is not possible, please resolve the available LSL streams
+    with :func:`mne_lsl.lsl.resolve_streams` and create an inlet with
+    :class:`~mne_lsl.lsl.StreamInlet`.
+    """
+
+    _name: Incomplete
+    _stype: Incomplete
+    _source_id: Incomplete
+
+    def __init__(
+        self,
+        bufsize: float,
+        name: Optional[str] = ...,
+        stype: Optional[str] = ...,
+        source_id: Optional[str] = ...,
+    ) -> None: ...
+    def __repr__(self) -> str:
+        """Representation of the instance."""
+    _inlet: Incomplete
+    _sinfo: Incomplete
+    _info: Incomplete
+    _buffer: Incomplete
+    _timestamps: Incomplete
+    _picks_inlet: Incomplete
+
+    def connect(
+        self,
+        acquisition_delay: float = ...,
+        processing_flags: Optional[Union[str, Sequence[str]]] = ...,
+        timeout: Optional[float] = ...,
+    ) -> None:
+        """Connect to the LSL stream and initiate data collection in the buffer.
+
+        Parameters
+        ----------
+        acquisition_delay : float
+            Delay in seconds between two consecutive acquisitions during which chunks
+            of data are pulled from the :class:`~mne_lsl.lsl.StreamInlet`.
+        processing_flags : list of str | ``'all'`` | None
+            Set the post-processing options. By default, post-processing is disabled.
+            Any combination of the processing flags is valid. The available flags are:
+
+            * ``'clocksync'``: Automatic clock synchronization, equivalent to
+              manually adding the estimated
+              :meth:`~mne_lsl.lsl.StreamInlet.time_correction`.
+            * ``'dejitter'``: Remove jitter on the received timestamps with a
+              smoothing algorithm.
+            * ``'monotize'``: Force the timestamps to be monotonically ascending.
+              This option should not be enabled if ``'dejitter'`` is not enabled.
+        timeout : float | None
+            Optional timeout (in seconds) of the operation. ``None`` disables the
+            timeout. The timeout value is applied once to every operation supporting it.
+
+        Notes
+        -----
+        If all 3 stream identifiers ``name``, ``stype`` and ``source_id`` are left to
+        ``None``, resolution of the available streams will require a full ``timeout``,
+        blocking the execution until this function returns.
If at least one of the 3 + stream identifiers is specified, resolution will stop as soon as one stream + matching the identifier is found. + """ + + def disconnect(self) -> None: + """Disconnect from the LSL stream and interrupt data collection.""" + + def _acquire(self) -> None: + """Update function pulling new samples in the buffer at a regular interval.""" + + def _reset_variables(self) -> None: + """Reset variables define after connection.""" + + @property + def compensation_grade(self) -> Optional[int]: + """The current gradient compensation grade. + + :type: :class:`int` | None + """ + + @property + def connected(self) -> bool: + """Connection status of the stream. + + :type: :class:`bool` + """ + + @property + def name(self) -> Optional[str]: + """Name of the LSL stream. + + :type: :class:`str` | None + """ + + @property + def sinfo(self) -> Optional[_BaseStreamInfo]: + """StreamInfo of the connected stream. + + :type: :class:`~mne_lsl.lsl.StreamInfo` | None + """ + + @property + def stype(self) -> Optional[str]: + """Type of the LSL stream. + + :type: :class:`str` | None + """ + + @property + def source_id(self) -> Optional[str]: + """ID of the source of the LSL stream. + + :type: :class:`str` | None + """ diff --git a/mne_lsl/utils/__init__.pyi b/mne_lsl/utils/__init__.pyi new file mode 100644 index 000000000..e69de29bb diff --git a/mne_lsl/utils/_checks.pyi b/mne_lsl/utils/_checks.pyi new file mode 100644 index 000000000..4e0377b7e --- /dev/null +++ b/mne_lsl/utils/_checks.pyi @@ -0,0 +1,111 @@ +from pathlib import Path +from typing import Any, Optional + +from _typeshed import Incomplete + +from ._docs import fill_doc as fill_doc + +def ensure_int(item: Any, item_name: Optional[str] = ...) -> int: + """Ensure a variable is an integer. + + Parameters + ---------- + item : Any + Item to check. + item_name : str | None + Name of the item to show inside the error message. + + Raises + ------ + TypeError + When the type of the item is not int. + """ + +class _IntLike: + @classmethod + def __instancecheck__(cls, other: Any) -> bool: ... + +class _Callable: + @classmethod + def __instancecheck__(cls, other: Any) -> bool: ... + +_types: Incomplete + +def check_type(item: Any, types: tuple, item_name: Optional[str] = ...) -> None: + """Check that item is an instance of types. + + Parameters + ---------- + item : object + Item to check. + types : tuple of types | tuple of str + Types to be checked against. + If str, must be one of ('int-like', 'numeric', 'path-like', 'callable', + 'array-like'). + item_name : str | None + Name of the item to show inside the error message. + + Raises + ------ + TypeError + When the type of the item is not one of the valid options. + """ + +def check_value( + item: Any, + allowed_values: tuple, + item_name: Optional[str] = ..., + extra: Optional[str] = ..., +) -> None: + """Check the value of a parameter against a list of valid options. + + Parameters + ---------- + item : object + Item to check. + allowed_values : tuple of objects + Allowed values to be checked against. + item_name : str | None + Name of the item to show inside the error message. + extra : str | None + Extra string to append to the invalid value sentence, e.g. "when using DC mode". + + Raises + ------ + ValueError + When the value of the item is not one of the valid options. + """ + +def check_verbose(verbose: Any) -> int: + """Check that the value of verbose is valid. + + Parameters + ---------- + verbose : int | str | bool | None + Sets the verbosity level. 
The verbosity increases gradually between + ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. + If None is provided, the verbosity is set to ``"WARNING"``. + If a bool is provided, the verbosity is set to ``"WARNING"`` for False and + to ``"INFO"`` for True. + + Returns + ------- + verbose : int + The verbosity level as an integer. + """ + +def ensure_path(item: Any, must_exist: bool) -> Path: + """Ensure a variable is a Path. + + Parameters + ---------- + item : Any + Item to check. + must_exist : bool + If True, the path must resolve to an existing file or directory. + + Returns + ------- + path : Path + Path validated and converted to a pathlib.Path object. + """ diff --git a/mne_lsl/utils/_docs.py b/mne_lsl/utils/_docs.py index 16ee568cd..356a435c3 100644 --- a/mne_lsl/utils/_docs.py +++ b/mne_lsl/utils/_docs.py @@ -27,17 +27,16 @@ ) for key in keys: - entry = docdict_mne[key] + entry: str = docdict_mne[key] if ".. versionchanged::" in entry: entry = entry.replace(".. versionchanged::", ".. versionchanged:: MNE ") if ".. versionadded::" in entry: entry = entry.replace(".. versionadded::", ".. versionadded:: MNE ") docdict[key] = entry +del key # ----------------------------------------------- -docdict[ - "stream_bufsize" -] = """ +docdict["stream_bufsize"] = """ bufsize : float | int Size of the buffer keeping track of the data received from the stream. If the stream sampling rate ``sfreq`` is regular, ``bufsize`` is expressed in @@ -46,9 +45,7 @@ expressed in samples. The buffer will hold the last ``bufsize`` samples.""" # ----------------------------------------------- -docdict[ - "verbose" -] = """ +docdict["verbose"] = """ verbose : int | str | bool | None Sets the verbosity level. The verbosity increases gradually between ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. @@ -106,16 +103,16 @@ def fill_doc(f: Callable) -> Callable: def _indentcount_lines(lines: list[str]) -> int: """Minimum indent for all lines in line list. - >>> lines = [' one', ' two', ' three'] + >>> lines = [" one", " two", " three"] >>> indentcount_lines(lines) 1 >>> lines = [] >>> indentcount_lines(lines) 0 - >>> lines = [' one'] + >>> lines = [" one"] >>> indentcount_lines(lines) 1 - >>> indentcount_lines([' ']) + >>> indentcount_lines([" "]) 0 """ indent = sys.maxsize @@ -156,7 +153,7 @@ def copy_doc(source: Callable) -> Callable: >>> class B(A): ... @copy_doc(A.m1) ... def m1(): - ... ''' this gets appended''' + ... '''this gets appended''' ... pass >>> print(B.m1.__doc__) Docstring for m1 this gets appended diff --git a/mne_lsl/utils/_docs.pyi b/mne_lsl/utils/_docs.pyi new file mode 100644 index 000000000..e2bdd31bf --- /dev/null +++ b/mne_lsl/utils/_docs.pyi @@ -0,0 +1,70 @@ +from typing import Callable + +docdict: dict[str, str] +keys: tuple[str, ...] +entry: str +docdict_indented: dict[int, dict[str, str]] + +def fill_doc(f: Callable) -> Callable: + """Fill a docstring with docdict entries. + + Parameters + ---------- + f : callable + The function to fill the docstring of (modified in place). + + Returns + ------- + f : callable + The function, potentially with an updated __doc__. + """ + +def _indentcount_lines(lines: list[str]) -> int: + """Minimum indent for all lines in line list. 
+ + >>> lines = [" one", " two", " three"] + >>> indentcount_lines(lines) + 1 + >>> lines = [] + >>> indentcount_lines(lines) + 0 + >>> lines = [" one"] + >>> indentcount_lines(lines) + 1 + >>> indentcount_lines([" "]) + 0 + """ + +def copy_doc(source: Callable) -> Callable: + """Copy the docstring from another function (decorator). + + The docstring of the source function is prepepended to the docstring of the + function wrapped by this decorator. + + This is useful when inheriting from a class and overloading a method. This + decorator can be used to copy the docstring of the original method. + + Parameters + ---------- + source : callable + The function to copy the docstring from. + + Returns + ------- + wrapper : callable + The decorated function. + + Examples + -------- + >>> class A: + ... def m1(): + ... '''Docstring for m1''' + ... pass + >>> class B(A): + ... @copy_doc(A.m1) + ... def m1(): + ... '''this gets appended''' + ... pass + >>> print(B.m1.__doc__) + Docstring for m1 this gets appended + """ diff --git a/mne_lsl/utils/_fixes.pyi b/mne_lsl/utils/_fixes.pyi new file mode 100644 index 000000000..b3afcf622 --- /dev/null +++ b/mne_lsl/utils/_fixes.pyi @@ -0,0 +1,8 @@ +class _WrapStdOut: + """Dynamically wrap to sys.stdout. + + This makes packages that monkey-patch sys.stdout (e.g.doctest, + sphinx-gallery) work properly. + """ + + def __getattr__(self, name): ... diff --git a/mne_lsl/utils/_imports.py b/mne_lsl/utils/_imports.py index 3c717de8a..2175e6015 100644 --- a/mne_lsl/utils/_imports.py +++ b/mne_lsl/utils/_imports.py @@ -3,11 +3,18 @@ Inspired from pandas: https://pandas.pydata.org/ """ +from __future__ import annotations # c.f. PEP 563, PEP 649 + import importlib +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from types import ModuleType + from typing import Optional # A mapping from import name to package name (on PyPI) when the package name # is different. -INSTALL_MAPPING = { +_INSTALL_MAPPING: dict[str, str] = { "codespell_lib": "codespell", "cv2": "opencv-python", "parallel": "pyparallel", @@ -22,11 +29,11 @@ def import_optional_dependency( name: str, extra: str = "", raise_error: bool = True, -): +) -> Optional[ModuleType]: """Import an optional dependency. - By default, if a dependency is missing an ImportError with a nice message - will be raised. + By default, if a dependency is missing an ImportError with a nice message will be + raised. Parameters ---------- @@ -43,10 +50,9 @@ def import_optional_dependency( ------- module : Optional[ModuleType] The imported module when found. - None is returned when the package is not found and raise_error is - False. + None is returned when the package is not found and raise_error is False. """ - package_name = INSTALL_MAPPING.get(name) + package_name = _INSTALL_MAPPING.get(name) install_name = package_name if package_name is not None else name try: diff --git a/mne_lsl/utils/_imports.pyi b/mne_lsl/utils/_imports.pyi new file mode 100644 index 000000000..a83f67e4a --- /dev/null +++ b/mne_lsl/utils/_imports.pyi @@ -0,0 +1,30 @@ +from types import ModuleType as ModuleType +from typing import Optional + +_INSTALL_MAPPING: dict[str, str] + +def import_optional_dependency( + name: str, extra: str = ..., raise_error: bool = ... +) -> Optional[ModuleType]: + """Import an optional dependency. + + By default, if a dependency is missing an ImportError with a nice message will be + raised. + + Parameters + ---------- + name : str + The module name. + extra : str + Additional text to include in the ImportError message. 
+ raise_error : bool + What to do when a dependency is not found. + * True : Raise an ImportError. + * False: Return None. + + Returns + ------- + module : Optional[ModuleType] + The imported module when found. + None is returned when the package is not found and raise_error is False. + """ diff --git a/mne_lsl/utils/_path.pyi b/mne_lsl/utils/_path.pyi new file mode 100644 index 000000000..f340b2740 --- /dev/null +++ b/mne_lsl/utils/_path.pyi @@ -0,0 +1,11 @@ +from pathlib import Path +from typing import Generator + +def walk(path: Path) -> Generator[Path, None, None]: + """Walk recursively through a directory tree and yield the existing files. + + Parameters + ---------- + path : Path + Path to a directory. + """ diff --git a/mne_lsl/utils/config.py b/mne_lsl/utils/config.py index beeb997a2..37b79ac38 100644 --- a/mne_lsl/utils/config.py +++ b/mne_lsl/utils/config.py @@ -98,7 +98,7 @@ def _list_dependencies_info( package: str, dependencies: list[Requirement], unicode: bool, -): +) -> None: """List dependencies names and versions.""" if unicode: ljust += 1 diff --git a/mne_lsl/utils/config.pyi b/mne_lsl/utils/config.pyi new file mode 100644 index 000000000..8ab6ae9e4 --- /dev/null +++ b/mne_lsl/utils/config.pyi @@ -0,0 +1,27 @@ +from typing import IO, Callable, Optional + +from packaging.requirements import Requirement + +from ._checks import check_type as check_type +from .logs import _use_log_level as _use_log_level + +def sys_info(fid: Optional[IO] = ..., developer: bool = ...): + """Print the system information for debugging. + + Parameters + ---------- + fid : file-like | None + The file to write to, passed to :func:`print`. + Can be None to use :data:`sys.stdout`. + developer : bool + If True, display information about optional dependencies. + """ + +def _list_dependencies_info( + out: Callable, + ljust: int, + package: str, + dependencies: list[Requirement], + unicode: bool, +) -> None: + """List dependencies names and versions.""" diff --git a/mne_lsl/utils/logs.py b/mne_lsl/utils/logs.py index a1c8c0413..5b783012d 100644 --- a/mne_lsl/utils/logs.py +++ b/mne_lsl/utils/logs.py @@ -156,12 +156,12 @@ class _use_log_level: def __init__( self, - verbose: Union[bool, str, int, None] = None, + verbose: Optional[Union[bool, str, int]] = None, logger_obj: Optional[Logger] = None, ): - self._logger = logger_obj if logger_obj is not None else logger - self._old_level = self._logger.level - self._level = check_verbose(verbose) + self._logger: Logger = logger_obj if logger_obj is not None else logger + self._old_level: int = self._logger.level + self._level: int = check_verbose(verbose) def __enter__(self): self._logger.setLevel(self._level) diff --git a/mne_lsl/utils/logs.pyi b/mne_lsl/utils/logs.pyi new file mode 100644 index 000000000..6d7c20f4f --- /dev/null +++ b/mne_lsl/utils/logs.pyi @@ -0,0 +1,124 @@ +import logging +from logging import Logger +from pathlib import Path as Path +from typing import Callable, Optional, Union + +from _typeshed import Incomplete + +from ._checks import check_verbose as check_verbose +from ._docs import fill_doc as fill_doc +from ._fixes import _WrapStdOut as _WrapStdOut + +def _init_logger(*, verbose: Optional[Union[bool, str, int]] = ...) -> Logger: + """Initialize a logger. + + Assigns sys.stdout as the first handler of the logger. + + Parameters + ---------- + verbose : int | str | bool | None + Sets the verbosity level. The verbosity increases gradually between + ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. 
+ If None is provided, the verbosity is set to ``"WARNING"``. + If a bool is provided, the verbosity is set to ``"WARNING"`` for False and + to ``"INFO"`` for True. + + Returns + ------- + logger : Logger + The initialized logger. + """ + +def add_file_handler( + fname: Union[str, Path], + mode: str = ..., + encoding: Optional[str] = ..., + *, + verbose: Optional[Union[bool, str, int]] = ..., +) -> None: + """Add a file handler to the logger. + + Parameters + ---------- + fname : str | Path + Path to the file where the logging output is saved. + mode : str + Mode in which the file is opened. + encoding : str | None + If not None, encoding used to open the file. + verbose : int | str | bool | None + Sets the verbosity level. The verbosity increases gradually between + ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. + If None is provided, the verbosity is set to ``"WARNING"``. + If a bool is provided, the verbosity is set to ``"WARNING"`` for False and + to ``"INFO"`` for True. + """ + +def set_log_level(verbose: Optional[Union[bool, str, int]]) -> None: + """Set the log level for the logger. + + Parameters + ---------- + verbose : int | str | bool | None + Sets the verbosity level. The verbosity increases gradually between + ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. + If None is provided, the verbosity is set to ``"WARNING"``. + If a bool is provided, the verbosity is set to ``"WARNING"`` for False and + to ``"INFO"`` for True. + """ + +class _LoggerFormatter(logging.Formatter): + """Format string Syntax.""" + + _formatters: Incomplete + + def __init__(self) -> None: ... + def format(self, record: logging.LogRecord): + """ + Format the received log record. + + Parameters + ---------- + record : logging.LogRecord + """ + +def verbose(f: Callable) -> Callable: + """Set the verbose for the function call from the kwargs. + + Parameters + ---------- + f : callable + The function with a verbose argument. + + Returns + ------- + f : callable + The function. + """ + +class _use_log_level: + """Context manager to change the logging level temporary. + + Parameters + ---------- + verbose : int | str | bool | None + Sets the verbosity level. The verbosity increases gradually between + ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. + If None is provided, the verbosity is set to ``"WARNING"``. + If a bool is provided, the verbosity is set to ``"WARNING"`` for False and + to ``"INFO"`` for True. + """ + + _logger: Incomplete + _old_level: Incomplete + _level: Incomplete + + def __init__( + self, + verbose: Optional[Union[bool, str, int]] = ..., + logger_obj: Optional[Logger] = ..., + ) -> None: ... + def __enter__(self): ... + def __exit__(self, *args) -> None: ... + +logger: Incomplete diff --git a/mne_lsl/utils/meas_info.py b/mne_lsl/utils/meas_info.py index 56794730a..351e02639 100644 --- a/mne_lsl/utils/meas_info.py +++ b/mne_lsl/utils/meas_info.py @@ -25,12 +25,12 @@ _CH_TYPES_DICT = get_channel_type_constants(include_defaults=True) -_STIM_TYPES = ( +_STIM_TYPES: tuple[str, ...] 
= (
     "marker",
     "markers",
     "stim",
 )
-_HUMAN_UNITS = {
+_HUMAN_UNITS: dict[int, dict[str, int]] = {
     FIFF.FIFF_UNIT_V: {
         "v": _ch_unit_mul_named[0],
         "volt": _ch_unit_mul_named[0],
diff --git a/mne_lsl/utils/meas_info.pyi b/mne_lsl/utils/meas_info.pyi
new file mode 100644
index 000000000..0b46a3bea
--- /dev/null
+++ b/mne_lsl/utils/meas_info.pyi
@@ -0,0 +1,78 @@
+from typing import Any, Optional, Union
+
+from _typeshed import Incomplete
+from mne import Info
+
+from ..lsl.stream_info import _BaseStreamInfo as _BaseStreamInfo
+from ._checks import check_type as check_type
+from ._checks import check_value as check_value
+from ._checks import ensure_int as ensure_int
+from .logs import logger as logger
+
+_CH_TYPES_DICT: Incomplete
+_STIM_TYPES: tuple[str, ...]
+_HUMAN_UNITS: dict[int, dict[str, int]]
+
+def create_info(
+    n_channels: int,
+    sfreq: float,
+    stype: str,
+    desc: Optional[Union[_BaseStreamInfo, dict[str, Any]]],
+) -> Info:
+    """Create a minimal :class:`mne.Info` object from an LSL stream's attributes.
+
+    Parameters
+    ----------
+    n_channels : int
+        Number of channels.
+    sfreq : float
+        Sampling frequency in Hz. ``0`` corresponds to an irregular sampling rate.
+    stype : str
+        Type of the stream. This type will be used as a default for all channels with
+        an unknown type. If the ``stype`` provided is not among the MNE-known channel
+        types, defaults to ``'misc'``.
+    desc : StreamInfo | dict | None
+        If provided, dictionary or :class:`~mne_lsl.lsl.StreamInfo` containing the
+        channel information. A :class:`~mne_lsl.lsl.StreamInfo` contains the number of
+        channels, sampling frequency and stream type, which will be checked against the
+        provided arguments ``n_channels``, ``sfreq`` and ``stype``.
+
+    Returns
+    -------
+    info : Info
+        Corresponding MNE :class:`~mne.Info` object.
+
+    Notes
+    -----
+    If the argument ``desc`` is not aligned with ``n_channels``, it is ignored and an
+    :class:`mne.Info` with the number of channels defined in ``n_channels`` is created.
+    """
+
+def _read_desc_sinfo(
+    n_channels: int, stype: str, desc: _BaseStreamInfo
+) -> tuple[list[str], list[str], list[int], Optional[str]]:
+    """Read channel information from a StreamInfo.
+
+    If the StreamInfo is retrieved by resolve_streams, the description will be empty.
+    An inlet should be created and the inlet StreamInfo should be used to retrieve the
+    channel description.
+    """
+
+def _read_desc_dict(
+    n_channels: int, stype: str, desc: dict[str, Any]
+) -> tuple[list[str], list[str], list[int], Optional[str]]:
+    """Read channel information from a description dictionary.
+
+    A dictionary is returned from loading an XDF file.
+ """ + +def _get_ch_types_and_units( + channels: list[dict[str, Any]], stype: str +) -> tuple[list[str], list[int]]: + """Get the channel types and units from a stream description.""" + +def _safe_get(channel, item, default) -> str: + """Retrieve element from a stream description safely.""" + +def _set_channel_units(info: Info, mapping: dict[str, Union[str, int]]) -> None: + """Set the channel unit multiplication factor.""" diff --git a/pyproject.toml b/pyproject.toml index 8facf4971..367221f07 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,6 +60,7 @@ version = '1.2.0.dev0' all = [ 'mne_lsl[build]', 'mne_lsl[doc]', + 'mne_lsl[stubs]', 'mne_lsl[style]', 'mne_lsl[test]', ] @@ -83,13 +84,17 @@ doc = [ full = [ 'mne_lsl[all]', ] +stubs = [ + 'isort', + 'mypy', + 'ruff>=0.1.8', +] style = [ 'bibclean', - 'black', - 'codespell', + 'codespell[toml]>=2.2.4', 'isort', 'pydocstyle[toml]', - 'ruff', + 'ruff>=0.1.8', 'toml-sort', 'yamllint', ] @@ -112,20 +117,11 @@ homepage = 'https://mne.tools/mne-lsl' source = 'https://github.com/mne-tools/mne-lsl' tracker = 'https://github.com/mne-tools/mne-lsl/issues' -[tool.black] -extend-exclude = ''' -( - __pycache__ - | \.github - | doc/ - | pyproject.toml - | setup.py - | tutorials/ -) -''' -include = '\.pyi?$' -line-length = 88 -target-version = ['py39'] +[tool.codespell] +check-filenames = true +check-hidden = true +ignore-words = '.codespellignore' +skip = 'build,.git,.mypy_cache,.pytest_cache,doc/_static/logos/*' [tool.coverage.report] exclude_lines = [ @@ -186,8 +182,14 @@ extend-exclude = [ 'tutorials/*', ] line-length = 88 +select = ["E", "F", "W"] +target-version = 'py39' + +[tool.ruff.format] +docstring-code-format = true [tool.ruff.per-file-ignores] +'*.pyi' = ['E501'] '__init__.py' = ['F401'] [tool.setuptools] diff --git a/tools/stubgen.py b/tools/stubgen.py new file mode 100644 index 000000000..4e38ceb1b --- /dev/null +++ b/tools/stubgen.py @@ -0,0 +1,69 @@ +import ast +import subprocess +import sys +from importlib import import_module +from pathlib import Path + +import isort +from mypy import stubgen + +import mne_lsl + +directory = Path(mne_lsl.__file__).parent +# remove existing stub files +for file in directory.rglob("*.pyi"): + file.unlink() +# generate stub files, including private members and docstrings +files = [ + str(file.as_posix()) + for file in directory.rglob("*.py") + if file.parent.name not in ("commands", "tests") + and "stream_viewer" not in str(file.parent) + and not (file.name == "constants.py" and file.parent.name == "lsl") + and file.name not in ("conftest.py", "_tests.py", "_version.py") +] +stubgen.main( + [ + "--no-analysis", + "--no-import", + "--include-private", + "--include-docstrings", + "--output", + str(directory.parent), + *files, + ] +) +stubs = list(directory.rglob("*.pyi")) +config = str((directory.parent / "pyproject.toml")) +config_isort = isort.settings.Config(config) + +# expand docstrings and inject into stub files +for stub in stubs: + module_path = str(stub.relative_to(directory).with_suffix("").as_posix()) + module = import_module(f"{directory.name}.{module_path.replace('/', '.')}") + module_ast = ast.parse(stub.read_text(encoding="utf-8")) + objects = [ + node + for node in module_ast.body + if isinstance(node, (ast.ClassDef, ast.FunctionDef)) + ] + for node in objects: + docstring = getattr(module, node.name).__doc__ + if not docstring and isinstance(node, ast.FunctionDef): + continue + elif docstring: + node.body[0].value.value = docstring + for method in node.body: + if not 
isinstance(method, ast.FunctionDef): + continue + docstring = getattr(getattr(module, node.name), method.name).__doc__ + if docstring: + method.body[0].value.value = docstring + unparsed = ast.unparse(module_ast) + stub.write_text(unparsed, encoding="utf-8") + # sort imports + isort.file(stub, config=config_isort) + +# run ruff to improve stub style +exec = subprocess.run(["ruff", "format", str(directory), "--config", config]) +sys.exit(exec.returncode)
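
The docstring-injection step performed by tools/stubgen.py boils down to an ast round-trip: parse the generated stub, overwrite the docstring node of each class and function with the docstring read from the imported module, and unparse the tree back to disk. The following standalone sketch, which is not part of the diff and uses a made-up toy function and docstring, illustrates the same idea on a single stub:

    import ast

    # toy stub, as mypy's stubgen would emit it: signature plus a placeholder docstring
    stub_source = (
        "def add(a: int, b: int) -> int:\n"
        '    """placeholder"""\n'
    )
    # runtime docstring that should replace the placeholder; stubgen.py obtains it by
    # importing the real module and reading ``__doc__``
    runtime_doc = "Add two integers and return their sum."

    module_ast = ast.parse(stub_source)
    for node in module_ast.body:
        if isinstance(node, ast.FunctionDef):
            # the first statement of the stub body is the docstring expression
            node.body[0].value.value = runtime_doc
    print(ast.unparse(module_ast))  # ast.unparse requires Python 3.9 or newer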