Skip to content

Commit

Permalink
Use a custom Tokio runtime
Browse files Browse the repository at this point in the history
It's derived from tokio_current_thread, hence still single thread only,
but it additionally implements the turn() method, and may serve as a basis
upon we can build support for true multi-threading.
  • Loading branch information
piscisaureus committed Sep 21, 2018
1 parent 4d16d54 commit 6ab12f3
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 4 deletions.
4 changes: 4 additions & 0 deletions BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ main_extern = [
"$rust_build:tempfile",
"$rust_build:rand",
"$rust_build:tokio",
"$rust_build:tokio_current_thread",
"$rust_build:tokio_executor",
"$rust_build:tokio_reactor",
"$rust_build:tokio_timer",
"$rust_build:url",
"$rust_build:remove_dir_all",
"$rust_build:dirs",
Expand Down
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,8 @@ rand = "0.4.3"
ring = "0.13.2"
tempfile = "3"
tokio = "0.1.8"
tokio_current_thread = "0.1.1"
tokio_executor = "0.1.2"
tokio_reactor = "0.1.5"
tokio_timer = "0.2.5"
url = "1.7.1"
2 changes: 1 addition & 1 deletion src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ fn permission_denied() -> DenoError {
fn not_implemented() -> DenoError {
DenoError::from(std::io::Error::new(
std::io::ErrorKind::Other,
"Not implemented"
"Not implemented",
))
}

Expand Down
6 changes: 3 additions & 3 deletions src/isolate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ use std;
use std::collections::HashMap;
use std::ffi::CStr;
use std::ffi::CString;
use tokio;
use tokio_runtime;

type DenoException<'a> = &'a str;

pub struct Isolate {
pub ptr: *const libdeno::DenoC,
pub dir: deno_dir::DenoDir,
pub rt: tokio::runtime::current_thread::Runtime,
pub rt: tokio_runtime::Runtime,
pub timers: HashMap<u32, futures::sync::oneshot::Sender<()>>,
pub argv: Vec<String>,
pub flags: flags::DenoFlags,
Expand All @@ -35,7 +35,7 @@ impl Isolate {
let mut deno_box = Box::new(Isolate {
ptr: 0 as *const libdeno::DenoC,
dir: deno_dir::DenoDir::new(flags.reload, None).unwrap(),
rt: tokio::runtime::current_thread::Runtime::new().unwrap(),
rt: tokio_runtime::Runtime::new().unwrap(),
timers: HashMap::new(),
argv: argv_rest,
flags,
Expand Down
5 changes: 5 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ extern crate msg_rs as msg;
extern crate rand;
extern crate tempfile;
extern crate tokio;
extern crate tokio_current_thread;
extern crate tokio_executor;
extern crate tokio_reactor;
extern crate tokio_timer;
extern crate url;
#[macro_use]
extern crate log;
Expand All @@ -23,6 +27,7 @@ pub mod handlers;
mod isolate;
mod libdeno;
mod net;
mod tokio_runtime;
mod version;

use isolate::Isolate;
Expand Down
138 changes: 138 additions & 0 deletions src/tokio_runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use futures;
use std;
use tokio_current_thread;
pub use tokio_current_thread::{RunError, Turn, TurnError};
use tokio_executor;
use tokio_reactor;
use tokio_timer;

type Timer = tokio_timer::timer::Timer<tokio_reactor::Reactor>;

#[derive(Debug)]
pub struct Runtime {
reactor_handle: tokio_reactor::Handle,
timer_handle: tokio_timer::timer::Handle,
clock: tokio_timer::clock::Clock,
executor: tokio_current_thread::CurrentThread<Timer>,
}

#[derive(Debug, Clone)]
pub struct Handle(tokio_current_thread::Handle);

impl Handle {
// Spawns a future onto the runtime instance that this handle is for.
#[allow(dead_code)]
pub fn spawn<F>(&self, future: F) -> Result<(), tokio_executor::SpawnError>
where
F: futures::Future<Item = (), Error = ()> + Send + 'static,
{
self.0.spawn(future)
}
}

impl Runtime {
pub fn new() -> std::io::Result<Runtime> {
let reactor = tokio_reactor::Reactor::new()?;
let reactor_handle = reactor.handle();

let clock = tokio_timer::clock::Clock::new();

let timer = tokio_timer::timer::Timer::new_with_now(reactor, clock.clone());
let timer_handle = timer.handle();

let executor = tokio_current_thread::CurrentThread::new_with_park(timer);

Ok(Runtime {
reactor_handle: reactor_handle,
timer_handle: timer_handle,
clock,
executor,
})
}

#[allow(dead_code)]
pub fn handle(&self) -> Handle {
Handle(self.executor.handle().clone())
}

// Returns true if the event loop is idle, that is, there are no more futures
// to complete.
#[allow(dead_code)]
pub fn is_idle(&self) -> bool {
self.executor.is_idle()
}

// Spawns a future onto this (single-threaded) runtime.
pub fn spawn<F>(&mut self, future: F) -> &mut Self
where
F: futures::Future<Item = (), Error = ()> + 'static,
{
self.executor.spawn(future);
self
}

// Runs the event loop until the specified future has completed.
#[allow(dead_code)]
pub fn block_on<F>(&mut self, f: F) -> Result<F::Item, F::Error>
where
F: futures::Future,
{
self.enter(|executor| {
let ret = executor.block_on(f);
// Map error to Future::Error.
ret.map_err(|e| e.into_inner().expect("unexpected execution error"))
})
}

// Runs the event loop until all futures have completed.
#[allow(dead_code)]
pub fn run(&mut self) -> Result<(), RunError> {
self.enter(|executor| executor.run())
}

// Runs the event loop until any future has completed or the timeout expires.
#[allow(dead_code)]
pub fn turn(
&mut self,
max_wait: Option<std::time::Duration>,
) -> Result<Turn, TurnError> {
self.enter(|executor| executor.turn(max_wait))
}

fn enter<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut tokio_current_thread::Entered<Timer>) -> R,
{
let Runtime {
ref reactor_handle,
ref timer_handle,
ref clock,
ref mut executor,
..
} = *self;

// Binds an executor to this thread.
let mut enter =
tokio_executor::enter().expect("Multiple executors at once");

// This will set the default handle and timer to use inside the closure
// and run the future.
tokio_reactor::with_default(&reactor_handle, &mut enter, |enter| {
tokio_timer::clock::with_default(clock, enter, |enter| {
tokio_timer::timer::with_default(&timer_handle, enter, |enter| {
// The TaskExecutor is a fake executor that looks into the
// current single-threaded executor when used. This is a trick,
// because we need two mutable references to the executor (one
// to run the provided future, another to install as the default
// one). We use the fake one here as the default one.
let mut default_executor =
tokio_current_thread::TaskExecutor::current();
tokio_executor::with_default(&mut default_executor, enter, |enter| {
let mut executor = executor.enter(enter);
f(&mut executor)
})
})
})
})
}
}

0 comments on commit 6ab12f3

Please sign in to comment.