diff --git a/src/aleph/vm/conf.py b/src/aleph/vm/conf.py
index 739cfda9..18add717 100644
--- a/src/aleph/vm/conf.py
+++ b/src/aleph/vm/conf.py
@@ -133,8 +133,10 @@ class Settings(BaseSettings):
     CONNECTIVITY_DNS_HOSTNAME = "example.org"
 
     USE_JAILER = True
-    # System logs make boot ~2x slower
-    PRINT_SYSTEM_LOGS = False
+    # Changelog: PRINT_SYSTEM_LOGS used to print the MicroVM logs alongside the supervisor output.
+    # They now go to separate journald entries; disabling this setting only disables the Firecracker VM's log output
+    # via the serial console. That breaks the logs endpoint for programs, so disabling it in production is not recommended.
+    PRINT_SYSTEM_LOGS = True
     IGNORE_TRACEBACK_FROM_DIAGNOSTICS = True
     LOG_LEVEL = "WARNING"
     DEBUG_ASYNCIO = False
diff --git a/src/aleph/vm/orchestrator/metrics.py b/src/aleph/vm/orchestrator/metrics.py
index 67222521..f7f16648 100644
--- a/src/aleph/vm/orchestrator/metrics.py
+++ b/src/aleph/vm/orchestrator/metrics.py
@@ -115,3 +115,12 @@ async def get_execution_records() -> Iterable[ExecutionRecord]:
         executions = result.scalars().all()
         await session.commit()
         return executions
+
+
+async def get_last_record_for_vm(vm_hash) -> ExecutionRecord | None:
+    """Get the latest execution record for the given VM from the database."""
+    async with AsyncSessionMaker() as session:  # Use AsyncSession in a context manager
+        result = await session.execute(
+            select(ExecutionRecord).where(ExecutionRecord.vm_hash == vm_hash).limit(1)
+        )  # Use execute for querying
+        return result.scalar()
diff --git a/src/aleph/vm/orchestrator/supervisor.py b/src/aleph/vm/orchestrator/supervisor.py
index 11870637..a5ca999a 100644
--- a/src/aleph/vm/orchestrator/supervisor.py
+++ b/src/aleph/vm/orchestrator/supervisor.py
@@ -51,7 +51,7 @@
     operate_confidential_measurement,
     operate_erase,
     operate_expire,
-    operate_logs,
+    operate_logs_json,
     operate_reboot,
     operate_stop,
     stream_logs,
@@ -104,7 +104,7 @@ def setup_webapp():
             # /control APIs are used to control the VMs and access their logs
             web.post("/control/allocation/notify", notify_allocation),
             web.get("/control/machine/{ref}/stream_logs", stream_logs),
-            web.get("/control/machine/{ref}/logs", operate_logs),
+            web.get("/control/machine/{ref}/logs", operate_logs_json),
             web.post("/control/machine/{ref}/expire", operate_expire),
             web.post("/control/machine/{ref}/stop", operate_stop),
             web.post("/control/machine/{ref}/erase", operate_erase),
diff --git a/src/aleph/vm/orchestrator/tasks.py b/src/aleph/vm/orchestrator/tasks.py
index c7062d93..921a2265 100644
--- a/src/aleph/vm/orchestrator/tasks.py
+++ b/src/aleph/vm/orchestrator/tasks.py
@@ -11,7 +11,6 @@
 from aiohttp import web
 from aleph_message.models import (
     AlephMessage,
-    ItemHash,
     PaymentType,
     ProgramMessage,
     parse_message,
@@ -23,7 +22,7 @@
 from aleph.vm.pool import VmPool
 from aleph.vm.utils import create_task_log_exceptions
 
-from .messages import get_message_status, load_updated_message
+from .messages import get_message_status
 from .payment import (
     compute_required_balance,
     compute_required_flow,
diff --git a/src/aleph/vm/orchestrator/views/operator.py b/src/aleph/vm/orchestrator/views/operator.py
index 72218f3e..7985687f 100644
--- a/src/aleph/vm/orchestrator/views/operator.py
+++ b/src/aleph/vm/orchestrator/views/operator.py
@@ -15,6 +15,7 @@
 from aleph.vm.conf import settings
 from aleph.vm.controllers.qemu.client import QemuVmClient
 from aleph.vm.models import VmExecution
+from aleph.vm.orchestrator import metrics
 from aleph.vm.orchestrator.custom_logs import set_vm_for_logging
 from aleph.vm.orchestrator.run import create_vm_execution_or_raise_http_error
 from aleph.vm.orchestrator.views.authentication import (
@@ -22,7 +23,12 @@
     require_jwk_authentication,
 )
 from aleph.vm.pool import VmPool
-from aleph.vm.utils import cors_allow_all, dumps_for_json
+from aleph.vm.utils import (
+    cors_allow_all,
+    dumps_for_json,
+    get_message_executable_content,
+)
+from aleph.vm.utils.logs import get_past_vm_logs
 
 logger = logging.getLogger(__name__)
 
@@ -99,22 +105,48 @@ async def stream_logs(request: web.Request) -> web.StreamResponse:
 
 @cors_allow_all
 @require_jwk_authentication
-async def operate_logs(request: web.Request, authenticated_sender: str) -> web.StreamResponse:
-    """Logs of a VM (not streaming)"""
+async def operate_logs_json(request: web.Request, authenticated_sender: str) -> web.StreamResponse:
+    """Logs of a VM (not streaming) as JSON"""
     vm_hash = get_itemhash_or_400(request.match_info)
     with set_vm_for_logging(vm_hash=vm_hash):
+        # This endpoint allows logs for past executions, so we look in the database for any execution with that hash,
+        # which we can then use to check access rights. We still check the pool first, as it is faster.
         pool: VmPool = request.app["vm_pool"]
-        execution = get_execution_or_404(vm_hash, pool=pool)
-        if not is_sender_authorized(authenticated_sender, execution.message):
+        execution = pool.executions.get(vm_hash)
+        if execution:
+            message = execution.message
+        else:
+            record = await metrics.get_last_record_for_vm(vm_hash=vm_hash)
+            if not record:
+                raise aiohttp.web_exceptions.HTTPNotFound(body="No execution found for this VM")
+            message = get_message_executable_content(json.loads(record.message))
+        if not is_sender_authorized(authenticated_sender, message):
             return web.Response(status=403, body="Unauthorized sender")
 
+        _journal_stdout_name = f"vm-{vm_hash}-stdout"
+        _journal_stderr_name = f"vm-{vm_hash}-stderr"
+
         response = web.StreamResponse()
-        response.headers["Content-Type"] = "text/plain"
+        response.headers["Transfer-encoding"] = "chunked"
+        response.headers["Content-Type"] = "application/json"
         await response.prepare(request)
+        await response.write(b"[")
+
+        first = True
+        for entry in get_past_vm_logs(_journal_stdout_name, _journal_stderr_name):
+            if not first:
+                await response.write(b",\n")
+            first = False
+            log_type = "stdout" if entry["SYSLOG_IDENTIFIER"] == _journal_stdout_name else "stderr"
+            msg = {
+                "SYSLOG_IDENTIFIER": entry["SYSLOG_IDENTIFIER"],
+                "MESSAGE": entry["MESSAGE"],
+                "file": log_type,
+                "__REALTIME_TIMESTAMP": entry["__REALTIME_TIMESTAMP"],
+            }
+            await response.write(dumps_for_json(msg).encode())
+        await response.write(b"]")
 
-        for entry in execution.vm.past_logs():
-            msg = f'{entry["__REALTIME_TIMESTAMP"].isoformat()}> {entry["MESSAGE"]}'
-            await response.write(msg.encode())
         await response.write_eof()
         return response
 
diff --git a/tests/supervisor/views/test_operator.py b/tests/supervisor/views/test_operator.py
index 86c6c5cd..d5a643fe 100644
--- a/tests/supervisor/views/test_operator.py
+++ b/tests/supervisor/views/test_operator.py
@@ -9,9 +9,10 @@
 import aiohttp
 import pytest
 from aiohttp.test_utils import TestClient
-from aleph_message.models import ItemHash
+from aleph_message.models import ItemHash, ProgramMessage
 
 from aleph.vm.conf import settings
+from aleph.vm.orchestrator.metrics import ExecutionRecord
 from aleph.vm.orchestrator.supervisor import setup_webapp
 from aleph.vm.pool import VmPool
 from aleph.vm.storage import get_message
@@ -303,103 +304,6 @@ class FakeVmPool:
     assert pool.systemd_manager.restart.call_count == 1
 
 
-@pytest.mark.asyncio
-async def test_logs(aiohttp_client, mocker):
-    mock_address = "mock_address"
-    mock_hash = "fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_"
-    mocker.patch(
-        "aleph.vm.orchestrator.views.authentication.authenticate_jwk",
-        return_value=mock_address,
-    )
-
-    # noinspection PyMissingConstructor
-    class FakeVmPool(VmPool):
-        def __init__(self):
-            pass
-
-        executions = {
-            mock_hash: mocker.Mock(
-                vm_hash=mock_hash,
-                message=mocker.Mock(address=mock_address),
-                is_confidential=False,
-                is_running=True,
-                vm=mocker.Mock(
-                    past_logs=mocker.Mock(
-                        return_value=[
-                            EntryDict(
-                                SYSLOG_IDENTIFIER="stdout",
-                                MESSAGE="logline1",
-                                __REALTIME_TIMESTAMP=datetime.datetime(2020, 10, 12, 1, 2),
-                            ),
-                            EntryDict(
-                                SYSLOG_IDENTIFIER="stdout",
-                                MESSAGE="logline2",
-                                __REALTIME_TIMESTAMP=datetime.datetime(2020, 10, 12, 1, 3),
-                            ),
-                        ]
-                    )
-                ),
-            ),
-        }
-        systemd_manager = mocker.Mock(restart=mocker.Mock())
-
-    app = setup_webapp()
-    pool = FakeVmPool()
-    app["vm_pool"] = pool
-    app["pubsub"] = FakeVmPool()
-    client = await aiohttp_client(app)
-    response = await client.get(
-        f"/control/machine/{mock_hash}/logs",
-    )
-    assert response.status == 200
-    assert await response.text() == "2020-10-12T01:02:00> logline12020-10-12T01:03:00> logline2"
-
-
-@pytest.mark.asyncio
-async def test_websocket_logs(aiohttp_client, mocker):
-    mock_address = "mock_address"
-    mock_hash = "fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_"
-    mocker.patch(
-        "aleph.vm.orchestrator.views.operator.authenticate_websocket_message",
-        return_value=mock_address,
-    )
-    fake_queue: Queue[tuple[str, str]] = asyncio.Queue()
-    await fake_queue.put(("stdout", "this is a first log entry"))
-
-    fakeVmPool = mocker.Mock(
-        executions={
-            mock_hash: mocker.Mock(
-                vm_hash=mock_hash,
-                message=mocker.Mock(address=mock_address),
-                is_confidential=False,
-                is_running=True,
-                vm=mocker.Mock(
-                    get_log_queue=mocker.Mock(return_value=fake_queue),
-                ),
-            ),
-        },
-    )
-    app = setup_webapp()
-    app["vm_pool"] = fakeVmPool
-    app["pubsub"] = None
-    client = await aiohttp_client(app)
-    websocket = await client.ws_connect(
-        f"/control/machine/{mock_hash}/stream_logs",
-    )
-    await websocket.send_json({"auth": "auth is disabled"})
-    response = await websocket.receive_json()
-    assert response == {"status": "connected"}
-
-    response = await websocket.receive_json()
-    assert response == {"message": "this is a first log entry", "type": "stdout"}
-
-    await fake_queue.put(("stdout", "this is a second log entry"))
-    response = await websocket.receive_json()
-    assert response == {"message": "this is a second log entry", "type": "stdout"}
-    await websocket.close()
-    assert websocket.closed
-
-
 @pytest.mark.asyncio
 async def test_websocket_logs_missing_auth(aiohttp_client, mocker):
     mock_address = "mock_address"
@@ -529,3 +433,110 @@ async def test_websocket_logs_good_auth(aiohttp_client, mocker, patch_datetime_n
 
     await websocket.close()
     assert websocket.closed
+
+
+@pytest.mark.asyncio
+async def test_get_past_logs(aiohttp_client, mocker, patch_datetime_now):
+    mock_address = "0x40684b43B88356F62DCc56017547B6A7AC68780B"
+    mock_hash = "fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_"
+    mocker.patch(
+        "aleph.vm.orchestrator.views.authentication.authenticate_jwk",
+        return_value=mock_address,
+    )
+    mocker.patch(
+        "aleph.vm.orchestrator.metrics.get_last_record_for_vm",
+        return_value=ExecutionRecord(
+            message="""{
+    "address": "0x40684b43B88356F62DCc56017547B6A7AC68780B",
+    "time": 1720816744.639107,
+    "allow_amend": false,
+    "metadata": null,
+    "authorized_keys": null,
+    "variables": null,
+    "environment": {
+        "reproducible": false,
+        "internet": true,
+        "aleph_api": true,
+        "shared_cache": false
+    },
+    "resources": {
+        "vcpus": 1,
+        "memory": 1024,
+        "seconds": 300,
+        "published_ports": null
+    },
+    "payment": null,
+    "requirements": null,
+    "volumes": [
+        {
+            "comment": null,
+            "mount": "/opt/packages",
+            "ref": "7338478721e2e966da6395dbfa37dab7b017b48da55b1be22d4eccf3487b836c",
+            "use_latest": true
+        }
+    ],
+    "replaces": null,
+    "type": "vm-function",
+    "code": {
+        "encoding": "squashfs",
+        "entrypoint": "main:app",
+        "ref": "c4253bf514d2e0a271456c9023c4b3f13f324e53c176e9ec29b98b5972b02bc7",
+        "interface": null,
+        "args": null,
+        "use_latest": true
+    },
+    "runtime": {
+        "ref": "63f07193e6ee9d207b7d1fcf8286f9aee34e6f12f101d2ec77c1229f92964696",
+        "use_latest": true,
+        "comment": ""
+    },
+    "data": null,
+    "export": null,
+    "on": {
+        "http": true,
+        "message": null,
+        "persistent": false
+    }
+}"""
+        ),
+    )
+    mocker.patch(
+        "aleph.vm.orchestrator.views.operator.get_past_vm_logs",
+        return_value=[
+            EntryDict(
+                SYSLOG_IDENTIFIER=f"vm-{mock_hash}-stdout",
+                MESSAGE="logline1",
+                __REALTIME_TIMESTAMP=datetime.datetime(2020, 10, 12, 1, 2),
+            ),
+            EntryDict(
+                SYSLOG_IDENTIFIER=f"vm-{mock_hash}-stderr",
+                MESSAGE="logline2",
+                __REALTIME_TIMESTAMP=datetime.datetime(2020, 10, 12, 1, 3),
+            ),
+        ],
+    )
+
+    app = setup_webapp()
+    pool = mocker.MagicMock(executions={})
+    app["vm_pool"] = pool
+    app["pubsub"] = mocker.MagicMock()
+    client = await aiohttp_client(app)
+    response = await client.get(
+        f"/control/machine/{mock_hash}/logs",
+    )
+
+    assert response.status == 200
+    assert await response.json() == [
+        {
+            "MESSAGE": "logline1",
+            "SYSLOG_IDENTIFIER": "vm-fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_-stdout",
+            "__REALTIME_TIMESTAMP": "2020-10-12 01:02:00",
+            "file": "stdout",
+        },
+        {
+            "MESSAGE": "logline2",
+            "SYSLOG_IDENTIFIER": "vm-fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_-stderr",
+            "__REALTIME_TIMESTAMP": "2020-10-12 01:03:00",
+            "file": "stderr",
+        },
+    ]