Skip to content

Commit

Permalink
feat: async pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
jamagalhaes committed Jan 31, 2025
1 parent 214f621 commit 2cc23a2
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 6 deletions.
10 changes: 9 additions & 1 deletion logger/logger.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import asyncio
import logging
import os
from typing import Any

from pubsub.asyncio.pubsub import Publisher as AsyncPublisher
from pubsub.pubsub import Publisher


Expand All @@ -10,12 +12,18 @@ class PubSubLogHandler(logging.Handler):

def __init__(self) -> None:
self.publisher = Publisher()
self.async_publisher = AsyncPublisher()
logging.Handler.__init__(self=self)

def emit(self, record: Any) -> None:
"""Publishes the log message to the pub/sub channel."""
message = self.format(record)
self.publisher.publish(message)

try:
loop = asyncio.get_running_loop()
loop.create_task(self.async_publisher.publish(message))
except RuntimeError:
self.publisher.publish(message)


class PluginLogger:
Expand Down
Empty file added pubsub/asyncio/__init__.py
Empty file.
20 changes: 20 additions & 0 deletions pubsub/asyncio/pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from typing import Any

import redis.asyncio as redis

from pubsub.pubsub import PubSubBase


class Publisher(PubSubBase):
"""Publisher class for pub/sub."""

def _create_client(self) -> redis.Redis | None:
if self.redis_endpoint and self.channel:
return redis.Redis.from_url(self.redis_endpoint, decode_responses=True)

return None

async def publish(self, message: Any) -> None:
"""Publishes a message to the channel."""
if self.client and self.channel:
await self.client.publish(self.channel, message)
12 changes: 7 additions & 5 deletions pubsub/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,7 @@ def _get_channel_name(self) -> str | None:

return None

def _create_client(self) -> redis.Redis | None:
if self.redis_endpoint and self.channel:
return redis.Redis.from_url(self.redis_endpoint, decode_responses=True)

return None
def _create_client(self) -> redis.Redis | redis.asyncio.Redis | None: ...


class Publisher(PubSubBase):
Expand All @@ -36,3 +32,9 @@ def publish(self, message: Any) -> None:
"""Publishes a message to the channel."""
if self.client and self.channel:
self.client.publish(self.channel, message)

def _create_client(self) -> redis.Redis | None:
if self.redis_endpoint and self.channel:
return redis.Redis.from_url(self.redis_endpoint, decode_responses=True)

return None

0 comments on commit 2cc23a2

Please sign in to comment.