Semaphore uses worker comm pool #4195
Conversation
```python
else:
    return sync(
        self.io_loop, func, *args, callback_timeout=callback_timeout, **kwargs
    )
```
If we might want to replicate this for other objects, then maybe we should add this method to the worker?
It can probably also be much simpler on the worker because there are only two cases:
- On the event loop (easy to check)
- Inside the ThreadPoolExecutor

Although of course the current implementation also probably works OK.
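A minimal, self-contained sketch of what such a worker-side sync helper could cover, distinguishing the two cases above. This is hypothetical and not distributed's actual `Worker.sync` or `distributed.utils.sync`; `simple_sync`, `add`, and the demo loop are illustrative names:

```python
import asyncio
import threading

# Hypothetical sketch: run a coroutine to completion from either of the
# two call sites the reviewer mentions -- already on the event loop, or
# inside a worker thread (e.g. the ThreadPoolExecutor).
def simple_sync(loop, func, *args, **kwargs):
    try:
        running = asyncio.get_running_loop()
    except RuntimeError:
        running = None
    if running is loop:
        # Already on the event loop: return a task for the caller to await.
        return loop.create_task(func(*args, **kwargs))
    # Called from another thread: submit the coroutine to the loop
    # and block until it finishes.
    return asyncio.run_coroutine_threadsafe(func(*args, **kwargs), loop).result()

# Demo: drive the loop from a background thread and call from the
# "executor thread" side (here, the main thread).
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def add(a, b):
    return a + b

result = simple_sync(loop, add, 1, 2)
loop.call_soon_threadsafe(loop.stop)
```

The real implementation also needs to handle timeouts and exception propagation, which are elided here.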
The API of this class still feels a bit awkward, especially in the __init__ with the Client/Worker duality. I needed to introduce this to allow for the construction of this object on the client side. In that case I would not have access to an actual worker, so I could not use this method if it were part of the Worker.
If I was certain to be on a worker, I might even go for the Worker.batched_stream instead of using dedicated connections, removing the need to deal with async comms on this layer.
However, unless we can find a nice way to construct this object safely on the client side, I think it is easier to keep it as is. While this would not need to be an actual functioning instance on the client side, implementing it might just introduce more complexity.
> The API of this class still feels a bit awkward, especially in the __init__ with the Client/Worker duality. I needed to introduce this to allow for the construction of this object on client side. In these cases I would not have access to an actual worker and I could not really use this method then if it was part of the Worker.
I wonder if we can make the client and worker look similar enough that we can use them interchangeably here. Adding a sync method to the worker is a good step in that direction.
Then we might be able to do things like:

```python
class Semaphore:
    def __init__(self, ...):
        self.server = get_server()  # client or worker, whichever is around
```
> If I was certain to be on a worker, I might even go for the Worker.batched_stream instead of using dedicated connections removing the need to deal with async comm on this layer.
The scheduler also has a BatchedSend object, but it's called self.scheduler_comm. Maybe we should have an alias so that these are the same across the Client and Worker:
```python
class Worker:
    @property
    def scheduler_comm(self):
        return self.batched_stream
```

(or we could rename batched_stream to scheduler_comm directly).
In principle this seems fine to me. Thank you for continuing to own this code @fjetter
(force-pushed from d71f505 to b2f3667)
Sorry for letting this sit for so long. I had another look at the implementation and chose not to implement the suggestions.
If there are no new comments by tomorrow, I will go ahead and merge this, especially since there is a flaky test which would be resolved by this change (xref #4256).
Thanks for merging
Thanks for fixing
This decouples the lifetime of a semaphore object completely from the Clients and reuses the worker connection pools, if available.
For largish clusters (>>100 workers), I've observed that the many worker clients we spawn here put a lot of stress on the scheduler. I haven't debugged this further, but generally speaking the system is not well tuned for having many clients (no adaptive heartbeats, and a lot of administrative traffic via direct comms instead of streams).
Before this change my scheduler was effectively at twice the CPU utilisation when using semaphores as opposed to not using any, for the same amount of work being done. With this change, the scheduler is hardly impacted at all by using semaphores.
The coupling to the client instances was initially introduced because I wanted to align the implementation with the existing synchronization objects (e.g. Lock), but I guess they will share the same problem. On top of this, the initial implementation used the client heartbeat itself for lease lifecycle management, but we no longer need this coupling.
The downside of this approach is that we require a bit of code duplication w.r.t. the Client.sync method, since we can no longer reuse it. I am not certain whether this poses any issues (e.g. because it is a frequent source of bugs, etc.).