|
| 1 | +#!/usr/bin/env python3 |
| 2 | +import capnp |
| 3 | +import time |
| 4 | +import traceback |
| 5 | +from typing import Optional, Set, Tuple |
| 6 | + |
| 7 | +import cereal.messaging as messaging |
| 8 | +from panda.python.uds import SERVICE_TYPE |
| 9 | +from selfdrive.car import make_can_msg |
| 10 | +from selfdrive.boardd.boardd import can_list_to_can_capnp |
| 11 | +from system.swaglog import cloudlog |
| 12 | + |
| 13 | + |
| 14 | +def make_tester_present_msg(addr, bus, subaddr=None): |
| 15 | + dat = [0x02, SERVICE_TYPE.TESTER_PRESENT, 0x0] |
| 16 | + if subaddr is not None: |
| 17 | + dat.insert(0, subaddr) |
| 18 | + |
| 19 | + dat.extend([0x0] * (8 - len(dat))) |
| 20 | + return make_can_msg(addr, bytes(dat), bus) |
| 21 | + |
| 22 | + |
| 23 | +def is_tester_present_response(msg: capnp.lib.capnp._DynamicStructReader, subaddr: Optional[int] = None) -> bool: |
| 24 | + # ISO-TP messages are always padded to 8 bytes |
| 25 | + # tester present response is always a single frame |
| 26 | + dat_offset = 1 if subaddr is not None else 0 |
| 27 | + if len(msg.dat) == 8 and 1 <= msg.dat[dat_offset] <= 7: |
| 28 | + # success response |
| 29 | + if msg.dat[dat_offset + 1] == (SERVICE_TYPE.TESTER_PRESENT + 0x40): |
| 30 | + return True |
| 31 | + # error response |
| 32 | + if msg.dat[dat_offset + 1] == 0x7F and msg.dat[dat_offset + 2] == SERVICE_TYPE.TESTER_PRESENT: |
| 33 | + return True |
| 34 | + return False |
| 35 | + |
| 36 | + |
| 37 | +def get_all_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, bus: int, timeout: float = 1, debug: bool = True) -> Set[Tuple[int, Optional[int], int]]: |
| 38 | + addr_list = [0x700 + i for i in range(256)] + [0x18da00f1 + (i << 8) for i in range(256)] |
| 39 | + queries: Set[Tuple[int, Optional[int], int]] = {(addr, None, bus) for addr in addr_list} |
| 40 | + responses = queries |
| 41 | + return get_ecu_addrs(logcan, sendcan, queries, responses, timeout=timeout, debug=debug) |
| 42 | + |
| 43 | + |
| 44 | +def get_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, queries: Set[Tuple[int, Optional[int], int]], |
| 45 | + responses: Set[Tuple[int, Optional[int], int]], timeout: float = 1, debug: bool = False) -> Set[Tuple[int, Optional[int], int]]: |
| 46 | + ecu_responses: Set[Tuple[int, Optional[int], int]] = set() # set((addr, subaddr, bus),) |
| 47 | + try: |
| 48 | + msgs = [make_tester_present_msg(addr, bus, subaddr) for addr, subaddr, bus in queries] |
| 49 | + |
| 50 | + messaging.drain_sock_raw(logcan) |
| 51 | + sendcan.send(can_list_to_can_capnp(msgs, msgtype='sendcan')) |
| 52 | + start_time = time.monotonic() |
| 53 | + while time.monotonic() - start_time < timeout: |
| 54 | + can_packets = messaging.drain_sock(logcan, wait_for_one=True) |
| 55 | + for packet in can_packets: |
| 56 | + for msg in packet.can: |
| 57 | + subaddr = None if (msg.address, None, msg.src) in responses else msg.dat[0] |
| 58 | + if (msg.address, subaddr, msg.src) in responses and is_tester_present_response(msg, subaddr): |
| 59 | + if debug: |
| 60 | + print(f"CAN-RX: {hex(msg.address)} - 0x{bytes.hex(msg.dat)}") |
| 61 | + if (msg.address, subaddr, msg.src) in ecu_responses: |
| 62 | + print(f"Duplicate ECU address: {hex(msg.address)}") |
| 63 | + ecu_responses.add((msg.address, subaddr, msg.src)) |
| 64 | + except Exception: |
| 65 | + cloudlog.warning(f"ECU addr scan exception: {traceback.format_exc()}") |
| 66 | + return ecu_responses |
| 67 | + |
| 68 | + |
| 69 | +if __name__ == "__main__": |
| 70 | + import argparse |
| 71 | + |
| 72 | + parser = argparse.ArgumentParser(description='Get addresses of all ECUs') |
| 73 | + parser.add_argument('--debug', action='store_true') |
| 74 | + args = parser.parse_args() |
| 75 | + |
| 76 | + logcan = messaging.sub_sock('can') |
| 77 | + sendcan = messaging.pub_sock('sendcan') |
| 78 | + |
| 79 | + time.sleep(1.0) |
| 80 | + |
| 81 | + print("Getting ECU addresses ...") |
| 82 | + ecu_addrs = get_all_ecu_addrs(logcan, sendcan, 1, debug=args.debug) |
| 83 | + |
| 84 | + print() |
| 85 | + print("Found ECUs on addresses:") |
| 86 | + for addr, subaddr, bus in ecu_addrs: |
| 87 | + msg = f" 0x{hex(addr)}" |
| 88 | + if subaddr is not None: |
| 89 | + msg += f" (sub-address: 0x{hex(subaddr)})" |
| 90 | + print(msg) |
0 commit comments