diff --git a/batch/batch/cloud/azure/driver/create_instance.py b/batch/batch/cloud/azure/driver/create_instance.py
index 122386a6b72..754185d4117 100644
--- a/batch/batch/cloud/azure/driver/create_instance.py
+++ b/batch/batch/cloud/azure/driver/create_instance.py
@@ -13,7 +13,7 @@
 from ....instance_config import InstanceConfig
 from ...resource_utils import unreserved_worker_data_disk_size_gib
 from ...utils import ACCEPTABLE_QUERY_JAR_URL_PREFIX
-from ..resource_utils import azure_machine_type_to_worker_type_and_cores
+from ..resource_utils import azure_machine_type_to_parts
 
 log = logging.getLogger('create_instance')
 
@@ -43,7 +43,9 @@ def create_vm_config(
     instance_config: InstanceConfig,
     feature_flags: dict,
 ) -> dict:
-    _, cores = azure_machine_type_to_worker_type_and_cores(machine_type)
+    parts = azure_machine_type_to_parts(machine_type)
+    assert parts
+    cores = parts.cores
 
     hail_azure_oauth_scope = os.environ['HAIL_AZURE_OAUTH_SCOPE']
     region = instance_config.region_for(location)
diff --git a/batch/batch/cloud/azure/driver/resource_manager.py b/batch/batch/cloud/azure/driver/resource_manager.py
index ae61e52210b..1744e10e16b 100644
--- a/batch/batch/cloud/azure/driver/resource_manager.py
+++ b/batch/batch/cloud/azure/driver/resource_manager.py
@@ -1,5 +1,5 @@
 import logging
-from typing import List, Optional, Tuple
+from typing import List, Optional
 
 import aiohttp
 
@@ -21,7 +21,7 @@
 from ..instance_config import AzureSlimInstanceConfig
 from ..resource_utils import (
     azure_local_ssd_size,
-    azure_machine_type_to_worker_type_and_cores,
+    azure_machine_type_to_parts,
     azure_worker_memory_per_core_mib,
     azure_worker_properties_to_machine_type,
 )
@@ -101,9 +101,6 @@ async def get_vm_state(self, instance: Instance) -> VMState:
     def machine_type(self, cores: int, worker_type: str, local_ssd: bool) -> str:
         return azure_worker_properties_to_machine_type(worker_type, cores, local_ssd)
 
-    def worker_type_and_cores(self, machine_type: str) -> Tuple[str, int]:
-        return azure_machine_type_to_worker_type_and_cores(machine_type)
-
     def instance_config(
         self,
         machine_type: str,
@@ -140,10 +137,12 @@ async def create_vm(
         machine_type: str,
         instance_config: InstanceConfig,
     ) -> List[QuantifiedResource]:
-        worker_type, cores = self.worker_type_and_cores(machine_type)
+        parts = azure_machine_type_to_parts(machine_type)
+        assert parts
+        cores = parts.cores
 
         if local_ssd_data_disk:
-            assert data_disk_size_gb == azure_local_ssd_size(worker_type, cores)
+            assert data_disk_size_gb == azure_local_ssd_size(parts.family, parts.cores)
 
         max_price: Optional[float]
         if preemptible:
@@ -173,7 +172,7 @@ async def create_vm(
             self.app['feature_flags'],
         )
 
-        memory_mib = azure_worker_memory_per_core_mib(worker_type) * cores
+        memory_mib = azure_worker_memory_per_core_mib(parts.family) * cores
         memory_in_bytes = memory_mib << 20
         cores_mcpu = cores * 1000
         total_resources_on_instance = instance_config.quantified_resources(
diff --git a/batch/batch/cloud/azure/instance_config.py b/batch/batch/cloud/azure/instance_config.py
index fbde785cc98..09e698ce486 100644
--- a/batch/batch/cloud/azure/instance_config.py
+++ b/batch/batch/cloud/azure/instance_config.py
@@ -4,7 +4,7 @@
 
 from ...driver.billing_manager import ProductVersions
 from ...instance_config import InstanceConfig
-from .resource_utils import azure_machine_type_to_worker_type_and_cores
+from .resource_utils import azure_cores_mcpu_to_memory_bytes, azure_machine_type_to_parts
 from .resources import (
     AzureDynamicSizedDiskResource,
     AzureIPFeeResource,
@@ -73,14 +73,17 @@ def __init__(
         self.boot_disk_size_gb = boot_disk_size_gb
         self.resources = resources
 
-        worker_type, cores = azure_machine_type_to_worker_type_and_cores(self._machine_type)
-
-        self._worker_type = worker_type
-        self.cores = cores
+        machine_type_parts = azure_machine_type_to_parts(self._machine_type)
+        assert machine_type_parts
+        self._worker_type = machine_type_parts.family
+        self.cores = machine_type_parts.cores
 
     def worker_type(self) -> str:
         return self._worker_type
 
+    def cores_mcpu_to_memory_bytes(self, mcpu: int) -> int:
+        return azure_cores_mcpu_to_memory_bytes(mcpu, self.worker_type())
+
     def region_for(self, location: str) -> str:
         return location
 
diff --git a/batch/batch/cloud/azure/resource_utils.py b/batch/batch/cloud/azure/resource_utils.py
index 5a85d86edd3..a72b79e25c9 100644
--- a/batch/batch/cloud/azure/resource_utils.py
+++ b/batch/batch/cloud/azure/resource_utils.py
@@ -1,4 +1,5 @@
 import logging
+import math
 import re
 from typing import Dict, Optional, Tuple
 
@@ -148,11 +149,13 @@ def azure_machine_type_to_parts(machine_type: str) -> Optional[MachineTypeParts]
     return MachineTypeParts.from_dict(match.groupdict())
 
 
-def azure_machine_type_to_worker_type_and_cores(machine_type: str) -> Tuple[str, int]:
+def azure_machine_type_to_cores_and_memory_mib_per_core(machine_type: str) -> Tuple[int, int]:
     maybe_machine_type_parts = azure_machine_type_to_parts(machine_type)
     if maybe_machine_type_parts is None:
         raise ValueError(f'bad machine_type: {machine_type}')
-    return (maybe_machine_type_parts.family, maybe_machine_type_parts.cores)
+    cores = maybe_machine_type_parts.cores
+    memory_per_core = azure_worker_memory_per_core_mib(maybe_machine_type_parts.family)
+    return cores, memory_per_core
 
 
 def azure_worker_properties_to_machine_type(worker_type: str, cores: int, local_ssd_data_disk: bool) -> str:
@@ -204,3 +207,16 @@ def azure_requested_to_actual_storage_bytes(storage_bytes, allow_zero_storage):
 
 def azure_is_valid_storage_request(storage_in_gib: int) -> bool:
     return 10 <= storage_in_gib <= AZURE_MAX_PERSISTENT_SSD_SIZE_GIB
+
+
+def azure_cores_mcpu_to_memory_bytes(mcpu: int, worker_type: str) -> int:
+    memory_mib = azure_worker_memory_per_core_mib(worker_type)
+    memory_bytes = int(memory_mib * 1024**2)
+    return int((mcpu / 1000) * memory_bytes)
+
+
+def azure_adjust_cores_for_memory_request(cores_in_mcpu: int, memory_in_bytes: int, worker_type: str) -> int:
+    memory_per_core_mib = azure_worker_memory_per_core_mib(worker_type)
+    memory_per_core_bytes = int(memory_per_core_mib * 1024**2)
+    min_cores_mcpu = math.ceil((memory_in_bytes / memory_per_core_bytes) * 1000)
+    return max(cores_in_mcpu, min_cores_mcpu)
diff --git a/batch/batch/cloud/gcp/driver/create_instance.py b/batch/batch/cloud/gcp/driver/create_instance.py
index 77098374b8b..abaa838bbe9 100644
--- a/batch/batch/cloud/gcp/driver/create_instance.py
+++ b/batch/batch/cloud/gcp/driver/create_instance.py
@@ -13,7 +13,7 @@
 from ....instance_config import InstanceConfig
 from ...resource_utils import unreserved_worker_data_disk_size_gib
 from ...utils import ACCEPTABLE_QUERY_JAR_URL_PREFIX
-from ..resource_utils import gcp_machine_type_to_worker_type_and_cores, machine_family_to_gpu
+from ..resource_utils import gcp_machine_type_to_parts, machine_family_to_gpu
 
 log = logging.getLogger('create_instance')
 
@@ -40,7 +40,9 @@ def create_vm_config(
     project: str,
     instance_config: InstanceConfig,
 ) -> dict:
-    _, cores = gcp_machine_type_to_worker_type_and_cores(machine_type)
+    parts = gcp_machine_type_to_parts(machine_type)
+    assert parts
+    cores = parts.cores
 
     region = instance_config.region_for(zone)
     machine_family = machine_type.split('-')[0]
diff --git a/batch/batch/cloud/gcp/driver/resource_manager.py b/batch/batch/cloud/gcp/driver/resource_manager.py
index c91a1365f7d..711657d64ba 100644
--- a/batch/batch/cloud/gcp/driver/resource_manager.py
+++ b/batch/batch/cloud/gcp/driver/resource_manager.py
@@ -1,6 +1,6 @@
 import logging
 import uuid
-from typing import List, Tuple
+from typing import List
 
 import aiohttp
 
@@ -23,7 +23,7 @@
 from ..resource_utils import (
     GCP_MACHINE_FAMILY,
     family_worker_type_cores_to_gcp_machine_type,
-    gcp_machine_type_to_worker_type_and_cores,
+    gcp_machine_type_to_parts,
     gcp_worker_memory_per_core_mib,
 )
 from .billing_manager import GCPBillingManager
@@ -75,9 +75,6 @@ async def get_vm_state(self, instance: Instance) -> VMState:
     def machine_type(self, cores: int, worker_type: str, local_ssd: bool) -> str:  # pylint: disable=unused-argument
         return family_worker_type_cores_to_gcp_machine_type(GCP_MACHINE_FAMILY, worker_type, cores)
 
-    def worker_type_and_cores(self, machine_type: str) -> Tuple[str, int]:
-        return gcp_machine_type_to_worker_type_and_cores(machine_type)
-
     def instance_config(
         self,
         machine_type: str,
@@ -119,7 +116,12 @@ async def create_vm(
 
         resource_rates = self.billing_manager.resource_rates
 
-        worker_type, cores = self.worker_type_and_cores(machine_type)
+        machine_type_parts = gcp_machine_type_to_parts(machine_type)
+        assert machine_type_parts is not None, machine_type
+        machine_family = machine_type_parts.machine_family
+        worker_type = machine_type_parts.worker_type
+        cores = machine_type_parts.cores
+
         vm_config = create_vm_config(
             file_store,
             resource_rates,
@@ -137,7 +139,7 @@ async def create_vm(
             instance_config,
         )
 
-        memory_mib = gcp_worker_memory_per_core_mib(worker_type) * cores
+        memory_mib = gcp_worker_memory_per_core_mib(machine_family, worker_type) * cores
         memory_in_bytes = memory_mib << 20
         cores_mcpu = cores * 1000
         total_resources_on_instance = instance_config.quantified_resources(
diff --git a/batch/batch/cloud/gcp/instance_config.py b/batch/batch/cloud/gcp/instance_config.py
index 2789ff6adcc..f9ea5156dea 100644
--- a/batch/batch/cloud/gcp/instance_config.py
+++ b/batch/batch/cloud/gcp/instance_config.py
@@ -2,7 +2,7 @@
 
 from ...driver.billing_manager import ProductVersions
 from ...instance_config import InstanceConfig
-from .resource_utils import gcp_machine_type_to_parts, machine_family_to_gpu
+from .resource_utils import gcp_cores_mcpu_to_memory_bytes, gcp_machine_type_to_parts, machine_family_to_gpu
 from .resources import (
     GCPAcceleratorResource,
     GCPComputeResource,
@@ -104,6 +104,9 @@ def __init__(
     def worker_type(self) -> str:
         return self._worker_type
 
+    def cores_mcpu_to_memory_bytes(self, mcpu: int) -> int:
+        return gcp_cores_mcpu_to_memory_bytes(mcpu, self._instance_family, self.worker_type())
+
     def region_for(self, location: str) -> str:
         # location = zone
         return region_from_location(location)
diff --git a/batch/batch/cloud/gcp/resource_utils.py b/batch/batch/cloud/gcp/resource_utils.py
index 49d2ce0439b..b0c91859425 100644
--- a/batch/batch/cloud/gcp/resource_utils.py
+++ b/batch/batch/cloud/gcp/resource_utils.py
@@ -1,4 +1,5 @@
 import logging
+import math
 import re
 from typing import Optional, Tuple
 
@@ -10,6 +11,13 @@
 
 MACHINE_FAMILY_TO_ACCELERATOR_VERSIONS = {'g2': 'l4'}
 
+MEMORY_PER_CORE_MIB = {
+    ('n1', 'standard'): 3840,
+    ('n1', 'highmem'): 6656,
+    ('n1', 'highcpu'): 924,
+    ('g2', 'standard'): 4000,
+}
+
 gcp_valid_cores_from_worker_type = {
     'highcpu': [2, 4, 8, 16, 32, 64, 96],
     'standard': [1, 2, 4, 8, 16, 32, 64, 96],
@@ -45,27 +53,26 @@ def gcp_machine_type_to_parts(machine_type: str) -> Optional[MachineTypeParts]:
     return MachineTypeParts.from_dict(match.groupdict())
 
 
-def gcp_machine_type_to_worker_type_and_cores(machine_type: str) -> Tuple[str, int]:
+def gcp_machine_type_to_cores_and_memory_mib_per_core(machine_type: str) -> Tuple[int, int]:
     # FIXME: "WORKER TYPE" IS WRONG OR CONFUSING WHEN THE MACHINE TYPE IS NOT n1!
     maybe_machine_type_parts = gcp_machine_type_to_parts(machine_type)
     if maybe_machine_type_parts is None:
         raise ValueError(f'bad machine_type: {machine_type}')
-    return (maybe_machine_type_parts.worker_type, maybe_machine_type_parts.cores)
+    cores = maybe_machine_type_parts.cores
+    memory_per_core = gcp_worker_memory_per_core_mib(
+        maybe_machine_type_parts.machine_family, maybe_machine_type_parts.worker_type
+    )
+    return cores, memory_per_core
 
 
 def family_worker_type_cores_to_gcp_machine_type(family: str, worker_type: str, cores: int) -> str:
     return f'{family}-{worker_type}-{cores}'
 
 
-def gcp_worker_memory_per_core_mib(worker_type: str) -> int:
-    if worker_type == 'standard':
-        m = 3840
-    elif worker_type == 'highmem':
-        m = 6656
-    else:
-        assert worker_type == 'highcpu', worker_type
-        m = 924  # this number must be divisible by 4. I rounded up to the nearest MiB
-    return m
+def gcp_worker_memory_per_core_mib(machine_family: str, worker_type: str) -> int:
+    machine_worker_key = (machine_family, worker_type)
+    assert machine_worker_key in MEMORY_PER_CORE_MIB, machine_worker_key
+    return MEMORY_PER_CORE_MIB[machine_worker_key]
 
 
 def gcp_requested_to_actual_storage_bytes(storage_bytes, allow_zero_storage):
@@ -91,3 +98,18 @@ def machine_family_to_gpu(machine_family: str) -> Optional[str]:
 
 def is_gpu(machine_family: str) -> bool:
     return machine_family_to_gpu(machine_family) is not None
+
+
+def gcp_cores_mcpu_to_memory_bytes(mcpu: int, machine_family: str, worker_type: str) -> int:
+    memory_mib = gcp_worker_memory_per_core_mib(machine_family, worker_type)
+    memory_bytes = int(memory_mib * 1024**2)
+    return int((mcpu / 1000) * memory_bytes)
+
+
+def gcp_adjust_cores_for_memory_request(
+    cores_in_mcpu: int, memory_in_bytes: int, machine_family: str, worker_type: str
+) -> int:
+    memory_per_core_mib = gcp_worker_memory_per_core_mib(machine_family, worker_type)
+    memory_per_core_bytes = int(memory_per_core_mib * 1024**2)
+    min_cores_mcpu = math.ceil((memory_in_bytes / memory_per_core_bytes) * 1000)
+    return max(cores_in_mcpu, min_cores_mcpu)
diff --git a/batch/batch/cloud/resource_utils.py b/batch/batch/cloud/resource_utils.py
index c10fa985b67..41f1913616f 100644
--- a/batch/batch/cloud/resource_utils.py
+++ b/batch/batch/cloud/resource_utils.py
@@ -6,22 +6,20 @@
 from .azure.resource_utils import (
     azure_is_valid_storage_request,
     azure_local_ssd_size,
-    azure_machine_type_to_worker_type_and_cores,
+    azure_machine_type_to_cores_and_memory_mib_per_core,
     azure_memory_to_worker_type,
     azure_requested_to_actual_storage_bytes,
     azure_valid_cores_from_worker_type,
     azure_valid_machine_types,
-    azure_worker_memory_per_core_mib,
 )
 from .gcp.resource_utils import (
     gcp_is_valid_storage_request,
     gcp_local_ssd_size,
-    gcp_machine_type_to_worker_type_and_cores,
+    gcp_machine_type_to_cores_and_memory_mib_per_core,
     gcp_memory_to_worker_type,
     gcp_requested_to_actual_storage_bytes,
     gcp_valid_cores_from_worker_type,
     gcp_valid_machine_types,
-    gcp_worker_memory_per_core_mib,
 )
 
 log = logging.getLogger('resource_utils')
@@ -52,36 +50,15 @@ def memory_to_worker_type(cloud: str) -> Dict[str, str]:
     return gcp_memory_to_worker_type
 
 
-def machine_type_to_worker_type_cores(cloud: str, machine_type: str) -> Tuple[str, int]:
+def machine_type_to_cores_and_memory_bytes(cloud: str, machine_type: str) -> Tuple[int, int]:
     if cloud == 'azure':
-        return azure_machine_type_to_worker_type_and_cores(machine_type)
-    assert cloud == 'gcp'
-    return gcp_machine_type_to_worker_type_and_cores(machine_type)
-
-
-def worker_memory_per_core_mib(cloud: str, worker_type: str) -> int:
-    if cloud == 'azure':
-        return azure_worker_memory_per_core_mib(worker_type)
-    assert cloud == 'gcp'
-    return gcp_worker_memory_per_core_mib(worker_type)
-
-
-def worker_memory_per_core_bytes(cloud: str, worker_type: str) -> int:
-    m = worker_memory_per_core_mib(cloud, worker_type)
-    return int(m * 1024**2)
-
-
-def memory_bytes_to_cores_mcpu(cloud: str, memory_in_bytes: int, worker_type: str) -> int:
-    return math.ceil((memory_in_bytes / worker_memory_per_core_bytes(cloud, worker_type)) * 1000)
-
-
-def cores_mcpu_to_memory_bytes(cloud: str, cores_in_mcpu: int, worker_type: str) -> int:
-    return int((cores_in_mcpu / 1000) * worker_memory_per_core_bytes(cloud, worker_type))
-
-
-def adjust_cores_for_memory_request(cloud: str, cores_in_mcpu: int, memory_in_bytes: int, worker_type: str) -> int:
-    min_cores_mcpu = memory_bytes_to_cores_mcpu(cloud, memory_in_bytes, worker_type)
-    return max(cores_in_mcpu, min_cores_mcpu)
+        cores, memory_mib_per_core = azure_machine_type_to_cores_and_memory_mib_per_core(machine_type)
+    else:
+        assert cloud == 'gcp'
+        cores, memory_mib_per_core = gcp_machine_type_to_cores_and_memory_mib_per_core(machine_type)
+    memory_bytes_per_core = memory_mib_per_core * 1024**2
+    memory_bytes = cores * memory_bytes_per_core
+    return cores, memory_bytes
 
 
 def unreserved_worker_data_disk_size_gib(data_disk_size_gib: int, cores: int) -> int:
diff --git a/batch/batch/cloud/terra/azure/driver/driver.py b/batch/batch/cloud/terra/azure/driver/driver.py
index fb683129b75..2898c4a9fa5 100644
--- a/batch/batch/cloud/terra/azure/driver/driver.py
+++ b/batch/batch/cloud/terra/azure/driver/driver.py
@@ -5,7 +5,7 @@
 import os
 import uuid
 from shlex import quote as shq
-from typing import List, Tuple
+from typing import List
 
 import aiohttp
 
@@ -37,7 +37,7 @@
 from .....instance_config import InstanceConfig, QuantifiedResource
 from ....azure.driver.billing_manager import AzureBillingManager
 from ....azure.resource_utils import (
-    azure_machine_type_to_worker_type_and_cores,
+    azure_machine_type_to_parts,
     azure_worker_memory_per_core_mib,
     azure_worker_properties_to_machine_type,
 )
@@ -346,9 +346,6 @@ async def get_vm_state(self, instance: Instance) -> VMState:
     def machine_type(self, cores: int, worker_type: str, local_ssd: bool) -> str:
         return azure_worker_properties_to_machine_type(worker_type, cores, local_ssd)
 
-    def worker_type_and_cores(self, machine_type: str) -> Tuple[str, int]:
-        return azure_machine_type_to_worker_type_and_cores(machine_type)
-
     def instance_config(
         self,
         machine_type: str,
@@ -389,9 +386,11 @@ async def create_vm(
         instance_config: InstanceConfig,
     ) -> List[QuantifiedResource]:
         assert isinstance(instance_config, TerraAzureSlimInstanceConfig)
-        worker_type, cores = self.worker_type_and_cores(machine_type)
+        parts = azure_machine_type_to_parts(machine_type)
+        assert parts
+        cores = parts.cores
 
-        memory_mib = azure_worker_memory_per_core_mib(worker_type) * cores
+        memory_mib = azure_worker_memory_per_core_mib(parts.family) * cores
         memory_in_bytes = memory_mib << 20
         cores_mcpu = cores * 1000
         total_resources_on_instance = instance_config.quantified_resources(
diff --git a/batch/batch/driver/instance_collection/job_private.py b/batch/batch/driver/instance_collection/job_private.py
index 0d4d336c92b..c4907ec590b 100644
--- a/batch/batch/driver/instance_collection/job_private.py
+++ b/batch/batch/driver/instance_collection/job_private.py
@@ -21,6 +21,7 @@
 )
 
 from ...batch_format_version import BatchFormatVersion
+from ...cloud.resource_utils import machine_type_to_cores_and_memory_bytes
 from ...inst_coll_config import JobPrivateInstanceManagerConfig
 from ...instance_config import QuantifiedResource
 from ...utils import Box, ExceededSharesCounter, regions_bits_rep_to_regions
@@ -311,7 +312,7 @@ async def create_instance(
         machine_type = machine_spec['machine_type']
         preemptible = machine_spec['preemptible']
         storage_gb = machine_spec['storage_gib']
-        _, cores = self.resource_manager.worker_type_and_cores(machine_type)
+        cores, _ = machine_type_to_cores_and_memory_bytes(self.cloud, machine_type)
         instance, total_resources_on_instance = await self._create_instance(
             app=self.app,
             cores=cores,
diff --git a/batch/batch/driver/resource_manager.py b/batch/batch/driver/resource_manager.py
index 38c6c2bbe95..1b6591b6940 100644
--- a/batch/batch/driver/resource_manager.py
+++ b/batch/batch/driver/resource_manager.py
@@ -1,6 +1,6 @@
 import abc
 import logging
-from typing import Any, List, Tuple
+from typing import Any, List
 
 from hailtop.utils import time_msecs
 
@@ -67,10 +67,6 @@ class CloudResourceManager:
     def machine_type(self, cores: int, worker_type: str, local_ssd: bool) -> str:
         raise NotImplementedError
 
-    @abc.abstractmethod
-    def worker_type_and_cores(self, machine_type: str) -> Tuple[str, int]:
-        raise NotImplementedError
-
     @abc.abstractmethod
     def instance_config(
         self,
diff --git a/batch/batch/front_end/front_end.py b/batch/batch/front_end/front_end.py
index ceb1ff1ff53..d7565c24c03 100644
--- a/batch/batch/front_end/front_end.py
+++ b/batch/batch/front_end/front_end.py
@@ -73,8 +73,9 @@
 from ..batch import batch_record_to_dict, cancel_job_group_in_db, job_group_record_to_dict, job_record_to_dict
 from ..batch_configuration import BATCH_STORAGE_URI, CLOUD, DEFAULT_NAMESPACE, SCOPE
 from ..batch_format_version import BatchFormatVersion
+from ..cloud.azure.resource_utils import azure_cores_mcpu_to_memory_bytes
+from ..cloud.gcp.resource_utils import GCP_MACHINE_FAMILY, gcp_cores_mcpu_to_memory_bytes
 from ..cloud.resource_utils import (
-    cores_mcpu_to_memory_bytes,
     is_valid_cores_mcpu,
     memory_to_worker_type,
     valid_machine_types,
@@ -1185,7 +1186,11 @@ async def _create_jobs(
                     memory_to_worker_types = memory_to_worker_type(cloud)
                     if req_memory in memory_to_worker_types:
                         worker_type = memory_to_worker_types[req_memory]
-                        req_memory_bytes = cores_mcpu_to_memory_bytes(cloud, req_cores_mcpu, worker_type)
+                        if CLOUD == 'gcp':
+                            req_memory_bytes = gcp_cores_mcpu_to_memory_bytes(req_cores_mcpu, GCP_MACHINE_FAMILY, worker_type)
+                        else:
+                            assert CLOUD == 'azure'
+                            req_memory_bytes = azure_cores_mcpu_to_memory_bytes(req_cores_mcpu, worker_type)
                     else:
                         req_memory_bytes = parse_memory_in_bytes(req_memory)
                 else:
diff --git a/batch/batch/inst_coll_config.py b/batch/batch/inst_coll_config.py
index 45ed33dc3d8..4e408a7c599 100644
--- a/batch/batch/inst_coll_config.py
+++ b/batch/batch/inst_coll_config.py
@@ -5,15 +5,22 @@
 from gear import Database
 
 from .cloud.azure.instance_config import AzureSlimInstanceConfig
-from .cloud.azure.resource_utils import azure_worker_properties_to_machine_type
+from .cloud.azure.resource_utils import (
+    azure_adjust_cores_for_memory_request,
+    azure_cores_mcpu_to_memory_bytes,
+    azure_worker_properties_to_machine_type,
+)
 from .cloud.gcp.instance_config import GCPSlimInstanceConfig
-from .cloud.gcp.resource_utils import GCP_MACHINE_FAMILY, family_worker_type_cores_to_gcp_machine_type
+from .cloud.gcp.resource_utils import (
+    GCP_MACHINE_FAMILY,
+    family_worker_type_cores_to_gcp_machine_type,
+    gcp_adjust_cores_for_memory_request,
+    gcp_cores_mcpu_to_memory_bytes,
+)
 from .cloud.resource_utils import (
-    adjust_cores_for_memory_request,
     adjust_cores_for_packability,
-    cores_mcpu_to_memory_bytes,
     local_ssd_size,
-    machine_type_to_worker_type_cores,
+    machine_type_to_cores_and_memory_bytes,
     requested_storage_bytes_to_actual_storage_gib,
     valid_machine_types,
 )
@@ -187,10 +194,17 @@ def convert_requests_to_resources(self, cores_mcpu, memory_bytes, storage_bytes)
         if storage_gib is None:
             return None
 
-        cores_mcpu = adjust_cores_for_memory_request(self.cloud, cores_mcpu, memory_bytes, self.worker_type)
-        cores_mcpu = adjust_cores_for_packability(cores_mcpu)
-
-        memory_bytes = cores_mcpu_to_memory_bytes(self.cloud, cores_mcpu, self.worker_type)
+        if self.cloud == 'gcp':
+            cores_mcpu = gcp_adjust_cores_for_memory_request(
+                cores_mcpu, memory_bytes, GCP_MACHINE_FAMILY, self.worker_type
+            )
+            cores_mcpu = adjust_cores_for_packability(cores_mcpu)
+            memory_bytes = gcp_cores_mcpu_to_memory_bytes(cores_mcpu, GCP_MACHINE_FAMILY, self.worker_type)
+        else:
+            assert self.cloud == 'azure'
+            cores_mcpu = azure_adjust_cores_for_memory_request(cores_mcpu, memory_bytes, self.worker_type)
+            cores_mcpu = adjust_cores_for_packability(cores_mcpu)
+            memory_bytes = azure_cores_mcpu_to_memory_bytes(cores_mcpu, self.worker_type)
 
         if cores_mcpu <= self.worker_cores * 1000:
             return (cores_mcpu, memory_bytes, storage_gib)
@@ -243,9 +257,8 @@ def convert_requests_to_resources(self, machine_type, storage_bytes):
         if storage_gib is None:
             return None
 
-        worker_type, cores = machine_type_to_worker_type_cores(self.cloud, machine_type)
+        cores, memory_bytes = machine_type_to_cores_and_memory_bytes(self.cloud, machine_type)
         cores_mcpu = cores * 1000
-        memory_bytes = cores_mcpu_to_memory_bytes(self.cloud, cores_mcpu, worker_type)
 
         return (self.name, cores_mcpu, memory_bytes, storage_gib)
 
diff --git a/batch/batch/instance_config.py b/batch/batch/instance_config.py
index fdbfd8dcafd..308aa5c9f01 100644
--- a/batch/batch/instance_config.py
+++ b/batch/batch/instance_config.py
@@ -1,7 +1,6 @@
 import abc
 from typing import Dict, List, Sequence
 
-from .cloud.resource_utils import cores_mcpu_to_memory_bytes
 from .driver.billing_manager import ProductVersions
 from .resources import QuantifiedResource, Resource
 
@@ -44,6 +43,10 @@ def to_dict(self) -> dict:
     def region_for(self, location: str) -> str:
         raise NotImplementedError
 
+    @abc.abstractmethod
+    def cores_mcpu_to_memory_bytes(self, mcpu: int) -> int:
+        raise NotImplementedError
+
     def quantified_resources(
         self,
         cpu_in_mcpu: int,
@@ -102,13 +105,13 @@ def cost_per_hour_from_cores(
         utilized_cores_mcpu: int,
     ) -> float:
         assert 0 <= utilized_cores_mcpu <= self.cores * 1000
-        memory_in_bytes = cores_mcpu_to_memory_bytes(self.cloud, utilized_cores_mcpu, self.worker_type())
+        memory_in_bytes = self.cores_mcpu_to_memory_bytes(utilized_cores_mcpu)
         storage_in_gb = 0  # we don't need to account for external storage
         return self.cost_per_hour(resource_rates, utilized_cores_mcpu, memory_in_bytes, storage_in_gb)
 
     def actual_cost_per_hour(self, resource_rates: Dict[str, float]) -> float:
         cpu_in_mcpu = self.cores * 1000
-        memory_in_bytes = cores_mcpu_to_memory_bytes(self.cloud, cpu_in_mcpu, self.worker_type())
+        memory_in_bytes = self.cores_mcpu_to_memory_bytes(cpu_in_mcpu)
         storage_in_gb = 0  # we don't need to account for external storage
         resources = self.quantified_resources(cpu_in_mcpu, memory_in_bytes, storage_in_gb)
         resources = [r for r in resources if 'service-fee' not in r['name']]
diff --git a/batch/batch/worker/worker.py b/batch/batch/worker/worker.py
index cc97232c21c..621abca6462 100644
--- a/batch/batch/worker/worker.py
+++ b/batch/batch/worker/worker.py
@@ -74,9 +74,8 @@
 from ..cloud.gcp.worker.worker_api import GCPWorkerAPI
 from ..cloud.resource_utils import (
     is_valid_storage_request,
+    machine_type_to_cores_and_memory_bytes,
     storage_gib_to_bytes,
-    worker_memory_per_core_bytes,
-    worker_memory_per_core_mib,
 )
 from ..file_store import FileStore
 from ..globals import HTTP_CLIENT_MAX_SIZE, RESERVED_STORAGE_GB_PER_CORE, STATUS_FORMAT_VERSION
@@ -2483,11 +2482,14 @@ async def create_and_start(
         assert os.path.isdir(root_dir)
 
         assert instance_config
-        total_memory_bytes = n_cores * worker_memory_per_core_bytes(CLOUD, instance_config.worker_type())
+        total_machine_cores, total_machine_memory_bytes = machine_type_to_cores_and_memory_bytes(
+            CLOUD, INSTANCE_CONFIG["machine_type"]
+        )
+        total_memory_bytes = int((n_cores / total_machine_cores) * total_machine_memory_bytes)
 
         # We allocate 60% of memory per core to off heap memory
-        memory_per_core_mib = worker_memory_per_core_mib(CLOUD, instance_config.worker_type())
-        memory_mib = n_cores * memory_per_core_mib
+
+        memory_mib = total_memory_bytes // (1024**2)
         heap_memory_mib = int(0.4 * memory_mib)
         off_heap_memory_per_core_mib = memory_mib - heap_memory_mib
 
diff --git a/batch/test/test_utils.py b/batch/test/test_utils.py
index e381fba9848..7533d4ca105 100644
--- a/batch/test/test_utils.py
+++ b/batch/test/test_utils.py
@@ -1,3 +1,6 @@
+import pytest
+
+from batch.cloud.gcp.resource_utils import gcp_worker_memory_per_core_mib
 from batch.cloud.resource_utils import adjust_cores_for_packability
 from hailtop.batch_client.parse import parse_memory_in_bytes
 
@@ -23,3 +26,12 @@ def test_memory_str_to_bytes():
     assert parse_memory_in_bytes('7') == 7
     assert parse_memory_in_bytes('1K') == 1000
     assert parse_memory_in_bytes('1Ki') == 1024
+
+
+def test_gcp_worker_memory_per_core_mib():
+    with pytest.raises(AssertionError):
+        assert gcp_worker_memory_per_core_mib('n2', 'standard')
+    assert gcp_worker_memory_per_core_mib('n1', 'standard') == 3840
+    assert gcp_worker_memory_per_core_mib('n1', 'highmem') == 6656
+    assert gcp_worker_memory_per_core_mib('n1', 'highcpu') == 924
+    assert gcp_worker_memory_per_core_mib('g2', 'standard') == 4000