Internal cleanup of P2P code #8907
170 fail, 110 skipped, 3 843 pass in 9h 44m 29s
Annotations
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
7 out of 10 runs failed: test_basic_merge[inner] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49258', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
               x      y
npartitions=2
0          int64  int64
4            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
               y      z
npartitions=2
0          int64  int64
2            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'inner'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_basic_merge(c, s, a, b, how):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
joined = a.merge(b, left_on="y", right_on="y", how=how)
if dd._dask_expr_enabled():
# Ensure we're using a hash join
from dask_expr._merge import HashJoinP2P
assert any(
isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
)
expected = pd.merge(A, B, how, "y")
> await list_eq(joined, expected)
distributed/shuffle/tests/test_merge.py:91:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/shuffle/tests/test_merge.py:35: in list_eq
a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Merge(87460a9)
def _layer(self) -> dict:
from distributed.shuffle._core import ShuffleId, barrier_key
from distributed.shuffle._merge import merge_unpack
> from distributed.shuffle._shuffle import shuffle_barrier
E ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError
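Every failure below shares the root cause shown in this first traceback: when the graph is materialized, dask_expr's Merge._layer() executes `from distributed.shuffle._shuffle import shuffle_barrier`, and this PR's internal cleanup removed that name from the module, so the installed dask_expr release cannot build any P2P merge graph. A minimal sketch of the failure mode outside the test suite (assuming an environment with a dask_expr release that still imports the old name):

import dask
import dask.dataframe as dd
import pandas as pd

# With the p2p shuffle method, dask_expr plans a HashJoinP2P; materializing
# the graph runs Merge._layer(), which performs the failing import above,
# so the error surfaces during graph construction, before any task executes.
left = dd.from_pandas(pd.DataFrame({"x": range(6), "y": range(6)}), npartitions=2)
right = dd.from_pandas(pd.DataFrame({"y": range(6), "z": range(6)}), npartitions=2)

with dask.config.set({"dataframe.shuffle.method": "p2p"}):
    joined = left.merge(right, on="y")
    joined.__dask_graph__()  # ImportError: cannot import name 'shuffle_barrier'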
7 out of 10 runs failed: test_basic_merge[left] (distributed.shuffle.tests.test_merge)
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49268', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
               x      y
npartitions=2
0          int64  int64
4            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
               y      z
npartitions=2
0          int64  int64
2            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'left'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_basic_merge(c, s, a, b, how):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
joined = a.merge(b, left_on="y", right_on="y", how=how)
if dd._dask_expr_enabled():
# Ensure we're using a hash join
from dask_expr._merge import HashJoinP2P
assert any(
isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
)
expected = pd.merge(A, B, how, "y")
> await list_eq(joined, expected)
distributed/shuffle/tests/test_merge.py:91:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/shuffle/tests/test_merge.py:35: in list_eq
a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Merge(ef2ce9b)
def _layer(self) -> dict:
from distributed.shuffle._core import ShuffleId, barrier_key
from distributed.shuffle._merge import merge_unpack
> from distributed.shuffle._shuffle import shuffle_barrier
E ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError
7 out of 10 runs failed: test_basic_merge[right] (distributed.shuffle.tests.test_merge)
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49278', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
               x      y
npartitions=2
0          int64  int64
4            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
               y      z
npartitions=2
0          int64  int64
2            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'right'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_basic_merge(c, s, a, b, how):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
joined = a.merge(b, left_on="y", right_on="y", how=how)
if dd._dask_expr_enabled():
# Ensure we're using a hash join
from dask_expr._merge import HashJoinP2P
assert any(
isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
)
expected = pd.merge(A, B, how, "y")
> await list_eq(joined, expected)
distributed/shuffle/tests/test_merge.py:91:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/shuffle/tests/test_merge.py:35: in list_eq
a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Merge(12ef2fe)
def _layer(self) -> dict:
from distributed.shuffle._core import ShuffleId, barrier_key
from distributed.shuffle._merge import merge_unpack
> from distributed.shuffle._shuffle import shuffle_barrier
E ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError
7 out of 10 runs failed: test_basic_merge[outer] (distributed.shuffle.tests.test_merge)
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49288', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
               x      y
npartitions=2
0          int64  int64
4            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
               y      z
npartitions=2
0          int64  int64
2            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'outer'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_basic_merge(c, s, a, b, how):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
joined = a.merge(b, left_on="y", right_on="y", how=how)
if dd._dask_expr_enabled():
# Ensure we're using a hash join
from dask_expr._merge import HashJoinP2P
assert any(
isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
)
expected = pd.merge(A, B, how, "y")
> await list_eq(joined, expected)
distributed/shuffle/tests/test_merge.py:91:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/shuffle/tests/test_merge.py:35: in list_eq
a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Merge(961ff9e)
def _layer(self) -> dict:
from distributed.shuffle._core import ShuffleId, barrier_key
from distributed.shuffle._merge import merge_unpack
> from distributed.shuffle._shuffle import shuffle_barrier
E ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError
7 out of 10 runs failed: test_merge_p2p_shuffle_reused_dataframe_with_different_parameters (distributed.shuffle.tests.test_merge)
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49298', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49299', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49302', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_merge_p2p_shuffle_reused_dataframe_with_different_parameters(c, s, a, b):
pdf1 = pd.DataFrame({"a": range(100), "b": range(0, 200, 2)})
pdf2 = pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
ddf1 = dd.from_pandas(pdf1, npartitions=5)
ddf2 = dd.from_pandas(pdf2, npartitions=10)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
out = (
ddf1.merge(ddf2, left_on="a", right_on="x")
# Vary the number of output partitions for the shuffles of ddf2
.repartition(npartitions=20).merge(ddf2, left_on="b", right_on="x")
)
# Generate unique shuffle IDs if the input frame is the same but
# parameters differ. Reusing shuffles in merges is dangerous because of the
# required coordination and complexity introduced through dynamic clusters.
> assert sum(id_from_key(k) is not None for k in out.dask) == 4
distributed/shuffle/tests/test_merge.py:125:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:599: in dask
return self.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Merge(ecc0f84)
def _layer(self) -> dict:
from distributed.shuffle._core import ShuffleId, barrier_key
from distributed.shuffle._merge import merge_unpack
> from distributed.shuffle._shuffle import shuffle_barrier
E ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError
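The failing assertion counts P2P shuffles by scanning every key in the materialized graph and keeping those that id_from_key recognizes. A sketch of the round-trip it relies on (ShuffleId and barrier_key come from distributed.shuffle._core as in the traceback; that id_from_key lives beside them, and the literal key prefix, are assumptions here):

from distributed.shuffle._core import ShuffleId, barrier_key, id_from_key

sid = ShuffleId("0" * 32)        # hypothetical shuffle token
key = barrier_key(sid)           # barrier task key embedding the shuffle ID
assert id_from_key(key) == sid   # barrier keys round-trip to their ID
assert id_from_key("repartition-0") is None  # ordinary task keys yield None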
7 out of 10 runs failed: test_merge_p2p_shuffle_reused_dataframe_with_same_parameters (distributed.shuffle.tests.test_merge)
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49308', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49309', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49312', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_merge_p2p_shuffle_reused_dataframe_with_same_parameters(c, s, a, b):
pdf1 = pd.DataFrame({"a": range(100), "b": range(0, 200, 2)})
pdf2 = pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
ddf1 = dd.from_pandas(pdf1, npartitions=5)
ddf2 = dd.from_pandas(pdf2, npartitions=10)
# This performs two shuffles:
# * ddf1 is shuffled on `a`
# * ddf2 is shuffled on `x`
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
ddf3 = ddf1.merge(
ddf2,
left_on="a",
right_on="x",
)
# This performs one shuffle:
# * ddf3 is shuffled on `b`
# We can reuse the shuffle of ddf2 on `x` from the previous merge.
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
out = ddf2.merge(
ddf3,
left_on="x",
right_on="b",
)
# Generate unique shuffle IDs even if the input frame is the same and all its
# parameters match. Reusing shuffles in merges is dangerous because of the
# required coordination and complexity introduced through dynamic clusters.
> assert sum(id_from_key(k) is not None for k in out.dask) == 4
distributed/shuffle/tests/test_merge.py:162:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:599: in dask
return self.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Merge(c826043)
def _layer(self) -> dict:
from distributed.shuffle._core import ShuffleId, barrier_key
from distributed.shuffle._merge import merge_unpack
> from distributed.shuffle._shuffle import shuffle_barrier
E ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError
7 out of 10 runs failed: test_merge[True-inner] (distributed.shuffle.tests.test_merge)
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49318', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
               x      y
npartitions=2
0          int64  int64
4            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
               y      z
npartitions=2
0          int64  int64
2            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'inner', disk = True
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
res = await c.compute(joined)
assert_eq(
res,
pd.merge(A, B, left_index=True, right_index=True, how=how),
)
joined = dd.merge(a, b, on="y", how=how)
> await list_eq(joined, pd.merge(A, B, on="y", how=how))
distributed/shuffle/tests/test_merge.py:189:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/shuffle/tests/test_merge.py:35: in list_eq
a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Merge(87460a9)
def _layer(self) -> dict:
from distributed.shuffle._core import ShuffleId, barrier_key
from distributed.shuffle._merge import merge_unpack
> from distributed.shuffle._shuffle import shuffle_barrier
E ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError
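Aside: the two nested dask.config.set context managers in this test could be collapsed, since dask.config.set accepts several keys in one call; a sketch:

with dask.config.set(
    {
        "dataframe.shuffle.method": "p2p",
        "distributed.p2p.storage.disk": disk,
    }
):
    joined = dd.merge(a, b, left_index=True, right_index=True, how=how)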
7 out of 10 runs failed: test_merge[True-outer] (distributed.shuffle.tests.test_merge)
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49331', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
               x      y
npartitions=2
0          int64  int64
4            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
               y      z
npartitions=2
0          int64  int64
2            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'outer', disk = True
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
res = await c.compute(joined)
assert_eq(
res,
pd.merge(A, B, left_index=True, right_index=True, how=how),
)
joined = dd.merge(a, b, on="y", how=how)
> await list_eq(joined, pd.merge(A, B, on="y", how=how))
distributed/shuffle/tests/test_merge.py:189:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/shuffle/tests/test_merge.py:35: in list_eq
a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Merge(961ff9e)
def _layer(self) -> dict:
from distributed.shuffle._core import ShuffleId, barrier_key
from distributed.shuffle._merge import merge_unpack
> from distributed.shuffle._shuffle import shuffle_barrier
E ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError
7 out of 10 runs failed: test_merge[True-left] (distributed.shuffle.tests.test_merge)
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49344', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
               x      y
npartitions=2
0          int64  int64
4            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
               y      z
npartitions=2
0          int64  int64
2            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'left', disk = True
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
res = await c.compute(joined)
assert_eq(
res,
pd.merge(A, B, left_index=True, right_index=True, how=how),
)
joined = dd.merge(a, b, on="y", how=how)
> await list_eq(joined, pd.merge(A, B, on="y", how=how))
distributed/shuffle/tests/test_merge.py:189:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/shuffle/tests/test_merge.py:35: in list_eq
a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Merge(ef2ce9b)
def _layer(self) -> dict:
from distributed.shuffle._core import ShuffleId, barrier_key
from distributed.shuffle._merge import merge_unpack
> from distributed.shuffle._shuffle import shuffle_barrier
E ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError
7 out of 10 runs failed: test_merge[True-right] (distributed.shuffle.tests.test_merge)
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49357', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
               x      y
npartitions=2
0          int64  int64
4            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
               y      z
npartitions=2
0          int64  int64
2            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'right', disk = True
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
res = await c.compute(joined)
assert_eq(
res,
pd.merge(A, B, left_index=True, right_index=True, how=how),
)
joined = dd.merge(a, b, on="y", how=how)
> await list_eq(joined, pd.merge(A, B, on="y", how=how))
distributed/shuffle/tests/test_merge.py:189:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/shuffle/tests/test_merge.py:35: in list_eq
a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Merge(12ef2fe)
def _layer(self) -> dict:
from distributed.shuffle._core import ShuffleId, barrier_key
from distributed.shuffle._merge import merge_unpack
> from distributed.shuffle._shuffle import shuffle_barrier
E ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError
7 out of 10 runs failed: test_merge[False-inner] (distributed.shuffle.tests.test_merge)
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49370', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
               x      y
npartitions=2
0          int64  int64
4            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
               y      z
npartitions=2
0          int64  int64
2            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'inner', disk = False
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
res = await c.compute(joined)
assert_eq(
res,
pd.merge(A, B, left_index=True, right_index=True, how=how),
)
joined = dd.merge(a, b, on="y", how=how)
> await list_eq(joined, pd.merge(A, B, on="y", how=how))
distributed/shuffle/tests/test_merge.py:189:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/shuffle/tests/test_merge.py:35: in list_eq
a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Merge(87460a9)
def _layer(self) -> dict:
from distributed.shuffle._core import ShuffleId, barrier_key
from distributed.shuffle._merge import merge_unpack
> from distributed.shuffle._shuffle import shuffle_barrier
E ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError
7 out of 10 runs failed: test_merge[False-outer] (distributed.shuffle.tests.test_merge)
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49383', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
               x      y
npartitions=2
0          int64  int64
4            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
               y      z
npartitions=2
0          int64  int64
2            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'outer', disk = False
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
res = await c.compute(joined)
assert_eq(
res,
pd.merge(A, B, left_index=True, right_index=True, how=how),
)
joined = dd.merge(a, b, on="y", how=how)
> await list_eq(joined, pd.merge(A, B, on="y", how=how))
distributed/shuffle/tests/test_merge.py:189:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/shuffle/tests/test_merge.py:35: in list_eq
a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Merge(961ff9e)
def _layer(self) -> dict:
from distributed.shuffle._core import ShuffleId, barrier_key
from distributed.shuffle._merge import merge_unpack
> from distributed.shuffle._shuffle import shuffle_barrier
E ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError
7 out of 10 runs failed: test_merge[False-left] (distributed.shuffle.tests.test_merge)
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49396', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
               x      y
npartitions=2
0          int64  int64
4            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
               y      z
npartitions=2
0          int64  int64
2            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'left', disk = False
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
res = await c.compute(joined)
assert_eq(
res,
pd.merge(A, B, left_index=True, right_index=True, how=how),
)
joined = dd.merge(a, b, on="y", how=how)
> await list_eq(joined, pd.merge(A, B, on="y", how=how))
distributed/shuffle/tests/test_merge.py:189:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/shuffle/tests/test_merge.py:35: in list_eq
a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Merge(ef2ce9b)
def _layer(self) -> dict:
from distributed.shuffle._core import ShuffleId, barrier_key
from distributed.shuffle._merge import merge_unpack
> from distributed.shuffle._shuffle import shuffle_barrier
E ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError
7 out of 10 runs failed: test_merge[False-right] (distributed.shuffle.tests.test_merge)
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49409', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
               x      y
npartitions=2
0          int64  int64
4            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
               y      z
npartitions=2
0          int64  int64
2            ...    ...
5            ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'right', disk = False
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
res = await c.compute(joined)
assert_eq(
res,
pd.merge(A, B, left_index=True, right_index=True, how=how),
)
joined = dd.merge(a, b, on="y", how=how)
> await list_eq(joined, pd.merge(A, B, on="y", how=how))
distributed/shuffle/tests/test_merge.py:189:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/shuffle/tests/test_merge.py:35: in list_eq
a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Merge(12ef2fe)
def _layer(self) -> dict:
from distributed.shuffle._core import ShuffleId, barrier_key
from distributed.shuffle._merge import merge_unpack
> from distributed.shuffle._shuffle import shuffle_barrier
E ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError
7 out of 10 runs failed: test_merge_by_multiple_columns[inner] (distributed.shuffle.tests.test_merge)
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49422', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49423', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49426', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'inner'
@pytest.mark.slow
@gen_cluster(client=True, timeout=120)
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
async def test_merge_by_multiple_columns(c, s, a, b, how):
# warnings here from pandas
pdf1l = pd.DataFrame(
{
"a": list("abcdefghij"),
"b": list("abcdefghij"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf1r = pd.DataFrame(
{
"d": list("abcdefghij"),
"e": list("abcdefghij"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("abcdefghij"),
)
pdf2l = pd.DataFrame(
{
"a": list("abcdeabcde"),
"b": list("abcabcabca"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf2r = pd.DataFrame(
{
"d": list("edcbaedcba"),
"e": list("aaabbbcccd"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("fghijklmno"),
)
pdf3l = pd.DataFrame(
{
"a": list("aaaaaaaaaa"),
"b": list("aaaaaaaaaa"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf3r = pd.DataFrame(
{
"d": list("aaabbbccaa"),
"e": list("abbbbbbbbb"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("ABCDEFGHIJ"),
)
for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
ddl = dd.from_pandas(pdl, lpart)
ddr = dd.from_pandas(pdr, rpart)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
expected = pdl.join(pdr, how=how)
assert_eq(
await c.compute(ddl.join(ddr, how=how)),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not expected.index.empty,
)
expected = pdr.join(pdl, how=how)
assert_eq(
await c.compute(ddr.join(ddl, how=how)),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not expected.index.empty,
)
expected = pd.merge(
pdl, pdr, how=how, left_index=True, right_index=True
)
assert_eq(
await c.compute(
dd.merge(
ddl,
ddr,
how=how,
left_index=True,
right_index=True,
)
),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not expected.index.empty,
)
expected = pd.merge(
pdr, pdl, how=how, left_index=True, right_index=True
)
assert_eq(
await c.compute(
dd.merge(
ddr,
ddl,
how=how,
left_index=True,
right_index=True,
)
),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not expected.index.empty,
)
# hash join
> await list_eq(
dd.merge(ddl, ddr, how=how, left_on="a", right_on="d"),
pd.merge(pdl, pdr, how=how, left_on="a", right_on="d"),
)
distributed/shuffle/tests/test_merge.py:351:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/shuffle/tests/test_merge.py:35: in list_eq
a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Merge(608e3ec)
def _layer(self) -> dict:
from distributed.shuffle._core import ShuffleId, barrier_key
from distributed.shuffle._merge import merge_unpack
> from distributed.shuffle._shuffle import shuffle_barrier
E ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError
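The check_index workaround that recurs throughout this test could be factored into a small helper; a hypothetical sketch (assert_eq_tolerant_index is not part of the test suite):

def assert_eq_tolerant_index(actual, expected, **kwargs):
    # xref https://github.com/dask/dask/issues/9957: pandas>=2.0 has an
    # empty-index discrepancy, so only compare indexes when non-empty.
    assert_eq(actual, expected, check_index=not expected.index.empty, **kwargs)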
7 out of 10 runs failed: test_merge_by_multiple_columns[outer] (distributed.shuffle.tests.test_merge)
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49435', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49436', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49439', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'outer'
@pytest.mark.slow
@gen_cluster(client=True, timeout=120)
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
async def test_merge_by_multiple_columns(c, s, a, b, how):
# warnings here from pandas
pdf1l = pd.DataFrame(
{
"a": list("abcdefghij"),
"b": list("abcdefghij"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf1r = pd.DataFrame(
{
"d": list("abcdefghij"),
"e": list("abcdefghij"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("abcdefghij"),
)
pdf2l = pd.DataFrame(
{
"a": list("abcdeabcde"),
"b": list("abcabcabca"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf2r = pd.DataFrame(
{
"d": list("edcbaedcba"),
"e": list("aaabbbcccd"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("fghijklmno"),
)
pdf3l = pd.DataFrame(
{
"a": list("aaaaaaaaaa"),
"b": list("aaaaaaaaaa"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf3r = pd.DataFrame(
{
"d": list("aaabbbccaa"),
"e": list("abbbbbbbbb"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("ABCDEFGHIJ"),
)
for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
ddl = dd.from_pandas(pdl, lpart)
ddr = dd.from_pandas(pdr, rpart)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
expected = pdl.join(pdr, how=how)
assert_eq(
await c.compute(ddl.join(ddr, how=how)),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not expected.index.empty,
)
expected = pdr.join(pdl, how=how)
assert_eq(
await c.compute(ddr.join(ddl, how=how)),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not expected.index.empty,
)
expected = pd.merge(
pdl, pdr, how=how, left_index=True, right_index=True
)
assert_eq(
await c.compute(
dd.merge(
ddl,
ddr,
how=how,
left_index=True,
right_index=True,
)
),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not expected.index.empty,
)
expected = pd.merge(
pdr, pdl, how=how, left_index=True, right_index=True
)
assert_eq(
await c.compute(
dd.merge(
ddr,
ddl,
how=how,
left_index=True,
right_index=True,
)
),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not expected.index.empty,
)
# hash join
> await list_eq(
dd.merge(ddl, ddr, how=how, left_on="a", right_on="d"),
pd.merge(pdl, pdr, how=how, left_on="a", right_on="d"),
)
distributed/shuffle/tests/test_merge.py:351:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/shuffle/tests/test_merge.py:35: in list_eq
a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Merge(0d1f7bd)
def _layer(self) -> dict:
from distributed.shuffle._core import ShuffleId, barrier_key
from distributed.shuffle._merge import merge_unpack
> from distributed.shuffle._shuffle import shuffle_barrier
E ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError
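Each run opts into the P2P backend through dask.config before computing. The same pattern outside the test harness, as a standalone sketch (the in-process Client below is a simplification of the gen_cluster fixture; P2P shuffling requires a distributed scheduler):
import dask
import dask.dataframe as dd
import pandas as pd
from distributed import Client

client = Client(processes=False)  # P2P needs a distributed scheduler
pdf = pd.DataFrame({"a": list("abcab"), "b": range(5)})
left = dd.from_pandas(pdf, npartitions=2)
right = dd.from_pandas(pdf.rename(columns={"a": "d"}), npartitions=2)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
    result = dd.merge(left, right, left_on="a", right_on="d").compute()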
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
7 out of 10 runs failed: test_merge_by_multiple_columns[left] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49448', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49449', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49452', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'left'
(test source and ImportError traceback identical to the [outer] run above; only the Merge expression hash differs)
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
7 out of 10 runs failed: test_merge_by_multiple_columns[right] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49461', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49462', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49465', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'right'
(test source and ImportError traceback identical to the [outer] run above; only the Merge expression hash differs)
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
7 out of 10 runs failed: test_index_merge_p2p[inner] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49474', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49475', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49478', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'inner'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_index_merge_p2p(c, s, a, b, how):
pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
right = dd.from_pandas(pdf_right, npartitions=6)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
assert_eq(
> await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
)
distributed/shuffle/tests/test_merge.py:388:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/client.py:3669: in compute
dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Merge(9585fc2)
def _layer(self) -> dict:
from distributed.shuffle._core import ShuffleId, barrier_key
from distributed.shuffle._merge import merge_unpack
> from distributed.shuffle._shuffle import shuffle_barrier
E ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError
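Reduced to plain pandas, the index-against-column join this test exercises is the following (taken directly from the test body, no distributed machinery involved):
import pandas as pd

pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
# Join the left frame's index against the right frame's "a" column.
expected = pdf_left.merge(pdf_right, how="inner", left_index=True, right_on="a")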
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
7 out of 10 runs failed: test_index_merge_p2p[left] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49484', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49485', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49488', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'left'
(test source and ImportError traceback identical to the [inner] run above; only the Merge expression hash differs)
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
7 out of 10 runs failed: test_index_merge_p2p[right] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49494', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49495', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49498', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'right'
(test source and ImportError traceback identical to the [inner] run above; only the Merge expression hash differs)
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
7 out of 10 runs failed: test_index_merge_p2p[outer] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49504', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49505', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49508', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'outer'
(test source and ImportError traceback identical to the [inner] run above; only the Merge expression hash differs)
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
7 out of 10 runs failed: test_merge_with_npartitions[4] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49514', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49515', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49518', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
npartitions = 4
@pytest.mark.parametrize("npartitions", [4, 5, 10, 20])
@gen_cluster(client=True)
async def test_merge_with_npartitions(c, s, a, b, npartitions):
pdf = pd.DataFrame({"a": [1, 2, 3, 4] * 10, "b": 1})
left = dd.from_pandas(pdf, npartitions=10)
right = dd.from_pandas(pdf, npartitions=5)
expected = pdf.merge(pdf)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
> result = await c.compute(left.merge(right, npartitions=npartitions))
distributed/shuffle/tests/test_merge.py:408:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/client.py:3669: in compute
dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Merge(83a374c)
def _layer(self) -> dict:
from distributed.shuffle._core import ShuffleId, barrier_key
from distributed.shuffle._merge import merge_unpack
> from distributed.shuffle._shuffle import shuffle_barrier
E ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError
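The npartitions argument to merge sets the partition count of the shuffled output, which is what this parametrization sweeps over. A short sketch of the contract being tested (assuming the optimizer plans a shuffle join rather than a broadcast join, in which case the output partition count can differ):
import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"a": [1, 2, 3, 4] * 10, "b": 1})
left = dd.from_pandas(pdf, npartitions=10)
right = dd.from_pandas(pdf, npartitions=5)
# For a shuffle-based join, the output should have 4 partitions.
joined = left.merge(right, npartitions=4)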
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
7 out of 10 runs failed: test_merge_with_npartitions[5] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49524', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49525', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49528', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
npartitions = 5
(test source and ImportError traceback identical to the [4] run above; only the Merge expression hash differs)
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
7 out of 10 runs failed: test_merge_with_npartitions[10] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49534', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49535', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49538', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
npartitions = 10
(test source and ImportError traceback identical to the [4] run above; only the Merge expression hash differs)