Skip to content

Commit

Permalink
Move memory store cloning into the wasmer crate
Browse files Browse the repository at this point in the history
  • Loading branch information
syrusakbary committed Mar 16, 2023
1 parent c00f1fd commit a5500fb
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 90 deletions.
15 changes: 15 additions & 0 deletions lib/api/src/externals/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,21 @@ impl Memory {
self.0.try_clone(store)
}

/// Attempts to clone this memory (if its clonable) in a new store
pub fn duplicate_in_store(
&self,
store: &impl AsStoreRef,
new_store: &mut impl AsStoreMut,
) -> Option<Self> {
if !self.ty(store).shared {
// We should only be able to duplicate in a new store if the memory is shared
return None;
}
self.try_clone(&store)
.and_then(|mut memory| memory.duplicate().ok())
.map(|new_memory| Self::new_from_existing(new_store, new_memory.into()))
}

/// To `VMExtern`.
pub(crate) fn to_vm_extern(&self) -> VMExtern {
self.0.to_vm_extern()
Expand Down
19 changes: 4 additions & 15 deletions lib/wasi/src/bin_factory/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ use crate::{
};
use futures::Future;
use tracing::*;
#[cfg(feature = "sys")]
use wasmer::NativeEngineExt;
use wasmer::{FunctionEnvMut, Instance, Memory, Module, Store};
use wasmer_wasix_types::wasi::{Errno, ExitCode};

Expand Down Expand Up @@ -82,15 +80,7 @@ pub fn spawn_exec_module(

// Determine if we are going to create memory and import it or just rely on self creation of memory
let memory_spawn = match shared_memory {
Some(ty) => {
#[cfg(feature = "sys")]
let style = store.engine().tunables().memory_style(&ty);
SpawnType::CreateWithType(SpawnedMemory {
ty,
#[cfg(feature = "sys")]
style,
})
}
Some(ty) => SpawnType::CreateWithType(SpawnedMemory { ty }),
None => SpawnType::Create,
};

Expand All @@ -99,7 +89,7 @@ pub fn spawn_exec_module(
let tasks_outer = tasks.clone();

let task = {
move |mut store, module, memory| {
move |mut store, module, memory: Option<Memory>| {
// Create the WasiFunctionEnv
let mut wasi_env = env;
wasi_env.runtime = runtime;
Expand All @@ -111,9 +101,8 @@ pub fn spawn_exec_module(
let (mut import_object, init) =
import_object_for_all_wasi_versions(&module, &mut store, &wasi_env.env);
let imported_memory = if let Some(memory) = memory {
let imported_memory = Memory::new_from_existing(&mut store, memory);
import_object.define("env", "memory", imported_memory.clone());
Some(imported_memory)
import_object.define("env", "memory", memory.clone());
Some(memory)
} else {
None
};
Expand Down
19 changes: 8 additions & 11 deletions lib/wasi/src/runtime/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,20 @@ use std::{pin::Pin, time::Duration};

use ::tokio::runtime::Handle;
use futures::Future;
use wasmer::vm::VMMemory;
use wasmer::{MemoryType, Module, Store};

#[cfg(feature = "sys")]
use wasmer_types::MemoryStyle;
use wasmer::{Memory, MemoryType, Module, Store, StoreMut};

use crate::os::task::thread::WasiThreadError;

#[derive(Debug)]
pub struct SpawnedMemory {
pub ty: MemoryType,
// TODO: don't put behind a feature (Option<MemoryStyle>?)
#[cfg(feature = "sys")]
pub style: MemoryStyle,
}

#[derive(Debug)]
pub enum SpawnType {
Create,
CreateWithType(SpawnedMemory),
NewThread(VMMemory),
NewThread(Memory),
}

/// An implementation of task management
Expand All @@ -36,7 +29,11 @@ pub trait VirtualTaskManager: std::fmt::Debug + Send + Sync + 'static {
/// Build a new Webassembly memory.
///
/// May return `None` if the memory can just be auto-constructed.
fn build_memory(&self, spawn_type: SpawnType) -> Result<Option<VMMemory>, WasiThreadError>;
fn build_memory(
&self,
store: &mut StoreMut,
spawn_type: SpawnType,
) -> Result<Option<Memory>, WasiThreadError>;

/// Invokes whenever a WASM thread goes idle. In some runtimes (like singlethreaded
/// execution environments) they will need to do asynchronous work whenever the main
Expand Down Expand Up @@ -64,7 +61,7 @@ pub trait VirtualTaskManager: std::fmt::Debug + Send + Sync + 'static {
/// It is ok for this task to block execution and any async futures within its scope
fn task_wasm(
&self,
task: Box<dyn FnOnce(Store, Module, Option<VMMemory>) + Send + 'static>,
task: Box<dyn FnOnce(Store, Module, Option<Memory>) + Send + 'static>,
store: Store,
module: Module,
spawn_type: SpawnType,
Expand Down
26 changes: 11 additions & 15 deletions lib/wasi/src/runtime/task_manager/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::pin::Pin;
use futures::Future;
#[cfg(feature = "sys-thread")]
use tokio::runtime::{Builder, Runtime};
use wasmer::{vm::VMMemory, Module, Store};
use wasmer::{Memory, Module, Store};

use crate::{WasiCallingId, WasiThreadError};

Expand Down Expand Up @@ -79,7 +79,7 @@ impl VirtualTaskManager for ThreadTaskManager {
/// It is ok for this task to block execution and any async futures within its scope
fn task_wasm(
&self,
task: Box<dyn FnOnce(Store, Module, Option<VMMemory>) + Send + 'static>,
task: Box<dyn FnOnce(Store, Module, Option<Memory>) + Send + 'static>,
store: Store,
module: Module,
spawn_type: SpawnType,
Expand Down Expand Up @@ -152,22 +152,18 @@ impl VirtualTaskManager for ThreadTaskManager {
/// See [`VirtualTaskManager::enter`].
fn task_wasm(
&self,
task: Box<dyn FnOnce(Store, Module, Option<VMMemory>) + Send + 'static>,
store: Store,
task: Box<dyn FnOnce(Store, Module, Option<Memory>) + Send + 'static>,
mut store: Store,
module: Module,
spawn_type: SpawnType,
) -> Result<(), WasiThreadError> {
use wasmer::vm::VMSharedMemory;

let memory: Option<VMMemory> = match spawn_type {
SpawnType::CreateWithType(mem) => Some(
VMSharedMemory::new(&mem.ty, &mem.style)
.map_err(|err| {
tracing::error!("failed to create memory - {}", err);
})
.unwrap()
.into(),
),
let memory: Option<Memory> = match spawn_type {
SpawnType::CreateWithType(mut mem) => {
mem.ty.shared = true;
Some(
Memory::new(&mut store, mem.ty),
)
},
SpawnType::NewThread(mem) => Some(mem),
SpawnType::Create => None,
};
Expand Down
32 changes: 18 additions & 14 deletions lib/wasi/src/runtime/task_manager/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ use std::{pin::Pin, time::Duration};

use futures::Future;
use tokio::runtime::Handle;
use wasmer::{
vm::{VMMemory, VMSharedMemory},
Module, Store,
};
use wasmer::{AsStoreMut, Memory, Module, Store, StoreMut};

use crate::os::task::thread::WasiThreadError;

Expand Down Expand Up @@ -73,14 +70,21 @@ impl<'g> Drop for TokioRuntimeGuard<'g> {

#[async_trait::async_trait]
impl VirtualTaskManager for TokioTaskManager {
fn build_memory(&self, spawn_type: SpawnType) -> Result<Option<VMMemory>, WasiThreadError> {
fn build_memory(
&self,
store: &mut StoreMut,
spawn_type: SpawnType,
) -> Result<Option<Memory>, WasiThreadError> {
match spawn_type {
SpawnType::CreateWithType(mem) => VMSharedMemory::new(&mem.ty, &mem.style)
.map_err(|err| {
tracing::error!("could not create memory: {err}");
WasiThreadError::MemoryCreateFailed
})
.map(|m| Some(m.into())),
SpawnType::CreateWithType(mut mem) => {
mem.ty.shared = true;
Memory::new(store, mem.ty)
.map_err(|err| {
tracing::error!("could not create memory: {err}");
WasiThreadError::MemoryCreateFailed
})
.map(|m| Some(m))
}
SpawnType::NewThread(mem) => Ok(Some(mem)),
SpawnType::Create => Ok(None),
}
Expand Down Expand Up @@ -125,12 +129,12 @@ impl VirtualTaskManager for TokioTaskManager {
/// See [`VirtualTaskManager::enter`].
fn task_wasm(
&self,
task: Box<dyn FnOnce(Store, Module, Option<VMMemory>) + Send + 'static>,
store: Store,
task: Box<dyn FnOnce(Store, Module, Option<Memory>) + Send + 'static>,
mut store: Store,
module: Module,
spawn_type: SpawnType,
) -> Result<(), WasiThreadError> {
let memory = self.build_memory(spawn_type)?;
let memory = self.build_memory(&mut store.as_store_mut(), spawn_type)?;
self.0.spawn_blocking(move || {
// Invoke the callback
task(store, module, memory);
Expand Down
21 changes: 5 additions & 16 deletions lib/wasi/src/state/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ use rand::Rng;
use tracing::{trace, warn};
use virtual_fs::{FsError, VirtualFile};
use virtual_net::DynVirtualNetworking;
#[cfg(feature = "sys")]
use wasmer::NativeEngineExt;
use wasmer::{
AsStoreMut, AsStoreRef, FunctionEnvMut, Global, Instance, Memory, MemoryView, Module,
AsStoreMut, AsStoreRef, FunctionEnvMut, Global, Instance, Memory, MemoryView, Module, StoreMut,
TypedFunction,
};
use wasmer_wasix_types::{
Expand Down Expand Up @@ -437,28 +435,19 @@ impl WasiEnv {
t
} else {
match shared_memory {
Some(ty) => {
#[cfg(feature = "sys")]
let style = store.engine().tunables().memory_style(&ty);
SpawnType::CreateWithType(SpawnedMemory {
ty,
#[cfg(feature = "sys")]
style,
})
}
Some(ty) => SpawnType::CreateWithType(SpawnedMemory { ty }),
None => SpawnType::Create,
}
};
let memory = tasks.build_memory(spawn_type)?;
let memory = tasks.build_memory(&mut store, spawn_type)?;

// Let's instantiate the module with the imports.
let (mut import_object, instance_init_callback) =
import_object_for_all_wasi_versions(&module, &mut store, &func_env.env);

let imported_memory = if let Some(memory) = memory {
let imported_memory = Memory::new_from_existing(&mut store, memory);
import_object.define("env", "memory", imported_memory.clone());
Some(imported_memory)
import_object.define("env", "memory", memory.clone());
Some(memory)
} else {
None
};
Expand Down
15 changes: 6 additions & 9 deletions lib/wasi/src/syscalls/wasix/proc_fork.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::*;
use crate::{os::task::OwnedTaskStatus, syscalls::*};

use wasmer::vm::VMMemory;
use wasmer::Memory;

/// ### `proc_fork()`
/// Forks the current process into a new subprocess. If the function
Expand Down Expand Up @@ -115,11 +115,12 @@ pub fn proc_fork<M: MemorySize>(

// Fork the memory and copy the module (compiled code)
let env = ctx.data();
let fork_memory: VMMemory = match env
let mut fork_store = ctx.data().runtime.new_store();
let fork_module = env.inner().instance.module().clone();
let fork_memory: Memory = match env
.memory()
.try_clone(&ctx)
.duplicate_in_store(&ctx, &mut fork_store)
.ok_or_else(|| MemoryError::Generic("the memory could not be cloned".to_string()))
.and_then(|mut memory| memory.duplicate())
{
Ok(memory) => memory.into(),
Err(err) => {
Expand All @@ -129,9 +130,6 @@ pub fn proc_fork<M: MemorySize>(
return OnCalledAction::Trap(Box::new(WasiError::Exit(Errno::Fault as u32)));
}
};
let fork_module = env.inner().instance.module().clone();

let mut fork_store = ctx.data().runtime.new_store();

// Now we use the environment and memory references
let runtime = child_env.runtime.clone();
Expand All @@ -152,7 +150,7 @@ pub fn proc_fork<M: MemorySize>(
let module = fork_module;

let spawn_type = SpawnType::NewThread(fork_memory);
let task = move |mut store, module, memory| {
let task = move |mut store, module, memory: Option<Memory>| {
// Create the WasiFunctionEnv
let pid = child_env.pid();
let tid = child_env.tid();
Expand All @@ -164,7 +162,6 @@ pub fn proc_fork<M: MemorySize>(
let (mut import_object, init) =
import_object_for_all_wasi_versions(&module, &mut store, &ctx.env);
let memory = if let Some(memory) = memory {
let memory = Memory::new_from_existing(&mut store, memory);
import_object.define("env", "memory", memory.clone());
memory
} else {
Expand Down
23 changes: 13 additions & 10 deletions lib/wasi/src/syscalls/wasix/thread_spawn.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::*;
use crate::syscalls::*;

use wasmer::vm::VMMemory;
use wasmer::Memory;

/// ### `thread_spawn()`
/// Creates a new thread by spawning that shares the same
Expand Down Expand Up @@ -53,25 +53,28 @@ pub fn thread_spawn<M: MemorySize>(
let thread_id: Tid = thread_handle.id().into();
Span::current().record("tid", thread_id);

let mut store = ctx.data().runtime.new_store();

// We need a copy of the process memory and a packaged store in order to
// launch threads and reactors
let thread_memory = wasi_try!(ctx.data().memory().try_clone(&ctx).ok_or_else(|| {
error!("failed - the memory could not be cloned");
Errno::Notcapable
}));

let mut store = ctx.data().runtime.new_store();
let thread_memory = wasi_try!(ctx
.data()
.memory()
.duplicate_in_store(&ctx, &mut store)
.ok_or_else(|| {
error!("failed - the memory could not be cloned");
Errno::Notcapable
}));

// This function takes in memory and a store and creates a context that
// can be used to call back into the process
let create_ctx = {
let state = env.state.clone();
let wasi_env = env.duplicate();
let thread = thread_handle.as_thread();
move |mut store: Store, module: Module, memory: VMMemory| {
move |mut store: Store, module: Module, memory: Memory| {
// We need to reconstruct some things
let module = module;
let memory = Memory::new_from_existing(&mut store, memory);

// Build the context object and import the memory
let mut ctx = WasiFunctionEnv::new(&mut store, wasi_env.duplicate());
Expand Down Expand Up @@ -158,7 +161,7 @@ pub fn thread_spawn<M: MemorySize>(
// calls into the process
let mut execute_module = {
let state = env.state.clone();
move |store: &mut Option<Store>, module: Module, memory: &mut Option<VMMemory>| {
move |store: &mut Option<Store>, module: Module, memory: &mut Option<Memory>| {
// We capture the thread handle here, it is used to notify
// anyone that is interested when this thread has terminated
let _captured_handle = Box::new(&mut thread_handle);
Expand Down

0 comments on commit a5500fb

Please sign in to comment.