diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 2894004403..3fe3ebc47e 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -755,7 +755,6 @@ class ClusterNode: """ __slots__ = ( - "_command_stack", "_connections", "_free", "connection_class", @@ -796,7 +795,6 @@ def __init__( self._connections: List[Connection] = [] self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections) - self._command_stack: List["PipelineCommand"] = [] def __repr__(self) -> str: return ( @@ -887,18 +885,18 @@ async def execute_command(self, *args: Any, **kwargs: Any) -> Any: # Release connection self._free.append(connection) - async def execute_pipeline(self) -> bool: + async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool: # Acquire connection connection = self.acquire_connection() # Execute command await connection.send_packed_command( - connection.pack_commands(cmd.args for cmd in self._command_stack), False + connection.pack_commands(cmd.args for cmd in commands), False ) # Read responses ret = False - for cmd in self._command_stack: + for cmd in commands: try: cmd.result = await self.parse_response( connection, cmd.args[0], **cmd.kwargs @@ -1365,12 +1363,14 @@ async def _execute( node = target_nodes[0] if node.name not in nodes: - nodes[node.name] = node - node._command_stack = [] - node._command_stack.append(cmd) + nodes[node.name] = (node, []) + nodes[node.name][1].append(cmd) errors = await asyncio.gather( - *(asyncio.ensure_future(node.execute_pipeline()) for node in nodes.values()) + *( + asyncio.ensure_future(node[0].execute_pipeline(node[1])) + for node in nodes.values() + ) ) if any(errors): diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index f4ea5cd7ac..0d0ea33db2 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -2476,3 +2476,11 @@ async def test_readonly_pipeline_from_readonly_client( executed_on_replica = True break assert executed_on_replica + + async def test_can_run_concurrent_pipelines(self, r: RedisCluster) -> None: + """Test that the pipeline can be used concurrently.""" + await asyncio.gather( + *(self.test_redis_cluster_pipeline(r) for i in range(100)), + *(self.test_multi_key_operation_with_a_single_slot(r) for i in range(100)), + *(self.test_multi_key_operation_with_multi_slots(r) for i in range(100)), + )