Skip to content

Commit

Permalink
Merge branch 'master' of github.com:redis/redis-py into vv-tba-support
Browse files Browse the repository at this point in the history
  • Loading branch information
vladvildanov committed Dec 9, 2024
2 parents 63059d6 + 8f2276e commit cbed9dc
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 22 deletions.
98 changes: 98 additions & 0 deletions doctests/trans_pipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# EXAMPLE: pipe_trans_tutorial
# HIDE_START
"""
Code samples for vector database quickstart pages:
https://redis.io/docs/latest/develop/get-started/vector-database/
"""
# HIDE_END
import redis

# STEP_START basic_pipe
r = redis.Redis(decode_responses=True)
# REMOVE_START
for i in range(5):
r.delete(f"seat:{i}")

r.delete("shellpath")
# REMOVE_END

pipe = r.pipeline()

for i in range(5):
pipe.set(f"seat:{i}", f"#{i}")

set_5_result = pipe.execute()
print(set_5_result) # >>> [True, True, True, True, True]

pipe = r.pipeline()

# "Chain" pipeline commands together.
get_3_result = pipe.get("seat:0").get("seat:3").get("seat:4").execute()
print(get_3_result) # >>> ['#0', '#3', '#4']
# STEP_END
# REMOVE_START
assert set_5_result == [True, True, True, True, True]
assert get_3_result == ['#0', '#3', '#4']
# REMOVE_END

# STEP_START trans_watch
r.set("shellpath", "/usr/syscmds/")

with r.pipeline() as pipe:
# Repeat until successful.
while True:
try:
# Watch the key we are about to change.
pipe.watch("shellpath")

# The pipeline executes commands directly (instead of
# buffering them) from immediately after the `watch()`
# call until we begin the transaction.
current_path = pipe.get("shellpath")
new_path = current_path + ":/usr/mycmds/"

# Start the transaction, which will enable buffering
# again for the remaining commands.
pipe.multi()

pipe.set("shellpath", new_path)

pipe.execute()

# The transaction succeeded, so break out of the loop.
break
except redis.WatchError:
# The transaction failed, so continue with the next attempt.
continue

get_path_result = r.get("shellpath")
print(get_path_result) # >>> '/usr/syscmds/:/usr/mycmds/'
# STEP_END
# REMOVE_START
assert get_path_result == '/usr/syscmds/:/usr/mycmds/'
r.delete("shellpath")
# REMOVE_END

# STEP_START watch_conv_method
r.set("shellpath", "/usr/syscmds/")


def watched_sequence(pipe):
current_path = pipe.get("shellpath")
new_path = current_path + ":/usr/mycmds/"

pipe.multi()

pipe.set("shellpath", new_path)


trans_result = r.transaction(watched_sequence, "shellpath")
print(trans_result) # True

get_path_result = r.get("shellpath")
print(get_path_result) # >>> '/usr/syscmds/:/usr/mycmds/'
# REMOVE_START
assert trans_result
assert get_path_result == '/usr/syscmds/:/usr/mycmds/'
# REMOVE_END
# STEP_END
15 changes: 11 additions & 4 deletions redis/_parsers/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,20 @@ def parse_item(item):
# an O(N) complexity) instead of the command.
if isinstance(item[3], list):
result["command"] = space.join(item[3])
result["client_address"] = item[4]
result["client_name"] = item[5]

# These fields are optional, depends on environment.
if len(item) >= 6:
result["client_address"] = item[4]
result["client_name"] = item[5]
else:
result["complexity"] = item[3]
result["command"] = space.join(item[4])
result["client_address"] = item[5]
result["client_name"] = item[6]

# These fields are optional, depends on environment.
if len(item) >= 7:
result["client_address"] = item[5]
result["client_name"] = item[6]

return result

return [parse_item(item) for item in response]
Expand Down
8 changes: 7 additions & 1 deletion redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,13 @@ def __del__(self, _warnings: Any = warnings):
_warnings.warn(
f"unclosed Connection {self!r}", ResourceWarning, source=self
)
self._close()

try:
asyncio.get_running_loop()
self._close()
except RuntimeError:
# No actions been taken if pool already closed.
pass

def _close(self):
"""
Expand Down
6 changes: 1 addition & 5 deletions redis/asyncio/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,7 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)

def __repr__(self):
pool = self.connection_pool
s = (
f"<{self.__class__.__module__}.{self.__class__.__name__}"
f"(service={pool.service_name}"
)
s = f"<{self.__class__.__module__}.{self.__class__.__name__}"
if self.host:
host_info = f",host={self.host},port={self.port}"
s += host_info
Expand Down
13 changes: 7 additions & 6 deletions tests/test_asyncio/test_hash.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import math
from datetime import datetime, timedelta

from tests.conftest import skip_if_server_version_lt
Expand Down Expand Up @@ -128,9 +129,9 @@ async def test_hpexpire_multiple_fields(r):
async def test_hexpireat_basic(r):
await r.delete("test:hash")
await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"})
exp_time = int((datetime.now() + timedelta(seconds=1)).timestamp())
exp_time = math.ceil((datetime.now() + timedelta(seconds=1)).timestamp())
assert await r.hexpireat("test:hash", exp_time, "field1") == [1]
await asyncio.sleep(1.1)
await asyncio.sleep(2.1)
assert await r.hexists("test:hash", "field1") is False
assert await r.hexists("test:hash", "field2") is True

Expand All @@ -139,9 +140,9 @@ async def test_hexpireat_basic(r):
async def test_hexpireat_with_datetime(r):
await r.delete("test:hash")
await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"})
exp_time = datetime.now() + timedelta(seconds=1)
exp_time = (datetime.now() + timedelta(seconds=2)).replace(microsecond=0)
assert await r.hexpireat("test:hash", exp_time, "field1") == [1]
await asyncio.sleep(1.1)
await asyncio.sleep(2.1)
assert await r.hexists("test:hash", "field1") is False
assert await r.hexists("test:hash", "field2") is True

Expand Down Expand Up @@ -175,9 +176,9 @@ async def test_hexpireat_multiple_fields(r):
"test:hash",
mapping={"field1": "value1", "field2": "value2", "field3": "value3"},
)
exp_time = int((datetime.now() + timedelta(seconds=1)).timestamp())
exp_time = math.ceil((datetime.now() + timedelta(seconds=1)).timestamp())
assert await r.hexpireat("test:hash", exp_time, "field1", "field2") == [1, 1]
await asyncio.sleep(1.5)
await asyncio.sleep(2.1)
assert await r.hexists("test:hash", "field1") is False
assert await r.hexists("test:hash", "field2") is False
assert await r.hexists("test:hash", "field3") is True
Expand Down
20 changes: 20 additions & 0 deletions tests/test_asyncio/test_sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,23 @@ async def mock_disconnect():

assert calls == 1
await pool.disconnect()


@pytest.mark.onlynoncluster
async def test_repr_correctly_represents_connection_object(sentinel):
pool = SentinelConnectionPool("mymaster", sentinel)
connection = await pool.get_connection("PING")

assert (
str(connection)
== "<redis.asyncio.sentinel.SentinelManagedConnection,host=127.0.0.1,port=6379)>" # noqa: E501
)
assert connection.connection_pool == pool
await pool.release(connection)

del pool

assert (
str(connection)
== "<redis.asyncio.sentinel.SentinelManagedConnection,host=127.0.0.1,port=6379)>" # noqa: E501
)
13 changes: 7 additions & 6 deletions tests/test_hash.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import math
import time
from datetime import datetime, timedelta

Expand Down Expand Up @@ -147,9 +148,9 @@ def test_hpexpire_multiple_condition_flags_error(r):
def test_hexpireat_basic(r):
r.delete("test:hash")
r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"})
exp_time = int((datetime.now() + timedelta(seconds=1)).timestamp())
exp_time = math.ceil((datetime.now() + timedelta(seconds=1)).timestamp())
assert r.hexpireat("test:hash", exp_time, "field1") == [1]
time.sleep(1.1)
time.sleep(2.1)
assert r.hexists("test:hash", "field1") is False
assert r.hexists("test:hash", "field2") is True

Expand All @@ -158,9 +159,9 @@ def test_hexpireat_basic(r):
def test_hexpireat_with_datetime(r):
r.delete("test:hash")
r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"})
exp_time = datetime.now() + timedelta(seconds=1)
exp_time = (datetime.now() + timedelta(seconds=2)).replace(microsecond=0)
assert r.hexpireat("test:hash", exp_time, "field1") == [1]
time.sleep(1.1)
time.sleep(2.1)
assert r.hexists("test:hash", "field1") is False
assert r.hexists("test:hash", "field2") is True

Expand Down Expand Up @@ -194,9 +195,9 @@ def test_hexpireat_multiple_fields(r):
"test:hash",
mapping={"field1": "value1", "field2": "value2", "field3": "value3"},
)
exp_time = int((datetime.now() + timedelta(seconds=1)).timestamp())
exp_time = math.ceil((datetime.now() + timedelta(seconds=1)).timestamp())
assert r.hexpireat("test:hash", exp_time, "field1", "field2") == [1, 1]
time.sleep(1.1)
time.sleep(2.1)
assert r.hexists("test:hash", "field1") is False
assert r.hexists("test:hash", "field2") is False
assert r.hexists("test:hash", "field3") is True
Expand Down

0 comments on commit cbed9dc

Please sign in to comment.