-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Resultify AVM #496
Resultify AVM #496
Changes from 11 commits
6cf67cd
a16ee4f
5ef56d3
42819d9
7bec966
eb2ac77
28f9625
fe79663
0ac4cfa
1864854
15c5cd8
40db1d9
e3603e5
e065259
501ba62
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,4 @@ | ||
use futures::future::join_all; | ||
use futures::FutureExt; | ||
use std::sync::Arc; | ||
use tokio::task; | ||
|
||
|
@@ -8,21 +7,26 @@ use crate::vm::memory::HandlerMemory; | |
use crate::vm::opcode::OpcodeFn; | ||
use crate::vm::program::Program; | ||
use crate::vm::run::EVENT_TX; | ||
use crate::vm::InstrType; | ||
use crate::vm::InvalidState; | ||
use crate::vm::VMError; | ||
use crate::vm::VMResult; | ||
|
||
pub const NOP_ID: i64 = i64::MIN; | ||
|
||
#[derive(PartialEq, Eq, Hash)] | ||
#[repr(i64)] | ||
/// Special events in alan found in standard library modules, @std. | ||
/// The IDs for built-in events are negative to avoid collision with positive, custom event IDs. | ||
/// The first hexadecimal byte of the ID in an integer is between 80 and FF | ||
/// The remaining 7 bytes can be used for ASCII-like values | ||
pub enum BuiltInEvents { | ||
/// Alan application start | ||
/// '"start"' in ASCII or 2273 7461 7274 22(80) | ||
START, | ||
START = -9213673853036498142, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's going on here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. with this, we can use |
||
/// '__conn ' in ASCII or 5f5f 636f 6e6e 20(80) | ||
HTTPCONN, | ||
NOP, | ||
HTTPCONN = -9214243417005793441, | ||
NOP = NOP_ID, | ||
} | ||
|
||
impl From<BuiltInEvents> for i64 { | ||
|
@@ -204,25 +208,31 @@ impl HandlerFragment { | |
&mut self, | ||
mut hand_mem: Arc<HandlerMemory>, | ||
instrs: &Vec<Instruction>, | ||
) -> Arc<HandlerMemory> { | ||
) -> VMResult<Arc<HandlerMemory>> { | ||
task::block_in_place(move || { | ||
instrs.iter().for_each(|i| { | ||
if let OpcodeFn::Cpu(func) = i.opcode.fun { | ||
//eprintln!("{} {} {} {}", i.opcode._name, i.args[0], i.args[1], i.args[2]); | ||
let event = func(i.args.as_slice(), &mut hand_mem); | ||
if event.is_some() { | ||
let event_tx = EVENT_TX.get().unwrap(); | ||
let event_sent = event_tx.send(event.unwrap()); | ||
if event_sent.is_err() { | ||
eprintln!("Event transmission error"); | ||
std::process::exit(2); | ||
instrs | ||
.iter() | ||
.map(|i| { | ||
if let OpcodeFn::Cpu(func) = i.opcode.fun { | ||
//eprintln!("{} {} {} {}", i.opcode._name, i.args[0], i.args[1], i.args[2]); | ||
let event = func(i.args.as_slice(), &mut hand_mem); | ||
if let Some(event) = event? { | ||
let event_tx = EVENT_TX.get().unwrap(); | ||
let event_sent = event_tx.send(event); | ||
if event_sent.is_err() { | ||
eprintln!("Event transmission error"); | ||
std::process::exit(2); | ||
} | ||
} | ||
Ok(()) | ||
} else { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This whole branch is known (to us) to never happen because of the way the opcode fragments work. It would be nice to be able to assert this with the type during the fragment generation, perhaps having another enum for the "fragment type" wrapping around the vec and only allowing the particular opcode function enum we care about within each of these outer enums and being able to avoid this looping check here? Not something for this PR, but it does bother me that we have to define an error path here that will never be triggered and that we have to pay a price doing these useless checks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think there's a way to unify the function types of the different opcode kinds, so then we can entirely get rid of these checks and only use the additional information for building up the instruction fragments. I imagine it'd be a struct with 2 fields, and running them would be much faster. The new opcode defs would look like: pub enum OpcodeKind {
CPU,
IO,
UnpredCPU,
}
pub struct Opcode {
name: &'static str,
kind: OpcodeKind,
exec: fn(&[i64], Arc<HandlerMemory>) -> Box<Pin<VMResult<Arc<HandlerMemory>>>>,
} so execution can be done with purely an iterator (simplified): handler.instructions
.iter()
.map(|fragment| {
fragment
.iter()
.map(|(op, instrs)| exec(&instrs, hmem.fork()))
.collect::<UnorderedFutures<VMResult<Vec<Arc<HandlerMemory>>>>>>()
})
.collect::<OrderedFutures<VMResult<Vec<Arc<HandlerMemory>>>>>() There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So the CPU-bound opcodes can return an event payload, they don't return the HandlerMemory. However, I think that weirdness is used only by the |
||
Err(VMError::InvalidState(InvalidState::UnexpectedInstruction( | ||
InstrType::CPU, | ||
))) | ||
} | ||
} else { | ||
eprintln!("expected another CPU instruction, found an IO instruction"); | ||
}; | ||
}); | ||
hand_mem | ||
}) | ||
.collect::<VMResult<Vec<_>>>()?; | ||
Ok(hand_mem) | ||
}) | ||
} | ||
|
||
|
@@ -231,15 +241,16 @@ impl HandlerFragment { | |
&mut self, | ||
hand_mem: Arc<HandlerMemory>, | ||
instrs: &Vec<Instruction>, | ||
) -> Arc<HandlerMemory> { | ||
) -> VMResult<Arc<HandlerMemory>> { | ||
// These instructions are always in groups by themselves | ||
let op = &instrs[0]; | ||
if let OpcodeFn::UnpredCpu(func) = op.opcode.fun { | ||
//eprintln!("{} {:?}", op.opcode._name, op.args); | ||
return func(op.args.clone(), hand_mem).await; | ||
} else { | ||
eprintln!("expected an UnpredCpu instruction"); | ||
std::process::exit(1); | ||
return Err(VMError::InvalidState(InvalidState::UnexpectedInstruction( | ||
InstrType::UnpredictableCPU, | ||
))); | ||
} | ||
} | ||
|
||
|
@@ -248,36 +259,44 @@ impl HandlerFragment { | |
&mut self, | ||
mut hand_mem: Arc<HandlerMemory>, | ||
instrs: &Vec<Instruction>, | ||
) -> Arc<HandlerMemory> { | ||
) -> VMResult<Arc<HandlerMemory>> { | ||
if instrs.len() == 1 { | ||
let op = &instrs[0]; | ||
if let OpcodeFn::Io(func) = op.opcode.fun { | ||
//eprintln!("{} {:?}", op.opcode._name, op.args); | ||
return func(op.args.clone(), hand_mem).await; | ||
} else { | ||
eprintln!("expected an IO instruction"); | ||
std::process::exit(1); | ||
return Err(VMError::InvalidState(InvalidState::UnexpectedInstruction( | ||
InstrType::IO, | ||
))); | ||
} | ||
} else { | ||
let futures: Vec<_> = instrs | ||
.iter() | ||
.map(|i| { | ||
if let OpcodeFn::Io(func) = i.opcode.fun { | ||
//eprintln!("{} {:?}", i.opcode._name, i.args); | ||
func(i.args.clone(), HandlerMemory::fork(hand_mem.clone())) | ||
.then(HandlerMemory::drop_parent_async) | ||
} else { | ||
eprintln!("expected another IO instruction"); | ||
std::process::exit(1); | ||
let hand_mem = hand_mem.clone(); | ||
async move { | ||
if let OpcodeFn::Io(func) = i.opcode.fun { | ||
//eprintln!("{} {:?}", i.opcode._name, i.args); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These comments are for future debugging? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes - I didn't remove it since it's been there for a while (and could prove useful in the future) |
||
let forked = HandlerMemory::fork(hand_mem.clone())?; | ||
let res = func(i.args.clone(), forked).await?; | ||
Ok(HandlerMemory::drop_parent(res)?) | ||
// Ok(func(i.args.clone(), HandlerMemory::fork(hand_mem.clone())?) | ||
// .then(|res| HandlerMemory::drop_parent_async).await) | ||
} else { | ||
Err(VMError::InvalidState(InvalidState::UnexpectedInstruction( | ||
InstrType::IO, | ||
))) | ||
} | ||
} | ||
}) | ||
.collect(); | ||
let hms = join_all(futures).await; | ||
for hm in hms.into_iter() { | ||
hand_mem.join(hm); | ||
for hm in hms { | ||
hand_mem.join(hm?)?; | ||
} | ||
} | ||
hand_mem | ||
Ok(hand_mem) | ||
} | ||
|
||
/// Runs the specified handler in Tokio tasks. Tokio tasks are allocated to a threadpool bound by | ||
|
@@ -293,29 +312,29 @@ impl HandlerFragment { | |
pub async fn run( | ||
mut self: HandlerFragment, | ||
mut hand_mem: Arc<HandlerMemory>, | ||
) -> Arc<HandlerMemory> { | ||
) -> VMResult<Arc<HandlerMemory>> { | ||
loop { | ||
let instrs = self.get_instruction_fragment(); | ||
hand_mem = match instrs[0].opcode.fun { | ||
OpcodeFn::Cpu(_) => self.run_cpu(hand_mem, instrs).await, | ||
OpcodeFn::UnpredCpu(_) => self.run_unpred_cpu(hand_mem, instrs).await, | ||
OpcodeFn::Io(_) => self.run_io(hand_mem, instrs).await, | ||
OpcodeFn::Cpu(_) => self.run_cpu(hand_mem, instrs).await?, | ||
OpcodeFn::UnpredCpu(_) => self.run_unpred_cpu(hand_mem, instrs).await?, | ||
OpcodeFn::Io(_) => self.run_io(hand_mem, instrs).await?, | ||
}; | ||
if let Some(frag) = self.get_next_fragment() { | ||
self = frag; | ||
} else { | ||
break; | ||
} | ||
} | ||
hand_mem | ||
Ok(hand_mem) | ||
} | ||
|
||
/// Spawns and runs a non-blocking tokio task for the fragment that can be awaited. | ||
/// Used to provide event and array level parallelism | ||
pub fn spawn( | ||
self: HandlerFragment, | ||
hand_mem: Arc<HandlerMemory>, | ||
) -> task::JoinHandle<Arc<HandlerMemory>> { | ||
) -> task::JoinHandle<VMResult<Arc<HandlerMemory>>> { | ||
task::spawn(async move { self.run(hand_mem).await }) | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,7 +57,7 @@ macro_rules! make_server { | |
let port_num = http.port; | ||
let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port_num)); | ||
let make_svc = hyper::service::make_service_fn(|_conn| async { | ||
Ok::<_, std::convert::Infallible>(hyper::service::service_fn($listener)) | ||
Ok::<_, $crate::vm::VMError>(hyper::service::service_fn($listener)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why this change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the service function could return a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So for the web server I would prefer it is infallible and if it encounters any |
||
}); | ||
|
||
let bind = hyper::server::Server::try_bind(&addr); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why exit code
2
and not1
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that was what we were using in the majority of cases where we did
eprintln!()
and thenexit()