diff --git a/src/aleph/vm/conf.py b/src/aleph/vm/conf.py index 18add7170..264be819e 100644 --- a/src/aleph/vm/conf.py +++ b/src/aleph/vm/conf.py @@ -275,6 +275,11 @@ class Settings(BaseSettings): CONFIDENTIAL_SESSION_DIRECTORY: Path = Field(None, description="Default to EXECUTION_ROOT/sessions") + ENABLE_GPU_SUPPORT: bool = Field( + default=False, + description="Enable GPU pass-through support to VMs, only allowed for QEmu hypervisor", + ) + # Tests on programs FAKE_DATA_PROGRAM: Path | None = None @@ -391,6 +396,8 @@ def check(self): # assert check_amd_sev_snp_supported(), "SEV-SNP feature isn't enabled, enable it in BIOS" assert self.ENABLE_QEMU_SUPPORT, "Qemu Support is needed for confidential computing and it's disabled, " "enable it setting the env variable `ENABLE_QEMU_SUPPORT=True` in configuration" + if self.ENABLE_GPU_SUPPORT: + assert self.ENABLE_QEMU_SUPPORT, "Qemu Support is needed for GPU support and it's disabled, " "enable it setting the env variable `ENABLE_QEMU_SUPPORT=True` in configuration" def setup(self): """Setup the environment defined by the settings. 
Call this method after loading the settings.""" diff --git a/src/aleph/vm/controllers/configuration.py b/src/aleph/vm/controllers/configuration.py index da10d8395..fb4b4ff1f 100644 --- a/src/aleph/vm/controllers/configuration.py +++ b/src/aleph/vm/controllers/configuration.py @@ -23,6 +23,10 @@ class QemuVMHostVolume(BaseModel): read_only: bool +class QemuGPU(BaseModel): + pci_host: str + + class QemuVMConfiguration(BaseModel): qemu_bin_path: str cloud_init_drive_path: str | None @@ -33,6 +37,7 @@ class QemuVMConfiguration(BaseModel): mem_size_mb: int interface_name: str | None host_volumes: list[QemuVMHostVolume] + gpus: list[QemuGPU] class QemuConfidentialVMConfiguration(BaseModel): @@ -45,6 +50,7 @@ class QemuConfidentialVMConfiguration(BaseModel): mem_size_mb: int interface_name: str | None host_volumes: list[QemuVMHostVolume] + gpus: list[QemuGPU] ovmf_path: Path sev_session_file: Path sev_dh_cert_file: Path diff --git a/src/aleph/vm/controllers/qemu/instance.py b/src/aleph/vm/controllers/qemu/instance.py index dd840e22b..259f84744 100644 --- a/src/aleph/vm/controllers/qemu/instance.py +++ b/src/aleph/vm/controllers/qemu/instance.py @@ -5,7 +5,7 @@ from asyncio import Task from asyncio.subprocess import Process from pathlib import Path -from typing import Generic, TypeVar +from typing import Generic, List, TypeVar import psutil from aleph_message.models import ItemHash @@ -17,6 +17,7 @@ from aleph.vm.controllers.configuration import ( Configuration, HypervisorType, + QemuGPU, QemuVMConfiguration, QemuVMHostVolume, save_controller_configuration, @@ -29,6 +30,7 @@ from aleph.vm.controllers.qemu.cloudinit import CloudInitMixin from aleph.vm.network.firewall import teardown_nftables_for_vm from aleph.vm.network.interfaces import TapInterface +from aleph.vm.resources import HostGPU from aleph.vm.storage import get_rootfs_base_path from aleph.vm.utils import HostNotFoundError, ping, run_in_subprocess @@ -36,6 +38,8 @@ class 
AlephQemuResources(AlephFirecrackerResources): + gpus: List[HostGPU] = [] + async def download_runtime(self) -> None: volume = self.message_content.rootfs parent_image_path = await get_rootfs_base_path(volume.parent.ref) @@ -200,6 +204,7 @@ async def configure(self): ) for volume in self.resources.volumes ], + gpus=[QemuGPU(pci_host=gpu.pci_host) for gpu in self.resources.gpus], ) configuration = Configuration( diff --git a/src/aleph/vm/controllers/qemu_confidential/instance.py b/src/aleph/vm/controllers/qemu_confidential/instance.py index f432cff69..37986b10c 100644 --- a/src/aleph/vm/controllers/qemu_confidential/instance.py +++ b/src/aleph/vm/controllers/qemu_confidential/instance.py @@ -13,6 +13,7 @@ Configuration, HypervisorType, QemuConfidentialVMConfiguration, + QemuGPU, QemuVMHostVolume, save_controller_configuration, ) @@ -126,6 +127,7 @@ async def configure(self): ) for volume in self.resources.volumes ], + gpus=[QemuGPU(pci_host=gpu.pci_host) for gpu in self.resources.gpus], ) configuration = Configuration( diff --git a/src/aleph/vm/hypervisors/qemu/qemuvm.py b/src/aleph/vm/hypervisors/qemu/qemuvm.py index 5bcb1313c..df7559613 100644 --- a/src/aleph/vm/hypervisors/qemu/qemuvm.py +++ b/src/aleph/vm/hypervisors/qemu/qemuvm.py @@ -7,7 +7,7 @@ import qmp from systemd import journal -from aleph.vm.controllers.configuration import QemuVMConfiguration +from aleph.vm.controllers.configuration import QemuGPU, QemuVMConfiguration from aleph.vm.controllers.qemu.instance import logger @@ -28,6 +28,7 @@ class QemuVM: interface_name: str qemu_process: Process | None = None host_volumes: list[HostVolume] + gpus: list[QemuGPU] journal_stdout: TextIO | None journal_stderr: TextIO | None @@ -55,6 +56,7 @@ def __init__(self, vm_hash, config: QemuVMConfiguration): ) for volume in config.host_volumes ] + self.gpus = config.gpus @property def _journal_stdout_name(self) -> str: @@ -113,17 +115,15 @@ async def start( # "-serial", "telnet:localhost:4321,server,nowait", # 
"-snapshot", # Do not save anything to disk ] - for volume in self.host_volumes: - args += [ - "-drive", - f"file={volume.path_on_host},format=raw,readonly={'on' if volume.read_only else 'off'},media=disk,if=virtio", - ] if self.interface_name: # script=no, downscript=no tell qemu not to try to set up the network itself args += ["-net", "nic,model=virtio", "-net", f"tap,ifname={self.interface_name},script=no,downscript=no"] if self.cloud_init_drive_path: args += ["-cdrom", f"{self.cloud_init_drive_path}"] + + args += self._get_host_volumes_args() + args += self._get_gpu_args() print(*args) self.qemu_process = proc = await asyncio.create_subprocess_exec( @@ -138,6 +138,28 @@ async def start( ) return proc + def _get_host_volumes_args(self): + args = [] + for volume in self.host_volumes: + args += [ + "-drive", + f"file={volume.path_on_host},format=raw,readonly={'on' if volume.read_only else 'off'},media=disk,if=virtio", + ] + return args + + def _get_gpu_args(self): + args = [ + # Use host-phys-bits-limit argument for GPU support. TODO: Investigate how to get the correct bits size + "-cpu", + "host,host-phys-bits-limit=0x28", + ] + for gpu in self.gpus: + args += [ + "-device", + f"vfio-pci,host={gpu.pci_host},multifunction=on,x-vga=on", + ] + return args + def _get_qmpclient(self) -> qmp.QEMUMonitorProtocol | None: if not (self.qmp_socket_path and self.qmp_socket_path.exists()): return None diff --git a/src/aleph/vm/hypervisors/qemu_confidential/qemuvm.py b/src/aleph/vm/hypervisors/qemu_confidential/qemuvm.py index 89e9c3e80..353c3f78d 100644 --- a/src/aleph/vm/hypervisors/qemu_confidential/qemuvm.py +++ b/src/aleph/vm/hypervisors/qemu_confidential/qemuvm.py @@ -110,24 +110,24 @@ async def start( # raise an error and prevent boot. Passing the argument --cpu host instruct the VM to use the same CPU # model than the host thus the VM's kernel knows which method is used to get random numbers (Intel and # AMD have different methods) and properly boot. 
+ # Use host-phys-bits-limit argument for GPU support. TODO: Investigate how to get the correct bits size "-cpu", - "host", + "host,host-phys-bits-limit=0x28", # Uncomment following for debug # "-serial", "telnet:localhost:4321,server,nowait", # "-snapshot", # Do not save anything to disk ] - for volume in self.host_volumes: - args += [ - "-drive", - f"file={volume.path_on_host},format=raw,readonly={'on' if volume.read_only else 'off'},media=disk,if=virtio", - ] if self.interface_name: # script=no, downscript=no tell qemu not to try to set up the network itself args += ["-net", "nic,model=virtio", "-net", f"tap,ifname={self.interface_name},script=no,downscript=no"] if self.cloud_init_drive_path: args += ["-cdrom", f"{self.cloud_init_drive_path}"] + + args += self._get_host_volumes_args() + args += self._get_gpu_args() print(*args) + self.qemu_process = proc = await asyncio.create_subprocess_exec( *args, stdin=asyncio.subprocess.DEVNULL, diff --git a/src/aleph/vm/models.py b/src/aleph/vm/models.py index 9aee9320a..7dd59091b 100644 --- a/src/aleph/vm/models.py +++ b/src/aleph/vm/models.py @@ -1,10 +1,12 @@ import asyncio +import json import logging import uuid from asyncio import Task from collections.abc import Callable, Coroutine from dataclasses import dataclass from datetime import datetime, timezone +from typing import List from aleph_message.models import ( ExecutableContent, @@ -12,14 +14,14 @@ ItemHash, ProgramContent, ) -from aleph_message.models.execution.environment import HypervisorType +from aleph_message.models.execution.environment import GpuProperties, HypervisorType +from pydantic.json import pydantic_encoder from aleph.vm.conf import settings from aleph.vm.controllers.firecracker.executable import AlephFirecrackerExecutable from aleph.vm.controllers.firecracker.instance import AlephInstanceResources from aleph.vm.controllers.firecracker.program import ( AlephFirecrackerProgram, - AlephFirecrackerResources, AlephProgramResources, ) from 
aleph.vm.controllers.firecracker.snapshot_manager import SnapshotManager @@ -38,6 +40,7 @@ ) from aleph.vm.orchestrator.pubsub import PubSub from aleph.vm.orchestrator.vm import AlephFirecrackerInstance +from aleph.vm.resources import GpuDevice, HostGPU from aleph.vm.systemd import SystemDManager from aleph.vm.utils import create_task_log_exceptions, dumps_for_json @@ -69,8 +72,11 @@ class VmExecution: vm_hash: ItemHash original: ExecutableContent message: ExecutableContent - resources: AlephFirecrackerResources | None = None - vm: AlephFirecrackerExecutable | AlephQemuInstance | None = None + resources: ( + AlephProgramResources | AlephInstanceResources | AlephQemuResources | AlephQemuConfidentialInstance | None + ) = None + vm: AlephFirecrackerExecutable | AlephQemuInstance | AlephQemuConfidentialInstance | None = None + gpus: List[HostGPU] = [] times: VmExecutionTimes @@ -202,6 +208,7 @@ async def prepare(self) -> None: resources = AlephQemuConfidentialResources(self.message, namespace=self.vm_hash) else: resources = AlephQemuResources(self.message, namespace=self.vm_hash) + resources.gpus = self.gpus else: msg = f"Unknown hypervisor type {self.hypervisor}" raise ValueError(msg) @@ -216,6 +223,24 @@ async def prepare(self) -> None: self.times.prepared_at = datetime.now(tz=timezone.utc) self.resources = resources + def prepare_gpus(self, available_gpus: List[GpuDevice]) -> None: + gpus = [] + if self.message.requirements and self.message.requirements.gpu: + for gpu in self.message.requirements.gpu: + gpu = GpuProperties.parse_obj(gpu) + for available_gpu in available_gpus: + if available_gpu.device_id == gpu.device_id: + gpus.append(HostGPU(pci_host=available_gpu.pci_host)) + break + self.gpus = gpus + + def uses_gpu(self, pci_host: str) -> bool: + for gpu in self.gpus: + if gpu.pci_host == pci_host: + return True + + return False + def create( self, vm_id: int, tap_interface: TapInterface | None = None, prepare: bool = True ) -> AlephVmControllerInterface: @@ 
-437,6 +462,7 @@ async def save(self): message=self.message.json(), original_message=self.original.json(), persistent=self.persistent, + gpus=json.dumps(self.gpus, default=pydantic_encoder), ) ) diff --git a/src/aleph/vm/orchestrator/chain.py b/src/aleph/vm/orchestrator/chain.py index 7321aa458..0b4174397 100644 --- a/src/aleph/vm/orchestrator/chain.py +++ b/src/aleph/vm/orchestrator/chain.py @@ -60,9 +60,13 @@ def check_tokens(cls, values): } +class InvalidChainError(ValueError): + pass + + def get_chain(chain: str) -> ChainInfo: try: return STREAM_CHAINS[chain] except KeyError: msg = f"Unknown chain id for chain {chain}" - raise ValueError(msg) + raise InvalidChainError(msg) diff --git a/src/aleph/vm/orchestrator/cli.py b/src/aleph/vm/orchestrator/cli.py index bbae396d4..740733e61 100644 --- a/src/aleph/vm/orchestrator/cli.py +++ b/src/aleph/vm/orchestrator/cli.py @@ -167,9 +167,6 @@ async def benchmark(runs: int): """Measure program performance by immediately running the supervisor with fake requests. 
""" - engine = metrics.setup_engine() - await metrics.create_tables(engine) - ref = ItemHash("cafecafecafecafecafecafecafecafecafecafecafecafecafecafecafecafe") settings.FAKE_DATA_PROGRAM = settings.BENCHMARK_FAKE_DATA_PROGRAM @@ -357,6 +354,10 @@ def main(): settings.check() logger.debug("Initialising the DB...") + # Check and create execution database + engine = metrics.setup_engine() + asyncio.run(metrics.create_tables(engine)) + # After creating it run the DB migrations asyncio.run(run_async_db_migrations()) logger.debug("DB up to date.") diff --git a/src/aleph/vm/orchestrator/metrics.py b/src/aleph/vm/orchestrator/metrics.py index f7f166481..6c9b8eea0 100644 --- a/src/aleph/vm/orchestrator/metrics.py +++ b/src/aleph/vm/orchestrator/metrics.py @@ -76,6 +76,8 @@ class ExecutionRecord(Base): original_message = Column(JSON, nullable=True) persistent = Column(Boolean, nullable=True) + gpus = Column(JSON, nullable=True) + def __repr__(self): return f"" diff --git a/src/aleph/vm/orchestrator/migrations/versions/0002_5c6ae643c69b_add_gpu_column_to_executions_table.py b/src/aleph/vm/orchestrator/migrations/versions/0002_5c6ae643c69b_add_gpu_column_to_executions_table.py new file mode 100644 index 000000000..4b739323b --- /dev/null +++ b/src/aleph/vm/orchestrator/migrations/versions/0002_5c6ae643c69b_add_gpu_column_to_executions_table.py @@ -0,0 +1,38 @@ +"""add gpu table + +Revision ID: 5c6ae643c69b +Revises: bbb12a12372e +Create Date: 2024-12-09 19:40:19.279735 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +from sqlalchemy import create_engine +from sqlalchemy.engine import reflection + +from aleph.vm.conf import make_db_url + +revision = "5c6ae643c69b" +down_revision = "bbb12a12372e" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + engine = create_engine(make_db_url()) + inspector = reflection.Inspector.from_engine(engine) + + # The table already exists on most CRNs. 
+ tables = inspector.get_table_names() + if "executions" in tables: + columns = inspector.get_columns("executions") + column_names = [c["name"] for c in columns] + if "gpus" not in column_names: + op.add_column("executions", sa.Column("gpus", sa.JSON(), nullable=True)) + + +def downgrade() -> None: + op.drop_column("executions", "gpus") diff --git a/src/aleph/vm/orchestrator/payment.py b/src/aleph/vm/orchestrator/payment.py index 7194f873a..f5a79bbca 100644 --- a/src/aleph/vm/orchestrator/payment.py +++ b/src/aleph/vm/orchestrator/payment.py @@ -13,7 +13,7 @@ from aleph.vm.models import VmExecution from aleph.vm.utils import to_normalized_address -from .chain import ChainInfo, get_chain +from .chain import ChainInfo, InvalidChainError, get_chain logger = logging.getLogger(__name__) @@ -91,10 +91,6 @@ class InvalidAddressError(ValueError): pass -class InvalidChainError(ValueError): - pass - - async def get_stream(sender: str, receiver: str, chain: str) -> Decimal: """ Get the stream of the user from the Superfluid API. 
diff --git a/src/aleph/vm/orchestrator/resources.py b/src/aleph/vm/orchestrator/resources.py index d4b9c8985..b6d34a9f0 100644 --- a/src/aleph/vm/orchestrator/resources.py +++ b/src/aleph/vm/orchestrator/resources.py @@ -1,6 +1,7 @@ import math from datetime import datetime, timezone from functools import lru_cache +from typing import List, Optional import cpuinfo import psutil @@ -10,6 +11,8 @@ from pydantic import BaseModel, Field from aleph.vm.conf import settings +from aleph.vm.pool import VmPool +from aleph.vm.resources import GpuDevice from aleph.vm.sevclient import SevClient from aleph.vm.utils import ( check_amd_sev_es_supported, @@ -73,15 +76,32 @@ class MachineProperties(BaseModel): cpu: CpuProperties +class GpuProperties(BaseModel): + devices: Optional[List[GpuDevice]] + available_devices: Optional[List[GpuDevice]] + + class MachineUsage(BaseModel): cpu: CpuUsage mem: MemoryUsage disk: DiskUsage period: UsagePeriod properties: MachineProperties + gpu: GpuProperties active: bool = True +def get_machine_gpus(request: web.Request) -> GpuProperties: + pool: VmPool = request.app["vm_pool"] + gpus = pool.gpus + available_gpus = pool.get_available_gpus() + + return GpuProperties( + devices=gpus, + available_devices=available_gpus, + ) + + @lru_cache def get_machine_properties() -> MachineProperties: """Fetch machine properties such as architecture, CPU vendor, ... @@ -90,6 +110,7 @@ def get_machine_properties() -> MachineProperties: In the future, some properties may have to be fetched from within a VM. 
""" cpu_info = cpuinfo.get_cpu_info() # Slow + return MachineProperties( cpu=CpuProperties( architecture=cpu_info.get("raw_arch_string", cpu_info.get("arch_string_raw")), @@ -109,9 +130,10 @@ def get_machine_properties() -> MachineProperties: @cors_allow_all -async def about_system_usage(_: web.Request): +async def about_system_usage(request: web.Request): """Public endpoint to expose information about the system usage.""" period_start = datetime.now(timezone.utc).replace(second=0, microsecond=0) + machine_properties = get_machine_properties() usage: MachineUsage = MachineUsage( cpu=CpuUsage( @@ -131,7 +153,8 @@ async def about_system_usage(_: web.Request): start_timestamp=period_start, duration_seconds=60, ), - properties=get_machine_properties(), + properties=machine_properties, + gpu=get_machine_gpus(request), ) return web.json_response(text=usage.json(exclude_none=True)) diff --git a/src/aleph/vm/orchestrator/supervisor.py b/src/aleph/vm/orchestrator/supervisor.py index a5ca999a8..ae6436291 100644 --- a/src/aleph/vm/orchestrator/supervisor.py +++ b/src/aleph/vm/orchestrator/supervisor.py @@ -20,7 +20,6 @@ from aleph.vm.sevclient import SevClient from aleph.vm.version import __version__ -from .metrics import create_tables, setup_engine from .resources import about_certificates, about_system_usage from .tasks import ( start_payment_monitoring_task, @@ -151,9 +150,6 @@ def run(): """Run the VM Supervisor.""" settings.check() - engine = setup_engine() - asyncio.run(create_tables(engine)) - loop = asyncio.new_event_loop() pool = VmPool(loop) pool.setup() diff --git a/src/aleph/vm/orchestrator/tasks.py b/src/aleph/vm/orchestrator/tasks.py index bd89a8816..84d9ca498 100644 --- a/src/aleph/vm/orchestrator/tasks.py +++ b/src/aleph/vm/orchestrator/tasks.py @@ -4,6 +4,7 @@ import math import time from collections.abc import AsyncIterable +from decimal import Decimal from typing import TypeVar import aiohttp @@ -192,13 +193,17 @@ async def check_payment(pool: VmPool): 
await pool.stop_vm(last_execution.vm_hash) required_balance = await compute_required_balance(executions) - # Check if the balance held in the wallet is sufficient stream tier resources - for sender, chains in pool.get_executions_by_sender(payment_type=PaymentType.superfluid).items(): - for chain, executions in chains.items(): - stream = await get_stream(sender=sender, receiver=settings.PAYMENT_RECEIVER_ADDRESS, chain=chain) - logger.debug( - f"Get stream flow from Sender {sender} to Receiver {settings.PAYMENT_RECEIVER_ADDRESS} of {stream}" - ) + # Check if the balance held in the wallet is sufficient stream tier resources + for sender, chains in pool.get_executions_by_sender(payment_type=PaymentType.superfluid).items(): + for chain, executions in chains.items(): + try: + stream = await get_stream(sender=sender, receiver=settings.PAYMENT_RECEIVER_ADDRESS, chain=chain) + logger.debug( + f"Get stream flow from Sender {sender} to Receiver {settings.PAYMENT_RECEIVER_ADDRESS} of {stream}" + ) + except ValueError as error: + logger.error(f"Error found getting stream for chain {chain} and sender {sender}: {error}") + continue required_stream = await compute_required_flow(executions) logger.debug(f"Required stream for Sender {sender} executions: {required_stream}") diff --git a/src/aleph/vm/orchestrator/views/__init__.py b/src/aleph/vm/orchestrator/views/__init__.py index b6a45c0d7..899a038f8 100644 --- a/src/aleph/vm/orchestrator/views/__init__.py +++ b/src/aleph/vm/orchestrator/views/__init__.py @@ -347,6 +347,7 @@ async def status_public_config(request: web.Request): "ENABLE_QEMU_SUPPORT": settings.ENABLE_QEMU_SUPPORT, "INSTANCE_DEFAULT_HYPERVISOR": settings.INSTANCE_DEFAULT_HYPERVISOR, "ENABLE_CONFIDENTIAL_COMPUTING": settings.ENABLE_CONFIDENTIAL_COMPUTING, + "ENABLE_GPU_SUPPORT": settings.ENABLE_GPU_SUPPORT, }, }, dumps=dumps_for_json, @@ -486,10 +487,14 @@ async def notify_allocation(request: web.Request): payment_type = message.content.payment and 
message.content.payment.type or PaymentType.hold is_confidential = message.content.environment.trusted_execution is not None - - if payment_type == PaymentType.hold and is_confidential: - # At the moment we will allow hold for PAYG - logger.debug("Confidential instance not using PAYG") + have_gpu = message.content.requirements and message.content.requirements.gpu is not None + + if payment_type == PaymentType.hold and (is_confidential or have_gpu): + # Log confidential and instances with GPU support + if is_confidential: + logger.debug(f"Confidential instance {item_hash} not using PAYG") + if have_gpu: + logger.debug(f"GPU Instance {item_hash} not using PAYG") user_balance = await payment.fetch_balance_of_address(message.sender) hold_price = await payment.fetch_execution_hold_price(item_hash) logger.debug(f"Address {message.sender} Balance: {user_balance}, Price: {hold_price}") diff --git a/src/aleph/vm/pool.py b/src/aleph/vm/pool.py index 9a9e69f3a..d377da567 100644 --- a/src/aleph/vm/pool.py +++ b/src/aleph/vm/pool.py @@ -5,6 +5,7 @@ import logging from collections.abc import Iterable from datetime import datetime, timezone +from typing import List from aleph_message.models import ( Chain, @@ -13,11 +14,13 @@ Payment, PaymentType, ) +from pydantic import parse_raw_as from aleph.vm.conf import settings from aleph.vm.controllers.firecracker.snapshot_manager import SnapshotManager from aleph.vm.network.hostnetwork import Network, make_ipv6_allocator from aleph.vm.orchestrator.metrics import get_execution_records +from aleph.vm.resources import GpuDevice, HostGPU, get_gpu_devices from aleph.vm.systemd import SystemDManager from aleph.vm.utils import get_message_executable_content from aleph.vm.vm_type import VmType @@ -41,6 +44,7 @@ class VmPool: snapshot_manager: SnapshotManager | None = None systemd_manager: SystemDManager creation_lock: asyncio.Lock + gpus: List[GpuDevice] = [] def __init__(self, loop: asyncio.AbstractEventLoop): self.executions = {} @@ -78,6 
+82,10 @@ def setup(self) -> None: logger.debug("Initializing SnapshotManager ...") self.snapshot_manager.run_in_thread() + if settings.ENABLE_GPU_SUPPORT: + logger.debug("Detecting GPU devices ...") + self.gpus = get_gpu_devices() + def teardown(self) -> None: """Stop the VM pool and the network properly.""" if self.network: @@ -109,7 +117,11 @@ async def create_a_vm( self.executions[vm_hash] = execution try: + # First assign Host GPUs from the available + execution.prepare_gpus(self.get_available_gpus()) + # Prepare VM general Resources and also the GPUs await execution.prepare() + vm_id = self.get_unique_vm_id() if self.network: @@ -233,6 +245,9 @@ async def load_persistent_executions(self): if execution.is_running: # TODO: Improve the way that we re-create running execution + # Load existing GPUs assigned to VMs + execution.gpus = parse_raw_as(List[HostGPU], saved_execution.gpus) + # Load and instantiate the rest of resources and already assigned GPUs await execution.prepare() if self.network: vm_type = VmType.from_message_content(execution.message) @@ -285,6 +300,18 @@ def get_instance_executions(self) -> Iterable[VmExecution]: ) return executions or [] + def get_available_gpus(self) -> List[GpuDevice]: + available_gpus = [] + for gpu in self.gpus: + used = False + for _, execution in self.executions.items(): + if execution.uses_gpu(gpu.pci_host): + used = True + break + if not used: + available_gpus.append(gpu) + return available_gpus + def get_executions_by_sender(self, payment_type: PaymentType) -> dict[str, dict[str, list[VmExecution]]]: """Return all executions of the given type, grouped by sender and by chain.""" executions_by_sender: dict[str, dict[str, list[VmExecution]]] = {} diff --git a/src/aleph/vm/resources.py b/src/aleph/vm/resources.py new file mode 100644 index 000000000..767b64906 --- /dev/null +++ b/src/aleph/vm/resources.py @@ -0,0 +1,111 @@ +import subprocess +from enum import Enum +from typing import List, Optional + +from 
aleph_message.models import HashableModel +from pydantic import BaseModel, Extra, Field + + +class HostGPU(BaseModel): + """Host GPU properties detail.""" + + pci_host: str = Field(description="GPU PCI host address") + + class Config: + extra = Extra.forbid + + +class GpuDeviceClass(str, Enum): + """GPU device class. Look at https://admin.pci-ids.ucw.cz/read/PD/03""" + + VGA_COMPATIBLE_CONTROLLER = "0300" + _3D_CONTROLLER = "0302" + + +class GpuDevice(HashableModel): + """GPU properties.""" + + vendor: str = Field(description="GPU vendor name") + device_name: str = Field(description="GPU vendor card name") + device_class: GpuDeviceClass = Field( + description="GPU device class. Look at https://admin.pci-ids.ucw.cz/read/PD/03" + ) + pci_host: str = Field(description="Host PCI bus for this device") + device_id: str = Field(description="GPU vendor & device ids") + + class Config: + extra = Extra.forbid + + +def is_gpu_device_class(device_class: str) -> bool: + try: + GpuDeviceClass(device_class) + return True + except ValueError: + return False + + +def get_vendor_name(vendor_id: str) -> str: + match vendor_id: + case "10de": + return "NVIDIA" + case "1002": + return "AMD" + case "8086": + return "Intel" + case _: + raise ValueError("Device vendor not compatible") + + +def is_kernel_enabled_gpu(pci_host: str) -> bool: + # Get detailed info about Kernel drivers used by this device. 
+ # Needs to use specifically only the kernel driver vfio-pci to be compatible for QEmu virtualization + result = subprocess.run(["lspci", "-s", pci_host, "-nnk"], capture_output=True, text=True, check=True) + details = result.stdout.split("\n") + if "\tKernel driver in use: vfio-pci" in details: + return True + + return False + + +def parse_gpu_device_info(line: str) -> Optional[GpuDevice]: + """Parse GPU device info from a line of lspci output.""" + + pci_host, device = line.split(' "', maxsplit=1) + + if not is_kernel_enabled_gpu(pci_host): + return None + + device_class, device_vendor, device_info = device.split('" "', maxsplit=2) + device_class = device_class.split("[", maxsplit=1)[1][:-1] + + if not is_gpu_device_class(device_class): + return None + + device_class = GpuDeviceClass(device_class) + + vendor, vendor_id = device_vendor.rsplit(" [", maxsplit=1) + vendor_id = vendor_id[:-1] + vendor_name = get_vendor_name(vendor_id) + device_name = device_info.split('"', maxsplit=1)[0] + device_name, model_id = device_name.rsplit(" [", maxsplit=1) + model_id = model_id[:-1] + device_id = f"{vendor_id}:{model_id}" + + return GpuDevice( + pci_host=pci_host, + vendor=vendor_name, + device_name=device_name, + device_class=device_class, + device_id=device_id, + ) + + +def get_gpu_devices() -> Optional[List[GpuDevice]]: + """Get GPU info using lspci command.""" + + result = subprocess.run(["lspci", "-mmnnn"], capture_output=True, text=True, check=True) + gpu_devices = list( + {device for line in result.stdout.split("\n") if line and (device := parse_gpu_device_info(line)) is not None} + ) + return gpu_devices if gpu_devices else None diff --git a/tests/supervisor/test_resources.py b/tests/supervisor/test_resources.py new file mode 100644 index 000000000..fea79fe71 --- /dev/null +++ b/tests/supervisor/test_resources.py @@ -0,0 +1,38 @@ +from unittest import mock + +from aleph.vm.resources import get_gpu_devices + + +def mock_is_kernel_enabled_gpu(pci_host: str) -> bool: + 
value = True if pci_host == "01:00.0" else False + return value + + +def test_get_gpu_devices(): + class DevicesReturn: + stdout: str = ( + '00:1f.0 "ISA bridge [0601]" "Intel Corporation [8086]" "Device [7a06]" -r11 -p00 "ASUSTeK Computer Inc. [1043]" "Device [8882]"' + '\n00:1f.4 "SMBus [0c05]" "Intel Corporation [8086]" "Raptor Lake-S PCH SMBus Controller [7a23]" -r11 -p00 "ASUSTeK Computer Inc. [1043]" "Device [8882]"' + '\n00:1f.5 "Serial bus controller [0c80]" "Intel Corporation [8086]" "Raptor Lake SPI (flash) Controller [7a24]" -r11 -p00 "ASUSTeK Computer Inc. [1043]" "Device [8882]"' + '\n01:00.0 "VGA compatible controller [0300]" "NVIDIA Corporation [10de]" "AD104GL [RTX 4000 SFF Ada Generation] [27b0]" -ra1 -p00 "NVIDIA Corporation [10de]" "AD104GL [RTX 4000 SFF Ada Generation] [16fa]"' + '\n01:00.1 "Audio device [0403]" "NVIDIA Corporation [10de]" "Device [22bc]" -ra1 -p00 "NVIDIA Corporation [10de]" "Device [16fa]"' + '\n02:00.0 "Non-Volatile memory controller [0108]" "Samsung Electronics Co Ltd [144d]" "NVMe SSD Controller PM9A1/PM9A3/980PRO [a80a]" -p02 "Samsung Electronics Co Ltd [144d]" "NVMe SSD Controller PM9A1/PM9A3/980PRO [aa0a]"' + ) + + with mock.patch( + "subprocess.run", + return_value=DevicesReturn(), + ): + with mock.patch( + "aleph.vm.resources.is_kernel_enabled_gpu", + wraps=mock_is_kernel_enabled_gpu, + ): + expected_gpu_devices = get_gpu_devices() + + print(expected_gpu_devices) + + assert expected_gpu_devices[0].vendor == "NVIDIA" + assert expected_gpu_devices[0].device_name == "AD104GL [RTX 4000 SFF Ada Generation]" + assert expected_gpu_devices[0].device_class == "0300" + assert expected_gpu_devices[0].pci_host == "01:00.0" + assert expected_gpu_devices[0].device_id == "10de:27b0" diff --git a/tests/supervisor/test_views.py b/tests/supervisor/test_views.py index cd32bdc7e..d94ce60f1 100644 --- a/tests/supervisor/test_views.py +++ b/tests/supervisor/test_views.py @@ -36,7 +36,15 @@ async def 
test_allocation_fails_on_invalid_item_hash(aiohttp_client): @pytest.mark.asyncio async def test_system_usage(aiohttp_client): """Test that the usage system endpoints responds. No auth needed""" + + class FakeVmPool: + gpus = [] + + def get_available_gpus(self): + return [] + app = setup_webapp() + app["vm_pool"] = FakeVmPool() client = await aiohttp_client(app) response: web.Response = await client.get("/about/usage/system") assert response.status == 200 @@ -49,6 +57,13 @@ async def test_system_usage(aiohttp_client): @pytest.mark.asyncio async def test_system_usage_mock(aiohttp_client, mocker): """Test that the usage system endpoints response value. No auth needed""" + + class FakeVmPool: + gpus = [] + + def get_available_gpus(self): + return [] + mocker.patch( "cpuinfo.cpuinfo.get_cpu_info", { @@ -64,7 +79,9 @@ async def test_system_usage_mock(aiohttp_client, mocker): "psutil.cpu_count", lambda: 200, ) + app = setup_webapp() + app["vm_pool"] = FakeVmPool() client = await aiohttp_client(app) response: web.Response = await client.get("/about/usage/system") assert response.status == 200