diff --git a/logger/logger.py b/logger/logger.py index 043e5019..617094ab 100644 --- a/logger/logger.py +++ b/logger/logger.py @@ -1,7 +1,9 @@ +import asyncio import logging import os from typing import Any +from pubsub.asyncio.pubsub import Publisher as AsyncPublisher from pubsub.pubsub import Publisher @@ -10,12 +12,18 @@ class PubSubLogHandler(logging.Handler): def __init__(self) -> None: self.publisher = Publisher() + self.async_publisher = AsyncPublisher() logging.Handler.__init__(self=self) def emit(self, record: Any) -> None: """Publishes the log message to the pub/sub channel.""" message = self.format(record) - self.publisher.publish(message) + + try: + loop = asyncio.get_running_loop() + loop.create_task(self.async_publisher.publish(message)) + except RuntimeError: + self.publisher.publish(message) class PluginLogger: diff --git a/pubsub/asyncio/__init__.py b/pubsub/asyncio/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pubsub/asyncio/pubsub.py b/pubsub/asyncio/pubsub.py new file mode 100644 index 00000000..8e2611ab --- /dev/null +++ b/pubsub/asyncio/pubsub.py @@ -0,0 +1,20 @@ +from typing import Any + +import redis.asyncio as redis + +from pubsub.pubsub import PubSubBase + + +class Publisher(PubSubBase): + """Publisher class for pub/sub.""" + + def _create_client(self) -> redis.Redis | None: + if self.redis_endpoint and self.channel: + return redis.Redis.from_url(self.redis_endpoint, decode_responses=True) + + return None + + async def publish(self, message: Any) -> None: + """Publishes a message to the channel.""" + if self.client and self.channel: + await self.client.publish(self.channel, message) diff --git a/pubsub/pubsub.py b/pubsub/pubsub.py index d12e6ff1..bbb4103e 100644 --- a/pubsub/pubsub.py +++ b/pubsub/pubsub.py @@ -22,11 +22,7 @@ def _get_channel_name(self) -> str | None: return None - def _create_client(self) -> redis.Redis | None: - if self.redis_endpoint and self.channel: - return redis.Redis.from_url(self.redis_endpoint, decode_responses=True) - - return None + def _create_client(self) -> redis.Redis | redis.asyncio.Redis | None: ... class Publisher(PubSubBase): @@ -36,3 +32,9 @@ def publish(self, message: Any) -> None: """Publishes a message to the channel.""" if self.client and self.channel: self.client.publish(self.channel, message) + + def _create_client(self) -> redis.Redis | None: + if self.redis_endpoint and self.channel: + return redis.Redis.from_url(self.redis_endpoint, decode_responses=True) + + return None