
FixedBufPool::next #199

Merged (13 commits) on Feb 16, 2023

Conversation

mzabaluev (Contributor):

Add an async API to get buffers out of FixedBufPool. This is another way in which FixedBufPool will be more useful than FixedBufRegistry.

Also reorganize the shared data structures of FixedBufPool and FixedBufRegistry into separate modules for better readability and future code reuse.

Commit messages in this PR include:

- Move the Inner types of FixedBufRegistry and FixedBufPool into a plumbing module and expose them within buf::fixed, to allow code reuse with the upcoming thread-safe collections.
- Enable waiting for fixed buffers to become available asynchronously: an async-friendly wrapper over FixedBufPool::poll_next. Also document the method and the functionality it enables for FixedBufPool.
- buf::fixed::pool is now a public module and hosts the Next future.
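
A rough usage sketch of the async API this PR adds, assuming the general FixedBufPool shape (new, register, try_next) plus the new async next described above; the file path and buffer sizes are placeholders, and details may differ from the merged code:

```rust
use std::iter;
use tokio_uring::buf::fixed::FixedBufPool;
use tokio_uring::fs::File;

fn main() {
    tokio_uring::start(async {
        // Two 4 KiB buffers registered with the kernel as fixed buffers.
        let pool = FixedBufPool::new(iter::repeat_with(|| vec![0u8; 4096]).take(2));
        pool.register().unwrap();

        let file = File::open("data.bin").await.unwrap();

        // Unlike try_next, which returns None when the pool is depleted,
        // next(cap) awaits until a buffer of the requested capacity is
        // checked back into the pool.
        let buf = pool.next(4096).await;
        let (res, buf) = file.read_fixed_at(buf, 0).await;
        println!("read {} bytes", res.unwrap());

        // Dropping the FixedBuf checks it back into the pool and wakes one
        // task waiting in next() for this capacity.
        drop(buf);
    });
}
```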
Noah-Kennedy marked this pull request as ready for review (December 6, 2022 01:35).
Noah-Kennedy (Contributor):

ugh, I did not mean to remove the draft status

Noah-Kennedy marked this pull request as draft (December 6, 2022 01:36).
FrankReh (Collaborator) left a comment:

Nice.

mzabaluev marked this pull request as ready for review (December 6, 2022 08:41).
FrankReh (Collaborator) left a comment:

The direction is great. Please consider my two outstanding questions. Thank you.

Further commits:

- The wakeup mechanism is now based on tokio::sync::Notify and wakes up a single task waiting for the checked-in buffer capacity each time a buffer is checked in (sketched below).
- Remove FixedBufPool::poll_next and the named Next future. These could not be rewritten with the Notified future without replicating some of the unsafe magic. In the future, type alias impl trait (TAIT) can resurrect the named Next type.
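
A simplified sketch of the Notify-based scheme just described; notify_on_next and try_next mirror the fragments quoted in the review thread below, while the surrounding types are hypothetical stand-ins for the pool's internals rather than the PR's actual code:

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Notify;

// Hypothetical stand-in for the pool's shared state.
struct PoolInner {
    free: HashMap<usize, Vec<Vec<u8>>>, // free buffers keyed by capacity
    notify_next_by_cap: HashMap<usize, Arc<Notify>>,
}

impl PoolInner {
    fn try_next(&mut self, cap: usize) -> Option<Vec<u8>> {
        self.free.get_mut(&cap)?.pop()
    }

    // Hand out the Notify that the async next waits on when the pool is depleted.
    fn notify_on_next(&mut self, cap: usize) -> Arc<Notify> {
        self.notify_next_by_cap.entry(cap).or_default().clone()
    }

    // Called when a buffer is checked back in: wake one task pending on this capacity.
    fn check_in(&mut self, cap: usize, buf: Vec<u8>) {
        self.free.entry(cap).or_default().push(buf);
        if let Some(notify) = self.notify_next_by_cap.get(&cap) {
            notify.notify_one();
        }
    }
}

// The async next: retry try_next, parking on the Notify between attempts.
async fn next(inner: &RefCell<PoolInner>, cap: usize) -> Vec<u8> {
    loop {
        let notify = {
            let mut inner = inner.borrow_mut();
            if let Some(buf) = inner.try_next(cap) {
                return buf;
            }
            inner.notify_on_next(cap)
        };
        // notify_one stores a permit when no task is waiting yet, so a check-in
        // happening between the failed try_next and this await is not lost.
        notify.notified().await;
    }
}
```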
})
}

pub(crate) fn notify_on_next(&mut self, cap: usize) -> Arc<Notify> {
mzabaluev (Contributor Author), Dec 15, 2022:

The Arc is not necessary for the single-threaded pool, but I'm going to reuse this for the thread-safe implementation. Notify contains a mutex and uses atomic pointers for tracking waiters, so it's going to be slow regardless when we hit the waiting branch in next. Performance of next in the depleted pool case is not high on my current priority list for micro-optimizations.

Collaborator:

But we are left without a simple/fast pool that doesn't allocate for the current-thread case. That is the case I care about. But all is not lost once the API allows us to provide our own pool.

Contributor:

I'd just like to clarify my understanding here: once a notify has been registered for a capacity, then every time a buffer of that capacity is committed, notify_one() is called (because the notify_next_by_cap entry is never removed)?

Contributor:

I think this could be improved by keeping a (usize, Notify) instead, to explicitly keep a count of the number of waiting tasks, and removing the entry from the map once the count reaches zero. That way, you only pay the SeqCst cost of calling notify() when you need to.

Collaborator:

Not saying this is needed; I will let @mzabaluev set the path on this one. But I would prefer something that had a little hysteresis to it, like removing the Notify if the pool size reached something like 10 buffers again. (I have no idea why I would pick 10; just something larger than 0.) But I haven't tried to design anything around this idea, so feel free to disregard it entirely.

Contributor:

My concern is that with the current design, once try_next() has failed once in the next() method, every check-in from then on for that buffer size is going to issue a SeqCst memory barrier with the call to notify_one(). That's quite a penalty. It's unavoidable with the current Notify, which is designed for Send operations, but we could at least limit it to when there is actually a consumer waiting.

FrankReh (Collaborator), Jan 27, 2023:

@ollie-etl With your idea, I don't see how the usize field gets decremented properly. In the case of a future being cancelled, there is no future left to do the work of updating the count (from my understanding).

And along the lines of my idea of not keeping a count but removing the entry once some number of buffers are back in the pool, I don't see an elegant way of knowing whether the waiters linked list is empty or not. There is no public API.

ollie-etl (Contributor), Jan 30, 2023:

You'd need to return a struct Next(usize, Notify) wrapper, implement Future, and decrement on drop.
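
A hedged sketch of that suggestion combined with the counting idea above (hypothetical names, not code from this PR): the map keeps a waiter count next to each Notify, a guard held across the await bumps the count, and its Drop impl decrements it even when the waiting task is cancelled, so notify_one is only paid when a consumer is actually waiting:

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use tokio::sync::Notify;

// Hypothetical per-capacity entry: waiter count plus the Notify.
#[derive(Default)]
struct Waiters {
    count: usize,
    notify: Rc<Notify>, // Rc suffices for the current-thread pool
}

#[derive(Default)]
struct WaitMap {
    by_cap: RefCell<HashMap<usize, Waiters>>,
}

// Guard held across the await in next; Drop runs even if the future is cancelled.
struct WaitGuard<'a> {
    map: &'a WaitMap,
    cap: usize,
    notify: Rc<Notify>,
}

impl WaitMap {
    fn start_waiting(&self, cap: usize) -> WaitGuard<'_> {
        let mut by_cap = self.by_cap.borrow_mut();
        let entry = by_cap.entry(cap).or_default();
        entry.count += 1;
        WaitGuard { map: self, cap, notify: entry.notify.clone() }
    }

    // On buffer check-in: only pay for notify_one if someone is actually waiting.
    fn wake_one(&self, cap: usize) {
        if let Some(w) = self.by_cap.borrow().get(&cap) {
            if w.count > 0 {
                w.notify.notify_one();
            }
        }
    }
}

impl Drop for WaitGuard<'_> {
    fn drop(&mut self) {
        let mut by_cap = self.map.by_cap.borrow_mut();
        if let Some(w) = by_cap.get_mut(&self.cap) {
            w.count -= 1;
            if w.count == 0 {
                // No waiters left: remove the entry so future check-ins skip notify_one.
                by_cap.remove(&self.cap);
            }
        }
    }
}

// Inside a hypothetical next(cap):
//     let guard = wait_map.start_waiting(cap);
//     guard.notify.notified().await; // guard's Drop decrements the count even on cancellation
```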

Collaborator:

Oh, that's much more like @mzabaluev's original idea, I think: returning a future that does work when it is polled and does work when it is dropped, but using Notify.

let cloned_file = file.try_clone().unwrap();

let handle = tokio_uring::spawn(async move {
let file = File::from_std(cloned_file);
mzabaluev (Contributor Author):

There should be a way to share File and socket objects between tasks, since the descriptor is already in a SharedFd internally.
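
A self-contained version of the pattern in the quoted test, for context: the std descriptor is duplicated with try_clone and re-wrapped with File::from_std so the spawned task gets its own handle. The path and read size are placeholders:

```rust
use std::fs::File as StdFile;
use tokio_uring::fs::File;

fn main() {
    tokio_uring::start(async {
        let std_file = StdFile::open("/etc/hostname").unwrap();
        // Duplicate the underlying descriptor so another task can own a File over it.
        let cloned_file = std_file.try_clone().unwrap();

        let handle = tokio_uring::spawn(async move {
            let file = File::from_std(cloned_file);
            let buf = vec![0u8; 64];
            let (res, _buf) = file.read_at(buf, 0).await;
            res.unwrap()
        });

        let n = handle.await.unwrap();
        println!("spawned task read {n} bytes");
    });
}
```

If File and socket objects could be shared between tasks directly, as the comment above suggests, the try_clone/from_std step would become unnecessary.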

FrankReh (Collaborator) left a comment:

I'm not going to check the unit tests tonight. But the use of Notify looks very nice. That is one powerful waiting mechanism Tokio has to offer. I ended up liking this implementation more than I thought I would, because Notify does all the heavy lifting of avoiding the pitfall of dropping a wakeup. As a result, all the waiters don't have to be woken just to be on the safe side; the Notify implementation makes it safe to begin with, hence notify_one can be used very effectively.
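
The property being praised is that tokio::sync::Notify stores a permit when notify_one() arrives before anyone is awaiting, so a wakeup issued between a failed try_next and the subsequent await is not dropped. A minimal illustration (plain tokio, independent of this PR):

```rust
use tokio::sync::Notify;

fn main() {
    // A current-thread runtime is enough for this illustration.
    let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
    rt.block_on(async {
        let notify = Notify::new();

        // The "check-in" happens before any task is awaiting...
        notify.notify_one();

        // ...yet this await completes immediately, because notify_one stored
        // a permit instead of the wakeup being lost.
        notify.notified().await;
        println!("wakeup was not lost");
    });
}
```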

mzabaluev (Contributor Author):

> @FrankReh: But we are left without a simple/fast pool that doesn't allocate for the current-thread case.

While the pool is not exhausted, tasks call next at near zero cost with no allocations.

FrankReh (Collaborator) commented Dec 23, 2022:

> @mzabaluev: While the pool is not exhausted, tasks call next at near zero cost with no allocations.

That's a very good point. I think I overlooked that while digging into the weeds of the implementation.

ollie-etl (Contributor) commented Jan 19, 2023:

@Noah-Kennedy @mzabaluev @FrankReh Is something blocking this? It looks like all points have been resolved.

Noah-Kennedy (Contributor):

You don't need my approval here. I'm alright with @FrankReh signing off on this and merging it whenever he feels it is ready.

oliverbunting:

> @mzabaluev: While the pool is not exhausted, tasks call next at near zero cost with no allocations.
>
> That's a very good point. I think I overlooked that while digging into the weeds of the implementation.

Is this not a common case, though? I.e., to keep some pipeline of tasks fully utilized, do work whenever a buffer is available?

mzabaluev (Contributor Author):

> Is this not a common case, though? I.e., to keep some pipeline of tasks fully utilized, do work whenever a buffer is available?

That depends on whether task creation is throttled only by the availability of the buffers.

FrankReh (Collaborator):

> Is this not a common case, though? I.e., to keep some pipeline of tasks fully utilized, do work whenever a buffer is available?
>
> That depends on whether task creation is throttled only by the availability of the buffers.

Having a limit on the number of buffers that the app would use seems like a good feature, and I like the ability to await the next one, but I hadn't thought it would be the primary throttler. I also like that a tokio scheduling feature was found to solve this problem, because it's more likely to find acceptance by the tokio group.

I had thought I would try my hackier approach of using the copy of the tokio linked list once we have an API for substituting another pool, but that was so long ago that I'm not sure it covered an important corner case I appreciated later. The current design even covers the case of a task being made ready to run but then being cancelled before the resource is used, and I think when select! blocks with timers are used, there will be many cases where a task is cancelled in its tracks.

FrankReh (Collaborator) commented Feb 15, 2023:

I'm going to merge later tonight so it makes it into the next release. This is some really great work that we want in the repo, but I had to do a manual merge on four files because code moved and code changed at the same time.

Actually, the condensed diff for the last commit that this page makes available isn't too bad to read. Most of the changes are straightforward.

I guess the drop code and the intra-doc links are where there could be problems, so another pair of eyes or two or three wouldn't hurt. It compiles cleanly, and there is a nice unit test that passes. But rarely do our unit tests check that things were dropped properly, so that's why looking at the drop code again would make sense.

But also, nothing we can't change yet again with more iterations.

FrankReh merged commit 1205af9 into tokio-rs:master on Feb 16, 2023.
FrankReh (Collaborator):

Thank you, @mzabaluev.
