Skip to content

Commit

Permalink
Resultify AVM (#496)
Browse files Browse the repository at this point in the history
* refactor VM part

* reduce allocs in agc loading, get rid of unnecessary if expr in telemetry logging

* change everything to Result

* finish refactor, mostly

* fix file loading

* finalize

* warnings cleanup

* get rid of all panics that can be avoided

* more error usage

* get rid of panics in vm::memory

* get rid of panics in opcodes, just have to fix http

* undo autogenerated file change for vm::protos

* suggestions from @dfellis

* http listener error handling, get rid of InvalidState error stuff

* cargo fmt
  • Loading branch information
cdmistman authored Apr 21, 2021
1 parent c3c42cd commit 2efa9b3
Show file tree
Hide file tree
Showing 9 changed files with 2,311 additions and 2,177 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ compiler-browser-check:
cd compiler && ../node_modules/.bin/pkg --targets host .

./avm/target/release/alan: compiler/alan-compile
cd avm && cargo fmt
cd avm && cargo build --release
cd avm && cargo fmt

./anycloud/cli/target/release/anycloud: compiler/alan-compile
cd anycloud/cli && cargo fmt
Expand Down Expand Up @@ -68,6 +68,6 @@ uninstall:
version:
./.version.sh $(version)

.PHONY: prerelease
.PHONY: prerelease
prerelease:
./.prerelease.sh $(version)
10 changes: 8 additions & 2 deletions avm/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ async fn compile_and_run(source_file: &str) -> i32 {
let mut path = env::current_dir().unwrap();
path.push(dest_file);
let fp = path.into_os_string().into_string().unwrap();
run_file(&fp, true).await;
if let Err(ee) = run_file(&fp, true).await {
eprintln!("{}", ee);
return 2;
};
}
return status_code;
}
Expand Down Expand Up @@ -131,7 +134,10 @@ fn main() {
agc_file
);
telemetry::log("avm-run").await;
run_file(&fp, false).await;
if let Err(ee) = run_file(&fp, false).await {
eprintln!("{}", ee);
std::process::exit(2);
};
}
("compile", Some(matches)) => {
let source_file = matches.value_of("INPUT").unwrap();
Expand Down
94 changes: 52 additions & 42 deletions avm/src/vm/event.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use futures::future::join_all;
use futures::FutureExt;
use std::sync::Arc;
use tokio::task;

Expand All @@ -8,21 +7,25 @@ use crate::vm::memory::HandlerMemory;
use crate::vm::opcode::OpcodeFn;
use crate::vm::program::Program;
use crate::vm::run::EVENT_TX;
use crate::vm::InstrType;
use crate::vm::VMError;
use crate::vm::VMResult;

pub const NOP_ID: i64 = i64::MIN;

#[derive(PartialEq, Eq, Hash)]
#[repr(i64)]
/// Special events in alan found in standard library modules, @std.
/// The IDs for built-in events are negative to avoid collision with positive, custom event IDs.
/// The first hexadecimal byte of the ID in an integer is between 80 and FF
/// The remaining 7 bytes can be used for ASCII-like values
pub enum BuiltInEvents {
/// Alan application start
/// '"start"' in ASCII or 2273 7461 7274 22(80)
START,
START = -9213673853036498142,
/// '__conn ' in ASCII or 5f5f 636f 6e6e 20(80)
HTTPCONN,
NOP,
HTTPCONN = -9214243417005793441,
NOP = NOP_ID,
}

impl From<BuiltInEvents> for i64 {
Expand Down Expand Up @@ -204,25 +207,29 @@ impl HandlerFragment {
&mut self,
mut hand_mem: Arc<HandlerMemory>,
instrs: &Vec<Instruction>,
) -> Arc<HandlerMemory> {
) -> VMResult<Arc<HandlerMemory>> {
task::block_in_place(move || {
instrs.iter().for_each(|i| {
if let OpcodeFn::Cpu(func) = i.opcode.fun {
//eprintln!("{} {} {} {}", i.opcode._name, i.args[0], i.args[1], i.args[2]);
let event = func(i.args.as_slice(), &mut hand_mem);
if event.is_some() {
let event_tx = EVENT_TX.get().unwrap();
let event_sent = event_tx.send(event.unwrap());
if event_sent.is_err() {
eprintln!("Event transmission error");
std::process::exit(2);
instrs
.iter()
.map(|i| {
if let OpcodeFn::Cpu(func) = i.opcode.fun {
//eprintln!("{} {} {} {}", i.opcode._name, i.args[0], i.args[1], i.args[2]);
let event = func(i.args.as_slice(), &mut hand_mem);
if let Some(event) = event? {
let event_tx = EVENT_TX.get().unwrap();
let event_sent = event_tx.send(event);
if event_sent.is_err() {
eprintln!("Event transmission error");
std::process::exit(2);
}
}
Ok(())
} else {
Err(VMError::UnexpectedInstruction(InstrType::CPU))
}
} else {
eprintln!("expected another CPU instruction, found an IO instruction");
};
});
hand_mem
})
.collect::<VMResult<Vec<_>>>()?;
Ok(hand_mem)
})
}

Expand All @@ -231,15 +238,14 @@ impl HandlerFragment {
&mut self,
hand_mem: Arc<HandlerMemory>,
instrs: &Vec<Instruction>,
) -> Arc<HandlerMemory> {
) -> VMResult<Arc<HandlerMemory>> {
// These instructions are always in groups by themselves
let op = &instrs[0];
if let OpcodeFn::UnpredCpu(func) = op.opcode.fun {
//eprintln!("{} {:?}", op.opcode._name, op.args);
return func(op.args.clone(), hand_mem).await;
} else {
eprintln!("expected an UnpredCpu instruction");
std::process::exit(1);
return Err(VMError::UnexpectedInstruction(InstrType::UnpredictableCPU));
}
}

Expand All @@ -248,36 +254,40 @@ impl HandlerFragment {
&mut self,
mut hand_mem: Arc<HandlerMemory>,
instrs: &Vec<Instruction>,
) -> Arc<HandlerMemory> {
) -> VMResult<Arc<HandlerMemory>> {
if instrs.len() == 1 {
let op = &instrs[0];
if let OpcodeFn::Io(func) = op.opcode.fun {
//eprintln!("{} {:?}", op.opcode._name, op.args);
return func(op.args.clone(), hand_mem).await;
} else {
eprintln!("expected an IO instruction");
std::process::exit(1);
return Err(VMError::UnexpectedInstruction(InstrType::IO));
}
} else {
let futures: Vec<_> = instrs
.iter()
.map(|i| {
if let OpcodeFn::Io(func) = i.opcode.fun {
//eprintln!("{} {:?}", i.opcode._name, i.args);
func(i.args.clone(), HandlerMemory::fork(hand_mem.clone()))
.then(HandlerMemory::drop_parent_async)
} else {
eprintln!("expected another IO instruction");
std::process::exit(1);
let hand_mem = hand_mem.clone();
async move {
if let OpcodeFn::Io(func) = i.opcode.fun {
//eprintln!("{} {:?}", i.opcode._name, i.args);
let forked = HandlerMemory::fork(hand_mem.clone())?;
let res = func(i.args.clone(), forked).await?;
Ok(HandlerMemory::drop_parent(res)?)
// Ok(func(i.args.clone(), HandlerMemory::fork(hand_mem.clone())?)
// .then(|res| HandlerMemory::drop_parent_async).await)
} else {
Err(VMError::UnexpectedInstruction(InstrType::IO))
}
}
})
.collect();
let hms = join_all(futures).await;
for hm in hms.into_iter() {
hand_mem.join(hm);
for hm in hms {
hand_mem.join(hm?)?;
}
}
hand_mem
Ok(hand_mem)
}

/// Runs the specified handler in Tokio tasks. Tokio tasks are allocated to a threadpool bound by
Expand All @@ -293,29 +303,29 @@ impl HandlerFragment {
pub async fn run(
mut self: HandlerFragment,
mut hand_mem: Arc<HandlerMemory>,
) -> Arc<HandlerMemory> {
) -> VMResult<Arc<HandlerMemory>> {
loop {
let instrs = self.get_instruction_fragment();
hand_mem = match instrs[0].opcode.fun {
OpcodeFn::Cpu(_) => self.run_cpu(hand_mem, instrs).await,
OpcodeFn::UnpredCpu(_) => self.run_unpred_cpu(hand_mem, instrs).await,
OpcodeFn::Io(_) => self.run_io(hand_mem, instrs).await,
OpcodeFn::Cpu(_) => self.run_cpu(hand_mem, instrs).await?,
OpcodeFn::UnpredCpu(_) => self.run_unpred_cpu(hand_mem, instrs).await?,
OpcodeFn::Io(_) => self.run_io(hand_mem, instrs).await?,
};
if let Some(frag) = self.get_next_fragment() {
self = frag;
} else {
break;
}
}
hand_mem
Ok(hand_mem)
}

/// Spawns and runs a non-blocking tokio task for the fragment that can be awaited.
/// Used to provide event and array level parallelism
pub fn spawn(
self: HandlerFragment,
hand_mem: Arc<HandlerMemory>,
) -> task::JoinHandle<Arc<HandlerMemory>> {
) -> task::JoinHandle<VMResult<Arc<HandlerMemory>>> {
task::spawn(async move { self.run(hand_mem).await })
}
}
Expand Down
Loading

0 comments on commit 2efa9b3

Please sign in to comment.