From 8d8b4b2288746e0aa2a95329d91297820aee7586 Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Fri, 8 Nov 2024 10:37:57 -0800 Subject: [PATCH] feat(hydroflow_plus)!: implicitly apply default optimizations (#1557) This also changes the behavior of `with_default_optimize` to be terminal, if users want to apply optimizations after these they should explicitly invoke the optimizations. --- .../hydroflow_plus/quickstart/clusters.mdx | 3 +- .../quickstart/first-dataflow.mdx | 12 +++-- hydroflow_plus/src/builder/built.rs | 39 +++++++--------- hydroflow_plus/src/builder/deploy.rs | 14 ++++++ hydroflow_plus/src/builder/mod.rs | 46 ++++++++++++++++++- hydroflow_plus/src/rewrites/profiler.rs | 2 +- hydroflow_plus/src/rewrites/properties.rs | 5 +- hydroflow_plus/src/stream.rs | 1 - hydroflow_plus_test/examples/compute_pi.rs | 1 - .../examples/first_ten_distributed.rs | 1 - hydroflow_plus_test/examples/map_reduce.rs | 1 - hydroflow_plus_test/examples/paxos.rs | 1 - .../examples/perf_compute_pi.rs | 1 - .../examples/simple_cluster.rs | 1 - hydroflow_plus_test/examples/two_pc.rs | 1 - hydroflow_plus_test/src/cluster/compute_pi.rs | 7 +-- hydroflow_plus_test/src/cluster/map_reduce.rs | 7 +-- .../src/cluster/paxos_bench.rs | 4 +- .../src/local/chat_app.rs | 3 +- .../src/local/compute_pi.rs | 3 +- .../src/local/count_elems.rs | 3 +- .../src/local/first_ten.rs | 3 +- .../src/local/graph_reachability.rs | 3 +- .../src/local/negation.rs | 6 +-- .../src/local/teed_join.rs | 3 +- template/hydroflow_plus/examples/first_ten.rs | 1 - .../examples/first_ten_distributed.rs | 1 - .../examples/first_ten_distributed_gcp.rs | 1 - .../src/first_ten_distributed.rs | 1 - 29 files changed, 99 insertions(+), 76 deletions(-) diff --git a/docs/docs/hydroflow_plus/quickstart/clusters.mdx b/docs/docs/hydroflow_plus/quickstart/clusters.mdx index 77c1dba62e65..cfdecec5e346 100644 --- a/docs/docs/hydroflow_plus/quickstart/clusters.mdx +++ b/docs/docs/hydroflow_plus/quickstart/clusters.mdx @@ -67,8 +67,7 @@ async fn main() { let builder = hydroflow_plus::FlowBuilder::new(); let (leader, workers) = flow::broadcast::broadcast(&builder); - flow.with_default_optimize() - .with_process(&leader, deployment.Localhost()) + flow.with_process(&leader, deployment.Localhost()) .with_cluster(&workers, (0..2) .map(|idx| deployment.Localhost()) ) diff --git a/docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx b/docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx index a1699536f434..16d6b00bcc70 100644 --- a/docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx +++ b/docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx @@ -25,20 +25,22 @@ cargo generate gh:hydro-project/hydroflow template/hydroflow_plus ## Writing a Dataflow -Hydroflow+ programs are _explicit_ about where computation takes place. So our dataflow program takes a single `&Process` parameter which is a handle to the single machine our program will run on. We can use this handle to materialize a stream using `source_iter` (which emits values from a provided collection), and then print out the values using `for_each`. +In Hydroflow+, streams are attached to a **`Location`**, which is a virtual handle to a **single machine** (the `Process` type) or **set of machines** (the `Cluster` type). To write distributed programs, a single piece of code can use multiple locations. -{getLines(firstTenSrc, 1, 7)} +Our first dataflow will run on a single machine, so we take a `&Process` parameter. We can materialize a stream on this machine using `process.source_iter` (which emits values from a provided collection), and then print out the values using `for_each`. + +{firstTenSrc} You'll notice that the arguments to `source_iter` and `for_each` are wrapped in `q!` macros. This is because Hydroflow+ uses a two-stage compilation process, where the first stage generates a deployment plan that is then compiled to individual binaries for each machine in the distributed system. The `q!` macro is used to mark Rust code that will be executed in the second stage ("runtime" code). This generally includes snippets of Rust code that are used to define static sources of data or closures that transform them. ## Running the Dataflow -Next, let's launch the dataflow program we just wrote. To do this, we'll need to write a bit more code in `examples/first_ten.rs` to configure our deployment. +Next, let's launch the dataflow program we just wrote. To do this, we'll need to write a bit more code in `examples/first_ten.rs` to configure our deployment (generally, we will place deployment scripts in `examples` because Hydro Deploy is a dev dependency). -{getLines(firstTenExampleSrc, 1, 17)} +{firstTenExampleSrc} First, we initialize a new [Hydro Deploy](../../deploy/index.md) deployment with `Deployment::new()`. Then, we create a `FlowBuilder` which will store the entire dataflow program and manage its compilation. -To get the `&Process` we provide to `first_ten`, we can call `flow.process()`. After the dataflow has been created, we optimize it using `flow.with_default_optimize()`. Then, we map our virtual `Process` to a physical deployment target using `flow.with_process` (in this case we deploy to localhost). +To create a `Process`, we call `flow.process()`. After the dataflow has been created, we must map each instantiated `Process` to a deployment target using `flow.with_process` (in this case we deploy to localhost). Finally, we call `flow.deploy(&mut deployment)` to provision the dataflow program on the target machine. This returns a struct with handles to the instantiated machines, which we must store in the `_nodes` variable to prevent them from being dropped. Then, we can start the dataflow program and block until `Ctrl-C` using `deployment.run_ctrl_c()`. diff --git a/hydroflow_plus/src/builder/built.rs b/hydroflow_plus/src/builder/built.rs index 997c5388d373..9d07cc37bd98 100644 --- a/hydroflow_plus/src/builder/built.rs +++ b/hydroflow_plus/src/builder/built.rs @@ -26,23 +26,6 @@ impl Drop for BuiltFlow<'_> { } } -impl BuiltFlow<'_> { - pub fn ir(&self) -> &Vec { - &self.ir - } - - pub fn optimize_with(mut self, f: impl FnOnce(Vec) -> Vec) -> Self { - self.used = true; - BuiltFlow { - ir: f(std::mem::take(&mut self.ir)), - processes: std::mem::take(&mut self.processes), - clusters: std::mem::take(&mut self.clusters), - used: false, - _phantom: PhantomData, - } - } -} - pub(crate) fn build_inner(ir: &mut Vec) -> BTreeMap { let mut builders = BTreeMap::new(); let mut built_tees = HashMap::new(); @@ -62,18 +45,24 @@ pub(crate) fn build_inner(ir: &mut Vec) -> BTreeMap BuiltFlow<'a> { - pub fn compile_no_network>(mut self) -> CompiledFlow<'a, D::GraphId> { - self.used = true; + pub fn ir(&self) -> &Vec { + &self.ir + } - CompiledFlow { - hydroflow_ir: build_inner(&mut self.ir), - extra_stmts: BTreeMap::new(), + pub fn optimize_with(mut self, f: impl FnOnce(Vec) -> Vec) -> Self { + self.used = true; + BuiltFlow { + ir: f(std::mem::take(&mut self.ir)), + processes: std::mem::take(&mut self.processes), + clusters: std::mem::take(&mut self.clusters), + used: false, _phantom: PhantomData, } } - pub fn with_default_optimize(self) -> BuiltFlow<'a> { + pub fn with_default_optimize>(self) -> DeployFlow<'a, D> { self.optimize_with(crate::rewrites::persist_pullup::persist_pullup) + .into_deploy() } fn into_deploy>(mut self) -> DeployFlow<'a, D> { @@ -134,6 +123,10 @@ impl<'a> BuiltFlow<'a> { self.into_deploy::().compile(env) } + pub fn compile_no_network + 'a>(self) -> CompiledFlow<'a, D::GraphId> { + self.into_deploy::().compile_no_network() + } + pub fn deploy + 'a>( self, env: &mut D::InstantiateEnv, diff --git a/hydroflow_plus/src/builder/deploy.rs b/hydroflow_plus/src/builder/deploy.rs index 07353dd0fd77..ef24877d1563 100644 --- a/hydroflow_plus/src/builder/deploy.rs +++ b/hydroflow_plus/src/builder/deploy.rs @@ -39,6 +39,10 @@ impl<'a, D: LocalDeploy<'a>> Drop for DeployFlow<'a, D> { } impl<'a, D: LocalDeploy<'a>> DeployFlow<'a, D> { + pub fn ir(&self) -> &Vec { + &self.ir + } + pub fn with_process

( mut self, process: &Process

, @@ -69,6 +73,16 @@ impl<'a, D: LocalDeploy<'a>> DeployFlow<'a, D> { .insert(cluster.id, spec.build(cluster.id, &tag_name)); self } + + pub fn compile_no_network(mut self) -> CompiledFlow<'a, D::GraphId> { + self.used = true; + + CompiledFlow { + hydroflow_ir: build_inner(&mut self.ir), + extra_stmts: BTreeMap::new(), + _phantom: PhantomData, + } + } } impl<'a, D: Deploy<'a>> DeployFlow<'a, D> { diff --git a/hydroflow_plus/src/builder/mod.rs b/hydroflow_plus/src/builder/mod.rs index a15cea44dd66..1db3581a2081 100644 --- a/hydroflow_plus/src/builder/mod.rs +++ b/hydroflow_plus/src/builder/mod.rs @@ -3,11 +3,14 @@ use std::collections::HashMap; use std::marker::PhantomData; use std::rc::Rc; +use compiled::CompiledFlow; +use deploy::{DeployFlow, DeployResult}; use stageleft::*; +use crate::deploy::{ExternalSpec, IntoProcessSpec, LocalDeploy}; use crate::ir::HfPlusLeaf; use crate::location::{Cluster, ExternalProcess, Process}; -use crate::RuntimeContext; +use crate::{ClusterSpec, Deploy, RuntimeContext}; pub mod built; pub mod compiled; @@ -98,7 +101,7 @@ impl<'a> FlowBuilder<'a> { } } - pub fn with_default_optimize(self) -> built::BuiltFlow<'a> { + pub fn with_default_optimize>(self) -> DeployFlow<'a, D> { self.finalize().with_default_optimize() } @@ -158,4 +161,43 @@ impl<'a> FlowBuilder<'a> { pub fn runtime_context(&self) -> RuntimeContext<'a> { RuntimeContext::new() } + + pub fn with_process>( + self, + process: &Process

, + spec: impl IntoProcessSpec<'a, D>, + ) -> DeployFlow<'a, D> { + self.with_default_optimize().with_process(process, spec) + } + + pub fn with_external>( + self, + process: &ExternalProcess

, + spec: impl ExternalSpec<'a, D>, + ) -> DeployFlow<'a, D> { + self.with_default_optimize().with_external(process, spec) + } + + pub fn with_cluster>( + self, + cluster: &Cluster, + spec: impl ClusterSpec<'a, D>, + ) -> DeployFlow<'a, D> { + self.with_default_optimize().with_cluster(cluster, spec) + } + + pub fn compile + 'a>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> { + self.with_default_optimize::().compile(env) + } + + pub fn compile_no_network + 'a>(self) -> CompiledFlow<'a, D::GraphId> { + self.with_default_optimize::().compile_no_network() + } + + pub fn deploy + 'a>( + self, + env: &mut D::InstantiateEnv, + ) -> DeployResult<'a, D> { + self.with_default_optimize().deploy(env) + } } diff --git a/hydroflow_plus/src/rewrites/profiler.rs b/hydroflow_plus/src/rewrites/profiler.rs index a2627567653c..be9b030bd83b 100644 --- a/hydroflow_plus/src/rewrites/profiler.rs +++ b/hydroflow_plus/src/rewrites/profiler.rs @@ -105,7 +105,7 @@ mod tests { let counter_queue = RuntimeData::new("Fake"); let pushed_down = built - .with_default_optimize() + .optimize_with(crate::rewrites::persist_pullup::persist_pullup) .optimize_with(|ir| super::profiling(ir, runtime_context, counters, counter_queue)); insta::assert_debug_snapshot!(&pushed_down.ir()); diff --git a/hydroflow_plus/src/rewrites/properties.rs b/hydroflow_plus/src/rewrites/properties.rs index ecb6bcbe42a4..5d5ce0d3edf5 100644 --- a/hydroflow_plus/src/rewrites/properties.rs +++ b/hydroflow_plus/src/rewrites/properties.rs @@ -118,12 +118,11 @@ mod tests { .for_each(q!(|(string, count)| println!("{}: {}", string, count))); let built = flow - .finalize() .optimize_with(|ir| properties_optimize(ir, &database)) - .with_default_optimize(); + .with_default_optimize::(); insta::assert_debug_snapshot!(built.ir()); - let _ = built.compile_no_network::(); + let _ = built.compile_no_network(); } } diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index aa1193e2e859..c6037d2b9924 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -864,7 +864,6 @@ mod tests { .send_bincode_external(&external); let nodes = flow - .with_default_optimize() .with_process(&first_node, deployment.Localhost()) .with_process(&second_node, deployment.Localhost()) .with_external(&external, deployment.Localhost()) diff --git a/hydroflow_plus_test/examples/compute_pi.rs b/hydroflow_plus_test/examples/compute_pi.rs index e7844e1c2c4e..69ec6df97425 100644 --- a/hydroflow_plus_test/examples/compute_pi.rs +++ b/hydroflow_plus_test/examples/compute_pi.rs @@ -42,7 +42,6 @@ async fn main() { let (cluster, leader) = hydroflow_plus_test::cluster::compute_pi::compute_pi(&builder, 8192); let _nodes = builder - .with_default_optimize() .with_process( &leader, TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags), diff --git a/hydroflow_plus_test/examples/first_ten_distributed.rs b/hydroflow_plus_test/examples/first_ten_distributed.rs index ddbcd5c83e6d..109464ad4f92 100644 --- a/hydroflow_plus_test/examples/first_ten_distributed.rs +++ b/hydroflow_plus_test/examples/first_ten_distributed.rs @@ -43,7 +43,6 @@ async fn main() { let (external_process, external_port, p1, p2) = hydroflow_plus_test::distributed::first_ten::first_ten_distributed(&builder); let nodes = builder - .with_default_optimize() .with_process( &p1, TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags), diff --git a/hydroflow_plus_test/examples/map_reduce.rs b/hydroflow_plus_test/examples/map_reduce.rs index 3ccc8ea48969..960fc032277b 100644 --- a/hydroflow_plus_test/examples/map_reduce.rs +++ b/hydroflow_plus_test/examples/map_reduce.rs @@ -41,7 +41,6 @@ async fn main() { let builder = hydroflow_plus::FlowBuilder::new(); let (leader, cluster) = hydroflow_plus_test::cluster::map_reduce::map_reduce(&builder); let _nodes = builder - .with_default_optimize() .with_process( &leader, TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags), diff --git a/hydroflow_plus_test/examples/paxos.rs b/hydroflow_plus_test/examples/paxos.rs index 06efd212cdf3..90bf86299b7a 100644 --- a/hydroflow_plus_test/examples/paxos.rs +++ b/hydroflow_plus_test/examples/paxos.rs @@ -56,7 +56,6 @@ async fn main() { let rustflags = "-C opt-level=3 -C codegen-units=1 -C strip=none -C debuginfo=2 -C lto=off"; let _nodes = builder - .with_default_optimize() .with_cluster( &proposers, (0..f + 1) diff --git a/hydroflow_plus_test/examples/perf_compute_pi.rs b/hydroflow_plus_test/examples/perf_compute_pi.rs index d92c77c4dd4d..8382e0b18fc2 100644 --- a/hydroflow_plus_test/examples/perf_compute_pi.rs +++ b/hydroflow_plus_test/examples/perf_compute_pi.rs @@ -51,7 +51,6 @@ async fn main() { // .ir()); let _nodes = builder - .with_default_optimize() .with_process( &leader, TrybuildHost::new(create_host(&mut deployment)) diff --git a/hydroflow_plus_test/examples/simple_cluster.rs b/hydroflow_plus_test/examples/simple_cluster.rs index 8fb08e5b6a7d..05fe9ea71508 100644 --- a/hydroflow_plus_test/examples/simple_cluster.rs +++ b/hydroflow_plus_test/examples/simple_cluster.rs @@ -42,7 +42,6 @@ async fn main() { let (process, cluster) = hydroflow_plus_test::cluster::simple_cluster::simple_cluster(&builder); let _nodes = builder - .with_default_optimize() .with_process( &process, TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags), diff --git a/hydroflow_plus_test/examples/two_pc.rs b/hydroflow_plus_test/examples/two_pc.rs index 23f7a6b64239..aa3065571cca 100644 --- a/hydroflow_plus_test/examples/two_pc.rs +++ b/hydroflow_plus_test/examples/two_pc.rs @@ -15,7 +15,6 @@ async fn main() { let _rustflags = "-C opt-level=3 -C codegen-units=1 -C strip=none -C debuginfo=2 -C lto=off"; let _nodes = builder - .with_default_optimize() .with_process(&coordinator, TrybuildHost::new(deployment.Localhost())) .with_cluster( &participants, diff --git a/hydroflow_plus_test/src/cluster/compute_pi.rs b/hydroflow_plus_test/src/cluster/compute_pi.rs index fbbf2a4d2671..6bfdbda2764c 100644 --- a/hydroflow_plus_test/src/cluster/compute_pi.rs +++ b/hydroflow_plus_test/src/cluster/compute_pi.rs @@ -56,14 +56,11 @@ mod tests { fn compute_pi_ir() { let builder = hydroflow_plus::FlowBuilder::new(); let _ = super::compute_pi(&builder, 8192); - let built = builder.with_default_optimize(); + let built = builder.with_default_optimize::(); insta::assert_debug_snapshot!(built.ir()); - for (id, ir) in built - .compile::(&RuntimeData::new("FAKE")) - .hydroflow_ir() - { + for (id, ir) in built.compile(&RuntimeData::new("FAKE")).hydroflow_ir() { insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, { insta::assert_snapshot!(ir.surface_syntax_string()); }); diff --git a/hydroflow_plus_test/src/cluster/map_reduce.rs b/hydroflow_plus_test/src/cluster/map_reduce.rs index cf9e558c3daa..27ebdc7eb598 100644 --- a/hydroflow_plus_test/src/cluster/map_reduce.rs +++ b/hydroflow_plus_test/src/cluster/map_reduce.rs @@ -50,14 +50,11 @@ mod tests { fn map_reduce_ir() { let builder = hydroflow_plus::FlowBuilder::new(); let _ = super::map_reduce(&builder); - let built = builder.with_default_optimize(); + let built = builder.with_default_optimize::(); insta::assert_debug_snapshot!(built.ir()); - for (id, ir) in built - .compile::(&RuntimeData::new("FAKE")) - .hydroflow_ir() - { + for (id, ir) in built.compile(&RuntimeData::new("FAKE")).hydroflow_ir() { insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, { insta::assert_snapshot!(ir.surface_syntax_string()); }); diff --git a/hydroflow_plus_test/src/cluster/paxos_bench.rs b/hydroflow_plus_test/src/cluster/paxos_bench.rs index a651eebe02c8..c602512bf2f4 100644 --- a/hydroflow_plus_test/src/cluster/paxos_bench.rs +++ b/hydroflow_plus_test/src/cluster/paxos_bench.rs @@ -267,12 +267,12 @@ mod tests { fn paxos_ir() { let builder = hydroflow_plus::FlowBuilder::new(); let _ = super::paxos_bench(&builder, 1, 1, 1, 1, 1, 1, 1); - let built = builder.with_default_optimize(); + let built = builder.with_default_optimize::(); hydroflow_plus::ir::dbg_dedup_tee(|| { insta::assert_debug_snapshot!(built.ir()); }); - let _ = built.compile::(&RuntimeData::new("FAKE")); + let _ = built.compile(&RuntimeData::new("FAKE")); } } diff --git a/hydroflow_plus_test_local/src/local/chat_app.rs b/hydroflow_plus_test_local/src/local/chat_app.rs index dc87f8a680c1..2c1d82f9ce3f 100644 --- a/hydroflow_plus_test_local/src/local/chat_app.rs +++ b/hydroflow_plus_test_local/src/local/chat_app.rs @@ -38,8 +38,7 @@ pub fn chat_app<'a>( output.send(t).unwrap(); })); - flow.with_default_optimize() - .compile_no_network::() + flow.compile_no_network::() } #[stageleft::runtime] diff --git a/hydroflow_plus_test_local/src/local/compute_pi.rs b/hydroflow_plus_test_local/src/local/compute_pi.rs index c0cc525e647e..131c529111bf 100644 --- a/hydroflow_plus_test_local/src/local/compute_pi.rs +++ b/hydroflow_plus_test_local/src/local/compute_pi.rs @@ -46,6 +46,5 @@ pub fn compute_pi_runtime<'a>( batch_size: RuntimeData, ) -> impl Quoted<'a, Hydroflow<'a>> { let _ = compute_pi(&flow, batch_size); - flow.with_default_optimize() - .compile_no_network::() + flow.compile_no_network::() } diff --git a/hydroflow_plus_test_local/src/local/count_elems.rs b/hydroflow_plus_test_local/src/local/count_elems.rs index de967ca18bc0..275c956d7e52 100644 --- a/hydroflow_plus_test_local/src/local/count_elems.rs +++ b/hydroflow_plus_test_local/src/local/count_elems.rs @@ -23,8 +23,7 @@ pub fn count_elems_generic<'a, T: 'a>( output.send(v).unwrap(); })); - flow.with_default_optimize() - .compile_no_network::() + flow.compile_no_network::() } #[stageleft::entry] diff --git a/hydroflow_plus_test_local/src/local/first_ten.rs b/hydroflow_plus_test_local/src/local/first_ten.rs index 20c674a691e4..867df566ce38 100644 --- a/hydroflow_plus_test_local/src/local/first_ten.rs +++ b/hydroflow_plus_test_local/src/local/first_ten.rs @@ -10,8 +10,7 @@ pub fn first_ten(flow: &FlowBuilder) { #[stageleft::entry] pub fn first_ten_runtime<'a>(flow: FlowBuilder<'a>) -> impl Quoted<'a, Hydroflow<'a>> { first_ten(&flow); - flow.with_default_optimize() - .compile_no_network::() + flow.compile_no_network::() } #[stageleft::runtime] diff --git a/hydroflow_plus_test_local/src/local/graph_reachability.rs b/hydroflow_plus_test_local/src/local/graph_reachability.rs index 928a9157c2d1..b03aefb230e2 100644 --- a/hydroflow_plus_test_local/src/local/graph_reachability.rs +++ b/hydroflow_plus_test_local/src/local/graph_reachability.rs @@ -29,8 +29,7 @@ pub fn graph_reachability<'a>( reached_out.send(v).unwrap(); })); - flow.with_default_optimize() - .compile_no_network::() + flow.compile_no_network::() } #[stageleft::runtime] diff --git a/hydroflow_plus_test_local/src/local/negation.rs b/hydroflow_plus_test_local/src/local/negation.rs index 667bec86c572..b771dd760353 100644 --- a/hydroflow_plus_test_local/src/local/negation.rs +++ b/hydroflow_plus_test_local/src/local/negation.rs @@ -27,8 +27,7 @@ pub fn test_difference<'a>( output.send(v).unwrap(); })); - flow.with_default_optimize() - .compile_no_network::() + flow.compile_no_network::() } #[stageleft::entry] @@ -58,8 +57,7 @@ pub fn test_anti_join<'a>( output.send(v.0).unwrap(); })); - flow.with_default_optimize() - .compile_no_network::() + flow.compile_no_network::() } #[stageleft::runtime] diff --git a/hydroflow_plus_test_local/src/local/teed_join.rs b/hydroflow_plus_test_local/src/local/teed_join.rs index 3d1283528af9..3771e481a7e7 100644 --- a/hydroflow_plus_test_local/src/local/teed_join.rs +++ b/hydroflow_plus_test_local/src/local/teed_join.rs @@ -41,8 +41,7 @@ pub fn teed_join<'a, S: Stream + Unpin + 'a>( output.send(v).unwrap(); })); - flow.with_default_optimize() - .compile_no_network::() + flow.compile_no_network::() .with_dynamic_id(subgraph_id) } diff --git a/template/hydroflow_plus/examples/first_ten.rs b/template/hydroflow_plus/examples/first_ten.rs index ada07db93fbf..51d614431fed 100644 --- a/template/hydroflow_plus/examples/first_ten.rs +++ b/template/hydroflow_plus/examples/first_ten.rs @@ -9,7 +9,6 @@ async fn main() { hydroflow_plus_template::first_ten::first_ten(&process); let _nodes = flow - .with_default_optimize() .with_process(&process, deployment.Localhost()) .deploy(&mut deployment); diff --git a/template/hydroflow_plus/examples/first_ten_distributed.rs b/template/hydroflow_plus/examples/first_ten_distributed.rs index e75bdbb1ecd6..472a6c7774d6 100644 --- a/template/hydroflow_plus/examples/first_ten_distributed.rs +++ b/template/hydroflow_plus/examples/first_ten_distributed.rs @@ -8,7 +8,6 @@ async fn main() { let (p1, p2) = hydroflow_plus_template::first_ten_distributed::first_ten_distributed(&flow); let _nodes = flow - .with_default_optimize() .with_process(&p1, deployment.Localhost()) .with_process(&p2, deployment.Localhost()) .deploy(&mut deployment); diff --git a/template/hydroflow_plus/examples/first_ten_distributed_gcp.rs b/template/hydroflow_plus/examples/first_ten_distributed_gcp.rs index 3caf775ce50d..4193dc98ef95 100644 --- a/template/hydroflow_plus/examples/first_ten_distributed_gcp.rs +++ b/template/hydroflow_plus/examples/first_ten_distributed_gcp.rs @@ -21,7 +21,6 @@ async fn main() { let (p1, p2) = hydroflow_plus_template::first_ten_distributed::first_ten_distributed(&flow); let _nodes = flow - .with_default_optimize() .with_process( &p1, TrybuildHost::new( diff --git a/template/hydroflow_plus/src/first_ten_distributed.rs b/template/hydroflow_plus/src/first_ten_distributed.rs index c4d8d1cbcccc..5385d002c726 100644 --- a/template/hydroflow_plus/src/first_ten_distributed.rs +++ b/template/hydroflow_plus/src/first_ten_distributed.rs @@ -31,7 +31,6 @@ mod tests { let (p1, p2) = super::first_ten_distributed(&flow); let nodes = flow - .with_default_optimize() .with_process(&p1, localhost.clone()) .with_process(&p2, localhost.clone()) .deploy(&mut deployment);