
Semaphore uses worker comm pool #4195

Merged · 3 commits merged into dask:master on Jan 20, 2021

Conversation

@fjetter (Member) commented Oct 28, 2020

This decouples the lifetime of a semaphore object completely from the Clients and reuses the worker connection pools, if available.

For largish clusters (>>100 workers), I've observed that the many worker clients we spawn here put a lot of stress on the scheduler. I haven't debugged this further, but generally speaking the system is not well tuned for having many clients (no adaptive heartbeats, and a lot of administrative traffic goes over direct comms instead of streams).

Before this change, my scheduler ran at effectively twice the CPU utilisation when using semaphores compared to not using any for the same amount of work. With this change, the scheduler is hardly impacted at all by the use of semaphores.

The coupling to the client instances was initially introduced because I wanted to align the implementation with the existing synchronization objects (e.g. Lock), but I expect those share the same problem. On top of this, the initial implementation used the client heartbeat itself for lease lifecycle management, but we no longer need this coupling.

The downside of this approach is that it requires a bit of code duplication w.r.t. the Client.sync method, since we can no longer reuse it. I am not certain whether this poses any issues (e.g. whether it is a frequent source of bugs).
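
For illustration only (this is not the exact code in the PR): the core idea is that the semaphore talks to the scheduler through whichever comm machinery is already around, preferring the worker's connection pool over spawning a dedicated worker client. The attribute layout below is an assumption:

from distributed import get_client
from distributed.worker import get_worker

def pick_scheduler_comm():
    """Rough sketch: prefer the worker's pooled comm to the scheduler."""
    try:
        # Inside a task: reuse the worker's connection pool; no extra Client is spawned.
        return get_worker().scheduler
    except ValueError:
        # Not running on a worker: fall back to the client's comm to the scheduler.
        return get_client().scheduler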

The review comments below refer to this excerpt from the diff:

else:
    return sync(
        self.io_loop, func, *args, callback_timeout=callback_timeout, **kwargs
    )
Member commented:

If we want to replicate this for other objects, then maybe we should add this method to the worker?

It can probably also be much simpler on the worker because there are only two cases:

  1. On the event loop (easy to check)
  2. Inside of the ThreadPoolExecutor

Although of course the current implementation also probably works OK.
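
A minimal sketch of what such a worker-side sync could look like, assuming distributed.utils.sync handles the executor case (hypothetical, not part of this PR):

import asyncio

from distributed.utils import sync

def worker_sync(worker, func, *args, **kwargs):
    """Hypothetical worker-side sync covering the two cases above."""
    try:
        # Case 1: already on the event loop -- hand back the coroutine for the caller to await.
        asyncio.get_running_loop()
        return func(*args, **kwargs)
    except RuntimeError:
        # Case 2: inside the ThreadPoolExecutor -- block on the result via the worker's loop.
        return sync(worker.loop, func, *args, **kwargs)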

fjetter (Member Author) commented:

The API of this class still feels a bit awkward, especially in the __init__ with the Client/Worker duality. I needed to introduce this to allow for the construction of this object on the client side. In that case I would not have access to an actual worker and could not really use this method if it were part of the Worker.

If I were certain to be on a worker, I might even go for Worker.batched_stream instead of using dedicated connections, removing the need to deal with async comms at this layer.

However, unless we can find a nice way to construct this object safely on the client side, I think it is easier to keep it as is. While I don't think this would need to be a fully functioning instance on the client side, implementing it might just introduce more complexity.
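
For reference, the batched-stream alternative mentioned above could look roughly like the following; the message op is made up for illustration and is not an existing handler:

from distributed.worker import get_worker

worker = get_worker()
# Fire-and-forget over the existing worker<->scheduler stream instead of opening
# a dedicated request/response comm; "semaphore_refresh_leases" is hypothetical.
worker.batched_stream.send({"op": "semaphore_refresh_leases", "name": "my-semaphore"})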

Member commented:

> The API of this class still feels a bit awkward, especially in the __init__ with the Client/Worker duality. I needed to introduce this to allow for the construction of this object on the client side. In that case I would not have access to an actual worker and could not really use this method if it were part of the Worker.

I wonder if we can make the client and worker look similar enough that we can use them interchangeably here. Adding a sync method to the worker is a good step in that direction.

Then we might be able to do things like

class Semaphore:
    def __init__(self, ...):
        self.server = get_server()  # client or worker, whichever is around

> If I were certain to be on a worker, I might even go for Worker.batched_stream instead of using dedicated connections, removing the need to deal with async comms at this layer.

The Client also has a BatchedSend object, but there it is called self.scheduler_comm. Maybe we should have an alias so that these are the same across the Client and Worker:

class Worker:
    @property
    def scheduler_comm(self):
        return self.batched_stream

(Or we could rename batched_stream to scheduler_comm directly.)

@mrocklin (Member) commented:

In principle this seems fine to me. Thank you for continuing to own this code, @fjetter.

@fjetter (Member Author) commented Jan 20, 2021

Sorry for letting this sit for so long. I had another look at the implementation and chose not to implement the suggestions:

  1. I did not want to add the sync method to Worker as long as it is not used anywhere else; it feels like unnecessary complexity. In particular, the client doesn't feel like a "server", and making them look alike is good for this application but not necessarily for others. If it turns out to be helpful somewhere else, we can still refactor.
  2. The scheduler_comm alias would be a nice change, since we could move all lease refreshes to the stream, but I feel it would increase complexity that would only be justified if the refreshes posed a performance problem, and that does not appear to be the case as long as we're using pooled comms. Also, I like that we can currently pass the pool directly to the instance, since this allows for easier resilience testing (broken/flaky comms, etc.); see the sketch below.
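
A hypothetical illustration of that injection point for testing; the scheduler_rpc parameter name is an assumption, not taken from the diff:

from distributed import Semaphore

class FlakyRPC:
    """Wrap an rpc-like object and fail every n-th call to simulate broken comms."""

    def __init__(self, inner, fail_every=5):
        self.inner = inner
        self.fail_every = fail_every
        self.calls = 0

    def __getattr__(self, op):
        async def call(**kwargs):
            self.calls += 1
            if self.calls % self.fail_every == 0:
                raise OSError("simulated comm failure")
            return await getattr(self.inner, op)(**kwargs)

        return call

# Assumed usage: hand a wrapped comm to the semaphore instead of the real pool.
# sem = Semaphore(name="resource", scheduler_rpc=FlakyRPC(client.scheduler))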

If there are no new comments by tomorrow, I will go ahead and merge this, especially since there is a flaky test that would be resolved by this change (xref #4256).

@mrocklin merged commit e736c0b into dask:master on Jan 20, 2021
@fjetter (Member Author) commented Jan 20, 2021

Thanks for merging

@fjetter deleted the semaphore_worker_pool branch on January 20, 2021 at 15:51
@mrocklin (Member) commented:

Thanks for fixing
