From d2f99db61d97cf9a2d86372f8f607229e45898fc Mon Sep 17 00:00:00 2001 From: Mathieu Scheltienne Date: Mon, 5 Aug 2024 10:40:13 +0200 Subject: [PATCH 01/18] make abstraction layer public --- doc/api/abstract.rst | 16 + doc/api/index.rst | 2 + doc/api/most-used classes.rst | 5 +- mne_lsl/stream/_base.pyi | 825 --------------------------- mne_lsl/stream/{_base.py => base.py} | 2 +- mne_lsl/stream/epochs.py | 2 +- mne_lsl/stream/stream_lsl.py | 12 +- mne_lsl/stream/stream_lsl.pyi | 173 ------ tutorials/30_stream_manual.py | 2 +- 9 files changed, 25 insertions(+), 1014 deletions(-) create mode 100644 doc/api/abstract.rst delete mode 100644 mne_lsl/stream/_base.pyi rename mne_lsl/stream/{_base.py => base.py} (99%) delete mode 100644 mne_lsl/stream/stream_lsl.pyi diff --git a/doc/api/abstract.rst b/doc/api/abstract.rst new file mode 100644 index 000000000..f66309be6 --- /dev/null +++ b/doc/api/abstract.rst @@ -0,0 +1,16 @@ +Abstraction layer +----------------- + +.. currentmodule:: mne_lsl.stream + +An abstraction layer is provided to create a ``Stream`` object that uses a different +communication protocol than LSL. An object inheriting from +:class:`~mne_lsl.stream.BaseStream` will be compatible with other objects from +``mne-lsl``. For instance, it will be possible to epoch the stream with +:class:`~mne_lsl.stream.EpochsStream`. + +.. autosummary:: + :toctree: ../generated/api + :nosignatures: + + BaseStream diff --git a/doc/api/index.rst b/doc/api/index.rst index 15da66d0d..a407bf664 100644 --- a/doc/api/index.rst +++ b/doc/api/index.rst @@ -19,4 +19,6 @@ This is the reference for classes (``CamelCase`` names) and functions .. include:: utilities.rst +.. include:: abstract.rst + .. include:: legacy.rst diff --git a/doc/api/most-used classes.rst b/doc/api/most-used classes.rst index b3cedcc5c..3c76b9043 100644 --- a/doc/api/most-used classes.rst +++ b/doc/api/most-used classes.rst @@ -9,7 +9,8 @@ Stream ~~~~~~ A ``Stream`` uses an `MNE `_-like API to efficiently interacts with a -numerical stream. +numerical stream. ``Stream`` objects inherit from the abstract object +:class:`~mne_lsl.stream.BaseStream`. .. currentmodule:: mne_lsl.stream @@ -22,7 +23,7 @@ numerical stream. EpochsStream ~~~~~~~~~~~~ -An ``EpochsStream`` can be used to create epochs from a stream. +An ``EpochsStream`` can be used to create epochs from a ``Stream``. .. 
currentmodule:: mne_lsl.stream diff --git a/mne_lsl/stream/_base.pyi b/mne_lsl/stream/_base.pyi deleted file mode 100644 index 756b97b04..000000000 --- a/mne_lsl/stream/_base.pyi +++ /dev/null @@ -1,825 +0,0 @@ -from abc import ABC, abstractmethod -from collections.abc import Generator -from datetime import datetime as datetime -from typing import Any, Callable - -import numpy as np -from _typeshed import Incomplete -from mne import Info -from mne.channels import DigMontage as DigMontage -from mne.channels.channels import SetChannelsMixin -from mne.io.meas_info import ContainsMixin -from numpy.typing import DTypeLike as DTypeLike -from numpy.typing import NDArray as NDArray - -from .._typing import ScalarArray as ScalarArray -from .._typing import ScalarIntArray as ScalarIntArray -from ..utils._checks import check_type as check_type -from ..utils._checks import check_value as check_value -from ..utils._checks import ensure_int as ensure_int -from ..utils._docs import copy_doc as copy_doc -from ..utils._docs import fill_doc as fill_doc -from ..utils.logs import logger as logger -from ..utils.logs import verbose as verbose -from ..utils.logs import warn as warn -from ..utils.meas_info import _HUMAN_UNITS as _HUMAN_UNITS -from ..utils.meas_info import _set_channel_units as _set_channel_units -from ._filters import StreamFilter as StreamFilter -from ._filters import create_filter as create_filter -from ._filters import ensure_sos_iir_params as ensure_sos_iir_params - -class BaseStream(ABC, ContainsMixin, SetChannelsMixin): - """Stream object representing a single real-time stream. - - Parameters - ---------- - bufsize : float | int - Size of the buffer keeping track of the data received from the stream. If - the stream sampling rate ``sfreq`` is regular, ``bufsize`` is expressed in - seconds. The buffer will hold the last ``bufsize * sfreq`` samples (ceiled). - If the stream sampling rate ``sfreq`` is irregular, ``bufsize`` is - expressed in samples. The buffer will hold the last ``bufsize`` samples. - """ - - _bufsize: Incomplete - - @abstractmethod - def __init__(self, bufsize: float): ... - def __contains__(self, ch_type: str) -> bool: - """Check channel type membership. - - Parameters - ---------- - ch_type : str - Channel type to check for. Can be e.g. ``'meg'``, ``'eeg'``, - ``'stim'``, etc. - - Returns - ------- - in : bool - Whether or not the instance contains the given channel type. - - Examples - -------- - Channel type membership can be tested as:: - - >>> 'meg' in inst # doctest: +SKIP - True - >>> 'seeg' in inst # doctest: +SKIP - False - - """ - - def __del__(self) -> None: - """Try to disconnect the stream when deleting the object.""" - - @abstractmethod - def __repr__(self) -> str: - """Representation of the instance.""" - - @abstractmethod - def acquire(self) -> None: - """Pull new samples in the buffer. - - Notes - ----- - This method is not needed if the stream was connected with an acquisition delay - different from ``0``. In this case, the acquisition is done automatically in a - background thread. - """ - _buffer: Incomplete - - def add_reference_channels( - self, - ref_channels: str | list[str] | tuple[str], - ref_units: str | int | list[str | int] | tuple[str | int] | None = None, - ) -> BaseStream: - """Add EEG reference channels to data that consists of all zeros. - - Adds EEG reference channels that are not part of the streamed data. This is - useful when you need to re-reference your data to different channels. These - added channels will consist of all zeros. 
- - Parameters - ---------- - ref_channels : str | list of str - Name of the electrode(s) which served as the reference in the - recording. If a name is provided, a corresponding channel is added - and its data is set to 0. This is useful for later re-referencing. - ref_units : str | int | list of str | list of int | None - The unit or unit multiplication factor of the reference channels. The unit - can be given as a human-readable string or as a unit multiplication factor, - e.g. ``-6`` for microvolts corresponding to ``1e-6``. - If not provided, the added EEG reference channel has a unit multiplication - factor set to ``0`` which corresponds to Volts. Use - ``Stream.set_channel_units`` to change the unit multiplication factor. - - Returns - ------- - stream : instance of ``Stream`` - The stream instance modified in-place. - """ - - def anonymize( - self, - daysback: int | None = None, - keep_his: bool = False, - *, - verbose: bool | str | int | None = None, - ) -> BaseStream: - """Anonymize the measurement information in-place. - - Parameters - ---------- - daysback : int | None - Number of days to subtract from all dates. - If ``None`` (default), the acquisition date, ``info['meas_date']``, - will be set to ``January 1ˢᵗ, 2000``. This parameter is ignored if - ``info['meas_date']`` is ``None`` (i.e., no acquisition date has been set). - keep_his : bool - If ``True``, ``his_id`` of ``subject_info`` will **not** be overwritten. - Defaults to ``False``. - - .. warning:: This could mean that ``info`` is not fully - anonymized. Use with caution. - verbose : int | str | bool | None - Sets the verbosity level. The verbosity increases gradually between - ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. - If None is provided, the verbosity is set to the currently set logger's level. - If a bool is provided, the verbosity is set to ``"WARNING"`` for False and - to ``"INFO"`` for True. - - Returns - ------- - stream : instance of ``Stream`` - The stream instance modified in-place. - - Notes - ----- - Removes potentially identifying information if it exists in ``info``. - Specifically for each of the following we use: - - - meas_date, file_id, meas_id - A default value, or as specified by ``daysback``. - - subject_info - Default values, except for 'birthday' which is adjusted - to maintain the subject age. - - experimenter, proj_name, description - Default strings. - - utc_offset - ``None``. - - proj_id - Zeros. - - proc_history - Dates use the ``meas_date`` logic, and experimenter a default string. - - helium_info, device_info - Dates use the ``meas_date`` logic, meta info uses defaults. - - If ``info['meas_date']`` is ``None``, it will remain ``None`` during processing - the above fields. - - Operates in place. - """ - _acquisition_delay: Incomplete - _n_new_samples: int - _executor: Incomplete - - @abstractmethod - def connect(self, acquisition_delay: float) -> BaseStream: - """Connect to the stream and initiate data collection in the buffer. - - Parameters - ---------- - acquisition_delay : float - Delay in seconds between 2 acquisition during which chunks of data are - pulled from the connected device. If ``0``, the automatic acquisition in a - background thread is disabled and the user must manually call the - acquisition method to pull new samples. - - Returns - ------- - stream : instance of ``Stream`` - The stream instance modified in-place. 
- """ - - @abstractmethod - def disconnect(self) -> BaseStream: - """Disconnect from the LSL stream and interrupt data collection. - - Returns - ------- - stream : instance of ``Stream`` - The stream instance modified in-place. - """ - - def del_filter(self, idx: int | list[int] | tuple[int] | str = "all") -> None: - """Remove a filter from the list of applied filters. - - Parameters - ---------- - idx : ``'all'`` | int | list of int | tuple of int - If the string ``'all'`` (default), remove all filters. If an integer or a - list of integers, remove the filter(s) at the given index(es) from - ``Stream.filters``. - - Notes - ----- - When removing a filter, the initial conditions of all the filters applied on - overlapping channels are reset. The initial conditions will be re-estimated as - a step response steady-state. - """ - - def drop_channels(self, ch_names: str | list[str] | tuple[str]) -> BaseStream: - """Drop channel(s). - - Parameters - ---------- - ch_names : str | list of str - Name or list of names of channels to remove. - - Returns - ------- - stream : instance of ``Stream`` - The stream instance modified in-place. - - See Also - -------- - pick - """ - - def filter( - self, - l_freq: float | None, - h_freq: float | None, - picks: str | list[str] | int | list[int] | ScalarIntArray | None = None, - iir_params: dict[str, Any] | None = None, - *, - verbose: bool | str | int | None = None, - ) -> BaseStream: - """Filter the stream with an IIR causal filter. - - Once a filter is applied, the buffer is updated in real-time with the filtered - data. It is possible to apply more than one filter. - - .. code-block:: python - - stream = Stream(2.0).connect() - stream.filter(1.0, 40.0, picks="eeg") - stream.filter(1.0, 15.0, picks="ecg").filter(0.1, 5, picks="EDA") - - Parameters - ---------- - l_freq : float | None - The lower cutoff frequency. If None, the buffer is only low-passed. - h_freq : float | None - The higher cutoff frequency. If None, the buffer is only high-passed. - picks : str | array-like | slice | None - Channels to include. Slices and lists of integers will be interpreted as - channel indices. In lists, channel *type* strings (e.g., ``['meg', - 'eeg']``) will pick channels of those types, channel *name* strings (e.g., - ``['MEG0111', 'MEG2623']`` will pick the given channels. Can also be the - string values "all" to pick all channels, or "data" to pick :term:`data - channels`. None (default) will pick all channels. Note that channels in - ``info['bads']`` *will be included* if their names or indices are - explicitly provided. - iir_params : dict | None - Dictionary of parameters to use for IIR filtering. If None, a 4th order - Butterworth will be used. For more information, see - :func:`mne.filter.construct_iir_filter`. - - .. note:: - - The output ``sos`` must be used. The ``ba`` output is not supported. - verbose : int | str | bool | None - Sets the verbosity level. The verbosity increases gradually between - ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. - If None is provided, the verbosity is set to the currently set logger's level. - If a bool is provided, the verbosity is set to ``"WARNING"`` for False and - to ``"INFO"`` for True. - - Returns - ------- - stream : instance of ``Stream`` - The stream instance modified in-place. 
- """ - - def get_channel_types( - self, - picks: str | list[str] | int | list[int] | ScalarIntArray | None = None, - unique: bool = False, - only_data_chs: bool = False, - ) -> list[str]: - """Get a list of channel type for each channel. - - Parameters - ---------- - picks : str | array-like | slice | None - Channels to include. Slices and lists of integers will be interpreted as - channel indices. In lists, channel *type* strings (e.g., ``['meg', - 'eeg']``) will pick channels of those types, channel *name* strings (e.g., - ``['MEG0111', 'MEG2623']`` will pick the given channels. Can also be the - string values "all" to pick all channels, or "data" to pick :term:`data - channels`. None (default) will pick all channels. Note that channels in - ``info['bads']`` *will be included* if their names or indices are - explicitly provided. - unique : bool - Whether to return only unique channel types. Default is ``False``. - only_data_chs : bool - Whether to ignore non-data channels. Default is ``False``. - - Returns - ------- - channel_types : list - The channel types. - """ - - def get_channel_units( - self, - picks: str | list[str] | int | list[int] | ScalarIntArray | None = None, - only_data_chs: bool = False, - ) -> list[tuple[int, int]]: - """Get a list of channel unit for each channel. - - Parameters - ---------- - picks : str | array-like | slice | None - Channels to include. Slices and lists of integers will be interpreted as - channel indices. In lists, channel *type* strings (e.g., ``['meg', - 'eeg']``) will pick channels of those types, channel *name* strings (e.g., - ``['MEG0111', 'MEG2623']`` will pick the given channels. Can also be the - string values "all" to pick all channels, or "data" to pick :term:`data - channels`. None (default) will pick all channels. Note that channels in - ``info['bads']`` *will be included* if their names or indices are - explicitly provided. - only_data_chs : bool - Whether to ignore non-data channels. Default is ``False``. - - Returns - ------- - channel_units : list of tuple of shape (2,) - A list of 2-element tuples. The first element contains the unit FIFF code - and its associated name, e.g. ``107 (FIFF_UNIT_V)`` for Volts. The second - element contains the unit multiplication factor, e.g. ``-6 (FIFF_UNITM_MU)`` - for micro (corresponds to ``1e-6``). - """ - - def get_data( - self, - winsize: float | None = None, - picks: str | list[str] | int | list[int] | ScalarIntArray | None = None, - ) -> tuple[ScalarArray, NDArray[np.float64]]: - """Retrieve the latest data from the buffer. - - Parameters - ---------- - winsize : float | int | None - Size of the window of data to view. If the stream sampling rate ``sfreq`` is - regular, ``winsize`` is expressed in seconds. The window will view the last - ``winsize * sfreq`` samples (ceiled) from the buffer. If the stream sampling - sampling rate ``sfreq`` is irregular, ``winsize`` is expressed in samples. - The window will view the last ``winsize`` samples. If ``None``, the entire - buffer is returned. - picks : str | array-like | slice | None - Channels to include. Slices and lists of integers will be interpreted as - channel indices. In lists, channel *type* strings (e.g., ``['meg', - 'eeg']``) will pick channels of those types, channel *name* strings (e.g., - ``['MEG0111', 'MEG2623']`` will pick the given channels. Can also be the - string values "all" to pick all channels, or "data" to pick :term:`data - channels`. None (default) will pick all channels. 
Note that channels in - ``info['bads']`` *will be included* if their names or indices are - explicitly provided. - - Returns - ------- - data : array of shape (n_channels, n_samples) - Data in the given window. - timestamps : array of shape (n_samples,) - Timestamps in the given window. - - Notes - ----- - The number of newly available samples stored in the property ``n_new_samples`` - is reset at every function call, even if all channels were not selected with the - argument ``picks``. - """ - - def get_montage(self) -> DigMontage | None: - """Get a DigMontage from instance. - - Returns - ------- - - montage : None | str | DigMontage - A montage containing channel positions. If a string or - :class:`~mne.channels.DigMontage` is - specified, the existing channel information will be updated with the - channel positions from the montage. Valid strings are the names of the - built-in montages that ship with MNE-Python; you can list those via - :func:`mne.channels.get_builtin_montages`. - If ``None`` (default), the channel positions will be removed from the - :class:`~mne.Info`. - """ - - def notch_filter( - self, - freqs: float, - picks: str | list[str] | int | list[int] | ScalarIntArray | None = None, - notch_widths: float | None = None, - trans_bandwidth: int = 1, - iir_params: dict[str, Any] | None = None, - *, - verbose: bool | str | int | None = None, - ) -> BaseStream: - """Filter the stream with an IIR causal notch filter. - - Once a filter is applied, the buffer is updated in real-time with the filtered - data. It is possible to apply more than one filter. - - .. code-block:: python - - stream = Stream(2.0).connect() - stream.filter(1.0, 40.0, picks="eeg") - stream.notch_filter(50, picks="ecg") - - Parameters - ---------- - freqs : float - Specific frequencies to filter out from data, e.g. ``60`` Hz in the US or - ``50`` Hz in Europe for line noise. - picks : str | array-like | slice | None - Channels to include. Slices and lists of integers will be interpreted as - channel indices. In lists, channel *type* strings (e.g., ``['meg', - 'eeg']``) will pick channels of those types, channel *name* strings (e.g., - ``['MEG0111', 'MEG2623']`` will pick the given channels. Can also be the - string values "all" to pick all channels, or "data" to pick :term:`data - channels`. None (default) will pick all channels. Note that channels in - ``info['bads']`` *will be included* if their names or indices are - explicitly provided. - notch_widths : float | None - Width of the stop band in Hz. If ``None``, ``freqs / 200`` is used. - trans_bandwidth : float - Width of the transition band in Hz. - iir_params : dict | None - Dictionary of parameters to use for IIR filtering. If None, a 4th order - Butterworth will be used. For more information, see - :func:`mne.filter.construct_iir_filter`. - - .. note:: - - The output ``sos`` must be used. The ``ba`` output is not supported. - verbose : int | str | bool | None - Sets the verbosity level. The verbosity increases gradually between - ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. - If None is provided, the verbosity is set to the currently set logger's level. - If a bool is provided, the verbosity is set to ``"WARNING"`` for False and - to ``"INFO"`` for True. - - Returns - ------- - stream : instance of ``Stream`` - The stream instance modified in-place. - """ - - def plot(self) -> None: - """Open a real-time stream viewer. 
Not implemented.""" - - def pick( - self, - picks: str | list[str] | int | list[int] | ScalarIntArray | None = None, - exclude: str | list[str] | int | list[int] | ScalarIntArray = (), - ) -> BaseStream: - """Pick a subset of channels. - - Parameters - ---------- - picks : str | array-like | slice | None - Channels to include. Slices and lists of integers will be interpreted as - channel indices. In lists, channel *type* strings (e.g., ``['meg', - 'eeg']``) will pick channels of those types, channel *name* strings (e.g., - ``['MEG0111', 'MEG2623']`` will pick the given channels. Can also be the - string values "all" to pick all channels, or "data" to pick :term:`data - channels`. None (default) will pick all channels. - exclude : str | list of str - Set of channels to exclude, only used when picking is based on types, e.g. - ``exclude='bads'`` when ``picks="meg"``. - - Returns - ------- - stream : instance of ``Stream`` - The stream instance modified in-place. - - See Also - -------- - drop_channels - - Notes - ----- - Contrary to MNE-Python, re-ordering channels is not supported in ``MNE-LSL``. - Thus, if explicit channel names are provided in ``picks``, they are sorted to - match the order of existing channel names. - """ - - def record(self) -> None: - """Record the stream data to disk. Not implemented.""" - - def rename_channels( - self, - mapping: dict[str, str] | Callable, - allow_duplicates: bool = False, - *, - verbose: bool | str | int | None = None, - ) -> BaseStream: - """Rename channels. - - Parameters - ---------- - mapping : dict | callable - A dictionary mapping the old channel to a new channel name e.g. - ``{'EEG061' : 'EEG161'}``. Can also be a callable function that takes and - returns a string. - allow_duplicates : bool - If True (default False), allow duplicates, which will automatically be - renamed with ``-N`` at the end. - verbose : int | str | bool | None - Sets the verbosity level. The verbosity increases gradually between - ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. - If None is provided, the verbosity is set to the currently set logger's level. - If a bool is provided, the verbosity is set to ``"WARNING"`` for False and - to ``"INFO"`` for True. - - Returns - ------- - stream : instance of ``Stream`` - The stream instance modified in-place. - """ - - def set_bipolar_reference(self) -> BaseStream: - """Set a bipolar reference. Not implemented. - - Returns - ------- - stream : instance of ``Stream`` - The stream instance modified in-place. - """ - - def set_channel_types( - self, - mapping: dict[str, str], - *, - on_unit_change: str = "warn", - verbose: bool | str | int | None = None, - ) -> BaseStream: - """Define the sensor type of channels. - - If the new channel type changes the unit type, e.g. from ``T/m`` to ``V``, the - unit multiplication factor is reset to ``0``. Use - ``Stream.set_channel_units`` to change the multiplication factor, e.g. from - ``0`` to ``-6`` to change from Volts to microvolts. - - Parameters - ---------- - mapping : dict - A dictionary mapping a channel to a sensor type (str), e.g., - ``{'EEG061': 'eog'}`` or ``{'EEG061': 'eog', 'TRIGGER': 'stim'}``. - on_unit_change : ``'raise'`` | ``'warn'`` | ``'ignore'`` - What to do if the measurement unit of a channel is changed automatically to - match the new sensor type. - - .. versionadded:: MNE 1.4 - verbose : int | str | bool | None - Sets the verbosity level. 
The verbosity increases gradually between - ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. - If None is provided, the verbosity is set to the currently set logger's level. - If a bool is provided, the verbosity is set to ``"WARNING"`` for False and - to ``"INFO"`` for True. - - Returns - ------- - stream : instance of ``Stream`` - The stream instance modified in-place. - """ - - def set_channel_units(self, mapping: dict[str, str | int]) -> BaseStream: - """Define the channel unit multiplication factor. - - The unit itself is defined by the sensor type. Use - ``Stream.set_channel_types`` to change the channel type, e.g. from planar - gradiometers in ``T/m`` to EEG in ``V``. - - Parameters - ---------- - mapping : dict - A dictionary mapping a channel to a unit, e.g. ``{'EEG061': 'microvolts'}``. - The unit can be given as a human-readable string or as a unit multiplication - factor, e.g. ``-6`` for microvolts corresponding to ``1e-6``. - - Returns - ------- - stream : instance of ``Stream`` - The stream instance modified in-place. - - Notes - ----- - If the human-readable unit of your channel is not yet supported by MNE-LSL, - please contact the developers on GitHub to add your units to the known set. - """ - _ref_channels: Incomplete - _ref_from: Incomplete - - def set_eeg_reference( - self, - ref_channels: str | list[str] | tuple[str], - ch_type: str | list[str] | tuple[str] = "eeg", - ) -> BaseStream: - """Specify which reference to use for EEG-like data. - - Use this function to explicitly specify the desired reference for EEG-like - channels. This can be either an existing electrode or a new virtual channel - added with ``Stream.add_reference_channels``. This function will re-reference - the data in the ringbuffer according to the desired reference. - - Parameters - ---------- - ref_channels : str | list of str - Name(s) of the channel(s) used to construct the reference. Can also be set - to ``'average'`` to apply a common average reference. - ch_type : str | list of str - The name of the channel type to apply the reference to. Valid channel types - are ``'eeg'``, ``'ecog'``, ``'seeg'``, ``'dbs'``. - - Returns - ------- - stream : instance of ``Stream`` - The stream instance modified in-place. - """ - - def set_meas_date( - self, meas_date: datetime | float | tuple[float] | None - ) -> BaseStream: - """Set the measurement start date. - - Parameters - ---------- - meas_date : datetime | float | tuple | None - The new measurement date. - If datetime object, it must be timezone-aware and in UTC. - A tuple of (seconds, microseconds) or float (alias for - ``(meas_date, 0)``) can also be passed and a datetime - object will be automatically created. If None, will remove - the time reference. - - Returns - ------- - stream : instance of ``Stream`` - The stream instance modified in-place. - - See Also - -------- - anonymize - """ - - def set_montage( - self, - montage: str | DigMontage | None, - match_case: bool = True, - match_alias: bool | dict[str, str] = False, - on_missing: str = "raise", - *, - verbose: bool | str | int | None = None, - ) -> BaseStream: - """Set EEG/sEEG/ECoG/DBS/fNIRS channel positions and digitization points. - - Parameters - ---------- - montage : None | str | DigMontage - A montage containing channel positions. If a string or - :class:`~mne.channels.DigMontage` is - specified, the existing channel information will be updated with the - channel positions from the montage. 
Valid strings are the names of the - built-in montages that ship with MNE-Python; you can list those via - :func:`mne.channels.get_builtin_montages`. - If ``None`` (default), the channel positions will be removed from the - :class:`~mne.Info`. - match_case : bool - If True (default), channel name matching will be case sensitive. - - .. versionadded:: MNE 0.20 - match_alias : bool | dict - Whether to use a lookup table to match unrecognized channel location names - to their known aliases. If True, uses the mapping in - ``mne.io.constants.CHANNEL_LOC_ALIASES``. If a :class:`dict` is passed, it - will be used instead, and should map from non-standard channel names to - names in the specified ``montage``. Default is ``False``. - - .. versionadded:: MNE 0.23 - on_missing : 'raise' | 'warn' | 'ignore' - Can be ``'raise'`` (default) to raise an error, ``'warn'`` to emit a - warning, or ``'ignore'`` to ignore when channels have missing coordinates. - - .. versionadded:: MNE 0.20.1 - verbose : int | str | bool | None - Sets the verbosity level. The verbosity increases gradually between - ``"CRITICAL"``, ``"ERROR"``, ``"WARNING"``, ``"INFO"`` and ``"DEBUG"``. - If None is provided, the verbosity is set to the currently set logger's level. - If a bool is provided, the verbosity is set to ``"WARNING"`` for False and - to ``"INFO"`` for True. - - Returns - ------- - stream : instance of ``Stream`` - The stream instance modified in-place. - - See Also - -------- - mne.channels.make_standard_montage - mne.channels.make_dig_montage - mne.channels.read_custom_montage - - Notes - ----- - .. warning:: - - Only EEG/sEEG/ECoG/DBS/fNIRS channels can have their positions set using a - montage. Other channel types (e.g., MEG channels) should have their - positions defined properly using their data reading functions. - """ - - @staticmethod - def _acquire(self) -> None: - """Update function pulling new samples in the buffer at a regular interval.""" - - def _check_connected(self, name: str): - """Check that the stream is connected before calling the function 'name'.""" - - def _check_connected_and_regular_sampling(self, name: str): - """Check that the stream has a regular sampling rate.""" - - def _interrupt_acquisition(self) -> Generator[None, None, None]: - """Context manager interrupting the acquisition thread.""" - _info: Incomplete - _picks_inlet: Incomplete - - def _pick(self, picks: ScalarIntArray) -> None: - """Interrupt acquisition and apply the channel selection.""" - _added_channels: Incomplete - _filters: Incomplete - _timestamps: Incomplete - - @abstractmethod - def _reset_variables(self) -> None: - """Reset variables define after connection.""" - - @property - def compensation_grade(self) -> int | None: - """The current gradient compensation grade. - - :type: :class:`int` | None - """ - - @property - def ch_names(self) -> list[str]: - """Name of the channels. - - :type: :class:`list` of :class:`str` - """ - - @property - def connected(self) -> bool: - """Connection status of the stream. - - :type: :class:`bool` - """ - - @property - def dtype(self) -> DTypeLike | None: - """Channel format of the stream.""" - - @property - def filters(self) -> list[StreamFilter]: - """List of filters applied to the real-time Stream. - - :type: :class:`list` of ```StreamFilter`` - """ - - @property - def info(self) -> Info: - """Info of the LSL stream. - - :type: :class:`~mne.Info` - """ - - @property - def n_buffer(self) -> int: - """Number of samples that can be stored in the buffer. 
- - :type: :class:`int` - """ - - @property - def n_new_samples(self) -> int: - """Number of new samples available in the buffer. - - The number of new samples is reset at every ``Stream.get_data`` call. - - :type: :class:`int` - """ diff --git a/mne_lsl/stream/_base.py b/mne_lsl/stream/base.py similarity index 99% rename from mne_lsl/stream/_base.py rename to mne_lsl/stream/base.py index f4d9b6cb7..db38a89b1 100644 --- a/mne_lsl/stream/_base.py +++ b/mne_lsl/stream/base.py @@ -1028,7 +1028,7 @@ def set_montage( ) return self - @staticmethod + @abstractmethod def _acquire(self) -> None: # pragma: no cover """Update function pulling new samples in the buffer at a regular interval.""" diff --git a/mne_lsl/stream/epochs.py b/mne_lsl/stream/epochs.py index 264f0332c..0da303fbc 100644 --- a/mne_lsl/stream/epochs.py +++ b/mne_lsl/stream/epochs.py @@ -25,7 +25,7 @@ from ..utils._docs import fill_doc from ..utils._fixes import find_events from ..utils.logs import logger, warn -from ._base import BaseStream +from .base import BaseStream if TYPE_CHECKING: from typing import Optional, Union diff --git a/mne_lsl/stream/stream_lsl.py b/mne_lsl/stream/stream_lsl.py index b90725264..79e390d3f 100644 --- a/mne_lsl/stream/stream_lsl.py +++ b/mne_lsl/stream/stream_lsl.py @@ -20,7 +20,7 @@ from ..utils._checks import check_type from ..utils._docs import copy_doc, fill_doc from ..utils.logs import logger -from ._base import BaseStream +from .base import BaseStream if TYPE_CHECKING: from collections.abc import Sequence @@ -317,16 +317,6 @@ def _reset_variables(self) -> None: self._sinfo = None self._inlet = None - # ---------------------------------------------------------------------------------- - @property - def compensation_grade(self) -> Optional[int]: - """The current gradient compensation grade. - - :type: :class:`int` | None - """ - self._check_connected("compensation_grade") - return super().compensation_grade - # ---------------------------------------------------------------------------------- @property def connected(self) -> bool: diff --git a/mne_lsl/stream/stream_lsl.pyi b/mne_lsl/stream/stream_lsl.pyi deleted file mode 100644 index dc20a30f1..000000000 --- a/mne_lsl/stream/stream_lsl.pyi +++ /dev/null @@ -1,173 +0,0 @@ -from collections.abc import Sequence as Sequence - -from _typeshed import Incomplete - -from mne_lsl.lsl.stream_info import _BaseStreamInfo as _BaseStreamInfo - -from ..lsl import StreamInlet as StreamInlet -from ..lsl import resolve_streams as resolve_streams -from ..lsl.constants import fmt2numpy as fmt2numpy -from ..utils._checks import check_type as check_type -from ..utils._docs import copy_doc as copy_doc -from ..utils._docs import fill_doc as fill_doc -from ..utils.logs import logger as logger -from ._base import BaseStream as BaseStream - -class StreamLSL(BaseStream): - """Stream object representing a single LSL stream. - - Parameters - ---------- - bufsize : float | int - Size of the buffer keeping track of the data received from the stream. If - the stream sampling rate ``sfreq`` is regular, ``bufsize`` is expressed in - seconds. The buffer will hold the last ``bufsize * sfreq`` samples (ceiled). - If the stream sampling rate ``sfreq`` is irregular, ``bufsize`` is - expressed in samples. The buffer will hold the last ``bufsize`` samples. - name : str - Name of the LSL stream. - stype : str - Type of the LSL stream. - source_id : str - ID of the source of the LSL stream. 
- - Notes - ----- - The 3 arguments ``name``, ``stype``, and ``source_id`` must uniquely identify an - LSL stream. If this is not possible, please resolve the available LSL streams - with :func:`mne_lsl.lsl.resolve_streams` and create an inlet with - :class:`~mne_lsl.lsl.StreamInlet`. - """ - - _name: Incomplete - _stype: Incomplete - _source_id: Incomplete - - def __init__( - self, - bufsize: float, - *, - name: str | None = None, - stype: str | None = None, - source_id: str | None = None, - ) -> None: ... - def __repr__(self) -> str: - """Representation of the instance.""" - - def acquire(self) -> None: - """Pull new samples in the buffer. - - Notes - ----- - This method is not needed if the stream was connected with an acquisition delay - different from ``0``. In this case, the acquisition is done automatically in a - background thread. - """ - _inlet: Incomplete - _sinfo: Incomplete - _info: Incomplete - _buffer: Incomplete - _timestamps: Incomplete - _picks_inlet: Incomplete - - def connect( - self, - acquisition_delay: float = 0.001, - *, - processing_flags: str | Sequence[str] | None = None, - timeout: float | None = 2, - ) -> StreamLSL: - """Connect to the LSL stream and initiate data collection in the buffer. - - Parameters - ---------- - acquisition_delay : float - Delay in seconds between 2 acquisition during which chunks of data are - pulled from the :class:`~mne_lsl.lsl.StreamInlet`. If ``0``, the automatic - acquisition in a background thread is disabled and the user must manually - call :meth:`~mne_lsl.stream.StreamLSL.acquire` to pull new samples. - processing_flags : list of str | ``'all'`` | None - Set the post-processing options. By default, post-processing is disabled. - Any combination of the processing flags is valid. The available flags are: - - * ``'clocksync'``: Automatic clock synchronization, equivalent to - manually adding the estimated - :meth:`~mne_lsl.lsl.StreamInlet.time_correction`. - * ``'dejitter'``: Remove jitter on the received timestamps with a - smoothing algorithm. - * ``'monotize'``: Force the timestamps to be monotically ascending. - This option should not be enable if ``'dejitter'`` is not enabled. - timeout : float | None - Optional timeout (in seconds) of the operation. ``None`` disables the - timeout. The timeout value is applied once to every operation supporting it. - - Returns - ------- - stream : instance of :class:`~mne_lsl.stream.StreamLSL` - The stream instance modified in-place. - - Notes - ----- - If all 3 stream identifiers ``name``, ``stype`` and ``source_id`` are left to - ``None``, resolution of the available streams will require a full ``timeout``, - blocking the execution until this function returns. If at least one of the 3 - stream identifiers is specified, resolution will stop as soon as one stream - matching the identifier is found. - """ - - def disconnect(self) -> StreamLSL: - """Disconnect from the LSL stream and interrupt data collection. - - Returns - ------- - stream : instance of :class:`~mne_lsl.stream.StreamLSL` - The stream instance modified in-place. - """ - - def _acquire(self) -> None: - """Update function pulling new samples in the buffer at a regular interval.""" - - def _reset_variables(self) -> None: - """Reset variables define after connection.""" - - @property - def compensation_grade(self) -> int | None: - """The current gradient compensation grade. - - :type: :class:`int` | None - """ - - @property - def connected(self) -> bool: - """Connection status of the stream. 
- - :type: :class:`bool` - """ - - @property - def name(self) -> str | None: - """Name of the LSL stream. - - :type: :class:`str` | None - """ - - @property - def sinfo(self) -> _BaseStreamInfo | None: - """StreamInfo of the connected stream. - - :type: :class:`~mne_lsl.lsl.StreamInfo` | None - """ - - @property - def stype(self) -> str | None: - """Type of the LSL stream. - - :type: :class:`str` | None - """ - - @property - def source_id(self) -> str | None: - """ID of the source of the LSL stream. - - :type: :class:`str` | None - """ diff --git a/tutorials/30_stream_manual.py b/tutorials/30_stream_manual.py index cf9fe5acb..7c153e0bf 100644 --- a/tutorials/30_stream_manual.py +++ b/tutorials/30_stream_manual.py @@ -5,7 +5,7 @@ .. include:: ./../../links.inc The :class:`~mne_lsl.stream.StreamLSL` object offers 2 mode of acquisition: automatic or -manual. In automatic mode, the stream objecdt acquires new chunks of samples at a +manual. In automatic mode, the stream object acquires new chunks of samples at a regular interval. In manual mode, the user has to call the :meth:`~mne_lsl.stream.StreamLSL.acquire` to acquire new chunks of samples from the network. The automatic or manual acquisition is selected via the ``acquisition_delay`` From 22c31af7571876b40eebce7de8b4b22f4652e48c Mon Sep 17 00:00:00 2001 From: Mathieu Scheltienne Date: Mon, 5 Aug 2024 10:56:40 +0200 Subject: [PATCH 02/18] add doc and log --- .github/actions/retry-step/action.yaml | 1 + doc/api/index.rst | 4 ++-- doc/changes/latest.rst | 3 ++- mne_lsl/stream/__init__.py | 3 ++- tutorials/40_epochs.py | 17 +++++++++++++++++ 5 files changed, 24 insertions(+), 4 deletions(-) create mode 100644 tutorials/40_epochs.py diff --git a/.github/actions/retry-step/action.yaml b/.github/actions/retry-step/action.yaml index 54d083d07..b6dbc68be 100644 --- a/.github/actions/retry-step/action.yaml +++ b/.github/actions/retry-step/action.yaml @@ -54,6 +54,7 @@ runs: eval $command_between_retries fi + echo "Retrying command after $exit_code error in attempt $attempt." attempt=$((attempt + 1)) sleep 1 done diff --git a/doc/api/index.rst b/doc/api/index.rst index a407bf664..bca9dccce 100644 --- a/doc/api/index.rst +++ b/doc/api/index.rst @@ -17,8 +17,8 @@ This is the reference for classes (``CamelCase`` names) and functions .. include:: lsl (low-level).rst -.. include:: utilities.rst - .. include:: abstract.rst +.. include:: utilities.rst + .. include:: legacy.rst diff --git a/doc/changes/latest.rst b/doc/changes/latest.rst index ff4c576ef..5159ff4bc 100644 --- a/doc/changes/latest.rst +++ b/doc/changes/latest.rst @@ -21,7 +21,8 @@ Version 1.5 - Add ``exclude='bads'`` argument to :meth:`~mne_lsl.stream.StreamLSL.get_data` to control exclusion from ``picks`` (:pr:`258` by `Mathieu Scheltienne`_) - Add :class:`~mne_lsl.stream.EpochsStream` to epoch a Stream object object on the fly (:pr:`258` by `Mathieu Scheltienne`_) - +- Add tutorial about :class:`~mne_lsl.stream.EpochsStream` (:pr:`302` by `Mathieu Scheltienne`_) +- Expose the abstract :class:`~mne_lsl.stream.BaseStream` object to create custom stream objects relying on different communication protocols (:pr:`302` by `Mathieu Scheltienne`_) Infrastructure -------------- diff --git a/mne_lsl/stream/__init__.py b/mne_lsl/stream/__init__.py index b52dc1b7b..e554c3f7d 100644 --- a/mne_lsl/stream/__init__.py +++ b/mne_lsl/stream/__init__.py @@ -1,3 +1,4 @@ -from . import epochs, stream_lsl +from . 
import base, epochs, stream_lsl +from .base import BaseStream from .epochs import EpochsStream from .stream_lsl import StreamLSL diff --git a/tutorials/40_epochs.py b/tutorials/40_epochs.py new file mode 100644 index 000000000..996631219 --- /dev/null +++ b/tutorials/40_epochs.py @@ -0,0 +1,17 @@ +""" +Epoching a Stream in real-time +============================== + +.. include:: ./../../links.inc + +The :class:`~mne_lsl.stream.EpochsStream` object can be used similarly to +:class:`mne.Epochs` to create epochs from a continuous stream of samples around events +of interest. + +.. note:: + + The :class:`~mne_lsl.stream.EpochsStream` object is designed to work with + any ``Stream`` object. At the time of writing, only + :class:`~mne_lsl.stream.StreamLSL` is available, but any object inheriting from the + abstract :class:`~mne_lsl.stream.BaseStream` object should work. +""" From 9caa917a427eb3877e671faea6ea1d6d6ad65d1c Mon Sep 17 00:00:00 2001 From: Mathieu Scheltienne Date: Mon, 5 Aug 2024 12:13:49 +0200 Subject: [PATCH 03/18] add tutorial --- doc/changes/latest.rst | 1 + examples/30_real_time_evoked_responses.py | 6 + tutorials/40_epochs.py | 235 ++++++++++++++++++++++ 3 files changed, 242 insertions(+) diff --git a/doc/changes/latest.rst b/doc/changes/latest.rst index 5159ff4bc..08912eee7 100644 --- a/doc/changes/latest.rst +++ b/doc/changes/latest.rst @@ -23,6 +23,7 @@ Version 1.5 - Add :class:`~mne_lsl.stream.EpochsStream` to epoch a Stream object object on the fly (:pr:`258` by `Mathieu Scheltienne`_) - Add tutorial about :class:`~mne_lsl.stream.EpochsStream` (:pr:`302` by `Mathieu Scheltienne`_) - Expose the abstract :class:`~mne_lsl.stream.BaseStream` object to create custom stream objects relying on different communication protocols (:pr:`302` by `Mathieu Scheltienne`_) + Infrastructure -------------- diff --git a/examples/30_real_time_evoked_responses.py b/examples/30_real_time_evoked_responses.py index 8ce00c979..586f9b677 100644 --- a/examples/30_real_time_evoked_responses.py +++ b/examples/30_real_time_evoked_responses.py @@ -26,6 +26,12 @@ # dataset and connect a :class:`~mne_lsl.stream.StreamLSL` to it. Then, we attach a # :class:`~mne_lsl.stream.EpochsStream` object to create epochs from the LSL stream. # The epochs will be created around the event ID ``1`` from the ``'STI 014'`` channel. +# +# .. note:: +# +# A ``chunk_size`` of 200 samples is used here to ensure stability and reliability +# while building the documentation on the CI. In practice, a ``chunk_size`` of 200 +# samples is too large to represent a real-time application. with PlayerLSL(raw, chunk_size=200, name="real-time-evoked-example"): stream = StreamLSL(bufsize=4, name="real-time-evoked-example") diff --git a/tutorials/40_epochs.py b/tutorials/40_epochs.py index 996631219..dbf28a841 100644 --- a/tutorials/40_epochs.py +++ b/tutorials/40_epochs.py @@ -14,4 +14,239 @@ any ``Stream`` object. At the time of writing, only :class:`~mne_lsl.stream.StreamLSL` is available, but any object inheriting from the abstract :class:`~mne_lsl.stream.BaseStream` object should work. + +A :class:`~mne_lsl.stream.EpochsStream` object support peak-to-peak rejection, baseline +correction and detrending. + +Event source +------------ + +A :class:`~mne_lsl.stream.EpochsStream` object requires an event source to create +epochs. 3 event sources are supported: + +- a set of ``'stim'`` channels within the attached ``Stream`` object. +- a set of ``'stim'`` channels within a separate ``Stream`` object. 
+- an irregularly sampled ``Stream`` object. + +.. note:: + + In the case of an irregularly sampled ``Stream`` object, only numerical streams are + supported at the moment because interaction with ``str`` streams in Python is not + as performant as interaction with numerical streams. + +Set of ``'stim'`` channels +-------------------------- + +The set of ``'stim'`` channels from which the events are extracted can be either part +of the regularly sampled ``Stream`` object epoched (argument ``stream``) or part of a +separate regularly sampled ``Stream`` object (argument ``event_stream``). In both case, +the channel(s) type should be ``'stim'`` and the channel(s) should be formatted for +:func:`mne.find_events` to correctly extract the events. The channels to consider are +provided in the argument ``event_channels`` and the events to consider in the argument +``event_id``. Let's create epochs around the event ID ``2`` from the ``'STI 014'`` +channel of MNE's sample dataset. """ + +import time + +import numpy as np +from matplotlib import pyplot as plt +from mne import EpochsArray, annotations_from_events, find_events +from mne.datasets import sample +from mne.io import read_raw_fif + +from mne_lsl.lsl import resolve_streams +from mne_lsl.player import PlayerLSL +from mne_lsl.stream import EpochsStream, StreamLSL + +data_path = sample.data_path() +fname = data_path / "MEG" / "sample" / "sample_audvis_raw.fif" +raw = read_raw_fif(fname, preload=False).pick(("meg", "stim")).crop(3, 212).load_data() +player = PlayerLSL( + raw, chunk_size=200, name="tutorial-epochs-1", annotations=False +).start() +player.info + +# %% +# .. note:: +# +# A ``chunk_size`` of 200 samples is used here to ensure stability and reliability +# while building the documentation on the CI. In practice, a ``chunk_size`` of 200 +# samples is too large to represent a real-time application. +# +# In the cell above, a mock LSL stream is created using the ``'meg'`` and ``'stim'`` +# channels of MNE's sample dataset. Now, we need to create a +# :class:`~mne_lsl.stream.StreamLSL` object connected to this mock LSL stream. The +# channel ``"MEG 2443"`` is marked as bad and the signal is filtered with a low-pass +# filter. + +stream = StreamLSL(bufsize=4, name="tutorial-epochs-1") +stream.connect(acquisition_delay=0.1, processing_flags="all") +stream.info["bads"] = ["MEG 2443"] # remove bad channel +stream.filter(None, 40, picks="grad") # filter signal +stream.info + +# %% +# Now, we can create epochs using this stream as source for both the epochs and the +# events. The ``'stim'`` channel ``'STI 014'`` is used to extract the events and epochs +# are created around the event ID ``2`` using the gradiometer channels. The epochs are +# created around the event, from 200 ms before the event to 500 ms after the event. A +# baseline correction is applied using the 200 first ms of the epoch as baseline. + +epochs = EpochsStream( + stream, + bufsize=20, # number of epoch held in the buffer + event_id=2, + event_channels="STI 014", + tmin=-0.2, + tmax=0.5, + baseline=(None, 0), + picks="grad", +).connect(acquisition_delay=0.1) +epochs.info + +# %% +# Note the ``bufsize`` argument in the cell above. This argument controls the number of +# epochs that are kept in memory. The actual size of the underlying numpy array depends +# on the number of epochs, the number of samples (controlled by ``tmin`` and ``tmax``) +# and the number of channels. 
+# +# Let's wait for a couple of epochs to enter in the buffer, and then let's convert the +# array to an MNE-Python :class:`~mne.Epochs` object and plot the evoked response. + +while epochs.n_new_epochs < 10: + time.sleep(0.5) + +data = epochs.get_data(n_epochs=epochs.n_new_epochs) +epochs_mne = EpochsArray(data, epochs.info) +epochs_mne.average().plot() +plt.show() + +# %% + +epochs.disconnect() +stream.disconnect() +player.stop() + +# %% +# Irregularly sampled stream +# -------------------------- +# +# The event source can also be an irregularly sampled stream. In this case, each channel +# represents a separate event. A new value entering the buffer of a channel is +# interpreted as an event, regardless of the value itself. For instance, we can fake +# an irregularly sampled numerical stream using a :class:`~mne_lsl.player.PlayerLSL` +# with a :class:`~mne.io.Raw` object which has :class:`~mne.Annotations` attached to it. + +events = find_events(raw, stim_channel="STI 014") +events = events[np.isin(events[:, 2], (1, 2))] # keep only events with ID 1 and 2 +annotations = annotations_from_events( + events, + raw.info["sfreq"], + event_desc={1: "ignore", 2: "event"}, + first_samp=raw.first_samp, +) +annotations.duration += 0.1 # set duration since annotations_from_events sets it to 0 +annotations + +# %% + +raw.set_annotations(annotations) +player = PlayerLSL( + raw, chunk_size=200, name="tutorial-epochs-2", annotations=True +).start() +player.info + +# %% +# We now have 2 LSL stream availables on the network, one of which is an irregularly +# sampled numerical streams of events. + +resolve_streams() + +# %% +# We can now create a :class:`~mne_lsl.stream.StreamLSL` object for each available +# stream on the network. + +stream = StreamLSL(bufsize=4, name="tutorial-epochs-2") +stream.connect(acquisition_delay=0.1, processing_flags="all") +stream.info["bads"] = ["MEG 2443"] # remove bad channel +stream.filter(None, 40, picks="grad") # filter signal +stream.info + +# %% + +stream_events = StreamLSL(bufsize=4, name="tutorial-epochs-2-annotations") +stream_events.connect(acquisition_delay=0.1, processing_flags="all") +stream_events.info + +# %% +# Let's first inspect the event stream once a couple of samples have been acquired. + +while stream_events.n_new_samples < 3: + time.sleep(0.5) +data, ts = stream_events.get_data(winsize=stream_events.n_new_samples) +print("Array of shape (n_channels, n_samples): ", data.shape) +data + +# %% +# Each channel corresponds to a given annotation, ``0`` to ``'ignore'`` and ``1`` to +# ``'event'``. The value is ``0`` when no annotation is present, and ``x`` when an +# annotation is present, with ``x`` being the duration of the annotation. +# +# Thus, this array can be interpreted as follows: +# +# .. code-block:: python +# +# array([[0.1, 0. , 0.1], +# [0. , 0.1, 0. ]]) +# +# - An annotation of 0.1 seconds labelled ``'ignore'`` was received at ``ts[0]``. +# - An annotation of 0.1 seconds labelled ``'event'`` was received at ``ts[1]``. +# - An annotation of 0.1 seconds labelled ``'ignore'`` was received at ``ts[2]``. +# +# We can now use those 2 streams to create epochs around the events of interest. 
+ +epochs = EpochsStream( + stream, + bufsize=20, # number of epoch held in the buffer + event_id=None, + event_channels="event", # this argument now selects the events of interest + event_stream=stream_events, + tmin=-0.2, + tmax=0.5, + baseline=(None, 0), + picks="grad", +).connect(acquisition_delay=0.1) +epochs.info + +# %% +# Let's wait for a couple of epochs to enter in the buffer, and then let's convert the +# array to an MNE-Python :class:`~mne.Epochs` object and plot the power spectral +# density. + +while epochs.n_new_epochs < 10: + time.sleep(0.5) + +data = epochs.get_data(n_epochs=epochs.n_new_epochs) +epochs_mne = EpochsArray(data, epochs.info) +epochs_mne.compute_psd(fmax=40, tmin=0).plot() +plt.show() + +# %% +# Free resources +# -------------- +# +# When you are done with a :class:`~mne_lsl.player.PlayerLSL`, a +# :class:`~mne_lsl.stream.StreamLSL` or a :class:`~mne_lsl.stream.EpochsStream` don't +# forget to free the resources they both use to continuously mock an LSL stream or +# receive new data from an LSL stream. + +epochs.disconnect() + +# %% + +stream.disconnect() + +# %% + +player.stop() From 505c44dd8f0a4eebc2846e2257c702ddfcbe977e Mon Sep 17 00:00:00 2001 From: Mathieu Scheltienne Date: Mon, 5 Aug 2024 13:49:36 +0200 Subject: [PATCH 04/18] try to improve event pruning --- mne_lsl/player/player_lsl.py | 8 +++++++- mne_lsl/stream/epochs.py | 19 +++++++++++++------ tutorials/40_epochs.py | 2 +- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/mne_lsl/player/player_lsl.py b/mne_lsl/player/player_lsl.py index 46c16c46a..40dba117b 100644 --- a/mne_lsl/player/player_lsl.py +++ b/mne_lsl/player/player_lsl.py @@ -141,6 +141,7 @@ def __init__( self._sinfo_annotations.set_channel_types("annotations") self._sinfo_annotations.set_channel_units("none") self._annotations_idx = self._raw.time_as_index(self._raw.annotations.onset) + self._annotations_idx -= self._raw.first_samp else: self._sinfo_annotations = None self._annotations_idx = None @@ -319,7 +320,12 @@ def _stream_annotations( return None # estimate LSL timestamp of each annotation timestamps = ( - start_timestamp + self.annotations.onset[idx] - self._raw.times[start] + start_timestamp + + ( + self.annotations.onset[idx] + - self._raw.first_samp / self._raw.info["sfreq"] + ) + - self._raw.times[start] ) # one-hot encode the description and duration in the channels idx_ = np.array( diff --git a/mne_lsl/stream/epochs.py b/mne_lsl/stream/epochs.py index 0da303fbc..4f89de1f8 100644 --- a/mne_lsl/stream/epochs.py +++ b/mne_lsl/stream/epochs.py @@ -416,6 +416,11 @@ def _acquire(self) -> None: # split the different acquisition scenarios to retrieve new events to add to # the buffer. data, ts = self._stream.get_data(exclude=()) + # in case the epochs were created on a newly created stream object, we need + # to remove the 'empty buffer' samples. + n_samples = np.count_nonzero(ts) + data = data[:, -n_samples:] + ts = ts[-n_samples:] if self._event_stream is None: picks_events = _picks_to_idx( self._stream._info, self._event_channels, exclude="bads" @@ -453,16 +458,18 @@ def _acquire(self) -> None: self._event_stream is not None and self._event_stream._info["sfreq"] == 0 ): + # don't select only the new events as they might all fall outside of + # the attached stream ts buffer, instead always look through all + # available events. 
data_events, ts_events = self._event_stream.get_data( - winsize=self._event_stream._n_new_samples, - picks=self._event_channels, - exclude=(), + picks=self._event_channels, exclude=() ) + n_events = np.count_nonzero(ts_events) events = np.vstack( [ - np.arange(ts_events.size, dtype=np.int64), - np.zeros(ts_events.size, dtype=np.int64), - np.argmax(data_events, axis=0), + np.arange(n_events, dtype=np.int64), + np.zeros(n_events, dtype=np.int64), + np.argmax(data_events[:, -n_events:], axis=0), ], dtype=np.int64, ).T diff --git a/tutorials/40_epochs.py b/tutorials/40_epochs.py index dbf28a841..8c090a63a 100644 --- a/tutorials/40_epochs.py +++ b/tutorials/40_epochs.py @@ -175,7 +175,7 @@ # %% -stream_events = StreamLSL(bufsize=4, name="tutorial-epochs-2-annotations") +stream_events = StreamLSL(bufsize=20, name="tutorial-epochs-2-annotations") stream_events.connect(acquisition_delay=0.1, processing_flags="all") stream_events.info From 71eae0085e00b55d5d4ab5c024dcb187806594ca Mon Sep 17 00:00:00 2001 From: Mathieu Scheltienne Date: Mon, 5 Aug 2024 14:16:31 +0200 Subject: [PATCH 05/18] remove 'empty buffer' elements --- mne_lsl/stream/epochs.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/mne_lsl/stream/epochs.py b/mne_lsl/stream/epochs.py index 4f89de1f8..8728a6c12 100644 --- a/mne_lsl/stream/epochs.py +++ b/mne_lsl/stream/epochs.py @@ -465,11 +465,13 @@ def _acquire(self) -> None: picks=self._event_channels, exclude=() ) n_events = np.count_nonzero(ts_events) + ts_events = ts_events[-n_events:] + data_events = data_events[:, -n_events:] events = np.vstack( [ - np.arange(n_events, dtype=np.int64), - np.zeros(n_events, dtype=np.int64), - np.argmax(data_events[:, -n_events:], axis=0), + np.arange(ts_events.size, dtype=np.int64), + np.zeros(ts_events.size, dtype=np.int64), + np.argmax(data_events, axis=0), ], dtype=np.int64, ).T From 63b1f5142b6aeea6a1bf11e019556772161109a9 Mon Sep 17 00:00:00 2001 From: Mathieu Scheltienne Date: Mon, 5 Aug 2024 14:18:40 +0200 Subject: [PATCH 06/18] DRY --- mne_lsl/stream/epochs.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/mne_lsl/stream/epochs.py b/mne_lsl/stream/epochs.py index 8728a6c12..91715db5c 100644 --- a/mne_lsl/stream/epochs.py +++ b/mne_lsl/stream/epochs.py @@ -416,11 +416,7 @@ def _acquire(self) -> None: # split the different acquisition scenarios to retrieve new events to add to # the buffer. data, ts = self._stream.get_data(exclude=()) - # in case the epochs were created on a newly created stream object, we need - # to remove the 'empty buffer' samples. 
- n_samples = np.count_nonzero(ts) - data = data[:, -n_samples:] - ts = ts[-n_samples:] + data, ts = _remove_empty_elements(data, ts) if self._event_stream is None: picks_events = _picks_to_idx( self._stream._info, self._event_channels, exclude="bads" @@ -443,6 +439,7 @@ def _acquire(self) -> None: data_events, ts_events = self._event_stream.get_data( picks=self._event_channels, exclude=() ) + data_events, ts_events = _remove_empty_elements(data_events, ts_events) events = _find_events_in_stim_channels( data_events, self._event_channels, self._info["sfreq"] ) @@ -464,9 +461,7 @@ def _acquire(self) -> None: data_events, ts_events = self._event_stream.get_data( picks=self._event_channels, exclude=() ) - n_events = np.count_nonzero(ts_events) - ts_events = ts_events[-n_events:] - data_events = data_events[:, -n_events:] + data_events, ts_events = _remove_empty_elements(data_events, ts_events) events = np.vstack( [ np.arange(ts_events.size, dtype=np.int64), @@ -958,3 +953,13 @@ def _process_data( if detrend_type is not None: data = detrend(data, axis=1, type=detrend_type, overwrite_data=True) return data + + +def _remove_empty_elements( + data: ScalarArray, ts: NDArray[np.float64] +) -> tuple[ScalarArray, NDArray[np.float64]]: + """Remove empty elements from the data and ts array.""" + n_samples = np.count_nonzero(ts) + data = data[:, -n_samples:] + ts = ts[-n_samples:] + return data, ts From 1b0e8c046708963aebcc15a2b9d1bce3980d68cd Mon Sep 17 00:00:00 2001 From: Mathieu Scheltienne Date: Mon, 5 Aug 2024 14:21:39 +0200 Subject: [PATCH 07/18] test remove_empty_elts --- mne_lsl/stream/tests/test_epochs.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/mne_lsl/stream/tests/test_epochs.py b/mne_lsl/stream/tests/test_epochs.py index afa61a04c..a23fa7a48 100644 --- a/mne_lsl/stream/tests/test_epochs.py +++ b/mne_lsl/stream/tests/test_epochs.py @@ -21,6 +21,7 @@ _find_events_in_stim_channels, _process_data, _prune_events, + _remove_empty_elements, ) if TYPE_CHECKING: @@ -742,3 +743,20 @@ def test_ensure_event_id_with_event_stream(): with pytest.warns(RuntimeWarning, match="should be set to None"): _ensure_event_id(dict(event=1), event_stream) event_stream.disconnect() + + +def test_remove_empty_elements(): + """Test _remove_empty_elements.""" + data = np.ones(10).reshape(1, -1) + ts = np.zeros(10) + ts[5:] = np.arange(5) + data, ts = _remove_empty_elements(data, ts) + assert data.size == ts.size + assert ts.size == 4 + + data = np.ones(20).reshape(2, -1) + ts = np.zeros(10) + ts[5:] = np.arange(5) + data, ts = _remove_empty_elements(data, ts) + assert data.shape[1] == ts.size + assert ts.size == 4 From 4ff4efe7ead1f6b1484fb42e43fe8bdcdefe00d0 Mon Sep 17 00:00:00 2001 From: Mathieu Scheltienne Date: Mon, 5 Aug 2024 15:11:32 +0200 Subject: [PATCH 08/18] update dataset paths --- mne_lsl/lsl/tests/test_stream_info.py | 2 +- mne_lsl/utils/tests/test_test.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mne_lsl/lsl/tests/test_stream_info.py b/mne_lsl/lsl/tests/test_stream_info.py index be3bab5d3..88f43ca24 100644 --- a/mne_lsl/lsl/tests/test_stream_info.py +++ b/mne_lsl/lsl/tests/test_stream_info.py @@ -232,7 +232,7 @@ def test_stream_info_desc_from_info(close_io): compare_infos(info, info_retrieved) # test with FIFF file from the MNE sample dataset - fname = testing.data_path() / "sample_audvis_raw.fif" + fname = testing.data_path() / "mne-sample" / "sample_audvis_raw.fif" raw = read_raw_fif(fname, preload=False) sinfo = StreamInfo( "test", "", 
len(raw.ch_names), raw.info["sfreq"], np.float32, uuid.uuid4().hex diff --git a/mne_lsl/utils/tests/test_test.py b/mne_lsl/utils/tests/test_test.py index d5390cca2..4ebba0cdb 100644 --- a/mne_lsl/utils/tests/test_test.py +++ b/mne_lsl/utils/tests/test_test.py @@ -51,7 +51,7 @@ def test_match_stream_and_raw_data(raw_object, request): def test_compare_infos(raw): """Test that the partial info comparison works as intended.""" - info = read_info(testing.data_path() / "sample_audvis_raw.fif") + info = read_info(testing.data_path() / "mne-sample" / "sample_audvis_raw.fif") with pytest.raises(AssertionError): compare_infos(info, raw.info) From b506709496083d2d02eceb745b86640dc46307f1 Mon Sep 17 00:00:00 2001 From: Mathieu Scheltienne Date: Mon, 5 Aug 2024 15:30:26 +0200 Subject: [PATCH 09/18] update imports and add tests --- .../actions/get-testing-dataset/action.yaml | 1 + .github/workflows/doc.yaml | 1 - examples/30_real_time_evoked_responses.py | 5 +- mne_lsl/stream/tests/test_epochs.py | 84 ++++++++++++++++++- tutorials/40_epochs.py | 5 +- 5 files changed, 86 insertions(+), 10 deletions(-) diff --git a/.github/actions/get-testing-dataset/action.yaml b/.github/actions/get-testing-dataset/action.yaml index c8eccc9bb..b725ed384 100644 --- a/.github/actions/get-testing-dataset/action.yaml +++ b/.github/actions/get-testing-dataset/action.yaml @@ -48,6 +48,7 @@ runs: run: rm mne_lsl_dataset_version.txt - name: Cache MNE sample dataset + if: ${{ inputs.mne-sample == 'true' }} id: cache_mne_sample uses: actions/cache@v4 with: diff --git a/.github/workflows/doc.yaml b/.github/workflows/doc.yaml index da6c19e7c..58ce56ad1 100644 --- a/.github/workflows/doc.yaml +++ b/.github/workflows/doc.yaml @@ -37,7 +37,6 @@ jobs: with: sample: "true" testing: "false" - mne-sample: "true" - name: Build doc uses: ./.github/actions/retry-step with: diff --git a/examples/30_real_time_evoked_responses.py b/examples/30_real_time_evoked_responses.py index 586f9b677..d1997ff00 100644 --- a/examples/30_real_time_evoked_responses.py +++ b/examples/30_real_time_evoked_responses.py @@ -9,16 +9,15 @@ import numpy as np from matplotlib import pyplot as plt from mne import EvokedArray, combine_evoked -from mne.datasets import sample from mne.io import read_raw_fif +from mne_lsl.datasets import sample from mne_lsl.player import PlayerLSL from mne_lsl.stream import EpochsStream, StreamLSL from mne_lsl.utils.logs import logger # dataset used in the example -data_path = sample.data_path() -fname = data_path / "MEG" / "sample" / "sample_audvis_raw.fif" +fname = sample.data_path() / "mne-sample" / "sample_audvis_raw.fif" raw = read_raw_fif(fname, preload=False).pick(("meg", "stim")).crop(3, 212).load_data() # %% diff --git a/mne_lsl/stream/tests/test_epochs.py b/mne_lsl/stream/tests/test_epochs.py index a23fa7a48..51843c36d 100644 --- a/mne_lsl/stream/tests/test_epochs.py +++ b/mne_lsl/stream/tests/test_epochs.py @@ -7,12 +7,13 @@ import numpy as np import pytest from mne import annotations_from_events, create_info, find_events -from mne.io import RawArray +from mne.io import RawArray, read_raw_fif from mne.io.base import BaseRaw from numpy.testing import assert_allclose -from .. 
import EpochsStream, StreamLSL -from ..epochs import ( +from mne_lsl.datasets import testing +from mne_lsl.stream import EpochsStream, StreamLSL +from mne_lsl.stream.epochs import ( _check_baseline, _check_reject_flat, _check_reject_tmin_tmax, @@ -760,3 +761,80 @@ def test_remove_empty_elements(): data, ts = _remove_empty_elements(data, ts) assert data.shape[1] == ts.size assert ts.size == 4 + + +@pytest.fixture() +def raw_with_annotations_and_first_samp() -> BaseRaw: + """Raw with annotations and first_samp set.""" + fname = testing.data_path() / "mne-sample" / "sample_audvis_raw.fif" + raw = read_raw_fif(fname, preload=True) + events = find_events(raw, stim_channel="STI 014") + events = events[np.isin(events[:, 2], (1, 2))] # keep only events with ID 1 and 2 + annotations = annotations_from_events( + events, + raw.info["sfreq"], + event_desc={1: "ignore", 2: "event"}, + first_samp=raw.first_samp, + ) + annotations.duration += 0.1 # set duration, annotations_from_events sets it to 0 + raw.set_annotations(annotations) + return raw + + +@pytest.fixture() +def _mock_lsl_stream_with_annotations_and_first_Samp( + raw_with_annotations, request, chunk_size +): + """Create a mock LSL stream streaming events with annotations and first_samp.""" + manager = mp.Manager() + status = manager.Value("i", 0) + name = f"P_{request.node.name}" + process = mp.Process( + target=_player_mock_lsl_stream, + args=(raw_with_annotations, name, chunk_size, status), + ) + process.start() + while status.value != 1: + pass + yield + status.value = 0 + process.join(timeout=2) + process.kill() + + +@pytest.mark.slow() +@pytest.mark.timeout(30) +@pytest.mark.usefixtures("_mock_lsl_stream_with_annotations") +def test_epochs_with_irregular_numerical_event_stream_and_first_samp(): + """Test creating epochs from an event stream from raw with first_samp.""" + event_stream = StreamLSL(10, stype="annotations").connect(acquisition_delay=0.1) + name = event_stream.name.removesuffix("-annotations") + stream = StreamLSL(0.5, name=name).connect(acquisition_delay=0.1) + stream.info["bads"] = ["MEG 2443"] # remove bad channel + epochs = EpochsStream( + stream, + bufsize=20, # number of epoch held in the buffer + event_id=None, + event_channels="event", # this argument now selects the events of interest + event_stream=event_stream, + tmin=-0.2, + tmax=0.5, + baseline=(None, 0), + picks="grad", + ).connect(acquisition_delay=0.1) + while epochs.n_new_epochs == 0: + time.sleep(0.1) + n = epochs.n_new_epochs + while epochs.n_new_epochs == n: + time.sleep(0.1) + n2 = epochs.n_new_epochs + assert n < n2 + while epochs.n_new_epochs == n: + time.sleep(0.1) + n3 = epochs.n_new_epochs + assert n2 < n3 + epochs.get_data() + assert epochs.n_new_epochs == 0 + epochs.disconnect() + stream.disconnect() + event_stream.disconnect() diff --git a/tutorials/40_epochs.py b/tutorials/40_epochs.py index 8c090a63a..76c671d70 100644 --- a/tutorials/40_epochs.py +++ b/tutorials/40_epochs.py @@ -52,15 +52,14 @@ import numpy as np from matplotlib import pyplot as plt from mne import EpochsArray, annotations_from_events, find_events -from mne.datasets import sample from mne.io import read_raw_fif +from mne_lsl.datasets import sample from mne_lsl.lsl import resolve_streams from mne_lsl.player import PlayerLSL from mne_lsl.stream import EpochsStream, StreamLSL -data_path = sample.data_path() -fname = data_path / "MEG" / "sample" / "sample_audvis_raw.fif" +fname = sample.data_path() / "mne-sample" / "sample_audvis_raw.fif" raw = read_raw_fif(fname, 
preload=False).pick(("meg", "stim")).crop(3, 212).load_data() player = PlayerLSL( raw, chunk_size=200, name="tutorial-epochs-1", annotations=False From 613fdc49ed6d3a837ad4a7f67ef0659f3f43b985 Mon Sep 17 00:00:00 2001 From: Mathieu Scheltienne Date: Mon, 5 Aug 2024 15:30:50 +0200 Subject: [PATCH 10/18] fix typo --- mne_lsl/stream/tests/test_epochs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mne_lsl/stream/tests/test_epochs.py b/mne_lsl/stream/tests/test_epochs.py index 51843c36d..91dea74a3 100644 --- a/mne_lsl/stream/tests/test_epochs.py +++ b/mne_lsl/stream/tests/test_epochs.py @@ -782,7 +782,7 @@ def raw_with_annotations_and_first_samp() -> BaseRaw: @pytest.fixture() -def _mock_lsl_stream_with_annotations_and_first_Samp( +def _mock_lsl_stream_with_annotations_and_first_samp( raw_with_annotations, request, chunk_size ): """Create a mock LSL stream streaming events with annotations and first_samp.""" @@ -804,7 +804,7 @@ def _mock_lsl_stream_with_annotations_and_first_Samp( @pytest.mark.slow() @pytest.mark.timeout(30) -@pytest.mark.usefixtures("_mock_lsl_stream_with_annotations") +@pytest.mark.usefixtures("_mock_lsl_stream_with_annotations_and_first_samp") def test_epochs_with_irregular_numerical_event_stream_and_first_samp(): """Test creating epochs from an event stream from raw with first_samp.""" event_stream = StreamLSL(10, stype="annotations").connect(acquisition_delay=0.1) From 6768d358c83d0530ae25470d94ab643c58d7ad75 Mon Sep 17 00:00:00 2001 From: Mathieu Scheltienne Date: Mon, 5 Aug 2024 15:38:00 +0200 Subject: [PATCH 11/18] fix typos --- mne_lsl/stream/tests/test_epochs.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/mne_lsl/stream/tests/test_epochs.py b/mne_lsl/stream/tests/test_epochs.py index 91dea74a3..ca8750092 100644 --- a/mne_lsl/stream/tests/test_epochs.py +++ b/mne_lsl/stream/tests/test_epochs.py @@ -783,7 +783,7 @@ def raw_with_annotations_and_first_samp() -> BaseRaw: @pytest.fixture() def _mock_lsl_stream_with_annotations_and_first_samp( - raw_with_annotations, request, chunk_size + raw_with_annotations_and_first_samp, request, chunk_size ): """Create a mock LSL stream streaming events with annotations and first_samp.""" manager = mp.Manager() @@ -791,7 +791,7 @@ def _mock_lsl_stream_with_annotations_and_first_samp( name = f"P_{request.node.name}" process = mp.Process( target=_player_mock_lsl_stream, - args=(raw_with_annotations, name, chunk_size, status), + args=(raw_with_annotations_and_first_samp, name, chunk_size, status), ) process.start() while status.value != 1: @@ -803,13 +803,13 @@ def _mock_lsl_stream_with_annotations_and_first_samp( @pytest.mark.slow() -@pytest.mark.timeout(30) +@pytest.mark.timeout(30) # takes under 9s locally @pytest.mark.usefixtures("_mock_lsl_stream_with_annotations_and_first_samp") def test_epochs_with_irregular_numerical_event_stream_and_first_samp(): """Test creating epochs from an event stream from raw with first_samp.""" event_stream = StreamLSL(10, stype="annotations").connect(acquisition_delay=0.1) name = event_stream.name.removesuffix("-annotations") - stream = StreamLSL(0.5, name=name).connect(acquisition_delay=0.1) + stream = StreamLSL(2, name=name).connect(acquisition_delay=0.1) stream.info["bads"] = ["MEG 2443"] # remove bad channel epochs = EpochsStream( stream, @@ -823,14 +823,14 @@ def test_epochs_with_irregular_numerical_event_stream_and_first_samp(): picks="grad", ).connect(acquisition_delay=0.1) while epochs.n_new_epochs == 0: - time.sleep(0.1) + 
time.sleep(0.2) n = epochs.n_new_epochs while epochs.n_new_epochs == n: - time.sleep(0.1) + time.sleep(0.2) n2 = epochs.n_new_epochs assert n < n2 - while epochs.n_new_epochs == n: - time.sleep(0.1) + while epochs.n_new_epochs == n2: + time.sleep(0.2) n3 = epochs.n_new_epochs assert n2 < n3 epochs.get_data() From 3b0a84cb8fc6e049a86685002456c983b923c2f1 Mon Sep 17 00:00:00 2001 From: Mathieu Scheltienne Date: Mon, 5 Aug 2024 16:36:39 +0200 Subject: [PATCH 12/18] take into account tmin when pruning events --- mne_lsl/stream/epochs.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/mne_lsl/stream/epochs.py b/mne_lsl/stream/epochs.py index 91715db5c..de52bdb3d 100644 --- a/mne_lsl/stream/epochs.py +++ b/mne_lsl/stream/epochs.py @@ -311,6 +311,7 @@ def connect(self, acquisition_delay: float = 0.001) -> EpochsStream: self._stream._info, self._picks_init, "all", "bads", allow_empty=False ) self._info = pick_info(self._stream._info, self._picks) + self._tmin_shift = round(self._tmin * self._info["sfreq"]) self._ch_idx_by_type = channel_indices_by_type(self._info) self._buffer = np.zeros( ( @@ -431,6 +432,7 @@ def _acquire(self) -> None: ts, self._last_ts, None, + self._tmin_shift, ) elif ( self._event_stream is not None @@ -450,6 +452,7 @@ def _acquire(self) -> None: ts, self._last_ts, ts_events, + self._tmin_shift, ) elif ( self._event_stream is not None @@ -471,7 +474,13 @@ def _acquire(self) -> None: dtype=np.int64, ).T events = _prune_events( - events, None, self._buffer.shape[1], ts, self._last_ts, ts_events + events, + None, + self._buffer.shape[1], + ts, + self._last_ts, + ts_events, + self._tmin_shift, ) else: # pragma: no cover raise RuntimeError( @@ -488,9 +497,8 @@ def _acquire(self) -> None: (events.shape[0], self._buffer.shape[1], self._picks.size), dtype=data.dtype, ) - shift = round(self._tmin * self._info["sfreq"]) # 28.7 ns ± 0.369 ns for k, start in enumerate(events[:, 0]): - start += shift + start += self._tmin_shift data_selection[k] = data[ self._picks, start : start + self._buffer.shape[1] ].T @@ -541,6 +549,7 @@ def _reset_variables(self): self._last_ts = None self._n_new_epochs = 0 self._picks = None + self._tmin_shift = None def _submit_acquisition_job(self) -> None: """Submit a new acquisition job, if applicable.""" @@ -849,6 +858,7 @@ def _prune_events( ts: NDArray[np.float64], last_ts: Optional[float], ts_events: Optional[NDArray[np.float64]], + tmin_shift: float, ) -> NDArray[np.int64]: """Prune events based on criteria and buffer size.""" # remove events outside of the event_id dictionary @@ -863,7 +873,7 @@ def _prune_events( events = events[sel] events[:, 0] = np.searchsorted(ts, ts_events[events[:, 0]], side="left") # remove events which can't fit an entire epoch - sel = np.where(events[:, 0] + buffer_size <= ts.size)[0] + sel = np.where(events[:, 0] + tmin_shift + buffer_size <= ts.size)[0] events = events[sel] # remove events which have already been moved to the buffer if last_ts is not None: From b161e68b3b71329fb17c075ff5457c2bf712ddc5 Mon Sep 17 00:00:00 2001 From: Mathieu Scheltienne Date: Mon, 5 Aug 2024 16:40:46 +0200 Subject: [PATCH 13/18] fix tests --- mne_lsl/stream/tests/test_epochs.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/mne_lsl/stream/tests/test_epochs.py b/mne_lsl/stream/tests/test_epochs.py index ca8750092..5be1f1e89 100644 --- a/mne_lsl/stream/tests/test_epochs.py +++ b/mne_lsl/stream/tests/test_epochs.py @@ -203,27 +203,33 @@ def events() -> NDArray[np.int64]: 
def test_prune_events(events: NDArray[np.int64]): """Test pruning events.""" ts = np.arange(10000, 11000, 1.8) - events_ = _prune_events(events, dict(a=1, b=2, c=3), 10, ts, None, None) + events_ = _prune_events(events, dict(a=1, b=2, c=3), 10, ts, None, None, 0) assert_allclose(events_, events) # test pruning events outside of the event_id dictionary - events_ = _prune_events(events, dict(a=1, c=3), 10, ts, None, None) + events_ = _prune_events(events, dict(a=1, c=3), 10, ts, None, None, 0) assert sorted(np.unique(events_[:, 2])) == [1, 3] # test pruning events that can't fit in the buffer ts = np.arange(5) - events_ = _prune_events(events, dict(a=1, b=2, c=3), 10, ts, None, None) + events_ = _prune_events(events, dict(a=1, b=2, c=3), 10, ts, None, None, 0) assert events_.size == 0 ts = np.arange(10000, 11000, 1.8) # ts.size == 556 - events_ = _prune_events(events, dict(a=1, b=2, c=3), 500, ts, None, None) + events_ = _prune_events(events, dict(a=1, b=2, c=3), 500, ts, None, None, 0) assert events_[-1, 0] + 500 <= ts.size assert events_[-1, 0] == 50 # events @ 60, 70, 80, ... should be dropped + # test fitting in the buffer with tmin + ts = np.arange(15) + events_ = _prune_events(events, dict(a=1, b=2, c=3), 10, ts, None, None, -7) + assert events_.shape[0] == 1 # test pruning events that have already been moved to the buffer ts = np.arange(10000, 11000, 1.8) # ts.size == 556 - events_ = _prune_events(events, dict(a=1, b=2, c=3), 10, ts, ts[events[3, 0]], None) + events_ = _prune_events( + events, dict(a=1, b=2, c=3), 10, ts, ts[events[3, 0]], None, 0 + ) assert_allclose(events_, events[4:, :]) # test pruning events from an event stream, which converts the index to index in ts ts = np.arange(1000) ts_events = np.arange(500) * 2 + 0.5 # mock a different sampling frequency - events_ = _prune_events(events, dict(a=1, b=2, c=3), 10, ts, None, ts_events) + events_ = _prune_events(events, dict(a=1, b=2, c=3), 10, ts, None, ts_events, 0) assert_allclose(events_[:, 2], events[:, 2]) # with the half sampling rate + 0.5 set above, we should be selecting: # from: 10, 20, 30, 40, ... corresponding to 20.5, 40.5, 60.5, ... 
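The two patches above shift each event by tmin_shift = round(tmin * sfreq) before
deciding whether a full epoch still fits in the acquired samples. Below is a minimal
sketch of that pruning criterion, using the combined bound that the "fix pruning"
patch further down settles on; the names mirror _prune_events() but the values are
hypothetical, chosen to match the tmin_shift=-7 test case above.

    import numpy as np

    ts = np.arange(15)                          # 15 acquired timestamps
    events = np.array([[0, 0, 1], [10, 0, 2]])  # rows: (sample index, 0, event id)
    buffer_size = 10                            # samples per epoch
    tmin_shift = -7                             # round(tmin * sfreq), tmin < 0

    # keep an event only if the shifted epoch
    # [idx + tmin_shift, idx + tmin_shift + buffer_size) fits inside the buffer
    starts = events[:, 0] + tmin_shift
    keep = (0 <= starts) & (starts + buffer_size <= ts.size)
    print(events[keep])  # only the event at sample 10 survives -> [[10  0  2]]

The event at sample 0 is dropped because its shifted start (-7) falls before the
first acquired sample, which is exactly the case the first_samp/tmin tests exercise.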
From 2837e1219cc893e9a6ccb547dd0d0b22a4426c14 Mon Sep 17 00:00:00 2001 From: Mathieu Scheltienne Date: Mon, 5 Aug 2024 16:41:58 +0200 Subject: [PATCH 14/18] remove crop --- examples/30_real_time_evoked_responses.py | 2 +- tutorials/40_epochs.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/30_real_time_evoked_responses.py b/examples/30_real_time_evoked_responses.py index d1997ff00..be8d23cab 100644 --- a/examples/30_real_time_evoked_responses.py +++ b/examples/30_real_time_evoked_responses.py @@ -18,7 +18,7 @@ # dataset used in the example fname = sample.data_path() / "mne-sample" / "sample_audvis_raw.fif" -raw = read_raw_fif(fname, preload=False).pick(("meg", "stim")).crop(3, 212).load_data() +raw = read_raw_fif(fname, preload=False).pick(("meg", "stim")).load_data() # %% # First, we create a mock stream with :class:`mne_lsl.player.PlayerLSL` from the sample diff --git a/tutorials/40_epochs.py b/tutorials/40_epochs.py index 76c671d70..a3470149b 100644 --- a/tutorials/40_epochs.py +++ b/tutorials/40_epochs.py @@ -60,7 +60,7 @@ from mne_lsl.stream import EpochsStream, StreamLSL fname = sample.data_path() / "mne-sample" / "sample_audvis_raw.fif" -raw = read_raw_fif(fname, preload=False).pick(("meg", "stim")).crop(3, 212).load_data() +raw = read_raw_fif(fname, preload=False).pick(("meg", "stim")).load_data() player = PlayerLSL( raw, chunk_size=200, name="tutorial-epochs-1", annotations=False ).start() From 136d83728f3f8fb8c146c393632399c54bbc8ae7 Mon Sep 17 00:00:00 2001 From: Mathieu Scheltienne Date: Mon, 5 Aug 2024 16:54:14 +0200 Subject: [PATCH 15/18] reduce verbose --- tutorials/40_epochs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tutorials/40_epochs.py b/tutorials/40_epochs.py index a3470149b..ec8547bd9 100644 --- a/tutorials/40_epochs.py +++ b/tutorials/40_epochs.py @@ -117,7 +117,7 @@ time.sleep(0.5) data = epochs.get_data(n_epochs=epochs.n_new_epochs) -epochs_mne = EpochsArray(data, epochs.info) +epochs_mne = EpochsArray(data, epochs.info, verbose="WARNING") epochs_mne.average().plot() plt.show() @@ -227,7 +227,7 @@ time.sleep(0.5) data = epochs.get_data(n_epochs=epochs.n_new_epochs) -epochs_mne = EpochsArray(data, epochs.info) +epochs_mne = EpochsArray(data, epochs.info, verbose="WARNING") epochs_mne.compute_psd(fmax=40, tmin=0).plot() plt.show() From 5ea9755f56cc5a72fc215302cc9f61a0c412a067 Mon Sep 17 00:00:00 2001 From: Mathieu Scheltienne Date: Mon, 5 Aug 2024 17:01:21 +0200 Subject: [PATCH 16/18] fix pruning --- mne_lsl/stream/epochs.py | 8 ++++++-- mne_lsl/stream/tests/test_epochs.py | 6 ++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/mne_lsl/stream/epochs.py b/mne_lsl/stream/epochs.py index de52bdb3d..365d4cc2c 100644 --- a/mne_lsl/stream/epochs.py +++ b/mne_lsl/stream/epochs.py @@ -872,8 +872,12 @@ def _prune_events( )[0] events = events[sel] events[:, 0] = np.searchsorted(ts, ts_events[events[:, 0]], side="left") - # remove events which can't fit an entire epoch - sel = np.where(events[:, 0] + tmin_shift + buffer_size <= ts.size)[0] + sel = np.where(0 <= events[:, 0] + tmin_shift)[0] + # remove events which can't fit an entire epoch and/or are outside of the buffer + sel = np.where( + (0 <= events[:, 0] + tmin_shift) + & (events[:, 0] + tmin_shift + buffer_size <= ts.size) + )[0] events = events[sel] # remove events which have already been moved to the buffer if last_ts is not None: diff --git a/mne_lsl/stream/tests/test_epochs.py b/mne_lsl/stream/tests/test_epochs.py index 
5be1f1e89..764edce8a 100644 --- a/mne_lsl/stream/tests/test_epochs.py +++ b/mne_lsl/stream/tests/test_epochs.py @@ -220,6 +220,12 @@ def test_prune_events(events: NDArray[np.int64]): ts = np.arange(15) events_ = _prune_events(events, dict(a=1, b=2, c=3), 10, ts, None, None, -7) assert events_.shape[0] == 1 + assert events_[0, 0] == 10 # event @ 10 should be kept + events_ = _prune_events(events, dict(a=1, b=2, c=3), 10, ts, None, None, -12) + assert events_.shape[0] == 0 + events_ = _prune_events(events, dict(a=1, b=2, c=3), 10, ts, None, None, -16) + assert events_.shape[0] == 1 + assert events_[0, 0] == 20 # event @ 20 should be kept # test pruning events that have already been moved to the buffer ts = np.arange(10000, 11000, 1.8) # ts.size == 556 events_ = _prune_events( From 07b5a5bce632ccc3b08e29e57c8cb9060f0fd2ab Mon Sep 17 00:00:00 2001 From: Mathieu Scheltienne Date: Mon, 5 Aug 2024 17:19:48 +0200 Subject: [PATCH 17/18] return self on disconnection --- mne_lsl/stream/epochs.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mne_lsl/stream/epochs.py b/mne_lsl/stream/epochs.py index 365d4cc2c..bf6587990 100644 --- a/mne_lsl/stream/epochs.py +++ b/mne_lsl/stream/epochs.py @@ -330,7 +330,7 @@ def connect(self, acquisition_delay: float = 0.001) -> EpochsStream: self._executor.submit(self._acquire) return self - def disconnect(self) -> None: + def disconnect(self) -> EpochsStream: """Stop acquisition of epochs from the connected Stream.""" if hasattr(self._stream, "_epochs") and self in self._stream._epochs: self._stream._epochs.remove(self) @@ -346,6 +346,7 @@ def disconnect(self) -> None: if hasattr(self, "_executor") and self._executor is not None: self._executor.shutdown(wait=True, cancel_futures=True) self._reset_variables() + return self @fill_doc def get_data( From 28b85e2b3a43495be0937cc45d1dcda512ebcb81 Mon Sep 17 00:00:00 2001 From: Mathieu Scheltienne Date: Mon, 5 Aug 2024 17:27:13 +0200 Subject: [PATCH 18/18] fix docstring --- mne_lsl/stream/epochs.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/mne_lsl/stream/epochs.py b/mne_lsl/stream/epochs.py index bf6587990..72dac6bbb 100644 --- a/mne_lsl/stream/epochs.py +++ b/mne_lsl/stream/epochs.py @@ -331,7 +331,13 @@ def connect(self, acquisition_delay: float = 0.001) -> EpochsStream: return self def disconnect(self) -> EpochsStream: - """Stop acquisition of epochs from the connected Stream.""" + """Stop acquisition of epochs from the connected Stream. + + Returns + ------- + epochs : instance of :class:`~mne_lsl.stream.EpochsStream` + The epochs instance modified in-place. + """ if hasattr(self._stream, "_epochs") and self in self._stream._epochs: self._stream._epochs.remove(self) if (