Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor TriggerExecutor to have associated types for instances #2366

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/trigger-http/src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{net::SocketAddr, str, str::FromStr};

use crate::{Body, HttpExecutor, HttpTrigger, Store};
use crate::{Body, HttpExecutor, HttpInstance, HttpTrigger, Store};
use anyhow::bail;
use anyhow::{anyhow, Context, Result};
use futures::TryFutureExt;
Expand All @@ -13,7 +13,7 @@ use spin_core::wasi_2023_10_18::exports::wasi::http::incoming_handler::Guest as
use spin_core::wasi_2023_11_10::exports::wasi::http::incoming_handler::Guest as IncomingHandler2023_11_10;
use spin_core::Instance;
use spin_http::body;
use spin_trigger::{EitherInstance, TriggerAppEngine};
use spin_trigger::TriggerAppEngine;
use spin_world::v1::http_types;
use std::sync::Arc;
use tokio::{sync::oneshot, task};
Expand All @@ -39,7 +39,7 @@ impl HttpExecutor for HttpHandlerExecutor {
);

let (instance, mut store) = engine.prepare_instance(component_id).await?;
let EitherInstance::Component(instance) = instance else {
let HttpInstance::Component(instance) = instance else {
unreachable!()
};

Expand Down
40 changes: 34 additions & 6 deletions crates/trigger-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use spin_http::{
routes::{RoutePattern, Router},
};
use spin_outbound_networking::{AllowedHostsConfig, OutboundUrl};
use spin_trigger::{EitherInstancePre, TriggerAppEngine, TriggerExecutor};
use spin_trigger::{TriggerAppEngine, TriggerExecutor, TriggerInstancePre};
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpListener,
Expand Down Expand Up @@ -86,12 +86,23 @@ impl CliArgs {
}
}

pub enum HttpInstancePre {
Component(spin_core::InstancePre<RuntimeData>),
Module(spin_core::ModuleInstancePre<RuntimeData>),
}

pub enum HttpInstance {
Component(spin_core::Instance),
Module(spin_core::ModuleInstance),
}

#[async_trait]
impl TriggerExecutor for HttpTrigger {
const TRIGGER_TYPE: &'static str = "http";
type RuntimeData = RuntimeData;
type TriggerConfig = HttpTriggerConfig;
type RunConfig = CliArgs;
type InstancePre = HttpInstancePre;

async fn new(engine: TriggerAppEngine<Self>) -> Result<Self> {
let mut base = engine
Expand Down Expand Up @@ -167,20 +178,37 @@ impl TriggerExecutor for HttpTrigger {
};
Ok(())
}
}

#[async_trait]
impl TriggerInstancePre<RuntimeData, HttpTriggerConfig> for HttpInstancePre {
type Instance = HttpInstance;

async fn instantiate_pre(
engine: &Engine<Self::RuntimeData>,
engine: &Engine<RuntimeData>,
component: &AppComponent,
config: &Self::TriggerConfig,
) -> Result<EitherInstancePre<Self::RuntimeData>> {
config: &HttpTriggerConfig,
) -> Result<HttpInstancePre> {
if let Some(HttpExecutorType::Wagi(_)) = &config.executor {
let module = component.load_module(engine).await?;
Ok(EitherInstancePre::Module(
Ok(HttpInstancePre::Module(
engine.module_instantiate_pre(&module)?,
))
} else {
let comp = component.load_component(engine).await?;
Ok(EitherInstancePre::Component(engine.instantiate_pre(&comp)?))
Ok(HttpInstancePre::Component(engine.instantiate_pre(&comp)?))
}
}

async fn instantiate(&self, store: &mut Store) -> Result<HttpInstance> {
match self {
HttpInstancePre::Component(pre) => pre
.instantiate_async(store)
.await
.map(HttpInstance::Component),
HttpInstancePre::Module(pre) => {
pre.instantiate_async(store).await.map(HttpInstance::Module)
}
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions crates/trigger-http/src/wagi.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::{io::Cursor, net::SocketAddr};

use crate::HttpInstance;
use anyhow::{anyhow, ensure, Context, Result};
use async_trait::async_trait;
use http_body_util::BodyExt;
use hyper::{Request, Response};
use spin_core::WasiVersion;
use spin_http::{config::WagiTriggerConfig, routes::RoutePattern, wagi};
use spin_trigger::{EitherInstance, TriggerAppEngine};
use spin_trigger::TriggerAppEngine;
use wasi_common_preview1::{pipe::WritePipe, I32Exit};

use crate::{Body, HttpExecutor, HttpTrigger};
Expand Down Expand Up @@ -93,7 +94,7 @@ impl HttpExecutor for WagiHttpExecutor {
.prepare_instance_with_store(component, store_builder)
.await?;

let EitherInstance::Module(instance) = instance else {
let HttpInstance::Module(instance) = instance else {
unreachable!()
};

Expand Down
3 changes: 2 additions & 1 deletion crates/trigger-redis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures::{future::join_all, StreamExt};
use redis::{Client, ConnectionLike};
use serde::{de::IgnoredAny, Deserialize, Serialize};
use spin_common::url::remove_credentials;
use spin_core::async_trait;
use spin_core::{async_trait, InstancePre};
use spin_trigger::{cli::NoArgs, TriggerAppEngine, TriggerExecutor};
use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -53,6 +53,7 @@ impl TriggerExecutor for RedisTrigger {
type RuntimeData = RuntimeData;
type TriggerConfig = RedisTriggerConfig;
type RunConfig = NoArgs;
type InstancePre = InstancePre<RuntimeData>;

async fn new(engine: TriggerAppEngine<Self>) -> Result<Self> {
let default_address: String = engine
Expand Down
5 changes: 1 addition & 4 deletions crates/trigger-redis/src/spin.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use spin_core::Instance;
use spin_trigger::{EitherInstance, TriggerAppEngine};
use spin_trigger::TriggerAppEngine;
use spin_world::v1::redis_types::{Error, Payload};

use crate::{RedisExecutor, RedisTrigger, Store};
Expand All @@ -21,9 +21,6 @@ impl RedisExecutor for SpinRedisExecutor {
tracing::trace!("Executing request using the Spin executor for component {component_id}");

let (instance, store) = engine.prepare_instance(component_id).await?;
let EitherInstance::Component(instance) = instance else {
unreachable!()
};

match Self::execute_impl(store, instance, channel, payload.to_vec()).await {
Ok(()) => {
Expand Down
1 change: 1 addition & 0 deletions crates/trigger/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ pub mod help {
type RuntimeData = ();
type TriggerConfig = ();
type RunConfig = NoArgs;
type InstancePre = spin_core::InstancePre<Self::RuntimeData>;
async fn new(_: crate::TriggerAppEngine<Self>) -> Result<Self> {
Ok(Self)
}
Expand Down
84 changes: 48 additions & 36 deletions crates/trigger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,19 @@ use serde::de::DeserializeOwned;

use spin_app::{App, AppComponent, AppLoader, AppTrigger, Loader, OwnedApp, APP_NAME_KEY};
use spin_core::{
Config, Engine, EngineBuilder, Instance, InstancePre, ModuleInstance, ModuleInstancePre,
OutboundWasiHttpHandler, Store, StoreBuilder, WasiVersion,
Config, Engine, EngineBuilder, Instance, InstancePre, OutboundWasiHttpHandler, Store,
StoreBuilder, WasiVersion,
};

pub use crate::runtime_config::RuntimeConfig;

pub enum EitherInstancePre<T> {
Component(InstancePre<T>),
Module(ModuleInstancePre<T>),
}

pub enum EitherInstance {
Component(Instance),
Module(ModuleInstance),
}

#[async_trait]
pub trait TriggerExecutor: Sized + Send + Sync {
const TRIGGER_TYPE: &'static str;
type RuntimeData: OutboundWasiHttpHandler + Default + Send + Sync + 'static;
type TriggerConfig;
type RunConfig;
type InstancePre: TriggerInstancePre<Self::RuntimeData, Self::TriggerConfig>;

/// Create a new trigger executor.
async fn new(engine: TriggerAppEngine<Self>) -> Result<Self>;
Expand All @@ -46,18 +37,50 @@ pub trait TriggerExecutor: Sized + Send + Sync {
fn configure_engine(_builder: &mut EngineBuilder<Self::RuntimeData>) -> Result<()> {
Ok(())
}
}

/// Helper type alias to project the `Instance` of a given `TriggerExecutor`.
pub type ExecutorInstance<T> = <<T as TriggerExecutor>::InstancePre as TriggerInstancePre<
<T as TriggerExecutor>::RuntimeData,
<T as TriggerExecutor>::TriggerConfig,
>>::Instance;

#[async_trait]
pub trait TriggerInstancePre<T, C>: Sized + Send + Sync
where
T: OutboundWasiHttpHandler + Send + Sync,
{
type Instance;

async fn instantiate_pre(
engine: &Engine<Self::RuntimeData>,
engine: &Engine<T>,
component: &AppComponent,
_config: &Self::TriggerConfig,
) -> Result<EitherInstancePre<Self::RuntimeData>> {
config: &C,
) -> Result<Self>;

async fn instantiate(&self, store: &mut Store<T>) -> Result<Self::Instance>;
}

#[async_trait]
impl<T, C> TriggerInstancePre<T, C> for InstancePre<T>
where
T: OutboundWasiHttpHandler + Send + Sync,
{
type Instance = Instance;

async fn instantiate_pre(
engine: &Engine<T>,
component: &AppComponent,
_config: &C,
) -> Result<Self> {
let comp = component.load_component(engine).await?;
Ok(EitherInstancePre::Component(
engine
.instantiate_pre(&comp)
.with_context(|| format!("Failed to instantiate component '{}'", component.id()))?,
))
Ok(engine
.instantiate_pre(&comp)
.with_context(|| format!("Failed to instantiate component '{}'", component.id()))?)
}

async fn instantiate(&self, store: &mut Store<T>) -> Result<Self::Instance> {
self.instantiate_async(store).await
}
}

Expand Down Expand Up @@ -246,7 +269,7 @@ pub struct TriggerAppEngine<Executor: TriggerExecutor> {
// Trigger configs for this trigger type, with order matching `app.triggers_with_type(Executor::TRIGGER_TYPE)`
trigger_configs: Vec<Executor::TriggerConfig>,
// Map of {Component ID -> InstancePre} for each component.
component_instance_pres: HashMap<String, EitherInstancePre<Executor::RuntimeData>>,
component_instance_pres: HashMap<String, Executor::InstancePre>,
// Resolver for value template expressions
resolver: std::sync::Arc<spin_expressions::PreparedResolver>,
}
Expand Down Expand Up @@ -290,7 +313,7 @@ impl<Executor: TriggerExecutor> TriggerAppEngine<Executor> {
if let Some(config) = trigger_config {
component_instance_pres.insert(
id.to_owned(),
Executor::instantiate_pre(&engine, &component, config)
Executor::InstancePre::instantiate_pre(&engine, &component, config)
.await
.with_context(|| format!("Failed to instantiate component '{id}'"))?,
);
Expand Down Expand Up @@ -348,7 +371,7 @@ impl<Executor: TriggerExecutor> TriggerAppEngine<Executor> {
pub async fn prepare_instance(
&self,
component_id: &str,
) -> Result<(EitherInstance, Store<Executor::RuntimeData>)> {
) -> Result<(ExecutorInstance<Executor>, Store<Executor::RuntimeData>)> {
let store_builder = self.store_builder(component_id, WasiVersion::Preview2)?;
self.prepare_instance_with_store(component_id, store_builder)
.await
Expand All @@ -359,7 +382,7 @@ impl<Executor: TriggerExecutor> TriggerAppEngine<Executor> {
&self,
component_id: &str,
mut store_builder: StoreBuilder,
) -> Result<(EitherInstance, Store<Executor::RuntimeData>)> {
) -> Result<(ExecutorInstance<Executor>, Store<Executor::RuntimeData>)> {
let component = self.get_component(component_id)?;

// Build Store
Expand All @@ -372,18 +395,7 @@ impl<Executor: TriggerExecutor> TriggerAppEngine<Executor> {
.get(component_id)
.expect("component_instance_pres missing valid component_id");

let instance = match pre {
EitherInstancePre::Component(pre) => pre
.instantiate_async(&mut store)
.await
.map(EitherInstance::Component),

EitherInstancePre::Module(pre) => pre
.instantiate_async(&mut store)
.await
.map(EitherInstance::Module),
}
.with_context(|| {
let instance = pre.instantiate(&mut store).await.with_context(|| {
format!(
"app {:?} component {:?} instantiation failed",
self.app_name, component_id
Expand Down
9 changes: 4 additions & 5 deletions examples/spin-timer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::collections::HashMap;
use clap::Args;
use serde::{Deserialize, Serialize};
use spin_app::MetadataKey;
use spin_core::async_trait;
use spin_trigger::{EitherInstance, TriggerAppEngine, TriggerExecutor};
use spin_core::{async_trait, InstancePre};
use spin_trigger::{TriggerAppEngine, TriggerExecutor};

wasmtime::component::bindgen!({
path: ".",
Expand Down Expand Up @@ -62,6 +62,8 @@ impl TriggerExecutor for TimerTrigger {

type RunConfig = CliArgs;

type InstancePre = InstancePre<RuntimeData>;

async fn new(engine: spin_trigger::TriggerAppEngine<Self>) -> anyhow::Result<Self> {
let speedup = engine
.app()
Expand Down Expand Up @@ -119,9 +121,6 @@ impl TimerTrigger {
async fn handle_timer_event(&self, component_id: &str) -> anyhow::Result<()> {
// Load the guest...
let (instance, mut store) = self.engine.prepare_instance(component_id).await?;
let EitherInstance::Component(instance) = instance else {
unreachable!()
};
let instance = SpinTimer::new(&mut store, &instance)?;
// ...and call the entry point
instance.call_handle_timer_request(&mut store).await
Expand Down
Loading