driver: improve cqueue dispatch #152
base: master
Conversation
Fixes #147.

This PR makes a few changes:
- It changes our ticking behavior to loop on a full cqueue, to ensure that we actually fully clear out the queue in the event of an overflow.
- It adds an optimization to our parking routine which allows us to potentially wake tasks instead of parking on epoll.
- It makes our parking routines attempt to tick futures if we hit EBUSY.
We should not merge this until we have a repro of #147.
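For readers following the thread, here is a minimal sketch of the first change's idea: loop the completion-queue drain while the queue keeps coming back full, so entries the kernel flushes out of its overflow list are not stranded. It is written against the public io-uring crate API rather than this PR's actual driver code; the drain_completions helper and the dispatch callback are illustrative names, not the real internals.

use io_uring::IoUring;

// Sketch: drain the cqueue, looping while our snapshot of it was full,
// since a full ring means the kernel may still hold overflowed entries.
fn drain_completions(ring: &mut IoUring, mut dispatch: impl FnMut(io_uring::cqueue::Entry)) {
    loop {
        let mut cq = ring.completion();
        cq.sync(); // refresh our view of the kernel's tail
        let was_full = cq.is_full();
        for cqe in &mut cq {
            dispatch(cqe);
        }
        if !was_full {
            // The ring had spare room, so nothing more can be pending
            // behind a full cqueue; stop looping.
            break;
        }
    }
}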
Okay, you have my attention. I have a reproducer. I'll try it out. I can also share the reproducer with you if you want to try it yourself, along with all those new flame graphs. My reproducer only worked a little better: it still hangs. I just thought of trying my repro in my fork where I had worked on overflow ... and that fork doesn't have the no_op operation in it. So I think I want to wait for tomorrow after all.
Here's my reproducer if you feel like trying it yourself.

examples $ cat test_hang_on_small_completion_queue.rs
use std::io;

use clap::Parser;
use tokio::sync::mpsc;

#[derive(Debug, Parser)]
#[command(about = "Serve TCP connections, testing the buf_ring feature. Client can be 'nc <ip-addr> <port>'.", long_about = None)]
struct Args {
    /// Spawn count, how many no_op tasks to spawn consecutively
    #[arg(long, default_value_t = 16)]
    spawn_cnt: usize,

    /// Squeue entries
    #[arg(long, default_value_t = 4)]
    squeue_entries: u32,

    /// Cqueue entries
    #[arg(long, default_value_t = 0)]
    cqueue_entries: u32,
}

fn main() -> io::Result<()> {
    let args = Args::parse();
    let spawn_cnt = args.spawn_cnt;
    let squeue_entries = args.squeue_entries;
    let cqueue_entries = std::cmp::max(args.cqueue_entries, 2 * squeue_entries);

    tokio_uring::builder()
        .entries(squeue_entries)
        .uring_builder(tokio_uring::uring_builder().setup_cqsize(cqueue_entries))
        .start(async move {
            let (tx, mut rx) = mpsc::channel(2 * spawn_cnt);
            for i in 0..spawn_cnt {
                let tx = tx.clone();
                tokio_uring::spawn(async move {
                    tokio_uring::no_op().await.unwrap();
                    tx.send(i).await.unwrap();
                    Ok::<(), io::Error>(())
                });
            }
            // Drop the original sender so rx.recv() yields None once every
            // spawned task has sent its index, letting the loop exit cleanly.
            drop(tx);
            while let Some(i) = rx.recv().await {
                println!("got = {}", i);
            }
        });
    Ok(())
}
And I ran it with '--spawn-cnt 100 --squeue-entries 2'; that last run was the one that failed: it didn't print out all 100 lines. (With those values the completion queue is sized to max(0, 2 * 2) = 4 entries, far fewer than the 100 spawned no_op tasks.)
It could be that when I was working on this, last month I think, I had to change my version of io-uring too; without proof of whether it was needed or not, I didn't push to get it upstreamed yet. The crux is that the uring interface won't flush its overflow to the cqueue unless IORING_ENTER_GETEVENTS is set in the call to enter, and I don't think the io-uring API lets you set that manually from tokio-uring. Here's the basic diff. By having io-uring also check for the overflow bit, it knows when it should set IORING_ENTER_GETEVENTS; otherwise, since we don't want more than 0 completions, that flag doesn't get set.

Before
pub fn submit_and_wait(&self, want: usize) -> io::Result<usize> {
    let len = self.sq_len();
    let mut flags = 0;

    if want > 0 || self.params.is_setup_iopoll() {
        flags |= sys::IORING_ENTER_GETEVENTS;
    }
After
pub fn submit_and_wait(&self, want: usize) -> io::Result<usize> {
    let len = self.sq_len();
    let mut flags = 0;

    // TODO is this the right way to handle the overflow, from this crate's perspective?
    // Will need to ask the maintainer.
    // Was:
    // if want > 0 || self.params.is_setup_iopoll() { .. }
    if want > 0 || self.params.is_setup_iopoll() || self.sq_cq_overflow() {
        flags |= sys::IORING_ENTER_GETEVENTS;
    }
And sq_cq_overflow is defined like this in the same impl that has sq_need_wakeup:
fn sq_cq_overflow(&self) -> bool {
    unsafe {
        atomic::fence(atomic::Ordering::SeqCst); // Refer to comment in squeue.rs for same code
        (*self.sq_flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_CQ_OVERFLOW != 0
    }
}
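For context on why that flag matters, here is a hedged, standalone sketch of the scenario using only public io-uring crate calls; whether the kernel flushes its overflow list on the final enter is exactly the behavior under discussion, so the comments below describe the expectation, not a guarantee.

use io_uring::{opcode, IoUring};

fn main() -> std::io::Result<()> {
    // A tiny ring: 2 squeue entries, so the kernel defaults the cqueue to 4.
    let mut ring = IoUring::new(2)?;

    // Queue more no-ops than the completion queue can hold.
    for i in 0..16u64 {
        let nop = opcode::Nop::new().build().user_data(i);
        unsafe {
            while ring.submission().push(&nop).is_err() {
                ring.submit()?; // squeue full: flush to the kernel first
            }
        }
    }

    // Wait for at least one completion. Once the 4-entry cqueue fills,
    // the remaining CQEs sit in the kernel's internal overflow list.
    ring.submit_and_wait(1)?;

    // With the stock crate, submit() (want == 0) does not set
    // IORING_ENTER_GETEVENTS, so the kernel is never asked to flush the
    // overflow back into the ring; with the patched check above it would be.
    ring.submit()?;

    let mut cq = ring.completion();
    cq.sync();
    println!("reaped {} completions", (&mut cq).count());
    println!("overflow counter: {}", cq.overflow());
    Ok(())
}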
@FrankReh I've slightly modified it and added it as a test.
@Noah-Kennedy That's nice. We can tweak the numbers: something a little smaller, or perhaps larger if it is truly fast enough once it works. If we can make it time out gracefully and report an error, that would be more user friendly. Maybe the std test timeout is short enough, and it keeps the logic simple. Funny, the CI test is still running. Guess it is hung.
Simply increase the concurrency factor in the benchmark past the size of the cqueue.
@ollie-etl we've got a test case in there now which repros.
Just reviewing the unit test.
I haven't looked at the rest of the changes to tick, as I thought the hang could be fixed without any of those changes first.
Can this wait until the older PRs are in? I don't believe this PR is a blocker for other PRs or issues. I would like to participate in looking at benchmarks once the benchmarking PR is in and the older PRs are handled.
This can absolutely wait |