From 5e55aacbddc0248f7f45511cfb19f35e15da1533 Mon Sep 17 00:00:00 2001 From: Daniel Goldstein Date: Mon, 29 Apr 2024 18:40:08 -0400 Subject: [PATCH 01/12] [batch] Get rid of machine_type_to_worker_type_cores --- batch/batch/cloud/azure/driver/create_instance.py | 6 ++++-- .../batch/cloud/azure/driver/resource_manager.py | 15 +++++++-------- batch/batch/cloud/azure/instance_config.py | 10 +++++----- batch/batch/cloud/azure/resource_utils.py | 6 ++++-- batch/batch/cloud/gcp/driver/create_instance.py | 6 ++++-- batch/batch/cloud/gcp/driver/resource_manager.py | 14 ++++++-------- batch/batch/cloud/gcp/resource_utils.py | 6 ++++-- batch/batch/cloud/resource_utils.py | 10 +++++----- batch/batch/cloud/terra/azure/driver/driver.py | 13 ++++++------- .../driver/instance_collection/job_private.py | 3 ++- batch/batch/driver/resource_manager.py | 6 +----- batch/batch/inst_coll_config.py | 7 +++---- 12 files changed, 51 insertions(+), 51 deletions(-) diff --git a/batch/batch/cloud/azure/driver/create_instance.py b/batch/batch/cloud/azure/driver/create_instance.py index 122386a6b72..754185d4117 100644 --- a/batch/batch/cloud/azure/driver/create_instance.py +++ b/batch/batch/cloud/azure/driver/create_instance.py @@ -13,7 +13,7 @@ from ....instance_config import InstanceConfig from ...resource_utils import unreserved_worker_data_disk_size_gib from ...utils import ACCEPTABLE_QUERY_JAR_URL_PREFIX -from ..resource_utils import azure_machine_type_to_worker_type_and_cores +from ..resource_utils import azure_machine_type_to_parts log = logging.getLogger('create_instance') @@ -43,7 +43,9 @@ def create_vm_config( instance_config: InstanceConfig, feature_flags: dict, ) -> dict: - _, cores = azure_machine_type_to_worker_type_and_cores(machine_type) + parts = azure_machine_type_to_parts(machine_type) + assert parts + cores = parts.cores hail_azure_oauth_scope = os.environ['HAIL_AZURE_OAUTH_SCOPE'] region = instance_config.region_for(location) diff --git a/batch/batch/cloud/azure/driver/resource_manager.py b/batch/batch/cloud/azure/driver/resource_manager.py index ae61e52210b..1744e10e16b 100644 --- a/batch/batch/cloud/azure/driver/resource_manager.py +++ b/batch/batch/cloud/azure/driver/resource_manager.py @@ -1,5 +1,5 @@ import logging -from typing import List, Optional, Tuple +from typing import List, Optional import aiohttp @@ -21,7 +21,7 @@ from ..instance_config import AzureSlimInstanceConfig from ..resource_utils import ( azure_local_ssd_size, - azure_machine_type_to_worker_type_and_cores, + azure_machine_type_to_parts, azure_worker_memory_per_core_mib, azure_worker_properties_to_machine_type, ) @@ -101,9 +101,6 @@ async def get_vm_state(self, instance: Instance) -> VMState: def machine_type(self, cores: int, worker_type: str, local_ssd: bool) -> str: return azure_worker_properties_to_machine_type(worker_type, cores, local_ssd) - def worker_type_and_cores(self, machine_type: str) -> Tuple[str, int]: - return azure_machine_type_to_worker_type_and_cores(machine_type) - def instance_config( self, machine_type: str, @@ -140,10 +137,12 @@ async def create_vm( machine_type: str, instance_config: InstanceConfig, ) -> List[QuantifiedResource]: - worker_type, cores = self.worker_type_and_cores(machine_type) + parts = azure_machine_type_to_parts(machine_type) + assert parts + cores = parts.cores if local_ssd_data_disk: - assert data_disk_size_gb == azure_local_ssd_size(worker_type, cores) + assert data_disk_size_gb == azure_local_ssd_size(parts.family, parts.cores) max_price: Optional[float] if preemptible: @@ -173,7 +172,7 @@ async def create_vm( self.app['feature_flags'], ) - memory_mib = azure_worker_memory_per_core_mib(worker_type) * cores + memory_mib = azure_worker_memory_per_core_mib(parts.family) * cores memory_in_bytes = memory_mib << 20 cores_mcpu = cores * 1000 total_resources_on_instance = instance_config.quantified_resources( diff --git a/batch/batch/cloud/azure/instance_config.py b/batch/batch/cloud/azure/instance_config.py index fbde785cc98..13bb5d26616 100644 --- a/batch/batch/cloud/azure/instance_config.py +++ b/batch/batch/cloud/azure/instance_config.py @@ -4,7 +4,7 @@ from ...driver.billing_manager import ProductVersions from ...instance_config import InstanceConfig -from .resource_utils import azure_machine_type_to_worker_type_and_cores +from .resource_utils import azure_machine_type_to_parts from .resources import ( AzureDynamicSizedDiskResource, AzureIPFeeResource, @@ -73,10 +73,10 @@ def __init__( self.boot_disk_size_gb = boot_disk_size_gb self.resources = resources - worker_type, cores = azure_machine_type_to_worker_type_and_cores(self._machine_type) - - self._worker_type = worker_type - self.cores = cores + machine_type_parts = azure_machine_type_to_parts(self._machine_type) + assert machine_type_parts + self._worker_type = machine_type_parts.family + self.cores = machine_type_parts.cores def worker_type(self) -> str: return self._worker_type diff --git a/batch/batch/cloud/azure/resource_utils.py b/batch/batch/cloud/azure/resource_utils.py index 5a85d86edd3..36a83b32153 100644 --- a/batch/batch/cloud/azure/resource_utils.py +++ b/batch/batch/cloud/azure/resource_utils.py @@ -148,11 +148,13 @@ def azure_machine_type_to_parts(machine_type: str) -> Optional[MachineTypeParts] return MachineTypeParts.from_dict(match.groupdict()) -def azure_machine_type_to_worker_type_and_cores(machine_type: str) -> Tuple[str, int]: +def azure_machine_type_to_cores_and_memory_mib_per_core(machine_type: str) -> Tuple[int, int]: maybe_machine_type_parts = azure_machine_type_to_parts(machine_type) if maybe_machine_type_parts is None: raise ValueError(f'bad machine_type: {machine_type}') - return (maybe_machine_type_parts.family, maybe_machine_type_parts.cores) + cores = maybe_machine_type_parts.cores + memory_per_core = azure_worker_memory_per_core_mib(maybe_machine_type_parts.family) + return cores, memory_per_core def azure_worker_properties_to_machine_type(worker_type: str, cores: int, local_ssd_data_disk: bool) -> str: diff --git a/batch/batch/cloud/gcp/driver/create_instance.py b/batch/batch/cloud/gcp/driver/create_instance.py index 77098374b8b..abaa838bbe9 100644 --- a/batch/batch/cloud/gcp/driver/create_instance.py +++ b/batch/batch/cloud/gcp/driver/create_instance.py @@ -13,7 +13,7 @@ from ....instance_config import InstanceConfig from ...resource_utils import unreserved_worker_data_disk_size_gib from ...utils import ACCEPTABLE_QUERY_JAR_URL_PREFIX -from ..resource_utils import gcp_machine_type_to_worker_type_and_cores, machine_family_to_gpu +from ..resource_utils import gcp_machine_type_to_parts, machine_family_to_gpu log = logging.getLogger('create_instance') @@ -40,7 +40,9 @@ def create_vm_config( project: str, instance_config: InstanceConfig, ) -> dict: - _, cores = gcp_machine_type_to_worker_type_and_cores(machine_type) + parts = gcp_machine_type_to_parts(machine_type) + assert parts + cores = parts.cores region = instance_config.region_for(zone) machine_family = machine_type.split('-')[0] diff --git a/batch/batch/cloud/gcp/driver/resource_manager.py b/batch/batch/cloud/gcp/driver/resource_manager.py index c91a1365f7d..7bd7644d9cb 100644 --- a/batch/batch/cloud/gcp/driver/resource_manager.py +++ b/batch/batch/cloud/gcp/driver/resource_manager.py @@ -1,6 +1,6 @@ import logging import uuid -from typing import List, Tuple +from typing import List import aiohttp @@ -23,7 +23,7 @@ from ..resource_utils import ( GCP_MACHINE_FAMILY, family_worker_type_cores_to_gcp_machine_type, - gcp_machine_type_to_worker_type_and_cores, + gcp_machine_type_to_parts, gcp_worker_memory_per_core_mib, ) from .billing_manager import GCPBillingManager @@ -75,9 +75,6 @@ async def get_vm_state(self, instance: Instance) -> VMState: def machine_type(self, cores: int, worker_type: str, local_ssd: bool) -> str: # pylint: disable=unused-argument return family_worker_type_cores_to_gcp_machine_type(GCP_MACHINE_FAMILY, worker_type, cores) - def worker_type_and_cores(self, machine_type: str) -> Tuple[str, int]: - return gcp_machine_type_to_worker_type_and_cores(machine_type) - def instance_config( self, machine_type: str, @@ -119,7 +116,8 @@ async def create_vm( resource_rates = self.billing_manager.resource_rates - worker_type, cores = self.worker_type_and_cores(machine_type) + parts = gcp_machine_type_to_parts(machine_type) + assert parts vm_config = create_vm_config( file_store, resource_rates, @@ -137,9 +135,9 @@ async def create_vm( instance_config, ) - memory_mib = gcp_worker_memory_per_core_mib(worker_type) * cores + memory_mib = gcp_worker_memory_per_core_mib(parts.worker_type) * parts.cores memory_in_bytes = memory_mib << 20 - cores_mcpu = cores * 1000 + cores_mcpu = parts.cores * 1000 total_resources_on_instance = instance_config.quantified_resources( cpu_in_mcpu=cores_mcpu, memory_in_bytes=memory_in_bytes, extra_storage_in_gib=0 ) diff --git a/batch/batch/cloud/gcp/resource_utils.py b/batch/batch/cloud/gcp/resource_utils.py index 7d0d8aeda6b..18a30822d66 100644 --- a/batch/batch/cloud/gcp/resource_utils.py +++ b/batch/batch/cloud/gcp/resource_utils.py @@ -45,12 +45,14 @@ def gcp_machine_type_to_parts(machine_type: str) -> Optional[MachineTypeParts]: return MachineTypeParts.from_dict(match.groupdict()) -def gcp_machine_type_to_worker_type_and_cores(machine_type: str) -> Tuple[str, int]: +def gcp_machine_type_to_cores_and_memory_mib_per_core(machine_type: str) -> Tuple[int, int]: # FIXME: "WORKER TYPE" IS WRONG OR CONFUSING WHEN THE MACHINE TYPE IS NOT n1! maybe_machine_type_parts = gcp_machine_type_to_parts(machine_type) if maybe_machine_type_parts is None: raise ValueError(f'bad machine_type: {machine_type}') - return (maybe_machine_type_parts.worker_type, maybe_machine_type_parts.cores) + cores = maybe_machine_type_parts.cores + memory_per_core = gcp_worker_memory_per_core_mib(maybe_machine_type_parts.worker_type) + return cores, memory_per_core def family_worker_type_cores_to_gcp_machine_type(family: str, worker_type: str, cores: int) -> str: diff --git a/batch/batch/cloud/resource_utils.py b/batch/batch/cloud/resource_utils.py index 4c699da5036..1c87aec5222 100644 --- a/batch/batch/cloud/resource_utils.py +++ b/batch/batch/cloud/resource_utils.py @@ -6,7 +6,7 @@ from .azure.resource_utils import ( azure_is_valid_storage_request, azure_local_ssd_size, - azure_machine_type_to_worker_type_and_cores, + azure_machine_type_to_cores_and_memory_mib_per_core, azure_memory_to_worker_type, azure_requested_to_actual_storage_bytes, azure_valid_cores_from_worker_type, @@ -17,7 +17,7 @@ gcp_cost_from_msec_mcpu, gcp_is_valid_storage_request, gcp_local_ssd_size, - gcp_machine_type_to_worker_type_and_cores, + gcp_machine_type_to_cores_and_memory_mib_per_core, gcp_memory_to_worker_type, gcp_requested_to_actual_storage_bytes, gcp_valid_cores_from_worker_type, @@ -53,11 +53,11 @@ def memory_to_worker_type(cloud: str) -> Dict[str, str]: return gcp_memory_to_worker_type -def machine_type_to_worker_type_cores(cloud: str, machine_type: str) -> Tuple[str, int]: +def machine_type_to_cores_and_memory_mib_per_core(cloud: str, machine_type: str) -> Tuple[int, int]: if cloud == 'azure': - return azure_machine_type_to_worker_type_and_cores(machine_type) + return azure_machine_type_to_cores_and_memory_mib_per_core(machine_type) assert cloud == 'gcp' - return gcp_machine_type_to_worker_type_and_cores(machine_type) + return gcp_machine_type_to_cores_and_memory_mib_per_core(machine_type) def cost_from_msec_mcpu(msec_mcpu: int) -> Optional[float]: diff --git a/batch/batch/cloud/terra/azure/driver/driver.py b/batch/batch/cloud/terra/azure/driver/driver.py index fb683129b75..2898c4a9fa5 100644 --- a/batch/batch/cloud/terra/azure/driver/driver.py +++ b/batch/batch/cloud/terra/azure/driver/driver.py @@ -5,7 +5,7 @@ import os import uuid from shlex import quote as shq -from typing import List, Tuple +from typing import List import aiohttp @@ -37,7 +37,7 @@ from .....instance_config import InstanceConfig, QuantifiedResource from ....azure.driver.billing_manager import AzureBillingManager from ....azure.resource_utils import ( - azure_machine_type_to_worker_type_and_cores, + azure_machine_type_to_parts, azure_worker_memory_per_core_mib, azure_worker_properties_to_machine_type, ) @@ -346,9 +346,6 @@ async def get_vm_state(self, instance: Instance) -> VMState: def machine_type(self, cores: int, worker_type: str, local_ssd: bool) -> str: return azure_worker_properties_to_machine_type(worker_type, cores, local_ssd) - def worker_type_and_cores(self, machine_type: str) -> Tuple[str, int]: - return azure_machine_type_to_worker_type_and_cores(machine_type) - def instance_config( self, machine_type: str, @@ -389,9 +386,11 @@ async def create_vm( instance_config: InstanceConfig, ) -> List[QuantifiedResource]: assert isinstance(instance_config, TerraAzureSlimInstanceConfig) - worker_type, cores = self.worker_type_and_cores(machine_type) + parts = azure_machine_type_to_parts(machine_type) + assert parts + cores = parts.cores - memory_mib = azure_worker_memory_per_core_mib(worker_type) * cores + memory_mib = azure_worker_memory_per_core_mib(parts.family) * cores memory_in_bytes = memory_mib << 20 cores_mcpu = cores * 1000 total_resources_on_instance = instance_config.quantified_resources( diff --git a/batch/batch/driver/instance_collection/job_private.py b/batch/batch/driver/instance_collection/job_private.py index 0d4d336c92b..d7de236fe79 100644 --- a/batch/batch/driver/instance_collection/job_private.py +++ b/batch/batch/driver/instance_collection/job_private.py @@ -21,6 +21,7 @@ ) from ...batch_format_version import BatchFormatVersion +from ...cloud.resource_utils import machine_type_to_cores_and_memory_mib_per_core from ...inst_coll_config import JobPrivateInstanceManagerConfig from ...instance_config import QuantifiedResource from ...utils import Box, ExceededSharesCounter, regions_bits_rep_to_regions @@ -311,7 +312,7 @@ async def create_instance( machine_type = machine_spec['machine_type'] preemptible = machine_spec['preemptible'] storage_gb = machine_spec['storage_gib'] - _, cores = self.resource_manager.worker_type_and_cores(machine_type) + cores, _ = machine_type_to_cores_and_memory_mib_per_core(self.cloud, machine_type) instance, total_resources_on_instance = await self._create_instance( app=self.app, cores=cores, diff --git a/batch/batch/driver/resource_manager.py b/batch/batch/driver/resource_manager.py index 38c6c2bbe95..1b6591b6940 100644 --- a/batch/batch/driver/resource_manager.py +++ b/batch/batch/driver/resource_manager.py @@ -1,6 +1,6 @@ import abc import logging -from typing import Any, List, Tuple +from typing import Any, List from hailtop.utils import time_msecs @@ -67,10 +67,6 @@ class CloudResourceManager: def machine_type(self, cores: int, worker_type: str, local_ssd: bool) -> str: raise NotImplementedError - @abc.abstractmethod - def worker_type_and_cores(self, machine_type: str) -> Tuple[str, int]: - raise NotImplementedError - @abc.abstractmethod def instance_config( self, diff --git a/batch/batch/inst_coll_config.py b/batch/batch/inst_coll_config.py index 45ed33dc3d8..bac3d7399c1 100644 --- a/batch/batch/inst_coll_config.py +++ b/batch/batch/inst_coll_config.py @@ -13,7 +13,7 @@ adjust_cores_for_packability, cores_mcpu_to_memory_bytes, local_ssd_size, - machine_type_to_worker_type_cores, + machine_type_to_cores_and_memory_mib_per_core, requested_storage_bytes_to_actual_storage_gib, valid_machine_types, ) @@ -243,11 +243,10 @@ def convert_requests_to_resources(self, machine_type, storage_bytes): if storage_gib is None: return None - worker_type, cores = machine_type_to_worker_type_cores(self.cloud, machine_type) + cores, memory_mib_per_core = machine_type_to_cores_and_memory_mib_per_core(self.cloud, machine_type) cores_mcpu = cores * 1000 - memory_bytes = cores_mcpu_to_memory_bytes(self.cloud, cores_mcpu, worker_type) - return (self.name, cores_mcpu, memory_bytes, storage_gib) + return (self.name, cores_mcpu, memory_mib_per_core * 1024, storage_gib) class InstanceCollectionConfigs: From 9eefae757a9d9b83bd4c5e62a447a2046834ceac Mon Sep 17 00:00:00 2001 From: Sophie Parsa Date: Mon, 22 Apr 2024 13:44:37 -0400 Subject: [PATCH 02/12] [batch] fix g2 machine memory calculation --- batch/batch/cloud/azure/instance_config.py | 5 +++++ .../cloud/gcp/driver/resource_manager.py | 13 +++++++++++ batch/batch/cloud/gcp/instance_config.py | 4 ++++ batch/batch/cloud/gcp/resource_utils.py | 19 +++++++++------- batch/batch/cloud/resource_utils.py | 22 ++++++++++--------- batch/batch/front_end/front_end.py | 2 +- batch/batch/inst_coll_config.py | 14 ++++++++---- batch/batch/instance_config.py | 6 +++-- batch/batch/worker/worker.py | 18 ++++++++++++--- batch/test/test_utils.py | 12 +++++++++- 10 files changed, 86 insertions(+), 29 deletions(-) diff --git a/batch/batch/cloud/azure/instance_config.py b/batch/batch/cloud/azure/instance_config.py index 13bb5d26616..5a3d112a477 100644 --- a/batch/batch/cloud/azure/instance_config.py +++ b/batch/batch/cloud/azure/instance_config.py @@ -81,6 +81,11 @@ def __init__( def worker_type(self) -> str: return self._worker_type + # def machine_family(self) -> str: + # machine_type_parts = azure_machine_type_to_parts(self._machine_type) + # assert machine_type_parts is not None, self._machine_type + # return machine_type_parts.machine_family + def region_for(self, location: str) -> str: return location diff --git a/batch/batch/cloud/gcp/driver/resource_manager.py b/batch/batch/cloud/gcp/driver/resource_manager.py index 7bd7644d9cb..d038071b92d 100644 --- a/batch/batch/cloud/gcp/driver/resource_manager.py +++ b/batch/batch/cloud/gcp/driver/resource_manager.py @@ -116,8 +116,17 @@ async def create_vm( resource_rates = self.billing_manager.resource_rates +<<<<<<< HEAD parts = gcp_machine_type_to_parts(machine_type) assert parts +======= + machine_type_parts = gcp_machine_type_to_parts(machine_type) + assert machine_type_parts is not None, machine_type + machine_family = machine_type_parts.machine_family + worker_type = machine_type_parts.worker_type + cores = machine_type_parts.cores + +>>>>>>> c73cfc25c (code refactor) vm_config = create_vm_config( file_store, resource_rates, @@ -135,7 +144,11 @@ async def create_vm( instance_config, ) +<<<<<<< HEAD memory_mib = gcp_worker_memory_per_core_mib(parts.worker_type) * parts.cores +======= + memory_mib = gcp_worker_memory_per_core_mib(machine_family, worker_type) * cores +>>>>>>> c73cfc25c (code refactor) memory_in_bytes = memory_mib << 20 cores_mcpu = parts.cores * 1000 total_resources_on_instance = instance_config.quantified_resources( diff --git a/batch/batch/cloud/gcp/instance_config.py b/batch/batch/cloud/gcp/instance_config.py index 2789ff6adcc..eb4ed594127 100644 --- a/batch/batch/cloud/gcp/instance_config.py +++ b/batch/batch/cloud/gcp/instance_config.py @@ -104,6 +104,10 @@ def __init__( def worker_type(self) -> str: return self._worker_type + def machine_family(self) -> str: + assert self._instance_family is not None, self._instance_family + return self._instance_family + def region_for(self, location: str) -> str: # location = zone return region_from_location(location) diff --git a/batch/batch/cloud/gcp/resource_utils.py b/batch/batch/cloud/gcp/resource_utils.py index 18a30822d66..aa400c0048e 100644 --- a/batch/batch/cloud/gcp/resource_utils.py +++ b/batch/batch/cloud/gcp/resource_utils.py @@ -10,6 +10,13 @@ MACHINE_FAMILY_TO_ACCELERATOR_VERSIONS = {'g2': 'l4'} +MEMORY_PER_CORE_MIB = { + ('n1', 'standard'): 3840, + ('n1', 'highmem'): 6656, + ('n1', 'highcpu'): 924, + ('g2', 'standard'): 4000, +} + gcp_valid_cores_from_worker_type = { 'highcpu': [2, 4, 8, 16, 32, 64, 96], 'standard': [1, 2, 4, 8, 16, 32, 64, 96], @@ -97,14 +104,10 @@ def gcp_cost_from_msec_mcpu(msec_mcpu: int) -> float: return (msec_mcpu * 0.001 * 0.001) * (total_cost_per_core_hour / 3600) -def gcp_worker_memory_per_core_mib(worker_type: str) -> int: - if worker_type == 'standard': - m = 3840 - elif worker_type == 'highmem': - m = 6656 - else: - assert worker_type == 'highcpu', worker_type - m = 924 # this number must be divisible by 4. I rounded up to the nearest MiB +def gcp_worker_memory_per_core_mib(machine_family: str, worker_type: str) -> int: + machine_worker_key = (machine_family, worker_type) + assert machine_worker_key in MEMORY_PER_CORE_MIB, machine_worker_key + m = MEMORY_PER_CORE_MIB[machine_worker_key] return m diff --git a/batch/batch/cloud/resource_utils.py b/batch/batch/cloud/resource_utils.py index 1c87aec5222..5495febb577 100644 --- a/batch/batch/cloud/resource_utils.py +++ b/batch/batch/cloud/resource_utils.py @@ -67,28 +67,30 @@ def cost_from_msec_mcpu(msec_mcpu: int) -> Optional[float]: return gcp_cost_from_msec_mcpu(msec_mcpu) -def worker_memory_per_core_mib(cloud: str, worker_type: str) -> int: +def worker_memory_per_core_mib(cloud: str, machine_family: str, worker_type: str) -> int: if cloud == 'azure': return azure_worker_memory_per_core_mib(worker_type) assert cloud == 'gcp' - return gcp_worker_memory_per_core_mib(worker_type) + return gcp_worker_memory_per_core_mib(machine_family, worker_type) -def worker_memory_per_core_bytes(cloud: str, worker_type: str) -> int: - m = worker_memory_per_core_mib(cloud, worker_type) +def worker_memory_per_core_bytes(cloud: str, machine_family: str, worker_type: str) -> int: + m = worker_memory_per_core_mib(cloud, machine_family, worker_type) return int(m * 1024**2) -def memory_bytes_to_cores_mcpu(cloud: str, memory_in_bytes: int, worker_type: str) -> int: - return math.ceil((memory_in_bytes / worker_memory_per_core_bytes(cloud, worker_type)) * 1000) +def memory_bytes_to_cores_mcpu(cloud: str, memory_in_bytes: int, machine_family: str, worker_type: str) -> int: + return math.ceil((memory_in_bytes / worker_memory_per_core_bytes(cloud, machine_family, worker_type)) * 1000) -def cores_mcpu_to_memory_bytes(cloud: str, cores_in_mcpu: int, worker_type: str) -> int: - return int((cores_in_mcpu / 1000) * worker_memory_per_core_bytes(cloud, worker_type)) +def cores_mcpu_to_memory_bytes(cloud: str, cores_in_mcpu: int, machine_family: str, worker_type: str) -> int: + return int((cores_in_mcpu / 1000) * worker_memory_per_core_bytes(cloud, machine_family, worker_type)) -def adjust_cores_for_memory_request(cloud: str, cores_in_mcpu: int, memory_in_bytes: int, worker_type: str) -> int: - min_cores_mcpu = memory_bytes_to_cores_mcpu(cloud, memory_in_bytes, worker_type) +def adjust_cores_for_memory_request( + cloud: str, cores_in_mcpu: int, memory_in_bytes: int, machine_family: str, worker_type: str +) -> int: + min_cores_mcpu = memory_bytes_to_cores_mcpu(cloud, memory_in_bytes, machine_family, worker_type) return max(cores_in_mcpu, min_cores_mcpu) diff --git a/batch/batch/front_end/front_end.py b/batch/batch/front_end/front_end.py index ceb1ff1ff53..c7bb66f0e37 100644 --- a/batch/batch/front_end/front_end.py +++ b/batch/batch/front_end/front_end.py @@ -1185,7 +1185,7 @@ async def _create_jobs( memory_to_worker_types = memory_to_worker_type(cloud) if req_memory in memory_to_worker_types: worker_type = memory_to_worker_types[req_memory] - req_memory_bytes = cores_mcpu_to_memory_bytes(cloud, req_cores_mcpu, worker_type) + req_memory_bytes = cores_mcpu_to_memory_bytes(cloud, req_cores_mcpu, 'n1', worker_type) else: req_memory_bytes = parse_memory_in_bytes(req_memory) else: diff --git a/batch/batch/inst_coll_config.py b/batch/batch/inst_coll_config.py index bac3d7399c1..437c86c7bc7 100644 --- a/batch/batch/inst_coll_config.py +++ b/batch/batch/inst_coll_config.py @@ -7,13 +7,20 @@ from .cloud.azure.instance_config import AzureSlimInstanceConfig from .cloud.azure.resource_utils import azure_worker_properties_to_machine_type from .cloud.gcp.instance_config import GCPSlimInstanceConfig -from .cloud.gcp.resource_utils import GCP_MACHINE_FAMILY, family_worker_type_cores_to_gcp_machine_type +from .cloud.gcp.resource_utils import ( + GCP_MACHINE_FAMILY, + family_worker_type_cores_to_gcp_machine_type, + gcp_machine_type_to_parts, +) from .cloud.resource_utils import ( adjust_cores_for_memory_request, adjust_cores_for_packability, cores_mcpu_to_memory_bytes, local_ssd_size, +<<<<<<< HEAD machine_type_to_cores_and_memory_mib_per_core, +======= +>>>>>>> c73cfc25c (code refactor) requested_storage_bytes_to_actual_storage_gib, valid_machine_types, ) @@ -186,11 +193,10 @@ def convert_requests_to_resources(self, cores_mcpu, memory_bytes, storage_bytes) storage_gib = requested_storage_bytes_to_actual_storage_gib(self.cloud, storage_bytes, allow_zero_storage=True) if storage_gib is None: return None - - cores_mcpu = adjust_cores_for_memory_request(self.cloud, cores_mcpu, memory_bytes, self.worker_type) + cores_mcpu = adjust_cores_for_memory_request(self.cloud, cores_mcpu, memory_bytes, 'n1', self.worker_type) cores_mcpu = adjust_cores_for_packability(cores_mcpu) - memory_bytes = cores_mcpu_to_memory_bytes(self.cloud, cores_mcpu, self.worker_type) + memory_bytes = cores_mcpu_to_memory_bytes(self.cloud, cores_mcpu, 'n1', self.worker_type) if cores_mcpu <= self.worker_cores * 1000: return (cores_mcpu, memory_bytes, storage_gib) diff --git a/batch/batch/instance_config.py b/batch/batch/instance_config.py index fdbfd8dcafd..fefc9dd8a3e 100644 --- a/batch/batch/instance_config.py +++ b/batch/batch/instance_config.py @@ -102,13 +102,15 @@ def cost_per_hour_from_cores( utilized_cores_mcpu: int, ) -> float: assert 0 <= utilized_cores_mcpu <= self.cores * 1000 - memory_in_bytes = cores_mcpu_to_memory_bytes(self.cloud, utilized_cores_mcpu, self.worker_type()) + memory_in_bytes = cores_mcpu_to_memory_bytes( + self.cloud, utilized_cores_mcpu, self.machine_family(), self.worker_type() + ) storage_in_gb = 0 # we don't need to account for external storage return self.cost_per_hour(resource_rates, utilized_cores_mcpu, memory_in_bytes, storage_in_gb) def actual_cost_per_hour(self, resource_rates: Dict[str, float]) -> float: cpu_in_mcpu = self.cores * 1000 - memory_in_bytes = cores_mcpu_to_memory_bytes(self.cloud, cpu_in_mcpu, self.worker_type()) + memory_in_bytes = cores_mcpu_to_memory_bytes(self.cloud, cpu_in_mcpu, self.machine_family(), self.worker_type()) storage_in_gb = 0 # we don't need to account for external storage resources = self.quantified_resources(cpu_in_mcpu, memory_in_bytes, storage_in_gb) resources = [r for r in resources if 'service-fee' not in r['name']] diff --git a/batch/batch/worker/worker.py b/batch/batch/worker/worker.py index cc97232c21c..d7f48ef1bc5 100644 --- a/batch/batch/worker/worker.py +++ b/batch/batch/worker/worker.py @@ -69,8 +69,9 @@ ) from ..batch_format_version import BatchFormatVersion +from ..cloud.azure.resource_utils import azure_machine_type_to_parts from ..cloud.azure.worker.worker_api import AzureWorkerAPI -from ..cloud.gcp.resource_utils import is_gpu +from ..cloud.gcp.resource_utils import gcp_machine_type_to_parts, is_gpu from ..cloud.gcp.worker.worker_api import GCPWorkerAPI from ..cloud.resource_utils import ( is_valid_storage_request, @@ -2483,10 +2484,21 @@ async def create_and_start( assert os.path.isdir(root_dir) assert instance_config - total_memory_bytes = n_cores * worker_memory_per_core_bytes(CLOUD, instance_config.worker_type()) + + if CLOUD == 'gcp': + machine_type_parts = gcp_machine_type_to_parts(INSTANCE_CONFIG["machine_type"]) + elif CLOUD == 'azure': + machine_type_parts = azure_machine_type_to_parts(INSTANCE_CONFIG["machine_type"]) + + assert machine_type_parts is not None, INSTANCE_CONFIG["machine_type"] + machine_family = machine_type_parts.machine_family + assert machine_family is not None, machine_family + worker_type = machine_type_parts.worker_type + + total_memory_bytes = n_cores * worker_memory_per_core_bytes(CLOUD, machine_family, worker_type) # We allocate 60% of memory per core to off heap memory - memory_per_core_mib = worker_memory_per_core_mib(CLOUD, instance_config.worker_type()) + memory_per_core_mib = worker_memory_per_core_mib(CLOUD, machine_family, worker_type) memory_mib = n_cores * memory_per_core_mib heap_memory_mib = int(0.4 * memory_mib) off_heap_memory_per_core_mib = memory_mib - heap_memory_mib diff --git a/batch/test/test_utils.py b/batch/test/test_utils.py index e381fba9848..a2d89941b44 100644 --- a/batch/test/test_utils.py +++ b/batch/test/test_utils.py @@ -1,6 +1,7 @@ from batch.cloud.resource_utils import adjust_cores_for_packability +from batch.cloud.gcp.resource_utils import gcp_worker_memory_per_core_mib from hailtop.batch_client.parse import parse_memory_in_bytes - +import pytest def test_packability(): assert adjust_cores_for_packability(0) == 250 @@ -23,3 +24,12 @@ def test_memory_str_to_bytes(): assert parse_memory_in_bytes('7') == 7 assert parse_memory_in_bytes('1K') == 1000 assert parse_memory_in_bytes('1Ki') == 1024 + + +def test_gcp_worker_memory_per_core_mib(): + with pytest.raises(AssertionError): + assert gcp_worker_memory_per_core_mib('n2','standard') + assert gcp_worker_memory_per_core_mib('n1','standard') == 3840 + assert gcp_worker_memory_per_core_mib('n1','highmem') == 6656 + assert gcp_worker_memory_per_core_mib('n1','highcpu') == 924 + assert gcp_worker_memory_per_core_mib('g2','standard') == 4000 \ No newline at end of file From 478f9c86c39e89cba62cd313fb60e2ad7c06f50d Mon Sep 17 00:00:00 2001 From: Sophie Parsa Date: Tue, 30 Apr 2024 15:47:06 -0400 Subject: [PATCH 03/12] resolved front jp instance manager --- batch/batch/cloud/azure/instance_config.py | 7 ++++++- batch/batch/cloud/gcp/driver/resource_manager.py | 11 +---------- batch/batch/cloud/gcp/instance_config.py | 7 ++++++- batch/batch/cloud/gcp/resource_utils.py | 4 +++- batch/batch/inst_coll_config.py | 4 ---- batch/batch/instance_config.py | 11 ++++++----- batch/test/test_utils.py | 16 +++++++++------- 7 files changed, 31 insertions(+), 29 deletions(-) diff --git a/batch/batch/cloud/azure/instance_config.py b/batch/batch/cloud/azure/instance_config.py index 5a3d112a477..e0007aba162 100644 --- a/batch/batch/cloud/azure/instance_config.py +++ b/batch/batch/cloud/azure/instance_config.py @@ -4,7 +4,7 @@ from ...driver.billing_manager import ProductVersions from ...instance_config import InstanceConfig -from .resource_utils import azure_machine_type_to_parts +from .resource_utils import azure_machine_type_to_parts, azure_worker_memory_per_core_mib from .resources import ( AzureDynamicSizedDiskResource, AzureIPFeeResource, @@ -81,6 +81,11 @@ def __init__( def worker_type(self) -> str: return self._worker_type + def cores_mcpu_to_memory_bytes(self, mcpu: int) -> int: + memory_mib = azure_worker_memory_per_core_mib(self.worker_type()) + memory_bytes = int(memory_mib * 1024**2) + return int((mcpu / 1000) * memory_bytes) + # def machine_family(self) -> str: # machine_type_parts = azure_machine_type_to_parts(self._machine_type) # assert machine_type_parts is not None, self._machine_type diff --git a/batch/batch/cloud/gcp/driver/resource_manager.py b/batch/batch/cloud/gcp/driver/resource_manager.py index d038071b92d..711657d64ba 100644 --- a/batch/batch/cloud/gcp/driver/resource_manager.py +++ b/batch/batch/cloud/gcp/driver/resource_manager.py @@ -116,17 +116,12 @@ async def create_vm( resource_rates = self.billing_manager.resource_rates -<<<<<<< HEAD - parts = gcp_machine_type_to_parts(machine_type) - assert parts -======= machine_type_parts = gcp_machine_type_to_parts(machine_type) assert machine_type_parts is not None, machine_type machine_family = machine_type_parts.machine_family worker_type = machine_type_parts.worker_type cores = machine_type_parts.cores ->>>>>>> c73cfc25c (code refactor) vm_config = create_vm_config( file_store, resource_rates, @@ -144,13 +139,9 @@ async def create_vm( instance_config, ) -<<<<<<< HEAD - memory_mib = gcp_worker_memory_per_core_mib(parts.worker_type) * parts.cores -======= memory_mib = gcp_worker_memory_per_core_mib(machine_family, worker_type) * cores ->>>>>>> c73cfc25c (code refactor) memory_in_bytes = memory_mib << 20 - cores_mcpu = parts.cores * 1000 + cores_mcpu = cores * 1000 total_resources_on_instance = instance_config.quantified_resources( cpu_in_mcpu=cores_mcpu, memory_in_bytes=memory_in_bytes, extra_storage_in_gib=0 ) diff --git a/batch/batch/cloud/gcp/instance_config.py b/batch/batch/cloud/gcp/instance_config.py index eb4ed594127..a631b82469c 100644 --- a/batch/batch/cloud/gcp/instance_config.py +++ b/batch/batch/cloud/gcp/instance_config.py @@ -2,7 +2,7 @@ from ...driver.billing_manager import ProductVersions from ...instance_config import InstanceConfig -from .resource_utils import gcp_machine_type_to_parts, machine_family_to_gpu +from .resource_utils import gcp_machine_type_to_parts, gcp_worker_memory_per_core_mib, machine_family_to_gpu from .resources import ( GCPAcceleratorResource, GCPComputeResource, @@ -104,6 +104,11 @@ def __init__( def worker_type(self) -> str: return self._worker_type + def cores_mcpu_to_memory_bytes(self, mcpu: int) -> int: + memory_mib = gcp_worker_memory_per_core_mib(self._instance_family, self.worker_type()) + memory_bytes = int(memory_mib * 1024**2) + return int((mcpu / 1000) * memory_bytes) + def machine_family(self) -> str: assert self._instance_family is not None, self._instance_family return self._instance_family diff --git a/batch/batch/cloud/gcp/resource_utils.py b/batch/batch/cloud/gcp/resource_utils.py index aa400c0048e..4dc63fb7286 100644 --- a/batch/batch/cloud/gcp/resource_utils.py +++ b/batch/batch/cloud/gcp/resource_utils.py @@ -58,7 +58,9 @@ def gcp_machine_type_to_cores_and_memory_mib_per_core(machine_type: str) -> Tupl if maybe_machine_type_parts is None: raise ValueError(f'bad machine_type: {machine_type}') cores = maybe_machine_type_parts.cores - memory_per_core = gcp_worker_memory_per_core_mib(maybe_machine_type_parts.worker_type) + memory_per_core = gcp_worker_memory_per_core_mib( + maybe_machine_type_parts.machine_family, maybe_machine_type_parts.worker_type + ) return cores, memory_per_core diff --git a/batch/batch/inst_coll_config.py b/batch/batch/inst_coll_config.py index 437c86c7bc7..19f244ae5d1 100644 --- a/batch/batch/inst_coll_config.py +++ b/batch/batch/inst_coll_config.py @@ -10,17 +10,13 @@ from .cloud.gcp.resource_utils import ( GCP_MACHINE_FAMILY, family_worker_type_cores_to_gcp_machine_type, - gcp_machine_type_to_parts, ) from .cloud.resource_utils import ( adjust_cores_for_memory_request, adjust_cores_for_packability, cores_mcpu_to_memory_bytes, local_ssd_size, -<<<<<<< HEAD machine_type_to_cores_and_memory_mib_per_core, -======= ->>>>>>> c73cfc25c (code refactor) requested_storage_bytes_to_actual_storage_gib, valid_machine_types, ) diff --git a/batch/batch/instance_config.py b/batch/batch/instance_config.py index fefc9dd8a3e..308aa5c9f01 100644 --- a/batch/batch/instance_config.py +++ b/batch/batch/instance_config.py @@ -1,7 +1,6 @@ import abc from typing import Dict, List, Sequence -from .cloud.resource_utils import cores_mcpu_to_memory_bytes from .driver.billing_manager import ProductVersions from .resources import QuantifiedResource, Resource @@ -44,6 +43,10 @@ def to_dict(self) -> dict: def region_for(self, location: str) -> str: raise NotImplementedError + @abc.abstractmethod + def cores_mcpu_to_memory_bytes(self, mcpu: int) -> int: + raise NotImplementedError + def quantified_resources( self, cpu_in_mcpu: int, @@ -102,15 +105,13 @@ def cost_per_hour_from_cores( utilized_cores_mcpu: int, ) -> float: assert 0 <= utilized_cores_mcpu <= self.cores * 1000 - memory_in_bytes = cores_mcpu_to_memory_bytes( - self.cloud, utilized_cores_mcpu, self.machine_family(), self.worker_type() - ) + memory_in_bytes = self.cores_mcpu_to_memory_bytes(utilized_cores_mcpu) storage_in_gb = 0 # we don't need to account for external storage return self.cost_per_hour(resource_rates, utilized_cores_mcpu, memory_in_bytes, storage_in_gb) def actual_cost_per_hour(self, resource_rates: Dict[str, float]) -> float: cpu_in_mcpu = self.cores * 1000 - memory_in_bytes = cores_mcpu_to_memory_bytes(self.cloud, cpu_in_mcpu, self.machine_family(), self.worker_type()) + memory_in_bytes = self.cores_mcpu_to_memory_bytes(cpu_in_mcpu) storage_in_gb = 0 # we don't need to account for external storage resources = self.quantified_resources(cpu_in_mcpu, memory_in_bytes, storage_in_gb) resources = [r for r in resources if 'service-fee' not in r['name']] diff --git a/batch/test/test_utils.py b/batch/test/test_utils.py index a2d89941b44..7533d4ca105 100644 --- a/batch/test/test_utils.py +++ b/batch/test/test_utils.py @@ -1,7 +1,9 @@ -from batch.cloud.resource_utils import adjust_cores_for_packability +import pytest + from batch.cloud.gcp.resource_utils import gcp_worker_memory_per_core_mib +from batch.cloud.resource_utils import adjust_cores_for_packability from hailtop.batch_client.parse import parse_memory_in_bytes -import pytest + def test_packability(): assert adjust_cores_for_packability(0) == 250 @@ -28,8 +30,8 @@ def test_memory_str_to_bytes(): def test_gcp_worker_memory_per_core_mib(): with pytest.raises(AssertionError): - assert gcp_worker_memory_per_core_mib('n2','standard') - assert gcp_worker_memory_per_core_mib('n1','standard') == 3840 - assert gcp_worker_memory_per_core_mib('n1','highmem') == 6656 - assert gcp_worker_memory_per_core_mib('n1','highcpu') == 924 - assert gcp_worker_memory_per_core_mib('g2','standard') == 4000 \ No newline at end of file + assert gcp_worker_memory_per_core_mib('n2', 'standard') + assert gcp_worker_memory_per_core_mib('n1', 'standard') == 3840 + assert gcp_worker_memory_per_core_mib('n1', 'highmem') == 6656 + assert gcp_worker_memory_per_core_mib('n1', 'highcpu') == 924 + assert gcp_worker_memory_per_core_mib('g2', 'standard') == 4000 From 3eeb65a64e33816290e50ba3ee2d88b0f5fcbe33 Mon Sep 17 00:00:00 2001 From: Sophie Parsa Date: Tue, 30 Apr 2024 16:22:20 -0400 Subject: [PATCH 04/12] fix worker memory calls --- batch/batch/cloud/azure/instance_config.py | 6 ++--- batch/batch/cloud/azure/resource_utils.py | 6 +++++ batch/batch/cloud/gcp/instance_config.py | 6 ++--- batch/batch/cloud/gcp/resource_utils.py | 6 +++++ batch/batch/front_end/front_end.py | 9 ++++++-- batch/batch/worker/worker.py | 27 +++++++++++----------- 6 files changed, 36 insertions(+), 24 deletions(-) diff --git a/batch/batch/cloud/azure/instance_config.py b/batch/batch/cloud/azure/instance_config.py index e0007aba162..80fe079940d 100644 --- a/batch/batch/cloud/azure/instance_config.py +++ b/batch/batch/cloud/azure/instance_config.py @@ -4,7 +4,7 @@ from ...driver.billing_manager import ProductVersions from ...instance_config import InstanceConfig -from .resource_utils import azure_machine_type_to_parts, azure_worker_memory_per_core_mib +from .resource_utils import azure_cores_mcpu_to_memory_bytes, azure_machine_type_to_parts from .resources import ( AzureDynamicSizedDiskResource, AzureIPFeeResource, @@ -82,9 +82,7 @@ def worker_type(self) -> str: return self._worker_type def cores_mcpu_to_memory_bytes(self, mcpu: int) -> int: - memory_mib = azure_worker_memory_per_core_mib(self.worker_type()) - memory_bytes = int(memory_mib * 1024**2) - return int((mcpu / 1000) * memory_bytes) + return azure_cores_mcpu_to_memory_bytes(mcpu, self.worker_type()) # def machine_family(self) -> str: # machine_type_parts = azure_machine_type_to_parts(self._machine_type) diff --git a/batch/batch/cloud/azure/resource_utils.py b/batch/batch/cloud/azure/resource_utils.py index 36a83b32153..6d701638315 100644 --- a/batch/batch/cloud/azure/resource_utils.py +++ b/batch/batch/cloud/azure/resource_utils.py @@ -206,3 +206,9 @@ def azure_requested_to_actual_storage_bytes(storage_bytes, allow_zero_storage): def azure_is_valid_storage_request(storage_in_gib: int) -> bool: return 10 <= storage_in_gib <= AZURE_MAX_PERSISTENT_SSD_SIZE_GIB + + +def azure_cores_mcpu_to_memory_bytes(mcpu: int, worker_type: str) -> int: + memory_mib = azure_worker_memory_per_core_mib(worker_type) + memory_bytes = int(memory_mib * 1024**2) + return int((mcpu / 1000) * memory_bytes) diff --git a/batch/batch/cloud/gcp/instance_config.py b/batch/batch/cloud/gcp/instance_config.py index a631b82469c..1d15986b0fa 100644 --- a/batch/batch/cloud/gcp/instance_config.py +++ b/batch/batch/cloud/gcp/instance_config.py @@ -2,7 +2,7 @@ from ...driver.billing_manager import ProductVersions from ...instance_config import InstanceConfig -from .resource_utils import gcp_machine_type_to_parts, gcp_worker_memory_per_core_mib, machine_family_to_gpu +from .resource_utils import gcp_cores_mcpu_to_memory_bytes, gcp_machine_type_to_parts, machine_family_to_gpu from .resources import ( GCPAcceleratorResource, GCPComputeResource, @@ -105,9 +105,7 @@ def worker_type(self) -> str: return self._worker_type def cores_mcpu_to_memory_bytes(self, mcpu: int) -> int: - memory_mib = gcp_worker_memory_per_core_mib(self._instance_family, self.worker_type()) - memory_bytes = int(memory_mib * 1024**2) - return int((mcpu / 1000) * memory_bytes) + return gcp_cores_mcpu_to_memory_bytes(mcpu, self._instance_family, self.worker_type()) def machine_family(self) -> str: assert self._instance_family is not None, self._instance_family diff --git a/batch/batch/cloud/gcp/resource_utils.py b/batch/batch/cloud/gcp/resource_utils.py index 4dc63fb7286..471bb029496 100644 --- a/batch/batch/cloud/gcp/resource_utils.py +++ b/batch/batch/cloud/gcp/resource_utils.py @@ -136,3 +136,9 @@ def machine_family_to_gpu(machine_family: str) -> Optional[str]: def is_gpu(machine_family: str) -> bool: return machine_family_to_gpu(machine_family) is not None + + +def gcp_cores_mcpu_to_memory_bytes(mcpu: int, machine_family: str, worker_type: str) -> int: + memory_mib = gcp_worker_memory_per_core_mib(machine_family, worker_type) + memory_bytes = int(memory_mib * 1024**2) + return int((mcpu / 1000) * memory_bytes) diff --git a/batch/batch/front_end/front_end.py b/batch/batch/front_end/front_end.py index c7bb66f0e37..d7565c24c03 100644 --- a/batch/batch/front_end/front_end.py +++ b/batch/batch/front_end/front_end.py @@ -73,8 +73,9 @@ from ..batch import batch_record_to_dict, cancel_job_group_in_db, job_group_record_to_dict, job_record_to_dict from ..batch_configuration import BATCH_STORAGE_URI, CLOUD, DEFAULT_NAMESPACE, SCOPE from ..batch_format_version import BatchFormatVersion +from ..cloud.azure.resource_utils import azure_cores_mcpu_to_memory_bytes +from ..cloud.gcp.resource_utils import GCP_MACHINE_FAMILY, gcp_cores_mcpu_to_memory_bytes from ..cloud.resource_utils import ( - cores_mcpu_to_memory_bytes, is_valid_cores_mcpu, memory_to_worker_type, valid_machine_types, @@ -1185,7 +1186,11 @@ async def _create_jobs( memory_to_worker_types = memory_to_worker_type(cloud) if req_memory in memory_to_worker_types: worker_type = memory_to_worker_types[req_memory] - req_memory_bytes = cores_mcpu_to_memory_bytes(cloud, req_cores_mcpu, 'n1', worker_type) + if CLOUD == 'gcp': + req_memory_bytes = gcp_cores_mcpu_to_memory_bytes(req_cores_mcpu, GCP_MACHINE_FAMILY, worker_type) + else: + assert CLOUD == 'azure' + req_memory_bytes = azure_cores_mcpu_to_memory_bytes(req_cores_mcpu, worker_type) else: req_memory_bytes = parse_memory_in_bytes(req_memory) else: diff --git a/batch/batch/worker/worker.py b/batch/batch/worker/worker.py index d7f48ef1bc5..167cabc6215 100644 --- a/batch/batch/worker/worker.py +++ b/batch/batch/worker/worker.py @@ -69,15 +69,13 @@ ) from ..batch_format_version import BatchFormatVersion -from ..cloud.azure.resource_utils import azure_machine_type_to_parts +from ..cloud.azure.resource_utils import azure_cores_mcpu_to_memory_bytes, azure_machine_type_to_parts from ..cloud.azure.worker.worker_api import AzureWorkerAPI -from ..cloud.gcp.resource_utils import gcp_machine_type_to_parts, is_gpu +from ..cloud.gcp.resource_utils import gcp_cores_mcpu_to_memory_bytes, gcp_machine_type_to_parts, is_gpu from ..cloud.gcp.worker.worker_api import GCPWorkerAPI from ..cloud.resource_utils import ( is_valid_storage_request, storage_gib_to_bytes, - worker_memory_per_core_bytes, - worker_memory_per_core_mib, ) from ..file_store import FileStore from ..globals import HTTP_CLIENT_MAX_SIZE, RESERVED_STORAGE_GB_PER_CORE, STATUS_FORMAT_VERSION @@ -2487,19 +2485,20 @@ async def create_and_start( if CLOUD == 'gcp': machine_type_parts = gcp_machine_type_to_parts(INSTANCE_CONFIG["machine_type"]) - elif CLOUD == 'azure': - machine_type_parts = azure_machine_type_to_parts(INSTANCE_CONFIG["machine_type"]) - - assert machine_type_parts is not None, INSTANCE_CONFIG["machine_type"] - machine_family = machine_type_parts.machine_family - assert machine_family is not None, machine_family - worker_type = machine_type_parts.worker_type + assert machine_type_parts + total_memory_bytes = gcp_cores_mcpu_to_memory_bytes( + n_cores * 1000, machine_type_parts.machine_family, machine_type_parts.worker_type + ) - total_memory_bytes = n_cores * worker_memory_per_core_bytes(CLOUD, machine_family, worker_type) + else: + assert CLOUD == 'azure' + machine_type_parts = azure_machine_type_to_parts(INSTANCE_CONFIG["machine_type"]) + assert machine_type_parts + total_memory_bytes = azure_cores_mcpu_to_memory_bytes(n_cores * 1000, machine_type_parts.family) # We allocate 60% of memory per core to off heap memory - memory_per_core_mib = worker_memory_per_core_mib(CLOUD, machine_family, worker_type) - memory_mib = n_cores * memory_per_core_mib + + memory_mib = total_memory_bytes / (1024**2) heap_memory_mib = int(0.4 * memory_mib) off_heap_memory_per_core_mib = memory_mib - heap_memory_mib From 4d2b98cab56814affff01d7e58b4a19197352519 Mon Sep 17 00:00:00 2001 From: Sophie Parsa Date: Tue, 30 Apr 2024 17:06:25 -0400 Subject: [PATCH 05/12] fix memory calculation --- batch/batch/worker/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch/batch/worker/worker.py b/batch/batch/worker/worker.py index 167cabc6215..4faca1ee991 100644 --- a/batch/batch/worker/worker.py +++ b/batch/batch/worker/worker.py @@ -2498,7 +2498,7 @@ async def create_and_start( # We allocate 60% of memory per core to off heap memory - memory_mib = total_memory_bytes / (1024**2) + memory_mib = total_memory_bytes // (1024**2) heap_memory_mib = int(0.4 * memory_mib) off_heap_memory_per_core_mib = memory_mib - heap_memory_mib From ae40c9458c14f353a103ce84d806954a767ff8f5 Mon Sep 17 00:00:00 2001 From: Sophie Parsa Date: Wed, 1 May 2024 13:53:51 -0400 Subject: [PATCH 06/12] fix memory bytes calc in instance config --- batch/batch/cloud/resource_utils.py | 12 ++++++++---- .../batch/driver/instance_collection/job_private.py | 4 ++-- batch/batch/inst_coll_config.py | 6 +++--- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/batch/batch/cloud/resource_utils.py b/batch/batch/cloud/resource_utils.py index 5495febb577..41f78bb9347 100644 --- a/batch/batch/cloud/resource_utils.py +++ b/batch/batch/cloud/resource_utils.py @@ -53,11 +53,15 @@ def memory_to_worker_type(cloud: str) -> Dict[str, str]: return gcp_memory_to_worker_type -def machine_type_to_cores_and_memory_mib_per_core(cloud: str, machine_type: str) -> Tuple[int, int]: +def machine_type_to_cores_and_memory_bytes(cloud: str, machine_type: str) -> Tuple[int, int]: if cloud == 'azure': - return azure_machine_type_to_cores_and_memory_mib_per_core(machine_type) - assert cloud == 'gcp' - return gcp_machine_type_to_cores_and_memory_mib_per_core(machine_type) + cores, memory_mib_per_core = azure_machine_type_to_cores_and_memory_mib_per_core(machine_type) + else: + assert cloud == 'gcp' + cores, memory_mib_per_core = gcp_machine_type_to_cores_and_memory_mib_per_core(machine_type) + memory_bytes_per_core = memory_mib_per_core * 1024**2 + memory_bytes = cores * memory_bytes_per_core + return cores, memory_bytes def cost_from_msec_mcpu(msec_mcpu: int) -> Optional[float]: diff --git a/batch/batch/driver/instance_collection/job_private.py b/batch/batch/driver/instance_collection/job_private.py index d7de236fe79..c4907ec590b 100644 --- a/batch/batch/driver/instance_collection/job_private.py +++ b/batch/batch/driver/instance_collection/job_private.py @@ -21,7 +21,7 @@ ) from ...batch_format_version import BatchFormatVersion -from ...cloud.resource_utils import machine_type_to_cores_and_memory_mib_per_core +from ...cloud.resource_utils import machine_type_to_cores_and_memory_bytes from ...inst_coll_config import JobPrivateInstanceManagerConfig from ...instance_config import QuantifiedResource from ...utils import Box, ExceededSharesCounter, regions_bits_rep_to_regions @@ -312,7 +312,7 @@ async def create_instance( machine_type = machine_spec['machine_type'] preemptible = machine_spec['preemptible'] storage_gb = machine_spec['storage_gib'] - cores, _ = machine_type_to_cores_and_memory_mib_per_core(self.cloud, machine_type) + cores, _ = machine_type_to_cores_and_memory_bytes(self.cloud, machine_type) instance, total_resources_on_instance = await self._create_instance( app=self.app, cores=cores, diff --git a/batch/batch/inst_coll_config.py b/batch/batch/inst_coll_config.py index 19f244ae5d1..1308479f48a 100644 --- a/batch/batch/inst_coll_config.py +++ b/batch/batch/inst_coll_config.py @@ -16,7 +16,7 @@ adjust_cores_for_packability, cores_mcpu_to_memory_bytes, local_ssd_size, - machine_type_to_cores_and_memory_mib_per_core, + machine_type_to_cores_and_memory_bytes, requested_storage_bytes_to_actual_storage_gib, valid_machine_types, ) @@ -245,10 +245,10 @@ def convert_requests_to_resources(self, machine_type, storage_bytes): if storage_gib is None: return None - cores, memory_mib_per_core = machine_type_to_cores_and_memory_mib_per_core(self.cloud, machine_type) + cores, memory_bytes = machine_type_to_cores_and_memory_bytes(self.cloud, machine_type) cores_mcpu = cores * 1000 - return (self.name, cores_mcpu, memory_mib_per_core * 1024, storage_gib) + return (self.name, cores_mcpu, memory_bytes, storage_gib) class InstanceCollectionConfigs: From 1cbffd7e7ac3373932bd7cdbfe4b3389211bcb62 Mon Sep 17 00:00:00 2001 From: Sophie Parsa Date: Wed, 1 May 2024 15:06:24 -0400 Subject: [PATCH 07/12] delete commented out code --- batch/batch/cloud/azure/instance_config.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/batch/batch/cloud/azure/instance_config.py b/batch/batch/cloud/azure/instance_config.py index 80fe079940d..09e698ce486 100644 --- a/batch/batch/cloud/azure/instance_config.py +++ b/batch/batch/cloud/azure/instance_config.py @@ -84,11 +84,6 @@ def worker_type(self) -> str: def cores_mcpu_to_memory_bytes(self, mcpu: int) -> int: return azure_cores_mcpu_to_memory_bytes(mcpu, self.worker_type()) - # def machine_family(self) -> str: - # machine_type_parts = azure_machine_type_to_parts(self._machine_type) - # assert machine_type_parts is not None, self._machine_type - # return machine_type_parts.machine_family - def region_for(self, location: str) -> str: return location From 3884d6cffa41bf9eeb14e1c5d7a2da3a24adbf9e Mon Sep 17 00:00:00 2001 From: Sophie Parsa Date: Wed, 1 May 2024 15:16:20 -0400 Subject: [PATCH 08/12] simplify --- batch/batch/worker/worker.py | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/batch/batch/worker/worker.py b/batch/batch/worker/worker.py index 4faca1ee991..e4e67d742f7 100644 --- a/batch/batch/worker/worker.py +++ b/batch/batch/worker/worker.py @@ -69,12 +69,12 @@ ) from ..batch_format_version import BatchFormatVersion -from ..cloud.azure.resource_utils import azure_cores_mcpu_to_memory_bytes, azure_machine_type_to_parts from ..cloud.azure.worker.worker_api import AzureWorkerAPI -from ..cloud.gcp.resource_utils import gcp_cores_mcpu_to_memory_bytes, gcp_machine_type_to_parts, is_gpu +from ..cloud.gcp.resource_utils import is_gpu from ..cloud.gcp.worker.worker_api import GCPWorkerAPI from ..cloud.resource_utils import ( is_valid_storage_request, + machine_type_to_cores_and_memory_bytes, storage_gib_to_bytes, ) from ..file_store import FileStore @@ -2483,19 +2483,7 @@ async def create_and_start( assert instance_config - if CLOUD == 'gcp': - machine_type_parts = gcp_machine_type_to_parts(INSTANCE_CONFIG["machine_type"]) - assert machine_type_parts - total_memory_bytes = gcp_cores_mcpu_to_memory_bytes( - n_cores * 1000, machine_type_parts.machine_family, machine_type_parts.worker_type - ) - - else: - assert CLOUD == 'azure' - machine_type_parts = azure_machine_type_to_parts(INSTANCE_CONFIG["machine_type"]) - assert machine_type_parts - total_memory_bytes = azure_cores_mcpu_to_memory_bytes(n_cores * 1000, machine_type_parts.family) - + _, total_memory_bytes = machine_type_to_cores_and_memory_bytes(CLOUD, INSTANCE_CONFIG["machine_type"]) # We allocate 60% of memory per core to off heap memory memory_mib = total_memory_bytes // (1024**2) From db1982e6b3dcc985125045f6a6184a88c1445d77 Mon Sep 17 00:00:00 2001 From: Sophie Parsa Date: Wed, 1 May 2024 16:03:30 -0400 Subject: [PATCH 09/12] fix worker mem calc --- batch/batch/worker/worker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/batch/batch/worker/worker.py b/batch/batch/worker/worker.py index e4e67d742f7..165f28b8bd5 100644 --- a/batch/batch/worker/worker.py +++ b/batch/batch/worker/worker.py @@ -2483,7 +2483,8 @@ async def create_and_start( assert instance_config - _, total_memory_bytes = machine_type_to_cores_and_memory_bytes(CLOUD, INSTANCE_CONFIG["machine_type"]) + total_machine_cores , total_machine_memory_bytes = machine_type_to_cores_and_memory_bytes(CLOUD, INSTANCE_CONFIG["machine_type"]) + total_memory_bytes = int((n_cores / total_machine_cores) * total_machine_memory_bytes) # We allocate 60% of memory per core to off heap memory memory_mib = total_memory_bytes // (1024**2) From a78568d957ea38756e8dc109ed8767f1078ccdaf Mon Sep 17 00:00:00 2001 From: Sophie Parsa Date: Wed, 1 May 2024 16:12:45 -0400 Subject: [PATCH 10/12] lint fix --- batch/batch/worker/worker.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/batch/batch/worker/worker.py b/batch/batch/worker/worker.py index 165f28b8bd5..621abca6462 100644 --- a/batch/batch/worker/worker.py +++ b/batch/batch/worker/worker.py @@ -2483,7 +2483,9 @@ async def create_and_start( assert instance_config - total_machine_cores , total_machine_memory_bytes = machine_type_to_cores_and_memory_bytes(CLOUD, INSTANCE_CONFIG["machine_type"]) + total_machine_cores, total_machine_memory_bytes = machine_type_to_cores_and_memory_bytes( + CLOUD, INSTANCE_CONFIG["machine_type"] + ) total_memory_bytes = int((n_cores / total_machine_cores) * total_machine_memory_bytes) # We allocate 60% of memory per core to off heap memory From 1676a8540a0bf6dd4c90c4f62662f7ac2f72402d Mon Sep 17 00:00:00 2001 From: Sophie Parsa Date: Thu, 2 May 2024 21:16:33 -0400 Subject: [PATCH 11/12] review --- batch/batch/cloud/azure/resource_utils.py | 8 +++++++ batch/batch/cloud/gcp/instance_config.py | 4 ---- batch/batch/cloud/gcp/resource_utils.py | 10 ++++++++ batch/batch/cloud/resource_utils.py | 29 ----------------------- batch/batch/inst_coll_config.py | 26 +++++++++++++++----- 5 files changed, 38 insertions(+), 39 deletions(-) diff --git a/batch/batch/cloud/azure/resource_utils.py b/batch/batch/cloud/azure/resource_utils.py index 6d701638315..a72b79e25c9 100644 --- a/batch/batch/cloud/azure/resource_utils.py +++ b/batch/batch/cloud/azure/resource_utils.py @@ -1,4 +1,5 @@ import logging +import math import re from typing import Dict, Optional, Tuple @@ -212,3 +213,10 @@ def azure_cores_mcpu_to_memory_bytes(mcpu: int, worker_type: str) -> int: memory_mib = azure_worker_memory_per_core_mib(worker_type) memory_bytes = int(memory_mib * 1024**2) return int((mcpu / 1000) * memory_bytes) + + +def azure_adjust_cores_for_memory_request(cores_in_mcpu: int, memory_in_bytes: int, worker_type: str) -> int: + memory_per_core_mib = azure_worker_memory_per_core_mib(worker_type) + memory_per_core_bytes = int(memory_per_core_mib * 1024**2) + min_cores_mcpu = math.ceil((memory_in_bytes / memory_per_core_bytes) * 1000) + return max(cores_in_mcpu, min_cores_mcpu) diff --git a/batch/batch/cloud/gcp/instance_config.py b/batch/batch/cloud/gcp/instance_config.py index 1d15986b0fa..f9ea5156dea 100644 --- a/batch/batch/cloud/gcp/instance_config.py +++ b/batch/batch/cloud/gcp/instance_config.py @@ -107,10 +107,6 @@ def worker_type(self) -> str: def cores_mcpu_to_memory_bytes(self, mcpu: int) -> int: return gcp_cores_mcpu_to_memory_bytes(mcpu, self._instance_family, self.worker_type()) - def machine_family(self) -> str: - assert self._instance_family is not None, self._instance_family - return self._instance_family - def region_for(self, location: str) -> str: # location = zone return region_from_location(location) diff --git a/batch/batch/cloud/gcp/resource_utils.py b/batch/batch/cloud/gcp/resource_utils.py index 4ae55b87325..b0c91859425 100644 --- a/batch/batch/cloud/gcp/resource_utils.py +++ b/batch/batch/cloud/gcp/resource_utils.py @@ -1,4 +1,5 @@ import logging +import math import re from typing import Optional, Tuple @@ -103,3 +104,12 @@ def gcp_cores_mcpu_to_memory_bytes(mcpu: int, machine_family: str, worker_type: memory_mib = gcp_worker_memory_per_core_mib(machine_family, worker_type) memory_bytes = int(memory_mib * 1024**2) return int((mcpu / 1000) * memory_bytes) + + +def gcp_adjust_cores_for_memory_request( + cores_in_mcpu: int, memory_in_bytes: int, machine_family: str, worker_type: str +) -> int: + memory_per_core_mib = gcp_worker_memory_per_core_mib(machine_family, worker_type) + memory_per_core_bytes = int(memory_per_core_mib * 1024**2) + min_cores_mcpu = math.ceil((memory_in_bytes / memory_per_core_bytes) * 1000) + return max(cores_in_mcpu, min_cores_mcpu) diff --git a/batch/batch/cloud/resource_utils.py b/batch/batch/cloud/resource_utils.py index 5decc801ad2..41f1913616f 100644 --- a/batch/batch/cloud/resource_utils.py +++ b/batch/batch/cloud/resource_utils.py @@ -11,7 +11,6 @@ azure_requested_to_actual_storage_bytes, azure_valid_cores_from_worker_type, azure_valid_machine_types, - azure_worker_memory_per_core_mib, ) from .gcp.resource_utils import ( gcp_is_valid_storage_request, @@ -21,7 +20,6 @@ gcp_requested_to_actual_storage_bytes, gcp_valid_cores_from_worker_type, gcp_valid_machine_types, - gcp_worker_memory_per_core_mib, ) log = logging.getLogger('resource_utils') @@ -63,33 +61,6 @@ def machine_type_to_cores_and_memory_bytes(cloud: str, machine_type: str) -> Tup return cores, memory_bytes -def worker_memory_per_core_mib(cloud: str, machine_family: str, worker_type: str) -> int: - if cloud == 'azure': - return azure_worker_memory_per_core_mib(worker_type) - assert cloud == 'gcp' - return gcp_worker_memory_per_core_mib(machine_family, worker_type) - - -def worker_memory_per_core_bytes(cloud: str, machine_family: str, worker_type: str) -> int: - m = worker_memory_per_core_mib(cloud, machine_family, worker_type) - return int(m * 1024**2) - - -def memory_bytes_to_cores_mcpu(cloud: str, memory_in_bytes: int, machine_family: str, worker_type: str) -> int: - return math.ceil((memory_in_bytes / worker_memory_per_core_bytes(cloud, machine_family, worker_type)) * 1000) - - -def cores_mcpu_to_memory_bytes(cloud: str, cores_in_mcpu: int, machine_family: str, worker_type: str) -> int: - return int((cores_in_mcpu / 1000) * worker_memory_per_core_bytes(cloud, machine_family, worker_type)) - - -def adjust_cores_for_memory_request( - cloud: str, cores_in_mcpu: int, memory_in_bytes: int, machine_family: str, worker_type: str -) -> int: - min_cores_mcpu = memory_bytes_to_cores_mcpu(cloud, memory_in_bytes, machine_family, worker_type) - return max(cores_in_mcpu, min_cores_mcpu) - - def unreserved_worker_data_disk_size_gib(data_disk_size_gib: int, cores: int) -> int: reserved_image_size = 30 reserved_container_size = RESERVED_STORAGE_GB_PER_CORE * cores diff --git a/batch/batch/inst_coll_config.py b/batch/batch/inst_coll_config.py index 1308479f48a..7ceed3b1086 100644 --- a/batch/batch/inst_coll_config.py +++ b/batch/batch/inst_coll_config.py @@ -5,16 +5,20 @@ from gear import Database from .cloud.azure.instance_config import AzureSlimInstanceConfig -from .cloud.azure.resource_utils import azure_worker_properties_to_machine_type +from .cloud.azure.resource_utils import ( + azure_adjust_cores_for_memory_request, + azure_cores_mcpu_to_memory_bytes, + azure_worker_properties_to_machine_type, +) from .cloud.gcp.instance_config import GCPSlimInstanceConfig from .cloud.gcp.resource_utils import ( GCP_MACHINE_FAMILY, family_worker_type_cores_to_gcp_machine_type, + gcp_adjust_cores_for_memory_request, + gcp_cores_mcpu_to_memory_bytes, ) from .cloud.resource_utils import ( - adjust_cores_for_memory_request, adjust_cores_for_packability, - cores_mcpu_to_memory_bytes, local_ssd_size, machine_type_to_cores_and_memory_bytes, requested_storage_bytes_to_actual_storage_gib, @@ -189,10 +193,20 @@ def convert_requests_to_resources(self, cores_mcpu, memory_bytes, storage_bytes) storage_gib = requested_storage_bytes_to_actual_storage_gib(self.cloud, storage_bytes, allow_zero_storage=True) if storage_gib is None: return None - cores_mcpu = adjust_cores_for_memory_request(self.cloud, cores_mcpu, memory_bytes, 'n1', self.worker_type) - cores_mcpu = adjust_cores_for_packability(cores_mcpu) - memory_bytes = cores_mcpu_to_memory_bytes(self.cloud, cores_mcpu, 'n1', self.worker_type) + if self.cloud == 'gcp': + cores_mcpu = gcp_adjust_cores_for_memory_request( + cores_mcpu, memory_bytes, GCP_MACHINE_FAMILY, self.worker_type + ) + cores_mcpu = adjust_cores_for_packability(cores_mcpu) + memory_bytes = gcp_cores_mcpu_to_memory_bytes(cores_mcpu, GCP_MACHINE_FAMILY, self.worker_type) + else: + assert self.cloud == 'azure' + cores_mcpu = azure_adjust_cores_for_memory_request( + cores_mcpu, memory_bytes, GCP_MACHINE_FAMILY, self.worker_type + ) + cores_mcpu = adjust_cores_for_packability(cores_mcpu) + memory_bytes = azure_cores_mcpu_to_memory_bytes(cores_mcpu, GCP_MACHINE_FAMILY, self.worker_type) if cores_mcpu <= self.worker_cores * 1000: return (cores_mcpu, memory_bytes, storage_gib) From 10df17d827c5f2511d65d87f0fa71af854140565 Mon Sep 17 00:00:00 2001 From: Sophie Parsa Date: Thu, 2 May 2024 21:17:51 -0400 Subject: [PATCH 12/12] fix --- batch/batch/inst_coll_config.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/batch/batch/inst_coll_config.py b/batch/batch/inst_coll_config.py index 7ceed3b1086..4e408a7c599 100644 --- a/batch/batch/inst_coll_config.py +++ b/batch/batch/inst_coll_config.py @@ -202,11 +202,9 @@ def convert_requests_to_resources(self, cores_mcpu, memory_bytes, storage_bytes) memory_bytes = gcp_cores_mcpu_to_memory_bytes(cores_mcpu, GCP_MACHINE_FAMILY, self.worker_type) else: assert self.cloud == 'azure' - cores_mcpu = azure_adjust_cores_for_memory_request( - cores_mcpu, memory_bytes, GCP_MACHINE_FAMILY, self.worker_type - ) + cores_mcpu = azure_adjust_cores_for_memory_request(cores_mcpu, memory_bytes, self.worker_type) cores_mcpu = adjust_cores_for_packability(cores_mcpu) - memory_bytes = azure_cores_mcpu_to_memory_bytes(cores_mcpu, GCP_MACHINE_FAMILY, self.worker_type) + memory_bytes = azure_cores_mcpu_to_memory_bytes(cores_mcpu, self.worker_type) if cores_mcpu <= self.worker_cores * 1000: return (cores_mcpu, memory_bytes, storage_gib)