fix: OpenJD architecture capability reporting on ARM (#118)
Signed-off-by: Josh Usiskin <56369778+jusiskin@users.noreply.github.com>
jusiskin authored Dec 21, 2023
1 parent 0ee5ff3 commit 73a2877
Showing 4 changed files with 238 additions and 182 deletions.
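For context: on Linux ARM hosts, Python's platform.machine() typically returns "aarch64", whereas OpenJD expects the architecture name "arm64". The short sketch below is not part of the diff; it only illustrates the normalization this commit introduces via the new _get_arch() helper in capabilities.py, and the values shown in comments are illustrative.

import platform

# Mapping taken from the new _get_arch() helper in this commit; anything not
# listed is passed through unchanged.
python_machine_to_openjd_arch = {
    "aarch64": "arm64",   # Linux on ARM
    "amd64": "x86_64",    # e.g. Windows on x86-64
}

machine = platform.machine()  # e.g. "aarch64" on a Linux ARM worker
openjd_arch = python_machine_to_openjd_arch.get(machine, machine)

# Before this commit: attr.worker.cpu.arch was reported as the raw machine
# string ("aarch64"); after it, the OpenJD name ("arm64") is reported.
print(openjd_arch)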
103 changes: 102 additions & 1 deletion src/deadline_worker_agent/startup/capabilities.py
@@ -6,15 +6,116 @@
from typing import Any, Literal, TYPE_CHECKING
from openjd.model import validate_attribute_capability_name, validate_amount_capability_name
from openjd.model.v2023_09 import STANDARD_ATTRIBUTE_CAPABILITIES, STANDARD_AMOUNT_CAPABILITIES
import logging
import platform
import shutil
import subprocess

from pydantic import BaseModel, NonNegativeFloat
from pydantic import BaseModel, NonNegativeFloat, PositiveFloat
import psutil

from ..errors import ConfigurationError

if TYPE_CHECKING:
from pydantic.typing import CallableGenerator


_logger = logging.getLogger(__name__)


def detect_system_capabilities() -> Capabilities:
amounts: dict[AmountCapabilityName, PositiveFloat] = {}
attributes: dict[AttributeCapabilityName, list[str]] = {}

# Determine OpenJobDescription OS
platform_system = platform.system().lower()
python_system_to_openjd_os_family = {
"darwin": "macos",
"linux": "linux",
"windows": "windows",
}
if openjd_os_family := python_system_to_openjd_os_family.get(platform_system):
attributes[AttributeCapabilityName("attr.worker.os.family")] = [openjd_os_family]

attributes[AttributeCapabilityName("attr.worker.cpu.arch")] = [_get_arch()]

amounts[AmountCapabilityName("amount.worker.vcpu")] = float(psutil.cpu_count())
amounts[AmountCapabilityName("amount.worker.memory")] = float(psutil.virtual_memory().total) / (
1024.0**2
)
amounts[AmountCapabilityName("amount.worker.disk.scratch")] = int(
shutil.disk_usage("/").free // 1024 // 1024
)
amounts[AmountCapabilityName("amount.worker.gpu")] = _get_gpu_count()
amounts[AmountCapabilityName("amount.worker.gpu.memory")] = _get_gpu_memory()

return Capabilities(amounts=amounts, attributes=attributes)


def _get_arch() -> str:
# Determine OpenJobDescription architecture
python_machine_to_openjd_arch = {
"aarch64": "arm64",
"amd64": "x86_64",
}
platform_machine = platform.machine()
return python_machine_to_openjd_arch.get(platform_machine, platform_machine)


def _get_gpu_count(*, verbose: bool = True) -> int:
"""
Get the number of GPUs available on the machine.
Returns
-------
int
The number of GPUs available on the machine.
"""
try:
output = subprocess.check_output(
["nvidia-smi", "--query-gpu=count", "--format=csv,noheader"]
)
except FileNotFoundError:
if verbose:
_logger.warning("Could not detect GPU count, nvidia-smi not found")
return 0
except subprocess.CalledProcessError:
if verbose:
_logger.warning("Could not detect GPU count, error running nvidia-smi")
return 0
else:
if verbose:
_logger.info("Number of GPUs: %s", output.decode().strip())
return int(output.decode().strip())


def _get_gpu_memory(*, verbose: bool = True) -> int:
"""
Get the total GPU memory available on the machine.
Returns
-------
int
The total GPU memory available on the machine.
"""
try:
output = subprocess.check_output(
["nvidia-smi", "--query-gpu=memory.total", "--format=csv,noheader"]
)
except FileNotFoundError:
if verbose:
_logger.warning("Could not detect GPU memory, nvidia-smi not found")
return 0
except subprocess.CalledProcessError:
if verbose:
_logger.warning("Could not detect GPU memory, error running nvidia-smi")
return 0
else:
if verbose:
_logger.info("Total GPU Memory: %s", output.decode().strip())
return int(output.decode().strip().replace("MiB", ""))


def capability_type(capability_name_str: str) -> Literal["amount", "attr"]:
no_prefix_capability_name_str = capability_name_str
if ":" in capability_name_str:
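A minimal usage sketch of the relocated detection entry point (assuming the deadline_worker_agent package is importable; the commented values are illustrative, not guaranteed output):

from deadline_worker_agent.startup.capabilities import detect_system_capabilities

caps = detect_system_capabilities()

# On a Linux ARM worker without an NVIDIA GPU this might contain, for example:
#   caps.attributes -> {"attr.worker.os.family": ["linux"], "attr.worker.cpu.arch": ["arm64"]}
#   caps.amounts    -> {"amount.worker.vcpu": 4.0, "amount.worker.gpu": 0, ...}
print(caps.attributes)
print(caps.amounts)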
88 changes: 2 additions & 86 deletions src/deadline_worker_agent/startup/entrypoint.py
@@ -10,16 +10,12 @@
import sys
from logging.handlers import TimedRotatingFileHandler
from typing import Optional
import platform
import psutil
import shutil
from pathlib import Path

from openjd.model import version as openjd_model_version
from openjd.sessions import version as openjd_sessions_version
from openjd.sessions import LOG as OPENJD_SESSION_LOG
from deadline.job_attachments import version as deadline_job_attach_version
from pydantic import PositiveFloat

from .._version import __version__
from ..api_models import WorkerStatus
@@ -29,8 +25,8 @@
from ..log_sync.loggers import ROOT_LOGGER, logger as log_sync_logger
from ..worker import Worker
from .bootstrap import bootstrap_worker
from .capabilities import AmountCapabilityName, AttributeCapabilityName, Capabilities
from .config import Capabilities, Configuration, ConfigurationError
from .capabilities import detect_system_capabilities
from .config import Configuration, ConfigurationError
from ..aws.deadline import (
DeadlineRequestError,
delete_worker,
@@ -42,32 +38,6 @@
_logger = logging.getLogger(__name__)


def detect_system_capabilities() -> Capabilities:
amounts: dict[AmountCapabilityName, PositiveFloat] = {}
attributes: dict[AttributeCapabilityName, list[str]] = {}

platform_system = platform.system().lower()
python_system_to_openjd_os_family = {
"darwin": "macos",
"linux": "linux",
"windows": "windows",
}
if openjd_os_family := python_system_to_openjd_os_family.get(platform_system):
attributes[AttributeCapabilityName("attr.worker.os.family")] = [openjd_os_family]
attributes[AttributeCapabilityName("attr.worker.cpu.arch")] = [platform.machine()]
amounts[AmountCapabilityName("amount.worker.vcpu")] = float(psutil.cpu_count())
amounts[AmountCapabilityName("amount.worker.memory")] = float(psutil.virtual_memory().total) / (
1024.0**2
)
amounts[AmountCapabilityName("amount.worker.disk.scratch")] = int(
shutil.disk_usage("/").free // 1024 // 1024
)
amounts[AmountCapabilityName("amount.worker.gpu")] = _get_gpu_count()
amounts[AmountCapabilityName("amount.worker.gpu.memory")] = _get_gpu_memory()

return Capabilities(amounts=amounts, attributes=attributes)


def entrypoint(cli_args: Optional[list[str]] = None) -> None:
"""Entrypoint for the Worker Agent. The worker gets registered and then polls for tasks to
complete.
@@ -342,57 +312,3 @@ def _log_agent_info() -> None:
_logger.info("\topenjd.model: %s", openjd_model_version)
_logger.info("\topenjd.sessions: %s", openjd_sessions_version)
_logger.info("\tdeadline.job_attachments: %s", deadline_job_attach_version)


def _get_gpu_count(*, verbose: bool = True) -> int:
"""
Get the number of GPUs available on the machine.
Returns
-------
int
The number of GPUs available on the machine.
"""
try:
output = subprocess.check_output(
["nvidia-smi", "--query-gpu=count", "--format=csv,noheader"]
)
except FileNotFoundError:
if verbose:
_logger.warning("Could not detect GPU count, nvidia-smi not found")
return 0
except subprocess.CalledProcessError:
if verbose:
_logger.warning("Could not detect GPU count, error running nvidia-smi")
return 0
else:
if verbose:
_logger.info("Number of GPUs: %s", output.decode().strip())
return int(output.decode().strip())


def _get_gpu_memory(*, verbose: bool = True) -> int:
"""
Get the total GPU memory available on the machine.
Returns
-------
int
The total GPU memory available on the machine.
"""
try:
output = subprocess.check_output(
["nvidia-smi", "--query-gpu=memory.total", "--format=csv,noheader"]
)
except FileNotFoundError:
if verbose:
_logger.warning("Could not detect GPU memory, nvidia-smi not found")
return 0
except subprocess.CalledProcessError:
if verbose:
_logger.warning("Could not detect GPU memory, error running nvidia-smi")
return 0
else:
if verbose:
_logger.info("Total GPU Memory: %s", output.decode().strip())
return int(output.decode().strip().replace("MiB", ""))
135 changes: 134 additions & 1 deletion test/unit/startup/test_capabilities.py
@@ -3,10 +3,14 @@
"""Tests for the deadline_worker_agent.startup.capabilities module"""

from typing import Any
from pydantic import ValidationError
from unittest.mock import MagicMock, patch
import pytest
import subprocess

from pydantic import ValidationError

from deadline_worker_agent.startup.capabilities import Capabilities
from deadline_worker_agent.startup import capabilities as capabilities_mod


@pytest.mark.parametrize(
@@ -195,3 +199,132 @@ def test_merge(

# THEN
assert result == expected_result


@pytest.mark.parametrize(
argnames=("platform_machine", "expected_arch"),
argvalues=(
pytest.param("x86_64", "x86_64", id="intel-x86-64bit"),
pytest.param("amd64", "x86_64", id="amd-x86-64bit"),
pytest.param("arm64", "arm64", id="macos-arm"),
pytest.param("aarch64", "arm64", id="macos-arm"),
),
)
def test_get_arch(
platform_machine: str,
expected_arch: str,
) -> None:
"""Tests that the _get_arch() function returns the correctly mapped value from
platform.machine()"""

# GIVEN
with patch.object(capabilities_mod.platform, "machine", return_value=platform_machine):
# WHEN
arch = capabilities_mod._get_arch()

# THEN
assert arch == expected_arch


class TestGetGPUCount:
@patch.object(capabilities_mod.subprocess, "check_output")
def test_get_gpu_count(
self,
check_output_mock: MagicMock,
) -> None:
"""
Tests that the _get_gpu_count function returns the correct number of GPUs
"""
# GIVEN
check_output_mock.return_value = b"2"

# WHEN
result = capabilities_mod._get_gpu_count()

# THEN
check_output_mock.assert_called_once_with(
["nvidia-smi", "--query-gpu=count", "--format=csv,noheader"]
)
assert result == 2

@pytest.mark.parametrize(
("exception", "expected_result"),
(
pytest.param(FileNotFoundError("nvidia-smi not found"), 0, id="FileNotFoundError"),
pytest.param(subprocess.CalledProcessError(1, "command"), 0, id="CalledProcessError"),
),
)
@patch.object(capabilities_mod.subprocess, "check_output")
def test_get_gpu_count_nvidia_smi_error(
self,
check_output_mock: MagicMock,
exception: Exception,
expected_result: int,
) -> None:
"""
Tests that the _get_gpu_count function returns 0 when nvidia-smi is not found or fails
"""
# GIVEN
check_output_mock.side_effect = exception

# WHEN
result = capabilities_mod._get_gpu_count()

# THEN
check_output_mock.assert_called_once_with(
["nvidia-smi", "--query-gpu=count", "--format=csv,noheader"]
)

assert result == expected_result


class TestGetGPUMemory:
@patch.object(capabilities_mod.subprocess, "check_output")
def test_get_gpu_memory(
self,
check_output_mock: MagicMock,
) -> None:
"""
Tests that the _get_gpu_memory function returns total memory
"""
# GIVEN
check_output_mock.return_value = b"6800 MiB"

# WHEN
result = capabilities_mod._get_gpu_memory()

# THEN
check_output_mock.assert_called_once_with(
["nvidia-smi", "--query-gpu=memory.total", "--format=csv,noheader"]
)
assert result == 6800

@pytest.mark.parametrize(
("exception", "expected_result"),
(
pytest.param(FileNotFoundError("nvidia-smi not found"), 0, id="FileNotFoundError"),
pytest.param(subprocess.CalledProcessError(1, "command"), 0, id="CalledProcessError"),
),
)
@patch.object(capabilities_mod.subprocess, "check_output")
def test_get_gpu_memory_nvidia_smi_error(
self,
check_output_mock: MagicMock,
exception: Exception,
expected_result: int,
) -> None:
"""
Tests that the _get_gpu_memory function returns 0 when nvidia-smi is not found or fails
"""
# GIVEN
check_output_mock.side_effect = exception

# WHEN
result = capabilities_mod._get_gpu_memory()

# THEN
check_output_mock.assert_called_once_with(
["nvidia-smi", "--query-gpu=memory.total", "--format=csv,noheader"]
)

assert result == expected_result
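These unit tests can typically be run from the repository root with, for example, pytest test/unit/startup/test_capabilities.py (the exact invocation depends on the project's development setup).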
