diff --git a/.gitignore b/.gitignore index b6e47617de..c3d511b0c9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# macOS +.DS_Store + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] diff --git a/conftest.py b/conftest.py index c44297e9d3..78fb427840 100644 --- a/conftest.py +++ b/conftest.py @@ -246,6 +246,12 @@ def _sample_memory(client): # ############################################### # +@pytest.fixture +def test_name_uuid(request): + "Test name, suffixed with a UUID. Useful for resources like cluster names, S3 paths, etc." + return f"{request.node.originalname}-{uuid.uuid4().hex}" + + @pytest.fixture(scope="module") def small_cluster(request): # Extract `backend_options` for cluster from `backend_options` markers @@ -256,7 +262,7 @@ def small_cluster(request): with Cluster( name=f"{module}-{uuid.uuid4().hex[:8]}", n_workers=10, - worker_vm_types=["t3.large"], + worker_vm_types=["t3.large"], # 2CPU, 8GiB scheduler_vm_types=["t3.large"], backend_options=backend_options, ) as cluster: @@ -308,8 +314,8 @@ def s3_scratch(s3): @pytest.fixture(scope="function") -def s3_url(s3, s3_scratch, request): - url = f"{s3_scratch}/{request.node.originalname}-{uuid.uuid4().hex}" +def s3_url(s3, s3_scratch, test_name_uuid): + url = f"{s3_scratch}/{test_name_uuid}" s3.mkdirs(url, exist_ok=False) yield url s3.rm(url, recursive=True) diff --git a/setup.cfg b/setup.cfg index 6970620470..e48fa1ff91 100644 --- a/setup.cfg +++ b/setup.cfg @@ -8,6 +8,7 @@ ignore = [isort] skip = alembic +profile = black [tool:pytest] addopts = -v -rsxfE --durations=0 --color=yes --strict-markers --strict-config diff --git a/tests/benchmarks/test_array.py b/tests/benchmarks/test_array.py new file mode 100644 index 0000000000..c3b9d9209d --- /dev/null +++ b/tests/benchmarks/test_array.py @@ -0,0 +1,149 @@ +from __future__ import annotations + +import dask.array as da +import numpy as np +import pytest +import xarray as xr +from dask.utils import format_bytes + +from ..utils_test import arr_to_devnull, cluster_memory, scaled_array_shape, wait + + +def print_size_info(memory: int, target_nbytes: int, *arrs: da.Array) -> None: + print( + f"Cluster memory: {format_bytes(memory)}, target data size: {format_bytes(target_nbytes)}" + ) + for i, arr in enumerate(arrs, 1): + print( + f"Input {i}: {format_bytes(arr.nbytes)} - " + f"{arr.npartitions} {format_bytes(arr.blocks[(0,) * arr.ndim].nbytes)} chunks" + ) + + +def test_anom_mean(small_client): + # From https://github.com/dask/distributed/issues/2602#issuecomment-498718651 + + memory = cluster_memory(small_client) # 76.66 GiB + target_nbytes = memory // 2 + data = da.random.random( + scaled_array_shape(target_nbytes, ("x", "10MiB")), chunks=(1, "10MiB") + ) + print_size_info(memory, target_nbytes, data) + # 38.32 GiB - 3925 10.00 MiB chunks + + ngroups = data.shape[0] // 100 + arr = xr.DataArray( + data, + dims=["time", "x"], + coords={"day": ("time", np.arange(data.shape[0]) % ngroups)}, + ) + + clim = arr.groupby("day").mean(dim="time") + anom = arr.groupby("day") - clim + anom_mean = anom.mean(dim="time") + + wait(anom_mean, small_client, 10 * 60) + + +def test_basic_sum(small_client): + # From https://github.com/dask/distributed/pull/4864 + + memory = cluster_memory(small_client) # 76.66 GiB + target_nbytes = memory * 5 + data = da.zeros( + scaled_array_shape(target_nbytes, ("100MiB", "x")), chunks=("100MiB", 1) + ) + print_size_info(memory, target_nbytes, data) + # 383.20 GiB - 3924 100.00 MiB chunks + + result = da.sum(data, axis=1) + + wait(result, 
small_client, 10 * 60) + + +@pytest.mark.skip( + "fails in actual CI; see https://github.com/coiled/coiled-runtime/issues/253" +) +def test_climatic_mean(small_client): + # From https://github.com/dask/distributed/issues/2602#issuecomment-535009454 + + memory = cluster_memory(small_client) # 76.66 GiB + target_nbytes = memory * 2 + chunks = (1, 1, 96, 21, 90, 144) + shape = (28, "x", 96, 21, 90, 144) + data = da.random.random(scaled_array_shape(target_nbytes, shape), chunks=chunks) + print_size_info(memory, target_nbytes, data) + # 152.62 GiB - 784 199.34 MiB chunks + + array = xr.DataArray( + data, + dims=["ensemble", "init_date", "lat", "lead_time", "level", "lon"], + # coords={"init_date": pd.date_range(start="1960", periods=arr.shape[1])}, + coords={"init_date": np.arange(data.shape[1]) % 10}, + ) + # arr_clim = array.groupby("init_date.month").mean(dim="init_date") + arr_clim = array.groupby("init_date").mean(dim="init_date") + + wait(arr_clim, small_client, 15 * 60) + + +def test_vorticity(small_client): + # From https://github.com/dask/distributed/issues/6571 + + memory = cluster_memory(small_client) # 76.66 GiB + target_nbytes = int(memory * 0.85) + shape = scaled_array_shape(target_nbytes, (5000, 5000, "x")) + + u = da.random.random(shape, chunks=(5000, 5000, 1)) + v = da.random.random(shape, chunks=(5000, 5000, 1)) + print_size_info(memory, target_nbytes, u, v) + # Input 1: 65.19 GiB - 350 190.73 MiB chunks + # Input 2: 65.19 GiB - 350 190.73 MiB chunks + + dx = da.random.random((5001, 5000), chunks=(5001, 5000)) + dy = da.random.random((5001, 5000), chunks=(5001, 5000)) + + def pad_rechunk(arr): + """ + Pad a single element onto the end of arr, then merge the 1-element long chunk + created back in. + + This operation complicates each chain of the graph enough so that the scheduler + no longer recognizes the overall computation as blockwise, but doesn't actually + change the overall topology of the graph, or the number of chunks along any + dimension of the array. 
+ + This is motivated by the padding operation we do in xGCM, see + + https://xgcm.readthedocs.io/en/latest/grid_ufuncs.html#automatically-applying-boundary-conditions + https://github.com/xgcm/xgcm/blob/fe860f96bbaa7293142254f48663d71fb97a4f36/xgcm/grid_ufunc.py#L871 + """ + + padded = da.pad(arr, pad_width=[(0, 1), (0, 0), (0, 0)], mode="wrap") + old_chunks = padded.chunks + new_chunks = list(old_chunks) + new_chunks[0] = 5001 + rechunked = da.rechunk(padded, chunks=new_chunks) + return rechunked + + up = pad_rechunk(u) + vp = pad_rechunk(v) + result = dx[..., None] * up - dy[..., None] * vp + + wait(arr_to_devnull(result), small_client, 10 * 60) + + +def test_double_diff(small_client): + # Variant of https://github.com/dask/distributed/issues/6597 + memory = cluster_memory(small_client) # 76.66 GiB + + a = da.random.random( + scaled_array_shape(memory, ("x", "x")), chunks=("20MiB", "20MiB") + ) + b = da.random.random( + scaled_array_shape(memory, ("x", "x")), chunks=("20MiB", "20MiB") + ) + print_size_info(memory, memory, a, b) + + diff = a[1:, 1:] - b[:-1, :-1] + wait(arr_to_devnull(diff), small_client, 10 * 60) diff --git a/tests/benchmarks/test_custom.py b/tests/benchmarks/test_custom.py new file mode 100644 index 0000000000..f2f6362e5e --- /dev/null +++ b/tests/benchmarks/test_custom.py @@ -0,0 +1,33 @@ +import random +import time + +from dask import delayed +from dask.utils import parse_bytes + +from ..utils_test import wait + + +def test_jobqueue(small_client): + # Just using dask to run lots of embarrassingly-parallel CPU-bound tasks as fast as possible + nthreads = sum( + w["nthreads"] for w in small_client.scheduler_info()["workers"].values() + ) + max_runtime = 120 + max_sleep = 3 + n_tasks = round(max_runtime / max_sleep * nthreads) + + @delayed(pure=True) + def task(i: int) -> int: + stuff = "x" * parse_bytes("400MiB") + time.sleep(random.uniform(0, max_sleep)) + del stuff + return i + + tasks = [task(i) for i in range(n_tasks)] + result = delayed(sum)(tasks) # just so we have a single object + + wait( + result, + small_client, + max_runtime * 1.15, + ) diff --git a/tests/benchmarks/test_dataframe.py b/tests/benchmarks/test_dataframe.py new file mode 100644 index 0000000000..4d4f5ce00d --- /dev/null +++ b/tests/benchmarks/test_dataframe.py @@ -0,0 +1,60 @@ +from dask.sizeof import sizeof +from dask.utils import format_bytes + +from ..utils_test import cluster_memory, timeseries_of_size, wait + + +def print_dataframe_info(df): + p = df.partitions[0].compute(scheduler="threads") + partition_size = sizeof(p) + total_size = partition_size * df.npartitions + print( + f"~{len(p) * df.npartitions:,} rows x {len(df.columns)} columns, " + f"{format_bytes(total_size)} total, " + f"{df.npartitions:,} {format_bytes(partition_size)} partitions" + ) + + +def test_dataframe_align(small_client): + memory = cluster_memory(small_client) # 76.66 GiB + + df = timeseries_of_size( + memory // 2, + start="2020-01-01", + freq="600ms", + partition_freq="12h", + dtypes={i: float for i in range(100)}, + ) + print_dataframe_info(df) + # ~50,904,000 rows x 100 columns, 38.31 GiB total, 707 55.48 MiB partitions + + df2 = timeseries_of_size( + memory // 4, + start="2010-01-01", + freq="600ms", + partition_freq="12h", + dtypes={i: float for i in range(100)}, + ) + print_dataframe_info(df2) + # ~25,488,000 rows x 100 columns, 19.18 GiB total, 354 55.48 MiB partitions + + final = (df2 - df).mean() # will be all NaN, just forcing alignment + wait(final, small_client, 10 * 60) + + +def 
test_shuffle(small_client): + memory = cluster_memory(small_client) # 76.66 GiB + + df = timeseries_of_size( + memory // 4, + start="2020-01-01", + freq="1200ms", + partition_freq="24h", + dtypes={i: float for i in range(100)}, + ) + print_dataframe_info(df) + # ~25,488,000 rows x 100 columns, 19.18 GiB total, 354 55.48 MiB partitions + + shuf = df.shuffle(0, shuffle="tasks") + result = shuf.size + wait(result, small_client, 20 * 60) diff --git a/tests/test_utils_test.py b/tests/test_utils_test.py new file mode 100644 index 0000000000..a768d2022a --- /dev/null +++ b/tests/test_utils_test.py @@ -0,0 +1,41 @@ +import dask +import numpy as np +import pytest +from dask.sizeof import sizeof +from dask.utils import parse_bytes + +from .utils_test import scaled_array_shape, timeseries_of_size + + +def test_scaled_array_shape(): + assert scaled_array_shape(1024, (2, "x"), dtype=bool) == (2, 512) + assert scaled_array_shape(1024, (2, "x"), dtype=float) == (2, 64) + assert scaled_array_shape(1024, (2, "x"), dtype=np.float64) == (2, 64) + assert scaled_array_shape(1024, (2, "x")) == (2, 64) + + assert scaled_array_shape(16, ("x", "x"), dtype=bool) == (4, 4) + assert scaled_array_shape(256, ("4x", "x"), dtype=bool) == (32, 8) + assert scaled_array_shape(64, ("x", "x", "x"), dtype=float) == (2, 2, 2) + + assert scaled_array_shape("10kb", ("x", "1kb"), dtype=bool) == (10, 1000) + + +def sizeof_df(df): + # Measure the size of each partition separately (each one has overhead of being a separate DataFrame) + # TODO more efficient method than `df.partitions`? Use `dask.get` directly? + parts = dask.compute( + [df.partitions[i] for i in range(df.npartitions)], scheduler="threads" + ) + return sum(map(sizeof, parts)) + + +def test_timeseries_of_size(): + small_parts = timeseries_of_size( + "1mb", freq="1s", partition_freq="100s", dtypes={"x": float} + ) + big_parts = timeseries_of_size( + "1mb", freq="1s", partition_freq="100s", dtypes={i: float for i in range(10)} + ) + assert sizeof_df(small_parts) == pytest.approx(parse_bytes("1mb"), rel=0.1) + assert sizeof_df(big_parts) == pytest.approx(parse_bytes("1mb"), rel=0.1) + assert big_parts.npartitions < small_parts.npartitions diff --git a/tests/utils_test.py b/tests/utils_test.py new file mode 100644 index 0000000000..73fbb3c85d --- /dev/null +++ b/tests/utils_test.py @@ -0,0 +1,189 @@ +from __future__ import annotations + +import dask +import dask.array as da +import dask.dataframe as dd +import distributed +import numpy as np +import pandas as pd +from dask.datasets import timeseries +from dask.sizeof import sizeof +from dask.utils import format_bytes, parse_bytes + + +def scaled_array_shape( + target_nbytes: int | str, + shape: tuple[int | str, ...], + *, + dtype: np.dtype | type = np.dtype(float), + max_error: float = 0.1, +) -> tuple[int, ...]: + """ + Given a shape with free variables in it, generate the shape that results in the target array size. 
+ + Example + ------- + >>> scaled_array_shape(1024, (2, "x"), dtype=bool) + (2, 512) + >>> scaled_array_shape(2048, (2, "x"), dtype=bool) + (2, 1024) + >>> scaled_array_shape(16, ("x", "x"), dtype=bool) + (4, 4) + >>> scaled_array_shape(256, ("4x", "x"), dtype=bool) + (32, 8) + >>> scaled_array_shape("10kb", ("x", "1kb"), dtype=bool) + (10, 1000) + """ + if isinstance(target_nbytes, str): + target_nbytes = parse_bytes(target_nbytes) + + dtype = np.dtype(dtype) + # Given a shape like: + # (10, "2x", 3, "x", 50) + # We're solving for x in: + # `10 * 2x * 3 * x * 50 * dtype.itemsize == target_nbytes` + # aka: + # `3000x^2 * dtype.itemsize == target_nbytes` + resolved_shape: list[int | None] = [] + x_locs_coeffs: list[tuple[int, float]] = [] + total_coeff = 1 + for i, s in enumerate(shape): + if isinstance(s, str): + if s[-1] == "x": + coeff = 1 if len(s) == 1 else float(s[:-1]) + assert coeff > 0, coeff + x_locs_coeffs.append((i, coeff)) + total_coeff *= coeff + resolved_shape.append(None) + continue + else: + s = parse_bytes(s) // dtype.itemsize + + assert s > 0, s + total_coeff *= s + resolved_shape.append(s) + + assert x_locs_coeffs, f"Expected at least 1 `x` value in shape {shape}" + total_coeff *= dtype.itemsize + x = (target_nbytes / total_coeff) ** (1 / len(x_locs_coeffs)) + + # Replace `x` values back into shape + for i, coeff in x_locs_coeffs: + assert resolved_shape[i] is None + resolved_shape[i] = round(coeff * x) + + final = tuple(s for s in resolved_shape if s is not None) + assert len(final) == len(resolved_shape), resolved_shape + + actual_nbytes = np.prod(final) * dtype.itemsize + error = (actual_nbytes - target_nbytes) / actual_nbytes + assert abs(error) < max_error, (error, actual_nbytes, target_nbytes, final) + return final + + +def wait(thing, client, timeout): + "Like `distributed.wait(thing.persist())`, but if any tasks fail, raises its error." + p = thing.persist() + try: + distributed.wait(p, timeout=timeout) + for f in client.futures_of(p): + if f.status in ("error", "cancelled"): + raise f.exception() + finally: + client.cancel(p) + + +def cluster_memory(client: distributed.Client) -> int: + "Total memory available on the cluster, in bytes" + return int( + sum(w["memory_limit"] for w in client.scheduler_info()["workers"].values()) + ) + + +def timeseries_of_size( + target_nbytes: int | str, + *, + start="2000-01-01", + freq="1s", + partition_freq="1d", + dtypes={"name": str, "id": int, "x": float, "y": float}, + seed=None, + **kwargs, +) -> dd.DataFrame: + """ + Generate a `dask.demo.timeseries` of a target total size. + + Same arguments as `dask.demo.timeseries`, but instead of specifying an ``end`` date, + you specify ``target_nbytes``. The number of partitions is set as necessary to reach + approximately that total dataset size. Note that you control the partition size via + ``freq``, ``partition_freq``, and ``dtypes``. + + Examples + -------- + >>> timeseries_of_size( + ... "1mb", freq="1s", partition_freq="100s", dtypes={"x": float} + ... ).npartitions + 278 + >>> timeseries_of_size( + ... "1mb", freq="1s", partition_freq="100s", dtypes={i: float for i in range(10)} + ... ).npartitions + 93 + + Notes + ----- + The ``target_nbytes`` refers to the amount of RAM the dask DataFrame would use up + across all workers, as many pandas partitions. + + This is typically larger than ``df.compute()`` would be as a single pandas + DataFrame. Especially with many partions, there can be significant overhead to + storing all the individual pandas objects. 
+ + Additionally, ``target_nbytes`` certainly does not correspond to the size + the dataset would take up on disk (as parquet, csv, etc.). + """ + if isinstance(target_nbytes, str): + target_nbytes = parse_bytes(target_nbytes) + + start_dt = pd.to_datetime(start) + partition_freq_dt = pd.to_timedelta(partition_freq) + example_part = timeseries( + start=start, + end=start_dt + partition_freq_dt, + freq=freq, + partition_freq=partition_freq, + dtypes=dtypes, + seed=seed, + **kwargs, + ) + p = example_part.compute(scheduler="threads") + partition_size = sizeof(p) + npartitions = round(target_nbytes / partition_size) + assert npartitions > 0, ( + f"Partition size of {format_bytes(partition_size)} > " + f"target size {format_bytes(target_nbytes)}" + ) + + ts = timeseries( + start=start, + end=start_dt + partition_freq_dt * npartitions, + freq=freq, + partition_freq=partition_freq, + dtypes=dtypes, + seed=seed, + **kwargs, + ) + assert ts.npartitions == npartitions + return ts + + +def arr_to_devnull(arr: da.Array) -> dask.delayed: + "Simulate storing an array to zarr, without writing anything (just drops every block once it's computed)" + + # NOTE: this class must be defined inside the function so it's cloudpickled as code, + # otherwise `tests/utils_test` would have to be installed on the cluster. + class _DevNull: + def __setitem__(self, k, v): + pass + + # TODO `da.store` should use blockwise to be much more efficient https://github.com/dask/dask/issues/9381 + return da.store(arr, _DevNull(), lock=False, compute=False)
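
Reviewer note: a minimal sketch (not part of the diff) of how the array-oriented helpers added in tests/utils_test.py are intended to compose inside a benchmark. The small_client fixture, cluster_memory, scaled_array_shape, arr_to_devnull, and wait all come from the changes above; the test name, target size, chunking, and timeout chosen here are illustrative assumptions only.

import dask.array as da

from ..utils_test import arr_to_devnull, cluster_memory, scaled_array_shape, wait


def test_example_reduction(small_client):
    # Scale the input to half of the cluster's total memory (illustrative choice).
    memory = cluster_memory(small_client)
    shape = scaled_array_shape(memory // 2, ("x", "10MiB"))
    data = da.random.random(shape, chunks=(1, "10MiB"))

    # Center each column, then discard every computed block via `arr_to_devnull`
    # (simulates storing to zarr without writing anything). `wait` persists the
    # graph, blocks until completion or the timeout, and re-raises any task error.
    result = data - data.mean(axis=0)
    wait(arr_to_devnull(result), small_client, 10 * 60)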
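
Likewise for the DataFrame helpers, a sketch of the intended pattern under the same small_client fixture: timeseries_of_size, cluster_memory, and wait come from the diff, while the memory fraction, column count, and timeout are illustrative.

from ..utils_test import cluster_memory, timeseries_of_size, wait


def test_example_groupby(small_client):
    memory = cluster_memory(small_client)
    # Build a timeseries whose total in-memory size is ~1/4 of cluster memory;
    # partition size is controlled by `freq`, `partition_freq`, and `dtypes`.
    df = timeseries_of_size(
        memory // 4,
        start="2020-01-01",
        freq="600ms",
        partition_freq="12h",
        dtypes={i: float for i in range(100)},
    )
    # A simple column-wise aggregation after a groupby; `wait` raises if any
    # task fails or the 10-minute timeout is exceeded.
    result = df.groupby(0).mean()
    wait(result, small_client, 10 * 60)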