From 74c3ba1b0103883e0bf340be78d428a577ed9f5e Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Mon, 14 Oct 2024 15:29:19 +0200
Subject: [PATCH] Don't store cluster dumps anymore

---
 .github/workflows/ab_tests.yml         |  1 -
 .github/workflows/tests.yml            |  1 -
 .github/workflows/tpch.yml             |  1 -
 tests/benchmarks/test_parquet.py       |  4 +-
 tests/benchmarks/test_spill.py         |  4 +-
 tests/benchmarks/test_work_stealing.py |  6 +-
 tests/benchmarks/test_xarray.py        |  6 +-
 tests/conftest.py                      | 81 ++++++--------------
 tests/stability/test_deadlock.py       |  3 +-
 9 files changed, 26 insertions(+), 81 deletions(-)

diff --git a/.github/workflows/ab_tests.yml b/.github/workflows/ab_tests.yml
index bfe2e32aef..68d88c4545 100644
--- a/.github/workflows/ab_tests.yml
+++ b/.github/workflows/ab_tests.yml
@@ -104,7 +104,6 @@ jobs:
           SNOWFLAKE_ROLE: ${{ secrets.SNOWFLAKE_ROLE }}
           AB_VERSION: ${{ matrix.runtime-version }}
           DB_NAME: ${{ matrix.runtime-version }}-${{ matrix.repeat }}.db
-          CLUSTER_DUMP: always
           CLUSTER_KWARGS: AB_environments/${{ matrix.runtime-version }}.cluster.yaml
           H2O_DATASETS: ${{ matrix.h2o_datasets }}
         run: pytest --benchmark ${{ matrix.pytest_args }}
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 69d483925d..ae869f04ed 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -140,7 +140,6 @@ jobs:
           SNOWFLAKE_ROLE: ${{ secrets.SNOWFLAKE_ROLE }}
           COILED_RUNTIME_VERSION: ${{ matrix.runtime-version }}
           DB_NAME: ${{ matrix.name_prefix }}-${{ matrix.os }}-py${{ matrix.python_version }}.db
-          CLUSTER_DUMP: always
           DASK_DATAFRAME__QUERY_PLANNING: True
         run: |
           pytest --benchmark -n 4 --dist loadscope ${{ env.PYTEST_MARKERS }} ${{ matrix.pytest_args }}
diff --git a/.github/workflows/tpch.yml b/.github/workflows/tpch.yml
index 8c7323cdec..eb11a2ef65 100644
--- a/.github/workflows/tpch.yml
+++ b/.github/workflows/tpch.yml
@@ -100,7 +100,6 @@ jobs:
           AWS_SECRET_ACCESS_KEY: ${{ secrets.RUNTIME_CI_BOT_AWS_SECRET_ACCESS_KEY }}
           COILED_RUNTIME_VERSION: ${{ matrix.runtime-version }}
           DB_NAME: tpch_${{ inputs.scale }}.db
-          CLUSTER_DUMP: always
           DASK_DATAFRAME__QUERY_PLANNING: True
         run: |
           pytest --benchmark \
diff --git a/tests/benchmarks/test_parquet.py b/tests/benchmarks/test_parquet.py
index 3478b62e99..9ca9ac8c3d 100644
--- a/tests/benchmarks/test_parquet.py
+++ b/tests/benchmarks/test_parquet.py
@@ -40,13 +40,13 @@ def parquet_cluster(dask_env_variables, cluster_kwargs, github_cluster_tags):
 
 
 @pytest.fixture
-def parquet_client(parquet_cluster, cluster_kwargs, upload_cluster_dump, benchmark_all):
+def parquet_client(parquet_cluster, cluster_kwargs, benchmark_all):
     n_workers = cluster_kwargs["parquet_cluster"]["n_workers"]
     with distributed.Client(parquet_cluster) as client:
         parquet_cluster.scale(n_workers)
         client.wait_for_workers(n_workers, timeout=600)
         client.restart()
-        with upload_cluster_dump(client), benchmark_all(client):
+        with benchmark_all(client):
             yield client
 
 
diff --git a/tests/benchmarks/test_spill.py b/tests/benchmarks/test_spill.py
index aff0dc2937..e4de0fb909 100644
--- a/tests/benchmarks/test_spill.py
+++ b/tests/benchmarks/test_spill.py
@@ -35,13 +35,13 @@ def spill_cluster(dask_env_variables, cluster_kwargs, github_cluster_tags):
 
 
 @pytest.fixture
-def spill_client(spill_cluster, cluster_kwargs, upload_cluster_dump, benchmark_all):
+def spill_client(spill_cluster, cluster_kwargs, benchmark_all):
     n_workers = cluster_kwargs["spill_cluster"]["n_workers"]
     with Client(spill_cluster) as client:
         spill_cluster.scale(n_workers)
         client.wait_for_workers(n_workers, timeout=600)
         client.restart()
-        with upload_cluster_dump(client), benchmark_all(client):
+        with benchmark_all(client):
             yield client
 
 
diff --git a/tests/benchmarks/test_work_stealing.py b/tests/benchmarks/test_work_stealing.py
index af7cf14300..78ce21a50a 100644
--- a/tests/benchmarks/test_work_stealing.py
+++ b/tests/benchmarks/test_work_stealing.py
@@ -28,7 +28,6 @@ def test_trivial_workload_should_not_cause_work_stealing(small_client):
 )
 def test_work_stealing_on_scaling_up(
     test_name_uuid,
-    upload_cluster_dump,
     benchmark_all,
     cluster_kwargs,
     dask_env_variables,
@@ -43,7 +42,7 @@ def test_work_stealing_on_scaling_up(
         with Client(cluster) as client:
             # FIXME https://github.com/coiled/platform/issues/103
             client.wait_for_workers(1, timeout=300)
-            with upload_cluster_dump(client), benchmark_all(client):
+            with benchmark_all(client):
                 # Slow task.
                 def func1(chunk):
                     if sum(chunk.shape) != 0:  # Make initialization fast
@@ -89,7 +88,6 @@ def clog(n):
 @run_up_to_nthreads("small_cluster", 100, reason="fixed dataset")
 def test_work_stealing_on_straggling_worker(
     test_name_uuid,
-    upload_cluster_dump,
     benchmark_all,
     cluster_kwargs,
     dask_env_variables,
@@ -105,7 +103,7 @@ def test_work_stealing_on_straggling_worker(
         with Client(cluster) as client:
             # FIXME https://github.com/coiled/platform/issues/103
             client.wait_for_workers(kwargs["n_workers"], timeout=600)
-            with upload_cluster_dump(client), benchmark_all(client):
+            with benchmark_all(client):
 
                 def clog():
                     time.sleep(1)
diff --git a/tests/benchmarks/test_xarray.py b/tests/benchmarks/test_xarray.py
index 35fbaba50e..d1f0e81bb5 100644
--- a/tests/benchmarks/test_xarray.py
+++ b/tests/benchmarks/test_xarray.py
@@ -36,15 +36,13 @@ def group_reduction_cluster(dask_env_variables, cluster_kwargs, github_cluster_t
 
 
 @pytest.fixture
-def group_reduction_client(
-    group_reduction_cluster, cluster_kwargs, upload_cluster_dump, benchmark_all
-):
+def group_reduction_client(group_reduction_cluster, cluster_kwargs, benchmark_all):
     n_workers = cluster_kwargs["group_reduction_cluster"]["n_workers"]
     with Client(group_reduction_cluster) as client:
         group_reduction_cluster.scale(n_workers)
         client.wait_for_workers(n_workers, timeout=600)
         client.restart()
-        with upload_cluster_dump(client), benchmark_all(client):
+        with benchmark_all(client):
             yield client
 
 
diff --git a/tests/conftest.py b/tests/conftest.py
index 209281c884..3dba5e1f35 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -558,7 +558,6 @@ def small_client(
     testrun_uid,
     small_cluster,
     cluster_kwargs,
-    upload_cluster_dump,
     benchmark_all,
 ):
     n_workers = cluster_kwargs["small_cluster"]["n_workers"]
@@ -569,26 +568,25 @@ def small_client(
         small_cluster.scale(n_workers)
         client.wait_for_workers(n_workers, timeout=600)
 
-        with upload_cluster_dump(client):
-            log_on_scheduler(client, "Finished client setup of %s", test_label)
+        log_on_scheduler(client, "Finished client setup of %s", test_label)
 
-            with benchmark_all(client):
-                yield client
+        with benchmark_all(client):
+            yield client
 
-            # Note: normally, this RPC call is almost instantaneous. However, in the
-            # case where the scheduler is still very busy when the fixtured test returns
-            # (e.g. test_futures.py::test_large_map_first_work), it can be delayed into
-            # several seconds. We don't want to capture this extra delay with
-            # benchmark_time, as it's beyond the scope of the test.
-            log_on_scheduler(client, "Starting client teardown of %s", test_label)
+        # Note: normally, this RPC call is almost instantaneous. However, in the
+        # case where the scheduler is still very busy when the fixtured test returns
+        # (e.g. test_futures.py::test_large_map_first_work), it can be delayed into
+        # several seconds. We don't want to capture this extra delay with
+        # benchmark_time, as it's beyond the scope of the test.
+        log_on_scheduler(client, "Starting client teardown of %s", test_label)
 
-            client.restart()
-            # Run connects to all workers once and to ensure they're up before we do
-            # something else. With another call of restart when entering this
-            # fixture again, this can trigger a race condition that kills workers
-            # See https://github.com/dask/distributed/issues/7312 Can be removed
-            # after this issue is fixed.
-            client.run(lambda: None)
+        client.restart()
+        # Run connects to all workers once and to ensure they're up before we do
+        # something else. With another call of restart when entering this
+        # fixture again, this can trigger a race condition that kills workers
+        # See https://github.com/dask/distributed/issues/7312 Can be removed
+        # after this issue is fixed.
+        client.run(lambda: None)
 
 
 @pytest.fixture
@@ -597,7 +595,6 @@ def client(
     request,
     dask_env_variables,
     cluster_kwargs,
     github_cluster_tags,
-    upload_cluster_dump,
     benchmark_all,
 ):
     name = request.param["name"]
@@ -612,7 +609,7 @@ def client(
                 client.upload_file(request.param["upload_file"])
             if request.param["worker_plugin"] is not None:
                 client.register_worker_plugin(request.param["worker_plugin"])
-            with upload_cluster_dump(client), benchmark_all(client):
+            with benchmark_all(client):
                 yield client
 
 
@@ -671,13 +668,6 @@ def s3_url(s3, s3_scratch, test_name_uuid):
     s3.rm(url, recursive=True)
 
 
-@pytest.fixture(scope="session")
-def s3_cluster_dump_url(s3, s3_scratch):
-    dump_url = f"{s3_scratch}/cluster_dumps"
-    s3.mkdirs(dump_url, exist_ok=True)
-    return dump_url
-
-
 GCS_REGION = "us-central1"
 GCS_BUCKET = "gs://coiled-oss-scratch/benchmarks-bot"
 
@@ -719,43 +709,6 @@ def pytest_runtest_makereport(item, call):
         setattr(item, "rep_" + rep.when, rep)
 
 
-@pytest.fixture
-def upload_cluster_dump(
-    request, s3_cluster_dump_url, s3_storage_options, test_run_benchmark
-):
-    @contextlib.contextmanager
-    def _upload_cluster_dump(client):
-        failed = False
-        # the code below is a workaround to make cluster dumps work with clients in fixtures
-        # and outside fixtures.
-        try:
-            yield
-        except Exception:
-            failed = True
-            raise
-        else:
-            # we need this for tests that are not using the client fixture
-            # for those cases request.node.rep_call.failed can't be access.
-            try:
-                failed = request.node.rep_call.failed
-            except AttributeError:
-                failed = False
-        finally:
-            cluster_dump = os.environ.get("CLUSTER_DUMP", "false")
-            if cluster_dump == "always" or (cluster_dump == "fail" and failed):
-                dump_path = (
-                    f"{s3_cluster_dump_url}/{client.cluster.name}/"
-                    f"{test_run_benchmark.path.replace('/', '.')}.{request.node.name}"
-                )
-                test_run_benchmark.cluster_dump_url = dump_path + ".msgpack.gz"
-                logger.info(
-                    f"Cluster state dump can be found at: {dump_path}.msgpack.gz"
-                )
-                client.dump_cluster_state(dump_path, **s3_storage_options)
-
-    yield _upload_cluster_dump
-
-
 requires_p2p_shuffle = pytest.mark.skipif(
     Version(distributed.__version__) < Version("2023.1.0"),
     reason="p2p shuffle not available",
diff --git a/tests/stability/test_deadlock.py b/tests/stability/test_deadlock.py
index a0bd402438..40b0144d15 100644
--- a/tests/stability/test_deadlock.py
+++ b/tests/stability/test_deadlock.py
@@ -15,7 +15,6 @@
     reason="https://github.com/dask/distributed/issues/6110",
 )
 def test_repeated_merge_spill(
-    upload_cluster_dump,
     benchmark_all,
     cluster_kwargs,
     dask_env_variables,
@@ -28,7 +27,7 @@ def test_repeated_merge_spill(
         **cluster_kwargs["test_repeated_merge_spill"],
     ) as cluster:
         with Client(cluster) as client:
-            with upload_cluster_dump(client), benchmark_all(client):
+            with benchmark_all(client):
                 ddf = dask.datasets.timeseries(
                     "2020",
                     "2025",
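
Note (not part of the patch): with CLUSTER_DUMP and the upload_cluster_dump
fixture removed, nothing writes cluster state dumps automatically anymore. If a
dump is still needed while debugging, the underlying distributed API that the
removed fixture wrapped can be called by hand. A minimal Python sketch, assuming
a client connected to the cluster under investigation and an illustrative output
path:

    # Hedged sketch: capture a cluster state dump manually.
    from distributed import Client

    client = Client()  # assumption: connect to (or start) the cluster being debugged
    try:
        ...  # run the workload under investigation
    finally:
        # Same call the removed fixture wrapped; writes <path>.msgpack.gz with
        # scheduler and worker state.
        client.dump_cluster_state("debug-cluster-dump")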