From 7c6bc781da1eb6bb7450c20b45f77210337b5f68 Mon Sep 17 00:00:00 2001 From: KB Sriram Date: Thu, 29 Feb 2024 20:20:23 -0800 Subject: [PATCH] Move SPI bit writes to the right clock phase. - Updated all SPI methods that write bits, so they clock the data on the correct clock transition. - Added tests with a small logic net simulation to verify bits are read and written correctly. --- adafruit_bitbangio.py | 106 ++++++----- tests/README.rst | 53 ++++++ tests/simulated_spi.py | 67 +++++++ tests/simulator.py | 253 +++++++++++++++++++++++++++ tests/test_adafruit_bitbangio_spi.py | 211 ++++++++++++++++++++++ 5 files changed, 648 insertions(+), 42 deletions(-) create mode 100644 tests/README.rst create mode 100644 tests/simulated_spi.py create mode 100644 tests/simulator.py create mode 100644 tests/test_adafruit_bitbangio_spi.py diff --git a/adafruit_bitbangio.py b/adafruit_bitbangio.py index 95c5d38..d5a30c2 100644 --- a/adafruit_bitbangio.py +++ b/adafruit_bitbangio.py @@ -323,9 +323,6 @@ def __init__( self._mosi = None self._miso = None - self.configure() - self.unlock() - # Set pins as outputs/inputs. self._sclk = DigitalInOut(clock) self._sclk.switch_to_output() @@ -338,6 +335,9 @@ def __init__( self._miso = DigitalInOut(MISO) self._miso.switch_to_input() + self.configure() + self.unlock() + def deinit(self) -> None: """Free any hardware used by the object.""" self._sclk.deinit() @@ -372,12 +372,30 @@ def configure( self._bits = bits self._half_period = (1 / self._baudrate) / 2 # 50% Duty Cyle delay + # Initialize the clock to the idle state. This is important to + # guarantee that the clock is at a known (idle) state before + # any read/write operations. + self._sclk.value = self._polarity + def _wait(self, start: Optional[int] = None) -> float: """Wait for up to one half cycle""" while (start + self._half_period) > monotonic(): pass return monotonic() # Return current time + def _should_write(self, to_active: Literal[0, 1]) -> bool: + """Return true if a bit should be written on the given clock transition.""" + # phase 0: write when active is 0 + # phase 1: write when active is 1 + return self._phase == to_active + + def _should_read(self, to_active: Literal[0, 1]) -> bool: + """Return true if a bit should be read on the given clock transition.""" + # phase 0: read when active is 1 + # phase 1: read when active is 0 + # Data is read on the idle->active transition only when the phase is 1 + return self._phase == 1 - to_active + def write( self, buffer: ReadableBuffer, start: int = 0, end: Optional[int] = None ) -> None: @@ -392,24 +410,26 @@ def write( if self._check_lock(): start_time = monotonic() + # Note: when we come here, our clock must always be its idle state. for byte in buffer[start:end]: for bit_position in range(self._bits): bit_value = byte & 0x80 >> bit_position - # Set clock to base - if not self._phase: # Mode 0, 2 + # clock: idle, or has made an active->idle transition. + if self._should_write(to_active=0): self._mosi.value = bit_value - self._sclk.value = not self._polarity + # clock: wait in idle for half a period start_time = self._wait(start_time) - - # Flip clock off base - if self._phase: # Mode 1, 3 + # clock: idle->active + self._sclk.value = not self._polarity + if self._should_write(to_active=1): self._mosi.value = bit_value - self._sclk.value = self._polarity + # clock: wait in active for half a period start_time = self._wait(start_time) - - # Return pins to base positions - self._mosi.value = 0 - self._sclk.value = self._polarity + # clock: active->idle + self._sclk.value = self._polarity + # clock: stay in idle for the last active->idle transition + # to settle. + start_time = self._wait(start_time) # pylint: disable=too-many-branches def readinto( @@ -433,26 +453,29 @@ def readinto( for bit_position in range(self._bits): bit_mask = 0x80 >> bit_position bit_value = write_value & 0x80 >> bit_position - # Return clock to base - self._sclk.value = self._polarity - start_time = self._wait(start_time) - # Handle read on leading edge of clock. - if not self._phase: # Mode 0, 2 + # clock: idle, or has made an active->idle transition. + if self._should_write(to_active=0): if self._mosi is not None: self._mosi.value = bit_value + # clock: wait half a period. + start_time = self._wait(start_time) + # clock: idle->active + self._sclk.value = not self._polarity + if self._should_read(to_active=1): if self._miso.value: # Set bit to 1 at appropriate location. buffer[byte_position] |= bit_mask else: # Set bit to 0 at appropriate location. buffer[byte_position] &= ~bit_mask - # Flip clock off base - self._sclk.value = not self._polarity - start_time = self._wait(start_time) - # Handle read on trailing edge of clock. - if self._phase: # Mode 1, 3 + if self._should_write(to_active=1): if self._mosi is not None: self._mosi.value = bit_value + # clock: wait half a period + start_time = self._wait(start_time) + # Clock: active->idle + self._sclk.value = self._polarity + if self._should_read(to_active=0): if self._miso.value: # Set bit to 1 at appropriate location. buffer[byte_position] |= bit_mask @@ -460,9 +483,8 @@ def readinto( # Set bit to 0 at appropriate location. buffer[byte_position] &= ~bit_mask - # Return pins to base positions - self._mosi.value = 0 - self._sclk.value = self._polarity + # clock: wait another half period for the last transition. + start_time = self._wait(start_time) def write_readinto( self, @@ -499,34 +521,34 @@ def write_readinto( buffer_out[byte_position + out_start] & 0x80 >> bit_position ) in_byte_position = byte_position + in_start - # Return clock to 0 - self._sclk.value = self._polarity - start_time = self._wait(start_time) - # Handle read on leading edge of clock. - if not self._phase: # Mode 0, 2 + # clock: idle, or has made an active->idle transition. + if self._should_write(to_active=0): self._mosi.value = bit_value + # clock: wait half a period. + start_time = self._wait(start_time) + # clock: idle->active + self._sclk.value = not self._polarity + if self._should_read(to_active=1): if self._miso.value: # Set bit to 1 at appropriate location. buffer_in[in_byte_position] |= bit_mask else: - # Set bit to 0 at appropriate location. buffer_in[in_byte_position] &= ~bit_mask - # Flip clock off base - self._sclk.value = not self._polarity - start_time = self._wait(start_time) - # Handle read on trailing edge of clock. - if self._phase: # Mode 1, 3 + if self._should_write(to_active=1): self._mosi.value = bit_value + # clock: wait half a period + start_time = self._wait(start_time) + # Clock: active->idle + self._sclk.value = self._polarity + if self._should_read(to_active=0): if self._miso.value: # Set bit to 1 at appropriate location. buffer_in[in_byte_position] |= bit_mask else: - # Set bit to 0 at appropriate location. buffer_in[in_byte_position] &= ~bit_mask - # Return pins to base positions - self._mosi.value = 0 - self._sclk.value = self._polarity + # clock: wait another half period for the last transition. + start_time = self._wait(start_time) # pylint: enable=too-many-branches diff --git a/tests/README.rst b/tests/README.rst new file mode 100644 index 0000000..aa2311e --- /dev/null +++ b/tests/README.rst @@ -0,0 +1,53 @@ +.. + SPDX-FileCopyrightText: KB Sriram + SPDX-License-Identifier: MIT +.. + +Bitbangio Tests +=============== + +These tests run under CPython, and are intended to verify that the +library passes some sanity checks, using a lightweight simulator as +the target device. + +These tests run automatically from the standard `circuitpython github +workflow `_. To run them manually, first install these packages +if necessary:: + + $ pip3 install pytest + +Then ensure you're in the *root* directory of the repository and run +the following command:: + + $ python -m pytest + +Notes on the simulator +====================== + +`simulator.py` implements a small logic level simulator and a few test +doubles so the library can run under CPython. + +The `Engine` class is used as a singleton in the module to co-ordinate +the simulation. + +A `Net` holds a list of `FakePins` that are connected together. It +also resolves the overall logic level of the net when a `FakePin` is +updated. It can optionally hold a history of logic level changes, +which may be useful for testing some timing expectations, or export +them as a VCD file for `Pulseview `_. Test code can also register +listeners on a `Net` when the net's level changes, so it can simulate +device behavior. + +A `FakePin` is a test double for the CircuitPython `Pin` class, and +implements all the functionality so it behaves appropriately in +CPython. + +A simulated device can create a `FakePin` for each of its terminals, +and connect them to one or more `Net` instances. It can listen for +level changes on the `Net`, and bitbang the `FakePin` to simulate +behavior. `simulated_spi_device.py` implements a peripheral device +that writes a constant value onto an SPI bus. + + +.. _wf: https://github.com/adafruit/workflows-circuitpython-libs/blob/6e1562eaabced4db1bd91173b698b1cc1dfd35ab/build/action.yml#L78-L84 +.. _pv: https://sigrok.org/wiki/PulseView diff --git a/tests/simulated_spi.py b/tests/simulated_spi.py new file mode 100644 index 0000000..8e8ad0b --- /dev/null +++ b/tests/simulated_spi.py @@ -0,0 +1,67 @@ +# SPDX-FileCopyrightText: KB Sriram +# SPDX-License-Identifier: MIT +"""Implementation of testable SPI devices.""" + +import dataclasses +import simulator as sim + + +@dataclasses.dataclass(frozen=True) +class SpiBus: + enable: sim.Net + clock: sim.Net + copi: sim.Net + cipo: sim.Net + + +class Constant: + """Device that always writes a constant.""" + + def __init__(self, data: bytearray, bus: SpiBus, polarity: int, phase: int) -> None: + # convert to binary string array of bits for convenience + datalen = 8 * len(data) + self._data = f"{int.from_bytes(data, 'big'):0{datalen}b}" + self._bit_position = 0 + self._clock = sim.FakePin("const_clock_pin", bus.clock) + self._last_clock_level = bus.clock.level + self._cipo = sim.FakePin("const_cipo_pin", bus.cipo) + self._enable = sim.FakePin("const_enable_pin", bus.enable) + self._cipo.init(sim.Mode.OUT) + self._phase = phase + self._polarity = sim.Level.HIGH if polarity else sim.Level.LOW + self._enabled = False + bus.clock.on_level_change(self._on_level_change) + bus.enable.on_level_change(self._on_level_change) + + def write_bit(self) -> None: + """Writes the next bit to the cipo net.""" + if self._bit_position >= len(self._data): + # Just write a zero + self._cipo.value(0) # pylint: disable=not-callable + return + self._cipo.value( + int(self._data[self._bit_position]) # pylint: disable=not-callable + ) + self._bit_position += 1 + + def _on_level_change(self, net: sim.Net) -> None: + if net == self._enable.net: + # Assumes enable is active high. + self._enabled = net.level == sim.Level.HIGH + if self._enabled: + self._bit_position = 0 + if self._phase == 0: + # Write on enable or idle->active + self.write_bit() + return + if not self._enabled: + return + if net != self._clock.net: + return + cur_clock_level = net.level + if cur_clock_level == self._last_clock_level: + return + active = 0 if cur_clock_level == self._polarity else 1 + if self._phase == active: + self.write_bit() + self._last_clock_level = cur_clock_level diff --git a/tests/simulator.py b/tests/simulator.py new file mode 100644 index 0000000..2837a3a --- /dev/null +++ b/tests/simulator.py @@ -0,0 +1,253 @@ +# SPDX-FileCopyrightText: KB Sriram +# SPDX-License-Identifier: MIT +"""Simple logic level simulator to test I2C/SPI interactions.""" + +from typing import Any, Callable, List, Literal, Optional, Sequence +import dataclasses +import enum +import functools +import time + +import digitalio + + +@enum.unique +class Mode(enum.Enum): + IN = "IN" + OUT = "OUT" + + +@enum.unique +class Level(enum.Enum): + Z = "Z" + LOW = "LOW" + HIGH = "HIGH" + + +@enum.unique +class Pull(enum.Enum): + NONE = "NONE" + UP = "UP" + DOWN = "DOWN" + + +def _level_to_vcd(level: Level) -> str: + """Converts a level to a VCD understandable mnemonic.""" + if level == Level.Z: + return "Z" + if level == Level.HIGH: + return "1" + return "0" + + +@dataclasses.dataclass(frozen=True) +class Change: + """Container to record simulation events.""" + + net_name: str + time_us: int + level: Level + + +class Engine: + """Manages the overall simulation state and clock.""" + + def __init__(self) -> None: + self._start_us = int(time.monotonic() * 1e6) + self._nets: List["Net"] = [] + + def reset(self) -> None: + """Clears out all existing state and resets the simulation.""" + self._start_us = int(time.monotonic() * 1e6) + self._nets = [] + + def _find_net_by_pin_id(self, pin_id: str) -> Optional["Net"]: + """Returns a net (if any) that has a pin with the given id.""" + for net in self._nets: + if net.contains_pin_id(pin_id): + return net + return None + + def create_net( + self, net_id: str, default_level: Level = Level.Z, monitor: bool = False + ) -> "Net": + """Creates a new net with the given name. Monitored nets are also traced.""" + net = Net(net_id, default_level=default_level, monitor=monitor) + self._nets.append(net) + return net + + def change_history(self) -> Sequence[Change]: + """Returns an ordered history of all events in monitored nets.""" + monitored_nets = [net for net in self._nets if net.history] + combined: List[Change] = [] + for net in monitored_nets: + if net.history: + for time_us, level in net.history: + combined.append(Change(net.name, time_us, level)) + combined.sort(key=lambda v: v.time_us) + return combined + + def write_vcd(self, path: str) -> None: + """Writes monitored nets to the provided path as a VCD file.""" + with open(path, "wt") as vcdfile: + vcdfile.write("$version pytest output $end\n") + vcdfile.write("$timescale 1 us $end\n") + vcdfile.write("$scope module top $end\n") + monitored_nets = [net for net in self._nets if net.history] + for net in monitored_nets: + vcdfile.write(f"$var wire 1 {net.name} {net.name} $end\n") + vcdfile.write("$upscope $end\n") + vcdfile.write("$enddefinitions $end\n") + combined = self.change_history() + # History starts when the engine is first reset or initialized. + vcdfile.write(f"#{self._start_us}\n") + last_us = self._start_us + for change in combined: + if change.time_us != last_us: + vcdfile.write(f"#{change.time_us}\n") + last_us = change.time_us + vcdfile.write(f"{_level_to_vcd(change.level)}{change.net_name}\n") + + +# module global/singleton +engine = Engine() + + +class FakePin: + """Test double for a microcontroller pin used in tests.""" + + IN = Mode.IN # pylint: disable=invalid-name + OUT = Mode.OUT + PULL_NONE = Pull.NONE + PULL_UP = Pull.UP + PULL_DOWN = Pull.DOWN + + def __init__(self, pin_id: str, net: Optional["Net"] = None): + self.id = pin_id # pylint: disable=invalid-name + self.mode: Optional[Mode] = None + self.pull: Optional[Pull] = None + self.level: Level = Level.Z + if net: + # Created directly by the test. + if engine._find_net_by_pin_id(pin_id): + raise ValueError(f"{pin_id} has already been created.") + self.net = net + else: + # Created by the library by duplicating an existing id. + net = engine._find_net_by_pin_id(pin_id) + if not net: + raise ValueError(f"Unexpected pin without a net: {pin_id}") + self.net = net + self.id = f"{self.id}_dup" + self.net.add_pin(self) + + def init(self, mode: Mode = Mode.IN, pull: Optional[Pull] = None) -> None: + if mode != self.mode or pull != self.pull: + self.mode = mode + self.pull = pull + self.net.update() + + def value(self, val: Optional[Literal[0, 1]] = None) -> Optional[Literal[0, 1]]: + """Set or return the pin Value""" + if val is None: + if self.mode != Mode.IN: + raise ValueError(f"{self.id}: is not an input") + level = self.net.level + if level is None: + # Nothing is actively driving the line - we assume that during + # testing, this is an error either in the test setup, or + # something is asking for a value in an uninitialized state. + raise ValueError( + f"{self.id}: value read but nothing is driving the net." + ) + return 1 if level == Level.HIGH else 0 + if val in (0, 1): + if self.mode != Mode.OUT: + raise ValueError(f"{self.id}: is not an output") + nlevel = Level.HIGH if val else Level.LOW + if nlevel != self.level: + self.level = nlevel + self.net.update() + return None + raise RuntimeError(f"{self.id}: Invalid value {val} set on pin.") + + +class Net: + """A set of pins connected to each other.""" + + def __init__( + self, + name: str, + default_level: Level = Level.Z, + monitor: bool = False, + ) -> None: + self.name = name + self._pins: List[FakePin] = [] + self._default_level = default_level + self.level = default_level + self._triggers: List[Callable[["Net"], None]] = [] + self.history = [(engine._start_us, default_level)] if monitor else None + + def update(self) -> None: + """Resolves the state of this net based on all pins connected to it.""" + result = Level.Z + # Try to resolve the state of this net by looking at the pin levels + # for all output pins. + for pin in self._pins: + if pin.mode != Mode.OUT: + continue + if pin.level == result: + continue + if result == Level.Z: + # This pin is now driving the net. + result = pin.level + continue + # There are conflicting pins! + raise ValueError( + f"Conflicting pins on {self.name}: " + f"{pin.id} is {pin.level}, " + f" but net was already at {result}" + ) + # Finally, use any default net state if one was provided. (e.g. a pull-up net.) + result = self._default_level if result == Level.Z else result + + if result != self.level: + # Also record a state change if we're being monitored. + if self.history: + event_us = int(time.monotonic() * 1e6) + self.history.append((event_us, result)) + self.level = result + for trigger in self._triggers: + trigger(self) + + def add_pin(self, pin: FakePin) -> None: + self._pins.append(pin) + + def on_level_change(self, trigger: Callable[["Net"], None]) -> None: + """Calls the trigger whenever the net's level changes.""" + self._triggers.append(trigger) + + def contains_pin_id(self, pin_id: str) -> bool: + """Returns True if the net has a pin with the given id.""" + for pin in self._pins: + if pin.id == pin_id: + return True + return False + + +def stub(method: Callable) -> Callable: + """Decorator to safely insert and remove doubles within tests.""" + + @functools.wraps(method) + def wrapper(*args: Any, **kwds: Any) -> Any: + # First save any objects we're going to replace with a double. + pin_module = digitalio.Pin if hasattr(digitalio, "Pin") else None + try: + digitalio.Pin = FakePin + return method(*args, **kwds) + finally: + # Replace the saved objects after the test runs. + if pin_module: + digitalio.Pin = pin_module + + return wrapper diff --git a/tests/test_adafruit_bitbangio_spi.py b/tests/test_adafruit_bitbangio_spi.py new file mode 100644 index 0000000..4da63f3 --- /dev/null +++ b/tests/test_adafruit_bitbangio_spi.py @@ -0,0 +1,211 @@ +# SPDX-FileCopyrightText: KB Sriram +# SPDX-License-Identifier: MIT + +from typing import Literal, Sequence +import pytest +import simulated_spi as sspi +import simulator as sim +import adafruit_bitbangio + + +_CLOCK_NET = "clock" +_COPI_NET = "copi" +_CIPO_NET = "cipo" +_ENABLE_NET = "enable" + + +def _check_bit( + data: bytearray, + bits_read: int, + last_copi_state: sim.Level, +) -> None: + """Checks that the copi state matches the bit we should be writing.""" + intdata = int.from_bytes(data, "big") + nbits = 8 * len(data) + expected_bit_value = (intdata >> (nbits - bits_read - 1)) & 0x1 + expected_level = sim.Level.HIGH if expected_bit_value else sim.Level.LOW + assert last_copi_state == expected_level + + +def _check_write( + data: bytearray, + change_history: Sequence[sim.Change], + polarity: Literal[0, 1], + phase: Literal[0, 1], + baud: int, +) -> None: + """Checks that the net level changes have a correct sequence of write events.""" + state = "disabled" + last_clock_state = sim.Level.Z + last_copi_state = sim.Level.Z + last_copi_us = 0 + idle, active = ( + (sim.Level.HIGH, sim.Level.LOW) if polarity else (sim.Level.LOW, sim.Level.HIGH) + ) + bits_read = 0 + # We want data to be written at least this long before a read + # transition. + quarter_period = 1e6 / baud / 4 + + for change in change_history: + if ( + state == "disabled" + and change.net_name == _ENABLE_NET + and change.level == sim.Level.HIGH + ): + # In this implementation, we should always start out with the + # clock in the idle state by the time the device is enabled. + assert last_clock_state == idle + bits_read = 0 + state = "wait_for_read" + elif state == "wait_for_read" and change.net_name == _CLOCK_NET: + # phase 0 reads on idle->active, and phase 1 reads on active->idle. + should_read = change.level == active if phase == 0 else change.level == idle + if should_read: + # Check we have the right data + _check_bit(data, bits_read, last_copi_state) + # Check the data was also set early enough. + assert change.time_us - last_copi_us > quarter_period + bits_read += 1 + if bits_read == 8: + return + # Track the last time we changed the clock and data values. + if change.net_name == _COPI_NET: + if last_copi_state != change.level: + last_copi_state = change.level + last_copi_us = change.time_us + elif change.net_name == _CLOCK_NET: + if last_clock_state != change.level: + last_clock_state = change.level + # If we came here, we haven't read enough bits. + pytest.fail("Only {bits_read} bits were read") + + +class TestBitbangSpi: + def setup_method(self) -> None: + sim.engine.reset() + clock = sim.engine.create_net(_CLOCK_NET, monitor=True) + copi = sim.engine.create_net(_COPI_NET, monitor=True) + cipo = sim.engine.create_net(_CIPO_NET, monitor=True) + enable = sim.engine.create_net(_ENABLE_NET, monitor=True) + # pylint: disable=attribute-defined-outside-init + self.clock_pin = sim.FakePin("clock_pin", clock) + self.copi_pin = sim.FakePin("copi_pin", copi) + self.cipo_pin = sim.FakePin("cipo_pin", cipo) + self.enable_pin = sim.FakePin("enable_pin", enable) + self.enable_pin.init(mode=sim.Mode.OUT) + self.spibus = sspi.SpiBus(clock=clock, copi=copi, cipo=cipo, enable=enable) + # pylint: enable=attribute-defined-outside-init + self._enable_net(0) + + def _enable_net(self, val: Literal[0, 1]) -> None: + self.enable_pin.value(val) # pylint: disable=not-callable + + @sim.stub + @pytest.mark.parametrize("baud", [100]) + @pytest.mark.parametrize("polarity", [0, 1]) + @pytest.mark.parametrize("phase", [0, 1]) + @pytest.mark.parametrize("data", ["10101010", "01010101", "01111110", "10000001"]) + def test_write( + self, baud: int, polarity: Literal[0, 1], phase: Literal[0, 1], data: str + ) -> None: + data_array = bytearray(int(data, 2).to_bytes(1, byteorder="big")) + # Send one byte of data into the void to verify write timing. + with adafruit_bitbangio.SPI(clock=self.clock_pin, MOSI=self.copi_pin) as spi: + spi.try_lock() + spi.configure(baudrate=baud, polarity=polarity, phase=phase, bits=8) + self._enable_net(1) + spi.write(data_array) + self._enable_net(0) + + # Monitored nets can be viewed in sigrock by dumping out a VCD file. + # sim.engine.write_vcd(f"/tmp/test_{polarity}_{phase}_{data}.vcd") + _check_write( + data_array, + sim.engine.change_history(), + polarity=polarity, + phase=phase, + baud=baud, + ) + + @sim.stub + @pytest.mark.parametrize("baud", [100]) + @pytest.mark.parametrize("polarity", [0, 1]) + @pytest.mark.parametrize("phase", [0, 1]) + @pytest.mark.parametrize("data", ["10101010", "01010101", "01111110", "10000001"]) + def test_readinto( + self, baud: int, polarity: Literal[0, 1], phase: Literal[0, 1], data: str + ) -> None: + data_int = int(data, 2) + data_array = bytearray(data_int.to_bytes(1, byteorder="big")) + # attach a device that sends a constant. + _ = sspi.Constant( + data=data_array, bus=self.spibus, polarity=polarity, phase=phase + ) + + # Read/write a byte of data + with adafruit_bitbangio.SPI( + clock=self.clock_pin, MOSI=self.copi_pin, MISO=self.cipo_pin + ) as spi: + spi.try_lock() + spi.configure(baudrate=baud, polarity=polarity, phase=phase, bits=8) + self._enable_net(1) + received_data = bytearray(1) + spi.readinto(received_data, write_value=data_int) + self._enable_net(0) + + # Monitored nets can be viewed in sigrock by dumping out a VCD file. + # sim.engine.write_vcd(f"/tmp/test_{polarity}_{phase}_{data}.vcd") + + # Check we read the constant correctly from our device. + assert data_array == received_data + # Check the timing on the data we wrote out. + _check_write( + data_array, + sim.engine.change_history(), + polarity=polarity, + phase=phase, + baud=baud, + ) + + @sim.stub + @pytest.mark.parametrize("baud", [100]) + @pytest.mark.parametrize("polarity", [0, 1]) + @pytest.mark.parametrize("phase", [0, 1]) + @pytest.mark.parametrize( + "data", ["10101010", "01010101", "01111110", "10000001", "1000010101111110"] + ) + def test_write_readinto( + self, baud: int, polarity: Literal[0, 1], phase: Literal[0, 1], data: str + ) -> None: + nbytes = len(data) // 8 + data_array = bytearray(int(data, 2).to_bytes(nbytes, byteorder="big")) + # attach a device that sends a constant. + _ = sspi.Constant( + data=data_array, bus=self.spibus, polarity=polarity, phase=phase + ) + + # Read/write data array + with adafruit_bitbangio.SPI( + clock=self.clock_pin, MOSI=self.copi_pin, MISO=self.cipo_pin + ) as spi: + spi.try_lock() + spi.configure(baudrate=baud, polarity=polarity, phase=phase, bits=8) + self._enable_net(1) + received_data = bytearray(nbytes) + spi.write_readinto(buffer_out=data_array, buffer_in=received_data) + self._enable_net(0) + + # Monitored nets can be viewed in sigrock by dumping out a VCD file. + # sim.engine.write_vcd(f"/tmp/test_{polarity}_{phase}_{data}.vcd") + + # Check we read the constant correctly from our device. + assert data_array == received_data + # Check the timing on the data we wrote out. + _check_write( + data_array, + sim.engine.change_history(), + polarity=polarity, + phase=phase, + baud=baud, + )