diff --git a/Cargo.lock b/Cargo.lock index 2d9c8fcd535..d1c90290746 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -251,6 +251,28 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.75", +] + [[package]] name = "async-task" version = "4.7.1" @@ -1589,6 +1611,7 @@ dependencies = [ "stageleft_tool", "syn 2.0.75", "tokio", + "tokio-test", "toml", "trybuild", "trybuild-internals-api", @@ -3701,6 +3724,19 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-test" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-tungstenite" version = "0.20.1" diff --git a/hydro_lang/Cargo.toml b/hydro_lang/Cargo.toml index 025ca886a34..0ebfe55e1e6 100644 --- a/hydro_lang/Cargo.toml +++ b/hydro_lang/Cargo.toml @@ -49,4 +49,5 @@ async-ssh2-lite = { version = "0.5.0", features = ["vendored-openssl"] } ctor = "0.2.8" hydro_deploy = { path = "../hydro_deploy/core", version = "^0.11.0" } insta = "1.39" +tokio-test = "0.4.4" trybuild = "1" diff --git a/hydro_lang/src/lib.rs b/hydro_lang/src/lib.rs index 67ad1133827..1f434e6bd27 100644 --- a/hydro_lang/src/lib.rs +++ b/hydro_lang/src/lib.rs @@ -43,6 +43,9 @@ pub mod rewrites; mod staging_util; +#[cfg(feature = "deploy")] +pub mod test_util; + #[stageleft::runtime] #[cfg(test)] mod tests { diff --git a/hydro_lang/src/stream.rs b/hydro_lang/src/stream.rs index 6a29fec9405..98fd8599cd5 100644 --- a/hydro_lang/src/stream.rs +++ b/hydro_lang/src/stream.rs @@ -211,6 +211,24 @@ impl<'a, T: Clone, L: Location<'a>, B, Order> Clone for Stream { } impl<'a, T, L: Location<'a>, B, Order> Stream { + /// Transforms the stream by applying a function (`f`) to each element, + /// emitting the output elements in the same order as the input. + /// + /// # Example + /// ```rust + /// # use hydro_lang::*; + /// # use dfir_rs::futures::StreamExt; + /// # tokio_test::block_on(test_util::stream_transform_test(|process| { + /// let numbers = process.source_iter(q!(0..10)); + /// let mapped = numbers.map(q!(|n| n * 2)); + /// # mapped + /// # }, |mut stream| async move { + /// // 2, 4, 6, 8, ... + /// # for i in 0..10 { + /// # assert_eq!(stream.next().await.unwrap(), i * 2); + /// # } + /// # })); + /// ``` pub fn map U + 'a>( self, f: impl IntoQuotedMut<'a, F, L>, diff --git a/hydro_lang/src/test_util.rs b/hydro_lang/src/test_util.rs new file mode 100644 index 00000000000..f79e4ce476e --- /dev/null +++ b/hydro_lang/src/test_util.rs @@ -0,0 +1,34 @@ +use std::future::Future; +use std::pin::Pin; + +use serde::de::DeserializeOwned; +use serde::Serialize; + +use crate::{FlowBuilder, Process, Stream, Unbounded}; + +pub async fn stream_transform_test< + 'a, + O: Serialize + DeserializeOwned + 'static, + C: Future, +>( + thunk: impl FnOnce(&Process<'a>) -> Stream, Unbounded>, + check: impl FnOnce(Pin>>) -> C, +) { + let mut deployment = hydro_deploy::Deployment::new(); + let flow = FlowBuilder::new(); + let process = flow.process::<()>(); + let external = flow.external_process::<()>(); + let out = thunk(&process); + let out_port = out.send_bincode_external(&external); + let nodes = flow + .with_process(&process, deployment.Localhost()) + .with_external(&external, deployment.Localhost()) + .deploy(&mut deployment); + + deployment.deploy().await.unwrap(); + + let external_out = nodes.connect_source_bincode(out_port).await; + deployment.start().await.unwrap(); + + check(external_out).await; +}