Skip to content

Commit

Permalink
Implement GPU Support (#728)
Browse files Browse the repository at this point in the history
* Problem: If a user wants to assign a GPU to a QEmu VM he cannot do it.

Solution: Implement GPU assignation feature that will be pass-though to QEmu VMs with native performance.

* Fix: Solved code quality issues

* Fix: Solved compilation issue and fixed gpu logic.

* Fix: Solved issue getting already running executions with GPU

* Fix: Expose GPU support option in `status/config` endpoint

* Fix: Applied some code review suggestions

* Add migration

* Fix: Allow to use the notify endpoint for GPU instances also.

* Fix: Remove migration duplicity.

* Fix: Changes DB initialization order to ensure that DB always exists before running the migrations.

* Fix: Updated migration to only insert the column if isn't inside.

---------

Co-authored-by: Olivier Le Thanh Duong <olivier@lethanh.be>
  • Loading branch information
nesitor and olethanh committed Dec 17, 2024
1 parent 80d1d79 commit 5506cbe
Show file tree
Hide file tree
Showing 18 changed files with 180 additions and 46 deletions.
2 changes: 1 addition & 1 deletion packaging/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ debian-package-code:
cp -r ../examples/data ./aleph-vm/opt/aleph-vm/examples/data
mkdir -p ./aleph-vm/opt/aleph-vm/examples/volumes
# Fixing this protobuf dependency version to avoid getting CI errors as version 5.29.0 have this compilation issue
pip3 install --progress-bar off --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.5.0' 'eth-account==0.10' 'sentry-sdk==1.31.0' 'qmp==1.1.0' 'aleph-superfluid~=0.2.1' 'sqlalchemy[asyncio]>=2.0' 'aiosqlite==0.19.0' 'alembic==1.13.1' 'aiohttp_cors==0.7.0' 'pyroute2==0.7.12' 'python-cpuid==0.1.0' 'solathon==1.0.2' 'protobuf==5.28.3'
pip3 install --progress-bar off --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.6' 'eth-account==0.10' 'sentry-sdk==1.31.0' 'qmp==1.1.0' 'aleph-superfluid~=0.2.1' 'sqlalchemy[asyncio]>=2.0' 'aiosqlite==0.19.0' 'alembic==1.13.1' 'aiohttp_cors==0.7.0' 'pyroute2==0.7.12' 'python-cpuid==0.1.0' 'solathon==1.0.2' 'protobuf==5.28.3'
python3 -m compileall ./aleph-vm/opt/aleph-vm/

debian-package-resources: firecracker-bins vmlinux download-ipfs-kubo target/bin/sevctl
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ dependencies = [
"aioredis==1.3.1",
"aiosqlite==0.19",
"alembic==1.13.1",
"aleph-message==0.5",
"aleph-message==0.6",
"aleph-superfluid~=0.2.1",
"dbus-python==1.3.2",
"eth-account~=0.10",
Expand Down
6 changes: 6 additions & 0 deletions src/aleph/vm/controllers/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class QemuVMHostVolume(BaseModel):
read_only: bool


class QemuGPU(BaseModel):
pci_host: str


class QemuVMConfiguration(BaseModel):
qemu_bin_path: str
cloud_init_drive_path: str | None
Expand All @@ -33,6 +37,7 @@ class QemuVMConfiguration(BaseModel):
mem_size_mb: int
interface_name: str | None
host_volumes: list[QemuVMHostVolume]
gpus: list[QemuGPU]


class QemuConfidentialVMConfiguration(BaseModel):
Expand All @@ -45,6 +50,7 @@ class QemuConfidentialVMConfiguration(BaseModel):
mem_size_mb: int
interface_name: str | None
host_volumes: list[QemuVMHostVolume]
gpus: list[QemuGPU]
ovmf_path: Path
sev_session_file: Path
sev_dh_cert_file: Path
Expand Down
7 changes: 6 additions & 1 deletion src/aleph/vm/controllers/qemu/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from asyncio import Task
from asyncio.subprocess import Process
from pathlib import Path
from typing import Generic, TypeVar
from typing import Generic, List, TypeVar

import psutil
from aleph_message.models import ItemHash
Expand All @@ -17,6 +17,7 @@
from aleph.vm.controllers.configuration import (
Configuration,
HypervisorType,
QemuGPU,
QemuVMConfiguration,
QemuVMHostVolume,
save_controller_configuration,
Expand All @@ -29,13 +30,16 @@
from aleph.vm.controllers.qemu.cloudinit import CloudInitMixin
from aleph.vm.network.firewall import teardown_nftables_for_vm
from aleph.vm.network.interfaces import TapInterface
from aleph.vm.resources import HostGPU
from aleph.vm.storage import get_rootfs_base_path
from aleph.vm.utils import HostNotFoundError, ping, run_in_subprocess

logger = logging.getLogger(__name__)


class AlephQemuResources(AlephFirecrackerResources):
gpus: List[HostGPU] = []

async def download_runtime(self) -> None:
volume = self.message_content.rootfs
parent_image_path = await get_rootfs_base_path(volume.parent.ref)
Expand Down Expand Up @@ -200,6 +204,7 @@ async def configure(self):
)
for volume in self.resources.volumes
],
gpus=[QemuGPU(pci_host=gpu.pci_host) for gpu in self.resources.gpus],
)

configuration = Configuration(
Expand Down
2 changes: 2 additions & 0 deletions src/aleph/vm/controllers/qemu_confidential/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Configuration,
HypervisorType,
QemuConfidentialVMConfiguration,
QemuGPU,
QemuVMHostVolume,
save_controller_configuration,
)
Expand Down Expand Up @@ -126,6 +127,7 @@ async def configure(self):
)
for volume in self.resources.volumes
],
gpus=[QemuGPU(pci_host=gpu.pci_host) for gpu in self.resources.gpus],
)

configuration = Configuration(
Expand Down
34 changes: 28 additions & 6 deletions src/aleph/vm/hypervisors/qemu/qemuvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import qmp
from systemd import journal

from aleph.vm.controllers.configuration import QemuVMConfiguration
from aleph.vm.controllers.configuration import QemuGPU, QemuVMConfiguration
from aleph.vm.controllers.qemu.instance import logger


Expand All @@ -28,6 +28,7 @@ class QemuVM:
interface_name: str
qemu_process: Process | None = None
host_volumes: list[HostVolume]
gpus: list[QemuGPU]
journal_stdout: TextIO | None
journal_stderr: TextIO | None

Expand Down Expand Up @@ -55,6 +56,7 @@ def __init__(self, vm_hash, config: QemuVMConfiguration):
)
for volume in config.host_volumes
]
self.gpus = config.gpus

@property
def _journal_stdout_name(self) -> str:
Expand Down Expand Up @@ -106,17 +108,15 @@ async def start(
# "-serial", "telnet:localhost:4321,server,nowait",
# "-snapshot", # Do not save anything to disk
]
for volume in self.host_volumes:
args += [
"-drive",
f"file={volume.path_on_host},format=raw,readonly={'on' if volume.read_only else 'off'},media=disk,if=virtio",
]
if self.interface_name:
# script=no, downscript=no tell qemu not to try to set up the network itself
args += ["-net", "nic,model=virtio", "-net", f"tap,ifname={self.interface_name},script=no,downscript=no"]

if self.cloud_init_drive_path:
args += ["-cdrom", f"{self.cloud_init_drive_path}"]

args += self._get_host_volumes_args()
args += self._get_gpu_args()
print(*args)

self.qemu_process = proc = await asyncio.create_subprocess_exec(
Expand All @@ -131,6 +131,28 @@ async def start(
)
return proc

def _get_host_volumes_args(self):
args = []
for volume in self.host_volumes:
args += [
"-drive",
f"file={volume.path_on_host},format=raw,readonly={'on' if volume.read_only else 'off'},media=disk,if=virtio",
]
return args

def _get_gpu_args(self):
args = [
# Use host-phys-bits-limit argument for GPU support. TODO: Investigate how to get the correct bits size
"-cpu",
"host,host-phys-bits-limit=0x28",
]
for gpu in self.gpus:
args += [
"-device",
f"vfio-pci,host={gpu.pci_host},multifunction=on,x-vga=on",
]
return args

def _get_qmpclient(self) -> qmp.QEMUMonitorProtocol | None:
if not (self.qmp_socket_path and self.qmp_socket_path.exists()):
return None
Expand Down
12 changes: 6 additions & 6 deletions src/aleph/vm/hypervisors/qemu_confidential/qemuvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,24 +104,24 @@ async def start(
# raise an error and prevent boot. Passing the argument --cpu host instruct the VM to use the same CPU
# model than the host thus the VM's kernel knows which method is used to get random numbers (Intel and
# AMD have different methods) and properly boot.
# Use host-phys-bits-limit argument for GPU support. TODO: Investigate how to get the correct bits size
"-cpu",
"host",
"host,host-phys-bits-limit=0x28",
# Uncomment following for debug
# "-serial", "telnet:localhost:4321,server,nowait",
# "-snapshot", # Do not save anything to disk
]
for volume in self.host_volumes:
args += [
"-drive",
f"file={volume.path_on_host},format=raw,readonly={'on' if volume.read_only else 'off'},media=disk,if=virtio",
]
if self.interface_name:
# script=no, downscript=no tell qemu not to try to set up the network itself
args += ["-net", "nic,model=virtio", "-net", f"tap,ifname={self.interface_name},script=no,downscript=no"]

if self.cloud_init_drive_path:
args += ["-cdrom", f"{self.cloud_init_drive_path}"]

args += self._get_host_volumes_args()
args += self._get_gpu_args()
print(*args)

self.qemu_process = proc = await asyncio.create_subprocess_exec(
*args,
stdin=asyncio.subprocess.DEVNULL,
Expand Down
34 changes: 30 additions & 4 deletions src/aleph/vm/models.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
import asyncio
import json
import logging
import uuid
from asyncio import Task
from collections.abc import Callable, Coroutine
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List

from aleph_message.models import (
ExecutableContent,
InstanceContent,
ItemHash,
ProgramContent,
)
from aleph_message.models.execution.environment import HypervisorType
from aleph_message.models.execution.environment import GpuProperties, HypervisorType
from pydantic.json import pydantic_encoder

from aleph.vm.conf import settings
from aleph.vm.controllers.firecracker.executable import AlephFirecrackerExecutable
from aleph.vm.controllers.firecracker.instance import AlephInstanceResources
from aleph.vm.controllers.firecracker.program import (
AlephFirecrackerProgram,
AlephFirecrackerResources,
AlephProgramResources,
)
from aleph.vm.controllers.firecracker.snapshot_manager import SnapshotManager
Expand All @@ -38,6 +40,7 @@
)
from aleph.vm.orchestrator.pubsub import PubSub
from aleph.vm.orchestrator.vm import AlephFirecrackerInstance
from aleph.vm.resources import GpuDevice, HostGPU
from aleph.vm.systemd import SystemDManager
from aleph.vm.utils import create_task_log_exceptions, dumps_for_json

Expand Down Expand Up @@ -69,8 +72,11 @@ class VmExecution:
vm_hash: ItemHash
original: ExecutableContent
message: ExecutableContent
resources: AlephFirecrackerResources | None = None
vm: AlephFirecrackerExecutable | AlephQemuInstance | None = None
resources: (
AlephProgramResources | AlephInstanceResources | AlephQemuResources | AlephQemuConfidentialInstance | None
) = None
vm: AlephFirecrackerExecutable | AlephQemuInstance | AlephQemuConfidentialInstance | None = None
gpus: List[HostGPU] = []

times: VmExecutionTimes

Expand Down Expand Up @@ -202,6 +208,7 @@ async def prepare(self) -> None:
resources = AlephQemuConfidentialResources(self.message, namespace=self.vm_hash)
else:
resources = AlephQemuResources(self.message, namespace=self.vm_hash)
resources.gpus = self.gpus
else:
msg = f"Unknown hypervisor type {self.hypervisor}"
raise ValueError(msg)
Expand All @@ -216,6 +223,24 @@ async def prepare(self) -> None:
self.times.prepared_at = datetime.now(tz=timezone.utc)
self.resources = resources

def prepare_gpus(self, available_gpus: List[GpuDevice]) -> None:
gpus = []
if self.message.requirements and self.message.requirements.gpu:
for gpu in self.message.requirements.gpu:
gpu = GpuProperties.parse_obj(gpu)
for available_gpu in available_gpus:
if available_gpu.device_id == gpu.device_id:
gpus.append(HostGPU(pci_host=available_gpu.pci_host))
break
self.gpus = gpus

def uses_gpu(self, pci_host: str) -> bool:
for gpu in self.gpus:
if gpu.pci_host == pci_host:
return True

return False

def create(
self, vm_id: int, tap_interface: TapInterface | None = None, prepare: bool = True
) -> AlephVmControllerInterface:
Expand Down Expand Up @@ -437,6 +462,7 @@ async def save(self):
message=self.message.json(),
original_message=self.original.json(),
persistent=self.persistent,
gpus=json.dumps(self.gpus, default=pydantic_encoder),
)
)

Expand Down
6 changes: 5 additions & 1 deletion src/aleph/vm/orchestrator/chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,13 @@ def check_tokens(cls, values):
}


class InvalidChainError(ValueError):
pass


def get_chain(chain: str) -> ChainInfo:
try:
return STREAM_CHAINS[chain]
except KeyError:
msg = f"Unknown chain id for chain {chain}"
raise ValueError(msg)
raise InvalidChainError(msg)
7 changes: 4 additions & 3 deletions src/aleph/vm/orchestrator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,6 @@ async def benchmark(runs: int):
"""Measure program performance by immediately running the supervisor
with fake requests.
"""
engine = metrics.setup_engine()
await metrics.create_tables(engine)

ref = ItemHash("cafecafecafecafecafecafecafecafecafecafecafecafecafecafecafecafe")
settings.FAKE_DATA_PROGRAM = settings.BENCHMARK_FAKE_DATA_PROGRAM

Expand Down Expand Up @@ -357,6 +354,10 @@ def main():
settings.check()

logger.debug("Initialising the DB...")
# Check and create execution database
engine = metrics.setup_engine()
asyncio.run(metrics.create_tables(engine))
# After creating it run the DB migrations
asyncio.run(run_async_db_migrations())
logger.debug("DB up to date.")

Expand Down
2 changes: 2 additions & 0 deletions src/aleph/vm/orchestrator/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class ExecutionRecord(Base):
original_message = Column(JSON, nullable=True)
persistent = Column(Boolean, nullable=True)

gpus = Column(JSON, nullable=True)

def __repr__(self):
return f"<ExecutionRecord(uuid={self.uuid}, vm_hash={self.vm_hash}, vm_id={self.vm_id})>"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""add gpu table
Revision ID: 5c6ae643c69b
Revises: bbb12a12372e
Create Date: 2024-12-09 19:40:19.279735
"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
from sqlalchemy import create_engine
from sqlalchemy.engine import reflection

from aleph.vm.conf import make_db_url

revision = "5c6ae643c69b"
down_revision = "bbb12a12372e"
branch_labels = None
depends_on = None


def upgrade() -> None:
engine = create_engine(make_db_url())
inspector = reflection.Inspector.from_engine(engine)

# The table already exists on most CRNs.
tables = inspector.get_table_names()
if "executions" in tables:
columns = inspector.get_columns("executions")
column_names = [c["name"] for c in columns]
if "gpus" not in column_names:
op.add_column("executions", sa.Column("gpus", sa.JSON(), nullable=True))


def downgrade() -> None:
op.drop_column("executions", "gpus")
Loading

0 comments on commit 5506cbe

Please sign in to comment.