From 68c39fb3e2e59b367771ba1d4b6db2f509bd30f9 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Tue, 12 Nov 2019 18:52:41 -0800 Subject: [PATCH] uds: no need for threads if you always drain rx --- python/uds.py | 129 +++++++++++++++++++++++--------------------------- 1 file changed, 60 insertions(+), 69 deletions(-) diff --git a/python/uds.py b/python/uds.py index 1aaeda08ec2006..eb904bfa4a1b71 100644 --- a/python/uds.py +++ b/python/uds.py @@ -1,10 +1,8 @@ #!/usr/bin/env python3 import time import struct -from typing import NamedTuple, List +from typing import Callable, NamedTuple, Tuple, List from enum import IntEnum -from queue import Queue, Empty -from threading import Thread from binascii import hexlify class SERVICE_TYPE(IntEnum): @@ -271,14 +269,50 @@ class InvalidSubFunctioneError(Exception): 0x93: 'voltage too low', } +class CanClient(): + def __init__(self, can_send: Callable[[Tuple[int, bytes, int]], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], tx_addr: int, rx_addrs: int, bus: int, debug: bool=False): + self.tx = can_send + self.rx = can_recv + self.tx_addr = tx_addr + self.rx_addrs = rx_addrs + self.bus = bus + self.debug = debug + + def recv(self, drain=False) -> List[bytes]: + msg_array = [] + while True: + msgs = self.rx() + if drain: + if self.debug: print("CAN-RX: drain - {}".format(len(msgs))) + else: + for rx_addr, rx_ts, rx_data, rx_bus in msgs or []: + if rx_bus == self.bus and rx_addr in self.rx_addrs and len(rx_data) > 0: + if self.debug: print("CAN-RX: {} - {}".format(hex(rx_addr), hexlify(rx_data))) + msg_array.append(rx_data) + # break when non-full buffer is processed + if len(msgs) < 254: + return msg_array + + def send(self, msgs: List[bytes], delay: float=0) -> None: + first = True + for msg in msgs: + if not first and delay: + if self.debug: print(f"CAN-TX: delay - {delay}") + time.sleep(delay) + if self.debug: print("CAN-TX: {} - {}".format(hex(self.tx_addr), hexlify(msg))) + self.tx(self.tx_addr, msg, self.bus) + first = False + class IsoTpMessage(): - def __init__(self, can_tx_queue: Queue, can_rx_queue: Queue, timeout: float, debug: bool=False): - self.can_tx_queue = can_tx_queue - self.can_rx_queue = can_rx_queue + def __init__(self, can_client: CanClient, timeout: float=1, debug: bool=False): + self._can_client = can_client self.timeout = timeout self.debug = debug def send(self, dat: bytes) -> None: + # throw away any stale data + self._can_client.recv(drain=True) + self.tx_dat = dat self.tx_len = len(dat) self.tx_idx = 0 @@ -297,7 +331,7 @@ def _tx_first_frame(self) -> None: # first frame (send first 6 bytes) if self.debug: print("ISO-TP: TX - first frame") msg = (struct.pack("!H", 0x1000 | self.tx_len) + self.tx_dat[:6]).ljust(8, b"\x00") - self.can_tx_queue.put(msg) + self._can_client.send([msg]) def recv(self) -> bytes: self.rx_dat = b"" @@ -305,19 +339,19 @@ def recv(self) -> bytes: self.rx_idx = 0 self.rx_done = False + start_time = time.time() try: while True: - self._isotp_rx_next() - if self.tx_done and self.rx_done: - return self.rx_dat - except Empty: - raise MessageTimeoutError("timeout waiting for response") + for msg in self._can_client.recv(): + self._isotp_rx_next(msg) + if self.tx_done and self.rx_done: + return self.rx_dat + if time.time() - start_time > self.timeout: + raise MessageTimeoutError("timeout waiting for response") finally: if self.debug: print(f"ISO-TP: RESPONSE - {hexlify(self.rx_dat)}") - def _isotp_rx_next(self) -> None: - rx_data = self.can_rx_queue.get(block=True, timeout=self.timeout) - + def _isotp_rx_next(self, rx_data: bytes) -> None: # single rx_frame if rx_data[0] >> 4 == 0x0: self.rx_len = rx_data[0] & 0xFF @@ -337,9 +371,9 @@ def _isotp_rx_next(self) -> None: if self.debug: print(f"ISO-TP: TX - flow control continue") # send flow control message (send all bytes) msg = b"\x30\x00\x00".ljust(8, b"\x00") - self.can_tx_queue.put(msg) + self._can_client.send([msg]) return - + # consecutive rx frame if rx_data[0] >> 4 == 0x2: assert self.rx_done == False, "isotp - rx: consecutive frame with no active frame" @@ -362,19 +396,19 @@ def _isotp_rx_next(self) -> None: delay_ts = rx_data[2] & 0x7F # scale is 1 milliseconds if first bit == 0, 100 micro seconds if first bit == 1 delay_div = 1000. if rx_data[2] & 0x80 == 0 else 10000. + delay_sec = delay_ts / delay_div # first frame = 6 bytes, each consecutive frame = 7 bytes start = 6 + self.tx_idx * 7 count = rx_data[1] end = start + count * 7 if count > 0 else self.tx_len + tx_msgs = [] for i in range(start, end, 7): - if delay_ts > 0 and i > start: - delay_s = delay_ts / delay_div - if self.debug: print(f"ISO-TP: TX - delay - seconds={delay_s}") - time.sleep(delay_s) self.tx_idx += 1 - # consecutive tx frames + # consecutive tx messages msg = (bytes([0x20 | (self.tx_idx & 0xF)]) + self.tx_dat[i:i+7]).ljust(8, b"\x00") - self.can_tx_queue.put(msg) + tx_msgs.append(msg) + # send consecutive tx messages + self._can_client.send(tx_msgs, delay=delay_sec) if end >= self.tx_len: self.tx_done = True if self.debug: print(f"ISO-TP: TX - consecutive frame - idx={self.tx_idx} done={self.tx_done}") @@ -383,8 +417,7 @@ def _isotp_rx_next(self) -> None: if self.debug: print("ISO-TP: TX - flow control wait") class UdsClient(): - def __init__(self, panda, tx_addr: int, rx_addr: int=None, bus: int=0, timeout: float=10, debug: bool=False): - self.panda = panda + def __init__(self, panda, tx_addr: int, rx_addr: int=None, bus: int=0, timeout: float=1, debug: bool=False): self.bus = bus self.tx_addr = tx_addr if rx_addr is None: @@ -396,54 +429,12 @@ def __init__(self, panda, tx_addr: int, rx_addr: int=None, bus: int=0, timeout: self.rx_addr = (tx_addr & 0xFFFF0000) + (tx_addr<<8 & 0xFF00) + (tx_addr>>8 & 0xFF) else: raise ValueError("invalid tx_addr: {}".format(tx_addr)) - - self.can_tx_queue = Queue() - self.can_rx_queue = Queue() self.timeout = timeout self.debug = debug - - self.can_thread = Thread(target=self._can_thread, args=(self.debug,)) - self.can_thread.daemon = True - self.can_thread.start() - - def _can_thread(self, debug: bool=False): - try: - while True: - # send - tx_cnt = 0 - while tx_cnt < 256 and not self.can_tx_queue.empty(): - try: - msg = self.can_tx_queue.get(block=False) - tx_cnt += 1 - if debug: print("CAN-TX: {} - {}".format(hex(self.tx_addr), hexlify(msg))) - self.panda.can_send(self.tx_addr, msg, self.bus) - except Empty: - pass - - # receive - rx_cnt = 0 - while rx_cnt < 4096: - msgs = self.panda.can_recv() - if not msgs: - break - rx_cnt += len(msgs) - for rx_addr, rx_ts, rx_data, rx_bus in msgs: - if rx_bus != self.bus or rx_addr != self.rx_addr or len(rx_data) == 0: - continue - if debug: print("CAN-RX: {} - {}".format(hex(self.rx_addr), hexlify(rx_data))) - self.can_rx_queue.put(rx_data) - finally: - self.panda.close() + self._can_client = CanClient(panda.can_send, panda.can_recv, self.tx_addr, [self.rx_addr], self.bus, debug=self.debug) # generic uds request def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int=None, data: bytes=None) -> bytes: - # throw away any stale data - while not self.can_rx_queue.empty(): - try: - self.can_rx_queue.get(block=False) - except Empty: - pass - req = bytes([service_type]) if subfunction is not None: req += bytes([subfunction]) @@ -451,7 +442,7 @@ def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int=None, data: req += data # send request, wait for response - isotp_msg = IsoTpMessage(self.can_tx_queue, self.can_rx_queue, self.timeout, self.debug) + isotp_msg = IsoTpMessage(self._can_client, self.timeout, self.debug) isotp_msg.send(req) while True: resp = isotp_msg.recv()