Skip to content

Commit

Permalink
feat(hydroflow_lang): make fold output optional, usable as just a sin…
Browse files Browse the repository at this point in the history
…gleton reference (#1134)
  • Loading branch information
MingweiSamuel committed Apr 9, 2024
1 parent 994990b commit 0f16d1f
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
---
source: hydroflow/tests/surface_singleton.rs
expression: "df.meta_graph().unwrap().to_dot(&Default::default())"
---
digraph {
node [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace"];
n1v1 [label="(n1v1) source_iter(10..=30)", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) source_iter(15..=25)", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) fold(|| 0, std::ops::AddAssign::add_assign)", shape=house, fillcolor="#ffff88"]
n4v1 [label="(n4v1) inspect(|x| println!(\"inspect {}\", x))", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) filter(|&value| { value <= sum_of_stream2 })", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) for_each(|x| println!(\"filtered {}\", x))", shape=house, fillcolor="#ffff88"]
n7v1 [label="(n7v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n2v1 -> n7v1
n5v1 -> n6v1
n4v1 -> n5v1
n1v1 -> n4v1
n7v1 -> n3v1 [color=red]
n5v1 -> n3v1 [color=red]
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
label = "sg_1v1\nstratum 0"
n2v1
subgraph "cluster_sg_1v1_var_stream2" {
label="var stream2"
n2v1
}
}
subgraph "cluster n2v1" {
fillcolor="#dddddd"
style=filled
label = "sg_2v1\nstratum 1"
n3v1
subgraph "cluster_sg_2v1_var_sum_of_stream2" {
label="var sum_of_stream2"
n3v1
}
}
subgraph "cluster n3v1" {
fillcolor="#dddddd"
style=filled
label = "sg_3v1\nstratum 0"
n1v1
n4v1
n5v1
n6v1
subgraph "cluster_sg_3v1_var_filtered_stream1" {
label="var filtered_stream1"
n4v1
n5v1
n6v1
}
subgraph "cluster_sg_3v1_var_stream1" {
label="var stream1"
n1v1
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
---
source: hydroflow/tests/surface_singleton.rs
expression: "df.meta_graph().unwrap().to_mermaid(&Default::default())"
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre
classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter(10..=30)</code>"/]:::pullClass
2v1[\"(2v1) <code>source_iter(15..=25)</code>"/]:::pullClass
3v1[/"(3v1) <code>fold(|| 0, std::ops::AddAssign::add_assign)</code>"\]:::pushClass
4v1[\"(4v1) <code>inspect(|x| println!(&quot;inspect {}&quot;, x))</code>"/]:::pullClass
5v1[\"(5v1) <code>filter(|&amp;value| { value &lt;= sum_of_stream2 })</code>"/]:::pullClass
6v1[/"(6v1) <code>for_each(|x| println!(&quot;filtered {}&quot;, x))</code>"\]:::pushClass
7v1["(7v1) <code>handoff</code>"]:::otherClass
2v1-->7v1
5v1-->6v1
4v1-->5v1
1v1-->4v1
7v1--x3v1; linkStyle 4 stroke:red
5v1--x3v1; linkStyle 5 stroke:red
subgraph sg_1v1 ["sg_1v1 stratum 0"]
2v1
subgraph sg_1v1_var_stream2 ["var <tt>stream2</tt>"]
2v1
end
end
subgraph sg_2v1 ["sg_2v1 stratum 1"]
3v1
subgraph sg_2v1_var_sum_of_stream2 ["var <tt>sum_of_stream2</tt>"]
3v1
end
end
subgraph sg_3v1 ["sg_3v1 stratum 0"]
1v1
4v1
5v1
6v1
subgraph sg_3v1_var_filtered_stream1 ["var <tt>filtered_stream1</tt>"]
4v1
5v1
6v1
end
subgraph sg_3v1_var_stream1 ["var <tt>stream1</tt>"]
1v1
end
end

23 changes: 22 additions & 1 deletion hydroflow/tests/surface_singleton.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,26 @@ pub fn test_fold_singleton() {

assert_graphvis_snapshots!(df);

df.run_available(); // Should return quickly and not hang
df.run_available();
}

#[multiplatform_test]
pub fn test_fold_singleton_push() {
let mut df = hydroflow::hydroflow_syntax! {
stream1 = source_iter(10..=30);
stream2 = source_iter(15..=25);
sum_of_stream2 = stream2 -> fold(|| 0, std::ops::AddAssign::add_assign);

filtered_stream1 = stream1
-> inspect(|x| println!("inspect {}", x))
-> filter(|&value| {
// This is not monotonic.
value <= #sum_of_stream2
})
-> for_each(|x| println!("filtered {}", x));
};

assert_graphvis_snapshots!(df);

df.run_available();
}
64 changes: 40 additions & 24 deletions hydroflow_lang/src/graph/ops/fold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ pub const FOLD: OperatorConstraints = OperatorConstraints {
categories: &[OperatorCategory::Fold],
hard_range_inn: RANGE_1,
soft_range_inn: RANGE_1,
hard_range_out: RANGE_1,
soft_range_out: RANGE_1,
hard_range_out: &(0..=1),
soft_range_out: &(0..=1),
num_args: 2,
persistence_args: &(0..=1),
type_args: RANGE_0,
Expand All @@ -49,6 +49,7 @@ pub const FOLD: OperatorConstraints = OperatorConstraints {
output_edgetype_fn: |_| GraphEdgeType::Value,
flow_prop_fn: None,
write_fn: |wc @ &WriteContextArgs {
root,
context,
hydroflow,
op_span,
Expand All @@ -68,8 +69,6 @@ pub const FOLD: OperatorConstraints = OperatorConstraints {
..
},
diagnostics| {
assert!(is_pull);

let persistence = match persistence_args[..] {
[] => Persistence::Tick,
[a] => a,
Expand Down Expand Up @@ -105,6 +104,18 @@ pub const FOLD: OperatorConstraints = OperatorConstraints {
} else {
Default::default() // No code
};
let iterator_foreach = quote_spanned! {op_span=>
#[inline(always)]
fn call_comb_type<Accum, Item, Out>(
accum: &mut Accum,
item: Item,
func: impl Fn(&mut Accum, Item) -> Out
) -> Out {
(func)(accum, item)
}
#[allow(clippy::redundant_closure_call)]
call_comb_type(&mut *#accumulator_ident, #iterator_item_ident, #func);
};

let write_prologue = quote_spanned! {op_span=>
let #initializer_func_ident = #init;
Expand All @@ -114,29 +125,34 @@ pub const FOLD: OperatorConstraints = OperatorConstraints {
::std::cell::RefCell::new((#initializer_func_ident)())
);
};
let write_iterator = quote_spanned! {op_span=>
let #ident = {
let mut #accumulator_ident = #context.state_ref(#singleton_output_ident).borrow_mut();
#tick_reset_code
let write_iterator = if is_pull {
quote_spanned! {op_span=>
let #ident = {
let mut #accumulator_ident = #context.state_ref(#singleton_output_ident).borrow_mut();
#tick_reset_code

#input.for_each(|#iterator_item_ident| {
#iterator_foreach
});

#input.for_each(|#iterator_item_ident| {
#[inline(always)]
fn call_comb_type<Accum, Item, Out>(
accum: &mut Accum,
item: Item,
func: impl Fn(&mut Accum, Item) -> Out
) -> Out {
(func)(accum, item)
#[allow(clippy::clone_on_copy)]
{
::std::iter::once(::std::clone::Clone::clone(&*#accumulator_ident))
}
#[allow(clippy::redundant_closure_call)]
call_comb_type(&mut *#accumulator_ident, #iterator_item_ident, #func);
});
};
}
} else {
quote_spanned! {op_span=>
let #ident = {
let mut #accumulator_ident = #context.state_ref(#singleton_output_ident).borrow_mut();
#tick_reset_code

#[allow(clippy::clone_on_copy)]
{
::std::iter::once(::std::clone::Clone::clone(&*#accumulator_ident))
}
};
#root::pusherator::for_each::ForEach::new(|#iterator_item_ident| {
let mut #accumulator_ident = #context.state_ref(#singleton_output_ident).borrow_mut();
#iterator_foreach
})
};
}
};
let write_iterator_after = if Persistence::Static == persistence {
quote_spanned! {op_span=>
Expand Down

0 comments on commit 0f16d1f

Please sign in to comment.