diff --git a/BUILD.gn b/BUILD.gn index 038e70430ddab1..211c4ab0433874 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -46,6 +46,7 @@ main_extern = [ "$rust_build:tempfile", "$rust_build:rand", "$rust_build:tokio", + "$rust_build:tokio_executor", "$rust_build:url", "$rust_build:remove_dir_all", "$rust_build:dirs", diff --git a/src/handlers.rs b/src/handlers.rs index ad6552dc1ab0cc..b38359ac527734 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -1,46 +1,44 @@ // Copyright 2018 the Deno authors. All rights reserved. MIT license. + use errors::DenoError; use errors::DenoResult; -use flatbuffers::FlatBufferBuilder; use fs as deno_fs; +use isolate::Buf; +use isolate::IsolateState; +use isolate::Op; +use msg; + +use flatbuffers::FlatBufferBuilder; use futures; use futures::sync::oneshot; use hyper; use hyper::rt::{Future, Stream}; use hyper::Client; -use isolate::from_c; -use libdeno; -use libdeno::{deno_buf, DenoC}; -use msg; use remove_dir_all::remove_dir_all; use std; use std::fs; #[cfg(any(unix))] use std::os::unix::fs::PermissionsExt; use std::path::Path; +use std::sync::Arc; use std::time::UNIX_EPOCH; use std::time::{Duration, Instant}; use tokio::timer::Delay; -// Buf represents a byte array returned from a "Op". -// The message might be empty (which will be translated into a null object on -// the javascript side) or it is a heap allocated opaque sequence of bytes. -// Usually a flatbuffer message. -type Buf = Option>; - -// JS promises in Deno map onto a specific Future -// which yields either a DenoError or a byte array. -type Op = Future; - type OpResult = DenoResult; // TODO Ideally we wouldn't have to box the Op being returned. // The box is just to make it easier to get a prototype refactor working. -type Handler = fn(d: *const DenoC, base: &msg::Base) -> Box; +type Handler = fn(state: Arc, base: &msg::Base) -> Box; -pub extern "C" fn msg_from_js(d: *const DenoC, buf: deno_buf) { - let bytes = unsafe { std::slice::from_raw_parts(buf.data_ptr, buf.data_len) }; +// Hopefully Rust optimizes this away. +fn empty_buf() -> Buf { + Box::new([]) +} + +pub fn msg_from_js(state: Arc, bytes: &[u8]) -> (bool, Box) { let base = msg::get_root_as_base(bytes); + let is_sync = base.sync(); let msg_type = base.msg_type(); let cmd_id = base.cmd_id(); let handler: Handler = match msg_type { @@ -67,73 +65,51 @@ pub extern "C" fn msg_from_js(d: *const DenoC, buf: deno_buf) { )), }; - let future = handler(d, &base); - let future = future.or_else(move |err| { - // No matter whether we got an Err or Ok, we want a serialized message to - // send back. So transform the DenoError into a deno_buf. - let builder = &mut FlatBufferBuilder::new(); - let errmsg_offset = builder.create_string(&format!("{}", err)); - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - error: Some(errmsg_offset), - error_kind: err.kind(), - ..Default::default() - }, - )) - }); - - let isolate = from_c(d); - if base.sync() { - // Execute future synchronously. - // println!("sync handler {}", msg::enum_name_any(msg_type)); - let maybe_box_u8 = future.wait().unwrap(); - match maybe_box_u8 { - None => {} - Some(box_u8) => { - let buf = deno_buf_from(box_u8); - // Set the synchronous response, the value returned from isolate.send(). - unsafe { libdeno::deno_set_response(d, buf) } - } - } - } else { - // Execute future asynchornously. - let future = future.and_then(move |maybe_box_u8| { - let buf = match maybe_box_u8 { - Some(box_u8) => deno_buf_from(box_u8), - None => { - // async RPCs that return None still need to - // send a message back to signal completion. - let builder = &mut FlatBufferBuilder::new(); - deno_buf_from( - serialize_response( - cmd_id, - builder, - msg::BaseArgs { - ..Default::default() - }, - ).unwrap(), - ) - } + let op: Box = handler(state.clone(), &base); + let boxed_op = Box::new( + op.or_else(move |err: DenoError| -> DenoResult { + debug!("op err {}", err); + // No matter whether we got an Err or Ok, we want a serialized message to + // send back. So transform the DenoError into a deno_buf. + let builder = &mut FlatBufferBuilder::new(); + let errmsg_offset = builder.create_string(&format!("{}", err)); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + error: Some(errmsg_offset), + error_kind: err.kind(), + ..Default::default() + }, + )) + }).and_then(move |buf: Buf| -> DenoResult { + // Handle empty responses. For sync responses we just want + // to send null. For async we want to send a small message + // with the cmd_id. + let buf = if is_sync || buf.len() > 0 { + buf + } else { + // async RPCs that return empty still need to + // send a message back to signal completion. + let builder = &mut FlatBufferBuilder::new(); + serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + ) }; - // TODO(ry) make this thread safe. - unsafe { libdeno::deno_send(d, buf) }; - Ok(()) - }); - isolate.rt.spawn(future); - } -} + Ok(buf) + }), + ); -fn deno_buf_from(x: Box<[u8]>) -> deno_buf { - let len = x.len(); - let ptr = Box::into_raw(x); - deno_buf { - alloc_ptr: 0 as *mut u8, - alloc_len: 0, - data_ptr: ptr as *mut u8, - data_len: len, - } + debug!( + "msg_from_js {} sync {}", + msg::enum_name_any(msg_type), + base.sync() + ); + return (base.sync(), boxed_op); } fn permission_denied() -> DenoError { @@ -146,20 +122,19 @@ fn permission_denied() -> DenoError { fn not_implemented() -> DenoError { DenoError::from(std::io::Error::new( std::io::ErrorKind::Other, - "Not implemented" + "Not implemented", )) } -fn handle_exit(_d: *const DenoC, base: &msg::Base) -> Box { +fn handle_exit(_config: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_exit().unwrap(); std::process::exit(msg.code()) } -fn handle_start(d: *const DenoC, base: &msg::Base) -> Box { - let isolate = from_c(d); +fn handle_start(state: Arc, base: &msg::Base) -> Box { let mut builder = FlatBufferBuilder::new(); - let argv = isolate.argv.iter().map(|s| s.as_str()).collect::>(); + let argv = state.argv.iter().map(|s| s.as_str()).collect::>(); let argv_off = builder.create_vector_of_strings(argv.as_slice()); let cwd_path = std::env::current_dir().unwrap(); @@ -171,7 +146,7 @@ fn handle_start(d: *const DenoC, base: &msg::Base) -> Box { &msg::StartResArgs { cwd: Some(cwd_off), argv: Some(argv_off), - debug_flag: isolate.flags.log_debug, + debug_flag: state.flags.log_debug, ..Default::default() }, ); @@ -198,7 +173,7 @@ fn serialize_response( let data = builder.finished_data(); // println!("serialize_response {:x?}", data); let vec = data.to_vec(); - Some(vec.into_boxed_slice()) + vec.into_boxed_slice() } fn ok_future(buf: Buf) -> Box { @@ -211,22 +186,17 @@ fn odd_future(err: DenoError) -> Box { } // https://github.com/denoland/isolate/blob/golang/os.go#L100-L154 -fn handle_code_fetch(d: *const DenoC, base: &msg::Base) -> Box { +fn handle_code_fetch(state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_code_fetch().unwrap(); let cmd_id = base.cmd_id(); let module_specifier = msg.module_specifier().unwrap(); let containing_file = msg.containing_file().unwrap(); - let isolate = from_c(d); - assert_eq!( - isolate.dir.root.join("gen"), - isolate.dir.gen, - "Sanity check" - ); + assert_eq!(state.dir.root.join("gen"), state.dir.gen, "Sanity check"); Box::new(futures::future::result(|| -> OpResult { let builder = &mut FlatBufferBuilder::new(); - let out = isolate.dir.code_fetch(module_specifier, containing_file)?; + let out = state.dir.code_fetch(module_specifier, containing_file)?; let mut msg_args = msg::CodeFetchResArgs { module_name: Some(builder.create_string(&out.module_name)), filename: Some(builder.create_string(&out.filename)), @@ -253,36 +223,34 @@ fn handle_code_fetch(d: *const DenoC, base: &msg::Base) -> Box { } // https://github.com/denoland/isolate/blob/golang/os.go#L156-L169 -fn handle_code_cache(d: *const DenoC, base: &msg::Base) -> Box { +fn handle_code_cache(state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_code_cache().unwrap(); let filename = msg.filename().unwrap(); let source_code = msg.source_code().unwrap(); let output_code = msg.output_code().unwrap(); Box::new(futures::future::result(|| -> OpResult { - let isolate = from_c(d); - isolate.dir.code_cache(filename, source_code, output_code)?; - Ok(None) + state.dir.code_cache(filename, source_code, output_code)?; + Ok(empty_buf()) }())) } -fn handle_set_env(d: *const DenoC, base: &msg::Base) -> Box { +fn handle_set_env(state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_set_env().unwrap(); let key = msg.key().unwrap(); let value = msg.value().unwrap(); - let isolate = from_c(d); - if !isolate.flags.allow_env { + if !state.flags.allow_env { return odd_future(permission_denied()); } std::env::set_var(key, value); - ok_future(None) + ok_future(empty_buf()) } -fn handle_env(d: *const DenoC, base: &msg::Base) -> Box { - let isolate = from_c(d); +fn handle_env(state: Arc, base: &msg::Base) -> Box { let cmd_id = base.cmd_id(); - if !isolate.flags.allow_env { + + if !state.flags.allow_env { return odd_future(permission_denied()); } @@ -320,22 +288,23 @@ fn handle_env(d: *const DenoC, base: &msg::Base) -> Box { )) } -fn handle_fetch_req(d: *const DenoC, base: &msg::Base) -> Box { +fn handle_fetch_req(state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_fetch_req().unwrap(); let cmd_id = base.cmd_id(); let id = msg.id(); let url = msg.url().unwrap(); - let isolate = from_c(d); - if !isolate.flags.allow_net { + if !state.flags.allow_net { return odd_future(permission_denied()); } let url = url.parse::().unwrap(); let client = Client::new(); + debug!("Before fetch {}", url); let future = client.get(url).and_then(move |res| { let status = res.status().as_u16() as i32; + debug!("fetch {}", status); let headers = { let map = res.headers(); @@ -359,6 +328,7 @@ fn handle_fetch_req(d: *const DenoC, base: &msg::Base) -> Box { let future = future.map_err(|err| -> DenoError { err.into() }).and_then( move |(status, body, headers)| { + debug!("fetch body "); let builder = &mut FlatBufferBuilder::new(); // Send the first message without a body. This is just to indicate // what status code. @@ -420,7 +390,7 @@ where (delay_task, cancel_tx) } -fn handle_make_temp_dir(d: *const DenoC, base: &msg::Base) -> Box { +fn handle_make_temp_dir(state: Arc, base: &msg::Base) -> Box { let base = Box::new(*base); let msg = base.msg_as_make_temp_dir().unwrap(); let cmd_id = base.cmd_id(); @@ -428,8 +398,7 @@ fn handle_make_temp_dir(d: *const DenoC, base: &msg::Base) -> Box { let prefix = msg.prefix(); let suffix = msg.suffix(); - let isolate = from_c(d); - if !isolate.flags.allow_write { + if !state.flags.allow_write { return odd_future(permission_denied()); } // TODO Use blocking() here. @@ -459,28 +428,28 @@ fn handle_make_temp_dir(d: *const DenoC, base: &msg::Base) -> Box { }())) } -fn handle_mkdir(d: *const DenoC, base: &msg::Base) -> Box { +fn handle_mkdir(state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_mkdir().unwrap(); let mode = msg.mode(); let path = msg.path().unwrap(); - let isolate = from_c(d); - if !isolate.flags.allow_write { + + if !state.flags.allow_write { return odd_future(permission_denied()); } // TODO Use tokio_threadpool. Box::new(futures::future::result(|| -> OpResult { debug!("handle_mkdir {}", path); deno_fs::mkdir(Path::new(path), mode)?; - Ok(None) + Ok(empty_buf()) }())) } -fn handle_remove(d: *const DenoC, base: &msg::Base) -> Box { +fn handle_remove(state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_remove().unwrap(); let path = msg.path().unwrap(); let recursive = msg.recursive(); - let isolate = from_c(d); - if !isolate.flags.allow_write { + + if !state.flags.allow_write { return odd_future(permission_denied()); } // TODO Use tokio_threadpool. @@ -497,12 +466,12 @@ fn handle_remove(d: *const DenoC, base: &msg::Base) -> Box { fs::remove_dir(&path_)?; } } - Ok(None) + Ok(empty_buf()) }())) } // Prototype https://github.com/denoland/isolate/blob/golang/os.go#L171-L184 -fn handle_read_file(_d: *const DenoC, base: &msg::Base) -> Box { +fn handle_read_file(_config: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_read_file().unwrap(); let cmd_id = base.cmd_id(); let filename = String::from(msg.filename().unwrap()); @@ -552,7 +521,7 @@ fn get_mode(_perm: fs::Permissions) -> u32 { 0 } -fn handle_stat(_d: *const DenoC, base: &msg::Base) -> Box { +fn handle_stat(_config: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_stat().unwrap(); let cmd_id = base.cmd_id(); let filename = String::from(msg.filename().unwrap()); @@ -595,49 +564,49 @@ fn handle_stat(_d: *const DenoC, base: &msg::Base) -> Box { }())) } -fn handle_write_file(d: *const DenoC, base: &msg::Base) -> Box { +fn handle_write_file(state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_write_file().unwrap(); let filename = String::from(msg.filename().unwrap()); let data = msg.data().unwrap(); let perm = msg.perm(); - let isolate = from_c(d); - if !isolate.flags.allow_write { + if !state.flags.allow_write { return odd_future(permission_denied()); } + Box::new(futures::future::result(|| -> OpResult { debug!("handle_write_file {}", filename); deno_fs::write_file(Path::new(&filename), data, perm)?; - Ok(None) + Ok(empty_buf()) }())) } -// TODO(ry) Use Isolate instead of DenoC as first arg. -fn remove_timer(d: *const DenoC, timer_id: u32) { - let isolate = from_c(d); - isolate.timers.remove(&timer_id); +fn remove_timer(state: Arc, timer_id: u32) { + let mut timers = state.timers.lock().unwrap(); + timers.remove(&timer_id); } // Prototype: https://github.com/ry/isolate/blob/golang/timers.go#L25-L39 -fn handle_timer_start(d: *const DenoC, base: &msg::Base) -> Box { +fn handle_timer_start(state: Arc, base: &msg::Base) -> Box { debug!("handle_timer_start"); let msg = base.msg_as_timer_start().unwrap(); let cmd_id = base.cmd_id(); let timer_id = msg.id(); let delay = msg.delay(); - let isolate = from_c(d); + let config2 = state.clone(); let future = { let (delay_task, cancel_delay) = set_timeout( move || { - remove_timer(d, timer_id); + remove_timer(config2, timer_id); }, delay, ); - isolate.timers.insert(timer_id, cancel_delay); + let mut timers = state.timers.lock().unwrap(); + timers.insert(timer_id, cancel_delay); delay_task }; - Box::new(future.then(move |result| { + let r = Box::new(future.then(move |result| { let builder = &mut FlatBufferBuilder::new(); let msg = msg::TimerReady::create( builder, @@ -656,20 +625,20 @@ fn handle_timer_start(d: *const DenoC, base: &msg::Base) -> Box { ..Default::default() }, )) - })) + })); + r } // Prototype: https://github.com/ry/isolate/blob/golang/timers.go#L40-L43 -fn handle_timer_clear(d: *const DenoC, base: &msg::Base) -> Box { +fn handle_timer_clear(state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_timer_clear().unwrap(); debug!("handle_timer_clear"); - remove_timer(d, msg.id()); - ok_future(None) + remove_timer(state, msg.id()); + ok_future(empty_buf()) } -fn handle_rename(d: *const DenoC, base: &msg::Base) -> Box { - let isolate = from_c(d); - if !isolate.flags.allow_write { +fn handle_rename(state: Arc, base: &msg::Base) -> Box { + if !state.flags.allow_write { return odd_future(permission_denied()); } let msg = base.msg_as_rename().unwrap(); @@ -678,13 +647,12 @@ fn handle_rename(d: *const DenoC, base: &msg::Base) -> Box { Box::new(futures::future::result(|| -> OpResult { debug!("handle_rename {} {}", oldpath, newpath); fs::rename(Path::new(&oldpath), Path::new(&newpath))?; - Ok(None) + Ok(empty_buf()) }())) } -fn handle_symlink(d: *const DenoC, base: &msg::Base) -> Box { - let deno = from_c(d); - if !deno.flags.allow_write { +fn handle_symlink(state: Arc, base: &msg::Base) -> Box { + if !state.flags.allow_write { return odd_future(permission_denied()); } // TODO Use type for Windows. @@ -698,7 +666,7 @@ fn handle_symlink(d: *const DenoC, base: &msg::Base) -> Box { debug!("handle_symlink {} {}", oldname, newname); #[cfg(any(unix))] std::os::unix::fs::symlink(Path::new(&oldname), Path::new(&newname))?; - Ok(None) + Ok(empty_buf()) }())) } } diff --git a/src/isolate.rs b/src/isolate.rs index b4cb138ffa61e5..4a68a7c56b9775 100644 --- a/src/isolate.rs +++ b/src/isolate.rs @@ -1,54 +1,130 @@ // Copyright 2018 the Deno authors. All rights reserved. MIT license. + +// Do not use FlatBuffers in this module. +// TODO Currently this module uses Tokio, but it would be nice if they were +// decoupled. + use deno_dir; +use errors::DenoError; use flags; +use libdeno; + use futures; -use handlers; +use futures::Future; use libc::c_void; -use libdeno; use std; use std::collections::HashMap; use std::ffi::CStr; use std::ffi::CString; +use std::sync::mpsc; +use std::sync::Arc; +use std::sync::Mutex; use tokio; +use tokio_util; type DenoException<'a> = &'a str; +// Buf represents a byte array returned from a "Op". +// The message might be empty (which will be translated into a null object on +// the javascript side) or it is a heap allocated opaque sequence of bytes. +// Usually a flatbuffer message. +pub type Buf = Box<[u8]>; + +// JS promises in Deno map onto a specific Future +// which yields either a DenoError or a byte array. +pub type Op = Future + Send; + +// Returns (is_sync, op) +pub type Dispatch = fn(state: Arc, buf: &[u8]) -> (bool, Box); + pub struct Isolate { - pub ptr: *const libdeno::DenoC, + ptr: *const libdeno::DenoC, + dispatch: Dispatch, + rx: mpsc::Receiver, + pub state: Arc, +} + +// Isolate cannot be passed between threads but IsolateState can. So any state that +// needs to be accessed outside the main V8 thread should be inside IsolateState. +pub struct IsolateState { pub dir: deno_dir::DenoDir, - pub rt: tokio::runtime::current_thread::Runtime, - pub timers: HashMap>, + pub timers: Mutex>>, pub argv: Vec, pub flags: flags::DenoFlags, + ntasks: Mutex, + tx: Mutex>>, +} + +impl IsolateState { + fn ntasks_increment(&self) { + let mut ntasks = self.ntasks.lock().unwrap(); + assert!(*ntasks >= 0); + *ntasks = *ntasks + 1; + } + + fn ntasks_decrement(&self) { + let mut ntasks = self.ntasks.lock().unwrap(); + *ntasks = *ntasks - 1; + assert!(*ntasks >= 0); + } + + fn is_idle(&self) -> bool { + let ntasks = self.ntasks.lock().unwrap(); + *ntasks == 0 + } + + // Thread safe. + fn send_to_js(&self, buf: Buf) { + let mut g = self.tx.lock().unwrap(); + let maybe_tx = g.as_mut(); + assert!(maybe_tx.is_some(), "Expected tx to not be deleted."); + let tx = maybe_tx.unwrap(); + tx.send(buf).expect("tx.send error"); + } } static DENO_INIT: std::sync::Once = std::sync::ONCE_INIT; impl Isolate { - pub fn new(argv: Vec) -> Box { + pub fn new(argv: Vec, dispatch: Dispatch) -> Box { DENO_INIT.call_once(|| { unsafe { libdeno::deno_init() }; }); let (flags, argv_rest) = flags::set_flags(argv); - let mut deno_box = Box::new(Isolate { + // This channel handles sending async messages back to the runtime. + let (tx, rx) = mpsc::channel::(); + + let mut isolate = Box::new(Isolate { ptr: 0 as *const libdeno::DenoC, - dir: deno_dir::DenoDir::new(flags.reload, None).unwrap(), - rt: tokio::runtime::current_thread::Runtime::new().unwrap(), - timers: HashMap::new(), - argv: argv_rest, - flags, + dispatch, + rx, + state: Arc::new(IsolateState { + ntasks: Mutex::new(0), + dir: deno_dir::DenoDir::new(flags.reload, None).unwrap(), + timers: Mutex::new(HashMap::new()), + argv: argv_rest, + flags, + tx: Mutex::new(Some(tx)), + }), }); - (*deno_box).ptr = unsafe { + (*isolate).ptr = unsafe { libdeno::deno_new( - deno_box.as_ref() as *const _ as *const c_void, - handlers::msg_from_js, + isolate.as_ref() as *const _ as *const c_void, + pre_dispatch, ) }; - deno_box + isolate + } + + pub fn from_c<'a>(d: *const libdeno::DenoC) -> &'a mut Isolate { + let ptr = unsafe { libdeno::deno_get_data(d) }; + let ptr = ptr as *mut Isolate; + let isolate_box = unsafe { Box::from_raw(ptr) }; + Box::leak(isolate_box) } pub fn execute( @@ -68,6 +144,24 @@ impl Isolate { } Ok(()) } + + pub fn set_response(&self, buf: Buf) { + unsafe { libdeno::deno_set_response(self.ptr, buf.into()) } + } + + pub fn send(&self, buf: Buf) { + unsafe { libdeno::deno_send(self.ptr, buf.into()) }; + } + + // TODO Use Park abstraction? Note at time of writing Tokio default runtime + // does not have new_with_park(). + pub fn event_loop(&self) { + // Main thread event loop. + while !self.state.is_idle() { + let buf = self.rx.recv().unwrap(); + self.send(buf); + } + } } impl Drop for Isolate { @@ -76,22 +170,70 @@ impl Drop for Isolate { } } -pub fn from_c<'a>(d: *const libdeno::DenoC) -> &'a mut Isolate { - let ptr = unsafe { libdeno::deno_get_data(d) }; - let ptr = ptr as *mut Isolate; - let isolate_box = unsafe { Box::from_raw(ptr) }; - Box::leak(isolate_box) +/// Converts Rust Buf to libdeno deno_buf. +impl From for libdeno::deno_buf { + fn from(x: Buf) -> libdeno::deno_buf { + let len = x.len(); + let ptr = Box::into_raw(x); + libdeno::deno_buf { + alloc_ptr: 0 as *mut u8, + alloc_len: 0, + data_ptr: ptr as *mut u8, + data_len: len, + } + } +} + +// Dereferences the C pointer into the Rust Isolate object. +extern "C" fn pre_dispatch(d: *const libdeno::DenoC, buf: libdeno::deno_buf) { + let bytes = unsafe { std::slice::from_raw_parts(buf.data_ptr, buf.data_len) }; + let isolate = Isolate::from_c(d); + let dispatch = isolate.dispatch; + let (is_sync, op) = dispatch(isolate.state.clone(), bytes); + + if is_sync { + // Execute op synchronously. + let buf = tokio_util::block_on(op).unwrap(); + if buf.len() != 0 { + // Set the synchronous response, the value returned from isolate.send(). + isolate.set_response(buf); + } + } else { + // Execute op asynchornously. + let state = isolate.state.clone(); + + // TODO Ideally Tokio would could tell us how many tasks are executing, but + // it cannot currently. Therefore we track top-level promises/tasks + // manually. + state.ntasks_increment(); + + let task = op + .and_then(move |buf| { + state.ntasks_decrement(); + state.send_to_js(buf); + Ok(()) + }).map_err(|_| ()); + tokio::spawn(task); + } } #[test] fn test_c_to_rust() { let argv = vec![String::from("./deno"), String::from("hello.js")]; - let isolate = Isolate::new(argv); - let isolate2 = from_c(isolate.ptr); + let isolate = Isolate::new(argv, unreachable_dispatch); + let isolate2 = Isolate::from_c(isolate.ptr); assert_eq!(isolate.ptr, isolate2.ptr); assert_eq!( - isolate.dir.root.join("gen"), - isolate.dir.gen, + isolate.state.dir.root.join("gen"), + isolate.state.dir.gen, "Sanity check" ); } + +#[cfg(test)] +fn unreachable_dispatch( + _state: Arc, + _buf: &[u8], +) -> (bool, Box) { + unreachable!(); +} diff --git a/src/main.rs b/src/main.rs index 8cf29dd3779b98..7e9b7ee407d2fb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,7 @@ extern crate msg_rs as msg; extern crate rand; extern crate tempfile; extern crate tokio; +extern crate tokio_executor; extern crate url; #[macro_use] extern crate log; @@ -23,9 +24,9 @@ pub mod handlers; mod isolate; mod libdeno; mod net; +mod tokio_util; mod version; -use isolate::Isolate; use std::env; static LOGGER: Logger = Logger; @@ -47,18 +48,16 @@ impl log::Log for Logger { fn main() { log::set_logger(&LOGGER).unwrap(); - let args = env::args().collect(); - let mut isolate = Isolate::new(args); - flags::process(&isolate.flags); - - isolate - .execute("deno_main.js", "denoMain();") - .unwrap_or_else(|err| { - error!("{}", err); - std::process::exit(1); - }); - - // Start the Tokio event loop - isolate.rt.run().expect("err"); + let isolate = isolate::Isolate::new(args, handlers::msg_from_js); + flags::process(&isolate.state.flags); + tokio_util::init(|| { + isolate + .execute("deno_main.js", "denoMain();") + .unwrap_or_else(|err| { + error!("{}", err); + std::process::exit(1); + }); + isolate.event_loop(); + }); } diff --git a/src/net.rs b/src/net.rs index 7e0700bb685780..03b60ef8a79462 100644 --- a/src/net.rs +++ b/src/net.rs @@ -1,33 +1,37 @@ +// Copyright 2018 the Deno authors. All rights reserved. MIT license. + use errors::DenoResult; +use tokio_util; + use hyper; use hyper::rt::{Future, Stream}; use hyper::{Client, Uri}; use hyper_rustls; -use tokio::runtime::current_thread::Runtime; // The CodeFetch message is used to load HTTP javascript resources and expects a // synchronous response, this utility method supports that. pub fn fetch_sync_string(module_name: &str) -> DenoResult { let url = module_name.parse::().unwrap(); - let https = hyper_rustls::HttpsConnector::new(4); let client: Client<_, hyper::Body> = Client::builder().build(https); - - // TODO Use Deno's RT - let mut rt = Runtime::new().unwrap(); - - let body = rt.block_on( - client - .get(url) - .and_then(|response| response.into_body().concat2()), - )?; + let future = client + .get(url) + .and_then(|response| response.into_body().concat2()); + let body = tokio_util::block_on(future)?; Ok(String::from_utf8(body.to_vec()).unwrap()) } #[test] fn test_fetch_sync_string() { // Relies on external http server. See tools/http_server.py - let p = fetch_sync_string("http://localhost:4545/package.json").unwrap(); - println!("package.json len {}", p.len()); - assert!(p.len() > 1); + use futures; + + tokio_util::init(|| { + tokio_util::block_on(futures::future::lazy(|| -> DenoResult<()> { + let p = fetch_sync_string("http://localhost:4545/package.json")?; + println!("package.json len {}", p.len()); + assert!(p.len() > 1); + Ok(()) + })).unwrap(); + }); } diff --git a/src/tokio_util.rs b/src/tokio_util.rs new file mode 100644 index 00000000000000..1651834b3a32fe --- /dev/null +++ b/src/tokio_util.rs @@ -0,0 +1,30 @@ +// Copyright 2018 the Deno authors. All rights reserved. MIT license. + +use futures; +use futures::Future; +use tokio; +use tokio_executor; + +pub fn block_on(future: F) -> Result +where + F: Send + 'static + Future, + R: Send + 'static, + E: Send + 'static, +{ + let (tx, rx) = futures::sync::oneshot::channel(); + tokio::spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!()))); + rx.wait().unwrap() +} + +// Set the default executor so we can use tokio::spawn(). It's difficult to +// pass around mut references to the runtime, so using with_default is +// preferable. Ideally Tokio would provide this function. +pub fn init(f: F) +where + F: Fn(), +{ + let rt = tokio::runtime::Runtime::new().unwrap(); + let mut executor = rt.executor(); + let mut enter = tokio_executor::enter().expect("Multiple executors at once"); + tokio_executor::with_default(&mut executor, &mut enter, move |_enter| f()); +}