
Resultify AVM #496

Merged: 15 commits, Apr 21, 2021
4 changes: 2 additions & 2 deletions Makefile
@@ -30,8 +30,8 @@ compiler-browser-check:
cd compiler && ../node_modules/.bin/pkg --targets host .

./avm/target/release/alan: compiler/alan-compile
-	cd avm && cargo fmt
	cd avm && cargo build --release
+	cd avm && cargo fmt

./anycloud/cli/target/release/anycloud: compiler/alan-compile
cd anycloud/cli && cargo fmt
@@ -68,6 +68,6 @@ uninstall:
version:
./.version.sh $(version)

-.PHONY: prerelease
+.PHONY: prerelease
prerelease:
./.prerelease.sh $(version)
10 changes: 8 additions & 2 deletions avm/src/main.rs
@@ -29,7 +29,10 @@ async fn compile_and_run(source_file: &str) -> i32 {
let mut path = env::current_dir().unwrap();
path.push(dest_file);
let fp = path.into_os_string().into_string().unwrap();
-    run_file(&fp, true).await;
+    if let Err(ee) = run_file(&fp, true).await {
+      eprintln!("{}", ee);
+      return 2;
+    };
}
return status_code;
}
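The resultified pattern introduced above (print the error, then map failure to status code 2) can be sketched in isolation. Here `run_file` is a hypothetical stand-in for the AVM's real async function, not code from the PR:

```rust
// Hypothetical stand-in for the AVM's run_file, which now returns a Result.
fn run_file(path: &str) -> Result<(), String> {
    if path.is_empty() {
        Err("no source file given".to_string())
    } else {
        Ok(())
    }
}

// Mirrors the PR's pattern: print the error and map failure to status code 2.
fn exit_code_for(path: &str) -> i32 {
    match run_file(path) {
        Ok(()) => 0,
        Err(e) => {
            eprintln!("{}", e);
            2
        }
    }
}

fn main() {
    assert_eq!(exit_code_for("program.agc"), 0);
    assert_eq!(exit_code_for(""), 2);
}
```

The caller decides whether to `return` the code (inside `compile_and_run`) or call `std::process::exit` directly (in `main`), but the error-to-status mapping is the same.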
@@ -131,7 +134,10 @@ fn main() {
agc_file
);
telemetry::log("avm-run").await;
-      run_file(&fp, false).await;
+      if let Err(ee) = run_file(&fp, false).await {
+        eprintln!("{}", ee);
+        std::process::exit(2);
Contributor: Why exit code 2 and not 1?

Collaborator (Author): That was what we were using in the majority of cases where we did eprintln!() and then exit().

};
}
("compile", Some(matches)) => {
let source_file = matches.value_of("INPUT").unwrap();
103 changes: 61 additions & 42 deletions avm/src/vm/event.rs
@@ -1,5 +1,4 @@
use futures::future::join_all;
-use futures::FutureExt;
use std::sync::Arc;
use tokio::task;

@@ -8,21 +7,26 @@ use crate::vm::memory::HandlerMemory;
use crate::vm::opcode::OpcodeFn;
use crate::vm::program::Program;
use crate::vm::run::EVENT_TX;
use crate::vm::InstrType;
use crate::vm::InvalidState;
use crate::vm::VMError;
use crate::vm::VMResult;

pub const NOP_ID: i64 = i64::MIN;

#[derive(PartialEq, Eq, Hash)]
#[repr(i64)]
/// Special events in alan found in standard library modules, @std.
/// The IDs for built-in events are negative to avoid collision with positive, custom event IDs.
/// The first hexadecimal byte of the ID in an integer is between 80 and FF
/// The remaining 7 bytes can be used for ASCII-like values
pub enum BuiltInEvents {
/// Alan application start
/// '"start"' in ASCII or 2273 7461 7274 22(80)
-  START,
+  START = -9213673853036498142,
Member: What's going on here?

Collaborator (Author): With this, we can use BuiltInEvents::START as i64.

/// '__conn ' in ASCII or 5f5f 636f 6e6e 20(80)
-  HTTPCONN,
-  NOP,
+  HTTPCONN = -9214243417005793441,
+  NOP = NOP_ID,
}
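As a sketch of how these magic constants line up with the doc comments: interpreting the ASCII bytes of '"start"' (with 0x80 as the final byte) as a little-endian i64 reproduces the START value, and likewise '__conn ' for HTTPCONN. The `event_id` helper below is illustrative, not code from the PR:

```rust
// Illustrative only: derive a built-in event ID from up to 7 ASCII bytes
// plus a high byte in 0x80..=0xFF, read as a little-endian i64. The high
// byte being >= 0x80 is what makes the resulting ID negative.
fn event_id(bytes: [u8; 8]) -> i64 {
    i64::from_le_bytes(bytes)
}

fn main() {
    // '"start"' in ASCII: 22 73 74 61 72 74 22, then 0x80
    let start = event_id([0x22, b's', b't', b'a', b'r', b't', 0x22, 0x80]);
    assert_eq!(start, -9213673853036498142);

    // '__conn ' in ASCII: 5f 5f 63 6f 6e 6e 20, then 0x80
    let httpconn = event_id([b'_', b'_', b'c', b'o', b'n', b'n', b' ', 0x80]);
    assert_eq!(httpconn, -9214243417005793441);
}
```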

impl From<BuiltInEvents> for i64 {
@@ -204,25 +208,31 @@ impl HandlerFragment {
&mut self,
mut hand_mem: Arc<HandlerMemory>,
instrs: &Vec<Instruction>,
-  ) -> VMResult<Arc<HandlerMemory>> {
+  ) -> VMResult<Arc<HandlerMemory>> {
task::block_in_place(move || {
instrs.iter().for_each(|i| {
if let OpcodeFn::Cpu(func) = i.opcode.fun {
//eprintln!("{} {} {} {}", i.opcode._name, i.args[0], i.args[1], i.args[2]);
let event = func(i.args.as_slice(), &mut hand_mem);
if event.is_some() {
let event_tx = EVENT_TX.get().unwrap();
let event_sent = event_tx.send(event.unwrap());
if event_sent.is_err() {
eprintln!("Event transmission error");
std::process::exit(2);
instrs
.iter()
.map(|i| {
if let OpcodeFn::Cpu(func) = i.opcode.fun {
//eprintln!("{} {} {} {}", i.opcode._name, i.args[0], i.args[1], i.args[2]);
let event = func(i.args.as_slice(), &mut hand_mem);
if let Some(event) = event? {
let event_tx = EVENT_TX.get().unwrap();
let event_sent = event_tx.send(event);
if event_sent.is_err() {
eprintln!("Event transmission error");
std::process::exit(2);
}
}
Ok(())
} else {
Member: This whole branch is known (to us) to never happen because of the way the opcode fragments work. It would be nice to be able to assert this with the type during fragment generation, perhaps having another enum for the "fragment type" wrapping around the Vec and only allowing the particular opcode function enum we care about within each of these outer enums, so we could avoid this looping check here.

Not something for this PR, but it does bother me that we have to define an error path here that will never be triggered, and that we have to pay a price doing these useless checks.

Collaborator (Author): I think there's a way to unify the function types of the different opcode kinds, so we can entirely get rid of these checks and only use the additional information for building up the instruction fragments. I imagine it'd be a struct with 2 fields, and running them would be much faster.

The new opcode defs would look like:

pub enum OpcodeKind {
  CPU,
  IO,
  UnpredCPU,
}
pub struct Opcode {
  name: &'static str,
  kind: OpcodeKind,
  exec: fn(&[i64], Arc<HandlerMemory>) -> Box<Pin<VMResult<Arc<HandlerMemory>>>>,
}

so execution can be done with purely an iterator (simplified):

handler.instructions
  .iter()
  .map(|fragment| {
    fragment
      .iter()
      .map(|(op, instrs)| exec(&instrs, hmem.fork()))
      .collect::<UnorderedFutures<VMResult<Vec<Arc<HandlerMemory>>>>>>()
  })
  .collect::<OrderedFutures<VMResult<Vec<Arc<HandlerMemory>>>>>()

Member: So the CPU-bound opcodes can return an event payload; they don't return the HandlerMemory.

However, I think that weirdness is used only by the emitop opcode, so if we can refactor things to let event emission be done specially by that opcode and not require checking the return value, that'd be great. What I'd prefer is that CPU opcodes still have a special return type, it's just that it's now void, so we don't have to do any conditional checking on every single CPU opcode call, which is probably a significant perf impact.

Err(VMError::InvalidState(InvalidState::UnexpectedInstruction(
InstrType::CPU,
)))
}
} else {
eprintln!("expected another CPU instruction, found an IO instruction");
};
});
hand_mem
})
.collect::<VMResult<Vec<_>>>()?;
Ok(hand_mem)
})
}
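The run_cpu refactor relies on `collect` over an iterator of Results short-circuiting at the first Err. A minimal standalone illustration of that behavior (not PR code):

```rust
fn main() {
    let vals = [1, 2, 3, 4];

    // Collecting an iterator of Result into Result<Vec<_>, _> stops at the
    // first Err, which is how run_cpu now aborts a fragment on the first
    // failing instruction instead of calling std::process::exit.
    let ok: Result<Vec<i32>, String> = vals.iter().map(|&v| Ok(v * 2)).collect();
    assert_eq!(ok, Ok(vec![2, 4, 6, 8]));

    let err: Result<Vec<i32>, String> = vals
        .iter()
        .map(|&v| if v == 3 { Err(format!("bad: {}", v)) } else { Ok(v * 2) })
        .collect();
    assert_eq!(err, Err("bad: 3".to_string()));
}
```

The `?` after the collect in run_cpu then propagates that first error out of the fragment to the caller.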

@@ -231,15 +241,16 @@ impl HandlerFragment {
&mut self,
hand_mem: Arc<HandlerMemory>,
instrs: &Vec<Instruction>,
-  ) -> Arc<HandlerMemory> {
+  ) -> VMResult<Arc<HandlerMemory>> {
// These instructions are always in groups by themselves
let op = &instrs[0];
if let OpcodeFn::UnpredCpu(func) = op.opcode.fun {
//eprintln!("{} {:?}", op.opcode._name, op.args);
return func(op.args.clone(), hand_mem).await;
} else {
-      eprintln!("expected an UnpredCpu instruction");
-      std::process::exit(1);
+      return Err(VMError::InvalidState(InvalidState::UnexpectedInstruction(
+        InstrType::UnpredictableCPU,
+      )));
}
}

@@ -248,36 +259,44 @@ impl HandlerFragment {
&mut self,
mut hand_mem: Arc<HandlerMemory>,
instrs: &Vec<Instruction>,
-  ) -> Arc<HandlerMemory> {
+  ) -> VMResult<Arc<HandlerMemory>> {
if instrs.len() == 1 {
let op = &instrs[0];
if let OpcodeFn::Io(func) = op.opcode.fun {
//eprintln!("{} {:?}", op.opcode._name, op.args);
return func(op.args.clone(), hand_mem).await;
} else {
-        eprintln!("expected an IO instruction");
-        std::process::exit(1);
+        return Err(VMError::InvalidState(InvalidState::UnexpectedInstruction(
+          InstrType::IO,
+        )));
}
} else {
let futures: Vec<_> = instrs
.iter()
.map(|i| {
if let OpcodeFn::Io(func) = i.opcode.fun {
//eprintln!("{} {:?}", i.opcode._name, i.args);
func(i.args.clone(), HandlerMemory::fork(hand_mem.clone()))
.then(HandlerMemory::drop_parent_async)
} else {
eprintln!("expected another IO instruction");
std::process::exit(1);
let hand_mem = hand_mem.clone();
async move {
if let OpcodeFn::Io(func) = i.opcode.fun {
//eprintln!("{} {:?}", i.opcode._name, i.args);
Contributor: These comments are for future debugging?

Collaborator (Author): Yes. I didn't remove it since it's been there for a while (and it could prove useful in the future).

let forked = HandlerMemory::fork(hand_mem.clone())?;
let res = func(i.args.clone(), forked).await?;
Ok(HandlerMemory::drop_parent(res)?)
// Ok(func(i.args.clone(), HandlerMemory::fork(hand_mem.clone())?)
// .then(|res| HandlerMemory::drop_parent_async).await)
} else {
Err(VMError::InvalidState(InvalidState::UnexpectedInstruction(
InstrType::IO,
)))
}
}
})
.collect();
let hms = join_all(futures).await;
-    for hm in hms.into_iter() {
-      hand_mem.join(hm);
+    for hm in hms {
+      hand_mem.join(hm?)?;
}
}
-    hand_mem
+    Ok(hand_mem)
}
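The fork/join error handling in run_io, where each parallel branch returns a VMResult and the first failure wins at the join, can be sketched with plain threads. This is an analogy only: tokio, HandlerMemory, and the opcode machinery are omitted, and `io_op`/`run_parallel` are invented names:

```rust
use std::thread;

// Illustrative worker standing in for an IO opcode run on forked memory.
fn io_op(i: i32) -> Result<i32, String> {
    if i < 0 {
        Err(format!("negative input: {}", i))
    } else {
        Ok(i * i)
    }
}

fn run_parallel(inputs: Vec<i32>) -> Result<i32, String> {
    // Fork: each op gets its own worker (analogous to HandlerMemory::fork).
    let handles: Vec<_> = inputs
        .into_iter()
        .map(|i| thread::spawn(move || io_op(i)))
        .collect();
    // Join: merge results back, propagating the first Err,
    // like `hand_mem.join(hm?)?` in run_io.
    let mut total = 0;
    for h in handles {
        total += h.join().expect("worker panicked")?;
    }
    Ok(total)
}

fn main() {
    assert_eq!(run_parallel(vec![1, 2, 3, 4]), Ok(30));
    assert!(run_parallel(vec![1, -2, 3]).is_err());
}
```

As in the PR, a failing branch no longer kills the process from inside the loop; the error surfaces at the join point and flows out through the Result.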

/// Runs the specified handler in Tokio tasks. Tokio tasks are allocated to a threadpool bound by
@@ -293,29 +312,29 @@ impl HandlerFragment {
pub async fn run(
mut self: HandlerFragment,
mut hand_mem: Arc<HandlerMemory>,
-  ) -> Arc<HandlerMemory> {
+  ) -> VMResult<Arc<HandlerMemory>> {
loop {
let instrs = self.get_instruction_fragment();
hand_mem = match instrs[0].opcode.fun {
-        OpcodeFn::Cpu(_) => self.run_cpu(hand_mem, instrs).await,
-        OpcodeFn::UnpredCpu(_) => self.run_unpred_cpu(hand_mem, instrs).await,
-        OpcodeFn::Io(_) => self.run_io(hand_mem, instrs).await,
+        OpcodeFn::Cpu(_) => self.run_cpu(hand_mem, instrs).await?,
+        OpcodeFn::UnpredCpu(_) => self.run_unpred_cpu(hand_mem, instrs).await?,
+        OpcodeFn::Io(_) => self.run_io(hand_mem, instrs).await?,
};
if let Some(frag) = self.get_next_fragment() {
self = frag;
} else {
break;
}
}
-    hand_mem
+    Ok(hand_mem)
}

/// Spawns and runs a non-blocking tokio task for the fragment that can be awaited.
/// Used to provide event and array level parallelism
pub fn spawn(
self: HandlerFragment,
hand_mem: Arc<HandlerMemory>,
-  ) -> task::JoinHandle<Arc<HandlerMemory>> {
+  ) -> task::JoinHandle<VMResult<Arc<HandlerMemory>>> {
task::spawn(async move { self.run(hand_mem).await })
}
}
2 changes: 1 addition & 1 deletion avm/src/vm/http.rs
@@ -57,7 +57,7 @@ macro_rules! make_server {
let port_num = http.port;
let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port_num));
let make_svc = hyper::service::make_service_fn(|_conn| async {
-          Ok::<_, std::convert::Infallible>(hyper::service::service_fn($listener))
+          Ok::<_, $crate::vm::VMError>(hyper::service::service_fn($listener))
Member: Why this change?

Collaborator (Author): The service function could return a VMError, since it has to work with the memory of the event that handles each request, so I allow the hyper service to propagate such errors. Although, we could handle the error directly in the service function?

Member: For the web server, I would prefer that it is infallible: if it encounters any VMError, it returns a 500 Internal Server Error to the end user with some sort of short message indicating an unexpected failure within the AVM itself. Simply crashing out will make it harder to track down what's going wrong, I think.
});

let bind = hyper::server::Server::try_bind(&addr);