Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Mio to 0.7 #1767

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
90f8ebc
Need to resolve Poll::poll taking &mut self
kleimkuhler Nov 7, 2019
f4cd344
Tests passing; lots of cleanup
kleimkuhler Nov 7, 2019
4f912f1
Documentation and cleanup needed
kleimkuhler Nov 7, 2019
88dab3e
Naming and docs
kleimkuhler Nov 7, 2019
1a2a5a9
Remove platform specific event handling
kleimkuhler Nov 7, 2019
3c20f33
Remove old code
kleimkuhler Nov 7, 2019
b256539
Add additional comments and cleanup
kleimkuhler Nov 8, 2019
7141696
Remove mio-uds from Cargo.toml
kleimkuhler Nov 10, 2019
381ebbd
Depend on Mio fix
kleimkuhler Nov 12, 2019
1e6caeb
Fix readiness checks for process and handle interruption
kleimkuhler Nov 12, 2019
5052bfa
Add Readiness documentation
kleimkuhler Nov 12, 2019
01841a3
Stop point for draft PR
kleimkuhler Nov 12, 2019
c9177e7
Change windows process to use IoResource
kleimkuhler Nov 12, 2019
d0391e3
Clippy and use mio master
kleimkuhler Nov 12, 2019
e0000e9
send_framed is flakey on master as well
kleimkuhler Nov 12, 2019
43b885e
Remove PollEvented references
kleimkuhler Nov 13, 2019
b0de5ec
Make Readiness functions associated consts
kleimkuhler Nov 13, 2019
79586ec
Fix doc example
kleimkuhler Nov 13, 2019
b3b4b04
Misc edits
kleimkuhler Nov 13, 2019
16ab873
Remove unnecessary readiness
kleimkuhler Nov 13, 2019
78c17df
Remove ready import after rebase
kleimkuhler Nov 18, 2019
bc695c4
Merge remote-tracking branch 'upstream/master' into kleimkuhler/mio-u…
kleimkuhler Nov 27, 2019
ff08bef
Merge remote-tracking branch 'upstream/master' into kleimkuhler/mio-u…
kleimkuhler Dec 2, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ signal = [
"io-driver",
"lazy_static",
"libc",
"mio-uds",
"signal-hook-registry",
"winapi/consoleapi",
"winapi/minwindef",
Expand All @@ -88,8 +87,7 @@ test-util = []
tcp = ["io-driver"]
time = ["slab"]
udp = ["io-driver"]
uds = ["io-driver", "mio-uds", "libc"]

uds = ["io-driver", "libc"]

[dependencies]
tokio-macros = { version = "0.2.0", optional = true }
Expand All @@ -102,13 +100,12 @@ fnv = { version = "1.0.6", optional = true }
futures-core = { version = "0.3.0", optional = true }
lazy_static = { version = "1.0.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.6.20", optional = true }
mio = { git = "https://github.com/tokio-rs/mio", branch = "master", optional = true }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might want to fix to a specific commit to avoid breaking changes.

num_cpus = { version = "1.8.0", optional = true }
# Backs `DelayQueue`
slab = { version = "0.4.1", optional = true }

[target.'cfg(unix)'.dependencies]
mio-uds = { version = "0.6.5", optional = true }
libc = { version = "0.2.42", optional = true }
signal-hook-registry = { version = "1.1.1", optional = true }

Expand Down
148 changes: 72 additions & 76 deletions tokio/src/io/driver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
pub(crate) mod platform;

mod scheduled_io;
pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests

use crate::loom::sync::atomic::AtomicUsize;
use crate::park::{Park, Unpark};
use crate::util::slab::{Address, Slab};

use mio::event::Evented;
use super::Readiness;
use mio;
use std::cell::RefCell;
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::sync::{Arc, Weak};
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Weak};
use std::task::Waker;
use std::time::Duration;

Expand All @@ -22,10 +21,11 @@ pub(crate) struct Driver {
/// Reuse the `mio::Events` value across calls to poll.
events: mio::Events,

/// The underlying system event queue.
poll: mio::Poll,

/// State shared between the reactor and the handles.
inner: Arc<Inner>,

_wakeup_registration: mio::Registration,
}

/// A reference to an I/O driver
Expand All @@ -35,17 +35,19 @@ pub(crate) struct Handle {
}

pub(super) struct Inner {
/// The underlying system event queue.
io: mio::Poll,
/// Registers I/O resources.
registry: mio::Registry,

/// Dispatch slabs for I/O and futures events
pub(super) io_dispatch: Slab<ScheduledIo>,

/// The number of sources in `io_dispatch`.
n_sources: AtomicUsize,

/// Used to wake up the reactor from a call to `turn`
wakeup: mio::SetReadiness,
/// Used to wake up the reactor from a call to [`turn`].
///
/// [`turn`]: #method.turn
waker: mio::Waker,
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
Expand Down Expand Up @@ -107,24 +109,18 @@ impl Driver {
/// Creates a new event loop, returning any error that happened during the
/// creation.
pub(crate) fn new() -> io::Result<Driver> {
let io = mio::Poll::new()?;
let wakeup_pair = mio::Registration::new2();

io.register(
&wakeup_pair.0,
TOKEN_WAKEUP,
mio::Ready::readable(),
mio::PollOpt::level(),
)?;
let poll = mio::Poll::new()?;
let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
let registry = poll.registry().try_clone()?;

Ok(Driver {
events: mio::Events::with_capacity(1024),
_wakeup_registration: wakeup_pair.0,
poll,
inner: Arc::new(Inner {
io,
registry,
io_dispatch: Slab::new(),
n_sources: AtomicUsize::new(0),
wakeup: wakeup_pair.1,
waker,
}),
})
}
Expand All @@ -144,30 +140,30 @@ impl Driver {
fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
// Block waiting for an event to happen, peeling out how many events
// happened.
match self.inner.io.poll(&mut self.events, max_wait) {
//
// TODO: Should this loop?
match self.poll.poll(&mut self.events, max_wait) {
Ok(_) => {}
Err(e) => return Err(e),
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => {
return Err(e);
}
}

// Process all the events that came in, dispatching appropriately

for event in self.events.iter() {
let token = event.token();

if token == TOKEN_WAKEUP {
self.inner
.wakeup
.set_readiness(mio::Ready::empty())
.unwrap();
self.inner.waker.wake()?;
} else {
self.dispatch(token, event.readiness());
self.dispatch(token, event);
}
}

Ok(())
}

fn dispatch(&self, token: mio::Token, ready: mio::Ready) {
fn dispatch(&self, token: mio::Token, event: &mio::event::Event) {
let mut rd = None;
let mut wr = None;

Expand All @@ -178,19 +174,33 @@ impl Driver {
None => return,
};

let mut readiness = Readiness::EMPTY;
if event.is_readable() {
readiness |= Readiness::READABLE
}
if event.is_writable() {
readiness |= Readiness::WRITABLE
}
if event.is_read_closed() {
readiness |= Readiness::READ_CLOSED
}
if event.is_write_closed() {
readiness |= Readiness::WRITE_CLOSED
}

if io
.set_readiness(address, |curr| curr | ready.as_usize())
.set_readiness(address, |curr| curr | readiness.as_usize())
.is_err()
{
// token no longer valid!
return;
}

if ready.is_writable() || platform::is_hup(ready) {
if readiness.is_writable() || readiness.is_write_closed() {
wr = io.writer.take_waker();
}

if !(ready & (!mio::Ready::writable())).is_empty() {
if readiness.is_readable() || readiness.is_read_closed() {
rd = io.reader.take_waker();
}

Expand Down Expand Up @@ -255,7 +265,7 @@ impl Handle {
/// return immediately.
fn wakeup(&self) {
if let Some(inner) = self.inner() {
inner.wakeup.set_readiness(mio::Ready::readable()).unwrap();
inner.waker.wake().expect("failed to wake reactor")
}
}

Expand All @@ -282,7 +292,7 @@ impl Inner {
/// Register an I/O resource with the reactor.
///
/// The registration token is returned.
pub(super) fn add_source(&self, source: &dyn Evented) -> io::Result<Address> {
pub(super) fn add_source(&self, source: &dyn mio::event::Source) -> io::Result<Address> {
let address = self.io_dispatch.alloc().ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
Expand All @@ -291,20 +301,17 @@ impl Inner {
})?;

self.n_sources.fetch_add(1, SeqCst);

self.io.register(
self.registry.register(
source,
mio::Token(address.to_usize()),
mio::Ready::all(),
mio::PollOpt::edge(),
mio::Interests::READABLE | mio::Interests::WRITABLE,
)?;

Ok(address)
}

/// Deregisters an I/O resource from the reactor.
pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
self.io.deregister(source)
pub(super) fn deregister_source(&self, source: &dyn mio::event::Source) -> io::Result<()> {
self.registry.deregister(source)
}

pub(super) fn drop_source(&self, address: Address) {
Expand All @@ -323,27 +330,23 @@ impl Inner {
.get_readiness(token)
.unwrap_or_else(|| panic!("token {:?} no longer valid!", token));

let (waker, ready) = match dir {
Direction::Read => (&sched.reader, !mio::Ready::writable()),
Direction::Write => (&sched.writer, mio::Ready::writable()),
let (waker, interest) = match dir {
Direction::Read => (&sched.reader, Readiness::READABLE),
Direction::Write => (&sched.writer, Readiness::WRITABLE),
};

waker.register(w);

if readiness & ready.as_usize() != 0 {
if readiness & interest.as_usize() != 0 {
waker.wake();
}
}
}

impl Direction {
pub(super) fn mask(self) -> mio::Ready {
pub(super) fn mask(self) -> Readiness {
match self {
Direction::Read => {
// Everything except writable is signaled through read.
mio::Ready::all() - mio::Ready::writable()
}
Direction::Write => mio::Ready::writable() | platform::hup(),
Direction::Read => Readiness::READABLE | Readiness::READ_CLOSED,
Direction::Write => Readiness::WRITABLE | Readiness::WRITE_CLOSED,
}
}
}
Expand All @@ -353,31 +356,24 @@ mod tests {
use super::*;
use loom::thread;

// No-op `Evented` impl just so we can have something to pass to `add_source`.
struct NotEvented;
// No-op `Source` impl just so we can have something to pass to `add_source`.
struct NotSource;

impl Evented for NotEvented {
fn register(
&self,
_: &mio::Poll,
_: mio::Token,
_: mio::Ready,
_: mio::PollOpt,
) -> io::Result<()> {
impl mio::event::Source for NotSource {
fn register(&self, _: &mio::Registry, _: mio::Token, _: mio::Interests) -> io::Result<()> {
Ok(())
}

fn reregister(
&self,
_: &mio::Poll,
_: &mio::Registry,
_: mio::Token,
_: mio::Ready,
_: mio::PollOpt,
_: mio::Interests,
) -> io::Result<()> {
Ok(())
}

fn deregister(&self, _: &mio::Poll) -> io::Result<()> {
fn deregister(&self, _: &mio::Registry) -> io::Result<()> {
Ok(())
}
}
Expand All @@ -389,12 +385,12 @@ mod tests {
let inner = reactor.inner;
let inner2 = inner.clone();

let token_1 = inner.add_source(&NotEvented).unwrap();
let token_1 = inner.add_source(&NotSource).unwrap();
let thread = thread::spawn(move || {
inner2.drop_source(token_1);
});

let token_2 = inner.add_source(&NotEvented).unwrap();
let token_2 = inner.add_source(&NotSource).unwrap();
thread.join().unwrap();

assert!(token_1 != token_2);
Expand All @@ -410,15 +406,15 @@ mod tests {
// add sources to fill up the first page so that the dropped index
// may be reused.
for _ in 0..31 {
inner.add_source(&NotEvented).unwrap();
inner.add_source(&NotSource).unwrap();
}

let token_1 = inner.add_source(&NotEvented).unwrap();
let token_1 = inner.add_source(&NotSource).unwrap();
let thread = thread::spawn(move || {
inner2.drop_source(token_1);
});

let token_2 = inner.add_source(&NotEvented).unwrap();
let token_2 = inner.add_source(&NotSource).unwrap();
thread.join().unwrap();

assert!(token_1 != token_2);
Expand All @@ -433,11 +429,11 @@ mod tests {
let inner2 = inner.clone();

let thread = thread::spawn(move || {
let token_2 = inner2.add_source(&NotEvented).unwrap();
let token_2 = inner2.add_source(&NotSource).unwrap();
token_2
});

let token_1 = inner.add_source(&NotEvented).unwrap();
let token_1 = inner.add_source(&NotSource).unwrap();
let token_2 = thread.join().unwrap();

assert!(token_1 != token_2);
Expand Down
28 changes: 0 additions & 28 deletions tokio/src/io/driver/platform.rs

This file was deleted.

Loading