diff --git a/README.md b/README.md index 1c82693..fee5747 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# reddish - an async redis client with minimal api +# reddish - an redis client for sockets and trio with minimal api * [Features](#features) * [Installation](#installation) @@ -6,6 +6,8 @@ * [Usage](#usage) ## Features +* both sync and async API +* sync api using the standard library `socket` module (TPC, TPC+TLS, Unix domain sockets) * `async`/`await` using `trio`'s stream primitives (TCP, TCP+TLS, Unix domain sockets) * minimal api so you don't have to relearn how to write redis commands * supports all redis commands including modules except `SUBSCRIBE`, `PSUBSCRIBE` and `MONITOR` [^footnote] @@ -17,10 +19,21 @@ barring regular commands from being issued over the connection. ## Installation ``` +pip install reddish # install just with support for socket pip install reddish[trio] # install with support for trio ``` -## Minimal Example +## Minimal Example - sync version +```python +import socket +from reddish.socket import Redis, Command + +redis = Redis(socket.create_connection(('localhost', 6379))) + +assert b'PONG' == redis.execute(Command('PING')) +``` + +## Minimal Example - async version ```python import trio from reddish.trio import Redis, Command diff --git a/reddish/socket/__init__.py b/reddish/socket/__init__.py new file mode 100644 index 0000000..1fd4d47 --- /dev/null +++ b/reddish/socket/__init__.py @@ -0,0 +1,2 @@ +from ._client import Redis # noqa +from .._command import Args, Command, MultiExec # noqa \ No newline at end of file diff --git a/reddish/socket/_client.py b/reddish/socket/_client.py new file mode 100644 index 0000000..05dd118 --- /dev/null +++ b/reddish/socket/_client.py @@ -0,0 +1,68 @@ +import socket +import threading +from .._sansio import RedisSansIO +from .._errors import BrokenConnectionError + +from .._typing import CommandType + + +class Redis: + + def __init__(self, stream: socket.socket): + """Redis client for executing commands. + + Args: + stream: a `socket.socket` connected to a redis server. + """ + + if not isinstance(stream, socket.socket): + TypeError(f"'{repr(stream)}' is not an instance of '{repr(socket.socket)}'") + try: + stream.getpeername() + except OSError: + raise TypeError(f"'{repr(stream)}' is not connected") from None + self._stream = stream + self._lock = threading.Lock() + self._redis = RedisSansIO() + + def execute_many(self, *commands: CommandType): + """Execute multiple redis commands at once. + + Args: + *commands: The commands to be executed. + + Returns: + Responses from redis as received or parsed into the types + provided to the commands. + """ + + redis = self._redis + stream = self._stream + + with self._lock: + try: + request = redis.send(commands) + stream.sendall(request) + + while True: + data = stream.recv(4096) + if data == b'': + raise BrokenConnectionError() + replies = redis.receive(data) + if replies: + return replies + except OSError: + redis.mark_broken() + raise BrokenConnectionError() + + def execute(self, command: CommandType): + """Execute a single redis command. + + Args: + command: The command to be executed. + + Returns: + Response from redis as received or parsed into the type + provided to the command. + """ + return self.execute_many(command)[0] diff --git a/tests/socket/test_socket.py b/tests/socket/test_socket.py new file mode 100644 index 0000000..c86c04e --- /dev/null +++ b/tests/socket/test_socket.py @@ -0,0 +1,54 @@ +import socket +import pytest +from concurrent.futures import ThreadPoolExecutor + +from reddish.socket import Redis, Command +from reddish._errors import BrokenConnectionError + + +@pytest.fixture +def unconnected_socket(): + return socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + +@pytest.fixture +def connected_socket(unconnected_socket): + unconnected_socket.connect(('localhost', 6379)) + yield unconnected_socket + unconnected_socket.close() + + +@pytest.fixture +def redis(connected_socket): + return Redis(connected_socket) + + +@pytest.fixture +def ping(): + return Command('PING').into(str) + + +def test_execute(redis, ping): + 'PONG' == redis.execute(ping) + + +def test_execute_many(redis, ping): + ['PONG', 'PONG'] == redis.execute_many(ping, ping) + + +def test_stream(unconnected_socket): + with pytest.raises(TypeError): + Redis(unconnected_socket) + + +def test_closed_connection(connected_socket, ping): + redis = Redis(connected_socket) + connected_socket.close() + with pytest.raises(BrokenConnectionError): + redis.execute(ping) + + +def test_concurrent_requests(redis, ping): + with ThreadPoolExecutor(max_workers=10) as pool: + for i in range(10): + pool.submit(redis.execute, ping)