diff --git a/Cargo.lock b/Cargo.lock
index 4bd48267f580..f1a183bd8dfa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1553,11 +1553,12 @@ dependencies = [

 [[package]]
 name = "indicatif"
-version = "0.17.3"
+version = "0.17.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cef509aa9bc73864d6756f0d34d35504af3cf0844373afe9b8669a5b8005a729"
+checksum = "0b297dc40733f23a0e52728a58fa9489a5b7638a324932de16b41adc3ef80730"
 dependencies = [
  "console",
+ "instant",
  "number_prefix",
  "portable-atomic",
  "unicode-width",
@@ -2163,9 +2164,9 @@ dependencies = [

 [[package]]
 name = "portable-atomic"
-version = "0.3.19"
+version = "1.4.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "26f6a7b87c2e435a3241addceeeff740ff8b7e76b74c13bf9acb17fa454ea00b"
+checksum = "31114a898e107c51bb1609ffaf55a0e011cf6a4d7f1170d0015a165082c0338b"

 [[package]]
 name = "ppv-lite86"
@@ -3297,6 +3298,21 @@ dependencies = [
  "winnow",
 ]

+[[package]]
+name = "topolotree"
+version = "0.0.0"
+dependencies = [
+ "dashmap",
+ "futures",
+ "hydroflow",
+ "hydroflow_datalog",
+ "procinfo",
+ "rand 0.8.5",
+ "serde",
+ "serde_json",
+ "tokio",
+]
+
 [[package]]
 name = "tracing"
 version = "0.1.37"
diff --git a/Cargo.toml b/Cargo.toml
index 0597fc3e45ac..1ef74afd2113 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,6 +22,7 @@ members = [
     "stageleft_test",
     "stageleft_test_macro",
     "stageleft_tool",
+    "topolotree",
     "variadics",
     "website_playground",
 ]
diff --git a/hydro_cli/Cargo.toml b/hydro_cli/Cargo.toml
index 64c799933a23..ea1501d08516 100644
--- a/hydro_cli/Cargo.toml
+++ b/hydro_cli/Cargo.toml
@@ -38,7 +38,7 @@ nanoid = "0.4.0"
 ctrlc = "3.2.5"
 nix = "0.26.2"
 hydroflow_cli_integration = { path = "../hydroflow_cli_integration", version = "^0.3.0" }
-indicatif = "0.17.3"
+indicatif = "0.17.6"
 cargo_metadata = "0.15.4"

 [dev-dependencies]
diff --git a/hydro_cli/hydro/_core.pyi b/hydro_cli/hydro/_core.pyi
index 7c0f491c9c74..5f561ce846a3 100644
--- a/hydro_cli/hydro/_core.pyi
+++ b/hydro_cli/hydro/_core.pyi
@@ -20,7 +20,7 @@ class Deployment(object):

     def CustomService(self, on: "Host", external_ports: List[int]) -> "CustomService": ...

-    def HydroflowCrate(self, src: str, on: "Host", example: Optional[str] = None, profile: Optional[str] = None, features: Optional[List[str]] = None, args: Optional[List[str]] = None, display_id: Optional[str] = None, external_ports: Optional[List[int]] = None) -> "HydroflowCrate": ...
+    def HydroflowCrate(self, src: str, on: "Host", bin: Optional[str] = None, example: Optional[str] = None, profile: Optional[str] = None, features: Optional[List[str]] = None, args: Optional[List[str]] = None, display_id: Optional[str] = None, external_ports: Optional[List[int]] = None) -> "HydroflowCrate": ...

     async def deploy(self): ...
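Note: the new `bin` parameter on `HydroflowCrate` threads through to Cargo's `--bin` flag, so a multi-binary crate like the new `topolotree` package can select one of its `[[bin]]` targets instead of an example. A minimal sketch of how such an option typically maps onto a build invocation, using a plain `std::process::Command` as a stand-in (the real `build.rs` further down in this diff uses its own command type plus a build cache):

```rust
use std::io;
use std::path::Path;
use std::process::{Command, ExitStatus};

/// Hypothetical sketch: map the `bin`/`example` options onto `cargo build`.
fn cargo_build(
    src: &Path,
    bin: Option<&str>,
    example: Option<&str>,
    profile: Option<&str>,
) -> io::Result<ExitStatus> {
    let mut command = Command::new("cargo");
    command.current_dir(src);
    command.args(["build", "--profile", profile.unwrap_or("release")]);

    // Mirrors the new block in build.rs: select a [[bin]] target when requested.
    if let Some(bin) = bin {
        command.args(["--bin", bin]);
    }
    if let Some(example) = example {
        command.args(["--example", example]);
    }

    command.status()
}
```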
diff --git a/hydro_cli/src/core/custom_service.rs b/hydro_cli/src/core/custom_service.rs
index 98cf030ca25e..a55eecb4d702 100644
--- a/hydro_cli/src/core/custom_service.rs
+++ b/hydro_cli/src/core/custom_service.rs
@@ -65,7 +65,9 @@ impl Service for CustomService {
         Ok(())
     }

-    async fn start(&mut self) {}
+    async fn start(&mut self) -> Result<()> {
+        Ok(())
+    }

     async fn stop(&mut self) -> Result<()> {
         Ok(())
diff --git a/hydro_cli/src/core/deployment.rs b/hydro_cli/src/core/deployment.rs
index 331470c70317..11390a274675 100644
--- a/hydro_cli/src/core/deployment.rs
+++ b/hydro_cli/src/core/deployment.rs
@@ -17,7 +17,7 @@ pub struct Deployment {

 impl Deployment {
     pub async fn deploy(&mut self) -> Result<()> {
-        progress::ProgressTracker::with_group("deploy", || async {
+        progress::ProgressTracker::with_group("deploy", None, || async {
             let mut resource_batch = super::ResourceBatch::new();
             let active_services = self
                 .services
@@ -41,7 +41,7 @@ impl Deployment {
         }

         let result = Arc::new(
-            progress::ProgressTracker::with_group("provision", || async {
+            progress::ProgressTracker::with_group("provision", None, || async {
                 resource_batch
                     .provision(&mut self.resource_pool, self.last_resource_result.clone())
                     .await
@@ -50,7 +50,7 @@ impl Deployment {
         );
         self.last_resource_result = Some(result.clone());

-        progress::ProgressTracker::with_group("provision", || {
+        progress::ProgressTracker::with_group("provision", None, || {
             let hosts_provisioned =
                 self.hosts
                     .iter_mut()
@@ -61,7 +61,7 @@ impl Deployment {
         })
         .await;

-        progress::ProgressTracker::with_group("deploy", || {
+        progress::ProgressTracker::with_group("deploy", None, || {
             let services_future =
                 self.services
                     .iter_mut()
@@ -79,7 +79,7 @@ impl Deployment {
         })
         .await;

-        progress::ProgressTracker::with_group("ready", || {
+        progress::ProgressTracker::with_group("ready", None, || {
             let all_services_ready =
                 self.services
                     .iter()
@@ -97,7 +97,7 @@ impl Deployment {
             .await
     }

-    pub async fn start(&mut self) {
+    pub async fn start(&mut self) -> Result<()> {
         let active_services = self
             .services
             .iter()
@@ -110,10 +110,12 @@ impl Deployment {
             self.services
                 .iter()
                 .map(|service: &Weak<RwLock<dyn Service>>| async {
-                    service.upgrade().unwrap().write().await.start().await;
+                    service.upgrade().unwrap().write().await.start().await?;
+                    Ok(()) as Result<()>
                 });

-        futures::future::join_all(all_services_start).await;
+        futures::future::try_join_all(all_services_start).await?;
+        Ok(())
     }

     pub fn add_host<T: Host + 'static, F: FnOnce(usize) -> T>(
diff --git a/hydro_cli/src/core/gcp.rs b/hydro_cli/src/core/gcp.rs
index aabbc066d9e0..1dd250f838b6 100644
--- a/hydro_cli/src/core/gcp.rs
+++ b/hydro_cli/src/core/gcp.rs
@@ -92,35 +92,44 @@ impl LaunchedSSHHost for LaunchedComputeEngine {
             22,
         );

-        let res = ProgressTracker::leaf(
-            format!(
-                "connecting to host @ {}",
-                self.external_ip.as_ref().unwrap()
-            ),
-            async_retry(
-                &|| async {
-                    let mut config = SessionConfiguration::new();
-                    config.set_compress(true);
-
-                    let mut session =
-                        AsyncSession::::connect(target_addr, Some(config)).await?;
-
-                    session.handshake().await?;
-
-                    session
-                        .userauth_pubkey_file(
-                            self.user.as_str(),
-                            None,
-                            self.ssh_key_path().as_path(),
-                            None,
-                        )
-                        .await?;
-
-                    Ok(session)
-                },
-                10,
-                Duration::from_secs(1),
-            ),
+        let mut attempt_count = 0;
+
+        let res = async_retry(
+            || {
+                attempt_count += 1;
+                ProgressTracker::leaf(
+                    format!(
+                        "connecting to host @ {} (attempt: {})",
+                        self.external_ip.as_ref().unwrap(),
+                        attempt_count
+                    ),
+                    async {
+                        let mut config = SessionConfiguration::new();
+                        config.set_compress(true);
+
+                        let mut session =
+ AsyncSession::::connect(target_addr, Some(config)).await?; + + tokio::time::timeout(Duration::from_secs(15), async move { + session.handshake().await?; + + session + .userauth_pubkey_file( + self.user.as_str(), + None, + self.ssh_key_path().as_path(), + None, + ) + .await?; + + Ok(session) + }) + .await? + }, + ) + }, + 10, + Duration::from_secs(1), ) .await?; diff --git a/hydro_cli/src/core/hydroflow_crate/build.rs b/hydro_cli/src/core/hydroflow_crate/build.rs index 5833299afdc8..a681a45e8754 100644 --- a/hydro_cli/src/core/hydroflow_crate/build.rs +++ b/hydro_cli/src/core/hydroflow_crate/build.rs @@ -12,13 +12,15 @@ use tokio::sync::OnceCell; use crate::core::progress::ProgressTracker; use crate::core::HostTargetType; -type CacheKey = ( - PathBuf, - Option, - Option, - HostTargetType, - Option>, -); +#[derive(PartialEq, Eq, Hash)] +struct CacheKey { + src: PathBuf, + bin: Option, + example: Option, + profile: Option, + target_type: HostTargetType, + features: Option>, +} pub type BuiltCrate = Arc<(String, Vec, PathBuf)>; @@ -27,18 +29,21 @@ static BUILDS: Lazy>>>> = pub async fn build_crate( src: PathBuf, + bin: Option, example: Option, profile: Option, target_type: HostTargetType, features: Option>, ) -> Result { - let key = ( - src.clone(), - example.clone(), - profile.clone(), + let key = CacheKey { + src: src.clone(), + bin: bin.clone(), + example: example.clone(), + profile: profile.clone(), target_type, - features.clone(), - ); + features: features.clone(), + }; + let unit_of_work = { let mut builds = BUILDS.lock().unwrap(); builds.entry(key).or_default().clone() @@ -56,6 +61,10 @@ pub async fn build_crate( profile.unwrap_or("release".to_string()), ]); + if let Some(bin) = bin.as_ref() { + command.args(["--bin", bin]); + } + if let Some(example) = example.as_ref() { command.args(["--example", example]); } diff --git a/hydro_cli/src/core/hydroflow_crate/mod.rs b/hydro_cli/src/core/hydroflow_crate/mod.rs index 7b379c4ffaf4..19f1d6f034e8 100644 --- a/hydro_cli/src/core/hydroflow_crate/mod.rs +++ b/hydro_cli/src/core/hydroflow_crate/mod.rs @@ -24,6 +24,7 @@ pub struct HydroflowCrate { id: usize, src: PathBuf, on: Arc>, + bin: Option, example: Option, profile: Option, features: Option>, @@ -55,6 +56,7 @@ impl HydroflowCrate { id: usize, src: PathBuf, on: Arc>, + bin: Option, example: Option, profile: Option, features: Option>, @@ -65,6 +67,7 @@ impl HydroflowCrate { Self { id, src, + bin, on, example, profile, @@ -162,6 +165,7 @@ impl HydroflowCrate { fn build(&mut self) -> JoinHandle> { let src_cloned = self.src.canonicalize().unwrap(); + let bin_cloned = self.bin.clone(); let example_cloned = self.example.clone(); let features_cloned = self.features.clone(); let host = self.on.clone(); @@ -170,6 +174,7 @@ impl HydroflowCrate { tokio::task::spawn(build_crate( src_cloned, + bin_cloned, example_cloned, profile_cloned, target_type, @@ -213,6 +218,7 @@ impl Service for HydroflowCrate { .display_id .clone() .unwrap_or_else(|| format!("service/{}", self.id)), + None, || async { let mut host_write = self.on.write().await; let launched = host_write.provision(resource_result); @@ -232,6 +238,7 @@ impl Service for HydroflowCrate { .display_id .clone() .unwrap_or_else(|| format!("service/{}", self.id)), + None, || async { let launched_host = self.launched_host.as_ref().unwrap(); @@ -256,7 +263,7 @@ impl Service for HydroflowCrate { let formatted_bind_config = serde_json::to_string(&bind_config).unwrap(); // request stdout before sending config so we don't miss the "ready" response - let 
stdout_receiver = binary.write().await.stdout().await; + let stdout_receiver = binary.write().await.cli_stdout().await; binary .write() @@ -286,9 +293,9 @@ impl Service for HydroflowCrate { .await } - async fn start(&mut self) { + async fn start(&mut self) -> Result<()> { if self.started { - return; + return Ok(()); } let mut sink_ports = HashMap::new(); @@ -298,6 +305,15 @@ impl Service for HydroflowCrate { let formatted_defns = serde_json::to_string(&sink_ports).unwrap(); + let stdout_receiver = self + .launched_binary + .as_mut() + .unwrap() + .write() + .await + .cli_stdout() + .await; + self.launched_binary .as_mut() .unwrap() @@ -309,7 +325,17 @@ impl Service for HydroflowCrate { .await .unwrap(); + let start_ack_line = ProgressTracker::leaf( + "waiting for ack start".to_string(), + tokio::time::timeout(Duration::from_secs(60), stdout_receiver.recv()), + ) + .await??; + if !start_ack_line.starts_with("ack start") { + bail!("expected ack start"); + } + self.started = true; + Ok(()) } async fn stop(&mut self) -> Result<()> { diff --git a/hydro_cli/src/core/hydroflow_crate/ports.rs b/hydro_cli/src/core/hydroflow_crate/ports.rs index 7c52a5ab981b..ba279bc44104 100644 --- a/hydro_cli/src/core/hydroflow_crate/ports.rs +++ b/hydro_cli/src/core/hydroflow_crate/ports.rs @@ -544,7 +544,6 @@ impl ServerConfig { ServerConfig::TaggedUnwrap(underlying) => { let loaded = underlying.load_instantiated(select).await; - dbg!(&loaded); if let ServerPort::Tagged(underlying, _) = loaded { *underlying } else { diff --git a/hydro_cli/src/core/localhost.rs b/hydro_cli/src/core/localhost.rs index fa565874fe78..d2546d49200b 100644 --- a/hydro_cli/src/core/localhost.rs +++ b/hydro_cli/src/core/localhost.rs @@ -22,6 +22,7 @@ use super::{ struct LaunchedLocalhostBinary { child: RwLock, stdin_sender: Sender, + stdout_cli_receivers: Arc>>>, stdout_receivers: Arc>>>, stderr_receivers: Arc>>>, } @@ -32,6 +33,13 @@ impl LaunchedBinary for LaunchedLocalhostBinary { self.stdin_sender.clone() } + async fn cli_stdout(&self) -> Receiver { + let mut receivers = self.stdout_cli_receivers.write().await; + let (sender, receiver) = async_channel::unbounded::(); + receivers.push(sender); + receiver + } + async fn stdout(&self) -> Receiver { let mut receivers = self.stdout_receivers.write().await; let (sender, receiver) = async_channel::unbounded::(); @@ -69,17 +77,39 @@ impl LaunchedBinary for LaunchedLocalhostBinary { struct LaunchedLocalhost {} +type CLIAndMain = ( + Arc>>>, + Arc>>>, +); + pub fn create_broadcast( source: T, default: impl Fn(String) + Send + 'static, -) -> Arc>>> { +) -> CLIAndMain { + let cli_receivers = Arc::new(RwLock::new(Vec::>::new())); let receivers = Arc::new(RwLock::new(Vec::>::new())); + + let weak_cli_receivers = Arc::downgrade(&cli_receivers); let weak_receivers = Arc::downgrade(&receivers); tokio::spawn(async move { let mut lines = BufReader::new(source).lines(); - while let Some(Result::Ok(line)) = lines.next().await { + 'line_loop: while let Some(Result::Ok(line)) = lines.next().await { + if let Some(cli_receivers) = weak_cli_receivers.upgrade() { + let mut cli_receivers = cli_receivers.write().await; + let mut successful_send = false; + for r in cli_receivers.iter() { + successful_send |= r.send(line.clone()).await.is_ok(); + } + + cli_receivers.retain(|r| !r.is_closed()); + + if successful_send { + continue 'line_loop; + } + } + if let Some(receivers) = weak_receivers.upgrade() { let mut receivers = receivers.write().await; let mut successful_send = false; @@ -98,7 +128,7 @@ pub fn 
create_broadcast( } }); - receivers + (cli_receivers, receivers) } #[async_trait] @@ -158,16 +188,18 @@ impl LaunchedHost for LaunchedLocalhost { }); let id_clone = id.clone(); - let stdout_receivers = create_broadcast(child.stdout.take().unwrap(), move |s| { - println!("[{id_clone}] {s}") - }); - let stderr_receivers = create_broadcast(child.stderr.take().unwrap(), move |s| { + let (stdout_cli_receivers, stdout_receivers) = + create_broadcast(child.stdout.take().unwrap(), move |s| { + println!("[{id_clone}] {s}") + }); + let (_, stderr_receivers) = create_broadcast(child.stderr.take().unwrap(), move |s| { eprintln!("[{id}] {s}") }); Ok(Arc::new(RwLock::new(LaunchedLocalhostBinary { child: RwLock::new(child), stdin_sender, + stdout_cli_receivers, stdout_receivers, stderr_receivers, }))) diff --git a/hydro_cli/src/core/mod.rs b/hydro_cli/src/core/mod.rs index c28d1f0fe437..9b6fafdbe43d 100644 --- a/hydro_cli/src/core/mod.rs +++ b/hydro_cli/src/core/mod.rs @@ -69,6 +69,13 @@ pub struct ResourceResult { #[async_trait] pub trait LaunchedBinary: Send + Sync { async fn stdin(&self) -> Sender; + + /// Provides a channel for the CLI to handshake with the binary, + /// with the guarantee that as long as the CLI is holding on + /// to a handle, none of the messages will also be broadcast + /// to the user-facing [`LaunchedBinary::stdout`] channel. + async fn cli_stdout(&self) -> Receiver; + async fn stdout(&self) -> Receiver; async fn stderr(&self) -> Receiver; @@ -186,7 +193,7 @@ pub trait Service: Send + Sync { async fn ready(&mut self) -> Result<()>; /// Starts the service by having it connect to other services and start computations. - async fn start(&mut self); + async fn start(&mut self) -> Result<()>; /// Stops the service by having it disconnect from other services and stop computations. async fn stop(&mut self) -> Result<()>; diff --git a/hydro_cli/src/core/progress.rs b/hydro_cli/src/core/progress.rs index 05a7d0d8df30..8e02c199310f 100644 --- a/hydro_cli/src/core/progress.rs +++ b/hydro_cli/src/core/progress.rs @@ -12,7 +12,6 @@ tokio::task_local! 
{ #[derive(Clone, PartialEq, Eq, Debug)] pub enum LeafStatus { - Queued, Started, Finished, } @@ -20,7 +19,12 @@ pub enum LeafStatus { #[derive(Debug)] pub enum BarTree { Root(Vec), - Group(String, Arc, Vec), + Group( + String, + Arc, + Vec, + Option, + ), Leaf(String, Arc, LeafStatus), Finished, } @@ -29,26 +33,22 @@ impl BarTree { fn get_pb(&self) -> Option<&Arc> { match self { BarTree::Root(_) => None, - BarTree::Group(_, pb, _) | BarTree::Leaf(_, pb, _) => Some(pb), + BarTree::Group(_, pb, _, _) | BarTree::Leaf(_, pb, _) => Some(pb), BarTree::Finished => None, } } fn status(&self) -> LeafStatus { match self { - BarTree::Root(children) | BarTree::Group(_, _, children) => { - if children - .iter() - .all(|child| child.status() == LeafStatus::Finished) + BarTree::Root(children) | BarTree::Group(_, _, children, _) => { + if !children.is_empty() + && children + .iter() + .all(|child| child.status() == LeafStatus::Finished) { LeafStatus::Finished - } else if children - .iter() - .any(|child| child.status() == LeafStatus::Started) - { - LeafStatus::Started } else { - LeafStatus::Queued + LeafStatus::Started } } BarTree::Leaf(_, _, status) => status.clone(), @@ -63,7 +63,7 @@ impl BarTree { child.refresh_prefix(cur_path); } } - BarTree::Group(name, pb, children) => { + BarTree::Group(name, pb, children, anticipated_total) => { let mut path_with_group = cur_path.to_vec(); path_with_group.push(name.clone()); @@ -75,18 +75,26 @@ impl BarTree { .iter() .filter(|child| child.status() == LeafStatus::Started) .count(); - let queued_count = children - .iter() - .filter(|child| child.status() == LeafStatus::Queued) - .count(); - - pb.set_prefix(format!( - "{} ({}/{}/{})", - path_with_group.join(" / "), - finished_count, - started_count, - queued_count - )); + let queued_count = + anticipated_total.map(|total| total - finished_count - started_count); + + match queued_count { + Some(queued_count) => { + pb.set_prefix(format!( + "{} ({}/{}/{})", + path_with_group.join(" / "), + finished_count, + started_count, + queued_count + )); + } + None => pb.set_prefix(format!( + "{} ({}/{})", + path_with_group.join(" / "), + finished_count, + started_count + )), + } for child in children { child.refresh_prefix(&path_with_group); } @@ -106,7 +114,7 @@ impl BarTree { } match self { - BarTree::Root(children) | BarTree::Group(_, _, children) => { + BarTree::Root(children) | BarTree::Group(_, _, children, _) => { children[path[0]].find_node(&path[1..]) } _ => panic!(), @@ -134,12 +142,13 @@ impl ProgressTracker { under_path: Vec, name: String, group: bool, + anticipated_total: Option, progress: bool, ) -> (usize, Arc) { let surrounding = self.tree.find_node(&under_path); let (surrounding_children, surrounding_pb) = match surrounding { BarTree::Root(children) => (children, None), - BarTree::Group(_, pb, children) => (children, Some(pb)), + BarTree::Group(_, pb, children, _) => (children, Some(pb)), _ => panic!(), }; @@ -161,7 +170,7 @@ impl ProgressTracker { let pb = Arc::new(created_bar); if group { - surrounding_children.push(BarTree::Group(name, pb.clone(), vec![])); + surrounding_children.push(BarTree::Group(name, pb.clone(), vec![], anticipated_total)); } else { surrounding_children.push(BarTree::Leaf(name, pb.clone(), LeafStatus::Started)); } @@ -189,7 +198,7 @@ impl ProgressTracker { pub fn end_task(&mut self, path: Vec) { match self.tree.find_node(&path[0..path.len() - 1]) { - BarTree::Root(children) | BarTree::Group(_, _, children) => { + BarTree::Root(children) | BarTree::Group(_, _, children, _) => { let 
removed = children[*path.last().unwrap()].get_pb().unwrap().clone(); children[*path.last().unwrap()] = BarTree::Finished; self.multi_progress.remove(&removed); @@ -213,11 +222,15 @@ impl ProgressTracker { .get_or_init(|| Mutex::new(ProgressTracker::new())) .lock() .unwrap(); - progress_bar.multi_progress.println(msg).unwrap(); + + if progress_bar.multi_progress.println(msg).is_err() { + println!("{}", msg); + } } pub fn with_group<'a, T, F: Future>( name: &str, + anticipated_total: Option, f: impl FnOnce() -> F + 'a, ) -> impl Future + 'a { let mut group = CURRENT_GROUP @@ -229,7 +242,13 @@ impl ProgressTracker { .get_or_init(|| Mutex::new(ProgressTracker::new())) .lock() .unwrap(); - progress_bar.start_task(group.clone(), name.to_string(), true, false) + progress_bar.start_task( + group.clone(), + name.to_string(), + true, + anticipated_total, + false, + ) }; group.push(group_i); @@ -255,7 +274,7 @@ impl ProgressTracker { .get_or_init(|| Mutex::new(ProgressTracker::new())) .lock() .unwrap(); - progress_bar.start_task(group.clone(), name, false, false) + progress_bar.start_task(group.clone(), name, false, None, false) }; group.push(leaf_i); @@ -284,7 +303,7 @@ impl ProgressTracker { .get_or_init(|| Mutex::new(ProgressTracker::new())) .lock() .unwrap(); - progress_bar.start_task(group.clone(), name, false, true) + progress_bar.start_task(group.clone(), name, false, None, true) }; group.push(leaf_i); diff --git a/hydro_cli/src/core/ssh.rs b/hydro_cli/src/core/ssh.rs index bde3fa06d60e..b5cef3f255dd 100644 --- a/hydro_cli/src/core/ssh.rs +++ b/hydro_cli/src/core/ssh.rs @@ -26,6 +26,7 @@ struct LaunchedSSHBinary { channel: AsyncChannel, stdin_sender: Sender, stdout_receivers: Arc>>>, + stdout_cli_receivers: Arc>>>, stderr_receivers: Arc>>>, } @@ -35,6 +36,13 @@ impl LaunchedBinary for LaunchedSSHBinary { self.stdin_sender.clone() } + async fn cli_stdout(&self) -> Receiver { + let mut receivers = self.stdout_cli_receivers.write().await; + let (sender, receiver) = async_channel::unbounded::(); + receivers.push(sender); + receiver + } + async fn stdout(&self) -> Receiver { let mut receivers = self.stdout_receivers.write().await; let (sender, receiver) = async_channel::unbounded::(); @@ -207,15 +215,17 @@ impl LaunchedHost for T { }); let id_clone = id.clone(); - let stdout_receivers = + let (stdout_cli_receivers, stdout_receivers) = create_broadcast(channel.stream(0), move |s| println!("[{id_clone}] {s}")); - let stderr_receivers = create_broadcast(channel.stderr(), move |s| eprintln!("[{id}] {s}")); + let (_, stderr_receivers) = + create_broadcast(channel.stderr(), move |s| eprintln!("[{id}] {s}")); Ok(Arc::new(RwLock::new(LaunchedSSHBinary { _resource_result: self.resource_result().clone(), session: Some(session), channel, stdin_sender, + stdout_cli_receivers, stdout_receivers, stderr_receivers, }))) @@ -243,7 +253,7 @@ impl LaunchedHost for T { // if anyone wants to connect to this forwarded port } - println!("[hydro] closing forwarded port"); + ProgressTracker::println("[hydro] closing forwarded port"); }); Ok(local_addr) diff --git a/hydro_cli/src/core/terraform.rs b/hydro_cli/src/core/terraform.rs index 47a2d7ac15a0..f89af014df69 100644 --- a/hydro_cli/src/core/terraform.rs +++ b/hydro_cli/src/core/terraform.rs @@ -46,6 +46,7 @@ impl TerraformPool { let spawned_child = apply_command .stdout(Stdio::piped()) + .stderr(Stdio::piped()) .spawn() .context("Failed to spawn `terraform`. 
Is it installed?")?; @@ -114,7 +115,7 @@ impl TerraformBatch { }); } - ProgressTracker::with_group("terraform", || async { + ProgressTracker::with_group("terraform", None, || async { let dothydro_folder = std::env::current_dir().unwrap().join(".hydro"); std::fs::create_dir_all(&dothydro_folder).unwrap(); let deployment_folder = tempfile::tempdir_in(dothydro_folder).unwrap(); @@ -140,9 +141,11 @@ impl TerraformBatch { let (apply_id, apply) = pool.create_apply(deployment_folder)?; - let output = ProgressTracker::with_group("apply", || async { - apply.write().await.output().await - }) + let output = ProgressTracker::with_group( + "apply", + Some(self.resource.values().map(|r| r.len()).sum()), + || async { apply.write().await.output().await }, + ) .await; pool.drop_apply(apply_id); output @@ -226,13 +229,22 @@ impl TerraformApply { async fn output(&mut self) -> Result { let (_, child) = self.child.as_ref().unwrap().clone(); let mut stdout = child.write().unwrap().stdout.take().unwrap(); + let stderr = child.write().unwrap().stderr.take().unwrap(); let status = tokio::task::spawn_blocking(move || { // it is okay for this thread to keep running even if the future is cancelled child.write().unwrap().wait().unwrap() }); - display_apply_outputs(&mut stdout).await; + let display_apply = display_apply_outputs(&mut stdout); + let stderr_loop = tokio::task::spawn_blocking(move || { + let mut lines = BufReader::new(stderr).lines(); + while let Some(Ok(line)) = lines.next() { + ProgressTracker::println(&format!("[terraform] {}", line)); + } + }); + + let _ = futures::join!(display_apply, stderr_loop); let status = status.await; diff --git a/hydro_cli/src/core/util.rs b/hydro_cli/src/core/util.rs index d397a3d52c86..5668212bb57a 100644 --- a/hydro_cli/src/core/util.rs +++ b/hydro_cli/src/core/util.rs @@ -4,7 +4,7 @@ use anyhow::Result; use futures::Future; pub async fn async_retry>>( - thunk: impl Fn() -> F, + mut thunk: impl FnMut() -> F, count: usize, delay: Duration, ) -> Result { diff --git a/hydro_cli/src/lib.rs b/hydro_cli/src/lib.rs index 62e895f38fc1..59c2c3e80ea7 100644 --- a/hydro_cli/src/lib.rs +++ b/hydro_cli/src/lib.rs @@ -236,6 +236,7 @@ impl Deployment { py: Python<'_>, src: String, on: &Host, + bin: Option, example: Option, profile: Option, features: Option>, @@ -248,6 +249,7 @@ impl Deployment { id, src.into(), on.underlying.clone(), + bin, example, profile, features, @@ -286,7 +288,11 @@ impl Deployment { let underlying = self.underlying.clone(); let py_none = py.None(); interruptible_future_to_py(py, async move { - underlying.write().await.start().await; + underlying.write().await.start().await.map_err(|e| { + AnyhowError::new_err(AnyhowWrapper { + underlying: Arc::new(RwLock::new(Some(e))), + }) + })?; Ok(py_none) }) } diff --git a/hydro_cli_examples/Cargo.toml b/hydro_cli_examples/Cargo.toml index 296db8e3c63f..dc1526f2eb1f 100644 --- a/hydro_cli_examples/Cargo.toml +++ b/hydro_cli_examples/Cargo.toml @@ -31,18 +31,6 @@ name = "dedalus_2pc_coordinator" [[example]] name = "dedalus_2pc_participant" -[[example]] -name = "topolotree" - -[[example]] -name = "topolotree_latency_measure" - -[[example]] -name = "pn_counter" - -[[example]] -name = "pn_counter_delta" - [[example]] name = "ws_chat_server" diff --git a/hydro_cli_examples/examples/topolotree/main.rs b/hydro_cli_examples/examples/topolotree/main.rs deleted file mode 100644 index 1725d7bdf9c5..000000000000 --- a/hydro_cli_examples/examples/topolotree/main.rs +++ /dev/null @@ -1,352 +0,0 @@ -use std::collections::{HashMap, HashSet}; 
-use std::hash::{Hash, Hasher}; - -use hydroflow::serde::{Deserialize, Serialize}; -use hydroflow::util::cli::{ConnectedDirect, ConnectedSink, ConnectedSource}; -use hydroflow::util::{deserialize_from_bytes, serialize_to_bytes}; -use hydroflow::{hydroflow_syntax, tokio}; - -#[derive(Serialize, Deserialize, Clone, Debug)] -struct IncrementRequest { - tweet_id: u64, - likes: i32, -} - -#[derive(Serialize, Deserialize, Default, Copy, Clone, Debug)] -struct TimestampedValue { - pub value: T, - pub timestamp: u32, -} - -impl TimestampedValue { - pub fn merge_from(&mut self, other: TimestampedValue) -> bool { - if other.timestamp > self.timestamp { - self.value = other.value; - self.timestamp = other.timestamp; - true - } else { - false - } - } - - pub fn update(&mut self, updater: impl Fn(&T) -> T) { - self.value = updater(&self.value); - self.timestamp += 1; - } -} - -impl PartialEq for TimestampedValue { - fn eq(&self, other: &Self) -> bool { - self.timestamp == other.timestamp - } -} - -impl Eq for TimestampedValue {} - -impl Hash for TimestampedValue { - fn hash(&self, state: &mut H) { - self.timestamp.hash(state); - } -} - -#[hydroflow::main] -async fn main() { - let mut ports = hydroflow::util::cli::init().await; - let increment_requests = ports - .port("increment_requests") - .connect::() - .await - .into_source(); - - let query_responses = ports - .port("query_responses") - .connect::() - .await - .into_sink(); - - let to_parent = ports - .port("to_parent") - .connect::() - .await - .into_sink(); - - let from_parent = ports - .port("from_parent") - .connect::() - .await - .into_source(); - - let to_left = ports - .port("to_left") - .connect::() - .await - .into_sink(); - - let from_left = ports - .port("from_left") - .connect::() - .await - .into_source(); - - let to_right = ports - .port("to_right") - .connect::() - .await - .into_sink(); - - let from_right = ports - .port("from_right") - .connect::() - .await - .into_source(); - - let f1 = async move { - #[cfg(target_os = "linux")] - loop { - let x = procinfo::pid::stat_self().unwrap(); - let bytes = x.rss * 1024 * 4; - println!("memory,{}", bytes); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - }; - - type UpdateType = (u64, i32); - - #[allow(clippy::type_complexity)] - let df = hydroflow_syntax! 
{ - from_parent = source_stream(from_parent) - -> map(|x| deserialize_from_bytes::)>>(x.unwrap()).unwrap()) - -> fold::<'static>( - || (HashMap::>::new(), HashSet::new(), 0), - |(prev, modified_tweets, prev_tick): &mut (HashMap<_, TimestampedValue>, HashSet<_>, _), req: Vec<(u64, TimestampedValue)>| { - if *prev_tick != context.current_tick() { - modified_tweets.clear(); - } - - for (k, v) in req { - let updated = if let Some(e) = prev.get_mut(&k) { - e.merge_from(v) - } else { - prev.insert(k, v); - true - }; - - if updated { - modified_tweets.insert(k); - } - } - - *prev_tick = context.current_tick(); - } - ) - -> filter(|(_, _, tick)| *tick == context.current_tick()) - -> flat_map(|(state, modified_tweets, _)| modified_tweets.iter().map(|t| (*t, *state.get(t).unwrap())).collect::>()) - -> tee(); - - from_left = source_stream(from_left) - -> map(|x| deserialize_from_bytes::)>>(x.unwrap()).unwrap()) - -> fold::<'static>( - || (HashMap::>::new(), HashSet::new(), 0), - |(prev, modified_tweets, prev_tick): &mut (HashMap<_, TimestampedValue>, HashSet<_>, _), req: Vec<(u64, TimestampedValue)>| { - if *prev_tick != context.current_tick() { - modified_tweets.clear(); - } - - for (k, v) in req { - let updated = if let Some(e) = prev.get_mut(&k) { - e.merge_from(v) - } else { - prev.insert(k, v); - true - }; - - if updated { - modified_tweets.insert(k); - } - } - - *prev_tick = context.current_tick(); - } - ) - -> filter(|(_, _, tick)| *tick == context.current_tick()) - -> flat_map(|(state, modified_tweets, _)| modified_tweets.iter().map(|t| (*t, *state.get(t).unwrap())).collect::>()) - -> tee(); - - from_right = source_stream(from_right) - -> map(|x| deserialize_from_bytes::)>>(x.unwrap()).unwrap()) - -> fold::<'static>( - || (HashMap::>::new(), HashSet::new(), 0), - |(prev, modified_tweets, prev_tick): &mut (HashMap<_, TimestampedValue>, HashSet<_>, _), req: Vec<(u64, TimestampedValue)>| { - if *prev_tick != context.current_tick() { - modified_tweets.clear(); - } - - for (k, v) in req { - let updated = if let Some(e) = prev.get_mut(&k) { - e.merge_from(v) - } else { - prev.insert(k, v); - true - }; - - if updated { - modified_tweets.insert(k); - } - } - - *prev_tick = context.current_tick(); - } - ) - -> filter(|(_, _, tick)| *tick == context.current_tick()) - -> flat_map(|(state, modified_tweets, _)| modified_tweets.iter().map(|t| (*t, *state.get(t).unwrap())).collect::>()) - -> tee(); - - from_local = source_stream(increment_requests) - -> map(|x| deserialize_from_bytes::(&x.unwrap()).unwrap()) - -> map(|x| (x.tweet_id, x.likes)) - -> fold::<'static>( - || (HashMap::>::new(), HashSet::new(), 0), - |(prev, modified_tweets, prev_tick): &mut (HashMap<_, TimestampedValue>, HashSet<_>, usize), req: UpdateType| { - if *prev_tick != context.current_tick() { - modified_tweets.clear(); - } - - prev.entry(req.0).or_default().update(|v| v + req.1); - modified_tweets.insert(req.0); - *prev_tick = context.current_tick(); - } - ) - -> filter(|(_, _, tick)| *tick == context.current_tick()) - -> flat_map(|(state, modified_tweets, _)| modified_tweets.iter().map(|t| (*t, *state.get(t).unwrap())).collect::>()) - -> tee(); - - to_right = union(); - - from_parent -> map(|v| (0, v)) -> to_right; - from_left -> map(|v| (1, v)) -> to_right; - from_local -> map(|v| (2, v)) -> to_right; - - to_right - -> fold::<'static>( - || (vec![HashMap::>::new(); 3], HashMap::>::new(), HashSet::new(), 0), - |(each_source, acc_source, modified_tweets, prev_tick): &mut (Vec>>, HashMap<_, TimestampedValue>, HashSet<_>, usize), 
(source_i, (key, v)): (usize, _)| { - if *prev_tick != context.current_tick() { - modified_tweets.clear(); - } - - let updated = each_source[source_i].entry(key).or_default().merge_from(v); - - if updated { - acc_source.entry(key).or_default().update(|_| each_source.iter().map(|s| s.get(&key).map(|t| t.value).unwrap_or_default()).sum()); - modified_tweets.insert(key); - } - - *prev_tick = context.current_tick(); - } - ) - -> filter(|(_, _, _, tick)| *tick == context.current_tick()) - -> map(|(_, state, modified_tweets, _)| modified_tweets.iter().map(|t| (*t, *state.get(t).unwrap())).collect()) - -> map(serialize_to_bytes::)>>) - -> dest_sink(to_right); - - to_left = union(); - - from_parent -> map(|v| (0, v)) -> to_left; - from_right -> map(|v| (1, v)) -> to_left; - from_local -> map(|v| (2, v)) -> to_left; - - to_left - -> fold::<'static>( - || (vec![HashMap::>::new(); 3], HashMap::>::new(), HashSet::new(), 0), - |(each_source, acc_source, modified_tweets, prev_tick): &mut (Vec>>, HashMap<_, TimestampedValue>, HashSet<_>, usize), (source_i, (key, v)): (usize, _)| { - if *prev_tick != context.current_tick() { - modified_tweets.clear(); - } - - let updated = each_source[source_i].entry(key).or_default().merge_from(v); - - if updated { - acc_source.entry(key).or_default().update(|_| each_source.iter().map(|s| s.get(&key).map(|t| t.value).unwrap_or_default()).sum()); - modified_tweets.insert(key); - } - - *prev_tick = context.current_tick(); - } - ) - -> filter(|(_, _, _, tick)| *tick == context.current_tick()) - -> map(|(_, state, modified_tweets, _)| modified_tweets.iter().map(|t| (*t, *state.get(t).unwrap())).collect()) - -> map(serialize_to_bytes::)>>) - -> dest_sink(to_left); - - to_parent = union(); - - from_right -> map(|v| (0, v)) -> to_parent; - from_left -> map(|v| (1, v)) -> to_parent; - from_local -> map(|v| (2, v)) -> to_parent; - - to_parent - -> fold::<'static>( - || (vec![HashMap::>::new(); 3], HashMap::>::new(), HashSet::new(), 0), - |(each_source, acc_source, modified_tweets, prev_tick): &mut (Vec>>, HashMap<_, TimestampedValue>, HashSet<_>, usize), (source_i, (key, v)): (usize, _)| { - if *prev_tick != context.current_tick() { - modified_tweets.clear(); - } - - let updated = each_source[source_i].entry(key).or_default().merge_from(v); - - if updated { - acc_source.entry(key).or_default().update(|_| each_source.iter().map(|s| s.get(&key).map(|t| t.value).unwrap_or_default()).sum()); - modified_tweets.insert(key); - } - - *prev_tick = context.current_tick(); - } - ) - -> filter(|(_, _, _, tick)| *tick == context.current_tick()) - -> map(|(_, state, modified_tweets, _)| modified_tweets.iter().map(|t| (*t, *state.get(t).unwrap())).collect()) - -> map(serialize_to_bytes::)>>) - -> dest_sink(to_parent); - - to_query = union(); - - from_parent -> map(|v| (0, v)) -> to_query; - from_left -> map(|v| (1, v)) -> to_query; - from_right -> map(|v| (2, v)) -> to_query; - from_local -> map(|v| (3, v)) -> to_query; - - to_query - -> fold::<'static>( - || (vec![HashMap::>::new(); 4], HashMap::>::new(), HashSet::new(), 0), - |(each_source, acc_source, modified_tweets, prev_tick): &mut (Vec>>, HashMap<_, TimestampedValue>, HashSet<_>, usize), (source_i, (key, v)): (usize, _)| { - if *prev_tick != context.current_tick() { - modified_tweets.clear(); - } - - let updated = each_source[source_i].entry(key).or_default().merge_from(v); - - if updated { - acc_source.entry(key).or_default().update(|_| each_source.iter().map(|s| s.get(&key).map(|t| t.value).unwrap_or_default()).sum()); - 
modified_tweets.insert(key); - } - - *prev_tick = context.current_tick(); - } - ) - -> filter(|(_, _, _, tick)| *tick == context.current_tick()) - -> flat_map(|(_, state, modified_tweets, _)| modified_tweets.iter().map(|t| (*t, state.get(t).unwrap().value)).collect::>()) - -> map(serialize_to_bytes::<(u64, i32)>) - -> dest_sink(query_responses); - }; - - // initial memory - #[cfg(target_os = "linux")] - { - let x = procinfo::pid::stat_self().unwrap(); - let bytes = x.rss * 1024 * 4; - println!("memory,{}", bytes); - } - - let f1_handle = tokio::spawn(f1); - hydroflow::util::cli::launch_flow(df).await; - f1_handle.abort(); -} diff --git a/hydro_cli_examples/topolotree.hydro.py b/hydro_cli_examples/topolotree.hydro.py deleted file mode 100644 index 211757a75516..000000000000 --- a/hydro_cli_examples/topolotree.hydro.py +++ /dev/null @@ -1,181 +0,0 @@ -import asyncio -from codecs import decode -from typing import Optional -import hydro -import json -from pathlib import Path -from aiostream import stream - -class Tree: - def __init__(self, node, left, right): - self.node = node - self.left = left - self.right = right - - def map(self, transform): - return Tree( - transform(self.node), - self.left.map(transform) if self.left is not None else None, - self.right.map(transform) if self.right is not None else None - ) - - def flatten_with_path(self, cur_path=""): - return [(self.node, cur_path)] + \ - (self.left.flatten_with_path(cur_path + "L") if self.left is not None else []) + \ - (self.right.flatten_with_path(cur_path + "R") if self.right is not None else []) - - async def map_async(self, transform): - return Tree( - await transform(self.node), - (await self.left.map_async(transform)) if self.left is not None else None, - (await self.right.map_async(transform)) if self.right is not None else None - ) - -def create_tree(depth, deployment, create_machine) -> Optional[Tree]: - if depth == 0: - return None - else: - self_service = deployment.HydroflowCrate( - src=str(Path(__file__).parent.absolute()), - example="topolotree", - on=create_machine() - ) - - left = create_tree(depth - 1, deployment, create_machine) - right = create_tree(depth - 1, deployment, create_machine) - - if left is not None: - self_service.ports.to_left.send_to(left.node.ports.from_parent) - left.node.ports.to_parent.send_to(self_service.ports.from_left) - else: - self_service.ports.to_left.send_to(hydro.null()) - hydro.null().send_to(self_service.ports.from_left) - - if right is not None: - self_service.ports.to_right.send_to(right.node.ports.from_parent) - right.node.ports.to_parent.send_to(self_service.ports.from_right) - else: - self_service.ports.to_right.send_to(hydro.null()) - hydro.null().send_to(self_service.ports.from_right) - - return Tree( - self_service, - left, - right - ) - -# hydro deploy ../hydro_cli_examples/toplotree.hydro.py -- local/gcp DEPTH_OF_TREE -async def main(args): - tree_depth = int(args[1]) - deployment = hydro.Deployment() - - localhost_machine = deployment.Localhost() - - python_sender = deployment.CustomService( - external_ports=[], - on=localhost_machine, - ) - - gcp_vpc = hydro.GCPNetwork( - project="hydro-chrisdouglas", - ) - - def create_machine(): - if args[0] == "gcp": - return deployment.GCPComputeEngineHost( - project="hydro-chrisdouglas", - machine_type="e2-micro", - image="debian-cloud/debian-11", - region="us-west1-a", - network=gcp_vpc - ) - else: - return localhost_machine - - tree = create_tree(tree_depth, deployment, create_machine) - 
tree.node.ports.to_parent.send_to(hydro.null()) - hydro.null().send_to(tree.node.ports.from_parent) - - def create_increment_port(node): - port = python_sender.client_port() - port.send_to(node.ports.increment_requests) - return port - - tree_increment_ports = tree.map(create_increment_port) - - def create_query_response_port(node): - port = python_sender.client_port() - node.ports.query_responses.send_to(port) - return port - - tree_query_response_ports = tree.map(create_query_response_port) - - await deployment.deploy() - - async def get_increment_channel(port): - return await (await port.server_port()).into_sink() - tree_increment_channels = await tree_increment_ports.map_async(get_increment_channel) - - async def get_query_response_channel(port): - return await (await port.server_port()).into_source() - tree_query_response_channels = await tree_query_response_ports.map_async(get_query_response_channel) - - async def get_stdouts(node): - return await node.stdout() - tree_stdouts = await tree.map_async(get_stdouts) - - def stream_printer(path, v): - parsed = json.loads(decode(bytes(v), "utf-8")) - return f"{path}: {parsed}" - - print("deployed!") - - with_path_responses = [ - stream.map(response, lambda x,path=path: stream_printer(path, x)) - for (response, path) in tree_query_response_channels.flatten_with_path() - ] - - async def print_queries(): - try: - async with stream.merge(*with_path_responses).stream() as merged: - async for log in merged: - print(log) - except asyncio.CancelledError: - pass - - with_stdouts = [ - stream.map(stdout, lambda x,path=path: (path, x)) - for (stdout, path) in tree_stdouts.flatten_with_path() - ] - - async def print_stdouts(): - try: - async with stream.merge(*with_stdouts).stream() as merged: - async for path, log in merged: - if not log.startswith("to_query"): - return f"{path}: {log}" - except asyncio.CancelledError: - pass - - print_query_task = asyncio.create_task(print_queries()) - print_stdout_task = asyncio.create_task(print_stdouts()) - try: - await deployment.start() - print("started!") - - await asyncio.sleep(1) - - for i in range(1000000): - if i % 10000 == 0: - print(f"sending increment {i}") - await tree_increment_channels.node.send(bytes("{\"tweet_id\": " + str(i) + ", \"likes\": " + str(i % 2 * 2 - 1) + "}", "utf-8")) - finally: - print_query_task.cancel() - print_stdout_task.cancel() - await print_query_task - await print_stdout_task - -if __name__ == "__main__": - import sys - import hydro.async_wrapper - hydro.async_wrapper.run(main, sys.argv[1:]) diff --git a/hydro_cli_examples/topolotree_latency.hydro.py b/hydro_cli_examples/topolotree_latency.hydro.py deleted file mode 100644 index cf1a88c11a6c..000000000000 --- a/hydro_cli_examples/topolotree_latency.hydro.py +++ /dev/null @@ -1,306 +0,0 @@ -import asyncio -from codecs import decode -from typing import Optional -from venv import create -import hydro -import json -from pathlib import Path -from aiostream import stream - -import pandas as pd -import numpy as np -import uuid - - -class Tree: - def __init__(self, node, left, right): - self.node = node - self.left = left - self.right = right - - def map(self, transform): - return Tree( - transform(self.node), - self.left.map(transform) if self.left is not None else None, - self.right.map(transform) if self.right is not None else None - ) - - def flatten_with_path(self, cur_path=""): - return [(self.node, cur_path)] + \ - (self.left.flatten_with_path(cur_path + "L") if self.left is not None else []) + \ - 
(self.right.flatten_with_path(cur_path + "R") if self.right is not None else []) - - async def map_async(self, transform): - return Tree( - await transform(self.node), - (await self.left.map_async(transform)) if self.left is not None else None, - (await self.right.map_async(transform)) if self.right is not None else None - ) - -def create_tree(depth, deployment, create_machine) -> Optional[Tree]: - if depth == 0: - return None - else: - self_service = deployment.HydroflowCrate( - src=str(Path(__file__).parent.absolute()), - example="topolotree", - on=create_machine() - ) - - left = create_tree(depth - 1, deployment, create_machine) - right = create_tree(depth - 1, deployment, create_machine) - - if left is not None: - self_service.ports.to_left.send_to(left.node.ports.from_parent) - left.node.ports.to_parent.send_to(self_service.ports.from_left) - else: - self_service.ports.to_left.send_to(hydro.null()) - hydro.null().send_to(self_service.ports.from_left) - - if right is not None: - self_service.ports.to_right.send_to(right.node.ports.from_parent) - right.node.ports.to_parent.send_to(self_service.ports.from_right) - else: - self_service.ports.to_right.send_to(hydro.null()) - hydro.null().send_to(self_service.ports.from_right) - - return Tree( - self_service, - left, - right - ) - -async def run_experiment(deployment, machine_pool, experiment_id, summaries_file, tree_arg, depth_arg, clients_arg, is_gcp, gcp_vpc): - tree_depth = int(depth_arg) - is_tree = tree_arg == "topolo" # or "pn" - - num_replicas = 2 ** tree_depth - 1 - - num_clients = int(clients_arg) - - localhost_machine = deployment.Localhost() - - currently_deployed = [] - def create_machine(): - if len(machine_pool) > 0: - print("Using machine from pool") - ret = machine_pool.pop() - currently_deployed.append(ret) - return ret - else: - if is_gcp: - out = deployment.GCPComputeEngineHost( - project="hydro-chrisdouglas", - machine_type="n2-standard-4", - image="debian-cloud/debian-11", - region="us-west1-a", - network=gcp_vpc - ) - else: - out = localhost_machine - currently_deployed.append(out) - return out - - all_nodes = [] - if is_tree: - tree = create_tree(tree_depth, deployment, create_machine) - tree.node.ports.to_parent.send_to(hydro.null()) - hydro.null().send_to(tree.node.ports.from_parent) - all_nodes = [tup[0] for tup in tree.flatten_with_path()] - else: - cluster = [ - deployment.HydroflowCrate( - src=str(Path(__file__).parent.absolute()), - example="pn_counter" if tree_arg == "pn" else "pn_counter_delta", - args=[json.dumps([i]), json.dumps([num_replicas])], - on=create_machine() - ) - for i in range(num_replicas) - ] - - for i in range(num_replicas): - cluster[i].ports.to_peer.send_to(hydro.demux( - { - j: cluster[j].ports.from_peer.merge() - for j in range(num_replicas) - if i != j - } - )) - - all_nodes = cluster - - if is_tree: - source = tree - while source.left is not None: - source = source.left - source = source.node - - dest = tree - while dest.right is not None: - dest = dest.right - dest = dest.node - else: - source = cluster[0] - dest = cluster[-1] - - for node in all_nodes: - if node is not dest: - node.ports.query_responses.send_to(hydro.null()) - if node is not source: - hydro.null().send_to(node.ports.increment_requests) - - latency_measurer = deployment.HydroflowCrate( - src=str(Path(__file__).parent.absolute()), - example="topolotree_latency_measure", - args=[json.dumps([num_clients])], - on=create_machine() - ) - - 
latency_measurer.ports.increment_start_node.send_to(source.ports.increment_requests.merge()) - dest.ports.query_responses.send_to(latency_measurer.ports.end_node_query) - - await deployment.deploy() - - print("deployed!") - - latency = [] - memory_per_node = [[] for _ in range(num_replicas)] - throughput_raw = [] - - throughput = [] - - latency_stdout = await latency_measurer.stdout() - - memories_streams_with_index = [ - stream.map( - await node.stdout(), - lambda x,i=i: (i, x) - ) - for i, node in enumerate(all_nodes) - ] - - async def memory_plotter(): - try: - async with stream.merge(*memories_streams_with_index).stream() as merged: - async for node_idx, line in merged: - line_split = line.split(",") - if line_split[0] == "memory": - memory_per_node[node_idx].append(int(line_split[1])) - except asyncio.CancelledError: - return - - memory_plotter_task = asyncio.create_task(memory_plotter()) - - async def latency_plotter(): - try: - async for line in latency_stdout: - line_split = line.split(",") - if line_split[0] == "throughput": - count = int(line_split[1]) - period = float(line_split[2]) - throughput_raw.append([count, period]) - throughput.append(count / period) - elif line_split[0] == "latency": - number = int(line_split[1]) # microseconds - latency.append(number) - except asyncio.CancelledError: - return - - latency_plotter_task = asyncio.create_task(latency_plotter()) - - await deployment.start() - print("started!") - - await asyncio.sleep(30) - - await latency_measurer.stop() - await asyncio.gather(*[node.stop() for node in all_nodes]) - - memory_plotter_task.cancel() - await memory_plotter_task - - latency_plotter_task.cancel() - await latency_plotter_task - - def summarize(v, kind): - print("mean = ", np.mean(v)) - print("std = ", np.std(v)) - print("min = ", np.min(v)) - print("max = ", np.max(v)) - print("percentile 99 = ", np.percentile(v, 99)) - print("percentile 75 = ", np.percentile(v, 75)) - print("percentile 50 = ", np.percentile(v, 50)) - print("percentile 25 = ", np.percentile(v, 25)) - print("percentile 1 = ", np.percentile(v, 1)) - - summaries_file.write("\n") - summaries_file.write(tree_arg + ",") - summaries_file.write(str(tree_depth) + ",") - summaries_file.write(str(num_clients) + ",") - summaries_file.write(kind + ",") - summaries_file.write(str(np.mean(v)) + ",") - summaries_file.write(str(np.std(v)) + ",") - summaries_file.write(str(np.min(v)) + ",") - summaries_file.write(str(np.max(v)) + ",") - summaries_file.write(str(np.percentile(v, 99)) + ",") - summaries_file.write(str(np.percentile(v, 75)) + ",") - summaries_file.write(str(np.percentile(v, 50)) + ",") - summaries_file.write(str(np.percentile(v, 25)) + ",") - summaries_file.write(str(np.percentile(v, 1))) - summaries_file.flush() - - print("latency:") - summarize(latency, "latency") - - print("throughput:") - summarize(throughput, "throughput") - - init_memory = [ - memory[0] - for memory in memory_per_node - ] - print("init memory:") - summarize(init_memory, "init_memory") - - final_memory = [ - memory[-1] - for memory in memory_per_node - ] - print("final memory:") - summarize(final_memory, "final_memory") - - pd.DataFrame(latency).to_csv("latency_" + tree_arg + "_tree_depth_" + str(tree_depth) + "_num_clients_" + str(num_clients) + "_" + experiment_id+".csv", index=False, header=["latency"]) - pd.DataFrame(throughput_raw).to_csv("throughput_" + tree_arg + "_tree_depth_" + str(tree_depth) + "_num_clients_" + str(num_clients) + "_" + experiment_id+".csv", index=False, header=["count", "period"]) - 
pd.DataFrame(init_memory).to_csv("init_memory_" + tree_arg + "_tree_depth_" + str(tree_depth) + "_num_clients_" + str(num_clients) + "_" + experiment_id+".csv", index=False, header=["memory"]) - pd.DataFrame(final_memory).to_csv("final_memory_" + tree_arg + "_tree_depth_" + str(tree_depth) + "_num_clients_" + str(num_clients) + "_" + experiment_id+".csv", index=False, header=["memory"]) - - for machine in currently_deployed: - machine_pool.append(machine) - -# hydro deploy ../hydro_cli_examples/toplotree_latency.hydro.py -- local/gcp DEPTH_OF_TREE -async def main(args): - # the current timestamp - import datetime - experiment_id = str(datetime.datetime.now()) - - summaries_file = open(f"summaries_{experiment_id}.csv", "w") - summaries_file.write("protocol,tree_depth,num_clients,kind,mean,std,min,max,percentile_99,percentile_75,percentile_50,percentile_25,percentile_1") - - deployment = hydro.Deployment() - pool = [] - - network = hydro.GCPNetwork( - project="hydro-chrisdouglas", - ) if args[0] == "gcp" else None - - for depth_arg in args[2].split(","): - for tree_arg in args[1].split(","): - for num_clients_arg in args[3].split(","): - await run_experiment(deployment, pool, experiment_id, summaries_file, tree_arg, depth_arg, num_clients_arg, args[0] == "gcp", network) - - summaries_file.close() - -if __name__ == "__main__": - import sys - import hydro.async_wrapper - hydro.async_wrapper.run(main, sys.argv[1:]) diff --git a/hydroflow/src/util/cli.rs b/hydroflow/src/util/cli.rs index c5f359fa0af6..d14647b9b406 100644 --- a/hydroflow/src/util/cli.rs +++ b/hydroflow/src/util/cli.rs @@ -70,6 +70,8 @@ pub async fn init() -> HydroCLI { all_connected.insert(name, ServerOrBound::Bound(defn)); } + println!("ack start"); + HydroCLI { ports: all_connected, } diff --git a/hydroflow_macro/src/lib.rs b/hydroflow_macro/src/lib.rs index 82d611291c0d..9b2b55304c79 100644 --- a/hydroflow_macro/src/lib.rs +++ b/hydroflow_macro/src/lib.rs @@ -204,15 +204,16 @@ pub fn monotonic_fn(item: proc_macro::TokenStream) -> proc_macro::TokenStream { #[proc_macro_attribute] pub fn hydroflow_test( - _: proc_macro::TokenStream, + args: proc_macro::TokenStream, item: proc_macro::TokenStream, ) -> proc_macro::TokenStream { let root = root(); + let args_2: proc_macro2::TokenStream = args.into(); hydroflow_wrap( item, parse_quote!( - #[#root::tokio::test(flavor = "current_thread")] + #[#root::tokio::test(flavor = "current_thread", #args_2)] ), ) } diff --git a/rust-toolchain.toml b/rust-toolchain.toml index e4ce2a1471f9..a3ed5e9f200e 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,4 +1,4 @@ [toolchain] channel = "nightly-2023-10-24" components = ["rustfmt", "clippy"] -targets = ["wasm32-unknown-unknown"] +targets = ["wasm32-unknown-unknown", "x86_64-unknown-linux-musl"] diff --git a/topolotree/.gitignore b/topolotree/.gitignore index 29a7166e8eed..afed0735dc96 100644 --- a/topolotree/.gitignore +++ b/topolotree/.gitignore @@ -1,2 +1 @@ *.csv - diff --git a/topolotree/Cargo.toml b/topolotree/Cargo.toml new file mode 100644 index 000000000000..07d545d7abeb --- /dev/null +++ b/topolotree/Cargo.toml @@ -0,0 +1,39 @@ +[package] +name = "topolotree" +publish = false +version = "0.0.0" +edition = "2021" + +[[bin]] +name = "topolotree" +path = "src/main.rs" + +[[bin]] +name = "pn" +path = "src/pn.rs" + +[[bin]] +name = "pn_delta" +path = "src/pn_delta.rs" + +[[bin]] +name = "latency_measure" +path = "src/latency_measure.rs" + +[dependencies] +hydroflow = { path = "../hydroflow", features = [ "cli_integration" ] } 
+hydroflow_datalog = { path = "../hydroflow_datalog" } + +tokio = { version = "1.16", features = [ "full" ] } +serde = { version = "1", features = ["rc"] } +serde_json = "1" +rand = "0.8.5" +dashmap = "5.4.0" + +futures = "0.3.28" + +[dev-dependencies] +tokio = { version = "1.16", features = [ "full", "test-util" ] } + +[target.'cfg(target_os = "linux")'.dependencies] +procinfo = "0.4.2" diff --git a/hydro_cli_examples/examples/topolotree_latency_measure/main.rs b/topolotree/src/latency_measure.rs similarity index 78% rename from hydro_cli_examples/examples/topolotree_latency_measure/main.rs rename to topolotree/src/latency_measure.rs index 56beda564538..afc2fe86ee4c 100644 --- a/hydro_cli_examples/examples/topolotree_latency_measure/main.rs +++ b/topolotree/src/latency_measure.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::atomic::AtomicU64; use std::sync::{mpsc, Arc}; use std::thread; @@ -5,18 +6,14 @@ use std::time::Instant; use futures::{SinkExt, StreamExt}; use hydroflow::bytes::Bytes; -use hydroflow::serde::{Deserialize, Serialize}; use hydroflow::tokio; use hydroflow::util::cli::{ConnectedDirect, ConnectedSink, ConnectedSource}; use hydroflow::util::{deserialize_from_bytes, serialize_to_bytes}; -#[derive(Serialize, Deserialize, Clone, Debug)] -struct IncrementRequest { - tweet_id: u64, - likes: i32, -} +mod protocol; +use protocol::*; -#[hydroflow::main] +#[tokio::main] async fn main() { let mut ports = hydroflow::util::cli::init().await; let mut start_node = ports @@ -64,27 +61,34 @@ async fn main() { let mut queues = vec![]; for i in 0..num_clients { - let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::(); + let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::(); queues.push(sender); let inc_sender = inc_sender.clone(); let latency_sender = latency_sender.clone(); let atomic_counter = atomic_counter.clone(); tokio::spawn(async move { + #[cfg(debug_assertions)] + let mut count_tracker = HashMap::new(); + loop { let id = ((rand::random::() % 1024) / (num_clients as u64)) * (num_clients as u64) + (i as u64); let increment = rand::random::(); + let change = if increment { 1 } else { -1 }; let start = Instant::now(); inc_sender - .send(serialize_to_bytes(IncrementRequest { - tweet_id: id, - likes: if increment { 1 } else { -1 }, - })) + .send(serialize_to_bytes(OperationPayload { key: id, change })) .unwrap(); - receiver.recv().await.unwrap(); + let received = receiver.recv().await.unwrap(); + #[cfg(debug_assertions)] + { + let count = count_tracker.entry(id).or_insert(0); + *count += change; + assert!(*count == received); + } latency_sender.send(start.elapsed().as_micros()).unwrap(); @@ -96,10 +100,10 @@ async fn main() { tokio::spawn(async move { loop { let updated = - deserialize_from_bytes::<(u64, i32)>(end_node.next().await.unwrap().unwrap()) + deserialize_from_bytes::(end_node.next().await.unwrap().unwrap()) .unwrap(); - if queues[(updated.0 % (num_clients as u64)) as usize] - .send(updated.1) + if queues[(updated.key % (num_clients as u64)) as usize] + .send(updated.value) .is_err() { break; diff --git a/topolotree/src/main.rs b/topolotree/src/main.rs new file mode 100644 index 000000000000..7e354475f8e6 --- /dev/null +++ b/topolotree/src/main.rs @@ -0,0 +1,254 @@ +#[cfg(test)] +mod tests; + +use std::cell::RefCell; +use std::collections::HashMap; +use std::fmt::{Debug, Display}; +use std::io; +use std::rc::Rc; +use std::time::Duration; + +use futures::{SinkExt, Stream}; +use hydroflow::bytes::{Bytes, BytesMut}; +use 
hydroflow::hydroflow_syntax; +use hydroflow::scheduled::graph::Hydroflow; +use hydroflow::util::cli::{ + ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, +}; + +mod protocol; +use hydroflow::util::{deserialize_from_bytes, serialize_to_bytes}; +use protocol::*; +use tokio::time::Instant; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +struct NodeID(pub u32); + +impl Display for NodeID { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Display::fmt(&self.0, f) + } +} + +type PostNeighborJoin = (((u64, Option), (i64, usize)), NodeID); + +fn run_topolotree( + neighbors: Vec, + input_recv: impl Stream> + Unpin + 'static, + increment_requests: impl Stream> + Unpin + 'static, + output_send: tokio::sync::mpsc::UnboundedSender<(u32, Bytes)>, + query_send: tokio::sync::mpsc::UnboundedSender, +) -> Hydroflow<'static> { + fn merge(x: &mut i64, y: i64) { + *x += y; + } + + // Timestamp stuff is a bit complicated, there is a proper data-flowy way to do it + // but it would require at least one more join and one more cross join just specifically for the local timestamps + // Until we need it to be proper then we can take a shortcut and use rc refcell + let self_timestamp = Rc::new(RefCell::new(HashMap::::new())); + + let self_timestamp1 = Rc::clone(&self_timestamp); + let self_timestamp2 = Rc::clone(&self_timestamp); + let self_timestamp3 = Rc::clone(&self_timestamp); + + // we use current tick to keep track of which *keys* have been modified + + hydroflow_syntax! { + parsed_input = source_stream(input_recv) + -> map(Result::unwrap) + -> map(|(src, x)| (NodeID(src), deserialize_from_bytes::(&x).unwrap())) + -> demux(|(src, msg), var_args!(payload, ping, pong)| { + match msg { + TopolotreeMessage::Payload(p) => payload.give((src, p)), + TopolotreeMessage::Ping() => ping.give((src, ())), + TopolotreeMessage::Pong() => pong.give((src, ())), + } + }); + + from_neighbors = parsed_input[payload]; + pings = parsed_input[ping] -> tee(); + pongs = parsed_input[pong] -> tee(); + + pings -> map(|(src, _)| (src, TopolotreeMessage::Pong())) -> output; + + // generate a ping every second + neighbors -> [0]ping_generator; + source_interval(Duration::from_secs(1)) -> [1]ping_generator; + ping_generator = cross_join() + -> map(|(src, _)| (src, TopolotreeMessage::Ping())) + -> output; + + pongs -> dead_neighbors; + pings -> dead_neighbors; + new_neighbors -> map(|neighbor| (neighbor, ())) -> dead_neighbors; // fake pong + dead_neighbors = union() -> fold_keyed::<'static>(Instant::now, |acc: &mut Instant, _| { + *acc = Instant::now(); + }) + -> filter_map(|(node_id, acc)| { + if acc.elapsed().as_secs() > 5 { + Some(node_id) + } else { + None + } + }) -> tee(); + + from_neighbors + -> map(|(src, payload): (NodeID, Payload)| ((payload.key, src), (payload.key, payload.contents))) + -> fold_keyed::<'static>(|| (Timestamped { timestamp: -1, data: Default::default() }, 0), |acc: &mut (Timestamped, usize), (key, val): (u64, Timestamped)| { + if val.timestamp > acc.0.timestamp { + acc.0 = val; + *self_timestamp1.borrow_mut().entry(key).or_insert(0) += 1; + acc.1 = context.current_tick(); + } + }) + -> map(|((key, src), (payload, change_tick))| ((key, Some(src)), (payload.data, change_tick))) + -> from_neighbors_or_local; + + local_value = source_stream(increment_requests) + -> map(|x| deserialize_from_bytes::(&x.unwrap()).unwrap()) + -> inspect(|change| { + *self_timestamp2.borrow_mut().entry(change.key).or_insert(0) += 1; + }) + -> map(|change_payload: 
+            -> map(|change_payload: OperationPayload| (change_payload.key, (change_payload.change, context.current_tick())))
+            -> reduce_keyed::<'static>(|agg: &mut (i64, usize), change: (i64, usize)| {
+                agg.0 += change.0;
+                agg.1 = std::cmp::max(agg.1, change.1);
+            });
+
+        local_value -> map(|(key, data)| ((key, None), data)) -> from_neighbors_or_local;
+
+        from_neighbors_or_local = union() -> tee();
+        from_neighbors_or_local -> [0]all_neighbor_data;
+
+        new_neighbors = source_iter(neighbors)
+            -> map(NodeID)
+            -> tee();
+
+        new_neighbors
+            -> persist()
+            -> [pos]neighbors;
+        dead_neighbors -> [neg]neighbors;
+        neighbors = difference()
+            -> tee();
+
+        neighbors -> [1]all_neighbor_data;
+
+        query_result = from_neighbors_or_local
+            -> map(|((key, _), payload): ((u64, _), (i64, usize))| {
+                (key, payload)
+            })
+            -> reduce_keyed(|acc: &mut (i64, usize), (data, change_tick): (i64, usize)| {
+                merge(&mut acc.0, data);
+                acc.1 = std::cmp::max(acc.1, change_tick);
+            })
+            -> filter(|(_, (_, change_tick))| *change_tick == context.current_tick())
+            -> for_each(|(key, (data, _))| {
+                let serialized = serialize_to_bytes(QueryResponse {
+                    key,
+                    value: data
+                });
+                query_send.send(serialized).unwrap();
+            });
+
+        all_neighbor_data = cross_join_multiset()
+            -> filter(|(((_, aggregate_from_this_guy), _), target_neighbor): &PostNeighborJoin| {
+                aggregate_from_this_guy.iter().all(|source| source != target_neighbor)
+            })
+            -> map(|(((key, _), payload), target_neighbor)| {
+                ((key, target_neighbor), payload)
+            })
+            -> reduce_keyed(|acc: &mut (i64, usize), (data, change_tick): (i64, usize)| {
+                merge(&mut acc.0, data);
+                acc.1 = std::cmp::max(acc.1, change_tick);
+            })
+            -> filter(|(_, (_, change_tick))| *change_tick == context.current_tick())
+            -> map(|((key, target_neighbor), (data, _))| (target_neighbor, Payload {
+                key,
+                contents: Timestamped {
+                    timestamp: self_timestamp3.borrow().get(&key).copied().unwrap_or(0),
+                    data,
+                }
+            }))
+            -> map(|(target_neighbor, payload)| (target_neighbor, TopolotreeMessage::Payload(payload)))
+            -> output;
+
+        output = union() -> for_each(|(target_neighbor, output): (NodeID, TopolotreeMessage)| {
+            let serialized = serialize_to_bytes(output);
+            output_send.send((target_neighbor.0, serialized)).unwrap();
+        });
+    }
+}
+
+#[hydroflow::main]
+async fn main() {
+    let args: Vec<String> = std::env::args().skip(1).collect();
+    let neighbors: Vec<u32> = args.into_iter().map(|x| x.parse().unwrap()).collect();
+
+    let mut ports = hydroflow::util::cli::init().await;
+
+    let input_recv = ports
+        .port("from_peer")
+        // connect to the port with a single recipient
+        .connect::<ConnectedTagged<ConnectedDirect>>()
+        .await
+        .into_source();
+
+    let mut output_send = ports
+        .port("to_peer")
+        .connect::<ConnectedDemux<ConnectedDirect>>()
+        .await
+        .into_sink();
+
+    let operations_send = ports
+        .port("increment_requests")
+        // connect to the port with a single recipient
+        .connect::<ConnectedDirect>()
+        .await
+        .into_source();
+
+    let mut query_responses = ports
+        .port("query_responses")
+        .connect::<ConnectedDirect>()
+        .await
+        .into_sink();
+
+    let (chan_tx, mut chan_rx) = tokio::sync::mpsc::unbounded_channel();
+
+    tokio::task::spawn_local(async move {
+        while let Some(msg) = chan_rx.recv().await {
+            output_send.send(msg).await.unwrap();
+        }
+    });
+
+    let (query_tx, mut query_rx) = tokio::sync::mpsc::unbounded_channel();
+    tokio::task::spawn_local(async move {
+        while let Some(msg) = query_rx.recv().await {
+            query_responses.send(msg).await.unwrap();
+        }
+    });
+
+    let flow = run_topolotree(neighbors, input_recv, operations_send, chan_tx, query_tx);
+
+    let f1 = async move {
+        #[cfg(target_os = "linux")]
+        loop {
+            let x = procinfo::pid::stat_self().unwrap();
+            let bytes = x.rss * 1024 * 4;
+            println!("memory,{}", bytes);
+            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+        }
+    };
+
+    // initial memory
+    #[cfg(target_os = "linux")]
+    {
+        let x = procinfo::pid::stat_self().unwrap();
+        let bytes = x.rss * 1024 * 4;
+        println!("memory,{}", bytes);
+    }
+
+    let f1_handle = tokio::spawn(f1);
+    hydroflow::util::cli::launch_flow(flow).await;
+    f1_handle.abort();
+}
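The ping/pong bookkeeping in run_topolotree boils down to a last-heard-from map plus a 5-second cutoff. A minimal model of that rule follows; it uses std::time rather than the tokio::time::Instant the node uses (tokio's clock exists so the tests below can pause time), and the neighbor IDs are hypothetical:

use std::collections::HashMap;
use std::time::{Duration, Instant};

// report neighbors whose most recent ping/pong is older than the cutoff
fn dead_neighbors(last_heard: &HashMap<u32, Instant>, cutoff: Duration) -> Vec<u32> {
    last_heard
        .iter()
        .filter(|(_, heard)| heard.elapsed() > cutoff)
        .map(|(id, _)| *id)
        .collect()
}

fn main() {
    let mut last_heard = HashMap::new();
    last_heard.insert(1u32, Instant::now()); // neighbor 1 just ponged
    // pretend neighbor 2 went silent 6 seconds ago
    last_heard.insert(2u32, Instant::now() - Duration::from_secs(6));
    assert_eq!(dead_neighbors(&last_heard, Duration::from_secs(5)), vec![2]);
}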
diff --git a/hydro_cli_examples/examples/pn_counter/main.rs b/topolotree/src/pn.rs
similarity index 85%
rename from hydro_cli_examples/examples/pn_counter/main.rs
rename to topolotree/src/pn.rs
index 7e2caf00d143..d43b7a6ba3e9 100644
--- a/hydro_cli_examples/examples/pn_counter/main.rs
+++ b/topolotree/src/pn.rs
@@ -4,22 +4,21 @@ use std::ops::Deref;
 use std::rc::Rc;
 
 use hydroflow::serde::{Deserialize, Serialize};
-use hydroflow::util::cli::{ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource};
+use hydroflow::util::cli::{
+    ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged,
+};
 use hydroflow::util::{deserialize_from_bytes, serialize_to_bytes};
 use hydroflow::{hydroflow_syntax, tokio};
 
-#[derive(Serialize, Deserialize, Clone, Debug)]
-struct IncrementRequest {
-    tweet_id: u64,
-    likes: i32,
-}
+mod protocol;
+use protocol::*;
 
-type NextStateType = (u64, Rc<RefCell<(Vec<u32>, Vec<u32>)>>);
+type NextStateType = (u64, Rc<RefCell<(Vec<u64>, Vec<u64>)>>);
 
 #[derive(Serialize, Deserialize, Clone, Debug)]
 enum GossipOrIncrement {
     Gossip(Vec<NextStateType>),
-    Increment(u64, i32),
+    Increment(u64, i64),
 }
 
 #[hydroflow::main]
 async fn main() {
@@ -51,7 +50,7 @@ async fn main() {
 
     let from_peer = ports
         .port("from_peer")
-        .connect::<ConnectedDirect>()
+        .connect::<ConnectedTagged<ConnectedDirect>>()
        .await
        .into_source();
 
@@ -67,7 +66,7 @@ async fn main() {
 
     let df = hydroflow_syntax! {
         next_state = union()
-            -> fold::<'static>(|| (HashMap::<u64, Rc<RefCell<(Vec<u32>, Vec<u32>)>>>::new(), HashSet::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashSet<_>, _), goi| {
+            -> fold::<'static>(|| (HashMap::<u64, Rc<RefCell<(Vec<u64>, Vec<u64>)>>>::new(), HashSet::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashSet<_>, _), goi| {
                 if context.current_tick() != *last_tick {
                     modified_tweets.clear();
                 }
@@ -102,9 +101,9 @@ async fn main() {
                         let mut cur_value = cur_value.as_ref().borrow_mut();
 
                         if delta > 0 {
-                            cur_value.0[my_id] += delta as u32;
+                            cur_value.0[my_id] += delta as u64;
                         } else {
-                            cur_value.1[my_id] += (-delta) as u32;
+                            cur_value.1[my_id] += (-delta) as u64;
                         }
 
                         modified_tweets.insert(counter_id);
@@ -119,12 +118,12 @@ async fn main() {
             -> tee();
 
         source_stream(from_peer)
-            -> map(|x| deserialize_from_bytes::<GossipOrIncrement>(&x.unwrap()).unwrap())
+            -> map(|x| deserialize_from_bytes::<GossipOrIncrement>(&x.unwrap().1).unwrap())
             -> next_state;
 
         source_stream(increment_requests)
-            -> map(|x| deserialize_from_bytes::<IncrementRequest>(&x.unwrap()).unwrap())
-            -> map(|t| GossipOrIncrement::Increment(t.tweet_id, t.likes))
+            -> map(|x| deserialize_from_bytes::<OperationPayload>(&x.unwrap()).unwrap())
+            -> map(|t| GossipOrIncrement::Increment(t.key, t.change))
             -> next_state;
 
         all_peers = source_iter(0..num_replicas)
@@ -143,10 +142,13 @@ async fn main() {
                 a.into_iter().map(|(k, rc_array)| {
                     let rc_borrowed = rc_array.as_ref().borrow();
                     let (pos, neg) = rc_borrowed.deref();
-                    (k, pos.iter().sum::<u32>() as i32 - neg.iter().sum::<u32>() as i32)
+                    QueryResponse {
+                        key: k,
+                        value: pos.iter().sum::<u64>() as i64 - neg.iter().sum::<u64>() as i64
+                    }
                 }).collect::<Vec<_>>()
             })
-            -> map(serialize_to_bytes::<(u64, i32)>)
+            -> map(serialize_to_bytes::<QueryResponse>)
             -> dest_sink(query_responses);
     };
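For orientation: pn.rs keeps, per key, one grow-only vector of increments and one of decrements, with one slot per replica, and a query just subtracts the two sums. A freestanding sketch of that arithmetic (the three-replica sizing here is an arbitrary assumption for the example):

// value observed for one key of a PN counter: sum of increments minus decrements
fn pn_value(pos: &[u64], neg: &[u64]) -> i64 {
    pos.iter().sum::<u64>() as i64 - neg.iter().sum::<u64>() as i64
}

fn main() {
    let mut pos = vec![0u64; 3];
    let mut neg = vec![0u64; 3];
    pos[0] += 5; // replica 0 applied +5
    neg[1] += 2; // replica 1 applied -2
    assert_eq!(pn_value(&pos, &neg), 3);
}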
diff --git a/hydro_cli_examples/examples/pn_counter_delta/main.rs b/topolotree/src/pn_delta.rs
similarity index 85%
rename from hydro_cli_examples/examples/pn_counter_delta/main.rs
rename to topolotree/src/pn_delta.rs
index aba606c56db6..9e363b607737 100644
--- a/hydro_cli_examples/examples/pn_counter_delta/main.rs
+++ b/topolotree/src/pn_delta.rs
@@ -4,23 +4,22 @@ use std::ops::Deref;
 use std::rc::Rc;
 
 use hydroflow::serde::{Deserialize, Serialize};
-use hydroflow::util::cli::{ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource};
+use hydroflow::util::cli::{
+    ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged,
+};
 use hydroflow::util::{deserialize_from_bytes, serialize_to_bytes};
 use hydroflow::{hydroflow_syntax, tokio};
 
-#[derive(Serialize, Deserialize, Clone, Debug)]
-struct IncrementRequest {
-    tweet_id: u64,
-    likes: i32,
-}
+mod protocol;
+use protocol::*;
 
 #[derive(Serialize, Deserialize, Clone, Debug)]
 enum GossipOrIncrement {
-    Gossip(Vec<(u64, (usize, u32, u32))>),
-    Increment(u64, i32),
+    Gossip(Vec<(u64, (usize, u64, u64))>),
+    Increment(u64, i64),
 }
 
-type NextStateType = (u64, bool, Rc<RefCell<(Vec<u32>, Vec<u32>)>>);
+type NextStateType = (u64, bool, Rc<RefCell<(Vec<u64>, Vec<u64>)>>);
 
 #[hydroflow::main]
 async fn main() {
@@ -51,7 +50,7 @@ async fn main() {
 
     let from_peer = ports
         .port("from_peer")
-        .connect::<ConnectedDirect>()
+        .connect::<ConnectedTagged<ConnectedDirect>>()
         .await
         .into_source();
 
@@ -67,7 +66,7 @@ async fn main() {
 
     let df = hydroflow_syntax! {
         next_state = union()
-            -> fold::<'static>(|| (HashMap::<u64, Rc<RefCell<(Vec<u32>, Vec<u32>)>>>::new(), HashMap::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashMap<_, _>, _), goi| {
+            -> fold::<'static>(|| (HashMap::<u64, Rc<RefCell<(Vec<u64>, Vec<u64>)>>>::new(), HashMap::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashMap<_, _>, _), goi| {
                 if context.current_tick() != *last_tick {
                     modified_tweets.clear();
                 }
@@ -99,9 +98,9 @@ async fn main() {
                         let mut cur_value = cur_value.as_ref().borrow_mut();
 
                         if delta > 0 {
-                            cur_value.0[my_id] += delta as u32;
+                            cur_value.0[my_id] += delta as u64;
                         } else {
-                            cur_value.1[my_id] += (-delta) as u32;
+                            cur_value.1[my_id] += (-delta) as u64;
                         }
 
                         *modified_tweets.entry(counter_id).or_insert(false) |= true;
@@ -116,12 +115,12 @@ async fn main() {
             -> tee();
 
         source_stream(from_peer)
-            -> map(|x| deserialize_from_bytes::<GossipOrIncrement>(&x.unwrap()).unwrap())
+            -> map(|x| deserialize_from_bytes::<GossipOrIncrement>(&x.unwrap().1).unwrap())
             -> next_state;
 
         source_stream(increment_requests)
-            -> map(|x| deserialize_from_bytes::<IncrementRequest>(&x.unwrap()).unwrap())
-            -> map(|t| GossipOrIncrement::Increment(t.tweet_id, t.likes))
+            -> map(|x| deserialize_from_bytes::<OperationPayload>(&x.unwrap()).unwrap())
+            -> map(|t| GossipOrIncrement::Increment(t.key, t.change))
            -> next_state;
 
         all_peers = source_iter(0..num_replicas)
@@ -144,10 +143,13 @@ async fn main() {
                 a.into_iter().map(|(k, _, rc_array)| {
                     let rc_borrowed = rc_array.as_ref().borrow();
                     let (pos, neg) = rc_borrowed.deref();
-                    (k, pos.iter().sum::<u32>() as i32 - neg.iter().sum::<u32>() as i32)
+                    QueryResponse {
+                        key: k,
+                        value: pos.iter().sum::<u64>() as i64 - neg.iter().sum::<u64>() as i64
+                    }
                 }).collect::<Vec<_>>()
             })
-            -> map(serialize_to_bytes::<(u64, i32)>)
+            -> map(serialize_to_bytes::<QueryResponse>)
             -> dest_sink(query_responses);
     };
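The shared message types used by all three binaries follow in protocol.rs. To make their shape concrete, here is a round-trip through serde_json (already a dependency of the crate); the JSON encoding is purely illustrative, since the binaries serialize with hydroflow's serialize_to_bytes, and the local struct definitions are trimmed copies of the ones below:

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Timestamped<T> {
    timestamp: isize,
    data: T,
}

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Payload<T> {
    key: u64,
    contents: Timestamped<T>,
}

fn main() {
    let p = Payload { key: 123, contents: Timestamped { timestamp: 1, data: 2i64 } };
    let encoded = serde_json::to_string(&p).unwrap();
    assert_eq!(encoded, r#"{"key":123,"contents":{"timestamp":1,"data":2}}"#);
    let decoded: Payload<i64> = serde_json::from_str(&encoded).unwrap();
    assert_eq!(decoded, p);
}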
diff --git a/topolotree/src/protocol.rs b/topolotree/src/protocol.rs
new file mode 100644
index 000000000000..d8a480f59c5d
--- /dev/null
+++ b/topolotree/src/protocol.rs
@@ -0,0 +1,34 @@
+use std::fmt::Debug;
+
+use serde::{Deserialize, Serialize};
+
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
+pub struct Timestamped<T> {
+    pub timestamp: isize,
+    pub data: T,
+}
+
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
+pub enum TopolotreeMessage {
+    Payload(Payload<i64>),
+    Ping(),
+    Pong(),
+}
+
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
+pub struct Payload<T: Debug> {
+    pub key: u64,
+    pub contents: Timestamped<T>,
+}
+
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
+pub struct OperationPayload {
+    pub key: u64,
+    pub change: i64,
+}
+
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
+pub struct QueryResponse {
+    pub key: u64,
+    pub value: i64,
+}
diff --git a/topolotree/src/tests.rs b/topolotree/src/tests.rs
new file mode 100644
index 000000000000..ac4112deff96
--- /dev/null
+++ b/topolotree/src/tests.rs
@@ -0,0 +1,470 @@
+use std::io;
+use std::time::Duration;
+
+use hydroflow::bytes::{Bytes, BytesMut};
+use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream;
+use hydroflow::util::multiset::HashMultiSet;
+use hydroflow::util::{
+    collect_ready_async, deserialize_from_bytes, serialize_to_bytes, unbounded_channel,
+};
+use tokio::sync::mpsc::error::SendError;
+use tokio::sync::mpsc::UnboundedSender;
+
+use crate::protocol::{QueryResponse, Timestamped, TopolotreeMessage};
+use crate::{run_topolotree, OperationPayload, Payload};
+
+type InputSendResult = Result<(), SendError<Result<(u32, BytesMut), io::Error>>>;
+
+pub fn simulate_input(
+    input_send: &mut UnboundedSender<Result<(u32, BytesMut), io::Error>>,
+    (id, msg): (u32, TopolotreeMessage),
+) -> InputSendResult {
+    input_send.send(Ok((id, BytesMut::from(&serialize_to_bytes(msg)[..]))))
+}
+
+type OperationSendResult = Result<(), SendError<Result<BytesMut, io::Error>>>;
+
+pub fn simulate_operation(
+    input_send: &mut UnboundedSender<Result<BytesMut, io::Error>>,
+    payload: OperationPayload,
+) -> OperationSendResult {
+    input_send.send(Ok(BytesMut::from(&serialize_to_bytes(payload)[..])))
+}
+
+pub async fn read_all(
+    mut output_recv: &mut UnboundedReceiverStream<(u32, Bytes)>,
+) -> HashMultiSet<(u32, TopolotreeMessage)> {
+    let collected = collect_ready_async::<Vec<_>, _>(&mut output_recv).await;
+    collected
+        .iter()
+        .map(|(id, bytes)| {
+            (
+                *id,
+                deserialize_from_bytes::<TopolotreeMessage>(&bytes[..]).unwrap(),
+            )
+        })
+        .collect::<HashMultiSet<_>>()
+}
+
+pub async fn read_all_query(
+    mut output_recv: &mut UnboundedReceiverStream<Bytes>,
+) -> HashMultiSet<QueryResponse> {
+    let collected = collect_ready_async::<Vec<_>, _>(&mut output_recv).await;
+    collected
+        .iter()
+        .map(|bytes| deserialize_from_bytes::<QueryResponse>(&bytes[..]).unwrap())
+        .collect::<HashMultiSet<_>>()
+}
+
+#[hydroflow::test(start_paused = true)]
+async fn simple_payload_test() {
+    let neighbors: Vec<u32> = vec![1, 2, 3];
+
+    let (_operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
+    let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
+    let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();
+    let (query_send, mut query_recv) = unbounded_channel::<Bytes>();
+
+    #[rustfmt::skip]
+    simulate_input(&mut input_send, (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 1, data: 2 } }))).unwrap();
+
+    let mut flow = run_topolotree(
+        neighbors,
+        input_recv,
+        operations_rx,
+        output_send,
+        query_send,
+    );
+
+    flow.run_tick();
+
+    #[rustfmt::skip]
+    assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
+        (2, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 1, data: 2 } })),
+        (3, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 1, data: 2 } })),
+        (1, TopolotreeMessage::Ping()),
+        (2, TopolotreeMessage::Ping()),
+        (3, TopolotreeMessage::Ping())
+    ]));
+
+    #[rustfmt::skip]
+    assert_eq!(read_all_query(&mut query_recv).await, HashMultiSet::from_iter([
+        QueryResponse { key: 
123, value: 2 }
+    ]));
+}
+
+#[hydroflow::test(start_paused = true)]
+async fn idempotence_test() {
+    let neighbors: Vec<u32> = vec![1, 2, 3];
+    let (_operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
+
+    let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
+    let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();
+    let (query_send, mut query_recv) = unbounded_channel::<Bytes>();
+
+    #[rustfmt::skip]
+    {
+        simulate_input(&mut input_send, (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 4, data: 2 } }))).unwrap();
+        simulate_input(&mut input_send, (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 4, data: 2 } }))).unwrap();
+    };
+
+    let mut flow = run_topolotree(
+        neighbors,
+        input_recv,
+        operations_rx,
+        output_send,
+        query_send,
+    );
+
+    flow.run_tick();
+
+    #[rustfmt::skip]
+    assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
+        (2, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 1, data: 2 } })),
+        (3, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 1, data: 2 } })),
+        (1, TopolotreeMessage::Ping()),
+        (2, TopolotreeMessage::Ping()),
+        (3, TopolotreeMessage::Ping())
+    ]));
+
+    #[rustfmt::skip]
+    assert_eq!(read_all_query(&mut query_recv).await, HashMultiSet::from_iter([
+        QueryResponse { key: 123, value: 2 }
+    ]));
+}
+
+#[hydroflow::test(start_paused = true)]
+async fn backwards_in_time_test() {
+    let neighbors: Vec<u32> = vec![1, 2, 3];
+
+    let (_operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
+    let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
+    let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();
+    let (query_send, mut query_recv) = unbounded_channel::<Bytes>();
+
+    #[rustfmt::skip]
+    {
+        simulate_input(&mut input_send, (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 5, data: 7 } }))).unwrap();
+        simulate_input(&mut input_send, (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 4, data: 2 } }))).unwrap();
+    };
+
+    let mut flow = run_topolotree(
+        neighbors,
+        input_recv,
+        operations_rx,
+        output_send,
+        query_send,
+    );
+
+    flow.run_tick();
+
+    #[rustfmt::skip]
+    assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
+        (2, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 1, data: 7 } })),
+        (3, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 1, data: 7 } })),
+        (1, TopolotreeMessage::Ping()),
+        (2, TopolotreeMessage::Ping()),
+        (3, TopolotreeMessage::Ping())
+    ]));
+
+    #[rustfmt::skip]
+    assert_eq!(read_all_query(&mut query_recv).await, HashMultiSet::from_iter([
+        QueryResponse { key: 123, value: 7 }
+    ]));
+}
+
+#[hydroflow::test(start_paused = true)]
+async fn multiple_input_sources_test() {
+    let neighbors: Vec<u32> = vec![1, 2, 3];
+    let (_operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
+
+    let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
+    let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();
+    let (query_send, mut query_recv) = unbounded_channel::<Bytes>();
+
+    #[rustfmt::skip]
+    {
+        simulate_input(&mut input_send, (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 5, data: 7 } }))).unwrap();
+        simulate_input(&mut input_send, (2, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 4, data: 2 } }))).unwrap();
+    };
+
+    let mut flow
= run_topolotree(
        neighbors,
        input_recv,
        operations_rx,
        output_send,
        query_send,
    );
+
+    flow.run_tick();
+
+    #[rustfmt::skip]
+    assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
+        (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 2, data: 2 } })),
+        (2, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 2, data: 7 } })),
+        (3, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 2, data: 9 } })),
+        (1, TopolotreeMessage::Ping()),
+        (2, TopolotreeMessage::Ping()),
+        (3, TopolotreeMessage::Ping())
+    ]));
+
+    #[rustfmt::skip]
+    assert_eq!(read_all_query(&mut query_recv).await, HashMultiSet::from_iter([
+        QueryResponse { key: 123, value: 9 }
+    ]));
+}
+
+#[hydroflow::test(start_paused = true)]
+async fn operations_across_ticks() {
+    let neighbors: Vec<u32> = vec![1, 2, 3];
+
+    let (mut operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
+    let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
+    let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();
+    let (query_send, mut query_recv) = unbounded_channel::<Bytes>();
+
+    let mut flow = run_topolotree(
+        neighbors,
+        input_recv,
+        operations_rx,
+        output_send,
+        query_send,
+    );
+
+    #[rustfmt::skip]
+    {
+        simulate_input(&mut input_send, (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 1, data: 2 } }))).unwrap();
+        simulate_operation(&mut operations_tx, OperationPayload { key: 123, change: 5 }).unwrap();
+        simulate_operation(&mut operations_tx, OperationPayload { key: 123, change: 7 }).unwrap();
+    };
+
+    flow.run_tick();
+
+    #[rustfmt::skip]
+    assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
+        (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 3, data: 12 } })),
+        (2, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 3, data: 14 } })),
+        (3, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 3, data: 14 } })),
+        (1, TopolotreeMessage::Ping()),
+        (2, TopolotreeMessage::Ping()),
+        (3, TopolotreeMessage::Ping())
+    ]));
+
+    #[rustfmt::skip]
+    assert_eq!(read_all_query(&mut query_recv).await, HashMultiSet::from_iter([
+        QueryResponse { key: 123, value: 14 }
+    ]));
+
+    #[rustfmt::skip]
+    {
+        simulate_operation(&mut operations_tx, OperationPayload { key: 123, change: 1 }).unwrap();
+    };
+
+    flow.run_tick();
+
+    #[rustfmt::skip]
+    assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
+        (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 4, data: 13 } })),
+        (2, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 4, data: 15 } })),
+        (3, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 4, data: 15 } })),
+    ]));
+
+    #[rustfmt::skip]
+    assert_eq!(read_all_query(&mut query_recv).await, HashMultiSet::from_iter([
+        QueryResponse { key: 123, value: 15 }
+    ]));
+}
+
+#[hydroflow::test(start_paused = true)]
+async fn operations_multiple_keys() {
+    let neighbors: Vec<u32> = vec![1, 2, 3];
+
+    let (mut operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
+    let (mut _input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
+    let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();
+    let (query_send, mut query_recv) = unbounded_channel::<Bytes>();
+
+    let mut flow = run_topolotree(
+        neighbors,
+        input_recv,
+        operations_rx,
+        output_send,
+        query_send,
+    );
+
+    #[rustfmt::skip]
+    {
+        simulate_operation(&mut operations_tx, OperationPayload { key: 123, change: 5 }).unwrap();
+        simulate_operation(&mut operations_tx, OperationPayload { key: 456, change: 7 }).unwrap();
+    };
+
+    flow.run_tick();
+
+    #[rustfmt::skip]
+    assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
+        (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 1, data: 5 } })),
+        (2, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 1, data: 5 } })),
+        (3, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 1, data: 5 } })),
+
+        (1, TopolotreeMessage::Payload(Payload { key: 456, contents: Timestamped { timestamp: 1, data: 7 } })),
+        (2, TopolotreeMessage::Payload(Payload { key: 456, contents: Timestamped { timestamp: 1, data: 7 } })),
+        (3, TopolotreeMessage::Payload(Payload { key: 456, contents: Timestamped { timestamp: 1, data: 7 } })),
+
+        (1, TopolotreeMessage::Ping()),
+        (2, TopolotreeMessage::Ping()),
+        (3, TopolotreeMessage::Ping())
+    ]));
+
+    #[rustfmt::skip]
+    assert_eq!(read_all_query(&mut query_recv).await, HashMultiSet::from_iter([
+        QueryResponse { key: 123, value: 5 },
+        QueryResponse { key: 456, value: 7 }
+    ]));
+
+    #[rustfmt::skip]
+    {
+        simulate_operation(&mut operations_tx, OperationPayload { key: 123, change: 1 }).unwrap();
+    };
+
+    flow.run_tick();
+
+    #[rustfmt::skip]
+    assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
+        (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 2, data: 6 } })),
+        (2, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 2, data: 6 } })),
+        (3, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 2, data: 6 } }))
+    ]));
+
+    #[rustfmt::skip]
+    assert_eq!(read_all_query(&mut query_recv).await, HashMultiSet::from_iter([
+        QueryResponse { key: 123, value: 6 }
+    ]));
+
+    #[rustfmt::skip]
+    {
+        simulate_operation(&mut operations_tx, OperationPayload { key: 456, change: 2 }).unwrap();
+    };
+
+    flow.run_tick();
+
+    #[rustfmt::skip]
+    assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
+        (1, TopolotreeMessage::Payload(Payload { key: 456, contents: Timestamped { timestamp: 2, data: 9 } })),
+        (2, TopolotreeMessage::Payload(Payload { key: 456, contents: Timestamped { timestamp: 2, data: 9 } })),
+        (3, TopolotreeMessage::Payload(Payload { key: 456, contents: Timestamped { timestamp: 2, data: 9 } }))
+    ]));
+
+    #[rustfmt::skip]
+    assert_eq!(read_all_query(&mut query_recv).await, HashMultiSet::from_iter([
+        QueryResponse { key: 456, value: 9 }
+    ]));
+}
+
+#[hydroflow::test(start_paused = true)]
+async fn gossip_multiple_keys() {
+    let neighbors: Vec<u32> = vec![1, 2, 3];
+
+    let (mut _operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
+    let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
+    let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();
+    let (query_send, mut query_recv) = unbounded_channel::<Bytes>();
+
+    let mut flow = run_topolotree(
+        neighbors,
+        input_recv,
+        operations_rx,
+        output_send,
+        query_send,
+    );
+
+    #[rustfmt::skip]
+    {
+        simulate_input(&mut input_send, (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 0, data: 5 } }))).unwrap();
+        simulate_input(&mut input_send, (2, TopolotreeMessage::Payload(Payload { key: 456, contents: Timestamped { timestamp: 0, data: 7 } }))).unwrap();
+    };
+
+    flow.run_tick();
+
+    #[rustfmt::skip]
+    
assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
+        (2, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 1, data: 5 } })),
+        (3, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 1, data: 5 } })),
+
+        (1, TopolotreeMessage::Payload(Payload { key: 456, contents: Timestamped { timestamp: 1, data: 7 } })),
+        (3, TopolotreeMessage::Payload(Payload { key: 456, contents: Timestamped { timestamp: 1, data: 7 } })),
+
+        (1, TopolotreeMessage::Ping()),
+        (2, TopolotreeMessage::Ping()),
+        (3, TopolotreeMessage::Ping())
+    ]));
+
+    #[rustfmt::skip]
+    assert_eq!(read_all_query(&mut query_recv).await, HashMultiSet::from_iter([
+        QueryResponse { key: 123, value: 5 },
+        QueryResponse { key: 456, value: 7 },
+    ]));
+
+    #[rustfmt::skip]
+    {
+        simulate_input(&mut input_send, (2, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 0, data: 5 } }))).unwrap();
+        simulate_input(&mut input_send, (3, TopolotreeMessage::Payload(Payload { key: 456, contents: Timestamped { timestamp: 0, data: 7 } }))).unwrap();
+    };
+
+    flow.run_tick();
+
+    #[rustfmt::skip]
+    assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
+        (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 2, data: 5 } })),
+        (3, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 2, data: 10 } })),
+
+        (1, TopolotreeMessage::Payload(Payload { key: 456, contents: Timestamped { timestamp: 2, data: 14 } })),
+        (2, TopolotreeMessage::Payload(Payload { key: 456, contents: Timestamped { timestamp: 2, data: 7 } })),
+    ]));
+
+    #[rustfmt::skip]
+    assert_eq!(read_all_query(&mut query_recv).await, HashMultiSet::from_iter([
+        QueryResponse { key: 123, value: 10 },
+        QueryResponse { key: 456, value: 14 },
+    ]));
+}
+
+#[hydroflow::test(start_paused = true)]
+async fn ping_pongs() {
+    let neighbors: Vec<u32> = vec![1];
+
+    let (mut _operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
+    let (mut _input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
+    let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();
+    let (query_send, mut _query_recv) = unbounded_channel::<Bytes>();
+
+    let mut flow = run_topolotree(
+        neighbors,
+        input_recv,
+        operations_rx,
+        output_send,
+        query_send,
+    );
+
+    flow.run_tick();
+
+    #[rustfmt::skip]
+    assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
+        (1, TopolotreeMessage::Ping())
+    ]));
+
+    tokio::time::advance(Duration::from_millis(500)).await;
+
+    flow.run_tick();
+
+    #[rustfmt::skip]
+    assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([]));
+
+    tokio::time::advance(Duration::from_millis(500)).await;
+
+    flow.run_tick();
+
+    #[rustfmt::skip]
+    assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
+        (1, TopolotreeMessage::Ping())
+    ]));
+}
diff --git a/topolotree/topolotree_latency.hydro.py b/topolotree/topolotree_latency.hydro.py
new file mode 100644
index 000000000000..cda984202fde
--- /dev/null
+++ b/topolotree/topolotree_latency.hydro.py
@@ -0,0 +1,352 @@
+import asyncio
+from codecs import decode
+from typing import List, Optional
+import hydro
+import json
+from pathlib import Path
+from aiostream import stream
+
+import pandas as pd
+import numpy as np
+import uuid
+
+
+# given a list of IDs for each node in a binary tree,
+# organize the nodes into a binary tree and compute the neighbors
+# of each node
+def get_neighbors_in_binary_tree(flat: List[int]) -> List[List[int]]:
+    tree 
= [] + for i in range(len(flat)): + tree.append([]) + if i > 0: + # add the parent + tree[i].append((i - 1) // 2) + if 2 * i + 1 < len(flat): + # add the left child + tree[i].append(2 * i + 1) + if 2 * i + 2 < len(flat): + # add the right child + tree[i].append(2 * i + 2) + return tree + +def get_leaves_in_binary_tree(flat: List[int]) -> List[int]: + tree = get_neighbors_in_binary_tree(flat) + leaves = [] + for i in range(len(tree)): + if len(tree[i]) == 1: + leaves.append(i) + return leaves + + +async def run_experiment( + deployment: hydro.Deployment, + localhost_machine: hydro.LocalhostHost, + profile, + machine_pool, + experiment_id, + summaries_file, + tree_arg, + depth_arg, + clients_arg, + is_gcp, + gcp_vpc, +): + tree_depth = int(depth_arg) + is_tree = tree_arg == "topolo" # or "pn" + + num_replicas = 2**tree_depth - 1 + + num_clients = int(clients_arg) + + print(f"Launching benchmark with protocol {tree_arg}, {num_replicas} replicas, and {num_clients} clients") + + currently_deployed = [] + + def create_machine(): + if len(machine_pool) > 0: + ret = machine_pool.pop() + currently_deployed.append(ret) + return ret + else: + if is_gcp: + out = deployment.GCPComputeEngineHost( + project="hydro-chrisdouglas", + machine_type="n2-standard-2", + image="debian-cloud/debian-11", + region="us-west1-a", + network=gcp_vpc, + ) + else: + out = localhost_machine + currently_deployed.append(out) + return out + + all_nodes = [] + neighbors = get_neighbors_in_binary_tree(list(range(num_replicas))) + cluster = [ + deployment.HydroflowCrate( + src=str( + Path(__file__).parent.absolute() + ), + profile=profile, + bin="topolotree", + args=[str(neighbor) for neighbor in neighbors[i]], + on=create_machine(), + ) if is_tree else deployment.HydroflowCrate( + src=str( + Path(__file__).parent.absolute() + ), + profile=profile, + bin="pn" if tree_arg == "pn" else "pn_delta", + args=[json.dumps([i]), json.dumps([num_replicas])], + on=create_machine(), + ) + for i in range(num_replicas) + ] + + for i in range(num_replicas): + cluster[i].ports.to_peer.tagged(i).send_to( + hydro.demux( + { + j: cluster[j].ports.from_peer.merge() + for j in range(num_replicas) + } + ) + ) + + all_nodes = cluster + + if is_tree: + leaves = get_leaves_in_binary_tree(list(range(num_replicas))) + source = cluster[leaves[0]] + dest = cluster[leaves[-1]] + else: + source = cluster[0] + dest = cluster[-1] + + for node in all_nodes: + if node is not dest: + node.ports.query_responses.send_to(hydro.null()) + if node is not source: + hydro.null().send_to(node.ports.increment_requests) + + latency_measurer = deployment.HydroflowCrate( + src=str(Path(__file__).parent.absolute()), + profile=profile, + bin="latency_measure", + args=[json.dumps([num_clients])], + on=create_machine(), + ) + + latency_measurer.ports.increment_start_node.send_to( + source.ports.increment_requests.merge() + ) + dest.ports.query_responses.send_to(latency_measurer.ports.end_node_query) + + await deployment.deploy() + + print("Deployed!") + + latency = [] + memory_per_node = [[] for _ in range(num_replicas)] + throughput_raw = [] + + throughput = [] + + latency_stdout = await latency_measurer.stdout() + + memories_streams_with_index = [ + stream.map(await node.stdout(), lambda x, i=i: (i, x)) + for i, node in enumerate(all_nodes) + ] + + async def memory_plotter(): + try: + async with stream.merge(*memories_streams_with_index).stream() as merged: + async for node_idx, line in merged: + line_split = line.split(",") + if line_split[0] == "memory": + 
memory_per_node[node_idx].append(int(line_split[1]))
+        except asyncio.CancelledError:
+            return
+
+    memory_plotter_task = asyncio.create_task(memory_plotter())
+
+    async def latency_plotter():
+        try:
+            async for line in latency_stdout:
+                line_split = line.split(",")
+                if line_split[0] == "throughput":
+                    count = int(line_split[1])
+                    period = float(line_split[2])
+                    throughput_raw.append([count, period])
+                    throughput.append(count / period)
+                elif line_split[0] == "latency":
+                    number = int(line_split[1])  # microseconds
+                    latency.append(number)
+        except asyncio.CancelledError:
+            return
+
+    latency_plotter_task = asyncio.create_task(latency_plotter())
+
+    await deployment.start()
+    print("Started! Please wait 30 seconds to collect data.")
+
+    await asyncio.sleep(30)
+
+    await latency_measurer.stop()
+    await asyncio.gather(*[node.stop() for node in all_nodes])
+
+    memory_plotter_task.cancel()
+    await memory_plotter_task
+
+    latency_plotter_task.cancel()
+    await latency_plotter_task
+
+    def summarize(v, kind):
+        print("mean = ", np.mean(v))
+        print("std = ", np.std(v))
+        print("min = ", np.min(v))
+        print("max = ", np.max(v))
+        print("percentile 99 = ", np.percentile(v, 99))
+        print("percentile 75 = ", np.percentile(v, 75))
+        print("percentile 50 = ", np.percentile(v, 50))
+        print("percentile 25 = ", np.percentile(v, 25))
+        print("percentile 1 = ", np.percentile(v, 1))
+
+        summaries_file.write("\n")
+        summaries_file.write(tree_arg + ",")
+        summaries_file.write(str(tree_depth) + ",")
+        summaries_file.write(str(num_clients) + ",")
+        summaries_file.write(kind + ",")
+        summaries_file.write(str(np.mean(v)) + ",")
+        summaries_file.write(str(np.std(v)) + ",")
+        summaries_file.write(str(np.min(v)) + ",")
+        summaries_file.write(str(np.max(v)) + ",")
+        summaries_file.write(str(np.percentile(v, 99)) + ",")
+        summaries_file.write(str(np.percentile(v, 75)) + ",")
+        summaries_file.write(str(np.percentile(v, 50)) + ",")
+        summaries_file.write(str(np.percentile(v, 25)) + ",")
+        summaries_file.write(str(np.percentile(v, 1)))
+        summaries_file.flush()
+
+    print("latency:")
+    summarize(latency, "latency")
+
+    print("throughput:")
+    summarize(throughput, "throughput")
+
+    init_memory = [memory[0] for memory in memory_per_node]
+    print("init memory:")
+    summarize(init_memory, "init_memory")
+
+    final_memory = [memory[-1] for memory in memory_per_node]
+    print("final memory:")
+    summarize(final_memory, "final_memory")
+
+    pd.DataFrame(latency).to_csv(
+        "latency_"
+        + tree_arg
+        + "_tree_depth_"
+        + str(tree_depth)
+        + "_num_clients_"
+        + str(num_clients)
+        + "_"
+        + experiment_id
+        + ".csv",
+        index=False,
+        header=["latency"],
+    )
+    pd.DataFrame(throughput_raw).to_csv(
+        "throughput_"
+        + tree_arg
+        + "_tree_depth_"
+        + str(tree_depth)
+        + "_num_clients_"
+        + str(num_clients)
+        + "_"
+        + experiment_id
+        + ".csv",
+        index=False,
+        header=["count", "period"],
+    )
+    pd.DataFrame(init_memory).to_csv(
+        "init_memory_"
+        + tree_arg
+        + "_tree_depth_"
+        + str(tree_depth)
+        + "_num_clients_"
+        + str(num_clients)
+        + "_"
+        + experiment_id
+        + ".csv",
+        index=False,
+        header=["memory"],
+    )
+    pd.DataFrame(final_memory).to_csv(
+        "final_memory_"
+        + tree_arg
+        + "_tree_depth_"
+        + str(tree_depth)
+        + "_num_clients_"
+        + str(num_clients)
+        + "_"
+        + experiment_id
+        + ".csv",
+        index=False,
+        header=["memory"],
+    )
+
+    for machine in currently_deployed:
+        machine_pool.append(machine)
+
+
+# hydro deploy topolotree_latency.hydro.py -- local/gcp topolo/pn/pn_delta DEPTH_OF_TREE NUM_CLIENTS
+async def main(args):
+    # the current 
timestamp + import datetime + + experiment_id = str(datetime.datetime.now()) + + summaries_file = open(f"summaries_{experiment_id}.csv", "w") + summaries_file.write( + "protocol,tree_depth,num_clients,kind,mean,std,min,max,percentile_99,percentile_75,percentile_50,percentile_25,percentile_1" + ) + + deployment = hydro.Deployment() + localhost = deployment.Localhost() + pool = [] + + network = ( + hydro.GCPNetwork( + project="hydro-chrisdouglas", + ) + if args[0] == "gcp" + else None + ) + + for depth_arg in args[2].split(","): + for tree_arg in args[1].split(","): + for num_clients_arg in args[3].split(","): + await run_experiment( + deployment, + localhost, + "dev" if args[0] == "local" else None, + pool, + experiment_id, + summaries_file, + tree_arg, + depth_arg, + num_clients_arg, + args[0] == "gcp", + network, + ) + + summaries_file.close() + + +if __name__ == "__main__": + import sys + import hydro.async_wrapper + + hydro.async_wrapper.run(main, sys.argv[1:])
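For cross-checking the topology math above, the parent/children arithmetic of get_neighbors_in_binary_tree ports directly to Rust. A small standalone sketch with a depth-2 (three-node) example:

// neighbors of node i in a flat, array-backed binary tree:
// parent (i - 1) / 2, plus children 2i + 1 and 2i + 2 when they exist
fn neighbors_in_binary_tree(n: usize) -> Vec<Vec<usize>> {
    (0..n)
        .map(|i| {
            let mut out = Vec::new();
            if i > 0 {
                out.push((i - 1) / 2);
            }
            if 2 * i + 1 < n {
                out.push(2 * i + 1);
            }
            if 2 * i + 2 < n {
                out.push(2 * i + 2);
            }
            out
        })
        .collect()
}

fn main() {
    // depth 2 => 3 nodes: the root sees both leaves, each leaf sees the root
    assert_eq!(
        neighbors_in_binary_tree(3),
        vec![vec![1, 2], vec![0], vec![0]]
    );
}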