From 5fedbfc29e5957a3b92d1b706865bb50b075fac1 Mon Sep 17 00:00:00 2001 From: Jarrod Overson Date: Mon, 21 Jun 2021 23:10:00 -0400 Subject: [PATCH] first working pass at GrpcUrlProviders --- Cargo.lock | 34 +- Cargo.toml | 1 + crates/vino-guest/src/lib.rs | 1 + crates/vino-host/Cargo.toml | 3 +- crates/vino-host/src/error.rs | 2 + crates/vino-host/src/host.rs | 6 +- crates/vino-provider-test/schemas/test.widl | 2 +- .../src/components/generated/test.rs | 14 +- .../vino-provider-test/src/components/mod.rs | 2 +- crates/vino-provider-test/src/lib.rs | 2 +- crates/vino-rpc/src/component_service.rs | 18 + crates/vino-runtime/Cargo.toml | 5 + .../vino-runtime/src/components/grpc_host.rs | 2 - .../src/components/grpc_providers.rs | 150 ------ .../src/components/grpc_url_provider.rs | 485 +++++++++++------- crates/vino-runtime/src/components/mod.rs | 2 +- .../src/components/native_component_actor.rs | 125 ++--- .../src/components/vino_component.rs | 152 +++++- crates/vino-runtime/src/dispatch.rs | 38 +- crates/vino-runtime/src/error.rs | 6 +- crates/vino-runtime/src/network.rs | 10 +- crates/vino-runtime/src/schematic.rs | 51 +- .../vino-runtime/src/schematic_definition.rs | 178 ------- crates/vino-runtime/src/util/mod.rs | 1 - crates/vino-runtime/src/util/native_macro.rs | 75 --- src/lib.rs | 6 +- 26 files changed, 637 insertions(+), 734 deletions(-) delete mode 100644 crates/vino-runtime/src/components/grpc_host.rs delete mode 100644 crates/vino-runtime/src/components/grpc_providers.rs delete mode 100644 crates/vino-runtime/src/util/native_macro.rs diff --git a/Cargo.lock b/Cargo.lock index abfdfb6cd..9ab5b2e50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,30 +2,6 @@ # It is not intended for manual editing. version = 3 -[[package]] -name = "actix" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "543c47e7827f8fcc9d1445bd98ba402137bfce80ee2187429de49c52b5131bd3" -dependencies = [ - "actix-rt", - "actix_derive", - "bitflags", - "bytes 1.0.1", - "crossbeam-channel", - "futures-core", - "futures-sink", - "futures-task", - "futures-util", - "log", - "once_cell", - "parking_lot", - "pin-project-lite", - "smallvec", - "tokio", - "tokio-util", -] - [[package]] name = "actix" version = "0.12.0" @@ -3099,6 +3075,7 @@ dependencies = [ "vino-host", "vino-manifest", "vino-runtime", + "vino-transport", "wapc", ] @@ -3115,7 +3092,7 @@ dependencies = [ name = "vino-host" version = "0.1.0" dependencies = [ - "actix 0.11.1", + "actix", "actix-rt", "env_logger 0.8.4", "hocon", @@ -3129,6 +3106,7 @@ dependencies = [ "tokio", "vino-manifest", "vino-runtime", + "vino-transport", ] [[package]] @@ -3253,7 +3231,7 @@ dependencies = [ name = "vino-runtime" version = "0.1.0" dependencies = [ - "actix 0.12.0", + "actix", "actix-rt", "async-trait", "bcrypt", @@ -3275,11 +3253,15 @@ dependencies = [ "serde_yaml", "test-env-log", "thiserror", + "tokio", + "tokio-stream", + "tonic", "uuid", "vino-guest", "vino-macros", "vino-manifest", "vino-provider", + "vino-provider-test", "vino-rpc", "vino-transport", "wapc", diff --git a/Cargo.toml b/Cargo.toml index b00496b8d..28ea6bf50 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ thiserror = "1.0.24" oci-distribution = "0.6" unsafe-io = "= 0.6.2" vino-runtime = { path="./crates/vino-runtime" } +vino-transport = { path="./crates/vino-transport" } vino-host = { path="./crates/vino-host" } logger = { path="./crates/logger" } vino-manifest = { path="./crates/vino-manifest" } diff --git a/crates/vino-guest/src/lib.rs b/crates/vino-guest/src/lib.rs index 3e3bdb803..783305485 100644 --- a/crates/vino-guest/src/lib.rs +++ b/crates/vino-guest/src/lib.rs @@ -35,6 +35,7 @@ impl Display for Signal { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum OutputPayload { + Test(String), MessagePack(Vec), Exception(String), Error(String), diff --git a/crates/vino-host/Cargo.toml b/crates/vino-host/Cargo.toml index 4680514e8..b19ebcb88 100644 --- a/crates/vino-host/Cargo.toml +++ b/crates/vino-host/Cargo.toml @@ -9,7 +9,7 @@ edition = "2018" [dependencies] vino-runtime = { path="../vino-runtime" } log = "0.4.14" -actix = "0.11" +actix = "0.12" actix-rt = "2.1.0" thiserror = "1.0.25" nkeys = "0.1.0" @@ -17,6 +17,7 @@ serde = { version="1.0.126", features=["derive"] } serde_yaml = "0.8.17" hocon = "0.5.2" vino-manifest = { path="../vino-manifest" } +vino-transport = { path="../vino-transport" } [dev-dependencies] test-env-log = "0.2.7" diff --git a/crates/vino-host/src/error.rs b/crates/vino-host/src/error.rs index 52b7137e7..f181d2ead 100644 --- a/crates/vino-host/src/error.rs +++ b/crates/vino-host/src/error.rs @@ -18,6 +18,8 @@ pub enum VinoHostError { #[error(transparent)] VinoError(#[from] vino_runtime::Error), #[error(transparent)] + TransportError(#[from] vino_transport::Error), + #[error(transparent)] ManifestError(#[from] vino_manifest::Error), #[error("Invalid host state for operation: {0}")] InvalidHostState(String), diff --git a/crates/vino-host/src/host.rs b/crates/vino-host/src/host.rs index 9dbb64b21..fcb76d5c9 100644 --- a/crates/vino-host/src/host.rs +++ b/crates/vino-host/src/host.rs @@ -96,10 +96,8 @@ mod test { use std::path::PathBuf; use maplit::hashmap; - use vino_runtime::{ - deserialize, - MessagePayload, - }; + use vino_runtime::MessagePayload; + use vino_transport::deserialize; use crate::host_definition::HostDefinition; use crate::{ diff --git a/crates/vino-provider-test/schemas/test.widl b/crates/vino-provider-test/schemas/test.widl index b9526b5d8..f5375600e 100644 --- a/crates/vino-provider-test/schemas/test.widl +++ b/crates/vino-provider-test/schemas/test.widl @@ -1,4 +1,4 @@ -namespace "vino::test::provider" +namespace "test-component" type Inputs { input: string diff --git a/crates/vino-provider-test/src/components/generated/test.rs b/crates/vino-provider-test/src/components/generated/test.rs index c343813f0..cb1274f5c 100644 --- a/crates/vino-provider-test/src/components/generated/test.rs +++ b/crates/vino-provider-test/src/components/generated/test.rs @@ -15,8 +15,8 @@ use vino_rpc::port::{ use vino_transport::deserialize; #[derive(Debug, PartialEq, Deserialize, Serialize, Default, Clone)] -pub struct Inputs { - pub input: String, +pub(crate) struct Inputs { + pub(crate) input: String, } pub(crate) fn inputs_list() -> Vec { @@ -26,19 +26,19 @@ pub(crate) fn inputs_list() -> Vec { #[derive(Debug, PartialEq, Deserialize, Serialize, Default, Clone)] pub(crate) struct InputEncoded { #[serde(rename = "input")] - pub input: Vec, + pub(crate) input: Vec, } pub(crate) fn deserialize_inputs( args: InputEncoded, -) -> std::result::Result> { +) -> Result> { Ok(Inputs { input: deserialize(&args.input)?, }) } #[derive(Default)] -pub struct Outputs { +pub(crate) struct Outputs { pub(crate) output: OutputSender, } @@ -46,7 +46,7 @@ pub(crate) fn outputs_list() -> Vec { vec!["output".to_string()] } -pub struct OutputSender { +pub(crate) struct OutputSender { port: Arc>, } impl Default for OutputSender { @@ -64,7 +64,7 @@ impl Sender for OutputSender { } } -pub fn get_outputs() -> (Outputs, Receiver) { +pub(crate) fn get_outputs() -> (Outputs, Receiver) { let outputs = Outputs::default(); let ports = vec![outputs.output.port.clone()]; let receiver = Receiver::new(ports); diff --git a/crates/vino-provider-test/src/components/mod.rs b/crates/vino-provider-test/src/components/mod.rs index 177678c2c..d21924cac 100644 --- a/crates/vino-provider-test/src/components/mod.rs +++ b/crates/vino-provider-test/src/components/mod.rs @@ -8,7 +8,7 @@ pub(crate) fn get_component( name: &str, ) -> Option + Sync + Send>> { match name { - "vino::test::provider" => Some(Box::new(test::Component::default())), + "test-component" => Some(Box::new(test::Component::default())), _ => None, } } diff --git a/crates/vino-provider-test/src/lib.rs b/crates/vino-provider-test/src/lib.rs index bbb288be5..aa65a91d4 100644 --- a/crates/vino-provider-test/src/lib.rs +++ b/crates/vino-provider-test/src/lib.rs @@ -42,7 +42,7 @@ impl RpcHandler for Provider { let future = instance.job_wrapper(context, &payload); Ok(future.await?) } - None => Err(anyhow!("Component not found").into()), + None => Err(anyhow!("Component '{}' not found", component).into()), } } } diff --git a/crates/vino-rpc/src/component_service.rs b/crates/vino-rpc/src/component_service.rs index 8ee4a794d..26ee83f58 100644 --- a/crates/vino-rpc/src/component_service.rs +++ b/crates/vino-rpc/src/component_service.rs @@ -18,11 +18,29 @@ use crate::{ MessagePayload, Output, PayloadKind, + RpcHandler, }; pub struct ComponentService { pub provider: Arc>, } +impl ComponentService { + pub fn new(provider: T) -> Self + where + T: RpcHandler + 'static, + { + Self { + provider: Arc::new(Mutex::new(provider)), + } + } + pub fn new_shared(provider: Arc>) -> Self + where + T: RpcHandler + 'static, + { + Self { provider } + } +} + pub fn make_output(port: &str, inv_id: &str, payload: OutputPayload) -> Result { match serialize(payload) { Ok(bytes) => Ok(Output { diff --git a/crates/vino-runtime/Cargo.toml b/crates/vino-runtime/Cargo.toml index 49b052e81..26e96fc17 100644 --- a/crates/vino-runtime/Cargo.toml +++ b/crates/vino-runtime/Cargo.toml @@ -41,8 +41,13 @@ oci-distribution = "0.6.0" nkeys = "0.1.0" derivative = "2.2.0" derive-new = "0.5.9" +tokio = { version="1.4.0", features=["macros", "net", "rt-multi-thread"] } +tonic = { version="0.4.3", features=["tls"] } +tokio-stream = "0.1.6" [dev-dependencies] env_logger = "0.8.3" test-env-log = "0.2.7" maplit = "1.0.2" +vino-provider-test = { path="../vino-provider-test" } +tonic = "0.4.3" diff --git a/crates/vino-runtime/src/components/grpc_host.rs b/crates/vino-runtime/src/components/grpc_host.rs deleted file mode 100644 index 6825e60b2..000000000 --- a/crates/vino-runtime/src/components/grpc_host.rs +++ /dev/null @@ -1,2 +0,0 @@ -// #[derive(Debug)] -// pub struct GrpcHost {} diff --git a/crates/vino-runtime/src/components/grpc_providers.rs b/crates/vino-runtime/src/components/grpc_providers.rs deleted file mode 100644 index fbbfa59b2..000000000 --- a/crates/vino-runtime/src/components/grpc_providers.rs +++ /dev/null @@ -1,150 +0,0 @@ -// use std::env::temp_dir; -// use std::path::PathBuf; - -// use actix::prelude::*; -// use wascap::prelude::{ -// Claims, -// KeyPair, -// }; - -// use crate::dispatch::{ -// Invocation, -// InvocationResponse, -// }; -// use crate::{ -// Result, -// VinoEntity, -// }; - -// #[derive(Clone, Debug)] -// pub struct ExternalProvider { -// pub(crate) link_name: String, -// pub(crate) claims: Claims, -// pub(crate) bytes: Option>, -// } - -// impl ExternalProvider { -// /// Returns the path where this capability provider's binary resides/should reside for cache purposes. -// pub fn cache_path(&self) -> PathBuf { -// let mut path = temp_dir(); -// path.push("vino"); -// path.push(&self.claims.subject); -// path.push(format!( -// "{}", -// self.claims.metadata.as_ref().unwrap().rev.unwrap_or(0) -// )); -// path.push(Self::native_target()); -// path -// } -// pub fn native_target() -> String { -// format!("{}-{}", std::env::consts::ARCH, std::env::consts::OS) -// } -// } - -// #[derive(Message, Debug)] -// #[rtype(result = "Result")] -// pub(crate) struct Initialize { -// pub(crate) seed: String, -// pub(crate) image_ref: Option, -// } - -// // trait ExternalProvider {} - -// struct State { -// #[allow(dead_code)] -// kp: KeyPair, -// #[allow(dead_code)] -// plugin: Box, -// #[allow(dead_code)] -// image_ref: Option, -// } - -// pub(crate) struct GrpcProvider { -// state: Option, -// } - -// impl GrpcProvider {} - -// impl Actor for GrpcProvider { -// type Context = SyncContext; - -// fn started(&mut self, _ctx: &mut Self::Context) { -// info!("Native provider host started"); -// } - -// fn stopped(&mut self, _ctx: &mut Self::Context) { -// if self.state.is_none() { -// return; -// } -// let _state = self.state.as_mut().unwrap(); - -// // state.plugin.stop(); // Tell the provider to clean up, dispose of resources, stop threads, etc -// } -// } - -// impl Handler for GrpcProvider { -// type Result = Result; - -// fn handle(&mut self, _msg: Initialize, _ctx: &mut Self::Context) -> Self::Result { -// // let (library, plugin) = match extrude(&msg.cap) { -// // Ok((l, r)) => (l, r), -// // Err(e) => { -// // error!("Failed to extract plugin from provider: {}", e); -// // ctx.stop(); -// // return Err("Failed to extract plugin from provider".into()); -// // } -// // }; - -// Ok(VinoEntity::default()) -// } -// } - -// impl Handler for GrpcProvider { -// type Result = InvocationResponse; - -// fn handle(&mut self, _inv: Invocation, _ctx: &mut Self::Context) -> Self::Result { -// // let state = self.state.as_ref().unwrap(); -// InvocationResponse::success("TODO".to_string(), vec![]) -// } -// } - -// #[allow(dead_code)] -// pub(crate) fn write_provider_to_disk(provider: &ExternalProvider) -> Result<()> { -// if let Some(ref bytes) = provider.bytes { -// use std::io::Write; -// let path = provider.cache_path(); -// let mut parent_dir = path.clone(); -// parent_dir.pop(); -// std::fs::create_dir_all(parent_dir)?; -// debug!("Caching provider to {}", path.display()); -// let mut file = std::fs::File::create(&path)?; -// Ok(file.write_all(bytes)?) -// } else { -// Err("No bytes to cache".into()) -// } -// } - -// #[allow(dead_code)] -// fn extrude(provider: &ExternalProvider) -> Result<()> { -// if provider.bytes.is_some() { -// let path = provider.cache_path(); -// // If this file is already on disk, don't overwrite -// if path.exists() { -// debug!("Using cache at {}", path.display()); -// } else { -// write_provider_to_disk(provider)?; -// } -// // #[cfg(target_os = "linux")] -// // #[cfg(not(target_os = "linux"))] -// // Ok((Some(library), plugin)) -// } -// Ok(()) -// } - -// #[cfg(test)] -// mod test { -// // use actix::prelude::*; - -// #[actix_rt::test] -// async fn test_extras_actor() {} -// } diff --git a/crates/vino-runtime/src/components/grpc_url_provider.rs b/crates/vino-runtime/src/components/grpc_url_provider.rs index 36b781a6c..2a6fd939d 100644 --- a/crates/vino-runtime/src/components/grpc_url_provider.rs +++ b/crates/vino-runtime/src/components/grpc_url_provider.rs @@ -1,175 +1,310 @@ -// use actix::fut::ok; -// use actix::prelude::*; -// use vino_rpc::component_rpc_client::ComponentRpcClient; - -// use crate::dispatch::{ -// Invocation, -// InvocationResponse, -// MessagePayload, -// VinoEntity, -// }; -// use crate::Result; - -// #[derive(Default, Debug)] -// pub struct GrpcUrlProvider { -// namespace: String, -// state: Option, -// seed: String, -// } - -// #[derive(Debug)] -// struct State { -// address: String, -// } - -// impl Actor for GrpcUrlProvider { -// type Context = Context; - -// fn started(&mut self, _ctx: &mut Self::Context) { -// trace!("GRPC Provider started"); -// } - -// fn stopped(&mut self, _ctx: &mut Self::Context) {} -// } - -// #[derive(Message)] -// #[rtype(result = "Result<()>")] -// pub(crate) struct Initialize { -// pub(crate) namespace: String, -// pub(crate) address: String, -// pub(crate) _client: String, -// pub(crate) signing_seed: String, -// } - -// impl Handler for GrpcUrlProvider { -// type Result = ResponseActFuture>; - -// fn handle(&mut self, msg: Initialize, _ctx: &mut Self::Context) -> Self::Result { -// trace!("Native actor initialized"); -// self.namespace = msg.namespace; -// self.seed = msg.signing_seed; - -// let addr = msg.address; - -// Box::pin( -// ComponentRpcClient::connect(addr) -// .into_actor(self) -// .then(|_client, _provider, _ctx| ok(())), -// ) -// } -// } - -// impl Handler for GrpcUrlProvider { -// type Result = InvocationResponse; - -// fn handle(&mut self, msg: Invocation, _ctx: &mut Self::Context) -> Self::Result { -// trace!( -// "Native actor Invocation - From {} to {}", -// msg.origin.url(), -// msg.target.url() -// ); -// // let target = msg.target.url(); - -// // let inv_id = msg.id; - -// if let VinoEntity::Provider(name) = &msg.target { -// trace!("Handling provider invocation for name: {}", name); - -// if let MessagePayload::MultiBytes(_payload) = msg.msg { -// InvocationResponse::error(msg.tx_id, "todo".to_string()) -// } else { -// InvocationResponse::error( -// msg.tx_id, -// "Invalid payload sent from native actor".to_string(), -// ) -// } -// } else { -// InvocationResponse::error( -// msg.tx_id, -// "Sent invocation for incorrect entity".to_string(), -// ) -// } -// } -// } - -// #[cfg(test)] -// mod test { - -// use std::net::{ -// IpAddr, -// Ipv4Addr, -// SocketAddr, -// }; -// use std::str::FromStr; -// use std::sync::Arc; - -// use super::*; - -// async fn make_grpc_server() { -// let addr: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_str("127.0.0.1")), 54321); - -// trace!("Binding to {:?}", addr.to_string()); - -// let component_service = ComponentService { provider }; - -// let svc = ComponentRpcServer::new(component_service); - -// Server::builder().add_service(svc).serve(addr).await?; - -// trace!("Server started"); -// } - -// #[test_env_log::test(actix_rt::test)] -// async fn test_init() -> Result<()> { -// let init_handle = init( -// Arc::new(Mutex::new(Provider::default())), -// Some(Options { -// port: 12345, -// address: Ipv4Addr::from_str("127.0.0.1")?, -// }), -// ); -// tokio::select! { -// _ = tokio::time::sleep(Duration::from_secs(1)) => { -// println!("timeout reached"); -// } -// _ = init_handle => { -// panic!("server died"); -// } -// }; - -// trace!("test_init"); -// let provider = GrpcUrlProvider::default(); -// let addr = provider.start(); -// let mut schematic_def = Initialize { -// namespace: "test", -// address: "", -// _client: (), -// signing_seed: (), -// }; - -// let hostkey = KeyPair::new_server(); - -// addr -// .send(Initialize { -// network: Network::from_hostlocal_registry(&KeyPair::new_server().public_key()), -// host_id: "test".to_string(), -// schematic: schematic_def, -// seed: hostkey.seed()?, -// allow_latest: true, -// allowed_insecure: vec![], -// }) -// .await??; -// let mut input: HashMap> = HashMap::new(); -// input.insert("input".to_string(), vec![20]); -// let response = addr -// .send(super::Request { -// tx_id: "some_id".to_string(), -// schematic: "logger".to_string(), -// payload: input, -// }) -// .await?; -// println!("{:?}", response); - -// Ok(()) -// } -// } +use actix::fut::{ + err, + ok, +}; +use actix::prelude::*; +use futures::FutureExt; +use rpc::component_rpc_client::ComponentRpcClient; +use tokio::sync::mpsc::UnboundedSender; +use tonic::transport::Channel; +use vino_guest::OutputPayload; +use vino_rpc as rpc; +use vino_transport::{ + deserialize, + serialize, +}; + +use crate::dispatch::{ + Invocation, + InvocationResponse, + MessagePayload, +}; +use crate::error::VinoError; +use crate::Result; + +#[derive(Default, Debug)] +pub struct GrpcUrlProvider { + namespace: String, + state: Option, + seed: String, +} + +#[derive(Debug)] +struct State { + pub(crate) client: ComponentRpcClient, + pub(crate) sender: UnboundedSender, +} + +impl Actor for GrpcUrlProvider { + type Context = Context; + + fn started(&mut self, _ctx: &mut Self::Context) { + debug!("GRPC Provider started"); + } + + fn stopped(&mut self, _ctx: &mut Self::Context) {} +} + +#[derive(Message)] +#[rtype(result = "Result<()>")] +pub(crate) struct Initialize { + pub(crate) namespace: String, + pub(crate) address: String, + pub(crate) signing_seed: String, + pub(crate) sender: UnboundedSender, +} + +impl Handler for GrpcUrlProvider { + type Result = ResponseActFuture>; + + fn handle(&mut self, msg: Initialize, _ctx: &mut Self::Context) -> Self::Result { + debug!("Native actor initialized"); + self.namespace = msg.namespace; + self.seed = msg.signing_seed; + let sender = msg.sender; + + let addr = msg.address; + + Box::pin(ComponentRpcClient::connect(addr).into_actor(self).then( + move |client, provider, _ctx| match client { + Ok(client) => { + provider.state = Some(State { client, sender }); + ok(()) + } + Err(e) => err(VinoError::ProviderError(format!( + "Could not connect to Rpc Client in GrpcUrlProvider: {}", + e + ))), + }, + )) + } +} + +impl Handler for GrpcUrlProvider { + type Result = ResponseFuture; + + fn handle(&mut self, msg: Invocation, _ctx: &mut Self::Context) -> Self::Result { + debug!( + "Native actor Invocation - From {} to {}", + msg.origin.url(), + msg.target.url() + ); + let target_url = msg.target.url(); + let target = msg.target; + let payload = msg.msg; + let tx_id = msg.tx_id; + let tx_id2 = tx_id.clone(); + let claims = msg.encoded_claims; + let host_id = msg.host_id; + + let inv_id = msg.id; + let state = self.state.as_ref().unwrap(); + // let seed = self.seed.clone(); + let mut client = state.client.clone(); + let sender = state.sender.clone(); + let origin = msg.origin; + + let fut = async move { + let entity = target + .into_component() + .map_err(|_| "Provider received invalid invocation")?; + debug!("Getting component: {}", entity); + let payload = payload + .into_multibytes() + .map_err(|_| VinoError::ComponentError("Provider sent invalid payload".into()))?; + debug!("Payload is : {:?}", payload); + let payload = serialize(payload).map_err(|_| "Could not serialize input payload")?; + + debug!("making external request {}", target_url); + + let invocation = rpc::Invocation { + origin: Some(rpc::Entity { + name: origin.key(), + kind: rpc::entity::EntityKind::Test.into(), + }), + target: Some(rpc::Entity { + name: entity.name, + kind: rpc::entity::EntityKind::Provider.into(), + }), + msg: Some(rpc::MessagePayload { + kind: rpc::PayloadKind::MultiBytes.into(), + value: payload, + }), + id: inv_id.to_string(), + tx_id: tx_id.to_string(), + encoded_claims: claims.to_string(), + host_id: host_id.to_string(), + }; + + let stream = client.invoke(invocation).await?; + actix::spawn(async move { + let mut stream = stream.into_inner(); + loop { + match stream.message().await { + Ok(next) => { + if next.is_none() { + break; + } + let output = next.unwrap(); + + // let kp = KeyPair::from_seed(&seed).unwrap(); + debug!( + "Provider Component for invocation {} got output on port [{}]: result: {:?}", + output.invocation_id, output.port, output.payload + ); + let output_payload = output.payload.unwrap(); + let kind: rpc::PayloadKind = rpc::PayloadKind::from_i32(output_payload.kind).unwrap(); + debug!("Payload kind: {:?}", kind); + let payload = match kind { + rpc::PayloadKind::Error => OutputPayload::Error( + deserialize(&output_payload.value) + .unwrap_or_else(|_| "Can not deserialize output payload".to_string()), + ), + rpc::PayloadKind::Exception => OutputPayload::Exception( + deserialize(&output_payload.value) + .unwrap_or_else(|_| "Can not deserialize output payload".to_string()), + ), + rpc::PayloadKind::Invalid => { + OutputPayload::Error("Invalid payload kind".to_string()) + } + rpc::PayloadKind::Test => OutputPayload::Test( + deserialize(&output_payload.value) + .unwrap_or_else(|_| "Invalid payload kind".to_string()), + ), + rpc::PayloadKind::MessagePack => OutputPayload::MessagePack(output_payload.value), + rpc::PayloadKind::MultiBytes => { + OutputPayload::Error("Invalid payload kind".to_string()) + } + }; + let result = sender.send(MessagePayload::from(payload)); + match result { + Ok(_) => { + debug!("successfully sent output payload to receiver"); + } + Err(e) => { + error!("error sending output payload to receiver: {}", e); + } + } + // let _result = native_host_callback(kp, &inv_id, "", &output.port, &payload).unwrap(); + } + Err(e) => { + error!("Received error from GRPC Url Provider upstream: {} ", e); + break; + } + } + } + }); + Ok!(InvocationResponse::success(tx_id, vec![],)) + }; + + Box::pin(fut.then(|result| async { + match result { + Ok(invocation) => invocation, + Err(e) => InvocationResponse::error(tx_id2, e.to_string()), + } + })) + } +} + +#[cfg(test)] +mod test { + + use std::net::{ + IpAddr, + Ipv4Addr, + SocketAddr, + }; + use std::str::FromStr; + + use maplit::hashmap; + use tokio::sync::mpsc::unbounded_channel; + use tonic::transport::Server; + use vino_provider_test::Provider; + use vino_rpc::component_rpc_server::ComponentRpcServer; + use vino_rpc::ComponentService; + use vino_transport::serialize; + + use super::*; + use crate::dispatch::ComponentEntity; + use crate::VinoEntity; + + async fn make_grpc_server(provider: Provider) -> Result<()> { + let addr: SocketAddr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::from_str("127.0.0.1").unwrap()), 54321); + + debug!("Binding to {:?}", addr.to_string()); + + let component_service = ComponentService::new(provider); + + let svc = ComponentRpcServer::new(component_service); + + Server::builder() + .add_service(svc) + .serve(addr) + .await + .unwrap(); + + debug!("Server started"); + Ok(()) + } + + #[test_env_log::test(actix_rt::test)] + async fn test_init() -> Result<()> { + let init_handle = make_grpc_server(Provider::default()); + + debug!("test_init"); + let (tx, rx) = unbounded_channel(); + let mut rx = rx; + let work = async move { + let grpc_provider = GrpcUrlProvider::start_default(); + grpc_provider + .send(Initialize { + namespace: "test".to_string(), + address: "https://127.0.0.1:54321".to_string(), + signing_seed: "seed".to_string(), + sender: tx, + }) + .await??; + + grpc_provider + .send(Invocation { + origin: VinoEntity::Test("grpc".to_string()), + target: VinoEntity::Component(ComponentEntity { + id: "test::DEADBEEF".to_string(), + reference: "DEADBEEF".to_string(), + name: "test-component".to_string(), + }), + msg: MessagePayload::MultiBytes(hashmap! { + "input".to_string()=>serialize("test string payload")? + }), + id: Invocation::uuid(), + tx_id: Invocation::uuid(), + encoded_claims: "".to_string(), + host_id: Invocation::uuid(), + }) + .await?; + Ok!(()) + }; + tokio::select! { + _ = work => { + debug!("Work complete"); + } + _ = init_handle => { + panic!("server died"); + } + }; + let next = rx.recv().await; + match next { + Some(n) => { + let result: OutputPayload = match n { + MessagePayload::MessagePack(bytes) => deserialize(&bytes)?, + _ => panic!("Unexpected payload"), + }; + + debug!("Got next : {:?}", result); + } + None => panic!("Nothing received"), + } + Ok(()) + } +} diff --git a/crates/vino-runtime/src/components/mod.rs b/crates/vino-runtime/src/components/mod.rs index c9e4c5ffc..8efbcbf8e 100644 --- a/crates/vino-runtime/src/components/mod.rs +++ b/crates/vino-runtime/src/components/mod.rs @@ -1,6 +1,6 @@ use std::path::Path; -// pub mod grpc_url_provider; +pub mod grpc_url_provider; pub mod native_component_actor; pub mod vino_component; pub(crate) mod wapc_component_actor; diff --git a/crates/vino-runtime/src/components/native_component_actor.rs b/crates/vino-runtime/src/components/native_component_actor.rs index 375c94594..05ee727b7 100644 --- a/crates/vino-runtime/src/components/native_component_actor.rs +++ b/crates/vino-runtime/src/components/native_component_actor.rs @@ -1,12 +1,13 @@ -use std::borrow::BorrowMut; use std::sync::{ Arc, Mutex, }; use actix::prelude::*; -use futures::executor::block_on; -use futures::StreamExt; +use futures::{ + FutureExt, + StreamExt, +}; use nkeys::KeyPair; use vino_guest::{ OutputPayload, @@ -17,14 +18,13 @@ use vino_transport::serialize; use crate::components::vino_component::NativeComponent; use crate::dispatch::{ native_host_callback, - Invocation, InvocationResponse, - MessagePayload, - VinoEntity, }; use crate::native_actors::State; use crate::{ + error, native_actors, + Invocation, Result, }; @@ -46,7 +46,7 @@ impl std::fmt::Debug for NativeComponentActor { } impl Actor for NativeComponentActor { - type Context = SyncContext; + type Context = Context; fn started(&mut self, _ctx: &mut Self::Context) { trace!("Native actor started"); @@ -96,7 +96,7 @@ impl Handler for NativeComponentActor { } impl Handler for NativeComponentActor { - type Result = InvocationResponse; + type Result = ResponseFuture; fn handle(&mut self, msg: Invocation, _ctx: &mut Self::Context) -> Self::Result { trace!( @@ -104,75 +104,58 @@ impl Handler for NativeComponentActor { msg.origin.url(), msg.target.url() ); - let target = msg.target.url(); + let target_url = msg.target.url(); + let target = msg.target; + let payload = msg.msg; + let tx_id = msg.tx_id; + let tx_id2 = tx_id.clone(); let inv_id = msg.id; let state = self.state.as_ref().unwrap().clone(); - - if let VinoEntity::Component(name) = &msg.target { - trace!("Getting actor by name: {:?}", name); - let component = native_actors::get_native_actor(name); - match component { - Some(component) => { - if let MessagePayload::MultiBytes(payload) = msg.msg { - trace!("Payload is : {:?}", payload); - match serialize(payload) { - Ok(payload) => { - trace!("executing actor {}", target); - // TODO fix async - let mut receiver = block_on(component.job_wrapper(state, &payload)); - match receiver.borrow_mut() { - Err(e) => { - error!("{}", e.to_string()); - InvocationResponse::error(msg.tx_id, e.to_string()) - } - Ok(receiver) => { - loop { - let next = block_on(receiver.next()); - if next.is_none() { - break; - } - - let (port_name, msg) = next.unwrap(); - let kp = KeyPair::from_seed(&self.seed).unwrap(); - trace!( - "Native actor got output on port [{}]: result: {:?}", - port_name, - msg - ); - let _result = - native_host_callback(kp, &inv_id, "", &port_name, &msg).unwrap(); - } - InvocationResponse::success( - msg.tx_id, - serialize("done").unwrap_or_else(|_| serialize(Signal::Done).unwrap()), - ) - } - } - } - Err(e) => { - InvocationResponse::error(msg.tx_id, format!("Could not serialize payload: {}", e)) - } - } - } else { - trace!("Invalid payload"); - InvocationResponse::error( - msg.tx_id, - "Invalid payload sent from native actor".to_string(), - ) + let seed = self.seed.clone(); + let fut = async move { + let entity = target + .into_component() + .map_err(|_| "Provider received invalid invocation")?; + debug!("Getting component: {}", entity); + let component = native_actors::get_native_actor(&entity.name).ok_or_else(|| { + error::VinoError::ComponentError(format!("Component {} not found", entity)) + })?; + let payload = payload + .into_multibytes() + .map_err(|_| error::VinoError::ComponentError("Provider sent invalid payload".into()))?; + trace!("Payload is : {:?}", payload); + let payload = serialize(payload).map_err(|_| "Could not serialize input payload")?; + + trace!("executing actor {}", target_url); + let mut receiver = component.job_wrapper(state, &payload).await?; + actix::spawn(async move { + loop { + trace!("Native component {} waiting for output", entity); + let next = receiver.next().await; + if next.is_none() { + break; } - } - None => { - trace!("Actor not found: {:?}", name); - InvocationResponse::error(msg.tx_id, "Sent invocation for incorrect actor".to_string()) + let (port_name, msg) = next.unwrap(); + let kp = KeyPair::from_seed(&seed).unwrap(); + trace!( + "Native actor {} got output on port [{}]: result: {:?}", + entity, + port_name, + msg + ); + let _result = native_host_callback(kp, &inv_id, "", &port_name, &msg).unwrap(); } + }); + Ok!(InvocationResponse::success(tx_id, vec![],)) + }; + + Box::pin(fut.then(|result| async { + match result { + Ok(invocation) => invocation, + Err(e) => InvocationResponse::error(tx_id2, e.to_string()), } - } else { - InvocationResponse::error( - msg.tx_id, - "Sent invocation for incorrect entity".to_string(), - ) - } + })) } } diff --git a/crates/vino-runtime/src/components/vino_component.rs b/crates/vino-runtime/src/components/vino_component.rs index 3cb4acf97..5ffca4e62 100644 --- a/crates/vino-runtime/src/components/vino_component.rs +++ b/crates/vino-runtime/src/components/vino_component.rs @@ -1,9 +1,13 @@ +use std::collections::HashMap; use std::fs::File; use std::io::prelude::*; use std::path::Path; use actix::{ + Actor, Addr, + Arbiter, + Recipient, SyncArbiter, }; use async_trait::async_trait; @@ -17,11 +21,19 @@ use super::{ Inputs, Outputs, }; -use crate::components::native_component_actor::NativeComponentActor; +use crate::components::native_component_actor::{ + self, + NativeComponentActor, +}; +use crate::components::wapc_component_actor; +use crate::network::ComponentMetadata; +use crate::util::oci::fetch_oci_bytes; use crate::{ native_actors, Error, + Invocation, Result, + SchematicDefinition, }; #[derive(Derivative, Clone)] @@ -73,7 +85,7 @@ pub(crate) trait Start { fn start(&mut self, seed: String) -> Result> where - Self::Item: actix::Actor; + Self::Item: Actor; } impl WapcComponent { @@ -244,10 +256,144 @@ impl VinoComponent for NativeComponent { impl Start for NativeComponent { type Item = NativeComponentActor; fn start(&mut self, _seed: String) -> Result> { - let native_actor = SyncArbiter::start(1, NativeComponentActor::default); + let native_actor = NativeComponentActor::start_default(); // native_actor // .send(native_component_actor::Initialize { name: self.name() }) // .await?; Ok(native_actor) } } + +pub(crate) async fn get_components_for_schematic( + schematic: SchematicDefinition, + seed: String, + allow_latest: bool, + allowed_insecure: Vec, +) -> Result> { + let mut metadata_map: HashMap = HashMap::new(); + debug!("getting components for schematic {:?}", schematic); + + for comp in schematic.components.values() { + debug!("{:?}", comp); + let component_ref = schematic.id_to_ref(&comp.id)?; + let (component, addr) = get_component( + component_ref.to_string(), + seed.clone(), + allow_latest, + &allowed_insecure, + ) + .await?; + metadata_map.insert( + component.id(), + ComponentMetadata { + name: component.name(), + inputs: component.get_inputs(), + outputs: component.get_outputs(), + addr, + }, + ); + } + Ok(metadata_map) +} + +pub(crate) async fn get_component( + comp_ref: String, + seed: String, + allow_latest: bool, + allowed_insecure: &[String], +) -> Result<(BoxedComponent, Recipient)> { + let p = Path::new(&comp_ref); + let component: Result<(BoxedComponent, Recipient)> = if p.exists() { + // read actor from disk + match WapcComponent::from_file(p) { + Ok(component) => { + trace!( + "Starting wapc component '{}' from file {}", + component.name(), + p.to_string_lossy() + ); + let actor = SyncArbiter::start(1, WapcComponentActor::default); + actor + .send(wapc_component_actor::Initialize { + actor_bytes: component.bytes.clone(), + signing_seed: seed, + }) + .await??; + let recipient = actor.recipient::(); + Ok((Box::new(component), recipient)) + } + Err(e) => Err(Error::SchematicError(format!( + "Could not read file {}:{}", + comp_ref, + e.to_string() + ))), + } + } else { + let providers = vec!["vino".to_string()]; + for namespace in providers { + if comp_ref.starts_with(&format!("{}::", namespace)) { + trace!( + "registering component under the {} provider namespace", + namespace + ); + let name = str::replace(&comp_ref, &format!("{}::", namespace), ""); + // todo temporary and very hacky + if namespace == "vino" { + match NativeComponent::from_id(namespace, name) { + Ok(component) => { + trace!("Starting native component '{}'", component.name(),); + let arbiter = Arbiter::new(); + let actor = NativeComponentActor::start_in_arbiter(&arbiter.handle(), |_| { + NativeComponentActor::default() + }); + actor + .send(native_component_actor::Initialize { + name: component.name(), + signing_seed: seed, + }) + .await??; + let recipient = actor.recipient::(); + + return Ok((Box::new(component), recipient)); + } + Err(e) => { + return Err(Error::SchematicError(format!( + "Could not load native component {}: {}", + comp_ref, e + ))) + } + } + } + } + } + // load actor from OCI + let component = fetch_oci_bytes(&comp_ref, allow_latest, allowed_insecure) + .await + .and_then(|bytes| WapcComponent::from_slice(&bytes)); + match component { + Ok(component) => { + trace!( + "Starting wapc component '{}' from URL {}", + component.name(), + comp_ref + ); + + let actor = SyncArbiter::start(1, WapcComponentActor::default); + actor + .send(wapc_component_actor::Initialize { + actor_bytes: component.bytes.clone(), + signing_seed: seed, + }) + .await??; + + let recipient = actor.recipient::(); + Ok((Box::new(component), recipient)) + } + Err(_) => Err(Error::SchematicError(format!( + "Could not find component '{}' on disk or in registry", + comp_ref, + ))), + } + }; + component +} diff --git a/crates/vino-runtime/src/dispatch.rs b/crates/vino-runtime/src/dispatch.rs index e4a0cb656..f394a062e 100644 --- a/crates/vino-runtime/src/dispatch.rs +++ b/crates/vino-runtime/src/dispatch.rs @@ -109,6 +109,7 @@ pub enum MessagePayload { MultiBytes(HashMap>), Exception(String), Error(String), + Test(String), } impl Default for MessagePayload { @@ -120,7 +121,13 @@ impl Default for MessagePayload { impl MessagePayload { pub fn into_bytes(self) -> Result> { match self { - MessagePayload::MessagePack(bytes) => Ok(bytes), + MessagePayload::MessagePack(v) => Ok(v), + _ => Err(Error::PayloadConversionError("Invalid payload".to_string())), + } + } + pub fn into_multibytes(self) -> Result>> { + match self { + MessagePayload::MultiBytes(v) => Ok(v), _ => Err(Error::PayloadConversionError("Invalid payload".to_string())), } } @@ -150,6 +157,7 @@ impl From for MessagePayload { OutputPayload::MessagePack(v) => MessagePayload::MessagePack(v), OutputPayload::Exception(v) => MessagePayload::Exception(v), OutputPayload::Error(v) => MessagePayload::Error(v), + OutputPayload::Test(v) => MessagePayload::Test(v), } } } @@ -205,6 +213,7 @@ pub(crate) fn invocation_hash(target_url: &str, origin_url: &str, msg: &MessageP cleanbytes.write_all(val).unwrap(); } } + MessagePayload::Test(v) => cleanbytes.write_all(v.as_bytes()).unwrap(), } let digest = sha256_digest(cleanbytes.as_slice()).unwrap(); HEXUPPER.encode(digest.as_ref()) @@ -231,10 +240,23 @@ pub enum VinoEntity { Test(String), Schematic(String), Port(PortEntity), - Component(String), + Component(ComponentEntity), Provider(String), } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct ComponentEntity { + pub id: String, + pub reference: String, + pub name: String, +} + +impl Display for ComponentEntity { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", self.reference, self.id) + } +} + impl Default for VinoEntity { fn default() -> Self { Self::Test("default".to_string()) @@ -246,6 +268,7 @@ impl Display for VinoEntity { write!(f, "{}", self.url()) } } + pub(crate) const URL_SCHEME: &str = "wasmbus"; impl VinoEntity { @@ -254,7 +277,7 @@ impl VinoEntity { match self { VinoEntity::Test(name) => format!("{}://test/{}", URL_SCHEME, name), VinoEntity::Schematic(name) => format!("{}://schematic/{}", URL_SCHEME, name), - VinoEntity::Component(name) => format!("{}://component/{}", URL_SCHEME, name), + VinoEntity::Component(e) => format!("{}://component/{}", URL_SCHEME, e.id), VinoEntity::Provider(name) => format!("{}://provider/{}", URL_SCHEME, name), VinoEntity::Port(port) => format!( "{}://{}::{}:{}", @@ -268,7 +291,7 @@ impl VinoEntity { match self { VinoEntity::Test(name) => format!("test:{}", name), VinoEntity::Schematic(name) => format!("schematic:{}", name), - VinoEntity::Component(name) => format!("component:{}", name), + VinoEntity::Component(e) => format!("component:{}", e.id), VinoEntity::Provider(name) => format!("provider:{}", name), VinoEntity::Port(port) => { format!("{}::{}:{}", port.schematic, port.reference, port.name) @@ -283,7 +306,7 @@ impl VinoEntity { } } - pub fn into_component(self) -> Result { + pub fn into_component(self) -> Result { match self { VinoEntity::Component(s) => Ok(s), _ => Err(Error::ConversionError), @@ -348,16 +371,17 @@ pub(crate) fn native_host_callback( OutputPayload::MessagePack(b) => MessagePayload::MessagePack(b.to_vec()), OutputPayload::Exception(e) => MessagePayload::Exception(e.to_string()), OutputPayload::Error(e) => MessagePayload::Error(e.to_string()), + OutputPayload::Test(v) => MessagePayload::Test(v.to_string()), }; let get_ref = network.send(GetReference { inv_id: inv_id.to_string(), }); match block_on(async { get_ref.await })? { Some((tx_id, schematic, entity)) => match entity { - VinoEntity::Component(reference) => { + VinoEntity::Component(e) => { let port = PortEntity { name: port.to_string(), - reference, + reference: e.reference, schematic, }; debug!( diff --git a/crates/vino-runtime/src/error.rs b/crates/vino-runtime/src/error.rs index 50742a6dd..879458dc4 100644 --- a/crates/vino-runtime/src/error.rs +++ b/crates/vino-runtime/src/error.rs @@ -13,9 +13,11 @@ pub enum VinoError { SchematicError(String), #[error("Dispatch error: {0}")] DispatchError(String), + #[error("Provider error {0}")] + ProviderError(String), #[error("Payload conversion error")] PayloadConversionError(String), - #[error("Schematic error: {0}")] + #[error("Component error: {0}")] ComponentError(String), #[error("Job error: {0}")] JobError(String), @@ -36,6 +38,8 @@ pub enum VinoError { #[error("Failed to deserialize payload {0}")] DeserializationError(rmp_serde::decode::Error), #[error(transparent)] + RpcUpstreamError(#[from] tonic::Status), + #[error(transparent)] TransportError(#[from] vino_transport::Error), #[error(transparent)] YamlError(#[from] serde_yaml::Error), diff --git a/crates/vino-runtime/src/network.rs b/crates/vino-runtime/src/network.rs index 8ef58a099..b51654ba6 100644 --- a/crates/vino-runtime/src/network.rs +++ b/crates/vino-runtime/src/network.rs @@ -191,17 +191,17 @@ impl Handler for Network { #[derive(Message)] #[rtype(result = "()")] -pub(crate) struct MapInvocation { +pub(crate) struct RecordInvocationState { pub(crate) inv_id: String, pub(crate) tx_id: String, pub(crate) schematic: String, pub(crate) entity: VinoEntity, } -impl Handler for Network { +impl Handler for Network { type Result = (); - fn handle(&mut self, msg: MapInvocation, _ctx: &mut Context) -> Self::Result { + fn handle(&mut self, msg: RecordInvocationState, _ctx: &mut Context) -> Self::Result { self .invocation_map .insert(msg.inv_id, (msg.tx_id, msg.schematic, msg.entity)); @@ -253,6 +253,7 @@ impl Handler for Network { OutputPayload::MessagePack(b) => MessagePayload::MessagePack(b), OutputPayload::Exception(e) => MessagePayload::Exception(e), OutputPayload::Error(e) => MessagePayload::Error(e), + OutputPayload::Test(v) => MessagePayload::Test(v), }, Err(e) => e, }; @@ -260,7 +261,7 @@ impl Handler for Network { async move { let port = PortEntity { name: port, - reference: entity.into_component()?, + reference: entity.into_component()?.reference, schematic: schematic_name, }; match receiver { @@ -339,6 +340,7 @@ impl std::fmt::Debug for ComponentRegistry { #[derive(Debug, Clone)] pub struct ComponentMetadata { + pub name: String, pub inputs: Inputs, pub outputs: Outputs, pub addr: Recipient, diff --git a/crates/vino-runtime/src/schematic.rs b/crates/vino-runtime/src/schematic.rs index 9165b2051..d54a110ec 100644 --- a/crates/vino-runtime/src/schematic.rs +++ b/crates/vino-runtime/src/schematic.rs @@ -14,8 +14,6 @@ use serde::{ Deserialize, Serialize, }; -use vino_guest::Signal; -use vino_transport::deserialize; use wascap::prelude::KeyPair; use super::dispatch::{ @@ -23,18 +21,19 @@ use super::dispatch::{ InvocationResponse, }; use super::schematic_response::initialize_schematic_output; +use crate::components::vino_component::get_components_for_schematic; use crate::dispatch::{ + ComponentEntity, PortEntity, VinoEntity, }; use crate::error::VinoError; use crate::network::{ ComponentMetadata, - MapInvocation, Network, + RecordInvocationState, }; use crate::schematic_definition::{ - get_components_for_schematic, ConnectionDefinition, ConnectionTargetDefinition, SchematicDefinition, @@ -188,25 +187,31 @@ impl Schematic { .entry(tx_id.to_string()) .or_insert_with(new_refmap); - let actor = state + let component_id = state .references .get(&reference) .ok_or_else(|| Error::SchematicError(format!("Could not find reference {}", reference)))?; trace!("pushing to {}", port); let key = reference.to_string(); - let metadata = state.components.get(actor).ok_or_else(|| { + let metadata = state.components.get(component_id).ok_or_else(|| { Error::SchematicError(format!( "Could not find metadata for {}. Component may have failed to load.", - actor + component_id )) })?; + debug!("reference: {}", reference); + debug!("refmap key: {}", key); refmap .entry(key) .or_insert_with(|| new_inputbuffer_map(metadata)); + debug!("port_name: {:?}", port.name); + debug!("refmap: {:?}", refmap); push_to_portbuffer(refmap, reference.to_string(), port.name.clone(), data)?; + debug!("refmap: {:?}", refmap); + if !component_has_data(refmap, &reference) { return Ok(ComponentStatus::Waiting); } @@ -228,7 +233,11 @@ impl Schematic { &tx_id, &kp, VinoEntity::Schematic(port.schematic.to_string()), - VinoEntity::Component(reference.to_string()), + VinoEntity::Component(ComponentEntity { + id: component_id.to_string(), + name: metadata.name.to_string(), + reference, + }), MessagePayload::MultiBytes(job_data), ))) } @@ -447,7 +456,7 @@ impl Handler for Schematic { #[derive(Debug, Clone, Serialize, Deserialize, Message, PartialEq)] #[rtype(result = "Result<()>")] -struct MessagePacket { +struct PayloadReceived { tx_id: String, origin: VinoEntity, target: PortEntity, @@ -459,7 +468,7 @@ fn gen_packets( tx_id: String, name: String, bytemap: HashMap>, -) -> Result> { +) -> Result> { let schematic = &sm.definition; let _kp = KeyPair::from_seed(&sm.seed)?; @@ -467,7 +476,7 @@ fn gen_packets( initialize_schematic_output(&tx_id, &name, schematic_outputs); - let invocations: Vec = schematic + let invocations: Vec = schematic .connections .iter() .filter(|conn| conn.from.instance == crate::SCHEMATIC_INPUT) @@ -476,7 +485,7 @@ fn gen_packets( .get(&conn.from.port) .unwrap_or_else(|| panic!("Output on port '{}' not found", conn.to.instance)); - MessagePacket { + PayloadReceived { target: PortEntity { schematic: name.to_string(), name: conn.to.port.to_string(), @@ -492,7 +501,7 @@ fn gen_packets( } async fn handle_schematic_invocation( - invocations: Vec, + invocations: Vec, schematic: Addr, tx_id: String, target: String, @@ -514,10 +523,10 @@ async fn handle_schematic_invocation( Ok(response) } -impl Handler for Schematic { +impl Handler for Schematic { type Result = ResponseActFuture>; - fn handle(&mut self, msg: MessagePacket, ctx: &mut Context) -> Self::Result { + fn handle(&mut self, msg: PayloadReceived, ctx: &mut Context) -> Self::Result { let name = self.definition.get_name(); let port = msg.target; let payload = msg.payload; @@ -538,7 +547,7 @@ impl Handler for Schematic { let status = self.push_to_port(tx_id.to_string(), &port, payload); let schematic_host = ctx.address(); - let receiver = self.get_downstream_recipient(reference.to_string()); + let recipient = self.get_downstream_recipient(reference.to_string()); let network = self.network.clone().unwrap(); Box::pin( @@ -568,16 +577,16 @@ impl Handler for Schematic { Ok(()) } Ok(ComponentStatus::Ready(invocation)) => { - network.do_send(MapInvocation { + network.do_send(RecordInvocationState { inv_id: invocation.id.to_string(), tx_id: tx_id.clone(), schematic: name.to_string(), entity: invocation.target.clone(), }); - let response: Result = match receiver { - Some(receiver) => match receiver.send(invocation).await { - Ok(response) => Ok(deserialize(&response.msg)?), + let response: Result<()> = match recipient { + Some(recipient) => match recipient.send(invocation).await { + Ok(_) => Ok(()), Err(err) => Err(err.into()), }, None => Err("No receiver found".into()), @@ -706,7 +715,7 @@ impl Handler for Schematic { name: conn.to.port.to_string(), reference: conn.to.instance, }; - MessagePacket { + PayloadReceived { tx_id: tx_id.clone(), origin: origin.clone(), target: entity, diff --git a/crates/vino-runtime/src/schematic_definition.rs b/crates/vino-runtime/src/schematic_definition.rs index c4b2a3f80..2c37ed575 100644 --- a/crates/vino-runtime/src/schematic_definition.rs +++ b/crates/vino-runtime/src/schematic_definition.rs @@ -1,33 +1,12 @@ use std::collections::HashMap; use std::fmt::Display; -use std::path::Path; -use actix::{ - Recipient, - SyncArbiter, -}; use serde::{ Deserialize, Serialize, }; use vino_manifest::SchematicManifest; -use crate::components::native_component_actor::{ - self, - NativeComponentActor, -}; -use crate::components::vino_component::{ - BoxedComponent, - NativeComponent, - VinoComponent, - WapcComponent, -}; -use crate::components::wapc_component_actor; -use crate::components::wapc_component_actor::WapcComponentActor; -use crate::dispatch::Invocation; -use crate::network::ComponentMetadata; -use crate::network_definition::NetworkDefinition; -use crate::util::oci::fetch_oci_bytes; use crate::{ Error, Result, @@ -212,160 +191,3 @@ where } } } - -pub(crate) async fn _get_components( - network: &NetworkDefinition, - seed: String, - allow_latest: bool, - allowed_insecure: &[String], -) -> Result)>> { - let mut v: Vec<(BoxedComponent, Recipient)> = Vec::new(); - debug!("getting components {:?}", network); - - for schematic in &network.schematics { - debug!("{:?}", schematic); - for comp in schematic.components.values() { - debug!("{:?}", comp); - let component_ref = schematic.id_to_ref(&comp.id)?; - let component = get_component( - component_ref.to_string(), - seed.clone(), - allow_latest, - allowed_insecure, - ) - .await?; - v.push(component); - } - } - Ok(v) -} - -pub(crate) async fn get_components_for_schematic( - schematic: SchematicDefinition, - seed: String, - allow_latest: bool, - allowed_insecure: Vec, -) -> Result> { - let mut metadata_map: HashMap = HashMap::new(); - debug!("getting components for schematic {:?}", schematic); - - for comp in schematic.components.values() { - debug!("{:?}", comp); - let component_ref = schematic.id_to_ref(&comp.id)?; - let (component, addr) = get_component( - component_ref.to_string(), - seed.clone(), - allow_latest, - &allowed_insecure, - ) - .await?; - metadata_map.insert( - component.id(), - ComponentMetadata { - inputs: component.get_inputs(), - outputs: component.get_outputs(), - addr, - }, - ); - } - Ok(metadata_map) -} - -pub(crate) async fn get_component( - comp_ref: String, - seed: String, - allow_latest: bool, - allowed_insecure: &[String], -) -> Result<(BoxedComponent, Recipient)> { - let p = Path::new(&comp_ref); - let component: Result<(BoxedComponent, Recipient)> = if p.exists() { - // read actor from disk - match WapcComponent::from_file(p) { - Ok(component) => { - trace!( - "Starting wapc component '{}' from file {}", - component.name(), - p.to_string_lossy() - ); - let actor = SyncArbiter::start(1, WapcComponentActor::default); - actor - .send(wapc_component_actor::Initialize { - actor_bytes: component.bytes.clone(), - signing_seed: seed, - }) - .await??; - let recipient = actor.recipient::(); - Ok((Box::new(component), recipient)) - } - Err(e) => Err(Error::SchematicError(format!( - "Could not read file {}:{}", - comp_ref, - e.to_string() - ))), - } - } else { - let providers = vec!["vino".to_string()]; - for namespace in providers { - if comp_ref.starts_with(&format!("{}::", namespace)) { - trace!( - "registering component under the {} provider namespace", - namespace - ); - let name = str::replace(&comp_ref, &format!("{}::", namespace), ""); - // todo temporary and very hacky - if namespace == "vino" { - match NativeComponent::from_id(namespace, name) { - Ok(component) => { - trace!("Starting native component '{}'", component.name(),); - let actor = SyncArbiter::start(1, NativeComponentActor::default); - actor - .send(native_component_actor::Initialize { - name: component.name(), - signing_seed: seed, - }) - .await??; - let recipient = actor.recipient::(); - - return Ok((Box::new(component), recipient)); - } - Err(e) => { - return Err(Error::SchematicError(format!( - "Could not load native component {}: {}", - comp_ref, e - ))) - } - } - } - } - } - // load actor from OCI - let component = fetch_oci_bytes(&comp_ref, allow_latest, allowed_insecure) - .await - .and_then(|bytes| WapcComponent::from_slice(&bytes)); - match component { - Ok(component) => { - trace!( - "Starting wapc component '{}' from URL {}", - component.name(), - comp_ref - ); - - let actor = SyncArbiter::start(1, WapcComponentActor::default); - actor - .send(wapc_component_actor::Initialize { - actor_bytes: component.bytes.clone(), - signing_seed: seed, - }) - .await??; - - let recipient = actor.recipient::(); - Ok((Box::new(component), recipient)) - } - Err(_) => Err(Error::SchematicError(format!( - "Could not find component '{}' on disk or in registry", - comp_ref, - ))), - } - }; - component -} diff --git a/crates/vino-runtime/src/util/mod.rs b/crates/vino-runtime/src/util/mod.rs index 288fffba6..35ae1706d 100644 --- a/crates/vino-runtime/src/util/mod.rs +++ b/crates/vino-runtime/src/util/mod.rs @@ -1,3 +1,2 @@ pub(crate) mod hlreg; -// pub(crate) mod native_macro; pub(crate) mod oci; diff --git a/crates/vino-runtime/src/util/native_macro.rs b/crates/vino-runtime/src/util/native_macro.rs deleted file mode 100644 index 1f6e43751..000000000 --- a/crates/vino-runtime/src/util/native_macro.rs +++ /dev/null @@ -1,75 +0,0 @@ -#[macro_export] -macro_rules! native_actor {( - $component_name:ident, - fn job($inputs_name:ident:Inputs, $outputs_name:ident:Outputs) -> Result $fun:block -) => { - - use crate::{Result, Error}; - use crate::components::native_component_actor::{NativeActor,NativeCallback}; - use crate::components::vino_component::NativeComponent; - use vino_transport::deserialize; - use vino_guest::Signal; - use vino_rpc::port::Sender; - - use super::generated::$component_name::{Inputs, Outputs, InputEncoded, get_outputs, inputs_list, outputs_list, deserialize_inputs}; - - pub(crate) struct Actor { - callback: Option - } - - impl<'a> Default for Actor { - fn default() -> Self { - Self { - callback: None, - } - } - } - - impl Actor { - pub(crate) fn new(cb: NativeCallback) -> Self { - Actor { - callback: Some(cb) - } - } - } - - impl NativeActor for Actor { - fn get_def(&self) -> NativeComponent { - NativeComponent { - id: self.get_name(), - inputs: self.get_input_ports(), - outputs: self.get_output_ports() - } - } - fn get_name(&self) -> String { - format!("vino::{}",std::stringify!($component_name)) - } - fn get_input_ports(&self) -> Vec { - inputs_list() - } - fn get_output_ports(&self) -> Vec { - outputs_list() - } - // async fn job_wrapper(&self, context:ProviderContext, data: &[u8]) -> Result { - // trace!("Job passed data: {:?}", data); - // let input_encoded : InputEncoded = vino_transport::deserialize(&data)?; - // let inputs = deserialize_inputs(input_encoded).map_err(ProviderError::InputDeserializationError)?; - // let (outputs, receiver) = get_outputs(); - // job(inputs, outputs, context).await?; - // Ok(receiver) - // } - fn job_wrapper(&self, data: &[u8]) -> Result{ - match &self.callback { - Some(callback) => { - let (inv_id, input_encoded) : (String, InputEncoded) = deserialize(&data)?; - let inputs = deserialize_inputs(input_encoded)?; - let outputs = get_outputs(callback, inv_id); - job(inputs, outputs) - }, - None => Err(Error::JobError(format!("No callback registered with native actor '{}'", self.get_name()))) - } - } - } - - pub(crate) fn job($inputs_name: Inputs, $outputs_name: Outputs) -> Result $fun -}} diff --git a/src/lib.rs b/src/lib.rs index 9687099b9..9c5c8db7a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,10 +12,8 @@ use vino_host::{ HostBuilder, HostDefinition, }; -use vino_runtime::{ - deserialize, - MessagePayload, -}; +use vino_runtime::MessagePayload; +use vino_transport::deserialize; pub type Result = std::result::Result; pub type Error = VinoError;