From 2326fd9a138589f4e445ec355a8b46e6512306fa Mon Sep 17 00:00:00 2001 From: Mathieu Scheltienne Date: Wed, 2 Oct 2024 15:00:10 +0200 Subject: [PATCH] fix renaming --- doc/api/{triggers.rst => trigger.rst} | 0 stimuli/triggers/__init__.py | 4 - stimuli/triggers/_base.py | 34 --- stimuli/triggers/io/__init__.py | 38 ---- stimuli/triggers/io/_dlportio.py | 146 ------------- stimuli/triggers/io/_inpout.py | 116 ----------- stimuli/triggers/io/_linux.py | 107 ---------- stimuli/triggers/lsl.py | 116 ----------- stimuli/triggers/mock.py | 21 -- stimuli/triggers/parallel.py | 263 ------------------------ stimuli/triggers/tests/__init__.py | 0 stimuli/triggers/tests/test_lsl.py | 42 ---- stimuli/triggers/tests/test_mock.py | 22 -- stimuli/triggers/tests/test_parallel.py | 44 ---- 14 files changed, 953 deletions(-) rename doc/api/{triggers.rst => trigger.rst} (100%) delete mode 100644 stimuli/triggers/__init__.py delete mode 100644 stimuli/triggers/_base.py delete mode 100644 stimuli/triggers/io/__init__.py delete mode 100644 stimuli/triggers/io/_dlportio.py delete mode 100644 stimuli/triggers/io/_inpout.py delete mode 100644 stimuli/triggers/io/_linux.py delete mode 100644 stimuli/triggers/lsl.py delete mode 100644 stimuli/triggers/mock.py delete mode 100644 stimuli/triggers/parallel.py delete mode 100644 stimuli/triggers/tests/__init__.py delete mode 100644 stimuli/triggers/tests/test_lsl.py delete mode 100644 stimuli/triggers/tests/test_mock.py delete mode 100644 stimuli/triggers/tests/test_parallel.py diff --git a/doc/api/triggers.rst b/doc/api/trigger.rst similarity index 100% rename from doc/api/triggers.rst rename to doc/api/trigger.rst diff --git a/stimuli/triggers/__init__.py b/stimuli/triggers/__init__.py deleted file mode 100644 index 4ebeecf0..00000000 --- a/stimuli/triggers/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from . import io, lsl, mock, parallel -from .lsl import LSLTrigger -from .mock import MockTrigger -from .parallel import ParallelPortTrigger diff --git a/stimuli/triggers/_base.py b/stimuli/triggers/_base.py deleted file mode 100644 index ceb24fe6..00000000 --- a/stimuli/triggers/_base.py +++ /dev/null @@ -1,34 +0,0 @@ -from __future__ import annotations - -from abc import ABC, abstractmethod - - -class BaseTrigger(ABC): - """Base trigger class.""" - - @abstractmethod - def __init__(self): # pragma: no cover - pass - - @abstractmethod - def signal(self, value: int) -> int: - """Send a trigger value. - - Parameters - ---------- - value : int - Value of the trigger, between 1 and 255. - """ - try: - value = int(value) - except TypeError: - raise TypeError( - "The argument 'value' of a trigger must be an integer " - "between 1 and 255 included." - ) - if not (1 <= value <= 255): - raise ValueError( - "The argument 'value' of a trigger must be an integer " - "between 1 and 255 included." - ) - return value diff --git a/stimuli/triggers/io/__init__.py b/stimuli/triggers/io/__init__.py deleted file mode 100644 index f369de04..00000000 --- a/stimuli/triggers/io/__init__.py +++ /dev/null @@ -1,38 +0,0 @@ -"""Read / write access to the parallel port for Linux or Windows. - -This code snippet is inspired from the parallel module of PsychoPy. -""" - -import sys - -# To make life easier, only try drivers which have a hope in heck of working. Because -# hasattr() in connection to windll ends up in an OSError trying to load 32bit drivers -# in a 64bit environment, different drivers defined in the dictionary 'drivers' are -# tested. - -if sys.platform.startswith("linux"): - from ._linux import PParallelLinux - - ParallelPort = PParallelLinux -elif sys.platform == "win32": - drivers = dict( - inpout32=("_inpout", "PParallelInpOut"), - inpoutx64=("_inpout", "PParallelInpOut"), - dlportio=("_dlportio", "PParallelDLPortIO"), - ) - from ctypes import windll - from importlib import import_module - - for key, val in drivers.items(): - driver_name, class_name = val - try: - hasattr(windll, key) - ParallelPort = getattr( - import_module("." + driver_name, __name__), class_name - ) - break - except (OSError, KeyError, NameError): - ParallelPort = None - continue -else: - ParallelPort = None diff --git a/stimuli/triggers/io/_dlportio.py b/stimuli/triggers/io/_dlportio.py deleted file mode 100644 index 9c7c95c5..00000000 --- a/stimuli/triggers/io/_dlportio.py +++ /dev/null @@ -1,146 +0,0 @@ -# This code is heavily based upon winioport.py -# Provides hardware port access for Python under Windows 95/98/NT/2000 -# -# Original Author: Dincer Aydin dinceraydin@gmx.net www.dinceraydin.com -# Merged directly into psychopy by: Mark Hymers -# All bugs are Mark's fault. -# -# This module depends on: -# ctypes Copyright (c) 2000, 2001, 2002, 2003 Thomas Heller -# DLPortIO Win32 DLL hardware I/O functions & Kernel mode driver for WinNT -# -# In this package you will find almost any sort of port IO function one may -# imagine. Values of port registers are srored in temporary variables. This is -# for the bit set/reset functions to work right Some register bits are -# inverted. on the port pins, but you need not worry about them. The functions -# in this module take this into account. For eaxample when you call -# winioport.pportDataStrobe(1) the data strobe pin of the printer port will go -# HIGH. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files , to deal in the -# Software without restriction, including without limitation the rights to -# use, copy, modify, merge, publish,and distribute copies of the Software, -# and to permit persons to whom the Software is furnished to do so, subject -# to the following conditions: -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -# IN THE SOFTWARE. - - -class PParallelDLPortIO: - """Class for read/write access to the parallel port on a PC. - - This is a wrapper around Dincer Aydin's `winioport`_ for reading and - writing to the parallel port, but adds the following additional - functions for convenience. - - On windows `winioport`_ requires the `PortIO driver`_ to be installed. - - An alternative on Linux might be to use PyParallel - An alternative on other versions of Windows might be to use inpout32. - - . _winioport: http://www.dinceraydin.com/python/indexeng.html - .. _PortIO driver: https://www.winford.com/support/download.php - """ - - def __init__(self, address=0x0378): - """Set the memory address of your parallel port. - - Common port addresses:: - - LPT1 = 0x0378 or 0x03BC - LPT2 = 0x0278 or 0x0378 - LPT3 = 0x0278 - """ - from ctypes import windll - - try: - # Load dlportio.dll functions - self.port = windll.dlportio - except Exception as e: - print("Could not import DLportIO driver, parallel Ports not available") - raise e - - if isinstance(address, str) and address.startswith("0x"): - # convert u"0x0378" into 0x0378 - self.base = int(address, 16) - else: - self.base = address - self.status = None - - def setData(self, data): - """Set the data to be presented on the parallel port (one ubyte). - - Alternatively you can set the value of each pin (data pins are pins - 2-9 inclusive) using :func:`setPin` - - Examples:: - - p.setData(0) # sets all pins low - p.setData(255) # sets all pins high - p.setData(2) # sets just pin 3 high (remember that pin2=bit0) - p.setData(3) # sets just pins 2 and 3 high - - You can also convert base 2 to int v easily in python:: - - p.setData(int("00000011", 2)) # pins 2 and 3 high - p.setData(int("00000101", 2)) # pins 2 and 4 high - - """ - self.port.DlPortWritePortUchar(self.base, data) - - def setPin(self, pinNumber, state): - """Set a desired pin to be high(1) or low(0). - - Only pins 2-9 (incl) are normally used for data output:: - - p.setPin(3, 1) # sets pin 3 high - p.setPin(3, 0) # sets pin 3 low - """ - # I can't see how to do this without reading and writing the data - # or caching the registers which seems like a very bad idea... - _uchar = self.port.DlPortReadPortUchar(self.base) - if state: - val = _uchar | 2 ** (pinNumber - 2) - else: - val = _uchar & (255 ^ 2 ** (pinNumber - 2)) - self.port.DlPortWritePortUchar(self.base, val) - - def readData(self): - """Return the value currently set on the data pins (2-9).""" - return self.port.DlPortReadPortUchar(self.base) - - def readPin(self, pinNumber): - """Determine whether a desired (input) pin is high(1) or low(0). - - Pins 2-13 and 15 are currently read here - """ - val = self.port.DlPortReadPortUchar(self.base + 1) - if pinNumber == 10: - # 10 = ACK - return (val >> 6) & 1 - elif pinNumber == 11: - # 11 = BUSY - return (val >> 7) & 1 - elif pinNumber == 12: - # 12 = PAPER-OUT - return (val >> 5) & 1 - elif pinNumber == 13: - # 13 = SELECT - return (val >> 4) & 1 - elif pinNumber == 15: - # 15 = ERROR - return (val >> 3) & 1 - elif 2 <= pinNumber <= 9: - val = self.port.DlPortReadPortUchar(self.base) - return (val >> (pinNumber - 2)) & 1 - else: - raise RuntimeError( - f"Pin {pinNumber} cannot be read (by PParallelDLPortIO.readPin())" - ) diff --git a/stimuli/triggers/io/_inpout.py b/stimuli/triggers/io/_inpout.py deleted file mode 100644 index ef2f341c..00000000 --- a/stimuli/triggers/io/_inpout.py +++ /dev/null @@ -1,116 +0,0 @@ -# We deliberately delay importing the inpout32 or inpoutx64 module until we try -# to use it - this allows us to import the class on machines -# which don't have it and then worry about dealing with -# using the right one later - - -class PParallelInpOut: - """Class for read/write access to the parallel port on a PC. - - Uses inpout32 or inpoutx64 (for instance for Windows 7 64-bit). - """ - - def __init__(self, address=0x0378): - """Set the memory address of your parallel port. - - Common port addresses:: - - LPT1 = 0x0378 or 0x03BC - LPT2 = 0x0278 or 0x0378 - LPT3 = 0x0278 - """ - import platform - from ctypes import windll - - from numpy import uint8 - - if isinstance(address, str) and address.startswith("0x"): - # convert u"0x0378" into 0x0378 - self.base = int(address, 16) - else: - self.base = address - - if platform.architecture()[0] == "32bit": - self.port = windll.inpout32 - elif platform.architecture()[0] == "64bit": - self.port = windll.inpoutx64 - - BYTEMODEMASK = uint8(1 << 5 | 1 << 6 | 1 << 7) - - # Put the port into Byte Mode (ECP register) - _inp = self.port.Inp32(self.base + 0x402) - self.port.Out32(self.base + 0x402, int((_inp & ~BYTEMODEMASK) | (1 << 5))) - - # Now to make sure the port is in output mode we need to make - # sure that bit 5 of the control register is not set - _inp = self.port.Inp32(self.base + 2) - self.port.Out32(self.base + 2, int(_inp & ~uint8(1 << 5))) - self.status = None - - def setData(self, data): - """Set the data to be presented on the parallel port (one ubyte). - - Alternatively you can set the value of each pin (data pins are pins - 2-9 inclusive) using :func:`setPin` - - Examples:: - - p.setData(0) # sets all pins low - p.setData(255) # sets all pins high - p.setData(2) # sets just pin 3 high (remember that pin2=bit0) - p.setData(3) # sets just pins 2 and 3 high - - You can easily convert base 2 to int in python:: - - p.setData(int("00000011", 2)) # pins 2 and 3 high - p.setData(int("00000101", 2)) # pins 2 and 4 high - """ - self.port.Out32(self.base, data) - - def setPin(self, pinNumber, state): - """Set a desired pin to be high(1) or low(0). - - Only pins 2-9 (incl) are normally used for data output:: - - parallel.setPin(3, 1) # sets pin 3 high - parallel.setPin(3, 0) # sets pin 3 low - """ - # I can't see how to do this without reading and writing the data - _inp = self.port.Inp32(self.base) - if state: - val = _inp | 2 ** (pinNumber - 2) - else: - val = _inp & (255 ^ 2 ** (pinNumber - 2)) - self.port.Out32(self.base, val) - - def readData(self): - """Return the value currently set on the data pins (2-9).""" - return self.port.Inp32(self.base) - - def readPin(self, pinNumber): - """Determine whether a desired (input) pin is high(1) or low(0). - - Pins 2-13 and 15 are currently read here - """ - _base = self.port.Inp32(self.base + 1) - if pinNumber == 10: - # 10 = ACK - return (_base >> 6) & 1 - elif pinNumber == 11: - # 11 = BUSY - return (_base >> 7) & 1 - elif pinNumber == 12: - # 12 = PAPER-OUT - return (_base >> 5) & 1 - elif pinNumber == 13: - # 13 = SELECT - return (_base >> 4) & 1 - elif pinNumber == 15: - # 15 = ERROR - return (_base >> 3) & 1 - elif 2 <= pinNumber <= 9: - return (self.port.Inp32(self.base) >> (pinNumber - 2)) & 1 - else: - raise RuntimeError( - f"Pin {pinNumber} cannot be read (by PParallelInpOut32.readPin())" - ) diff --git a/stimuli/triggers/io/_linux.py b/stimuli/triggers/io/_linux.py deleted file mode 100644 index fe02a8f0..00000000 --- a/stimuli/triggers/io/_linux.py +++ /dev/null @@ -1,107 +0,0 @@ -# We deliberately delay importing the pyparallel module until we try -# to use it - this allows us to import the class on machines -# which don't have it and then worry about dealing with -# using the right one later - -# This is necessary to stop the local parallel.py masking the module -# we actually want to find! - -# We duck-type the parallel port objects - - -class PParallelLinux: - """Clsas for read/write access to the parallel port on a Linux. - - Uses pyparallel. - - Note that you must have the lp module removed and the ppdev module loaded - to use this code:: - - sudo rmmod lp - sudo modprobe ppdev - """ - - def __init__(self, address="/dev/parport0"): - """Set the device node of your parallel port. - - Common port addresses:: - - LPT1 = /dev/parport0 - LPT2 = /dev/parport1 - LPT3 = /dev/parport2 - """ - import parallel as pyp - - try: - self.port = pyp.Parallel(address) - except FileNotFoundError: - raise RuntimeError( - f"Could not access parallel port on '{address}'. " - "No such file or directory." - ) - self.status = None - - def __del__(self): - """Delete and free the port.""" - if hasattr(self, "port"): - del self.port - - def setData(self, data): - """Set the data to be presented on the parallel port (one ubyte). - - Alternatively you can set the value of each pin (data pins are pins - 2-9 inclusive) using :func:`~psychopy.parallel.setPin` - - Examples:: - - p.setData(0) # sets all pins low - p.setData(255) # sets all pins high - p.setData(2) # sets just pin 3 high (remember that pin2=bit0) - p.setData(3) # sets just pins 2 and 3 high - - You can also convert base 2 to int easily in python:: - - parallel.setData(int("00000011", 2)) # pins 2 and 3 high - parallel.setData(int("00000101", 2)) # pins 2 and 4 high - """ - self.port.setData(data) - - def setPin(self, pinNumber, state): - """Set a desired pin to be high(1) or low(0). - - Only pins 2-9 (incl) are normally used for data output:: - - p.setPin(3, 1) # sets pin 3 high - p.setPin(3, 0) # sets pin 3 low - """ - # I can't see how to do this without reading and writing the data - if state: - self.port.setData(self.port.PPRDATA() | (2 ** (pinNumber - 2))) - else: - self.port.setData(self.port.PPRDATA() & (255 ^ 2 ** (pinNumber - 2))) - - def readData(self): - """Return the value currently set on the data pins (2-9).""" - return self.port.PPRDATA() - - def readPin(self, pinNumber): - """Determine whether a desired (input) pin is high(1) or low(0). - - Pins 2-13 and 15 are currently read here - """ - if pinNumber == 10: - return self.port.getInAcknowledge() - elif pinNumber == 11: - return self.port.getInBusy() - elif pinNumber == 12: - return self.port.getInPaperOut() - elif pinNumber == 13: - return self.port.getInSelected() - elif pinNumber == 15: - return self.port.getInError() - elif 2 <= pinNumber <= 9: - return (self.port.PPRDATA() >> (pinNumber - 2)) & 1 - else: - raise RuntimeError( - f"Pin {pinNumber} cannot be read (by PParallelLinux.readPin())" - ) diff --git a/stimuli/triggers/lsl.py b/stimuli/triggers/lsl.py deleted file mode 100644 index f521e9c3..00000000 --- a/stimuli/triggers/lsl.py +++ /dev/null @@ -1,116 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING - -import numpy as np - -from ..utils._checks import check_type -from ..utils._docs import copy_doc, fill_doc -from ..utils._imports import import_optional_dependency -from ._base import BaseTrigger - -if TYPE_CHECKING: - from mne_lsl.lsl import StreamInfo, StreamOutlet - - -@fill_doc -class LSLTrigger(BaseTrigger): - """Trigger sending values on an LSL outlet. - - Make sure you are recording the stream created by the - :class:`~stimuli.trigger.LSLTrigger` alongside your data. e.g. if you use - LabRecorder, update the stream list after creating the - :class:`~stimuli.trigger.LSLTrigger`. - - .. warning:: - - Make sure to close the :class:`~mne_lsl.lsl.StreamOutlet` by calling the - :meth:`~stimuli.trigger.LSLTrigger.close` method or by deleting the trigger - after use. - - Parameters - ---------- - name : str - Name of the trigger displayed on the LSL network. - - Notes - ----- - The :class:`~mne_lsl.lsl.StreamOutlet` created has the following properties: - - * Name: ``f"{name}"`` - * Type: ``"Markers"`` - * Number of channels: 1 - * Sampling rate: Irregular - * Data type: ``np.int8`` - * Source ID: ``f"HNP-{name}"`` - - The values sent must be in the range of strictly positive integers defined - by ``np.int8``, 1 to 127 included. - """ - - def __init__(self, name: str) -> None: - import_optional_dependency("mne_lsl") - - from mne_lsl.lsl import StreamInfo, StreamOutlet - - check_type(name, (str,), "name") - self._name = name - # create outlet - self._sinfo = StreamInfo( - name=name, - stype="Markers", - n_channels=1, - sfreq=0.0, - dtype="int8", - source_id=f"HNP-{name}", - ) - self._sinfo.set_channel_names(["STI"]) - self._sinfo.set_channel_types(["stim"]) - self._sinfo.set_channel_units(["none"]) - self._outlet = StreamOutlet(self._sinfo, max_buffered=1) - - @copy_doc(BaseTrigger.signal) - def signal(self, value: int) -> None: - value = super().signal(value) - if not (1 <= value <= 127): - raise ValueError( - "The argument 'value' of an LSL trigger must be an integer " - "between 1 and 127 included." - ) - self._outlet.push_sample(np.array([value], dtype=np.int8)) - - def close(self) -> None: - """Close the LSL outlet.""" - if hasattr(self, "_outlet"): - try: - del self._outlet - except Exception: # pragma: no cover - pass - - def __del__(self) -> None: # noqa: D105 - self.close() - - # -------------------------------------------------------------------- - @property - def name(self) -> str: - """Name of the trigger displayed on the LSL network. - - :type: str - """ - return self._name - - @property - def sinfo(self) -> StreamInfo: - """Description of the trigger outlet. - - :type: `~mne_lsl.lsl.StreamInfo` - """ - return self._sinfo - - @property - def outlet(self) -> StreamOutlet: - """Trigger outlet. - - :type: `~mne_lsl.lsl.StreamOutlet` - """ - return self._outlet diff --git a/stimuli/triggers/mock.py b/stimuli/triggers/mock.py deleted file mode 100644 index cd58d1ab..00000000 --- a/stimuli/triggers/mock.py +++ /dev/null @@ -1,21 +0,0 @@ -from __future__ import annotations - -from ..utils._docs import copy_doc -from ..utils.logs import _use_log_level, logger -from ._base import BaseTrigger - - -class MockTrigger(BaseTrigger): - """Mock trigger class. - - Delivered triggers are logged at the 'INFO' level. - """ - - def __init__(self) -> None: - pass - - @copy_doc(BaseTrigger.signal) - def signal(self, value: int) -> None: - value = super().signal(value) - with _use_log_level("INFO"): - logger.info("Mock set to %i.", value) diff --git a/stimuli/triggers/parallel.py b/stimuli/triggers/parallel.py deleted file mode 100644 index c53df869..00000000 --- a/stimuli/triggers/parallel.py +++ /dev/null @@ -1,263 +0,0 @@ -from __future__ import annotations - -import time -from concurrent.futures import ThreadPoolExecutor -from platform import system - -from ..utils._checks import check_type, check_value, ensure_int -from ..utils._docs import copy_doc -from ..utils._imports import import_optional_dependency -from ..utils.logs import logger -from ._base import BaseTrigger - - -class ParallelPortTrigger(BaseTrigger): - """Trigger using a parallel port (also called LPT port). - - Parameters - ---------- - address : int (hex) | str - The address of the parallel port on the system. - If an :ref:`api/trigger:Arduino to Parallel Port converter` is - used, the address must be the serial port address or ``"arduino"`` for automatic - detection. - port_type : str | None - Either ``'arduino'`` or ``'pport'`` depending on the connection. - If None, attempts to infers the type of port from the address. - delay : int - Delay in milliseconds until which a new trigger cannot be sent. During - this time, the pins of the LPT port remain in the same state. - - Notes - ----- - The address is specific to the system. Typical parallel port addresses are: - - - On Linux:: - - LPT1 = /dev/parport0 - LPT2 = /dev/parport1 - LPT3 = /dev/parport2 - - - On Windows:: - - LPT1 = 0x0378 or 0x03BC - LPT2 = 0x0278 or 0x0378 - LPT3 = 0x0278 - - - macOS does not have support for built-in parallel ports. - """ - - def __init__( - self, address: int | str, port_type: str | None = None, delay: int = 10 - ) -> None: - check_type(address, ("int-like", str), "address") - if not isinstance(address, str): - address = ensure_int(address) - delay = ensure_int(delay, "delay") - self._delay = delay / 1000.0 - if port_type is None: - self._port_type = ParallelPortTrigger._infer_port_type(address) - else: - check_type(port_type, (str,), "port_type") - check_value(port_type, ("arduino", "pport"), "port_type") - self._port_type = port_type - - # initialize port - if self._port_type == "arduino": - import_optional_dependency( - "serial", extra="Install 'pyserial' for ARDUINO support." - ) - if address == "arduino": - self._address = ParallelPortTrigger._search_arduino() - else: - self._address = address - self._connect_arduino() - - elif self._port_type == "pport": - if system() == "Linux": - import_optional_dependency( - "parallel", - extra="Install 'pyparallel' for LPT support on Linux.", - ) - self._address = address - self._connect_pport() - # set pins to 0 and prepare threadpool for resets - self._set_data(0) - self._executor = ThreadPoolExecutor(max_workers=1) - self._future = None - - @staticmethod - def _infer_port_type(address: int | str) -> str: - """Infer the type of port from the address.""" - if system() == "Linux": - if not isinstance(address, str): - raise TypeError( - "On Linux, a parallel port address must be provided as a string." - ) - if address.startswith("/dev/parport"): - return "pport" - elif address.startswith("/dev/ttyACM") or address == "arduino": - return "arduino" - else: - raise RuntimeError( - f"Could not infer the port type from the address '{address}'. " - "Please provide the 'port_type' argument when creating the " - "ParallelPortTrigger object." - ) - elif system() == "Darwin": - if address == "arduino": - return "arduino" - else: - raise RuntimeError( - "macOS does not support on-board parallel ports. Only arduino " - "converters are supported with address='arduino'." - ) - elif system() == "Windows": - if isinstance(address, int): - return "pport" - elif address.startswith("COM") or address == "arduino": - return "arduino" - else: - raise RuntimeError( - f"Could not infer the port type from the address '{address}'. " - "Please provide the 'port_type' argument when creating the " - "ParallelPortTrigger object." - ) - - @staticmethod - def _search_arduino() -> str: - """Look for a connected Arduino to LPT converter.""" - from serial.tools import list_ports - - for arduino in list_ports.grep(regexp="Arduino"): - logger.info("Found arduino to LPT on '%s'.", arduino) - return arduino.device - else: - raise OSError("No arduino card was found.") - - def _connect_arduino(self, baud_rate: int = 115200) -> None: - """Connect to an Arduino to LPT converter.""" - from serial import Serial, SerialException - - try: - self._port = Serial(self._address, baud_rate) - except SerialException: - msg = f"Could not access arduino to LPT on '{self._address}'." - if system() == "Linux": - msg += ( - " Make sure you have the permission to access this " - "address, e.g. by adding your user account to the " - "'dialout' group: 'sudo usermod -a -G dialout '." - ) - raise SerialException(msg) - - time.sleep(1) - logger.info("Connected to arduino to LPT on '%s'.", self._address) - - def _connect_pport(self) -> None: - """Connect to the ParallelPort.""" - from .io import ParallelPort - - if ParallelPort is None and system() == "Darwin": - raise RuntimeError( - "macOS does not support built-in parallel port. " - "Please use an arduino to LPT converter for hardware triggers " - "or stimuli.trigger.LSLTrigger for software triggers." - ) - elif ParallelPort is None and system() != "Linux": - raise RuntimeError( - "Windows supports built-in parallel port via " - "inpout32, inpout64 or dlportio. Neither of this driver was found." - ) - - try: - self._port = ParallelPort(self._address) - except Exception: - msg = f"Could not access the parallel port on '{self._address}'." - if system() == "Linux": - msg += ( - " Make sure you have the permission to access this " - "address, e.g. by adding your user account to the 'lp' " - "group: 'sudo usermod -a -G lp '. Make sure the " - "'lp' module is removed and the 'ppdev' module is loaded: " - "'sudo rmmod lp' & 'sudo modprobe ppdev'. You can " - "configure the module loaded by default in '/etc/modprobe.d/'." - ) - raise RuntimeError(msg) - - time.sleep(1) - logger.info("Connected to parallel port on '%s'.", self._address) - - @copy_doc(BaseTrigger.signal) - def signal(self, value: int) -> None: - value = super().signal(value) - if self._future is not None and not self._future.done(): - logger.warning( - "You are sending a new signal before the end of the " - "last signal. Signal ignored. Delay required = %.1f ms.", - self.delay, - ) - self._set_data(value) - self._future = self._executor.submit(self._signal_off()) - - def _signal_off(self) -> None: - """Reset trigger signal to 0.""" - time.sleep(self._delay) - self._set_data(0) - - def _set_data(self, value: int) -> None: - """Set data on the pin.""" - if self._port_type == "arduino": - self._port.write(bytes([value])) - else: - self._port.setData(value) - - def close(self) -> None: - """Disconnect the parallel port. - - This method should free the parallel or serial port and let other application or - python process use it. - """ - if hasattr(self, "_executor"): - self._executor.shutdown(wait=True, cancel_futures=False) - if ( - hasattr(self, "_port_type") - and self._port_type == "arduino" - and hasattr(self, "_port") - ): - try: - self._port.close() - except Exception: - pass - try: - del self._port - except Exception: - pass - - def __del__(self) -> None: # noqa: D105 - self.close() - - # -------------------------------------------------------------------- - @property - def address(self) -> int | str: - """The address of the parallel port on the system. - - :type: int | str - """ - return self._address - - @property - def delay(self) -> float: - """Delay (ms) to wait between two :meth:`~ParallelPortTrigger.signal`. - - :type: float - """ - return self._delay * 1000.0 - - @property - def port_type(self) -> str: - """Type of connection port. - - :type: str - """ - return self._port_type diff --git a/stimuli/triggers/tests/__init__.py b/stimuli/triggers/tests/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/stimuli/triggers/tests/test_lsl.py b/stimuli/triggers/tests/test_lsl.py deleted file mode 100644 index be9de20b..00000000 --- a/stimuli/triggers/tests/test_lsl.py +++ /dev/null @@ -1,42 +0,0 @@ -import numpy as np -import pytest - - -def test_trigger_lsl(): - """Testing for LSL trigger.""" - pytest.importorskip("mne_lsl") - - from mne_lsl.lsl import StreamInfo, StreamInlet, StreamOutlet, resolve_streams - - from stimuli.trigger import LSLTrigger - - name = "test-trigger-lsl" - trigger = LSLTrigger(name) - assert trigger.name == name - streams = resolve_streams(name=name) - assert len(streams) == 1 - sinfo = streams[0] - del streams - inlet = StreamInlet(sinfo) - inlet.open_stream() - assert inlet.samples_available == 0 - sinfo = inlet.get_sinfo() - assert sinfo.get_channel_names() == ["STI"] - assert sinfo.get_channel_types() == ["stim"] - assert sinfo.get_channel_units() == ["none"] - trigger.signal(1) - data, ts = inlet.pull_sample(timeout=10) - assert data.dtype == np.int8 - assert data.size == 1 - assert data[0] == 1 - assert ts is not None - trigger.signal(127) - data, ts = inlet.pull_sample(timeout=10) - assert data.dtype == np.int8 - assert data.size == 1 - assert data[0] == 127 - assert ts is not None - with pytest.raises(ValueError, match="between 1 and 127 included"): - trigger.signal(255) - assert isinstance(trigger.outlet, StreamOutlet) - assert isinstance(trigger.sinfo, StreamInfo) diff --git a/stimuli/triggers/tests/test_mock.py b/stimuli/triggers/tests/test_mock.py deleted file mode 100644 index ce708743..00000000 --- a/stimuli/triggers/tests/test_mock.py +++ /dev/null @@ -1,22 +0,0 @@ -import pytest - -from stimuli.trigger import MockTrigger - - -def test_trigger_mock(caplog): - """Testing for Mock trigger.""" - trigger = MockTrigger() - trigger.signal(1) - assert "Mock set to 1" in caplog.text - caplog.clear() - trigger.signal(2) - assert "Mock set to 2" in caplog.text - caplog.clear() - trigger.signal(str(101)) # convertible to int - assert "Mock set to 101" in caplog.text - caplog.clear() - with pytest.raises(TypeError, match="between 1 and 255"): - trigger.signal(lambda x: 101) - with pytest.raises(ValueError, match="between 1 and 255"): - trigger.signal(256) - assert "Mock set" not in caplog.text diff --git a/stimuli/triggers/tests/test_parallel.py b/stimuli/triggers/tests/test_parallel.py deleted file mode 100644 index 80a475d6..00000000 --- a/stimuli/triggers/tests/test_parallel.py +++ /dev/null @@ -1,44 +0,0 @@ -from platform import system - -import pytest - -from stimuli.trigger import ParallelPortTrigger - - -@pytest.mark.skipif(system() != "Linux", reason="requires Linux") -def test_infer_port_type_linux(): - """Test port type inference patterns on linux.""" - assert ParallelPortTrigger._infer_port_type("arduino") == "arduino" - assert ParallelPortTrigger._infer_port_type("/dev/parport0") == "pport" - assert ParallelPortTrigger._infer_port_type("/dev/parport1") == "pport" - assert ParallelPortTrigger._infer_port_type("/dev/ttyACM0") == "arduino" - assert ParallelPortTrigger._infer_port_type("/dev/ttyACM1") == "arduino" - with pytest.raises(RuntimeError, match="provide the 'port_type' argument"): - ParallelPortTrigger._infer_port_type("101") - with pytest.raises(TypeError, match="provided as a string"): - ParallelPortTrigger._infer_port_type(0x4FB8) - - -@pytest.mark.skipif(system() != "Windows", reason="requires Windows") -def test_infer_port_type_windows(): - """Test port type inference patterns on Windows.""" - assert ParallelPortTrigger._infer_port_type("arduino") == "arduino" - assert ParallelPortTrigger._infer_port_type("COM7") == "arduino" - assert ParallelPortTrigger._infer_port_type("COM8") == "arduino" - assert ParallelPortTrigger._infer_port_type(0x4FB8) == "pport" - - -@pytest.mark.skipif(system() != "Darwin", reason="requires macOS") -def test_infer_port_type_macos(): - """Test port type inference patterns on Windows.""" - assert ParallelPortTrigger._infer_port_type("arduino") == "arduino" - with pytest.raises(RuntimeError, match="macOS does not support"): - ParallelPortTrigger._infer_port_type("/dev/parport0") - with pytest.raises(RuntimeError, match="macOS does not support"): - ParallelPortTrigger._infer_port_type(0x4FB8) - - -def test_search_arduino(): - """Test arduino detection.""" - with pytest.raises(IOError, match="No arduino card was found."): - ParallelPortTrigger._search_arduino()