Skip to content

Commit

Permalink
Merge GPIO and flow control bootloader reset (#94)
Browse files Browse the repository at this point in the history
* Expose flow control asynchronously

* Use the correct import

* Use a proper method on the `transport` object

* Merge GPIO and UART reset patterns

* Deprecate `SONOFF`

* Fix type annotations

* Improve logging
  • Loading branch information
puddly authored Jan 13, 2025
1 parent 448c4b3 commit 7301770
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 81 deletions.
41 changes: 41 additions & 0 deletions universal_silabs_flasher/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,44 @@ def __repr__(self) -> str:
return f"{concatenated!r}"

return f"{concatenated!r} ({comparable})"


class FlowControlSerialProtocol(zigpy.serial.SerialProtocol):
def _set_signals(
self,
*,
rts: bool | None = None,
cts: bool | None = None,
dtr: bool | None = None,
) -> None:
if rts is not None:
self._transport.serial.rts = rts

if cts is not None:
self._transport.serial.cts = cts

if dtr is not None:
self._transport.serial.dtr = dtr

async def set_signals(
self,
*,
rts: bool | None = None,
cts: bool | None = None,
dtr: bool | None = None,
) -> None:
_LOGGER.debug(
"Setting UART signals: rts=%s, cts=%s, dtr=%s",
int(rts) if rts is not None else "-",
int(cts) if cts is not None else "-",
int(dtr) if dtr is not None else "-",
)

if hasattr(self._transport, "set_signals"):
await self._transport.set_signals(rts=rts, cts=cts, dtr=dtr)
return

loop = asyncio.get_running_loop()
await loop.run_in_executor(
None, lambda: self._set_signals(rts=rts, cts=cts, dtr=dtr)
)
83 changes: 58 additions & 25 deletions universal_silabs_flasher/const.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from __future__ import annotations

import dataclasses
import enum


Expand Down Expand Up @@ -52,32 +55,62 @@ class ResetTarget(enum.Enum):
YELLOW = "yellow"
IHOST = "ihost"
SLZB07 = "slzb07"
SONOFF = "sonoff"
RTS_DTR = "rts_dtr"


@dataclasses.dataclass
class GpioPattern:
pins: dict[str | int, bool]
delay_after: float


@dataclasses.dataclass
class GpioResetConfig:
chip: str | None
chip_type: str | None
pattern: list[GpioPattern]


# fmt: off
GPIO_CONFIGS = {
ResetTarget.YELLOW: {
"chip": "/dev/gpiochip0",
"pin_states": {
24: [True, False, False, True],
25: [True, False, True, True],
},
"toggle_delay": 0.1,
},
ResetTarget.IHOST: {
"chip": "/dev/gpiochip1",
"pin_states": {
27: [True, False, False, True],
26: [True, False, True, True],
},
"toggle_delay": 0.1,
},
ResetTarget.SLZB07: {
"chip_name": "cp210x",
"pin_states": {
5: [True, False, False, True],
4: [True, False, True, True],
},
"toggle_delay": 0.1,
},
ResetTarget.YELLOW: GpioResetConfig(
chip="/dev/gpiochip0",
chip_type=None,
pattern=[
GpioPattern(pins={24: True, 25: True}, delay_after=0.1),
GpioPattern(pins={24: False, 25: False}, delay_after=0.1),
GpioPattern(pins={24: False, 25: True}, delay_after=0.1),
GpioPattern(pins={24: True, 25: True}, delay_after=0.0),
],
),
ResetTarget.IHOST: GpioResetConfig(
chip="/dev/gpiochip1",
chip_type=None,
pattern=[
GpioPattern(pins={26: True, 27: True}, delay_after=0.1),
GpioPattern(pins={26: False, 27: False}, delay_after=0.1),
GpioPattern(pins={26: True, 27: False}, delay_after=0.1),
GpioPattern(pins={26: True, 27: True}, delay_after=0.0),
]
),
ResetTarget.SLZB07: GpioResetConfig(
chip=None,
chip_type="cp210x",
pattern=[
GpioPattern(pins={4: True, 5: True}, delay_after=0.1),
GpioPattern(pins={4: False, 5: False}, delay_after=0.1),
GpioPattern(pins={4: True, 5: False}, delay_after=0.1),
GpioPattern(pins={4: True, 5: True}, delay_after=0.0),
]
),
ResetTarget.RTS_DTR: GpioResetConfig(
chip=None,
chip_type="uart",
pattern=[
GpioPattern(pins={"dtr": False, "rts": True}, delay_after=0.1),
GpioPattern(pins={"dtr": True, "rts": False}, delay_after=0.5),
GpioPattern(pins={"dtr": False, "rts": False}, delay_after=0.0),
]
),
}
# fmt: on
11 changes: 9 additions & 2 deletions universal_silabs_flasher/flash.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def convert(self, value: tuple | str, param: click.Parameter, ctx: click.Context
)
@click.option(
"--bootloader-reset",
type=click.Choice([t.value for t in ResetTarget]),
type=click.Choice([t.value for t in ResetTarget] + ["sonoff"]),
)
@click.pass_context
def main(
Expand Down Expand Up @@ -189,6 +189,13 @@ def main(
param = next(p for p in ctx.command.params if p.name == "device")
raise click.MissingParameter(ctx=ctx, param=param)

if bootloader_reset == "sonoff":
_LOGGER.warning(
"The 'sonoff' reset target is deprecated."
" Use '--bootloader-reset rts_dtr' instead."
)
bootloader_reset = ResetTarget.RTS_DTR.value

ctx.obj = {
"verbosity": verbose,
"flasher": Flasher(
Expand Down Expand Up @@ -339,7 +346,7 @@ async def flash(
flasher._reset_target = ResetTarget.YELLOW
_LOGGER.info(reset_msg, "--yellow-gpio-reset")
elif sonoff_reset:
flasher._reset_target = ResetTarget.SONOFF
flasher._reset_target = ResetTarget.RTS_DTR
_LOGGER.info(reset_msg, "--sonoff-reset")

try:
Expand Down
50 changes: 25 additions & 25 deletions universal_silabs_flasher/flasher.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
import bellows.config
import bellows.ezsp
import bellows.types
from zigpy.serial import SerialProtocol
import zigpy.types

from .common import (
PROBE_TIMEOUT,
FlowControlSerialProtocol,
Version,
asyncio_timeout,
connect_protocol,
Expand Down Expand Up @@ -68,33 +68,33 @@ def __init__(
ResetTarget(bootloader_reset) if bootloader_reset else None
)

async def enter_bootloader_reset(self, target):
async def enter_bootloader_reset(self, target: ResetTarget) -> None:
_LOGGER.info(f"Triggering {target.value} bootloader")
if target in GPIO_CONFIGS.keys():
config = GPIO_CONFIGS[target]
if "chip" not in config.keys():
_LOGGER.warning(
f"When using {target.value} bootloader reset "
+ "ensure no other CP2102 USB serial devices are connected."
)
config["chip"] = await find_gpiochip_by_label(config["chip_name"])
await send_gpio_pattern(
config["chip"], config["pin_states"], config["toggle_delay"]

config = GPIO_CONFIGS[target]
chip = config.chip

if config.chip_type == "cp210x":
_LOGGER.warning(
"When using %s bootloader reset ensure no other CP2102 USB serial"
" devices are connected.",
target.value,
)

chip = await find_gpiochip_by_label(config.chip_type)

if config.chip_type == "uart":
# The baudrate isn't really necessary, since we're using flow control pins
baudrate = self._baudrates[ApplicationType.GECKO_BOOTLOADER][0]

async with connect_protocol(
self._device, baudrate, FlowControlSerialProtocol
) as uart:
for pattern in config.pattern:
await uart.set_signals(**pattern.pins)
await asyncio.sleep(pattern.delay_after)
else:
await self.enter_serial_bootloader()

async def enter_serial_bootloader(self):
baudrate = self._baudrates[ApplicationType.GECKO_BOOTLOADER][0]
async with connect_protocol(self._device, baudrate, SerialProtocol) as sonoff:
serial = sonoff._transport.serial
serial.dtr = False
serial.rts = True
await asyncio.sleep(0.1)
serial.dtr = True
serial.rts = False
await asyncio.sleep(0.5)
serial.dtr = False
await send_gpio_pattern(chip, config.pattern)

def _connect_gecko_bootloader(self, baudrate: int):
return connect_protocol(self._device, baudrate, GeckoBootloaderProtocol)
Expand Down
58 changes: 29 additions & 29 deletions universal_silabs_flasher/gpio.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
from __future__ import annotations

import asyncio
import logging
from os import scandir
import time
import typing

from .const import GpioPattern

_LOGGER = logging.getLogger(__name__)

try:
import gpiod

Expand All @@ -14,45 +19,39 @@

if gpiod is None:
# No gpiod library
def _send_gpio_pattern(
chip: str, pin_states: dict[int, list[bool]], toggle_delay: float
) -> None:
def _send_gpio_pattern(chip: str, pattern: list[GpioPattern]) -> None:
raise NotImplementedError("GPIO not supported on this platform")

elif is_gpiod_v1:
# gpiod <= 1.5.4
def _send_gpio_pattern(
chip: str, pin_states: dict[int, list[bool]], toggle_delay: float
) -> None:
num_states = len(next(iter(pin_states.values())))

def _send_gpio_pattern(chip: str, pattern: list[GpioPattern]) -> None:
chip = gpiod.chip(chip, gpiod.chip.OPEN_BY_PATH)
lines = chip.get_lines(pin_states.keys())
lines = chip.get_lines(pattern[0].pins.keys())

config = gpiod.line_request()
config.consumer = "universal-silabs-flasher"
config.request_type = gpiod.line_request.DIRECTION_OUTPUT

try:
# Open the pins and set their initial states
lines.request(config, [int(states[0]) for states in pin_states.values()])
_LOGGER.debug("Sending GPIO pattern %r", pattern[0])
lines.request(config, [int(v) for v in pattern[0].pins.values()])
time.sleep(pattern[0].delay_after)

# Send all subsequent states
for i in range(1, num_states):
time.sleep(toggle_delay)
lines.set_values([int(states[i]) for states in pin_states.values()])
for p in pattern[1:]:
_LOGGER.debug("Sending GPIO pattern %r", p)
lines.set_values([int(v) for v in p.pins.values()])
time.sleep(p.delay_after)
finally:
# Clean up and ensure the GPIO pins are reset to inputs
lines.set_direction_input()
lines.release()

else:
# gpiod >= 2.0.2
def _send_gpio_pattern(
chip: str, pin_states: dict[int, list[bool]], toggle_delay: float
) -> None:
# `gpiod` isn't available on Windows
num_states = len(next(iter(pin_states.values())))
def _send_gpio_pattern(chip: str, pattern: list[GpioPattern]) -> None:
_LOGGER.debug("Sending GPIO pattern %r", pattern[0])

with gpiod.request_lines(
path=chip,
Expand All @@ -61,27 +60,30 @@ def _send_gpio_pattern(
# Set initial states
pin: gpiod.LineSettings(
direction=gpiod.line.Direction.OUTPUT,
output_value=gpiod.line.Value(states[0]),
output_value=gpiod.line.Value(state),
)
for pin, states in pin_states.items()
for pin, state in pattern[0].pins.items()
},
) as request:
time.sleep(pattern[0].delay_after)

try:
# Send all subsequent states
for i in range(1, num_states):
time.sleep(toggle_delay)
for p in pattern[1:]:
_LOGGER.debug("Sending GPIO pattern %r", p)
request.set_values(
{
pin: gpiod.line.Value(int(pin_states[pin][i]))
for pin, states in pin_states.items()
pin: gpiod.line.Value(int(state))
for pin, state in p.pins.items()
}
)
time.sleep(p.delay_after)
finally:
# Clean up and ensure the GPIO pins are reset to inputs
request.reconfigure_lines(
{
pin: gpiod.LineSettings(direction=gpiod.line.Direction.INPUT)
for pin, states in pin_states.items()
for pin in pattern[0].pins.keys()
}
)

Expand Down Expand Up @@ -119,9 +121,7 @@ async def find_gpiochip_by_label(label: str) -> str:
return result


async def send_gpio_pattern(
chip: str, pin_states: dict[int, list[bool]], toggle_delay: float
) -> None:
async def send_gpio_pattern(chip: str, pattern: list[GpioPattern]) -> None:
await asyncio.get_running_loop().run_in_executor(
None, _send_gpio_pattern, chip, pin_states, toggle_delay
None, _send_gpio_pattern, chip, pattern
)

0 comments on commit 7301770

Please sign in to comment.