fix: embed the synchronizer process within the plugin runner #333

Merged: 18 commits into main from fix/embed-synchronizer, Feb 2, 2025

Conversation

@djantzen (Contributor) commented Jan 16, 2025

https://canvasmedical.atlassian.net/browse/KOALA-2440

At present, the plugin synchronizer runs as a separate process managed by Circus. Problems with doing so:

  • it requires about 60 MB of memory to run
  • its actions have to be coordinated with the plugin runner via a SIGHUP

This PR moves the responsibilities of the synchronizer into an async task performed by the plugin runner.

Related home-app PR: https://github.com/canvas-medical/canvas/pull/17170
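
Sketched out, the embedded synchronizer is essentially a long-lived asyncio task that subscribes to a Redis pubsub channel and reloads plugins when a message arrives. The snippet below is only an illustration of that idea, assuming redis-py's asyncio client; the channel name, URL, and reload_plugins stub are hypothetical, not the PR's actual code.

```python
import asyncio

import redis.asyncio as aredis

CHANNEL = "plugin-synchronizer"  # hypothetical channel name


async def reload_plugins() -> None:
    # Stand-in for the plugin runner's real reload logic.
    print("reloading plugins...")


async def synchronize_plugins(url: str = "redis://localhost:6379") -> None:
    pubsub = aredis.Redis.from_url(url).pubsub()
    await pubsub.subscribe(CHANNEL)
    while True:
        # Awaiting here suspends only this coroutine; other tasks on the
        # plugin runner's event loop keep running while we wait.
        message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=None)
        if message is not None:
            await reload_plugins()


async def main() -> None:
    # The synchronizer becomes one more task on the plugin runner's event
    # loop rather than a separate Circus-managed process; in the real runner
    # it would run alongside the gRPC server on the same loop.
    synchronizer = asyncio.create_task(synchronize_plugins())
    await synchronizer


if __name__ == "__main__":
    asyncio.run(main())
```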

@djantzen changed the title from "wip to see if async pubsub listener works" to "fix: wip to see if async pubsub listener works" on Jan 16, 2025
@djantzen force-pushed the fix/embed-synchronizer branch 4 times, most recently from e223e50 to 5f29cba, on January 23, 2025 02:56
@djantzen marked this pull request as ready for review on January 23, 2025 02:57
@djantzen requested a review from a team as a code owner on January 23, 2025 02:57
@djantzen force-pushed the fix/embed-synchronizer branch 2 times, most recently from e73a389 to 5f29cba, on January 23, 2025 05:25
@djantzen changed the title from "fix: wip to see if async pubsub listener works" to "fix: embed the synchronizer process within the plugin runner" on Jan 23, 2025
@djantzen force-pushed the fix/embed-synchronizer branch from 3c87abd to 9659288 on January 24, 2025 05:44
@jamagalhaes (Collaborator) left a comment

Overall LGTM.
Since we are using a single thread with concurrency, I would add an await asyncio.sleep(0.1) at the end of the loop to prevent a busy loop when getting messages from the pubsub channel.
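
For reference, the suggested pattern would look something like this sketch (assuming redis-py's asyncio client; the channel name is hypothetical). Note that the discussion below ends up preferring get_message's own timeout over an explicit sleep.

```python
import asyncio

import redis.asyncio as aredis


async def listen_with_sleep(url: str = "redis://localhost:6379") -> None:
    pubsub = aredis.Redis.from_url(url).pubsub()
    await pubsub.subscribe("plugin-synchronizer")  # hypothetical channel name
    while True:
        # With the default timeout this returns immediately, usually None...
        message = await pubsub.get_message(ignore_subscribe_messages=True)
        if message is not None:
            print("received:", message["data"])
        # ...hence the suggested short sleep, so the loop yields to other
        # tasks instead of spinning while the channel is quiet.
        await asyncio.sleep(0.1)
```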

Review thread on plugin_runner/plugin_runner.py (outdated, resolved)
Review thread on plugin_runner/plugin_installer.py (resolved)
@djantzen closed this on Jan 29, 2025
@djantzen reopened this on Jan 29, 2025
@djantzen force-pushed the fix/embed-synchronizer branch from 1be3823 to 0856151 on January 29, 2025 20:35
@jamagalhaes (Collaborator)

@djantzen it won't prevent other operations from occurring, but we don't need to continuously use CPU and network to check for new messages. I would say that a 1-second delay is totally fine.

@jamagalhaes force-pushed the fix/embed-synchronizer branch from 0856151 to 3e9d520 on January 29, 2025 22:25
Review thread on settings.py (outdated, resolved)
@beaugunderson (Member)

Since we are using a single thread with concurrency, I would add a await asyncio.sleep(0.1) at the end of the loop to prevent a busy loop when getting messages from the pubsub channel

get_message is a blocking call, but it accepts an optional timeout parameter; we should specify that rather than adding our own asyncio.sleep:
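
A sketch of that approach, assuming redis-py's asyncio client (channel name hypothetical): let get_message do the waiting instead of sleeping.

```python
import asyncio

import redis.asyncio as aredis


async def listen(url: str = "redis://localhost:6379") -> None:
    pubsub = aredis.Redis.from_url(url).pubsub()
    await pubsub.subscribe("plugin-synchronizer")  # hypothetical channel name
    while True:
        # get_message waits up to `timeout` seconds for a message and returns
        # None on expiry, so no separate asyncio.sleep is needed.
        message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
        if message is not None:
            print("received:", message["data"])


if __name__ == "__main__":
    asyncio.run(listen())
```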

@jamagalhaes (Collaborator)

Since we are using a single thread with concurrency, I would add a await asyncio.sleep(0.1) at the end of the loop to prevent a busy loop when getting messages from the pubsub channel

get_message is a blocking call, it accepts an optional timeout parameter, we should specify that and not add our own asyncio.sleep:


Totally forgot that get_message already allows specifying a timeout, so yes, no need to use asyncio.sleep 👍

@jamagalhaes (Collaborator) commented Jan 29, 2025

@djantzen @beaugunderson since we have the capability of specifying a timeout with get_message, we could set it to None to wait indefinitely. This will maximize efficiency

@beaugunderson (Member)

@djantzen @beaugunderson since we have the capability of specifying a timeout with get_message, we could set it to None to wait indefinitely. This will maximize efficiency

Then blocking=True would be the case, right, and it turns into a blocking read?

@djantzen (Contributor, Author)

@djantzen it won't prevent other operations from occurring but we don't need to continuously using cpu and network to check for new messages. I would say that 1 second of delay it's totally fine.

1 second, or a tenth of a second? I'm not entirely clear on the tradeoffs here. Seems like a longer timeout would make the listener more responsive to pubsub messages, at the cost of less responsiveness to events firing within the system. I'd think we want to prioritize the events rather than the reload commands.

@jamagalhaes (Collaborator)

@djantzen @beaugunderson since we have the capability of specifying a timeout with get_message, we could set it to None to wait indefinitely. This will maximize efficiency

then blocking=True would be the case right and it turns into a blocking read?

Initially, I thought the same—that calling await redis.get_message(timeout=None) would block the entire event loop indefinitely. However, after looking into the implementation, that’s not what actually happens under the hood.

The key reason is that, although the function signature and docstrings remain unchanged for asyncio, the async version of get_message is built on top of asyncio streams. These streams are designed to be non-blocking, meaning they release control of the event loop while waiting for a new message.

Here’s what happens:

  • When get_message(timeout=None) is called, it internally does not perform a traditional blocking wait.
  • Instead, it relies on asyncio.StreamReader.read which pauses execution and yields control back to the event loop.
  • This allows other async tasks to run in parallel while waiting for a new Redis message to arrive.

I agree that this isn't explicitly clear in the documentation, but after debugging the implementation, I can confirm that this is how it works. So, using timeout=None is safe—it won’t block the entire event loop, just the coroutine that awaits the message.
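
A quick way to convince yourself of this, as a sketch assuming redis-py's asyncio client and a local Redis (channel name hypothetical): run the listener next to another coroutine and watch the second one keep ticking while the listener waits.

```python
import asyncio

import redis.asyncio as aredis


async def heartbeat() -> None:
    while True:
        print("event loop is still responsive")
        await asyncio.sleep(1)


async def listen(url: str = "redis://localhost:6379") -> None:
    pubsub = aredis.Redis.from_url(url).pubsub()
    await pubsub.subscribe("plugin-synchronizer")  # hypothetical channel name
    while True:
        # Suspends only this coroutine until a message arrives; the event
        # loop is free to keep running heartbeat() in the meantime.
        message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=None)
        print("got:", message)


async def main() -> None:
    await asyncio.gather(heartbeat(), listen())


if __name__ == "__main__":
    asyncio.run(main())
```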

@djantzen (Contributor, Author)

So, using timeout=None is safe—it won’t block the entire event loop, just the coroutine that awaits the message.

Thanks for digging into the code, @jamagalhaes. This makes sense to me. I was beginning to wonder about the whole point of asyncio if we still had to perform tricks like sleeping and timing out to avoid blocking other async tasks.

@jamagalhaes (Collaborator)

Thanks for digging into the code @jamagalhaes . This makes sense to me. I was beginning to wonder about the whole point of asyncio if we still had to perform tricks like sleeping and timing out to avoid blocking other async tasks.

@djantzen you're welcome. You still need to explicitly specify timeout=None, because by default the timeout is 0.

@djantzen (Contributor, Author) commented Jan 30, 2025 via email

@beaugunderson (Member) commented Jan 30, 2025

Actually, it may be unnecessary since we’re getting host as part of our other metrics already. I’ll remove it.

It's not used here, so it can be removed without changing anything. But since these are statsd metrics, they include the tags from the environment as defined in telegraf.conf, namely:

[global_tags]
  source = "home-app"
  customer = "$CUSTOMER_IDENTIFIER"
  release_channel = "$RELEASE_CHANNEL"
  process_type = "$APTIBLE_PROCESS_TYPE"
  organization = "$ORGANIZATION"

none of which is the hostname, so if you need it you'll still need to add it.

@beaugunderson (Member)

n/m, we discussed this further in Slack (https://canvas-medical.slack.com/archives/C1UGTHCKS/p1738271118532679); telegraf will set it correctly.

@djantzen (Contributor, Author) commented Jan 30, 2025 via email

@csande (Contributor) commented Jan 30, 2025

Would the problem manifest if Redis pubsub became overburdened and was slow to receive and acknowledge requests, thereby blocking other tasks? TBH it didn’t even register with me that logs of these administrative actions would get routed through Redis. Do developers really need to see this activity?

(The review comment being replied to, from @csande in plugin_runner/plugin_runner.py, on the added log.info("Reloading plugins...") in ReloadPlugins: "Is this statement a blocking call, in addition to the other calls to the logger? If you dig into our logging module, it looks like the PubSubBase class uses the non-asyncio version of Redis. FWIW, we're also calling it a few times in HandleEvent, another asyncio context.")

I suppose this is theoretical until we see it happen, but if Redis were overburdened, it could block the main event loop. If the main event loop is blocked, then that would affect the ability of the plugin runner to continue handling incoming events. On the home-app side, send_to_plugin_runner is a blocking call, and we're funneling more and more activity through that function as we add more event hooks. So you'd see home-app affected when it's unable to receive responses from the plugin runner.

It's worth mentioning that if we were using async Redis, and Redis were overburdened, we would still see problems. They would probably just manifest differently. Tasks would still have trouble running, but the reason would be Redis directly, rather than being blocked by other tasks (i.e., blocked indirectly by Redis).

When you introduce blocking calls in an asyncio project, it can be a chokepoint for everything. Even if Redis isn't overburdened, the Plugin Runner main event loop can't do anything else for the few milliseconds that it is communicating with the sync Redis client.

There are a few things we could do.

  • As has been discussed, we could get rid of asyncio (at least for now).
  • Developers need a sync logger that publishes to Redis, so we can't touch that, but we could make an async logger for the Plugin Runner to use (see the sketch after this list).
  • Redis timeout values and error handling would be another thing to look at. In Fumage, errors with the Redis token introspection cache do not impede the ability of the API to continue servicing requests.
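
As a rough sketch of the async-logger option above, assuming redis-py's asyncio client (the class name, channel, and payload shape are hypothetical, not the existing logging module):

```python
import json

import redis.asyncio as aredis


class AsyncPubSubLogger:
    """Publishes log lines to a Redis channel without blocking the event loop."""

    def __init__(self, channel: str, url: str = "redis://localhost:6379") -> None:
        self._channel = channel
        self._client = aredis.Redis.from_url(url)

    async def info(self, message: str, **extra: str) -> None:
        payload = json.dumps({"level": "INFO", "message": message, **extra})
        # Awaiting the publish yields the event loop while the call is in
        # flight, instead of holding it the way the sync client does.
        await self._client.publish(self._channel, payload)


# Usage inside an async handler would look like:
#   log = AsyncPubSubLogger("logging")          # hypothetical channel name
#   await log.info("Reloading plugins...")
```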

@jamagalhaes (Collaborator)

Would the problem manifest if Redis pubsub became overburdened and was slow to receive and acknowledge requests, thereby blocking other tasks?

When you introduce blocking calls in an asyncio project, it can be a chokepoint for everything. Even if Redis isn't overburdened, the Plugin Runner main event loop can't do anything else for the few milliseconds that it is communicating with the sync Redis client.

@csande While this scenario is theoretically possible, I believe it would be one of the lesser concerns in practice. If our Redis instance were overburdened, many other components in home-app that rely on it would likely be impacted as well, not just the Plugin Runner.

That said, you’re absolutely right in pointing out that redis.publish is a blocking call, which could momentarily hold up the main event loop. However, this shouldn't significantly degrade plugin performance. To mitigate this, I implemented a hybrid approach: if an event loop is already running, we use the asynchronous publish; otherwise, we fall back to the synchronous version. Take a look at the implementation and let me know your thoughts. #381

@csande (Contributor) commented Jan 31, 2025

@csande While this scenario is theoretically possible, I believe it would be one of the lesser concerns in practice. If our Redis instance were overburdened, many other components in home-app that rely on it would likely be impacted as well, not just the Plugin Runner.

That said, you’re absolutely right in pointing out that redis.publish is a blocking call, which could momentarily hold up the main event loop. However, this shouldn't significantly degrade plugin performance. To mitigate this, I implemented a hybrid approach: if an event loop is already running, we use the asynchronous publish; otherwise, we fall back to the synchronous version. Take a look at the implementation and let me know your thoughts. #381

Thank you for taking a look at this @jamagalhaes. I'm in agreement that this is theoretical and not a huge concern right now, but it's certainly a core tenet (i.e. this isn't premature optimization) to not make blocking calls in an asyncio framework. I don't know about our plans for scaling in the longer-term, but if we want the plugin runner to handle hundreds or thousands of events at once, those blocking calls to even a non-overburdened Redis will eventually add up, and the performance impact may be noticeable at some point.

@jamagalhaes mentioned this pull request on Jan 31, 2025
@djantzen merged commit ab576f2 into main on Feb 2, 2025
5 checks passed
@djantzen deleted the fix/embed-synchronizer branch on February 2, 2025 18:21