diff --git a/examples/__init__.py b/examples/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/hello_esp32_idf/.gitignore b/examples/hello_esp32_idf/.gitignore new file mode 100644 index 0000000..95e4ce5 --- /dev/null +++ b/examples/hello_esp32_idf/.gitignore @@ -0,0 +1,4 @@ +# Ignore the firmware files, as they are downloaded from the internet +hello_world.bin +hello_world.elf +build/ diff --git a/examples/hello_esp32_idf/__init__.py b/examples/hello_esp32_idf/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/hello_esp32_idf/diagram.json b/examples/hello_esp32_idf/diagram.json new file mode 100644 index 0000000..678759d --- /dev/null +++ b/examples/hello_esp32_idf/diagram.json @@ -0,0 +1,22 @@ +{ + "version": 1, + "author": "Uri Shaked", + "editor": "wokwi", + "parts": [ + { + "type": "board-esp32-devkit-c-v4", + "id": "esp", + "top": 0, + "left": 0, + "attrs": { "fullBoot": "1" } + } + ], + "connections": [ + ["esp:TX", "$serialMonitor:RX", "", []], + ["esp:RX", "$serialMonitor:TX", "", []] + ], + "serialMonitor": { + "display": "terminal" + }, + "dependencies": {} +} diff --git a/examples/hello_esp32_idf/main.py b/examples/hello_esp32_idf/main.py new file mode 100644 index 0000000..fee612a --- /dev/null +++ b/examples/hello_esp32_idf/main.py @@ -0,0 +1,79 @@ +# SPDX-License-Identifier: MIT +# Copyright (C) 2025, CodeMagic LTD + +import asyncio +import os +from pathlib import Path + +from examples.helper.github_download import download_github_dir +from wokwi_client import GET_TOKEN_URL, WokwiClient + +# sys.path.append(str(Path(__file__).parent.parent)) +# from github_download import download_github_dir + +EXAMPLE_DIR = Path(__file__).parent +USER = "espressif" +REPO = "pytest-embedded" +PATH = "tests/fixtures/hello_world_esp32/build" +REF = "7e66a07870d1cd97a454318892c6f6225def3144" + +SLEEP_TIME = int(os.getenv("WOKWI_SLEEP_TIME", "10")) + + +async def main() -> None: + token = os.getenv("WOKWI_CLI_TOKEN") + if not token: + raise SystemExit( + f"Set WOKWI_CLI_TOKEN in your environment. You can get it from {GET_TOKEN_URL}." + ) + + # Automatically download build files from GitHub if missing + build_dir = EXAMPLE_DIR / "build" + download_github_dir( + user=USER, + repo=REPO, + path=PATH, + base_path=build_dir, + ref=REF, + ) + + client = WokwiClient(token) + print(f"Wokwi client library version: {client.version}") + + hello = await client.connect() + print("Connected to Wokwi Simulator, server version:", hello["version"]) + + # Upload the diagram and firmware files + await client.upload_file("diagram.json", EXAMPLE_DIR / "diagram.json") + filename = await client.upload_file( + "flasher_args.json", EXAMPLE_DIR / "build" / "flasher_args.json" + ) + + # Start the simulation + await client.start_simulation( + firmware=filename, + ) + + # Stream serial output for a few seconds + serial_task = asyncio.create_task(client.serial_monitor_cat()) + + # # Alternative lambda version + # serial_task = client.serial_monitor( + # lambda line: print(line.decode("utf-8", errors="replace"), end="", flush=True) + # ) + + # delay 2 seconds + await asyncio.sleep(2) + + # await client.set_control("dsa", "dsdsa", 1) + + print(f"Simulation started, waiting for {SLEEP_TIME} seconds…") + await client.wait_until_simulation_time(SLEEP_TIME) + serial_task.cancel() + + # Disconnect from the simulator + await client.disconnect() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/helper/__init__.py b/examples/helper/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/helper/github_download.py b/examples/helper/github_download.py new file mode 100644 index 0000000..ad644b2 --- /dev/null +++ b/examples/helper/github_download.py @@ -0,0 +1,27 @@ +from pathlib import Path + +import requests + + +def download_file(url: str, dest: Path) -> None: + response = requests.get(url) + response.raise_for_status() + dest.parent.mkdir(parents=True, exist_ok=True) + with open(dest, "wb") as f: + f.write(response.content) + + +def download_github_dir( + user: str, repo: str, path: str, base_path: Path, ref: str = "main" +) -> None: + api_url = f"https://api.github.com/repos/{user}/{repo}/contents/{path}?ref={ref}" + response = requests.get(api_url) + response.raise_for_status() + items = response.json() + for item in items: + if item["type"] == "file": + print(f"Downloading {base_path / item['name']}...") + download_file(item["download_url"], base_path / item["name"]) + elif item["type"] == "dir": + subdir_name = item["name"] + download_github_dir(user, repo, f"{path}/{subdir_name}", base_path / subdir_name, ref) diff --git a/src/wokwi_client/client.py b/src/wokwi_client/client.py index da02a80..3ab8841 100644 --- a/src/wokwi_client/client.py +++ b/src/wokwi_client/client.py @@ -81,15 +81,20 @@ async def upload(self, name: str, content: bytes) -> None: """ await upload(self._transport, name, content) - async def upload_file(self, filename: str, local_path: Optional[Path] = None) -> None: + async def upload_file(self, filename: str, local_path: Optional[Path] = None) -> str: """ Upload a local file to the simulator. + If you specify the local_path to the file `flasher_args.json` (IDF flash information), + the contents of the file will be processed and the correct firmware file will be + uploaded instead, returning the firmware filename. Args: filename: The name to use for the uploaded file. local_path: Optional path to the local file. If not provided, uses filename as the path. + Returns: + The filename of the uploaded file (useful for idf when uploading flasher_args.json). """ - await upload_file(self._transport, filename, local_path) + return await upload_file(self._transport, filename, local_path) async def download(self, name: str) -> bytes: """ diff --git a/src/wokwi_client/file_ops.py b/src/wokwi_client/file_ops.py index f8e1def..67a3e16 100644 --- a/src/wokwi_client/file_ops.py +++ b/src/wokwi_client/file_ops.py @@ -6,6 +6,8 @@ from pathlib import Path from typing import Optional +from wokwi_client.idf import resolveIdfFirmware + from .models import UploadParams from .protocol_types import ResponseMessage from .transport import Transport @@ -13,10 +15,15 @@ async def upload_file( transport: Transport, filename: str, local_path: Optional[Path] = None -) -> ResponseMessage: - path = Path(local_path or filename) - content = path.read_bytes() - return await upload(transport, filename, content) +) -> str: + firmware_path = local_path or filename + if str(firmware_path).endswith("flasher_args.json"): + filename = "firmware.bin" + content = resolveIdfFirmware(str(firmware_path)) + else: + content = Path(firmware_path).read_bytes() + await upload(transport, filename, content) + return filename async def upload(transport: Transport, name: str, content: bytes) -> ResponseMessage: diff --git a/src/wokwi_client/idf.py b/src/wokwi_client/idf.py new file mode 100644 index 0000000..6d82eb1 --- /dev/null +++ b/src/wokwi_client/idf.py @@ -0,0 +1,81 @@ +import json +import os +from typing import TypedDict + +MAX_FIRMWARE_SIZE = 4 * 1024 * 1024 # 4MB + + +class FirmwarePart(TypedDict): + offset: int + data: bytes + + +def resolveIdfFirmware(flasher_args_path: str) -> bytes: + """ + Resolve ESP32 firmware from flasher_args.json file. + Implemented based on the logic from the wokwi-cli. + - https://github.com/wokwi/wokwi-cli/blob/1726692465f458420f71bc4dbd100aeedf2e37bb/src/uploadFirmware.ts + + More about flasher_args.json: + - https://docs.espressif.com/projects/esp-idf/en/release-v5.5/esp32/api-guides/build-system.html + + Args: + flasher_args_path: Path to the flasher_args.json file + + Returns: + Combined firmware binary data as bytes + + Raises: + ValueError: If flasher_args.json is invalid or files are missing + FileNotFoundError: If required firmware files are not found + """ + try: + with open(flasher_args_path) as f: + flasher_args = json.load(f) + except (json.JSONDecodeError, FileNotFoundError) as e: + raise ValueError(f"Failed to read flasher_args.json: {e}") + + if "flash_files" not in flasher_args: + raise ValueError("flash_files is not defined in flasher_args.json") + + firmware_parts: list[FirmwarePart] = [] + firmware_size = 0 + flasher_dir = os.path.dirname(flasher_args_path) + + # Process each flash file entry + for offset_str, file_path in flasher_args["flash_files"].items(): + try: + offset = int(offset_str, 16) + except ValueError: + raise ValueError(f"Invalid offset in flasher_args.json flash_files: {offset_str}") + + full_file_path = os.path.join(flasher_dir, file_path) + + try: + with open(full_file_path, "rb") as f: + data = f.read() + except FileNotFoundError: + raise FileNotFoundError(f"Firmware file not found: {full_file_path}") + + firmware_parts.append({"offset": offset, "data": data}) + firmware_size = max(firmware_size, offset + len(data)) + + if firmware_size > MAX_FIRMWARE_SIZE: + raise ValueError( + f"Firmware size ({firmware_size} bytes) exceeds the maximum supported size ({MAX_FIRMWARE_SIZE} bytes)" + ) + + # Create combined firmware binary + firmware_data = bytearray(firmware_size) + + # Fill with 0xFF (typical flash erased state) + for i in range(firmware_size): + firmware_data[i] = 0xFF + + # Write each firmware part to the correct offset + for part in firmware_parts: + offset = part["offset"] + data = part["data"] + firmware_data[offset : offset + len(data)] = data + + return bytes(firmware_data) diff --git a/tests/test_hello_esp32.py b/tests/test_hello_esp32.py index 21f0e31..fb78e21 100644 --- a/tests/test_hello_esp32.py +++ b/tests/test_hello_esp32.py @@ -17,3 +17,10 @@ def test_hello_esp32_sync_example() -> None: result = run_example_module("examples.hello_esp32_sync.main") assert result.returncode == 0 assert "main_task: Calling app_main()" in result.stdout + + +def test_hello_esp32_idf_example() -> None: + """Sync hello_esp32 example should run and exit with 0.""" + result = run_example_module("examples.hello_esp32_idf.main") + assert result.returncode == 0 + assert "cpu_start: Starting scheduler on APP CPU" in result.stdout