Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[batch] fix g2 machine memory calculation #14498

Merged
merged 13 commits into from
May 3, 2024
6 changes: 4 additions & 2 deletions batch/batch/cloud/azure/driver/create_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from ....instance_config import InstanceConfig
from ...resource_utils import unreserved_worker_data_disk_size_gib
from ...utils import ACCEPTABLE_QUERY_JAR_URL_PREFIX
from ..resource_utils import azure_machine_type_to_worker_type_and_cores
from ..resource_utils import azure_machine_type_to_parts

log = logging.getLogger('create_instance')

Expand Down Expand Up @@ -43,7 +43,9 @@ def create_vm_config(
instance_config: InstanceConfig,
feature_flags: dict,
) -> dict:
_, cores = azure_machine_type_to_worker_type_and_cores(machine_type)
parts = azure_machine_type_to_parts(machine_type)
assert parts
cores = parts.cores

hail_azure_oauth_scope = os.environ['HAIL_AZURE_OAUTH_SCOPE']
region = instance_config.region_for(location)
Expand Down
15 changes: 7 additions & 8 deletions batch/batch/cloud/azure/driver/resource_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import List, Optional, Tuple
from typing import List, Optional

import aiohttp

Expand All @@ -21,7 +21,7 @@
from ..instance_config import AzureSlimInstanceConfig
from ..resource_utils import (
azure_local_ssd_size,
azure_machine_type_to_worker_type_and_cores,
azure_machine_type_to_parts,
azure_worker_memory_per_core_mib,
azure_worker_properties_to_machine_type,
)
Expand Down Expand Up @@ -101,9 +101,6 @@ async def get_vm_state(self, instance: Instance) -> VMState:
def machine_type(self, cores: int, worker_type: str, local_ssd: bool) -> str:
    """Return the Azure machine type name for the requested worker shape.

    Delegates to ``azure_worker_properties_to_machine_type``; note that the
    helper takes the worker type first and the core count second.
    """
    azure_machine_type = azure_worker_properties_to_machine_type(worker_type, cores, local_ssd)
    return azure_machine_type

def worker_type_and_cores(self, machine_type: str) -> Tuple[str, int]:
    """Split an Azure machine type name into a ``(worker_type, cores)`` pair.

    Parsing is delegated to ``azure_machine_type_to_worker_type_and_cores``.
    """
    worker_type_cores = azure_machine_type_to_worker_type_and_cores(machine_type)
    return worker_type_cores

def instance_config(
self,
machine_type: str,
Expand Down Expand Up @@ -140,10 +137,12 @@ async def create_vm(
machine_type: str,
instance_config: InstanceConfig,
) -> List[QuantifiedResource]:
worker_type, cores = self.worker_type_and_cores(machine_type)
parts = azure_machine_type_to_parts(machine_type)
assert parts
cores = parts.cores

if local_ssd_data_disk:
assert data_disk_size_gb == azure_local_ssd_size(worker_type, cores)
assert data_disk_size_gb == azure_local_ssd_size(parts.family, parts.cores)

max_price: Optional[float]
if preemptible:
Expand Down Expand Up @@ -173,7 +172,7 @@ async def create_vm(
self.app['feature_flags'],
)

memory_mib = azure_worker_memory_per_core_mib(worker_type) * cores
memory_mib = azure_worker_memory_per_core_mib(parts.family) * cores
memory_in_bytes = memory_mib << 20
cores_mcpu = cores * 1000
total_resources_on_instance = instance_config.quantified_resources(
Expand Down
13 changes: 8 additions & 5 deletions batch/batch/cloud/azure/instance_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from ...driver.billing_manager import ProductVersions
from ...instance_config import InstanceConfig
from .resource_utils import azure_machine_type_to_worker_type_and_cores
from .resource_utils import azure_cores_mcpu_to_memory_bytes, azure_machine_type_to_parts
from .resources import (
AzureDynamicSizedDiskResource,
AzureIPFeeResource,
Expand Down Expand Up @@ -73,14 +73,17 @@ def __init__(
self.boot_disk_size_gb = boot_disk_size_gb
self.resources = resources

worker_type, cores = azure_machine_type_to_worker_type_and_cores(self._machine_type)

self._worker_type = worker_type
self.cores = cores
machine_type_parts = azure_machine_type_to_parts(self._machine_type)
assert machine_type_parts
self._worker_type = machine_type_parts.family
self.cores = machine_type_parts.cores

def worker_type(self) -> str:
    """Return the worker type stored on this instance config."""
    stored_worker_type = self._worker_type
    return stored_worker_type

def cores_mcpu_to_memory_bytes(self, mcpu: int) -> int:
    """Return the memory, in bytes, that corresponds to ``mcpu`` on this config.

    The conversion is delegated to ``azure_cores_mcpu_to_memory_bytes`` using
    this config's worker type.
    """
    worker_type = self.worker_type()
    return azure_cores_mcpu_to_memory_bytes(mcpu, worker_type)

def region_for(self, location: str) -> str:
    """Return the region for ``location``; the location is returned unchanged."""
    region = location
    return region

Expand Down
20 changes: 18 additions & 2 deletions batch/batch/cloud/azure/resource_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import math
import re
from typing import Dict, Optional, Tuple

Expand Down Expand Up @@ -148,11 +149,13 @@ def azure_machine_type_to_parts(machine_type: str) -> Optional[MachineTypeParts]
return MachineTypeParts.from_dict(match.groupdict())


def azure_machine_type_to_worker_type_and_cores(machine_type: str) -> Tuple[str, int]:
def azure_machine_type_to_cores_and_memory_mib_per_core(machine_type: str) -> Tuple[int, int]:
maybe_machine_type_parts = azure_machine_type_to_parts(machine_type)
if maybe_machine_type_parts is None:
raise ValueError(f'bad machine_type: {machine_type}')
return (maybe_machine_type_parts.family, maybe_machine_type_parts.cores)
cores = maybe_machine_type_parts.cores
memory_per_core = azure_worker_memory_per_core_mib(maybe_machine_type_parts.family)
return cores, memory_per_core


def azure_worker_properties_to_machine_type(worker_type: str, cores: int, local_ssd_data_disk: bool) -> str:
Expand Down Expand Up @@ -204,3 +207,16 @@ def azure_requested_to_actual_storage_bytes(storage_bytes, allow_zero_storage):

def azure_is_valid_storage_request(storage_in_gib: int) -> bool:
    """Return whether ``storage_in_gib`` is an acceptable storage request.

    A request is valid when it is at least 10 GiB and no larger than
    ``AZURE_MAX_PERSISTENT_SSD_SIZE_GIB``.
    """
    if storage_in_gib < 10:
        return False
    return storage_in_gib <= AZURE_MAX_PERSISTENT_SSD_SIZE_GIB


def azure_cores_mcpu_to_memory_bytes(mcpu: int, worker_type: str) -> int:
    """Convert a CPU amount in mcpu to bytes of memory for ``worker_type``.

    The per-core memory (in MiB) comes from
    ``azure_worker_memory_per_core_mib``; the result is truncated to a whole
    number of bytes.
    """
    mib_per_core = azure_worker_memory_per_core_mib(worker_type)
    bytes_per_core = int(mib_per_core * 1024**2)
    core_count = mcpu / 1000  # mcpu is thousandths of a core
    return int(core_count * bytes_per_core)


def azure_adjust_cores_for_memory_request(cores_in_mcpu: int, memory_in_bytes: int, worker_type: str) -> int:
    """Return the mcpu that satisfies both the core and the memory request.

    The memory request is converted to the smallest mcpu amount (rounded up)
    whose share of ``worker_type`` memory covers ``memory_in_bytes``; the
    larger of that amount and ``cores_in_mcpu`` is returned.
    """
    bytes_per_core = int(azure_worker_memory_per_core_mib(worker_type) * 1024**2)
    cores_needed_for_memory = memory_in_bytes / bytes_per_core
    mcpu_needed_for_memory = math.ceil(cores_needed_for_memory * 1000)
    if mcpu_needed_for_memory > cores_in_mcpu:
        return mcpu_needed_for_memory
    return cores_in_mcpu
6 changes: 4 additions & 2 deletions batch/batch/cloud/gcp/driver/create_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from ....instance_config import InstanceConfig
from ...resource_utils import unreserved_worker_data_disk_size_gib
from ...utils import ACCEPTABLE_QUERY_JAR_URL_PREFIX
from ..resource_utils import gcp_machine_type_to_worker_type_and_cores, machine_family_to_gpu
from ..resource_utils import gcp_machine_type_to_parts, machine_family_to_gpu

log = logging.getLogger('create_instance')

Expand All @@ -40,7 +40,9 @@ def create_vm_config(
project: str,
instance_config: InstanceConfig,
) -> dict:
_, cores = gcp_machine_type_to_worker_type_and_cores(machine_type)
parts = gcp_machine_type_to_parts(machine_type)
assert parts
cores = parts.cores

region = instance_config.region_for(zone)
machine_family = machine_type.split('-')[0]
Expand Down
16 changes: 9 additions & 7 deletions batch/batch/cloud/gcp/driver/resource_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import uuid
from typing import List, Tuple
from typing import List

import aiohttp

Expand All @@ -23,7 +23,7 @@
from ..resource_utils import (
GCP_MACHINE_FAMILY,
family_worker_type_cores_to_gcp_machine_type,
gcp_machine_type_to_worker_type_and_cores,
gcp_machine_type_to_parts,
gcp_worker_memory_per_core_mib,
)
from .billing_manager import GCPBillingManager
Expand Down Expand Up @@ -75,9 +75,6 @@ async def get_vm_state(self, instance: Instance) -> VMState:
def machine_type(self, cores: int, worker_type: str, local_ssd: bool) -> str:  # pylint: disable=unused-argument
    """Return the GCP machine type name for the requested worker shape.

    The name is built for the ``GCP_MACHINE_FAMILY`` family; ``local_ssd`` is
    accepted but not used here.
    """
    gcp_machine_type = family_worker_type_cores_to_gcp_machine_type(GCP_MACHINE_FAMILY, worker_type, cores)
    return gcp_machine_type

def worker_type_and_cores(self, machine_type: str) -> Tuple[str, int]:
    """Split a GCP machine type name into a ``(worker_type, cores)`` pair.

    Parsing is delegated to ``gcp_machine_type_to_worker_type_and_cores``.
    """
    worker_type_cores = gcp_machine_type_to_worker_type_and_cores(machine_type)
    return worker_type_cores

def instance_config(
self,
machine_type: str,
Expand Down Expand Up @@ -119,7 +116,12 @@ async def create_vm(

resource_rates = self.billing_manager.resource_rates

worker_type, cores = self.worker_type_and_cores(machine_type)
machine_type_parts = gcp_machine_type_to_parts(machine_type)
assert machine_type_parts is not None, machine_type
machine_family = machine_type_parts.machine_family
worker_type = machine_type_parts.worker_type
cores = machine_type_parts.cores

vm_config = create_vm_config(
file_store,
resource_rates,
Expand All @@ -137,7 +139,7 @@ async def create_vm(
instance_config,
)

memory_mib = gcp_worker_memory_per_core_mib(worker_type) * cores
memory_mib = gcp_worker_memory_per_core_mib(machine_family, worker_type) * cores
memory_in_bytes = memory_mib << 20
cores_mcpu = cores * 1000
total_resources_on_instance = instance_config.quantified_resources(
Expand Down
5 changes: 4 additions & 1 deletion batch/batch/cloud/gcp/instance_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from ...driver.billing_manager import ProductVersions
from ...instance_config import InstanceConfig
from .resource_utils import gcp_machine_type_to_parts, machine_family_to_gpu
from .resource_utils import gcp_cores_mcpu_to_memory_bytes, gcp_machine_type_to_parts, machine_family_to_gpu
from .resources import (
GCPAcceleratorResource,
GCPComputeResource,
Expand Down Expand Up @@ -104,6 +104,9 @@ def __init__(
def worker_type(self) -> str:
    """Return the worker type stored on this instance config."""
    stored_worker_type = self._worker_type
    return stored_worker_type

def cores_mcpu_to_memory_bytes(self, mcpu: int) -> int:
    """Return the memory, in bytes, that corresponds to ``mcpu`` on this config.

    The conversion is delegated to ``gcp_cores_mcpu_to_memory_bytes`` using
    this config's instance family and worker type.
    """
    instance_family = self._instance_family
    worker_type = self.worker_type()
    return gcp_cores_mcpu_to_memory_bytes(mcpu, instance_family, worker_type)

def region_for(self, location: str) -> str:
    """Return the region for ``location`` via ``region_from_location``."""
    # location = zone
    region = region_from_location(location)
    return region
Expand Down
44 changes: 33 additions & 11 deletions batch/batch/cloud/gcp/resource_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import math
import re
from typing import Optional, Tuple

Expand All @@ -10,6 +11,13 @@

MACHINE_FAMILY_TO_ACCELERATOR_VERSIONS = {'g2': 'l4'}

# Memory per core, in MiB, keyed by (machine family, worker type).
MEMORY_PER_CORE_MIB = {
    ('n1', 'standard'): 3840,
    ('n1', 'highmem'): 6656,
    ('n1', 'highcpu'): 924,
    ('g2', 'standard'): 4000,
}

gcp_valid_cores_from_worker_type = {
'highcpu': [2, 4, 8, 16, 32, 64, 96],
'standard': [1, 2, 4, 8, 16, 32, 64, 96],
Expand Down Expand Up @@ -45,27 +53,26 @@ def gcp_machine_type_to_parts(machine_type: str) -> Optional[MachineTypeParts]:
return MachineTypeParts.from_dict(match.groupdict())


def gcp_machine_type_to_worker_type_and_cores(machine_type: str) -> Tuple[str, int]:
def gcp_machine_type_to_cores_and_memory_mib_per_core(machine_type: str) -> Tuple[int, int]:
# FIXME: "WORKER TYPE" IS WRONG OR CONFUSING WHEN THE MACHINE TYPE IS NOT n1!
maybe_machine_type_parts = gcp_machine_type_to_parts(machine_type)
if maybe_machine_type_parts is None:
raise ValueError(f'bad machine_type: {machine_type}')
return (maybe_machine_type_parts.worker_type, maybe_machine_type_parts.cores)
cores = maybe_machine_type_parts.cores
memory_per_core = gcp_worker_memory_per_core_mib(
maybe_machine_type_parts.machine_family, maybe_machine_type_parts.worker_type
)
return cores, memory_per_core


def family_worker_type_cores_to_gcp_machine_type(family: str, worker_type: str, cores: int) -> str:
    """Build a GCP machine type name of the form ``<family>-<worker_type>-<cores>``."""
    machine_type = f'{family}-{worker_type}-{cores}'
    return machine_type


def gcp_worker_memory_per_core_mib(worker_type: str) -> int:
    """Return the memory per core, in MiB, for a GCP worker type.

    Recognized worker types are ``'standard'``, ``'highmem'`` and
    ``'highcpu'``; any other value fails the assertion.
    """
    if worker_type == 'standard':
        return 3840
    if worker_type == 'highmem':
        return 6656
    assert worker_type == 'highcpu', worker_type
    # this number must be divisible by 4. I rounded up to the nearest MiB
    return 924
def gcp_worker_memory_per_core_mib(machine_family: str, worker_type: str) -> int:
    """Look up the memory per core, in MiB, for a machine family and worker type.

    The ``(machine_family, worker_type)`` pair must be a key of
    ``MEMORY_PER_CORE_MIB``; an unknown pair fails the assertion, with the
    pair as the assertion message.
    """
    family_and_worker = (machine_family, worker_type)
    assert family_and_worker in MEMORY_PER_CORE_MIB, family_and_worker
    return MEMORY_PER_CORE_MIB[family_and_worker]


def gcp_requested_to_actual_storage_bytes(storage_bytes, allow_zero_storage):
Expand All @@ -91,3 +98,18 @@ def machine_family_to_gpu(machine_family: str) -> Optional[str]:

def is_gpu(machine_family: str) -> bool:
    """Return True when ``machine_family_to_gpu`` yields a GPU for the family."""
    gpu = machine_family_to_gpu(machine_family)
    return gpu is not None


def gcp_cores_mcpu_to_memory_bytes(mcpu: int, machine_family: str, worker_type: str) -> int:
    """Convert a CPU amount in mcpu to bytes of memory for a GCP machine shape.

    The per-core memory (in MiB) comes from ``gcp_worker_memory_per_core_mib``
    for the given family and worker type; the result is truncated to a whole
    number of bytes.
    """
    mib_per_core = gcp_worker_memory_per_core_mib(machine_family, worker_type)
    bytes_per_core = int(mib_per_core * 1024**2)
    core_count = mcpu / 1000  # mcpu is thousandths of a core
    return int(core_count * bytes_per_core)


def gcp_adjust_cores_for_memory_request(
    cores_in_mcpu: int, memory_in_bytes: int, machine_family: str, worker_type: str
) -> int:
    """Return the mcpu that satisfies both the core and the memory request.

    The memory request is converted to the smallest mcpu amount (rounded up)
    whose share of the machine shape's memory covers ``memory_in_bytes``; the
    larger of that amount and ``cores_in_mcpu`` is returned.
    """
    bytes_per_core = int(gcp_worker_memory_per_core_mib(machine_family, worker_type) * 1024**2)
    cores_needed_for_memory = memory_in_bytes / bytes_per_core
    mcpu_needed_for_memory = math.ceil(cores_needed_for_memory * 1000)
    if mcpu_needed_for_memory > cores_in_mcpu:
        return mcpu_needed_for_memory
    return cores_in_mcpu
43 changes: 10 additions & 33 deletions batch/batch/cloud/resource_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,20 @@
from .azure.resource_utils import (
azure_is_valid_storage_request,
azure_local_ssd_size,
azure_machine_type_to_worker_type_and_cores,
azure_machine_type_to_cores_and_memory_mib_per_core,
azure_memory_to_worker_type,
azure_requested_to_actual_storage_bytes,
azure_valid_cores_from_worker_type,
azure_valid_machine_types,
azure_worker_memory_per_core_mib,
)
from .gcp.resource_utils import (
gcp_is_valid_storage_request,
gcp_local_ssd_size,
gcp_machine_type_to_worker_type_and_cores,
gcp_machine_type_to_cores_and_memory_mib_per_core,
gcp_memory_to_worker_type,
gcp_requested_to_actual_storage_bytes,
gcp_valid_cores_from_worker_type,
gcp_valid_machine_types,
gcp_worker_memory_per_core_mib,
)

log = logging.getLogger('resource_utils')
Expand Down Expand Up @@ -52,36 +50,15 @@ def memory_to_worker_type(cloud: str) -> Dict[str, str]:
return gcp_memory_to_worker_type


def machine_type_to_worker_type_cores(cloud: str, machine_type: str) -> Tuple[str, int]:
def machine_type_to_cores_and_memory_bytes(cloud: str, machine_type: str) -> Tuple[int, int]:
if cloud == 'azure':
return azure_machine_type_to_worker_type_and_cores(machine_type)
assert cloud == 'gcp'
return gcp_machine_type_to_worker_type_and_cores(machine_type)


def worker_memory_per_core_mib(cloud: str, worker_type: str) -> int:
    """Return the memory per core, in MiB, for ``worker_type`` on ``cloud``.

    ``cloud`` must be ``'azure'`` or ``'gcp'``; any other value fails the
    assertion.
    """
    if cloud != 'azure':
        assert cloud == 'gcp'
        return gcp_worker_memory_per_core_mib(worker_type)
    return azure_worker_memory_per_core_mib(worker_type)


def worker_memory_per_core_bytes(cloud: str, worker_type: str) -> int:
    """Return the memory per core, in bytes, for ``worker_type`` on ``cloud``."""
    mib_per_core = worker_memory_per_core_mib(cloud, worker_type)
    bytes_per_mib = 1024**2
    return int(mib_per_core * bytes_per_mib)


def memory_bytes_to_cores_mcpu(cloud: str, memory_in_bytes: int, worker_type: str) -> int:
    """Return the mcpu, rounded up, whose memory share covers ``memory_in_bytes``."""
    bytes_per_core = worker_memory_per_core_bytes(cloud, worker_type)
    cores_needed = memory_in_bytes / bytes_per_core
    return math.ceil(cores_needed * 1000)


def cores_mcpu_to_memory_bytes(cloud: str, cores_in_mcpu: int, worker_type: str) -> int:
    """Return the memory, in bytes, matching ``cores_in_mcpu`` for the worker type.

    The result is truncated to a whole number of bytes.
    """
    core_count = cores_in_mcpu / 1000  # mcpu is thousandths of a core
    bytes_per_core = worker_memory_per_core_bytes(cloud, worker_type)
    return int(core_count * bytes_per_core)


def adjust_cores_for_memory_request(cloud: str, cores_in_mcpu: int, memory_in_bytes: int, worker_type: str) -> int:
    """Return the larger of ``cores_in_mcpu`` and the mcpu the memory request needs."""
    mcpu_needed_for_memory = memory_bytes_to_cores_mcpu(cloud, memory_in_bytes, worker_type)
    if mcpu_needed_for_memory > cores_in_mcpu:
        return mcpu_needed_for_memory
    return cores_in_mcpu
cores, memory_mib_per_core = azure_machine_type_to_cores_and_memory_mib_per_core(machine_type)
else:
assert cloud == 'gcp'
cores, memory_mib_per_core = gcp_machine_type_to_cores_and_memory_mib_per_core(machine_type)
memory_bytes_per_core = memory_mib_per_core * 1024**2
memory_bytes = cores * memory_bytes_per_core
return cores, memory_bytes


def unreserved_worker_data_disk_size_gib(data_disk_size_gib: int, cores: int) -> int:
Expand Down
Loading