From fe9235ac658915eea20d5371ae45cedabe1f7b17 Mon Sep 17 00:00:00 2001 From: Hugo Herter Date: Fri, 26 Apr 2024 13:00:35 +0200 Subject: [PATCH] Fix: Diagnostic API was not updated We published multiple changes to the diagnostic VM recently but none of these was released. This provides a new diagnostic VM, based on a new runtime [1], with fixes: - Reading messages with the newer SDK - Better handling of IPv6 detection errors - Two different tests for signing messages (local and remote) - aleph-message version was not specified - fetching a single message was not tested --- .github/workflows/test-on-droplets-matrix.yml | 5 +- examples/example_fastapi/README.md | 6 + examples/example_fastapi/main.py | 187 ++++++++++++++---- .../create_disk_image.sh | 2 +- src/aleph/vm/conf.py | 2 +- src/aleph/vm/orchestrator/run.py | 1 + src/aleph/vm/orchestrator/status.py | 58 +++++- src/aleph/vm/orchestrator/views/__init__.py | 4 + 8 files changed, 224 insertions(+), 41 deletions(-) create mode 100644 examples/example_fastapi/README.md diff --git a/.github/workflows/test-on-droplets-matrix.yml b/.github/workflows/test-on-droplets-matrix.yml index c9563ab82..c67c1688f 100644 --- a/.github/workflows/test-on-droplets-matrix.yml +++ b/.github/workflows/test-on-droplets-matrix.yml @@ -134,8 +134,11 @@ jobs: - alias: "runtime-6770" # Old runtime, using Debian 11 item_hash: "67705389842a0a1b95eaa408b009741027964edc805997475e95c505d642edd8" query_params: "?retro-compatibility=true" - - alias: "runtime-3fc0" # New runtime, using Debian 12 + - alias: "runtime-3fc0" # Newer runtime, using Debian 12 but now old SDK item_hash: "3fc0aa9569da840c43e7bd2033c3c580abb46b007527d6d20f2d4e98e867f7af" + query_params: "?retro-compatibility=true" + - alias: "runtime-63fa" # Latest runtime, using Debian 12 and SDK 0.9.0 + item_hash: "63faf8b5db1cf8d965e6a464a0cb8062af8e7df131729e48738342d956f29ace" query_params: "" steps: diff --git a/examples/example_fastapi/README.md b/examples/example_fastapi/README.md new file mode 100644 index 000000000..231ce255b --- /dev/null +++ b/examples/example_fastapi/README.md @@ -0,0 +1,6 @@ +Publish using: + +```shell + aleph program upload ../aleph-vm/examples/example_fastapi main:app \ + --persistent-volume "persistence=host,size_mib=1,mount=/var/lib/example,name=increment-storage,comment=Persistence" +``` diff --git a/examples/example_fastapi/main.py b/examples/example_fastapi/main.py index ebe1a8bd0..81055c723 100644 --- a/examples/example_fastapi/main.py +++ b/examples/example_fastapi/main.py @@ -5,12 +5,19 @@ import socket import subprocess import sys -from datetime import datetime +from datetime import datetime, timezone from os import listdir from pathlib import Path -from typing import List, Optional +from typing import Any, Optional import aiohttp +from aleph_message.models import ( + MessagesResponse, + PostMessage, + ProgramMessage, + StoreMessage, +) +from aleph_message.status import MessageStatus from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import PlainTextResponse @@ -18,8 +25,10 @@ from pydantic import BaseModel, HttpUrl from starlette.responses import JSONResponse +from aleph.sdk.chains.ethereum import get_fallback_account from aleph.sdk.chains.remote import RemoteAccount -from aleph.sdk.client import AlephClient, AuthenticatedAlephClient +from aleph.sdk.client import AlephHttpClient, AuthenticatedAlephHttpClient +from aleph.sdk.query.filters import MessageFilter from aleph.sdk.types import StorageEnum from aleph.sdk.vm.app import AlephApp from aleph.sdk.vm.cache import VmCache @@ -42,13 +51,13 @@ @app.on_event("startup") -async def startup_event(): +async def startup_event() -> None: global startup_lifespan_executed startup_lifespan_executed = True @app.get("/") -async def index(): +async def index() -> dict[str, Any]: if os.path.exists("/opt/venv"): opt_venv = list(listdir("/opt/venv")) else: @@ -56,16 +65,33 @@ async def index(): return { "Example": "example_fastapi", "endpoints": [ + # Features + "/lifespan", "/environ", - "/messages", + "/state/increment", + "/wait-for/{delay}", + # Local cache + "/cache/get/{key}", + "/cache/set/{key}/{value}", + "/cache/remove/{key}", + "/cache/keys", + # Networking "/dns", - "ip/address", + "/ip/address", "/ip/4", "/ip/6", "/internet", + # Error handling + "/raise", + "/crash", + # Aleph.im + "/messages", + "/get_a_message", "/post_a_message", - "/state/increment", - "/wait-for/{delay}", + "/post_a_message_local_account", + "/post_a_file", + "/sign_a_message", + # Platform properties "/platform/os", "/platform/python", "/platform/pip-freeze", @@ -91,10 +117,11 @@ async def environ() -> dict[str, str]: @app.get("/messages") -async def read_aleph_messages(): +async def read_aleph_messages() -> dict[str, MessagesResponse]: """Read data from Aleph using the Aleph Client library.""" - async with AlephClient() as client: - data = await client.get_messages(hashes=["f246f873c3e0f637a15c566e7a465d2ecbb83eaa024d54ccb8fb566b549a929e"]) + async with AlephHttpClient() as client: + message_filter = MessageFilter(hashes=["f246f873c3e0f637a15c566e7a465d2ecbb83eaa024d54ccb8fb566b549a929e"]) + data = await client.get_messages(message_filter=message_filter) return {"Messages": data} @@ -163,9 +190,13 @@ async def connect_ipv6(): if resp.status != 404: resp.raise_for_status() return {"result": True, "headers": resp.headers} - except aiohttp.ClientTimeout: - logger.warning(f"Session connection for host {ipv6_host} failed") - return {"result": False, "headers": resp.headers} + except TimeoutError: + logger.warning(f"Session connection to host {ipv6_host} timed out") + return {"result": False, "reason": "Timeout"} + except aiohttp.ClientConnectionError as error: + logger.warning(f"Client connection to host {ipv6_host} failed: {error}") + # Get a string that describes the error + return {"result": False, "reason": str(error.args[0])} async def check_url(internet_host: HttpUrl, timeout_seconds: int = 5): @@ -184,7 +215,7 @@ async def check_url(internet_host: HttpUrl, timeout_seconds: int = 5): @app.get("/internet") async def read_internet(): """Check Internet connectivity of the system, requiring IP connectivity, domain resolution and HTTPS/TLS.""" - internet_hosts: List[HttpUrl] = [ + internet_hosts: list[HttpUrl] = [ HttpUrl(url="https://aleph.im/", scheme="https"), HttpUrl(url="https://ethereum.org", scheme="https"), HttpUrl(url="https://ipfs.io/", scheme="https"), @@ -192,7 +223,7 @@ async def read_internet(): timeout_seconds = 5 # Create a list of tasks to check the URLs in parallel - tasks: set[asyncio.Task] = set(asyncio.create_task(check_url(host, timeout_seconds)) for host in internet_hosts) + tasks: set[asyncio.Task] = {asyncio.create_task(check_url(host, timeout_seconds)) for host in internet_hosts} # While no tasks have completed, keep waiting for the next one to finish while tasks: @@ -211,34 +242,121 @@ async def read_internet(): return {"result": False} -@app.get("/post_a_message") -async def post_a_message(): - """Post a message on the Aleph network""" +@app.get("/get_a_message") +async def get_a_message(): + """Get a message from the Aleph.im network""" + item_hash = "3fc0aa9569da840c43e7bd2033c3c580abb46b007527d6d20f2d4e98e867f7af" + async with AlephHttpClient() as client: + message = await client.get_message( + item_hash=item_hash, + message_type=ProgramMessage, + ) + return message.dict() - account = await RemoteAccount.from_crypto_host(host="http://localhost", unix_socket="/tmp/socat-socket") + +@app.post("/post_a_message") +async def post_with_remote_account(): + """Post a message on the Aleph.im network using the remote account of the host.""" + try: + account = await RemoteAccount.from_crypto_host(host="http://localhost", unix_socket="/tmp/socat-socket") + + content = { + "date": datetime.now(tz=timezone.utc).isoformat(), + "test": True, + "answer": 42, + "something": "interesting", + } + async with AuthenticatedAlephHttpClient( + account=account, + ) as client: + message: PostMessage + status: MessageStatus + message, status = await client.create_post( + post_content=content, + post_type="test", + ref=None, + channel="TEST", + inline=True, + storage_engine=StorageEnum.storage, + sync=True, + ) + if status != MessageStatus.PROCESSED: + return JSONResponse(status_code=500, content={"error": status}) + return { + "message": message, + } + except aiohttp.client_exceptions.UnixClientConnectorError: + return JSONResponse(status_code=500, content={"error": "Could not connect to the remote account"}) + + +@app.post("/post_a_message_local_account") +async def post_with_local_account(): + """Post a message on the Aleph.im network using a local private key.""" + + account = get_fallback_account() content = { - "date": datetime.utcnow().isoformat(), + "date": datetime.now(tz=timezone.utc).isoformat(), "test": True, "answer": 42, "something": "interesting", } - async with AuthenticatedAlephClient( + async with AuthenticatedAlephHttpClient( account=account, + api_server="https://api2.aleph.im", + allow_unix_sockets=False, ) as client: - response = await client.create_post( + message: PostMessage + status: MessageStatus + message, status = await client.create_post( post_content=content, post_type="test", ref=None, channel="TEST", inline=True, storage_engine=StorageEnum.storage, + sync=True, + ) + if status != MessageStatus.PROCESSED: + return JSONResponse(status_code=500, content={"error": status}) + return { + "message": message, + } + + +@app.post("/post_a_file") +async def post_a_file(): + account = get_fallback_account() + file_path = Path(__file__).absolute() + async with AuthenticatedAlephHttpClient( + account=account, + ) as client: + message: StoreMessage + status: MessageStatus + message, status = await client.create_store( + file_path=file_path, + ref=None, + channel="TEST", + storage_engine=StorageEnum.storage, + sync=True, ) + if status != MessageStatus.PROCESSED: + return JSONResponse(status_code=500, content={"error": status}) return { - "response": response, + "message": message, } +@app.get("/sign_a_message") +async def sign_a_message(): + """Sign a message using a locally managed account within the virtual machine.""" + # FIXME: Broken, fixing this depends on https://github.com/aleph-im/aleph-sdk-python/pull/120 + account = get_fallback_account() + message = {"hello": "world", "chain": "ETH"} + signed_message = await account.sign_message(message) + return {"message": signed_message} + + @app.get("/cache/get/{key}") async def get_from_cache(key: str): """Get data in the VM cache""" @@ -265,7 +383,7 @@ async def keys_from_cache(pattern: str = "*"): @app.get("/state/increment") -async def increment(): +async def increment() -> dict[str, int]: path = "/var/lib/example/storage.json" try: with open(path) as fd: @@ -284,7 +402,7 @@ class Data(BaseModel): @app.post("/post") -async def receive_post(data: Data): +async def receive_post(data: Data) -> str: return str(data) @@ -293,13 +411,14 @@ class CustomError(Exception): @app.get("/raise") -def raise_error(): +def raise_error() -> None: """Raises an error to check that the init handles it properly without crashing""" - raise CustomError("Whoops") + error_message = "Whoops" + raise CustomError(error_message) @app.get("/crash") -def crash(): +def crash() -> None: """Crash the entire VM in order to check that the supervisor can handle it""" sys.exit(1) @@ -313,22 +432,22 @@ def crash(): @app.get("/platform/os") -def platform_os(): +def platform_os() -> PlainTextResponse: return PlainTextResponse(content=Path("/etc/os-release").read_text()) @app.get("/platform/python") -def platform_python(): +def platform_python() -> PlainTextResponse: return PlainTextResponse(content=sys.version) @app.get("/platform/pip-freeze") -def platform_pip_freeze(): +def platform_pip_freeze() -> list[str]: return list(freeze()) @app.event(filters=filters) -async def aleph_event(event): +async def aleph_event(event) -> dict[str, str]: print("aleph_event", event) async with aiohttp.ClientSession(connector=aiohttp.TCPConnector()) as session: async with session.get("https://official.aleph.cloud/api/v0/info/public.json") as resp: diff --git a/runtimes/aleph-debian-12-python/create_disk_image.sh b/runtimes/aleph-debian-12-python/create_disk_image.sh index 6a0c2265a..78c96b897 100755 --- a/runtimes/aleph-debian-12-python/create_disk_image.sh +++ b/runtimes/aleph-debian-12-python/create_disk_image.sh @@ -36,7 +36,7 @@ locale-gen en_US.UTF-8 echo "Pip installing aleph-sdk-python" mkdir -p /opt/aleph/libs -pip3 install --target /opt/aleph/libs 'aleph-sdk-python==0.9.0' 'fastapi~=0.109.2' +pip3 install --target /opt/aleph/libs 'aleph-sdk-python==0.9.0' 'aleph-message==0.4.4' 'fastapi~=0.109.2' # Compile Python code to bytecode for faster execution # -o2 is needed to compile with optimization level 2 which is what we launch init1.py (`python -OO`) diff --git a/src/aleph/vm/conf.py b/src/aleph/vm/conf.py index 29a5317f3..e84c58c31 100644 --- a/src/aleph/vm/conf.py +++ b/src/aleph/vm/conf.py @@ -289,7 +289,7 @@ class Settings(BaseSettings): ) FAKE_INSTANCE_MESSAGE = Path(abspath(join(__file__, "../../../../examples/instance_message_from_aleph.json"))) - CHECK_FASTAPI_VM_ID = "3fc0aa9569da840c43e7bd2033c3c580abb46b007527d6d20f2d4e98e867f7af" + CHECK_FASTAPI_VM_ID = "63faf8b5db1cf8d965e6a464a0cb8062af8e7df131729e48738342d956f29ace" LEGACY_CHECK_FASTAPI_VM_ID = "67705389842a0a1b95eaa408b009741027964edc805997475e95c505d642edd8" # Developer options diff --git a/src/aleph/vm/orchestrator/run.py b/src/aleph/vm/orchestrator/run.py index 6e429ff87..8dec7e963 100644 --- a/src/aleph/vm/orchestrator/run.py +++ b/src/aleph/vm/orchestrator/run.py @@ -44,6 +44,7 @@ async def build_asgi_scope(path: str, request: web.Request) -> dict[str, Any]: async def build_event_scope(event) -> dict[str, Any]: + """Build an ASGI scope for an event.""" return { "type": "aleph.message", "body": event, diff --git a/src/aleph/vm/orchestrator/status.py b/src/aleph/vm/orchestrator/status.py index 8c9c8064a..b0d76554d 100644 --- a/src/aleph/vm/orchestrator/status.py +++ b/src/aleph/vm/orchestrator/status.py @@ -15,19 +15,31 @@ logger = logging.getLogger(__name__) -def make_check_vm_url(vm_id: ItemHash) -> str: +def assemble_vm_url(vm_id: ItemHash) -> str: + """Assemble the URL for a VM based on the host and port that the orchestrator is running on and the VM ID.""" return f"http://{settings.SUPERVISOR_HOST}:{settings.SUPERVISOR_PORT}/vm/{vm_id}" async def get_json_from_vm(session: ClientSession, vm_id: ItemHash, suffix: str) -> Any: - vm_url = make_check_vm_url(vm_id) + """Get JSON from a VM running locally.""" + vm_url = assemble_vm_url(vm_id) url = f"{vm_url}{suffix}" async with session.get(url) as resp: resp.raise_for_status() return await resp.json() +async def post_to_vm(session: ClientSession, vm_id: ItemHash, suffix: str, data: Any = None) -> Any: + """Post data to a VM running locally.""" + vm_url = assemble_vm_url(vm_id) + url = f"{vm_url}{suffix}" + async with session.post(url, json=data) as resp: + resp.raise_for_status() + return await resp.json() + + async def check_index(session: ClientSession, vm_id: ItemHash) -> bool: + """Check that the index page of the VM is working.""" try: result: dict = await get_json_from_vm(session, vm_id, "/") assert result["Example"] == "example_fastapi" @@ -37,6 +49,7 @@ async def check_index(session: ClientSession, vm_id: ItemHash) -> bool: async def check_lifespan(session: ClientSession, vm_id: ItemHash) -> bool: + """Check that the lifespan endpoint of the VM is working.""" try: result: dict = await get_json_from_vm(session, vm_id, "/lifespan") return result["Lifespan"] is True @@ -45,6 +58,7 @@ async def check_lifespan(session: ClientSession, vm_id: ItemHash) -> bool: async def check_environ(session: ClientSession, vm_id: ItemHash) -> bool: + """Check that the environ endpoint of the VM returns the expected environment variables.""" try: result: dict = await get_json_from_vm(session, vm_id, "/environ") assert "ALEPH_API_HOST" in result @@ -58,6 +72,7 @@ async def check_environ(session: ClientSession, vm_id: ItemHash) -> bool: async def check_messages(session: ClientSession, vm_id: ItemHash) -> bool: + """Check that the messages endpoint of the VM returns a list of messages.""" try: result: dict = await get_json_from_vm(session, vm_id, "/messages") assert "Messages" in result @@ -69,6 +84,7 @@ async def check_messages(session: ClientSession, vm_id: ItemHash) -> bool: async def check_dns(session: ClientSession, vm_id: ItemHash) -> bool: + """Check that the DNS endpoint of the VM returns both IPv4 and IPv6 results.""" try: result: dict = await get_json_from_vm(session, vm_id, "/dns") assert result["ipv4"] @@ -79,6 +95,7 @@ async def check_dns(session: ClientSession, vm_id: ItemHash) -> bool: async def check_ipv4(session: ClientSession, vm_id: ItemHash) -> bool: + """Check that the VM has IPv4 connectivity.""" try: result: dict = await get_json_from_vm(session, vm_id, "/ip/4") assert result["result"] is True @@ -88,6 +105,7 @@ async def check_ipv4(session: ClientSession, vm_id: ItemHash) -> bool: async def check_ipv6(session: ClientSession, vm_id: ItemHash) -> bool: + """Check that the VM has IPv6 connectivity.""" try: result: dict = await get_json_from_vm(session, vm_id, "/ip/6") assert result["result"] is True @@ -98,6 +116,7 @@ async def check_ipv6(session: ClientSession, vm_id: ItemHash) -> bool: async def check_internet(session: ClientSession, vm_id: ItemHash) -> bool: + """Check that the VM has internet connectivity. This requires DNS, IP, HTTP and TLS to work.""" try: result: dict = await get_json_from_vm(session, vm_id, "/internet") assert result["result"] == HTTPOk.status_code @@ -108,6 +127,7 @@ async def check_internet(session: ClientSession, vm_id: ItemHash) -> bool: async def check_cache(session: ClientSession, vm_id: ItemHash) -> bool: + """Check that the VM can set and get a value in its cache.""" try: result1: bool = await get_json_from_vm(session, vm_id, "/cache/set/a/42") assert result1 is True @@ -121,6 +141,7 @@ async def check_cache(session: ClientSession, vm_id: ItemHash) -> bool: async def check_persistent_storage(session: ClientSession, vm_id: ItemHash) -> bool: + """Check that the VM can set and get a value in its persistent storage.""" try: result: dict = await get_json_from_vm(session, vm_id, "/state/increment") counter = result["counter"] @@ -134,7 +155,8 @@ async def check_persistent_storage(session: ClientSession, vm_id: ItemHash) -> b async def check_error_raised(session: ClientSession, vm_id: ItemHash) -> bool: - vm_url = make_check_vm_url(vm_id) + """Check that the VM can raise an error and return a traceback instead of crashing.""" + vm_url = assemble_vm_url(vm_id) try: async with session.get(f"{vm_url}/raise") as resp: text = await resp.text() @@ -144,8 +166,9 @@ async def check_error_raised(session: ClientSession, vm_id: ItemHash) -> bool: async def check_crash_and_restart(session: ClientSession, vm_id: ItemHash) -> bool: + """Check that a crash in the VM would cause it to restart and work as expected.""" # Crash the VM init. - vm_url = make_check_vm_url(vm_id) + vm_url = assemble_vm_url(vm_id) async with session.get(f"{vm_url}/crash") as resp: if resp.status != HTTPBadGateway.status_code: return False @@ -158,3 +181,30 @@ async def check_crash_and_restart(session: ClientSession, vm_id: ItemHash) -> bo except ClientResponseError: return False + + +async def check_get_a_message(session: ClientSession, vm_id: ItemHash) -> bool: + """Check that the VM can get a message from the aleph.im network.""" + try: + result: dict = await get_json_from_vm(session, vm_id, "/get_a_message") + return "item_hash" in result + except ClientResponseError: + return False + + +async def check_post_a_message(session: ClientSession, vm_id: ItemHash) -> bool: + """Check that the VM can post a message to the aleph.im network using a remote key present on the host.""" + try: + result: dict = await post_to_vm(session, vm_id, "/post_a_message") + return "item_hash" in result + except ClientResponseError: + return False + + +async def check_sign_a_message(session: ClientSession, vm_id: ItemHash) -> bool: + """Check that the VM can sign a message using a key local to the VM.""" + try: + result: dict = await post_to_vm(session, vm_id, "/sign_a_message") + return "item_hash" in result + except ClientResponseError: + return False diff --git a/src/aleph/vm/orchestrator/views/__init__.py b/src/aleph/vm/orchestrator/views/__init__.py index 994476cba..177e6a348 100644 --- a/src/aleph/vm/orchestrator/views/__init__.py +++ b/src/aleph/vm/orchestrator/views/__init__.py @@ -199,6 +199,9 @@ async def status_check_fastapi(request: web.Request, vm_id: Optional[ItemHash] = "index": await status.check_index(session, fastapi_vm_id), "environ": await status.check_environ(session, fastapi_vm_id), "messages": await status.check_messages(session, fastapi_vm_id), + # Using the remote account currently causes issues + # "post_a_message": await status.check_post_a_message(session, fastapi_vm_id), + # "sign_a_message": await status.check_sign_a_message(session, fastapi_vm_id), "dns": await status.check_dns(session, fastapi_vm_id), "ipv4": await status.check_ipv4(session, fastapi_vm_id), "internet": await status.check_internet(session, fastapi_vm_id), @@ -209,6 +212,7 @@ async def status_check_fastapi(request: web.Request, vm_id: Optional[ItemHash] = if not retro_compatibility: # These fields were added in the runtime running Debian 12. result = result | { + "get_a_message": await status.check_get_a_message(session, fastapi_vm_id), "lifespan": await status.check_lifespan(session, fastapi_vm_id), # IPv6 requires extra work from node operators and is not required yet. # "ipv6": await status.check_ipv6(session),