Skip to content

Commit

Permalink
test(hydro_lang): add example of doctest for map (#1636)
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj authored Jan 8, 2025
1 parent e5e6b75 commit b1514be
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 0 deletions.
36 changes: 36 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions hydro_lang/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@ async-ssh2-lite = { version = "0.5.0", features = ["vendored-openssl"] }
ctor = "0.2.8"
hydro_deploy = { path = "../hydro_deploy/core", version = "^0.11.0" }
insta = "1.39"
tokio-test = "0.4.4"
trybuild = "1"
3 changes: 3 additions & 0 deletions hydro_lang/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ pub mod rewrites;

mod staging_util;

#[cfg(feature = "deploy")]
pub mod test_util;

#[stageleft::runtime]
#[cfg(test)]
mod tests {
Expand Down
18 changes: 18 additions & 0 deletions hydro_lang/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,24 @@ impl<'a, T: Clone, L: Location<'a>, B, Order> Clone for Stream<T, L, B, Order> {
}

impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order> {
/// Transforms the stream by applying a function (`f`) to each element,
/// emitting the output elements in the same order as the input.
///
/// # Example
/// ```rust
/// # use hydro_lang::*;
/// # use dfir_rs::futures::StreamExt;
/// # tokio_test::block_on(test_util::stream_transform_test(|process| {
/// let numbers = process.source_iter(q!(0..10));
/// let mapped = numbers.map(q!(|n| n * 2));
/// # mapped
/// # }, |mut stream| async move {
/// // 2, 4, 6, 8, ...
/// # for i in 0..10 {
/// # assert_eq!(stream.next().await.unwrap(), i * 2);
/// # }
/// # }));
/// ```
pub fn map<U, F: Fn(T) -> U + 'a>(
self,
f: impl IntoQuotedMut<'a, F, L>,
Expand Down
34 changes: 34 additions & 0 deletions hydro_lang/src/test_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::future::Future;
use std::pin::Pin;

use serde::de::DeserializeOwned;
use serde::Serialize;

use crate::{FlowBuilder, Process, Stream, Unbounded};

pub async fn stream_transform_test<
'a,
O: Serialize + DeserializeOwned + 'static,
C: Future<Output = ()>,
>(
thunk: impl FnOnce(&Process<'a>) -> Stream<O, Process<'a>, Unbounded>,
check: impl FnOnce(Pin<Box<dyn dfir_rs::futures::Stream<Item = O>>>) -> C,
) {
let mut deployment = hydro_deploy::Deployment::new();
let flow = FlowBuilder::new();
let process = flow.process::<()>();
let external = flow.external_process::<()>();
let out = thunk(&process);
let out_port = out.send_bincode_external(&external);
let nodes = flow
.with_process(&process, deployment.Localhost())
.with_external(&external, deployment.Localhost())
.deploy(&mut deployment);

deployment.deploy().await.unwrap();

let external_out = nodes.connect_source_bincode(out_port).await;
deployment.start().await.unwrap();

check(external_out).await;
}

0 comments on commit b1514be

Please sign in to comment.