Skip to content

Commit

Permalink
async_cluster: fix concurrent pipeline (#2280)
Browse files Browse the repository at this point in the history
- each pipeline should create separate stacks for each node
  • Loading branch information
utkarshgupta137 authored Jul 24, 2022
1 parent a304953 commit ae171d1
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 9 deletions.
18 changes: 9 additions & 9 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,6 @@ class ClusterNode:
"""

__slots__ = (
"_command_stack",
"_connections",
"_free",
"connection_class",
Expand Down Expand Up @@ -796,7 +795,6 @@ def __init__(

self._connections: List[Connection] = []
self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections)
self._command_stack: List["PipelineCommand"] = []

def __repr__(self) -> str:
return (
Expand Down Expand Up @@ -887,18 +885,18 @@ async def execute_command(self, *args: Any, **kwargs: Any) -> Any:
# Release connection
self._free.append(connection)

async def execute_pipeline(self) -> bool:
async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
# Acquire connection
connection = self.acquire_connection()

# Execute command
await connection.send_packed_command(
connection.pack_commands(cmd.args for cmd in self._command_stack), False
connection.pack_commands(cmd.args for cmd in commands), False
)

# Read responses
ret = False
for cmd in self._command_stack:
for cmd in commands:
try:
cmd.result = await self.parse_response(
connection, cmd.args[0], **cmd.kwargs
Expand Down Expand Up @@ -1365,12 +1363,14 @@ async def _execute(

node = target_nodes[0]
if node.name not in nodes:
nodes[node.name] = node
node._command_stack = []
node._command_stack.append(cmd)
nodes[node.name] = (node, [])
nodes[node.name][1].append(cmd)

errors = await asyncio.gather(
*(asyncio.ensure_future(node.execute_pipeline()) for node in nodes.values())
*(
asyncio.ensure_future(node[0].execute_pipeline(node[1]))
for node in nodes.values()
)
)

if any(errors):
Expand Down
8 changes: 8 additions & 0 deletions tests/test_asyncio/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2476,3 +2476,11 @@ async def test_readonly_pipeline_from_readonly_client(
executed_on_replica = True
break
assert executed_on_replica

async def test_can_run_concurrent_pipelines(self, r: RedisCluster) -> None:
"""Test that the pipeline can be used concurrently."""
await asyncio.gather(
*(self.test_redis_cluster_pipeline(r) for i in range(100)),
*(self.test_multi_key_operation_with_a_single_slot(r) for i in range(100)),
*(self.test_multi_key_operation_with_multi_slots(r) for i in range(100)),
)

0 comments on commit ae171d1

Please sign in to comment.