Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support google pubsub as adapter #12

Merged
merged 3 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Motivation: Redis provided on the cloud is usually only available within a VPC a
## Supported queue service

- [x] AWS SQS
- [x] Google PubSub

Welcome to contribute more queue service, see [adapter/impl](./redis_canal/adapter/impl/) for more details.

Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ classifiers = [
]

[project.optional-dependencies]
all = ["redis_canal[sqs]"]
all = ["redis_canal[sqs, pubsub]"]
sqs = ["boto3"]
pubsub = ["google-cloud-pubsub"]
test = [
"pytest",
"pytest-asyncio",
Expand Down
152 changes: 152 additions & 0 deletions redis_canal/adapter/impl/pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
from __future__ import annotations

import asyncio
from functools import cached_property
from typing import TYPE_CHECKING, Awaitable

from redis_canal.adapter.plugin import Adapter, hookimpl
from redis_canal.log import logger
from redis_canal.models import Message
from redis_canal.tools import run_in_threadpool

if TYPE_CHECKING:
from google.cloud import pubsub_v1


class PubSubAdapter(Adapter):
"""
PubSub Adapter for Google Cloud Pub/Sub.

Args:
queue_url (str): The topic path, e.g. projects/{project}/topics/{topic}
poll_time (float): The time to poll the queue
poll_size (int): The number of messages to poll at a time
"""

register_name = "pubsub"

def __init__(self, queue_url: str, poll_time: float, poll_size: int, *args, **kwargs):
super().__init__(queue_url, poll_time, poll_size, *args, **kwargs)

if self.poll_time < 1:
self.poll_time = 1
if self.poll_size > 10:
self.poll_size = 10

self.poll_time = self.poll_time
self.ensure_queue_exists()

@property
def topic_path(self) -> str:
return self.queue_url

@property
def subscription_path(self) -> str:
return f"projects/{self.project_id}/subscriptions/{self.subscribe_id}"

@property
def project_id(self) -> str:
return self.topic_path.split("/")[1]

@property
def subscribe_id(self) -> str:
return "redis_cannal_pubsub"

@cached_property
def publisher(self) -> "pubsub_v1.PublisherClient":
try:
from google.cloud import pubsub_v1
except ImportError:
raise RuntimeError(
"Google Cloud Pub/Sub SDK is required to use PubSub Adapter, try install redis_canal with `pip install redis_canal[all]` for all components or `pip install redis_canal[pubsub]` for pubsub"
)

return pubsub_v1.PublisherClient()

@cached_property
def subscriber(self) -> "pubsub_v1.SubscriberClient":
try:
from google.cloud import pubsub_v1
except ImportError:
raise RuntimeError(
"Google Cloud Pub/Sub SDK is required to use PubSub Adapter, try install redis_canal with `pip install redis_canal[all]` for all components or `pip install redis_canal[pubsub]` for pubsub"
)

return pubsub_v1.SubscriberClient()

def ensure_queue_exists(self):
from google.api_core import exceptions

try:
self.publisher.create_topic(
name=self.topic_path,
)
except exceptions.AlreadyExists:
pass

try:
self.subscriber.create_subscription(
name=self.subscription_path,
topic=self.topic_path,
)
except exceptions.AlreadyExists:
pass

async def emit(self, message: Message) -> None:
def _():
response = self.publisher.publish(
topic=self.topic_path,
data=message.model_dump_json().encode("utf-8"),
)

logger.debug(f"Published message {response.result()} to {self.topic_path}")

await run_in_threadpool(_)

async def poll(self, process_func: Awaitable[Message], *args, **kwargs) -> None:
pubsub_messages = await self._poll_message()
await asyncio.gather(
*[
self._process_messages(process_func, message, ack_id)
for message, ack_id in pubsub_messages
]
)

async def _poll_message(self) -> list[tuple[Message, str]]:
def _():
response = self.subscriber.pull(
request={
"subscription": self.subscription_path,
"max_messages": self.poll_size,
}
)
return [
(
Message.model_validate_json(message.message.data.decode("utf-8")),
message.ack_id,
)
for message in response.received_messages
]

return await run_in_threadpool(_)

async def _process_messages(
self, process_func: Awaitable[Message], message: Message, ack_id: str
):
try:
await process_func(message)
except Exception as e:
logger.error(f"Error processing message {message}: {e}")
else:
await run_in_threadpool(
self.subscriber.acknowledge,
request={
"subscription": self.subscription_path,
"ack_ids": [ack_id],
},
)


@hookimpl
def register(manager):
manager.register(PubSubAdapter)
2 changes: 1 addition & 1 deletion redis_canal/adapter/impl/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def client(self):
import boto3
except ImportError:
raise RuntimeError(
"boto3 is not installed, try install moriarty with `pip install moriarty[matrix]` for all components or `pip install moriarty[sqs]` for sqs only"
"boto3 is not installed, try install redis_canal with `pip install redis_canal[all]` for all components or `pip install redis_canal[sqs]` for sqs"
)
return boto3.client("sqs")

Expand Down
54 changes: 54 additions & 0 deletions tests/adapter/test_pubsub_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import os

import pytest

from redis_canal.adapter.impl.pubsub import PubSubAdapter
from redis_canal.models import Message


@pytest.fixture
def pubsub_adapter(case_id):
try:
import google.api_core.exceptions as exceptions
import google.cloud.pubsub_v1

except ImportError:
pytest.skip("Google Cloud Pub/Sub SDK is not installed")

project_id = os.getenv("TEST_PUBSUB_PROJECT_ID")
if not project_id:
pytest.skip("TEST_PUBSUB_PROJECT_ID is not configured")

queue_url = f"projects/{project_id}/topics/test-topic"
try:
adapter = PubSubAdapter(
queue_url=queue_url,
poll_time=1,
poll_size=10,
)
except Exception as e:
pytest.skip(f"Google Cloud Pub/Sub SDK is not configured correctly: {e}")

yield adapter
try:
adapter.publisher.delete_topic(request={"topic": queue_url})
except exceptions.NotFound:
pass


async def test_pubsub_adapter(pubsub_adapter):

message_input = Message(
redis_key="test",
message_id="123-345",
message_content={"f1": "v1"},
)

async def validate(message):
assert message == message_input
print("validated!")

await pubsub_adapter.emit(message_input)
await pubsub_adapter.poll(
process_func=validate,
)
2 changes: 1 addition & 1 deletion tests/test_adapter_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


def test_bridge_manager():
registed = ["sqs"]
registed = ["sqs", "pubsub"]
manager = AdapterManager()

assert sorted(registed) == sorted(manager.registed_cls.keys())
Loading