Internal cleanup of P2P code #8907

Merged: 7 commits, Oct 25, 2024

Revert "CI (REVERT ME)"

d829269
Select commit
Loading
Failed to load commit list.
Sign in for the full log view
GitHub Actions / Unit Test Results failed Oct 25, 2024 in 0s

170 fail, 110 skipped, 3 843 pass in 9h 44m 29s

    25 files ±0        25 suites ±0    9h 44m 29s ⏱️ −38m 53s
 4 123 tests ±0     3 843 ✅ −165        110 💤 ±0      170 ❌ +165
47 622 runs ±0    44 367 ✅ −1 160    2 087 💤 −2    1 168 ❌ +1 162

Results for commit d829269. ± Comparison against earlier commit db0e7a3.
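
Reading the deltas against db0e7a3: the test count is unchanged (4 123 ±0), passes drop by 165 (3 843 = 4 008 − 165) and failures rise by the same 165 (170 = 5 + 165), so every newly failing test was passing on the earlier commit. All of the new failures below share one root cause: graph construction raises ImportError because dask_expr imports shuffle_barrier from distributed.shuffle._shuffle, and this branch apparently no longer exports that name. A minimal sketch of the failure, independent of any test (assumes an environment with this branch of distributed plus a dask_expr release that still uses the old import path):

    # Sketch: reproduce the shared failure without running a cluster.
    # On this branch the name is gone from distributed.shuffle._shuffle,
    # so the import itself raises; the tests only hit it lazily, inside
    # __dask_graph__(), because dask_expr imports it at graph-build time.
    from distributed.shuffle._shuffle import shuffle_barrier  # ImportError expected here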

Annotations

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_basic_merge[inner] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49258', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'inner'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_basic_merge(c, s, a, b, how):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        joined = a.merge(b, left_on="y", right_on="y", how=how)
    
        if dd._dask_expr_enabled():
            # Ensure we're using a hash join
            from dask_expr._merge import HashJoinP2P
    
            assert any(
                isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
            )
    
        expected = pd.merge(A, B, how, "y")
>       await list_eq(joined, expected)

distributed/shuffle/tests/test_merge.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:35: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(87460a9)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError
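
The annotations that follow repeat this same ImportError for each remaining parametrization; only the test name, scheduler port, and Merge(...) hash differ. A quick local check (a sketch, assuming a dev install of this branch) to confirm the symbol is gone and to see what _shuffle still exports, before deciding whether dask_expr needs a version pin or a back-compat alias:

    # Sketch: list the public names distributed.shuffle._shuffle still
    # exports on this branch; shuffle_barrier should be absent.
    import distributed.shuffle._shuffle as _shuffle
    print(sorted(name for name in dir(_shuffle) if not name.startswith("_")))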

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_basic_merge[left] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49268', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'left'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_basic_merge(c, s, a, b, how):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        joined = a.merge(b, left_on="y", right_on="y", how=how)
    
        if dd._dask_expr_enabled():
            # Ensure we're using a hash join
            from dask_expr._merge import HashJoinP2P
    
            assert any(
                isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
            )
    
        expected = pd.merge(A, B, how, "y")
>       await list_eq(joined, expected)

distributed/shuffle/tests/test_merge.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:35: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(ef2ce9b)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_basic_merge[right] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49278', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'right'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_basic_merge(c, s, a, b, how):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        joined = a.merge(b, left_on="y", right_on="y", how=how)
    
        if dd._dask_expr_enabled():
            # Ensure we're using a hash join
            from dask_expr._merge import HashJoinP2P
    
            assert any(
                isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
            )
    
        expected = pd.merge(A, B, how, "y")
>       await list_eq(joined, expected)

distributed/shuffle/tests/test_merge.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:35: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(12ef2fe)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_basic_merge[outer] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49288', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'outer'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_basic_merge(c, s, a, b, how):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        joined = a.merge(b, left_on="y", right_on="y", how=how)
    
        if dd._dask_expr_enabled():
            # Ensure we're using a hash join
            from dask_expr._merge import HashJoinP2P
    
            assert any(
                isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
            )
    
        expected = pd.merge(A, B, how, "y")
>       await list_eq(joined, expected)

distributed/shuffle/tests/test_merge.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:35: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(961ff9e)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_merge_p2p_shuffle_reused_dataframe_with_different_parameters (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49298', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49299', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49302', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_merge_p2p_shuffle_reused_dataframe_with_different_parameters(c, s, a, b):
        pdf1 = pd.DataFrame({"a": range(100), "b": range(0, 200, 2)})
        pdf2 = pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
        ddf1 = dd.from_pandas(pdf1, npartitions=5)
        ddf2 = dd.from_pandas(pdf2, npartitions=10)
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            out = (
                ddf1.merge(ddf2, left_on="a", right_on="x")
                # Vary the number of output partitions for the shuffles of dd2
                .repartition(npartitions=20).merge(ddf2, left_on="b", right_on="x")
            )
        # Generate unique shuffle IDs if the input frame is the same but
        # parameters differ. Reusing shuffles in merges is dangerous because of the
        # required coordination and complexity introduced through dynamic clusters.
>       assert sum(id_from_key(k) is not None for k in out.dask) == 4

distributed/shuffle/tests/test_merge.py:125: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:599: in dask
    return self.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(ecc0f84)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_merge_p2p_shuffle_reused_dataframe_with_same_parameters (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49308', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49309', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49312', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_merge_p2p_shuffle_reused_dataframe_with_same_parameters(c, s, a, b):
        pdf1 = pd.DataFrame({"a": range(100), "b": range(0, 200, 2)})
        pdf2 = pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
        ddf1 = dd.from_pandas(pdf1, npartitions=5)
        ddf2 = dd.from_pandas(pdf2, npartitions=10)
    
        # This performs two shuffles:
        #   * ddf1 is shuffled on `a`
        #   * ddf2 is shuffled on `x`
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            ddf3 = ddf1.merge(
                ddf2,
                left_on="a",
                right_on="x",
            )
    
        # This performs one shuffle:
        #   * ddf3 is shuffled on `b`
        # We can reuse the shuffle of dd2 on `x` from the previous merge.
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            out = ddf2.merge(
                ddf3,
                left_on="x",
                right_on="b",
            )
        # Generate unique shuffle IDs if the input frame is the same and all its
        # parameters match. Reusing shuffles in merges is dangerous because of the
        # required coordination and complexity introduced through dynamic clusters.
>       assert sum(id_from_key(k) is not None for k in out.dask) == 4

distributed/shuffle/tests/test_merge.py:162: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:599: in dask
    return self.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(c826043)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_merge[True-inner] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49318', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'inner', disk = True

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.storage.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
            joined = dd.merge(a, b, on="y", how=how)
>           await list_eq(joined, pd.merge(A, B, on="y", how=how))

distributed/shuffle/tests/test_merge.py:189: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:35: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(87460a9)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_merge[True-outer] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49331', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'outer', disk = True

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.storage.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
            joined = dd.merge(a, b, on="y", how=how)
>           await list_eq(joined, pd.merge(A, B, on="y", how=how))

distributed/shuffle/tests/test_merge.py:189: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:35: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(961ff9e)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_merge[True-left] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49344', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'left', disk = True

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.storage.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
            joined = dd.merge(a, b, on="y", how=how)
>           await list_eq(joined, pd.merge(A, B, on="y", how=how))

distributed/shuffle/tests/test_merge.py:189: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:35: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(ef2ce9b)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_merge[True-right] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49357', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'right', disk = True

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.storage.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
            joined = dd.merge(a, b, on="y", how=how)
>           await list_eq(joined, pd.merge(A, B, on="y", how=how))

distributed/shuffle/tests/test_merge.py:189: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:35: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(12ef2fe)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_merge[False-inner] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49370', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'inner', disk = False

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.storage.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
            joined = dd.merge(a, b, on="y", how=how)
>           await list_eq(joined, pd.merge(A, B, on="y", how=how))

distributed/shuffle/tests/test_merge.py:189: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:35: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(87460a9)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_merge[False-outer] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49383', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'outer', disk = False

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.storage.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
            joined = dd.merge(a, b, on="y", how=how)
>           await list_eq(joined, pd.merge(A, B, on="y", how=how))

distributed/shuffle/tests/test_merge.py:189: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:35: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(961ff9e)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_merge[False-left] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49396', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'left', disk = False

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.storage.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
            joined = dd.merge(a, b, on="y", how=how)
>           await list_eq(joined, pd.merge(A, B, on="y", how=how))

distributed/shuffle/tests/test_merge.py:189: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:35: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(ef2ce9b)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_merge[False-right] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49409', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 expression
Expr=df
how = 'right', disk = False

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.storage.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
            joined = dd.merge(a, b, on="y", how=how)
>           await list_eq(joined, pd.merge(A, B, on="y", how=how))

distributed/shuffle/tests/test_merge.py:189: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:35: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(12ef2fe)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_merge_by_multiple_columns[inner] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49422', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49423', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49426', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'inner'

    @pytest.mark.slow
    @gen_cluster(client=True, timeout=120)
    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    async def test_merge_by_multiple_columns(c, s, a, b, how):
        # warnings here from pandas
        pdf1l = pd.DataFrame(
            {
                "a": list("abcdefghij"),
                "b": list("abcdefghij"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf1r = pd.DataFrame(
            {
                "d": list("abcdefghij"),
                "e": list("abcdefghij"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("abcdefghij"),
        )
    
        pdf2l = pd.DataFrame(
            {
                "a": list("abcdeabcde"),
                "b": list("abcabcabca"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf2r = pd.DataFrame(
            {
                "d": list("edcbaedcba"),
                "e": list("aaabbbcccd"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("fghijklmno"),
        )
    
        pdf3l = pd.DataFrame(
            {
                "a": list("aaaaaaaaaa"),
                "b": list("aaaaaaaaaa"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf3r = pd.DataFrame(
            {
                "d": list("aaabbbccaa"),
                "e": list("abbbbbbbbb"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("ABCDEFGHIJ"),
        )
    
        for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
            for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
                ddl = dd.from_pandas(pdl, lpart)
                ddr = dd.from_pandas(pdr, rpart)
    
                with dask.config.set({"dataframe.shuffle.method": "p2p"}):
                    expected = pdl.join(pdr, how=how)
                    assert_eq(
                        await c.compute(ddl.join(ddr, how=how)),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not expected.index.empty,
                    )
    
                    expected = pdr.join(pdl, how=how)
                    assert_eq(
                        await c.compute(ddr.join(ddl, how=how)),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not expected.index.empty,
                    )
    
                    expected = pd.merge(
                        pdl, pdr, how=how, left_index=True, right_index=True
                    )
                    assert_eq(
                        await c.compute(
                            dd.merge(
                                ddl,
                                ddr,
                                how=how,
                                left_index=True,
                                right_index=True,
                            )
                        ),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not expected.index.empty,
                    )
    
                    expected = pd.merge(
                        pdr, pdl, how=how, left_index=True, right_index=True
                    )
                    assert_eq(
                        await c.compute(
                            dd.merge(
                                ddr,
                                ddl,
                                how=how,
                                left_index=True,
                                right_index=True,
                            )
                        ),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not expected.index.empty,
                    )
    
                    # hash join
>                   await list_eq(
                        dd.merge(ddl, ddr, how=how, left_on="a", right_on="d"),
                        pd.merge(pdl, pdr, how=how, left_on="a", right_on="d"),
                    )

distributed/shuffle/tests/test_merge.py:351: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:35: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(608e3ec)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_merge_by_multiple_columns[outer] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49435', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49436', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49439', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'outer'

    @pytest.mark.slow
    @gen_cluster(client=True, timeout=120)
    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    async def test_merge_by_multiple_columns(c, s, a, b, how):
        # warnings here from pandas
        pdf1l = pd.DataFrame(
            {
                "a": list("abcdefghij"),
                "b": list("abcdefghij"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf1r = pd.DataFrame(
            {
                "d": list("abcdefghij"),
                "e": list("abcdefghij"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("abcdefghij"),
        )
    
        pdf2l = pd.DataFrame(
            {
                "a": list("abcdeabcde"),
                "b": list("abcabcabca"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf2r = pd.DataFrame(
            {
                "d": list("edcbaedcba"),
                "e": list("aaabbbcccd"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("fghijklmno"),
        )
    
        pdf3l = pd.DataFrame(
            {
                "a": list("aaaaaaaaaa"),
                "b": list("aaaaaaaaaa"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf3r = pd.DataFrame(
            {
                "d": list("aaabbbccaa"),
                "e": list("abbbbbbbbb"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("ABCDEFGHIJ"),
        )
    
        for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
            for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
                ddl = dd.from_pandas(pdl, lpart)
                ddr = dd.from_pandas(pdr, rpart)
    
                with dask.config.set({"dataframe.shuffle.method": "p2p"}):
                    expected = pdl.join(pdr, how=how)
                    assert_eq(
                        await c.compute(ddl.join(ddr, how=how)),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not expected.index.empty,
                    )
    
                    expected = pdr.join(pdl, how=how)
                    assert_eq(
                        await c.compute(ddr.join(ddl, how=how)),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not expected.index.empty,
                    )
    
                    expected = pd.merge(
                        pdl, pdr, how=how, left_index=True, right_index=True
                    )
                    assert_eq(
                        await c.compute(
                            dd.merge(
                                ddl,
                                ddr,
                                how=how,
                                left_index=True,
                                right_index=True,
                            )
                        ),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not expected.index.empty,
                    )
    
                    expected = pd.merge(
                        pdr, pdl, how=how, left_index=True, right_index=True
                    )
                    assert_eq(
                        await c.compute(
                            dd.merge(
                                ddr,
                                ddl,
                                how=how,
                                left_index=True,
                                right_index=True,
                            )
                        ),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not expected.index.empty,
                    )
    
                    # hash join
>                   await list_eq(
                        dd.merge(ddl, ddr, how=how, left_on="a", right_on="d"),
                        pd.merge(pdl, pdr, how=how, left_on="a", right_on="d"),
                    )

distributed/shuffle/tests/test_merge.py:351: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:35: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(0d1f7bd)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_merge_by_multiple_columns[left] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49448', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49449', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49452', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'left'

    @pytest.mark.slow
    @gen_cluster(client=True, timeout=120)
    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    async def test_merge_by_multiple_columns(c, s, a, b, how):
        # warnings here from pandas
        pdf1l = pd.DataFrame(
            {
                "a": list("abcdefghij"),
                "b": list("abcdefghij"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf1r = pd.DataFrame(
            {
                "d": list("abcdefghij"),
                "e": list("abcdefghij"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("abcdefghij"),
        )
    
        pdf2l = pd.DataFrame(
            {
                "a": list("abcdeabcde"),
                "b": list("abcabcabca"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf2r = pd.DataFrame(
            {
                "d": list("edcbaedcba"),
                "e": list("aaabbbcccd"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("fghijklmno"),
        )
    
        pdf3l = pd.DataFrame(
            {
                "a": list("aaaaaaaaaa"),
                "b": list("aaaaaaaaaa"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf3r = pd.DataFrame(
            {
                "d": list("aaabbbccaa"),
                "e": list("abbbbbbbbb"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("ABCDEFGHIJ"),
        )
    
        for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
            for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
                ddl = dd.from_pandas(pdl, lpart)
                ddr = dd.from_pandas(pdr, rpart)
    
                with dask.config.set({"dataframe.shuffle.method": "p2p"}):
                    expected = pdl.join(pdr, how=how)
                    assert_eq(
                        await c.compute(ddl.join(ddr, how=how)),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not expected.index.empty,
                    )
    
                    expected = pdr.join(pdl, how=how)
                    assert_eq(
                        await c.compute(ddr.join(ddl, how=how)),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not expected.index.empty,
                    )
    
                    expected = pd.merge(
                        pdl, pdr, how=how, left_index=True, right_index=True
                    )
                    assert_eq(
                        await c.compute(
                            dd.merge(
                                ddl,
                                ddr,
                                how=how,
                                left_index=True,
                                right_index=True,
                            )
                        ),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not expected.index.empty,
                    )
    
                    expected = pd.merge(
                        pdr, pdl, how=how, left_index=True, right_index=True
                    )
                    assert_eq(
                        await c.compute(
                            dd.merge(
                                ddr,
                                ddl,
                                how=how,
                                left_index=True,
                                right_index=True,
                            )
                        ),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not expected.index.empty,
                    )
    
                    # hash join
>                   await list_eq(
                        dd.merge(ddl, ddr, how=how, left_on="a", right_on="d"),
                        pd.merge(pdl, pdr, how=how, left_on="a", right_on="d"),
                    )

distributed/shuffle/tests/test_merge.py:351: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:35: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(034eb18)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_merge_by_multiple_columns[right] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49461', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49462', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49465', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'right'

    @pytest.mark.slow
    @gen_cluster(client=True, timeout=120)
    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    async def test_merge_by_multiple_columns(c, s, a, b, how):
        # warnings here from pandas
        pdf1l = pd.DataFrame(
            {
                "a": list("abcdefghij"),
                "b": list("abcdefghij"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf1r = pd.DataFrame(
            {
                "d": list("abcdefghij"),
                "e": list("abcdefghij"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("abcdefghij"),
        )
    
        pdf2l = pd.DataFrame(
            {
                "a": list("abcdeabcde"),
                "b": list("abcabcabca"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf2r = pd.DataFrame(
            {
                "d": list("edcbaedcba"),
                "e": list("aaabbbcccd"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("fghijklmno"),
        )
    
        pdf3l = pd.DataFrame(
            {
                "a": list("aaaaaaaaaa"),
                "b": list("aaaaaaaaaa"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf3r = pd.DataFrame(
            {
                "d": list("aaabbbccaa"),
                "e": list("abbbbbbbbb"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("ABCDEFGHIJ"),
        )
    
        for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
            for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
                ddl = dd.from_pandas(pdl, lpart)
                ddr = dd.from_pandas(pdr, rpart)
    
                with dask.config.set({"dataframe.shuffle.method": "p2p"}):
                    expected = pdl.join(pdr, how=how)
                    assert_eq(
                        await c.compute(ddl.join(ddr, how=how)),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not expected.index.empty,
                    )
    
                    expected = pdr.join(pdl, how=how)
                    assert_eq(
                        await c.compute(ddr.join(ddl, how=how)),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not expected.index.empty,
                    )
    
                    expected = pd.merge(
                        pdl, pdr, how=how, left_index=True, right_index=True
                    )
                    assert_eq(
                        await c.compute(
                            dd.merge(
                                ddl,
                                ddr,
                                how=how,
                                left_index=True,
                                right_index=True,
                            )
                        ),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not expected.index.empty,
                    )
    
                    expected = pd.merge(
                        pdr, pdl, how=how, left_index=True, right_index=True
                    )
                    assert_eq(
                        await c.compute(
                            dd.merge(
                                ddr,
                                ddl,
                                how=how,
                                left_index=True,
                                right_index=True,
                            )
                        ),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not expected.index.empty,
                    )
    
                    # hash join
>                   await list_eq(
                        dd.merge(ddl, ddr, how=how, left_on="a", right_on="d"),
                        pd.merge(pdl, pdr, how=how, left_on="a", right_on="d"),
                    )

distributed/shuffle/tests/test_merge.py:351: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:35: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(b02c812)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_index_merge_p2p[inner] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49474', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49475', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49478', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'inner'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_index_merge_p2p(c, s, a, b, how):
        pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
        pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
    
        left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
        right = dd.from_pandas(pdf_right, npartitions=6)
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            assert_eq(
>               await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
                pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
            )

distributed/shuffle/tests/test_merge.py:388: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(9585fc2)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_index_merge_p2p[left] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49484', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49485', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49488', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'left'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_index_merge_p2p(c, s, a, b, how):
        pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
        pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
    
        left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
        right = dd.from_pandas(pdf_right, npartitions=6)
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            assert_eq(
>               await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
                pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
            )

distributed/shuffle/tests/test_merge.py:388: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(f429166)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_index_merge_p2p[right] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49494', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49495', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49498', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'right'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_index_merge_p2p(c, s, a, b, how):
        pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
        pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
    
        left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
        right = dd.from_pandas(pdf_right, npartitions=6)
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            assert_eq(
>               await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
                pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
            )

distributed/shuffle/tests/test_merge.py:388: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(995c063)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_index_merge_p2p[outer] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49504', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49505', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49508', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'outer'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_index_merge_p2p(c, s, a, b, how):
        pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
        pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
    
        left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
        right = dd.from_pandas(pdf_right, npartitions=6)
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            assert_eq(
>               await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
                pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
            )

distributed/shuffle/tests/test_merge.py:388: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(1bf3e4f)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_merge_with_npartitions[4] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49514', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49515', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49518', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
npartitions = 4

    @pytest.mark.parametrize("npartitions", [4, 5, 10, 20])
    @gen_cluster(client=True)
    async def test_merge_with_npartitions(c, s, a, b, npartitions):
        pdf = pd.DataFrame({"a": [1, 2, 3, 4] * 10, "b": 1})
    
        left = dd.from_pandas(pdf, npartitions=10)
        right = dd.from_pandas(pdf, npartitions=5)
    
        expected = pdf.merge(pdf)
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
>           result = await c.compute(left.merge(right, npartitions=npartitions))

distributed/shuffle/tests/test_merge.py:408: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(83a374c)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_merge_with_npartitions[5] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49524', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49525', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49528', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
npartitions = 5

    @pytest.mark.parametrize("npartitions", [4, 5, 10, 20])
    @gen_cluster(client=True)
    async def test_merge_with_npartitions(c, s, a, b, npartitions):
        pdf = pd.DataFrame({"a": [1, 2, 3, 4] * 10, "b": 1})
    
        left = dd.from_pandas(pdf, npartitions=10)
        right = dd.from_pandas(pdf, npartitions=5)
    
        expected = pdf.merge(pdf)
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
>           result = await c.compute(left.merge(right, npartitions=npartitions))

distributed/shuffle/tests/test_merge.py:408: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(a2d93ed)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

7 out of 10 runs failed: test_merge_with_npartitions[10] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
Raw output
ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:49534', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:49535', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:49538', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
npartitions = 10

    @pytest.mark.parametrize("npartitions", [4, 5, 10, 20])
    @gen_cluster(client=True)
    async def test_merge_with_npartitions(c, s, a, b, npartitions):
        pdf = pd.DataFrame({"a": [1, 2, 3, 4] * 10, "b": 1})
    
        left = dd.from_pandas(pdf, npartitions=10)
        right = dd.from_pandas(pdf, npartitions=5)
    
        expected = pdf.merge(pdf)
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
>           result = await c.compute(left.merge(right, npartitions=npartitions))

distributed/shuffle/tests/test_merge.py:408: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:3669: in compute
    dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
distributed/client.py:4931: in collections_to_dsk
    return collections_to_dsk(collections, *args, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:421: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py:447: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:564: in __dask_graph__
    return out.__dask_graph__()
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_core.py:509: in __dask_graph__
    layers.append(expr._layer())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Merge(5bc138c)

    def _layer(self) -> dict:
        from distributed.shuffle._core import ShuffleId, barrier_key
        from distributed.shuffle._merge import merge_unpack
>       from distributed.shuffle._shuffle import shuffle_barrier
E       ImportError: cannot import name 'shuffle_barrier' from 'distributed.shuffle._shuffle' (/Users/runner/work/distributed/distributed/distributed/shuffle/_shuffle.py)

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_merge.py:612: ImportError