diff --git a/python/uds.py b/python/uds.py index 9e496e9f81..5bf35088ea 100644 --- a/python/uds.py +++ b/python/uds.py @@ -376,13 +376,30 @@ def send(self, msgs: List[bytes], delay: float = 0) -> None: self._recv_buffer() class IsoTpMessage(): - def __init__(self, can_client: CanClient, timeout: float = 1, single_frame_mode: bool = False, debug: bool = False, max_len: int = 8): + def __init__(self, can_client: CanClient, timeout: float = 1, single_frame_mode: bool = False, separation_time: float = 0, + debug: bool = False, max_len: int = 8): self._can_client = can_client self.timeout = timeout self.single_frame_mode = single_frame_mode self.debug = debug self.max_len = max_len + # <= 127, separation time in milliseconds + # 0xF1 to 0xF9 UF, 100 to 900 microseconds + if 1e-4 <= separation_time <= 9e-4: + offset = int(round(separation_time, 4) * 1e4) - 1 + separation_time = 0xF1 + offset + elif 0 <= separation_time <= 0.127: + separation_time = round(separation_time * 1000) + else: + raise Exception("Separation time not in range") + + self.flow_control_msg = bytes([ + 0x30, # flow control + 0x01 if self.single_frame_mode else 0x00, # block size + separation_time, + ]).ljust(self.max_len, b"\x00") + def send(self, dat: bytes, setup_only: bool = False) -> None: # throw away any stale data self._can_client.recv(drain=True) @@ -461,12 +478,7 @@ def _isotp_rx_next(self, rx_data: bytes) -> None: if self.debug: print(f"ISO-TP: TX - flow control continue - {hex(self._can_client.tx_addr)}") # send flow control message - msg = bytes([ - 0x30, # flow control - 0x01 if self.single_frame_mode else 0x00, # block size - 0x0a, # 10 ms separation time - ]).ljust(self.max_len, b"\x00") - self._can_client.send([msg]) + self._can_client.send([self.flow_control_msg]) return # consecutive rx frame @@ -480,8 +492,7 @@ def _isotp_rx_next(self, rx_data: bytes) -> None: self.rx_done = True elif self.single_frame_mode: # notify ECU to send next frame - msg = b"\x30\x01\x0a".ljust(self.max_len, b"\x00") - self._can_client.send([msg]) + self._can_client.send([self.flow_control_msg]) if self.debug: print(f"ISO-TP: RX - consecutive frame - {hex(self._can_client.rx_addr)} idx={self.rx_idx} done={self.rx_done}") return