Skip to content

Commit

Permalink
fixup! Refactor PubSub, Server & Publishers
Browse files Browse the repository at this point in the history
  • Loading branch information
roekatz committed Sep 5, 2024
1 parent 9c82461 commit 859d8b9
Showing 1 changed file with 2 additions and 3 deletions.
5 changes: 2 additions & 3 deletions packages/opal-server/opal_server/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ def __init__(self, pubsub):
self._lock = asyncio.Lock()
self._synced_after_wakeup = asyncio.Event()
self._received_sync_messages: Set[str] = set()
self._publish_tasks = TasksPool()
self._seen_servers: Dict[str, datetime] = {}
self._periodic_keepalive_task: asyncio.Task | None = None

Expand Down Expand Up @@ -174,14 +173,14 @@ async def start(self):
)
self._pubsub.endpoint.notifier.register_unsubscribe_event(
self.remove_client
) # TODO: Fix that
) # TODO: Should have a better way to handle this

# wait before publishing the wakeup message, due to the fact we are
# counting on the broadcaster to listen and to replicate the message
# to the other workers / server nodes in the networks.
# However, since broadcaster is using asyncio.create_task(), there is a
# race condition that is mitigated by this asyncio.sleep() call.
# await asyncio.sleep(SLEEP_TIME_FOR_BROADCASTER_READER_TO_START)
await asyncio.sleep(SLEEP_TIME_FOR_BROADCASTER_READER_TO_START)
# Let all the other opal servers know that new opal server started
logger.info(f"sending stats wakeup message: {self._worker_id}")
await self._publish(
Expand Down

0 comments on commit 859d8b9

Please sign in to comment.