-
Notifications
You must be signed in to change notification settings - Fork 127
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FixedBufPool::next
#199
FixedBufPool::next
#199
Conversation
Move the Inner types of FixedBufRegistry and FixedBufPool into a plumbing module and expose them within buf::fixed, to allow code reuse with the upcoming thread-safe collections.
Enable waiting for fixed buffers to become available asynchronously.
The async-friendly wrapper over FixedBufPool::poll_next. Also document the method and the functionality it enables for FixedBufPool. buf::fixed::pool is now a public module and hosts the Next future.
ugh, I did not mean to remove the draft status |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The direction is great. Please consider my two outstanding questions. Thank you.
The wakeup mechanism is now based on tokio::sync::Notify and wakes up a single task pending for the checked-in capacity buffer every time. Remove FixedBufPool::poll_next and the named Next future. These could not be rewritten with the Notified future without replicating some of the unsafe magic. In the future, TAIT can resurrect the Next named type.
Thanks clippy.
}) | ||
} | ||
|
||
pub(crate) fn notify_on_next(&mut self, cap: usize) -> Arc<Notify> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Arc
is not necessary for the single-threaded pool, but I'm going to reuse this for the thread-safe implementation. Notify
contains a mutex and uses atomic pointers for tracking waiters, so it's going to be slow regardless when we hit the waiting branch in next
. Performance of next
in the depleted pool case is not high on my current priority list for micro-optimizations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But we are left without a simple/fast pool, that doesn't allocate, for the current-thread case. The case I care about. But all is not lost, once the API allows us to provide our own pool.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd just like to clarify my understanding here: once a notify has been registered for a capacity, then every time a buffer of that capacity is commited, notify_one() is called (because the notify_next_by_cap entry is never removed)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this could be improved by keeping a (usize, Notify)
instead, to explicitly keep a count for the number of waiting tasks, and removing the entry from the hashset once count == zero. That way, you only pay the SeqCst
cost of calling notify()
when you need to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not saying this is needed, I will let @mzabaluev set the path on this one. But I would prefer something that had a little hysteresis to it. Like removing the Notify if the pool size reached something like 10 buffers again. (But I have no idea why I would pick 10. Just something larger than 0.) But I haven't tried to design anything around this idea so feel free to disregard entirely.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My concern is that with the current design, once try_next()
has failed once in the next()
method. every checking from then on for that buffer size is going to issue a SeqCst
memory barrier with the call to notify_one()
. Thats quite a penalty. Its unavoidable with the surrent Notify, which is designed for Send
operations, but we could at least limit it to when there is actually a consumer waiting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ollie-etl With your idea, I don't see how the usize field is decremented properly? In the case of a future being cancelled, there is no future to do the work of updating the count (from my understanding).
And along my idea of not keeping a count but removing once there are some number of buffers back in the pool, I don't see an elegant way of knowing whether the waiters linked list is empty or not. There is no public API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'd need to return a struct Next(usize, Notify)
wrapper, implment Future,and decrement on drop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, that's much more like @mzabaluev 's original idea I think, of returning a future that does work when it is polled, and does work when it is dropped. But using Notify.
let cloned_file = file.try_clone().unwrap(); | ||
|
||
let handle = tokio_uring::spawn(async move { | ||
let file = File::from_std(cloned_file); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There should be a way to share File
and socket objects between tasks, since the descriptor is already in a SharedFd
internally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not going to check the unit tests tonight. But the use of Notify looks very nice. That is one powerful waiting
mechanism Tokio
has to offer. I ended up liking this implementation more than I thought I would because the Notify does all the heavy lifting of avoiding the pitfall of dropping an awake call. And as result, all the waiters don't have to be awoken to be on the safe side. The Notify
implementation makes it safe to begin with, hence notify_one
can be used very effectively.
}) | ||
} | ||
|
||
pub(crate) fn notify_on_next(&mut self, cap: usize) -> Arc<Notify> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But we are left without a simple/fast pool, that doesn't allocate, for the current-thread case. The case I care about. But all is not lost, once the API allows us to provide our own pool.
While the pool is not exhausted, tasks call |
That's a very good point. I think I overlooked that while digging into the weeds of the implementation. |
@Noah-Kennedy @mzabaluev @FrankReh Is something blocking this? It looks like all points have been resolved |
You don't need my approval here. I'm alright with @FrankReh signing off on this and merging it whenever he feels it is ready. |
Is this not a common case though. I.e, to keep some pipeline of tasks fully utilized, do work whenever there is a buffer available? |
Depends if the task creation is only throttled by availability of the buffers. |
Having a limit to the number of buffers that the app would use seems like a good feature and I like the ability to await a I had thought I would try my hackier approach of using the copy of the tokio linked list once we have an API for substituting another pool but that was so long ago, I'm not sure it covered an important corner case I appreciated later. The current design even covers the case of a task being made ready to run but then being canceled before the resource is used and I think when select! blocks with timers are used, there will be many cases where a task is cancelled in its tracks. |
I'm going to merge later tonight so it makes it into the next release. This is some really great work that we want in the repo but I had to do a manual merge on four files because code moved and code changed at the same time. Actually the condensed diff, for the last commit, that this page makes available isn't too bad to read. Most of the changes are straight forward. I guess the drop code, and the intra-doc links are where there could be problems so another pair of eyes or two or three wouldn't hurt. It compiles cleanly, and there is a nice unit test that passes. But rarely do our unit tests check that things were dropped properly so that's why looking at the drop again would make sense. But also, nothing we can't change yet again with more iterations. |
Thank you @mzabaluev . |
Add an async API to get buffers out of
FixedBufPool
. This is another way in whichFixedBufPool
will be more useful thanFixedBufRegistry
.Also reorganize the shared data structures of
FixedBufPool
andFixedBufRegistry
into separate modules for better readability and future code reuse.