Skip to content

Commit

Permalink
Add cluster support for scripting (#1937)
Browse files Browse the repository at this point in the history
* Add cluster support for scripting

* Fall back to connection_pool.get_encoder if necessary

* Add documentation for cluster-based scripting

* Add test for flush response

Co-authored-by: dvora-h <dvora.heller@redis.com>
  • Loading branch information
jakebarnwell and dvora-h authored Feb 22, 2022
1 parent 1983905 commit e5ac39a
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 29 deletions.
34 changes: 28 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,8 @@ Monitor object to block until a command is received.
redis-py supports the EVAL, EVALSHA, and SCRIPT commands. However, there
are a number of edge cases that make these commands tedious to use in
real world scenarios. Therefore, redis-py exposes a Script object that
makes scripting much easier to use.
makes scripting much easier to use. (RedisClusters have limited support for
scripting.)

To create a Script instance, use the register_script
function on a client instance passing the Lua code as the first
Expand Down Expand Up @@ -955,14 +956,16 @@ C 3

### Cluster Mode

redis-py is now supports cluster mode and provides a client for
redis-py now supports cluster mode and provides a client for
[Redis Cluster](<https://redis.io/topics/cluster-tutorial>).

The cluster client is based on Grokzen's
[redis-py-cluster](https://github.com/Grokzen/redis-py-cluster), has added bug
fixes, and now supersedes that library. Support for these changes is thanks to
his contributions.

To learn more about Redis Cluster, see
[Redis Cluster specifications](https://redis.io/topics/cluster-spec).

**Create RedisCluster:**

Expand Down Expand Up @@ -1218,10 +1221,29 @@ according to their respective destination nodes. This means that we can not
turn the pipeline commands into one transaction block, because in most cases
they are split up into several smaller pipelines.


See [Redis Cluster tutorial](https://redis.io/topics/cluster-tutorial) and
[Redis Cluster specifications](https://redis.io/topics/cluster-spec)
to learn more about Redis Cluster.
**Lua Scripting in Cluster Mode**

Cluster mode has limited support for lua scripting.

The following commands are supported, with caveats:
- `EVAL` and `EVALSHA`: The command is sent to the relevant node, depending on
the keys (i.e., in `EVAL "<script>" num_keys key_1 ... key_n ...`). The keys
_must_ all be on the same node. If the script requires 0 keys, _the command is
sent to a random (primary) node_.
- `SCRIPT EXISTS`: The command is sent to all primaries. The result is a list
of booleans corresponding to the input SHA hashes. Each boolean is an AND of
"does the script exist on each node?". In other words, each boolean is True iff
the script exists on all nodes.
- `SCRIPT FLUSH`: The command is sent to all primaries. The result is a bool
AND over all nodes' responses.
- `SCRIPT LOAD`: The command is sent to all primaries. The result is the SHA1
digest.

The following commands are not supported:
- `EVAL_RO`
- `EVALSHA_RO`

Using scripting within pipelines in cluster mode is **not supported**.

### Author

Expand Down
87 changes: 69 additions & 18 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ class RedisCluster(RedisClusterCommands):
[
"FLUSHALL",
"FLUSHDB",
"SCRIPT EXISTS",
"SCRIPT FLUSH",
"SCRIPT LOAD",
],
PRIMARIES,
),
Expand Down Expand Up @@ -379,6 +382,24 @@ class RedisCluster(RedisClusterCommands):
],
parse_scan_result,
),
list_keys_to_dict(
[
"SCRIPT LOAD",
],
lambda command, res: list(res.values()).pop(),
),
list_keys_to_dict(
[
"SCRIPT EXISTS",
],
lambda command, res: [all(k) for k in zip(*res.values())],
),
list_keys_to_dict(
[
"SCRIPT FLUSH",
],
lambda command, res: all(res.values()),
),
)

ERRORS_ALLOW_RETRY = (
Expand Down Expand Up @@ -778,40 +799,70 @@ def _get_command_keys(self, *args):
"""
Get the keys in the command. If the command has no keys in in, None is
returned.
NOTE: Due to a bug in redis<7.0, this function does not work properly
for EVAL or EVALSHA when the `numkeys` arg is 0.
- issue: https://github.com/redis/redis/issues/9493
- fix: https://github.com/redis/redis/pull/9733
So, don't use this function with EVAL or EVALSHA.
"""
redis_conn = self.get_default_node().redis_connection
return self.commands_parser.get_keys(redis_conn, *args)

def determine_slot(self, *args):
"""
Figure out what slot based on command and args
Figure out what slot to use based on args.
Raises a RedisClusterException if there's a missing key and we can't
determine what slots to map the command to; or, if the keys don't
all map to the same key slot.
"""
if self.command_flags.get(args[0]) == SLOT_ID:
command = args[0]
if self.command_flags.get(command) == SLOT_ID:
# The command contains the slot ID
return args[1]

# Get the keys in the command
keys = self._get_command_keys(*args)
if keys is None or len(keys) == 0:
raise RedisClusterException(
"No way to dispatch this command to Redis Cluster. "
"Missing key.\nYou can execute the command by specifying "
f"target nodes.\nCommand: {args}"
)

if len(keys) > 1:
# multi-key command, we need to make sure all keys are mapped to
# the same slot
slots = {self.keyslot(key) for key in keys}
if len(slots) != 1:
# EVAL and EVALSHA are common enough that it's wasteful to go to the
# redis server to parse the keys. Besides, there is a bug in redis<7.0
# where `self._get_command_keys()` fails anyway. So, we special case
# EVAL/EVALSHA.
if command in ("EVAL", "EVALSHA"):
# command syntax: EVAL "script body" num_keys ...
if len(args) <= 2:
raise RedisClusterException(f"Invalid args in command: {args}")
num_actual_keys = args[2]
eval_keys = args[3 : 3 + num_actual_keys]
# if there are 0 keys, that means the script can be run on any node
# so we can just return a random slot
if len(eval_keys) == 0:
return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
keys = eval_keys
else:
keys = self._get_command_keys(*args)
if keys is None or len(keys) == 0:
raise RedisClusterException(
f"{args[0]} - all keys must map to the same key slot"
"No way to dispatch this command to Redis Cluster. "
"Missing key.\nYou can execute the command by specifying "
f"target nodes.\nCommand: {args}"
)
return slots.pop()
else:
# single key command

# single key command
if len(keys) == 1:
return self.keyslot(keys[0])

# multi-key command; we need to make sure all keys are mapped to
# the same slot
slots = {self.keyslot(key) for key in keys}
if len(slots) != 1:
raise RedisClusterException(
f"{command} - all keys must map to the same key slot"
)

return slots.pop()

def reinitialize_caches(self):
self.nodes_manager.initialize()

Expand Down
9 changes: 8 additions & 1 deletion redis/commands/cluster.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
from redis.crc import key_slot
from redis.exceptions import RedisClusterException, RedisError

from .core import ACLCommands, DataAccessCommands, ManagementCommands, PubSubCommands
from .core import (
ACLCommands,
DataAccessCommands,
ManagementCommands,
PubSubCommands,
ScriptCommands,
)
from .helpers import list_or_args


Expand Down Expand Up @@ -205,6 +211,7 @@ class RedisClusterCommands(
ACLCommands,
PubSubCommands,
ClusterDataAccessCommands,
ScriptCommands,
):
"""
A class for all Redis Cluster commands
Expand Down
56 changes: 56 additions & 0 deletions redis/commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5399,6 +5399,62 @@ def command(self) -> ResponseT:
return self.execute_command("COMMAND")


class Script:
"""
An executable Lua script object returned by ``register_script``
"""

def __init__(self, registered_client, script):
self.registered_client = registered_client
self.script = script
# Precalculate and store the SHA1 hex digest of the script.

if isinstance(script, str):
# We need the encoding from the client in order to generate an
# accurate byte representation of the script
encoder = self.get_encoder()
script = encoder.encode(script)
self.sha = hashlib.sha1(script).hexdigest()

def __call__(self, keys=[], args=[], client=None):
"Execute the script, passing any required ``args``"
if client is None:
client = self.registered_client
args = tuple(keys) + tuple(args)
# make sure the Redis server knows about the script
from redis.client import Pipeline

if isinstance(client, Pipeline):
# Make sure the pipeline can register the script before executing.
client.scripts.add(self)
try:
return client.evalsha(self.sha, len(keys), *args)
except NoScriptError:
# Maybe the client is pointed to a different server than the client
# that created this instance?
# Overwrite the sha just in case there was a discrepancy.
self.sha = client.script_load(self.script)
return client.evalsha(self.sha, len(keys), *args)

def get_encoder(self):
"""Get the encoder to encode string scripts into bytes."""
try:
return self.registered_client.get_encoder()
except AttributeError:
# DEPRECATED
# In version <=4.1.2, this was the code we used to get the encoder.
# However, after 4.1.2 we added support for scripting in clustered
# redis. ClusteredRedis doesn't have a `.connection_pool` attribute
# so we changed the Script class to use
# `self.registered_client.get_encoder` (see above).
# However, that is technically a breaking change, as consumers who
# use Scripts directly might inject a `registered_client` that
# doesn't have a `.get_encoder` field. This try/except prevents us
# from breaking backward-compatibility. Ideally, it would be
# removed in the next major release.
return self.registered_client.connection_pool.get_encoder()


class AsyncModuleCommands(ModuleCommands):
async def command_info(self) -> None:
return super().command_info()
Expand Down
17 changes: 16 additions & 1 deletion redis/commands/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,14 @@ def initialize(self, r):
# https://github.com/redis/redis/pull/8324
def get_keys(self, redis_conn, *args):
"""
Get the keys from the passed command
Get the keys from the passed command.
NOTE: Due to a bug in redis<7.0, this function does not work properly
for EVAL or EVALSHA when the `numkeys` arg is 0.
- issue: https://github.com/redis/redis/issues/9493
- fix: https://github.com/redis/redis/pull/9733
So, don't use this function with EVAL or EVALSHA.
"""
if len(args) < 2:
# The command has no keys in it
Expand Down Expand Up @@ -72,6 +79,14 @@ def get_keys(self, redis_conn, *args):
return keys

def _get_moveable_keys(self, redis_conn, *args):
"""
NOTE: Due to a bug in redis<7.0, this function does not work properly
for EVAL or EVALSHA when the `numkeys` arg is 0.
- issue: https://github.com/redis/redis/issues/9493
- fix: https://github.com/redis/redis/pull/9733
So, don't use this function with EVAL or EVALSHA.
"""
pieces = []
cmd_name = args[0]
# The command name should be splitted into separate arguments,
Expand Down
2 changes: 1 addition & 1 deletion redis/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def merge_result(command, res):
Merge all items in `res` into a list.
This command is used when sending a command to multiple nodes
and they result from each node should be merged into a single list.
and the result from each node should be merged into a single list.
res : 'dict'
"""
Expand Down
15 changes: 15 additions & 0 deletions tests/test_command_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from redis.commands import CommandsParser

from .conftest import skip_if_server_version_lt


class TestCommandsParser:
def test_init_commands(self, r):
Expand Down Expand Up @@ -68,6 +70,19 @@ def test_get_moveable_keys(self, r):
assert commands_parser.get_keys(r, *args8) is None
assert commands_parser.get_keys(r, *args9).sort() == ["key1", "key2"].sort()

# A bug in redis<7.0 causes this to fail: https://github.com/redis/redis/issues/9493
@skip_if_server_version_lt("7.0.0")
def test_get_eval_keys_with_0_keys(self, r):
commands_parser = CommandsParser(r)
args = [
"EVAL",
"return {ARGV[1],ARGV[2]}",
0,
"key1",
"key2",
]
assert commands_parser.get_keys(r, *args) == []

def test_get_pubsub_keys(self, r):
commands_parser = CommandsParser(r)
args1 = ["PUBLISH", "foo", "bar"]
Expand Down
Loading

0 comments on commit e5ac39a

Please sign in to comment.