Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added examples/__init__.py
Empty file.
4 changes: 4 additions & 0 deletions examples/hello_esp32_idf/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Ignore the firmware files, as they are downloaded from the internet
hello_world.bin
hello_world.elf
build/
Empty file.
22 changes: 22 additions & 0 deletions examples/hello_esp32_idf/diagram.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"version": 1,
"author": "Uri Shaked",
"editor": "wokwi",
"parts": [
{
"type": "board-esp32-devkit-c-v4",
"id": "esp",
"top": 0,
"left": 0,
"attrs": { "fullBoot": "1" }
}
],
"connections": [
["esp:TX", "$serialMonitor:RX", "", []],
["esp:RX", "$serialMonitor:TX", "", []]
],
"serialMonitor": {
"display": "terminal"
},
"dependencies": {}
}
79 changes: 79 additions & 0 deletions examples/hello_esp32_idf/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# SPDX-License-Identifier: MIT
# Copyright (C) 2025, CodeMagic LTD

import asyncio
import os
from pathlib import Path

from examples.helper.github_download import download_github_dir
from wokwi_client import GET_TOKEN_URL, WokwiClient

# sys.path.append(str(Path(__file__).parent.parent))
# from github_download import download_github_dir

EXAMPLE_DIR = Path(__file__).parent
USER = "espressif"
REPO = "pytest-embedded"
PATH = "tests/fixtures/hello_world_esp32/build"
REF = "7e66a07870d1cd97a454318892c6f6225def3144"

SLEEP_TIME = int(os.getenv("WOKWI_SLEEP_TIME", "10"))


async def main() -> None:
token = os.getenv("WOKWI_CLI_TOKEN")
if not token:
raise SystemExit(
f"Set WOKWI_CLI_TOKEN in your environment. You can get it from {GET_TOKEN_URL}."
)

# Automatically download build files from GitHub if missing
build_dir = EXAMPLE_DIR / "build"
download_github_dir(
user=USER,
repo=REPO,
path=PATH,
base_path=build_dir,
ref=REF,
)

client = WokwiClient(token)
print(f"Wokwi client library version: {client.version}")

hello = await client.connect()
print("Connected to Wokwi Simulator, server version:", hello["version"])

# Upload the diagram and firmware files
await client.upload_file("diagram.json", EXAMPLE_DIR / "diagram.json")
filename = await client.upload_file(
"flasher_args.json", EXAMPLE_DIR / "build" / "flasher_args.json"
)

# Start the simulation
await client.start_simulation(
firmware=filename,
)

# Stream serial output for a few seconds
serial_task = asyncio.create_task(client.serial_monitor_cat())

# # Alternative lambda version
# serial_task = client.serial_monitor(
# lambda line: print(line.decode("utf-8", errors="replace"), end="", flush=True)
# )

# delay 2 seconds
await asyncio.sleep(2)

# await client.set_control("dsa", "dsdsa", 1)

print(f"Simulation started, waiting for {SLEEP_TIME} seconds…")
await client.wait_until_simulation_time(SLEEP_TIME)
serial_task.cancel()

# Disconnect from the simulator
await client.disconnect()


if __name__ == "__main__":
asyncio.run(main())
Empty file added examples/helper/__init__.py
Empty file.
27 changes: 27 additions & 0 deletions examples/helper/github_download.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from pathlib import Path

import requests


def download_file(url: str, dest: Path) -> None:
response = requests.get(url)
response.raise_for_status()
dest.parent.mkdir(parents=True, exist_ok=True)
with open(dest, "wb") as f:
f.write(response.content)


def download_github_dir(
user: str, repo: str, path: str, base_path: Path, ref: str = "main"
) -> None:
api_url = f"https://api.github.com/repos/{user}/{repo}/contents/{path}?ref={ref}"
response = requests.get(api_url)
response.raise_for_status()
items = response.json()
for item in items:
if item["type"] == "file":
print(f"Downloading {base_path / item['name']}...")
download_file(item["download_url"], base_path / item["name"])
elif item["type"] == "dir":
subdir_name = item["name"]
download_github_dir(user, repo, f"{path}/{subdir_name}", base_path / subdir_name, ref)
9 changes: 7 additions & 2 deletions src/wokwi_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,20 @@ async def upload(self, name: str, content: bytes) -> None:
"""
await upload(self._transport, name, content)

async def upload_file(self, filename: str, local_path: Optional[Path] = None) -> None:
async def upload_file(self, filename: str, local_path: Optional[Path] = None) -> str:
"""
Upload a local file to the simulator.
If you specify the local_path to the file `flasher_args.json` (IDF flash information),
the contents of the file will be processed and the correct firmware file will be
uploaded instead, returning the firmware filename.

Args:
filename: The name to use for the uploaded file.
local_path: Optional path to the local file. If not provided, uses filename as the path.
Returns:
The filename of the uploaded file (useful for idf when uploading flasher_args.json).
"""
await upload_file(self._transport, filename, local_path)
return await upload_file(self._transport, filename, local_path)

async def download(self, name: str) -> bytes:
"""
Expand Down
15 changes: 11 additions & 4 deletions src/wokwi_client/file_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,24 @@
from pathlib import Path
from typing import Optional

from wokwi_client.idf import resolveIdfFirmware

from .models import UploadParams
from .protocol_types import ResponseMessage
from .transport import Transport


async def upload_file(
transport: Transport, filename: str, local_path: Optional[Path] = None
) -> ResponseMessage:
path = Path(local_path or filename)
content = path.read_bytes()
return await upload(transport, filename, content)
) -> str:
firmware_path = local_path or filename
if str(firmware_path).endswith("flasher_args.json"):
filename = "firmware.bin"
content = resolveIdfFirmware(str(firmware_path))
else:
content = Path(firmware_path).read_bytes()
await upload(transport, filename, content)
return filename


async def upload(transport: Transport, name: str, content: bytes) -> ResponseMessage:
Expand Down
81 changes: 81 additions & 0 deletions src/wokwi_client/idf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import json
import os
from typing import TypedDict

MAX_FIRMWARE_SIZE = 4 * 1024 * 1024 # 4MB


class FirmwarePart(TypedDict):
offset: int
data: bytes


def resolveIdfFirmware(flasher_args_path: str) -> bytes:
"""
Resolve ESP32 firmware from flasher_args.json file.
Implemented based on the logic from the wokwi-cli.
- https://github.com/wokwi/wokwi-cli/blob/1726692465f458420f71bc4dbd100aeedf2e37bb/src/uploadFirmware.ts

More about flasher_args.json:
- https://docs.espressif.com/projects/esp-idf/en/release-v5.5/esp32/api-guides/build-system.html

Args:
flasher_args_path: Path to the flasher_args.json file

Returns:
Combined firmware binary data as bytes

Raises:
ValueError: If flasher_args.json is invalid or files are missing
FileNotFoundError: If required firmware files are not found
"""
try:
with open(flasher_args_path) as f:
flasher_args = json.load(f)
except (json.JSONDecodeError, FileNotFoundError) as e:
raise ValueError(f"Failed to read flasher_args.json: {e}")

if "flash_files" not in flasher_args:
raise ValueError("flash_files is not defined in flasher_args.json")

firmware_parts: list[FirmwarePart] = []
firmware_size = 0
flasher_dir = os.path.dirname(flasher_args_path)

# Process each flash file entry
for offset_str, file_path in flasher_args["flash_files"].items():
try:
offset = int(offset_str, 16)
except ValueError:
raise ValueError(f"Invalid offset in flasher_args.json flash_files: {offset_str}")

full_file_path = os.path.join(flasher_dir, file_path)

try:
with open(full_file_path, "rb") as f:
data = f.read()
except FileNotFoundError:
raise FileNotFoundError(f"Firmware file not found: {full_file_path}")

firmware_parts.append({"offset": offset, "data": data})
firmware_size = max(firmware_size, offset + len(data))

if firmware_size > MAX_FIRMWARE_SIZE:
raise ValueError(
f"Firmware size ({firmware_size} bytes) exceeds the maximum supported size ({MAX_FIRMWARE_SIZE} bytes)"
)

# Create combined firmware binary
firmware_data = bytearray(firmware_size)

# Fill with 0xFF (typical flash erased state)
for i in range(firmware_size):
firmware_data[i] = 0xFF

# Write each firmware part to the correct offset
for part in firmware_parts:
offset = part["offset"]
data = part["data"]
firmware_data[offset : offset + len(data)] = data

return bytes(firmware_data)
7 changes: 7 additions & 0 deletions tests/test_hello_esp32.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,10 @@ def test_hello_esp32_sync_example() -> None:
result = run_example_module("examples.hello_esp32_sync.main")
assert result.returncode == 0
assert "main_task: Calling app_main()" in result.stdout


def test_hello_esp32_idf_example() -> None:
"""Sync hello_esp32 example should run and exit with 0."""
result = run_example_module("examples.hello_esp32_idf.main")
assert result.returncode == 0
assert "cpu_start: Starting scheduler on APP CPU" in result.stdout