Skip to content

Commit

Permalink
Imp sqs adapter (#3)
Browse files Browse the repository at this point in the history
* Imp conftest for sqs

* Add skip if not config boto3

* Fix test case

* Fix test

* Support sqs adapter

* Add support in readme

* Add Architecture.png
  • Loading branch information
Wh1isper authored Jun 11, 2024
1 parent 574e13f commit f7a8322
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 4 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,20 @@

# redis-canal

Proxy redis stream from one to another through global queue service.

Motivation: Redis provided on the cloud is usually only available within a VPC and does not provide external access. We want to provide a way to synchronize messages across clouds without going through a VPN.

![Architecture Overview](./assets/Architecture.png)

## Supported queue service

- [x] AWS SQS

Welcome to contribute more queue service, see [adapter/impl](./redis_canal/adapter/impl/) for more details.

We also support the plugin system, if you adapter can also be imported as a third-party library and used. See [example](./example/extension/custom-adapter/) for more details.

## Install

`pip install redis-canal[all]` for all components.
Expand Down
60 changes: 60 additions & 0 deletions assets/Architecture.drawio
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<mxfile host="65bd71144e">
<diagram id="mhvqekLvz7hUb_vLKXXU" name="第 1 页">
<mxGraphModel dx="615" dy="856" grid="1" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="827" pageHeight="1169" math="0" shadow="0">
<root>
<mxCell id="0"/>
<mxCell id="1" parent="0"/>
<mxCell id="21" value="" style="ellipse;shape=cloud;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="490" y="200" width="280" height="260" as="geometry"/>
</mxCell>
<mxCell id="22" value="Cluster B" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;strokeColor=none;fillColor=none;" vertex="1" parent="1">
<mxGeometry x="560" y="376" width="70" height="30" as="geometry"/>
</mxCell>
<mxCell id="17" value="" style="ellipse;shape=cloud;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="70" y="180" width="280" height="260" as="geometry"/>
</mxCell>
<mxCell id="11" value="" style="edgeStyle=none;html=1;" edge="1" parent="1" source="2" target="6">
<mxGeometry relative="1" as="geometry"/>
</mxCell>
<mxCell id="9" value="" style="edgeStyle=none;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;" edge="1" parent="1" source="13" target="5">
<mxGeometry relative="1" as="geometry"/>
</mxCell>
<mxCell id="12" value="Push" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" vertex="1" connectable="0" parent="9">
<mxGeometry x="-0.0444" y="5" relative="1" as="geometry">
<mxPoint y="-5" as="offset"/>
</mxGeometry>
</mxCell>
<mxCell id="3" value="redis-cannal&lt;br&gt;stream-to-queue" style="rounded=0;whiteSpace=wrap;html=1;fillColor=#d5e8d4;strokeColor=#82b366;" vertex="1" parent="1">
<mxGeometry x="590" y="260" width="160" height="60" as="geometry"/>
</mxCell>
<mxCell id="5" value="Grobal Queue" style="shape=process;whiteSpace=wrap;html=1;backgroundOutline=1;fillColor=#fff2cc;strokeColor=#d6b656;" vertex="1" parent="1">
<mxGeometry x="360" y="260" width="120" height="60" as="geometry"/>
</mxCell>
<mxCell id="6" value="" style="sketch=0;aspect=fixed;html=1;points=[];align=center;image;fontSize=12;image=img/lib/mscae/Cache_Redis_Product.svg;" vertex="1" parent="1">
<mxGeometry x="155" y="370" width="50" height="42" as="geometry"/>
</mxCell>
<mxCell id="8" value="" style="edgeStyle=none;html=1;" edge="1" parent="1" source="7" target="3">
<mxGeometry relative="1" as="geometry"/>
</mxCell>
<mxCell id="7" value="" style="sketch=0;aspect=fixed;html=1;points=[];align=center;image;fontSize=12;image=img/lib/mscae/Cache_Redis_Product.svg;" vertex="1" parent="1">
<mxGeometry x="645" y="380" width="50" height="42" as="geometry"/>
</mxCell>
<mxCell id="13" value="adapter" style="whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;" vertex="1" parent="1">
<mxGeometry x="560" y="235" width="50" height="110" as="geometry"/>
</mxCell>
<mxCell id="10" value="Poll" style="edgeStyle=none;html=1;entryX=1;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="5" target="16">
<mxGeometry relative="1" as="geometry"/>
</mxCell>
<mxCell id="2" value="redis-cannal&lt;br&gt;queue-to-stream" style="rounded=0;whiteSpace=wrap;html=1;fillColor=#d5e8d4;strokeColor=#82b366;" vertex="1" parent="1">
<mxGeometry x="120" y="260" width="120" height="60" as="geometry"/>
</mxCell>
<mxCell id="16" value="adapter" style="whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;" vertex="1" parent="1">
<mxGeometry x="230" y="235" width="50" height="110" as="geometry"/>
</mxCell>
<mxCell id="18" value="Cluster A" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;strokeColor=none;fillColor=none;" vertex="1" parent="1">
<mxGeometry x="220" y="370" width="70" height="30" as="geometry"/>
</mxCell>
</root>
</mxGraphModel>
</diagram>
</mxfile>
Binary file added assets/Architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 8 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
coverage:
status:
project:
default:
threshold: 100%
patch:
default:
threshold: 100%
76 changes: 73 additions & 3 deletions redis_canal/adapter/impl/sqs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import asyncio
from functools import cached_property
from typing import Awaitable

from redis_canal.adapter.plugin import Adapter, hookimpl
from redis_canal.log import logger
from redis_canal.models import Message
from redis_canal.tools import run_in_threadpool


class SQSAdapter(Adapter):
Expand All @@ -15,7 +20,14 @@ def __init__(
**kwargs,
):
super().__init__(queue_url, poll_time, poll_size, *args, **kwargs)
self.create_queue_if_not_exists()

if self.poll_time < 1:
self.poll_time = 1
if self.poll_size > 10:
self.poll_size = 10

self.poll_time = self.poll_time
self.ensure_queue_exists()

@cached_property
def client(self):
Expand All @@ -27,8 +39,66 @@ def client(self):
)
return boto3.client("sqs")

def create_queue_if_not_exists(self):
pass
def ensure_queue_exists(self):
queue_name = self.queue_url.split("/")[-1]
try:
if self.client.get_queue_url(QueueName=queue_name):
return
except self.client.exceptions.QueueDoesNotExist:
logger.error(f"Queue {self.queue_url} does not exist")
raise

async def emit(self, message: Message) -> None:
await run_in_threadpool(
self.client.send_message,
QueueUrl=self.queue_url,
MessageBody=message.model_dump_json(),
)

def _poll_message(self) -> list[tuple[Message, str]]:
valid_messages = []
response = self.client.receive_message(
QueueUrl=self.queue_url,
MaxNumberOfMessages=self.poll_size,
WaitTimeSeconds=self.poll_time,
)
if "Messages" not in response:
return valid_messages

for message in response["Messages"]:
receipt_handle = message["ReceiptHandle"]
try:
body = message["Body"]
message = Message.model_validate_json(body)
except Exception as e:
logger.error(f"Error parsing message {body}: {e}")
logger.exception(e)
else:
valid_messages.append((message, receipt_handle))
return valid_messages

async def _process_messages(
self, process_func: Awaitable[Message], message: Message, receipt_handle: str
):
try:
await process_func(message)
except Exception as e:
logger.error(f"Error processing message {message}: {e}")
else:
await run_in_threadpool(
self.client.delete_message,
QueueUrl=self.queue_url,
ReceiptHandle=receipt_handle,
)

async def poll(self, process_func: Awaitable[Message], *args, **kwargs) -> None:
response_and_receipt_handles = await run_in_threadpool(self._poll_message)
await asyncio.gather(
*[
self._process_messages(process_func, message, receipt_handle)
for message, receipt_handle in response_and_receipt_handles
]
)


@hookimpl
Expand Down
16 changes: 15 additions & 1 deletion redis_canal/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,30 @@
import contextlib
import functools
import inspect
import typing
from contextlib import asynccontextmanager
from datetime import datetime
from functools import wraps
from typing import Any, AsyncGenerator
from typing import Any, AsyncGenerator, ParamSpec, TypeVar

import anyio
import redis.asyncio as redis

from redis_canal.log import logger

P = ParamSpec("P")
T = TypeVar("T")


async def run_in_threadpool(func: typing.Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> T:
"""
From fastapi.concurrency
"""
if kwargs: # pragma: no cover
# run_sync doesn't accept 'kwargs', so bind them in here
func = functools.partial(func, **kwargs)
return await anyio.to_thread.run_sync(func, *args)


def coro(f):
@wraps(f)
Expand Down
52 changes: 52 additions & 0 deletions tests/adapter/test_sqs_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import pytest

from redis_canal.adapter.impl.sqs import SQSAdapter
from redis_canal.models import Message


@pytest.fixture
def sqs_adapter(case_id):
try:
import boto3

sqs = boto3.client("sqs")
except ImportError:
pytest.skip("boto3 is not installed")
except Exception:
pytest.skip("boto3 is not configured")

queue_name = f"redis-canal-test-{case_id}"
try:
queue_url = sqs.create_queue(QueueName=queue_name)["QueueUrl"]
except sqs.exceptions.QueueAlreadyExists:
pass
except Exception:
pytest.skip("boto3 is not configured")

adapter = SQSAdapter(
queue_url=queue_url,
poll_time=1,
poll_size=10,
)
yield adapter
try:
sqs.delete_queue(QueueUrl=queue_url)
except sqs.exceptions.QueueDoesNotExist:
pass


async def test_sqs_adapter(sqs_adapter):
message_input = Message(
redis_key="test",
message_id="123-345",
message_content={"f1": "v1"},
)

async def validate(message):
assert message == message_input
print("validated!")

await sqs_adapter.emit(message_input)
await sqs_adapter.poll(
process_func=validate,
)

0 comments on commit f7a8322

Please sign in to comment.