Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hydroflow_lang): allow fold() to be referenceable as a singleton (3/x) #1134

Merged
merged 3 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
---
source: hydroflow/tests/surface_singleton.rs
expression: "df.meta_graph().unwrap().to_dot(&Default::default())"
---
digraph {
node [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace"];
n1v1 [label="(n1v1) source_iter(10..=30)", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) source_iter(15..=25)", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) fold(|| 0, std::ops::AddAssign::add_assign)", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) inspect(|x| println!(\"inspect {}\", x))", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) filter(|&value| { value <= sum_of_stream2 })", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) for_each(|x| println!(\"filtered {}\", x))", shape=house, fillcolor="#ffff88"]
n7v1 [label="(n7v1) for_each(|x| println!(\"state {:?}\", x))", shape=house, fillcolor="#ffff88"]
n8v1 [label="(n8v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n2v1 -> n8v1
n5v1 -> n6v1
n4v1 -> n5v1
n1v1 -> n4v1
n3v1 -> n7v1
n8v1 -> n3v1 [color=red]
n5v1 -> n3v1 [color=red]
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
label = "sg_1v1\nstratum 0"
n2v1
subgraph "cluster_sg_1v1_var_stream2" {
label="var stream2"
n2v1
}
}
subgraph "cluster n2v1" {
fillcolor="#dddddd"
style=filled
label = "sg_2v1\nstratum 0"
n1v1
n4v1
n5v1
n6v1
subgraph "cluster_sg_2v1_var_filtered_stream1" {
label="var filtered_stream1"
n4v1
n5v1
n6v1
}
subgraph "cluster_sg_2v1_var_stream1" {
label="var stream1"
n1v1
}
}
subgraph "cluster n3v1" {
fillcolor="#dddddd"
style=filled
label = "sg_3v1\nstratum 1"
n3v1
n7v1
subgraph "cluster_sg_3v1_var_sum_of_stream2" {
label="var sum_of_stream2"
n3v1
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
---
source: hydroflow/tests/surface_singleton.rs
expression: "df.meta_graph().unwrap().to_mermaid(&Default::default())"
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre
classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter(10..=30)</code>"/]:::pullClass
2v1[\"(2v1) <code>source_iter(15..=25)</code>"/]:::pullClass
3v1[\"(3v1) <code>fold(|| 0, std::ops::AddAssign::add_assign)</code>"/]:::pullClass
4v1[\"(4v1) <code>inspect(|x| println!(&quot;inspect {}&quot;, x))</code>"/]:::pullClass
5v1[\"(5v1) <code>filter(|&amp;value| { value &lt;= sum_of_stream2 })</code>"/]:::pullClass
6v1[/"(6v1) <code>for_each(|x| println!(&quot;filtered {}&quot;, x))</code>"\]:::pushClass
7v1[/"(7v1) <code>for_each(|x| println!(&quot;state {:?}&quot;, x))</code>"\]:::pushClass
8v1["(8v1) <code>handoff</code>"]:::otherClass
2v1-->8v1
5v1-->6v1
4v1-->5v1
1v1-->4v1
3v1-->7v1
8v1--x3v1; linkStyle 5 stroke:red
5v1--x3v1; linkStyle 6 stroke:red
subgraph sg_1v1 ["sg_1v1 stratum 0"]
2v1
subgraph sg_1v1_var_stream2 ["var <tt>stream2</tt>"]
2v1
end
end
subgraph sg_2v1 ["sg_2v1 stratum 0"]
1v1
4v1
5v1
6v1
subgraph sg_2v1_var_filtered_stream1 ["var <tt>filtered_stream1</tt>"]
4v1
5v1
6v1
end
subgraph sg_2v1_var_stream1 ["var <tt>stream1</tt>"]
1v1
end
end
subgraph sg_3v1 ["sg_3v1 stratum 1"]
3v1
7v1
subgraph sg_3v1_var_sum_of_stream2 ["var <tt>sum_of_stream2</tt>"]
3v1
end
end

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
---
source: hydroflow/tests/surface_singleton.rs
expression: "df.meta_graph().unwrap().to_dot(&Default::default())"
---
digraph {
node [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace"];
n1v1 [label="(n1v1) source_iter(10..=30)", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) source_iter(15..=25)", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) fold(|| 0, std::ops::AddAssign::add_assign)", shape=house, fillcolor="#ffff88"]
n4v1 [label="(n4v1) inspect(|x| println!(\"inspect {}\", x))", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) filter(|&value| { value <= sum_of_stream2 })", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) for_each(|x| println!(\"filtered {}\", x))", shape=house, fillcolor="#ffff88"]
n7v1 [label="(n7v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n2v1 -> n7v1
n5v1 -> n6v1
n4v1 -> n5v1
n1v1 -> n4v1
n7v1 -> n3v1 [color=red]
n5v1 -> n3v1 [color=red]
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
label = "sg_1v1\nstratum 0"
n2v1
subgraph "cluster_sg_1v1_var_stream2" {
label="var stream2"
n2v1
}
}
subgraph "cluster n2v1" {
fillcolor="#dddddd"
style=filled
label = "sg_2v1\nstratum 1"
n3v1
subgraph "cluster_sg_2v1_var_sum_of_stream2" {
label="var sum_of_stream2"
n3v1
}
}
subgraph "cluster n3v1" {
fillcolor="#dddddd"
style=filled
label = "sg_3v1\nstratum 0"
n1v1
n4v1
n5v1
n6v1
subgraph "cluster_sg_3v1_var_filtered_stream1" {
label="var filtered_stream1"
n4v1
n5v1
n6v1
}
subgraph "cluster_sg_3v1_var_stream1" {
label="var stream1"
n1v1
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
---
source: hydroflow/tests/surface_singleton.rs
expression: "df.meta_graph().unwrap().to_mermaid(&Default::default())"
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre
classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter(10..=30)</code>"/]:::pullClass
2v1[\"(2v1) <code>source_iter(15..=25)</code>"/]:::pullClass
3v1[/"(3v1) <code>fold(|| 0, std::ops::AddAssign::add_assign)</code>"\]:::pushClass
4v1[\"(4v1) <code>inspect(|x| println!(&quot;inspect {}&quot;, x))</code>"/]:::pullClass
5v1[\"(5v1) <code>filter(|&amp;value| { value &lt;= sum_of_stream2 })</code>"/]:::pullClass
6v1[/"(6v1) <code>for_each(|x| println!(&quot;filtered {}&quot;, x))</code>"\]:::pushClass
7v1["(7v1) <code>handoff</code>"]:::otherClass
2v1-->7v1
5v1-->6v1
4v1-->5v1
1v1-->4v1
7v1--x3v1; linkStyle 4 stroke:red
5v1--x3v1; linkStyle 5 stroke:red
subgraph sg_1v1 ["sg_1v1 stratum 0"]
2v1
subgraph sg_1v1_var_stream2 ["var <tt>stream2</tt>"]
2v1
end
end
subgraph sg_2v1 ["sg_2v1 stratum 1"]
3v1
subgraph sg_2v1_var_sum_of_stream2 ["var <tt>sum_of_stream2</tt>"]
3v1
end
end
subgraph sg_3v1 ["sg_3v1 stratum 0"]
1v1
4v1
5v1
6v1
subgraph sg_3v1_var_filtered_stream1 ["var <tt>filtered_stream1</tt>"]
4v1
5v1
6v1
end
subgraph sg_3v1_var_stream1 ["var <tt>stream1</tt>"]
1v1
end
end

59 changes: 46 additions & 13 deletions hydroflow/tests/surface_singleton.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,10 @@ pub fn test_state_ref() {
stream2 = source_iter_delta(15..=25) -> map(Max::new);
sum_of_stream2 = stream2 -> state_ref::<Max<_>>();

// Do we need to stratify between sum_of_stream2_reference and it's usage in the filter?
// Mark state as blocking and just move on.
// (Little different - output of state has to be blocking)
// For now: assume references need to be stratified (?)

filtered_stream1 = stream1
-> inspect(|x| println!("inspect {}", x))
-> filter(|value| {
// THIS IS NOT MONOTONIC AND ALSO DOENS'T WORK DUE TO SCHEDULING
// So it should be blocking (stratified)
// This is not monotonic.
value <= #sum_of_stream2.as_reveal_ref()
})
-> for_each(|x| println!("filtered {}", x));
Expand Down Expand Up @@ -51,19 +45,14 @@ pub fn test_fold_zip() {
stream2 = source_iter_delta(15..=25) -> map(Max::new);
sum_of_stream2 = stream2 -> lattice_reduce() -> tee();

// Do we need to stratify between sum_of_stream2_reference and it's usage in the filter?
// Mark state as blocking and just move on.
// (Little different - output of state has to be blocking)

filtered_stream1 = stream1
-> inspect(|x| println!("inspect {}", x))
-> [0]filtered_stream2;
sum_of_stream2 -> identity::<Max<_>>() -> [1]filtered_stream2;

filtered_stream2 = zip()
-> filter(|(value, sum_of_stream2)| {
// THIS IS NON MONOTONIC, but is ok for now because we're avoiding cycles
// So it should be blocking (stratified)
// This is not monotonic.
value <= sum_of_stream2.as_reveal_ref()
})
-> for_each(|x| println!("filtered {:?}", x));
Expand All @@ -76,3 +65,47 @@ pub fn test_fold_zip() {

df.run_available(); // Should return quickly and not hang
}

#[multiplatform_test]
pub fn test_fold_singleton() {
let mut df = hydroflow::hydroflow_syntax! {
stream1 = source_iter(10..=30);
stream2 = source_iter(15..=25);
sum_of_stream2 = stream2 -> fold(|| 0, std::ops::AddAssign::add_assign);

filtered_stream1 = stream1
-> inspect(|x| println!("inspect {}", x))
-> filter(|&value| {
// This is not monotonic.
value <= #sum_of_stream2
})
-> for_each(|x| println!("filtered {}", x));

sum_of_stream2 -> for_each(|x| println!("state {:?}", x));
};

assert_graphvis_snapshots!(df);

df.run_available();
}

#[multiplatform_test]
pub fn test_fold_singleton_push() {
let mut df = hydroflow::hydroflow_syntax! {
stream1 = source_iter(10..=30);
stream2 = source_iter(15..=25);
sum_of_stream2 = stream2 -> fold(|| 0, std::ops::AddAssign::add_assign);

filtered_stream1 = stream1
-> inspect(|x| println!("inspect {}", x))
-> filter(|&value| {
// This is not monotonic.
value <= #sum_of_stream2
})
-> for_each(|x| println!("filtered {}", x));
};

assert_graphvis_snapshots!(df);

df.run_available();
}
14 changes: 7 additions & 7 deletions hydroflow_lang/src/graph/hydroflow_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1180,20 +1180,20 @@ impl HydroflowGraph {

{
// Determine pull and push halves of the `Pivot`.
#[allow(unknown_lints)]
// https://github.com/rust-lang/rust-clippy/issues/11290
#[allow(clippy::redundant_locals)]
let pull_to_push_idx = pull_to_push_idx;
let pull_ident =
self.node_as_ident(subgraph_nodes[pull_to_push_idx - 1], false);
let pull_ident = if 0 < pull_to_push_idx {
self.node_as_ident(subgraph_nodes[pull_to_push_idx - 1], false)
} else {
// Entire subgraph is push (with a single recv/pull handoff input).
recv_ports[0].clone()
};

#[rustfmt::skip]
let push_ident = if let Some(&node_id) =
subgraph_nodes.get(pull_to_push_idx)
{
self.node_as_ident(node_id, false)
} else if 1 == send_ports.len() {
// Entire subgraph is pull, except for a single send/push handoff output.
// Entire subgraph is pull (with a single send/push handoff output).
send_ports[0].clone()
} else {
diagnostics.push(Diagnostic::spanned(
Expand Down
Loading
Loading