From b85eaff4f372cdf35f5e4617926612f320f94127 Mon Sep 17 00:00:00 2001 From: Steve Reckamp Date: Fri, 2 Feb 2024 23:33:22 -0600 Subject: [PATCH] Run the image_classification test --- benchmark/runner/device_manager.py | 73 +++++++++ benchmark/runner/device_under_test.py | 23 ++- benchmark/runner/devices.yaml | 40 ++--- benchmark/runner/dut.yaml | 1 + benchmark/runner/main.py | 227 +++++++++++++------------- benchmark/runner/power_manager.py | 20 ++- benchmark/runner/tests.yaml | 24 +++ 7 files changed, 265 insertions(+), 143 deletions(-) create mode 100644 benchmark/runner/device_manager.py create mode 100644 benchmark/runner/dut.yaml create mode 100644 benchmark/runner/tests.yaml diff --git a/benchmark/runner/device_manager.py b/benchmark/runner/device_manager.py new file mode 100644 index 00000000..6c0805f1 --- /dev/null +++ b/benchmark/runner/device_manager.py @@ -0,0 +1,73 @@ +import yaml + +from serial.tools import list_ports + +from device_under_test import DUT +from io_manager import IOManager +from io_manager_enhanced import IOManagerEnhanced +from power_manager import PowerManager + + +class DeviceManager: + def __init__(self, devices): + self._device_defs = devices + + def __getitem__(self, item): + return self.__dict__[item] + + def __setitem__(self, key, value): + self.__dict__[key] = value + + def get(self, item, default=None): + return self.__dict__.get(item, default) + + def values(self): + return (a for a in self.__dict__.values() if isinstance(a, dict)) + + def _add_device(self, usb, definition): + type = definition.get("type") + if type and (not self.__dict__.get(type) + or definition.get("preference", 0) > self.__dict__[type].get("preference", 0)): + self.__dict__[type] = {k: v for k, v in definition.items()} + self.__dict__[type]["port"] = usb.device + + def _instantiate(self, definition): + args = { + "port_device": definition.get("port") + } + if definition.get("baud"): + args["baud_rate"] = definition.get("baud") + if definition.get("type") == "interface": + definition["instance"] = IOManagerEnhanced(**args) if definition.get("name") == "stm32h573i-dk" \ + else IOManager(**args) + elif definition.get("type") == "power": + definition["instance"] = PowerManager(**args) + elif definition.get("type") == "dut": + definition["instance"] = DUT(**args) + + def scan(self): + """Scan fpr USB serial devices + This scans the connected usb devices. It compares the device definitions + based on the USB vid and pid primarily and then by the text of the description. + """ + pending = [p for p in list_ports.comports(True) if p.vid] + matched = [] + for p in pending: + for d in self._device_defs: + found = False + for vid, pids in d.get("usb", {}).items(): + for pid in (pids if isinstance(pids, list) else [pids]): + if pid == p.pid and vid == p.vid: + self._add_device(p, d) + matched.append(p) + found = True + break + if found: break + for p in (a for a in pending if a not in matched): + for d in (d1 for d1 in self._device_defs if d1.get("usb_description", "zZzZzZzZ") in p.description): + self._add_device(p, d) + matched.append(p) + break + + for d in self.values(): + self._instantiate(d) diff --git a/benchmark/runner/device_under_test.py b/benchmark/runner/device_under_test.py index a933d0cf..8eb657cc 100644 --- a/benchmark/runner/device_under_test.py +++ b/benchmark/runner/device_under_test.py @@ -1,25 +1,32 @@ import re +import sys +import time from interface_device import InterfaceDevice from serial_device import SerialDevice class DUT: - def __init__(self, port_device, baud_rate=115200): + def __init__(self, port_device, baud_rate=115200, power_manager=None): interface = port_device if not isinstance(port_device, InterfaceDevice): interface = SerialDevice(port_device, baud_rate, "m-ready", '%') self._port = interface + self._power_manager = power_manager self._profile = None self._model = None self._name = None def __enter__(self): + if self._power_manager: + self._power_manager.__enter__() self._port.__enter__() return self def __exit__(self, *args): self._port.__exit__(*args) + if self._power_manager: + self._power_manager.__exit__() def _get_name(self): for l in self._port.send_command("name"): @@ -55,12 +62,24 @@ def send_data(self, data): size = len(data) pass + def load(self, data): + self._port.send_command(f"db load {len(data)}") + i = 0 + while i < len(data): + time.sleep(0.5) + # print(".", end='', file=sys.stderr) + cmd = f"db {''.join(f'{d:02x}' for d in data[i:i+32])}" + result = self._port.send_command(cmd) + print(f"{result} ({i})") + i += 32 + # print("", file=sys.stderr) + def infer(self, number, warmups): command = f"infer {number}" if warmups: command += f" {warmups}" self._port.send_command(command) - self._port.send_command("results") + return self._port.send_command("results") def get_help(self): return self._port.send_command("help") diff --git a/benchmark/runner/devices.yaml b/benchmark/runner/devices.yaml index 8ca991c1..d9451667 100644 --- a/benchmark/runner/devices.yaml +++ b/benchmark/runner/devices.yaml @@ -1,20 +1,20 @@ - - name: stm32h573i-dk - usb_description: STLINK - type: interface - preference: 2 - usb: - 0x0483: 0x374E - - name: arduino - port: auto - type: interface - preference: 1 - usb: - 0x2341: - - 0x0043 - - 0x0001 - 0x2a03: - - 0x0043 - - 0x0243 - - name: lpm01a - usb_description: PowerShield - type: power +- name: stm32h573i-dk + usb_description: STLINK + type: interface + preference: 2 + usb: + 0x0483: 0x374E +- name: arduino + port: auto + type: interface + preference: 1 + usb: + 0x2341: + - 0x0043 + - 0x0001 + 0x2a03: + - 0x0043 + - 0x0243 +- name: lpm01a + usb_description: PowerShield + type: power diff --git a/benchmark/runner/dut.yaml b/benchmark/runner/dut.yaml new file mode 100644 index 00000000..292b437d --- /dev/null +++ b/benchmark/runner/dut.yaml @@ -0,0 +1 @@ +voltage: 3000m \ No newline at end of file diff --git a/benchmark/runner/main.py b/benchmark/runner/main.py index a4c93bd1..36203aa3 100644 --- a/benchmark/runner/main.py +++ b/benchmark/runner/main.py @@ -1,21 +1,22 @@ import argparse +import csv +import time +from device_manager import DeviceManager from device_under_test import DUT from io_manager_enhanced import IOManagerEnhanced from power_manager import PowerManager from io_manager import IOManager -from contextlib import nullcontext +from contextlib import nullcontext, ExitStack from serial.tools import list_ports import yaml def test_power(device): with device as power: - for line in power.get_help(): - print(line) - # power.acquire(500) + pass def run_dut_test(): @@ -29,129 +30,123 @@ def run_dut_test(): dut passthrough: profile """ -def test_dut(dut_def): - if dut_def and "instance" in dut_def: - with dut_def.get("power") if "power" in dut_def else nullcontext: - with dut_def["instance"] as dut: - print(dut.get_name()) - print(dut.get_model()) - print(dut.get_profile()) - print(dut.timestamp()) - for l in dut.get_help(): - print(l) + +def test_dut(device): + if device: + with device as dut: + print(dut.get_name()) + print(dut.get_model()) + print(dut.get_profile()) + print(dut.timestamp()) + for l in dut.get_help(): + print(l) def test_io_manager(device): with device as io: - print(io.get_name()) - for l in io.get_help(): - print(l) + print(io.get_name()) + for l in io.get_help(): + print(l) def test_io_manager_enhanced(device): - with device as io: - print(io.get_name()) - print(io.get_waves()) - print(io.play_wave()) - print(io.play_wave("spaceship.wav")) - # waves = io.list_waves() - # io.play_wave(); - # for w in waves: - # print(w) - # # io.play_wave(w) - - -def scan_devices(devices=None): - pending = [p for p in list_ports.comports(True) if p.vid] - matched = [] - for p in pending: - pass - for d in (d1 for d1 in devices if d1.get("usb_description", "zZzZzZzZ") in p.description): - yield p, d - matched.append(p) - break - for p in (a for a in pending if a not in matched): - for d in devices: - found = False - for vid, pids in d.get("usb", {}).items(): - for pid in (pids if isinstance(pids, list) else [pids]): - if pid == p.pid and vid == p.vid: - yield p, d - found =True - break - if found: break - - -def init_dut(dut_def): - if dut_def and "instance" in dut_def: - with dut_def.get("power") if "power" in dut_def else nullcontext: - with dut_def["instance"] as dut: - dut.get_name() - dut.get_model() - dut.get_profile() - -def identify_dut(tools): - interface = tools.get("interface", {}).get("instance") - power = tools.get("power", {}).get("instance") - if not tools.get("dut") and interface and power: - dut = DUT(interface) - tools["dut"] = { - "instance": dut, - "power": power + with device as io: + print(io.get_name()) + print(io.get_waves()) + print(io.play_wave()) + print(io.play_wave("spaceship.wav")) + # waves = io.list_waves() + # io.play_wave(); + # for w in waves: + # print(w) + # # io.play_wave(w) + + +def init_dut(device): + with device as dut: + time.sleep(2) + dut.get_name() + dut.get_model() + dut.get_profile() + +def identify_dut(manager): + interface = manager.get("interface", {}).get("instance") + power = manager.get("power", {}).get("instance") + if not manager.get("dut") and interface and power: + dut = DUT(interface, power_manager=power) + manager["dut"] = { + "instance": dut } else: - dut = tools.get("dut", {}).get("instance") - init_dut(tools.get("dut")) + dut = manager.get("dut", {}).get("instance") + init_dut(dut) + + +def run_test(devices_config, dut_config, test_script): + manager = DeviceManager(devices_config) + manager.scan() + if manager.get("power", {}).get("instance") and dut_config and dut_config.get("voltage"): + manager["power"]["instance"].configure_voltage(dut_config["voltage"]) + identify_dut(manager) + dut = manager.get("dut", {}).get("instance") + test = test_script.get(dut.get_model()) + with open("/Users/sreckamp/eembc/runner/benchmarks/ulp-mlperf/datasets/ic01/y_labels.csv") as file: + reader = csv.DictReader(file, fieldnames=["file", "classes", "class"]) + file_name = reader.__next__()["file"] + with open(f"/Users/sreckamp/eembc/runner/benchmarks/ulp-mlperf/datasets/ic01/{file_name}", mode="rb") as file: + data = file.read() + with dut: + dut.load(data) + print(dut.infer(10, 1)) + # for _def, i in ((t, t.get("instance")) for t in manager.values() if t and t.get("instance")): + # if isinstance(i, PowerManager): + # test_power(i) + # elif isinstance(i, IOManagerEnhanced): + # test_io_manager_enhanced(i) + # elif isinstance(i, IOManager): + # test_io_manager(i) + # elif isinstance(i, DUT): + # test_dut(i) + + +def parse_device_config(device_list, device_yaml): + if device_yaml: + return yaml.load(device_yaml) + else: + with open(device_list) as dev_file: + return yaml.load(dev_file, Loader=yaml.CLoader) -def instantiate(device): - args = { - "port_device": device.get("port") - } - if device.get("baud"): - args["baud_rate"] = device.get("baud") - if device.get("type") == "interface": - device["instance"] = IOManagerEnhanced(**args) if device.get("name") == "stm32h573i-dk" \ - else IOManager(**args) - elif device.get("type") == "power": - device["instance"] = PowerManager(**args) - elif device.get("type") == "dut": - device["instance"] = DUT(**args) - - -def build_tools(devices): - tools = { 'power': None, 'interface': None, 'dut': None } - for port, device in scan_devices(devices): - type = device.get("type") - if type and (not tools.get(type) or device.get("preference", 0) > tools.get(type).get("preference", 0)): - tools[type] = {k: v for k, v in device.items()} - tools[type]["port"] = port.device - - for d in (a for a in tools.values() if a): - instantiate(d) - return tools - - -def run_test(device_list, dut, test_script): - with open(device_list) as dev_file: - devices = yaml.load(dev_file, Loader=yaml.CLoader) - tools = build_tools(devices) - identify_dut(tools) - for _def, i in ((t, t.get("instance")) for t in tools.values() if t and t.get("instance")): - if isinstance(i, PowerManager): - test_power(i) - elif isinstance(i, IOManagerEnhanced): - test_io_manager_enhanced(i) - elif isinstance(i, IOManager): - test_io_manager(i) - elif isinstance(i, DUT): - test_dut(_def) +def parse_dut_config(dut, dut_voltage, dut_baud): + config = {} + if dut: + with open(dut) as dut_file: + dut_config = yaml.load(dut_file, Loader=yaml.CLoader) + config.update(**dut_config) + if dut_voltage: + config.update(voltage=dut_voltage) + if dut_baud: + config.update(baud=dut_baud) + return config + + +def parse_test_script(test_script): + with open(test_script) as test_file: + return yaml.load(test_file, Loader=yaml.CLoader) if __name__ == '__main__': - parser = argparse.ArgumentParser(prog="TestRunner") - parser.add_argument("-d", "--device_list", default="devices.yaml") - parser.add_argument("-u", "--dut", default="dut.yaml") - parser.add_argument("-t", "--test_script", default="test.yaml") - args = parser.parse_args() - run_test(**args.__dict__) + parser = argparse.ArgumentParser(prog="TestRunner") + parser.add_argument("-d", "--device_list", default="devices.yaml") + parser.add_argument("-y", "--device_yaml", required=False) + parser.add_argument("-u", "--dut", required=False) + parser.add_argument("-v", "--dut_voltage", required=False) + parser.add_argument("-b", "--dut_baud", required=False) + parser.add_argument("-t", "--test_script", default="tests.yaml") + args = parser.parse_args() + config = { + "devices_config": parse_device_config(args.device_list, args.device_yaml), + "dut_config": parse_dut_config(args.dut, args.dut_voltage, args.dut_baud), + "test_script": parse_test_script(args.test_script) + } + run_test(**config) diff --git a/benchmark/runner/power_manager.py b/benchmark/runner/power_manager.py index fb6e770f..b69c12af 100644 --- a/benchmark/runner/power_manager.py +++ b/benchmark/runner/power_manager.py @@ -9,6 +9,7 @@ class PowerManager(SerialDevice): def __init__(self, port_device, baud_rate=921600): self._port = SerialDevice(port_device, baud_rate, "ack|error", "\r\n") + self._voltage = "3000m" self._board_id = None self._version = None self._lcd = [None, None] @@ -58,21 +59,26 @@ def _stop_read_thread(self): def _setup(self): # verify board + self._claim_remote_control() print(f"LPM01A Power Monitor Board", file=sys.stderr) print(f"BoardID: {self.get_board_id()}", file=sys.stderr) print(f"Version: {self.get_version()}", file=sys.stderr) print(f"Status: {self.get_status()}", file=sys.stderr) - # connect - self._send_command("htc") self.set_lcd(" mlperf ", " monitor ") self.power_off() # Acquire infinitely self.configure_trigger(0, 0, 'd7', 'fal') self.configure_output('energy', 'ascii_dec', 1000) - self.set_voltage(3000) + self.set_voltage(self._voltage) self.power_on() def _tear_down(self): + self._release_remote_control() + + def _claim_remote_control(self): + self._send_command("htc") + + def _release_remote_control(self): self._send_command("hrc") def get_board_id(self): @@ -114,13 +120,17 @@ def configure_output(self, output_type, output_format, samples_per_second): err_message=f"Error setting samples_per_second to {samples_per_second}") def power_on(self, show_status=False): + self.set_lcd(None, f"{self._voltage : >14}V ") self._send_command(f"pwr on {'' if show_status else 'no'}status", err_message=f"Error turning on power") def power_off(self): self._send_command("pwr off", err_message=f"Error turning off power") - def set_voltage(self, millivolts): - self._send_command(f"volt {millivolts}m", err_message=f"Error setting voltage to {millivolts}mV") + def configure_voltage(self, voltage): + self._voltage = voltage + + def set_voltage(self, voltage): + self._send_command(f"volt {voltage}", err_message=f"Error setting voltage to {voltage}V") def acquire(self, samples=0): self._in_prograss = True diff --git a/benchmark/runner/tests.yaml b/benchmark/runner/tests.yaml new file mode 100644 index 00000000..bdd9133f --- /dev/null +++ b/benchmark/runner/tests.yaml @@ -0,0 +1,24 @@ +ad01: + name: anomoly_detection + model: ad01 + script: + - download + - a +ic01: + name: image_classification + model: ic01 + script: + - download + - a +kws01: + name: keyword_spotting + model: kws01 + script: + - download + - a +vww01: + name: person_detection + model: vww01 + script: + - download + - a