Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use select() instead of mio polling in Unix EventSource #711

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ crossterm_winapi = "0.9"
#
[target.'cfg(unix)'.dependencies]
libc = "0.2"
mio = { version = "0.8", features = ["os-poll"] }
signal-hook = { version = "0.3.13" }
signal-hook-mio = { version = "0.2.3", features = ["support-v0_8"] }

#
# Dev dependencies (examples, ...)
Expand Down
217 changes: 102 additions & 115 deletions src/event/source/unix.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,59 @@
use std::{collections::VecDeque, io, time::Duration};
use std::io::Read;
use std::{collections::VecDeque, io, os::unix::net::UnixStream, time::Duration};

use mio::{unix::SourceFd, Events, Interest, Poll, Token};
use signal_hook_mio::v0_8::Signals;
use signal_hook::low_level::pipe;

use crate::event::timeout::PollTimeout;
use crate::event::Event;
use crate::Result;

mod select;
use self::select::Selector;

#[cfg(feature = "event-stream")]
use super::super::sys::Waker;
use super::super::{
source::EventSource,
sys::unix::{
file_descriptor::{tty_fd, FileDesc},
parse::parse_event,

use super::{
super::{
sys::unix::{
file_descriptor::{tty_fd, FileDesc},
parse::parse_event,
},
InternalEvent,
},
timeout::PollTimeout,
Event, InternalEvent,
EventSource,
};

// Tokens to identify file descriptor
const TTY_TOKEN: Token = Token(0);
const SIGNAL_TOKEN: Token = Token(1);
/// Holds a prototypical Waker and a receiver we can wait on when doing select().
#[cfg(feature = "event-stream")]
const WAKE_TOKEN: Token = Token(2);
struct WakePipe {
receiver: UnixStream,
waker: Waker,
}

#[cfg(feature = "event-stream")]
impl WakePipe {
fn new() -> Result<Self> {
let (receiver, sender) = UnixStream::pair()?;
Ok(WakePipe {
receiver,
waker: Waker::new(sender),
})
}
}

// I (@zrzka) wasn't able to read more than 1_022 bytes when testing
// reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes
// is enough.
const TTY_BUFFER_SIZE: usize = 1_024;

pub(crate) struct UnixInternalEventSource {
poll: Poll,
events: Events,
parser: Parser,
tty_buffer: [u8; TTY_BUFFER_SIZE],
tty_fd: FileDesc,
signals: Signals,
tty: FileDesc,
winch_signal_receiver: UnixStream,
#[cfg(feature = "event-stream")]
waker: Waker,
wake_pipe: WakePipe,
}

impl UnixInternalEventSource {
Expand All @@ -45,128 +62,98 @@ impl UnixInternalEventSource {
}

pub(crate) fn from_file_descriptor(input_fd: FileDesc) -> Result<Self> {
let poll = Poll::new()?;
let registry = poll.registry();

let tty_raw_fd = input_fd.raw_fd();
let mut tty_ev = SourceFd(&tty_raw_fd);
registry.register(&mut tty_ev, TTY_TOKEN, Interest::READABLE)?;

let mut signals = Signals::new(&[signal_hook::consts::SIGWINCH])?;
registry.register(&mut signals, SIGNAL_TOKEN, Interest::READABLE)?;

#[cfg(feature = "event-stream")]
let waker = Waker::new(registry, WAKE_TOKEN)?;

Ok(UnixInternalEventSource {
poll,
events: Events::with_capacity(3),
parser: Parser::default(),
tty_buffer: [0u8; TTY_BUFFER_SIZE],
tty_fd: input_fd,
signals,
tty: input_fd,
winch_signal_receiver: {
let (receiver, sender) = UnixStream::pair()?;
// Unregistering is unnecessary because EventSource is a singleton
pipe::register(libc::SIGWINCH, sender)?;
receiver
},
#[cfg(feature = "event-stream")]
waker,
wake_pipe: WakePipe::new()?,
})
}
}

impl EventSource for UnixInternalEventSource {
fn try_read(&mut self, timeout: Option<Duration>) -> Result<Option<InternalEvent>> {
if let Some(event) = self.parser.next() {
return Ok(Some(event));
}

let timeout = PollTimeout::new(timeout);
let mut selector = Selector::default();

loop {
if let Err(e) = self.poll.poll(&mut self.events, timeout.leftover()) {
// Mio will throw an interrupted error in case of cursor position retrieval. We need to retry until it succeeds.
// Previous versions of Mio (< 0.7) would automatically retry the poll call if it was interrupted (if EINTR was returned).
// https://docs.rs/mio/0.7.0/mio/struct.Poll.html#notes
if e.kind() == io::ErrorKind::Interrupted {
continue;
} else {
return Err(e);
}
};

if self.events.is_empty() {
// No readiness events = timeout
return Ok(None);
while timeout.leftover().map_or(true, |t| !t.is_zero()) {
// check if there are buffered events from the last read
if let Some(event) = self.parser.next() {
return Ok(Some(event));
}
selector.add(&self.tty);
selector.add(&self.winch_signal_receiver);

for token in self.events.iter().map(|x| x.token()) {
match token {
TTY_TOKEN => {
loop {
match self.tty_fd.read(&mut self.tty_buffer, TTY_BUFFER_SIZE) {
Ok(read_count) => {
if read_count > 0 {
self.parser.advance(
&self.tty_buffer[..read_count],
read_count == TTY_BUFFER_SIZE,
);
}
}
Err(e) => {
// No more data to read at the moment. We will receive another event
if e.kind() == io::ErrorKind::WouldBlock {
break;
}
// once more data is available to read.
else if e.kind() == io::ErrorKind::Interrupted {
continue;
}
}
};

if let Some(event) = self.parser.next() {
return Ok(Some(event));
#[cfg(feature = "event-stream")]
selector.add(&self.wake_pipe.receiver);

let _ = selector.select(timeout.leftover())?;
if selector.get(&self.tty).is_some() {
loop {
match self.tty.read(&mut self.tty_buffer, TTY_BUFFER_SIZE) {
Ok(read_count) => {
if read_count > 0 {
self.parser.advance(
&self.tty_buffer[..read_count],
read_count == TTY_BUFFER_SIZE,
);
}
}
}
SIGNAL_TOKEN => {
for signal in self.signals.pending() {
match signal {
signal_hook::consts::SIGWINCH => {
// TODO Should we remove tput?
//
// This can take a really long time, because terminal::size can
// launch new process (tput) and then it parses its output. It's
// not a really long time from the absolute time point of view, but
// it's a really long time from the mio, async-std/tokio executor, ...
// point of view.
let new_size = crate::terminal::size()?;
return Ok(Some(InternalEvent::Event(Event::Resize(
new_size.0, new_size.1,
))));
}
_ => unreachable!("Synchronize signal registration & handling"),
};
Err(e) => {
match e.kind() {
// No more data to read at the moment. We will receive another event
// once more data is available to read.
io::ErrorKind::WouldBlock => break,
io::ErrorKind::Interrupted => continue,
_ => return Err(e),
Copy link
Contributor Author

@yyogo yyogo Sep 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This branch was missing, I believe it was a mistake, but this could be a behavior change

}
}
};

if let Some(event) = self.parser.next() {
return Ok(Some(event));
}
#[cfg(feature = "event-stream")]
WAKE_TOKEN => {
return Err(std::io::Error::new(
std::io::ErrorKind::Interrupted,
"Poll operation was woken up by `Waker::wake`",
));
}
_ => unreachable!("Synchronize Evented handle registration & token handling"),
}
}

// Processing above can take some time, check if timeout expired
if timeout.elapsed() {
return Ok(None);
if selector.get(&self.winch_signal_receiver).is_some() {
let mut buff = [0];
if self.winch_signal_receiver.read(&mut buff)? > 0 {
// TODO Should we remove tput?
//
// This can take a really long time, because terminal::size can
// launch new process (tput) and then it parses its output. It's
// not a really long time from the absolute time point of view, but
// it's a really long time from the mio, async-std/tokio executor, ...
// point of view.
let new_size = crate::terminal::size()?;
return Ok(Some(InternalEvent::Event(Event::Resize(
new_size.0, new_size.1,
))));
}
}
#[cfg(feature = "event-stream")]
if selector.get(&self.wake_pipe.receiver).is_some() {
if self.wake_pipe.receiver.read(&mut [0])? > 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::Interrupted,
"Poll operation was woken up by `Waker::wake`",
));
}
}
}
Ok(None)
}

#[cfg(feature = "event-stream")]
fn waker(&self) -> Waker {
self.waker.clone()
self.wake_pipe.waker.clone()
}
}

Expand Down
Loading