From 31e1638fb319bf2ae98e84b675d2ccd10ab2b047 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Tue, 14 Sep 2021 11:29:13 -0700 Subject: [PATCH] [CLI] Improve ray status for placement groups (#18289) --- python/ray/autoscaler/_private/util.py | 101 +++++++++++++----- python/ray/tests/test_cli.py | 3 +- .../test_cli_patterns/test_ray_status.txt | 6 +- python/ray/tests/test_placement_group.py | 39 +++++++ .../tests/test_resource_demand_scheduler.py | 35 +++--- 5 files changed, 143 insertions(+), 41 deletions(-) diff --git a/python/ray/autoscaler/_private/util.py b/python/ray/autoscaler/_private/util.py index d1ce5b6fa3c67..cb590e630d0ee 100644 --- a/python/ray/autoscaler/_private/util.py +++ b/python/ray/autoscaler/_private/util.py @@ -418,32 +418,43 @@ def parse_placement_group_resource_str( placement_group_resource_str: str) -> Tuple[str, Optional[str]]: """Parse placement group resource in the form of following 3 cases: {resource_name}_group_{bundle_id}_{group_name}; + -> This case is ignored as it is duplicated to the case below. {resource_name}_group_{group_name}; {resource_name} Returns: - Tuple of (resource_name, placement_group_name). placement_group_name - could be None if its not a placement group resource. + Tuple of (resource_name, placement_group_name, is_countable_resource). + placement_group_name could be None if its not a placement group + resource. is_countable_resource is True if the resource + doesn't contain bundle index. We shouldn't count resources + with bundle index because it will + have duplicated resource information as + wildcard resources (resource name without bundle index). """ result = PLACEMENT_GROUP_RESOURCE_BUNDLED_PATTERN.match( placement_group_resource_str) if result: - return (result.group(1), result.group(3)) + return (result.group(1), result.group(3), False) result = PLACEMENT_GROUP_RESOURCE_PATTERN.match( placement_group_resource_str) if result: - return (result.group(1), result.group(2)) - return (placement_group_resource_str, None) + return (result.group(1), result.group(2), True) + return (placement_group_resource_str, None, True) def get_usage_report(lm_summary: LoadMetricsSummary) -> str: # first collect resources used in placement groups - placement_group_resource_usage = collections.defaultdict(float) + placement_group_resource_usage = {} + placement_group_resource_total = collections.defaultdict(float) for resource, (used, total) in lm_summary.usage.items(): - (pg_resource_name, - pg_name) = parse_placement_group_resource_str(resource) + (pg_resource_name, pg_name, + is_countable) = parse_placement_group_resource_str(resource) if pg_name: - placement_group_resource_usage[pg_resource_name] += used + if pg_resource_name not in placement_group_resource_usage: + placement_group_resource_usage[pg_resource_name] = 0 + if is_countable: + placement_group_resource_usage[pg_resource_name] += used + placement_group_resource_total[pg_resource_name] += total continue usage_lines = [] @@ -451,26 +462,37 @@ def get_usage_report(lm_summary: LoadMetricsSummary) -> str: if "node:" in resource: continue # Skip the auto-added per-node "node:" resource. - (_, pg_name) = parse_placement_group_resource_str(resource) + (_, pg_name, _) = parse_placement_group_resource_str(resource) if pg_name: continue # Skip resource used by placement groups - used_in_pg = placement_group_resource_usage[resource] - - line = f" {used}/{total} {resource}" - if used_in_pg != 0: - line = line + f" ({used_in_pg} reserved in placement groups)" + pg_used = 0 + pg_total = 0 + used_in_pg = resource in placement_group_resource_usage + if used_in_pg: + pg_used = placement_group_resource_usage[resource] + pg_total = placement_group_resource_total[resource] + # Used includes pg_total because when pgs are created + # it allocates resources. + # To get the real resource usage, we should subtract the pg + # reserved resources from the usage and add pg used instead. + used = used - pg_total + pg_used if resource in ["memory", "object_store_memory"]: to_GiB = 1 / 2**30 - used *= to_GiB - total *= to_GiB - used_in_pg *= to_GiB - line = f" {used:.2f}/{total:.3f} GiB {resource}" - if used_in_pg != 0: - line = line + f" ({used_in_pg:.2f} GiB reserved" \ - + " in placement groups)" - usage_lines.append(line) + line = (f" {(used * to_GiB):.2f}/" + f"{(total * to_GiB):.3f} GiB {resource}") + if used_in_pg: + line = line + (f" ({(pg_used * to_GiB):.2f} used of " + f"{(pg_total * to_GiB):.2f} GiB " + + "reserved in placement groups)") + usage_lines.append(line) + else: + line = f" {used}/{total} {resource}" + if used_in_pg: + line += (f" ({pg_used} used of " + f"{pg_total} reserved in placement groups)") + usage_lines.append(line) usage_report = "\n".join(usage_lines) return usage_report @@ -488,8 +510,8 @@ def filter_placement_group_from_bundle(bundle: ResourceBundle): using_placement_group = False result_bundle = dict() for pg_resource_str, resource_count in bundle.items(): - (resource_name, - pg_name) = parse_placement_group_resource_str(pg_resource_str) + (resource_name, pg_name, + _) = parse_placement_group_resource_str(pg_resource_str) result_bundle[resource_name] = resource_count if pg_name: using_placement_group = True @@ -600,6 +622,32 @@ def format_info_string(lm_summary, autoscaler_summary, time=None): return formatted_output +def format_no_node_type_string(node_type: dict): + placement_group_resource_usage = {} + regular_resource_usage = collections.defaultdict(float) + for resource, total in node_type.items(): + (pg_resource_name, pg_name, + is_countable) = parse_placement_group_resource_str(resource) + if pg_name: + if not is_countable: + continue + if pg_resource_name not in placement_group_resource_usage: + placement_group_resource_usage[pg_resource_name] = 0 + placement_group_resource_usage[pg_resource_name] += total + else: + regular_resource_usage[resource] += total + + output_lines = [""] + for resource, total in regular_resource_usage.items(): + output_line = f"{resource}: {total}" + if resource in placement_group_resource_usage: + pg_resource = placement_group_resource_usage[resource] + output_line += f" ({pg_resource} reserved in placement groups)" + output_lines.append(output_line) + + return "\n ".join(output_lines) + + def format_info_string_no_node_types(lm_summary, time=None): if time is None: time = datetime.now() @@ -608,7 +656,8 @@ def format_info_string_no_node_types(lm_summary, time=None): node_lines = [] for node_type, count in lm_summary.node_types: - line = f" {count} node(s) with resources: {node_type}" + line = (f" {count} node(s) with resources:" + f"{format_no_node_type_string(node_type)}") node_lines.append(line) node_report = "\n".join(node_lines) diff --git a/python/ray/tests/test_cli.py b/python/ray/tests/test_cli.py index 2b5b4212d8878..15b947798addb 100644 --- a/python/ray/tests/test_cli.py +++ b/python/ray/tests/test_cli.py @@ -204,9 +204,8 @@ def _check_output_via_pattern(name, result): expected_lines = _load_output_pattern(name) if result.exception is not None: - print(result.output) raise result.exception from None - + print(result.output) expected = r" *\n".join(expected_lines) + "\n?" if re.fullmatch(expected, result.output) is None: _debug_check_line_by_line(result, expected_lines) diff --git a/python/ray/tests/test_cli_patterns/test_ray_status.txt b/python/ray/tests/test_cli_patterns/test_ray_status.txt index f903c6d62503f..638cfb009a069 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_status.txt +++ b/python/ray/tests/test_cli_patterns/test_ray_status.txt @@ -1,7 +1,11 @@ ======== Cluster status: .+ Node status ------------------------------------------------------------ - 1 node\(s\) with resources: .+ + 1 node\(s\) with resources: + .+ + .+ + .+ + .+ Resources ------------------------------------------------------------ diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index 2173770a7b439..89b63e945c1ee 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -1980,5 +1980,44 @@ def is_usage_updated(): assert demand_output["demand"] == "(no resource demands)" +def test_placement_group_status(ray_start_cluster): + cluster = ray_start_cluster + cluster.add_node(num_cpus=4) + ray.init(address=cluster.address) + + @ray.remote(num_cpus=1) + class A: + def ready(self): + pass + + pg = ray.util.placement_group([{"CPU": 1}]) + ray.get(pg.ready()) + + # Wait until the usage is updated, which is + # when the demand is also updated. + def is_usage_updated(): + demand_output = get_ray_status_output(cluster.address) + return demand_output["usage"] != "" + + wait_for_condition(is_usage_updated) + demand_output = get_ray_status_output(cluster.address) + cpu_usage = demand_output["usage"].split("\n")[0] + expected = "0.0/4.0 CPU (0.0 used of 1.0 reserved in placement groups)" + assert cpu_usage == expected + + # 2 CPU + 1 PG CPU == 3.0/4.0 CPU (1 used by pg) + actors = [A.remote() for _ in range(2)] + actors_in_pg = [A.options(placement_group=pg).remote() for _ in range(1)] + + ray.get([actor.ready.remote() for actor in actors]) + ray.get([actor.ready.remote() for actor in actors_in_pg]) + # Wait long enough until the usage is propagated to GCS. + time.sleep(5) + demand_output = get_ray_status_output(cluster.address) + cpu_usage = demand_output["usage"].split("\n")[0] + expected = "3.0/4.0 CPU (1.0 used of 1.0 reserved in placement groups)" + assert cpu_usage == expected + + if __name__ == "__main__": sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index 24941559f14fd..6aa37698698c5 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -2391,7 +2391,7 @@ def test_info_string(): lm_summary = LoadMetricsSummary( head_ip="0.0.0.0", usage={ - "CPU": (530, 544), + "CPU": (530.0, 544.0), "GPU": (2, 2), "AcceleratorType:V100": (0, 2), "memory": (2 * 2**30, 2**33), @@ -2439,7 +2439,7 @@ def test_info_string(): Usage: 0/2 AcceleratorType:V100 - 530/544 CPU + 530.0/544.0 CPU 2/2 GPU 2.00/8.000 GiB memory 3.14/16.000 GiB object_store_memory @@ -2461,7 +2461,7 @@ def test_info_string_failed_node_cap(): lm_summary = LoadMetricsSummary( head_ip="0.0.0.0", usage={ - "CPU": (530, 544), + "CPU": (530.0, 544.0), "GPU": (2, 2), "AcceleratorType:V100": (0, 2), "memory": (2 * 2**30, 2**33), @@ -2532,7 +2532,7 @@ def test_info_string_failed_node_cap(): Usage: 0/2 AcceleratorType:V100 - 530/544 CPU (2.0 reserved in placement groups) + 530.0/544.0 CPU (2.0 used of 2.0 reserved in placement groups) 2/2 GPU 2.00/8.000 GiB memory 3.14/16.000 GiB object_store_memory @@ -2556,13 +2556,16 @@ def test_info_string_no_node_type(): lm_summary = LoadMetricsSummary( head_ip="0.0.0.0", usage={ - "CPU": (530, 544), + "CPU": (530.0, 544.0), "GPU": (2, 2), "AcceleratorType:V100": (0, 2), - "memory": (2 * 2**30, 2**33), + "memory": (6 * 2**30, 2**33), "object_store_memory": (3.14 * 2**30, 2**34), - "CPU_group_4a82a217aadd8326a3a49f02700ac5c2": (2.0, 2.0), - "memory_group_4a82a217aadd8326a3a49f02700ac5c2": (2**32, 2.0) + "CPU_group_4a82a217aadd8326a3a49f02700ac5c2": (1.0, 2.0), + "CPU_group_1_4a82a217aadd8326a3a49f02700ac5c2": (0.0, 1.0), + "CPU_group_2_4a82a217aadd8326a3a49f02700ac5c2": (1.0, 1.0), + "memory_group_4a82a217aadd8326a3a49f02700ac5c2": (2**32, 2**32), + "memory_group_0_4a82a217aadd8326a3a49f02700ac5c2": (2**32, 2**32) }, resource_demand=[({ "GPU": 0.5, @@ -2585,22 +2588,30 @@ def test_info_string_no_node_type(): "CPU": 16 }, 100)], node_types=[({ - "CPU": 16 + "CPU": 16, + "CPU_group_4a82a217aadd8326a3a49f02700ac5c2": 2.0, + "CPU_group_1_4a82a217aadd8326a3a49f02700ac5c2": 1.0, + "CPU_group_2_4a82a217aadd8326a3a49f02700ac5c2": 1.0, + "memory": 2**33, + "memory_group_4a82a217aadd8326a3a49f02700ac5c2": 4 * 2**30, + "memory_group_0_4a82a217aadd8326a3a49f02700ac5c2": 4 * 2**30, }, 1)]) expected = """ ======== Cluster status: 2020-12-28 01:02:03 ======== Node status ----------------------------------------------------- - 1 node(s) with resources: {'CPU': 16} + 1 node(s) with resources: + CPU: 16.0 (2.0 reserved in placement groups) + memory: 8589934592.0 (4294967296 reserved in placement groups) Resources ----------------------------------------------------- Usage: 0/2 AcceleratorType:V100 - 530/544 CPU (2.0 reserved in placement groups) + 529.0/544.0 CPU (1.0 used of 2.0 reserved in placement groups) 2/2 GPU - 2.00/8.000 GiB memory (4.00 GiB reserved in placement groups) + 6.00/8.000 GiB memory (4.00 used of 4.00 GiB reserved in placement groups) 3.14/16.000 GiB object_store_memory Demands: