From 0348e423c3b7bf10d2e5505ca2c29548d08d6d5e Mon Sep 17 00:00:00 2001 From: David Chu Date: Thu, 19 Dec 2024 14:30:40 -0800 Subject: [PATCH] Paxos snapshot updated --- ...cluster__paxos_bench__tests__paxos_ir.snap | 324 ++++++++++-------- 1 file changed, 178 insertions(+), 146 deletions(-) diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap index 8fdc53efeaa..3eea4f3c1b3 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -210,7 +210,7 @@ expression: built.ir() input: Tee { inner: : FoldKeyed { init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | | (0 , 0) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot > , () > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydroflow_plus_test :: cluster :: paxos :: Ballot > , () > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), input: Tee { inner: : Chain( CycleSource { @@ -226,9 +226,9 @@ expression: built.ir() }, Tee { inner: : Inspect { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | println ! ("Proposer received P1b: {:?}" , p1b) }), + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | println ! ("Proposer received P1b: {:?}" , p1b) }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >)) , (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydroflow_plus_test :: cluster :: paxos :: Ballot >)) , (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydroflow_plus_test :: cluster :: paxos :: Ballot >) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( 1, @@ -239,14 +239,14 @@ expression: built.ir() ), to_key: None, serialize_fn: Some( - | (id , data) : (hydroflow_plus :: ClusterId < _ > , (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >)) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >) > (& data) . unwrap () . into ()) }, + | (id , data) : (hydroflow_plus :: ClusterId < _ > , (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydroflow_plus_test :: cluster :: paxos :: Ballot >)) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydroflow_plus_test :: cluster :: paxos :: Ballot >) > (& data) . unwrap () . into ()) }, ), instantiate_fn: , deserialize_fn: Some( - | res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >) > (& b) . unwrap ()) }, + | res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydroflow_plus_test :: cluster :: paxos :: Ballot >) > (& b) . unwrap ()) }, ), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot) , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >)) > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((ballot , max_ballot) , log) | (ballot . proposer_id , (ballot , if ballot == max_ballot { Ok (log) } else { Err (max_ballot) })) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >)) , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydroflow_plus_test :: cluster :: paxos :: Ballot >)) > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((ballot , max_ballot) , log) | (ballot . proposer_id , (ballot , if ballot == max_ballot { Ok (log) } else { Err (max_ballot) })) }), input: CrossSingleton( CrossSingleton( Tee { @@ -429,19 +429,19 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (() , ()) , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | (d , _signal) | d }), input: CrossSingleton( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | _ | () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < std :: vec :: Vec < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | _ | () }), input: Tee { inner: : FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > >) , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; move | ((quorum_ballot , quorum_accepted) , my_ballot) | if quorum_ballot == my_ballot { Some (quorum_accepted) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: vec :: Vec < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) >) , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < std :: vec :: Vec < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) > > > ({ use crate :: __staged :: cluster :: paxos :: * ; move | ((quorum_ballot , quorum_accepted) , my_ballot) | if quorum_ballot == my_ballot { Some (quorum_accepted) } else { None } }), input: CrossSingleton( Reduce { - f: { let key_fn = stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > >) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | t | t . 0 }) ; move | curr , new | { if key_fn (& new) > key_fn (& * curr) { * curr = new ; } } }, + f: { let key_fn = stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: vec :: Vec < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) >) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | t | t . 0 }) ; move | curr , new | { if key_fn (& new) > key_fn (& * curr) { * curr = new ; } } }, input: FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | | vec ! [] }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | logs , log | { logs . push (log) ; } }), + init: stageleft :: runtime_support :: fn0_type_hint :: < std :: vec :: Vec < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | | vec ! [] }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < std :: vec :: Vec < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) > , (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | logs , log | { logs . push (log) ; } }), input: Persist( FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < (hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | (key , res) | match res { Ok (v) => Some ((key , v)) , Err (_) => None , } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < (hydroflow_plus_test :: cluster :: paxos :: Ballot , (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >)) > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | (key , res) | match res { Ok (v) => Some ((key , v)) , Err (_) => None , } }), input: AntiJoin( AntiJoin( Tee { @@ -510,7 +510,7 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_ , ballot) | ballot }), input: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < (hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | (key , res) | match res { Ok (_) => None , Err (e) => Some ((key , e)) , } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < (hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | (key , res) | match res { Ok (_) => None , Err (e) => Some ((key , e)) , } }), input: Tee { inner: , }, @@ -624,10 +624,15 @@ expression: built.ir() acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_entry , new_entry | { if let Some (curr_entry_payload) = & mut curr_entry . 1 { let same_values = new_entry . value == curr_entry_payload . value ; let higher_ballot = new_entry . ballot > curr_entry_payload . ballot ; if same_values { curr_entry . 0 += 1 ; } if higher_ballot { curr_entry_payload . ballot = new_entry . ballot ; if ! same_values { curr_entry . 0 = 1 ; curr_entry_payload . value = new_entry . value ; } } } else { * curr_entry = (1 , Some (new_entry)) ; } } }), input: FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | d | d }), - input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > , std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > > ({ use hydroflow_plus :: __staged :: optional :: * ; | v | v }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_checkpoint , log) | log }), input: Tee { - inner: , + inner: : FlatMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < std :: vec :: Vec < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) > , std :: vec :: Vec < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) > > ({ use hydroflow_plus :: __staged :: optional :: * ; | v | v }), + input: Tee { + inner: , + }, + }, }, }, }, @@ -688,14 +693,14 @@ expression: built.ir() input: DeferTick( Difference( Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , usize)) , core :: option :: Option < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), input: Tee { - inner: : FoldKeyed { + inner: : FoldKeyed { init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | | (0 , 0) }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot > , () > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), input: Tee { - inner: : Chain( + inner: : Chain( CycleSource { ident: Ident { sym: cycle_8, @@ -708,7 +713,7 @@ expression: built.ir() ), }, Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >)) , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( @@ -730,7 +735,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >)) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | (p2a . ballot . proposer_id , ((p2a . slot , p2a . ballot) , if p2a . ballot == max_ballot { Ok (()) } else { Err (max_ballot) })) }), input: CrossSingleton( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > >) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( @@ -753,7 +758,7 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > >) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((slot , ballot) , value) | P2a { ballot , slot , value } }), input: Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > >) , ()) , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > >) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), input: CrossSingleton( Chain( @@ -772,13 +777,38 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2a | ((p2a . slot , p2a . ballot) , p2a . value) }), input: Chain( FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , (usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > >)) , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f__free = 1usize ; move | ((slot , (count , entry)) , ballot) | { if count <= f__free { Some (P2a { ballot , slot , value : entry . value , }) } else { None } } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (((usize , (usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > >)) , hydroflow_plus_test :: cluster :: paxos :: Ballot) , i64) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f__free = 1usize ; move | (((slot , (count , entry)) , ballot) , checkpoint) | { if count <= f__free && slot as i64 > checkpoint { Some (P2a { ballot , slot , value : entry . value , }) } else { None } } }), input: CrossSingleton( + CrossSingleton( + Tee { + inner: , + }, + Tee { + inner: , + }, + ), Tee { - inner: , - }, - Tee { - inner: , + inner: : Chain( + Reduce { + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i64 , i64 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , i64 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (checkpoint , _log) | { if let Some (checkpoint) = checkpoint { checkpoint as i64 } else { - 1 } } }), + input: Tee { + inner: , + }, + }, + }, + Persist( + Source { + source: Iter( + { use hydroflow_plus :: __staged :: location :: * ; let e__free = { use crate :: __staged :: cluster :: paxos :: * ; - 1_i64 } ; [e__free] }, + ), + location_kind: Cluster( + 0, + ), + }, + ), + ), }, ), }, @@ -787,10 +817,15 @@ expression: built.ir() input: CrossSingleton( Difference( FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , std :: ops :: Range < usize > > ({ use crate :: __staged :: cluster :: paxos :: * ; | max_slot | 0 .. max_slot }), - input: Tee { - inner: , - }, + f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , i64) , std :: ops :: Range < usize > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (max_slot , checkpoint) | (checkpoint + 1) as usize .. max_slot }), + input: CrossSingleton( + Tee { + inner: , + }, + Tee { + inner: , + }, + ), }, Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > >)) , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), @@ -836,10 +871,10 @@ expression: built.ir() }, }, Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , usize)) , core :: option :: Option < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; let max__free = 3usize ; move | (key , (success , error)) | if (success + error) >= max__free { Some (key) } else { None } }), input: Tee { - inner: , + inner: , }, }, }, @@ -859,10 +894,10 @@ expression: built.ir() input: DeferTick( AntiJoin( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), ), @@ -880,7 +915,7 @@ expression: built.ir() input: DeferTick( AntiJoin( Tee { - inner: : Chain( + inner: : Chain( CycleSource { ident: Ident { sym: cycle_10, @@ -893,18 +928,18 @@ expression: built.ir() ), }, Tee { - inner: , + inner: , }, ), }, Map { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , ()) , (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > ({ use hydroflow_plus_std :: __staged :: request_response :: * ; | (key , _) | key }), input: Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , ()) > ({ use crate :: __staged :: cluster :: paxos :: * ; | k | (k , ()) }), input: Difference( Tee { - inner: , + inner: , }, CycleSource { ident: Ident { @@ -934,91 +969,88 @@ expression: built.ir() 1, ), ), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_ckpnt , log) | log }), - input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (None , HashMap :: new ()) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | (prev_checkpoint , log) , checkpoint_or_p2a | { match checkpoint_or_p2a { CheckpointOrP2a :: Checkpoint (new_checkpoint) => { if prev_checkpoint . map (| prev | new_checkpoint > prev) . unwrap_or (true) { for slot in (prev_checkpoint . unwrap_or (0)) .. new_checkpoint { log . remove (& slot) ; } * prev_checkpoint = Some (new_checkpoint) ; } } CheckpointOrP2a :: P2a (p2a) => { if prev_checkpoint . map (| prev | p2a . slot > prev) . unwrap_or (true) && log . get (& p2a . slot) . map (| prev_p2a : & LogValue < _ > | p2a . ballot > prev_p2a . ballot) . unwrap_or (true) { log . insert (p2a . slot , LogValue { ballot : p2a . ballot , value : p2a . value , } ,) ; } } } } }), - input: Persist( - Chain( - FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | if p2a . ballot >= max_ballot { Some (CheckpointOrP2a :: P2a (p2a)) } else { None } }), - input: CrossSingleton( - Tee { - inner: , - }, - Tee { - inner: , - }, - ), - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | min_seq | CheckpointOrP2a :: Checkpoint (min_seq) }), - input: Delta( - Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new < * curr { * curr = new ; } } }), + input: Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (None , HashMap :: new ()) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | (prev_checkpoint , log) , checkpoint_or_p2a | { match checkpoint_or_p2a { CheckpointOrP2a :: Checkpoint (new_checkpoint) => { if prev_checkpoint . map (| prev | new_checkpoint > prev) . unwrap_or (true) { for slot in (prev_checkpoint . unwrap_or (0)) .. new_checkpoint { log . remove (& slot) ; } * prev_checkpoint = Some (new_checkpoint) ; } } CheckpointOrP2a :: P2a (p2a) => { if prev_checkpoint . map (| prev | p2a . slot > prev) . unwrap_or (true) && log . get (& p2a . slot) . map (| prev_p2a : & LogValue < _ > | p2a . ballot > prev_p2a . ballot) . unwrap_or (true) { log . insert (p2a . slot , LogValue { ballot : p2a . ballot , value : p2a . value , } ,) ; } } } } }), + input: Persist( + Chain( + FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | if p2a . ballot >= max_ballot { Some (CheckpointOrP2a :: P2a (p2a)) } else { None } }), + input: CrossSingleton( + Tee { + inner: , + }, + Tee { + inner: , + }, + ), + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | min_seq | CheckpointOrP2a :: Checkpoint (min_seq) }), + input: Delta( + Reduce { + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new < * curr { * curr = new ; } } }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , usize) , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_sender , seq) | seq }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , usize) , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_sender , seq) | seq }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , usize) , ()) , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , usize) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Tee { - inner: : ReduceKeyed { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_seq , seq | { if seq > * curr_seq { * curr_seq = seq ; } } }), - input: Persist( - Network { - from_location: Cluster( - 3, - ), - from_key: None, - to_location: Cluster( - 1, - ), - to_key: None, - serialize_fn: Some( - | (id , data) : (hydroflow_plus :: ClusterId < _ > , usize) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < usize > (& data) . unwrap () . into ()) }, - ), - instantiate_fn: , - deserialize_fn: Some( - | res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < usize > (& b) . unwrap ()) }, - ), - input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids__free = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_1) } ; | b | ids__free . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), - input: CycleSource { - ident: Ident { - sym: cycle_0, - }, - location_kind: Cluster( - 3, - ), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , usize) , ()) , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , usize) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Tee { + inner: : ReduceKeyed { + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_seq , seq | { if seq > * curr_seq { * curr_seq = seq ; } } }), + input: Persist( + Network { + from_location: Cluster( + 3, + ), + from_key: None, + to_location: Cluster( + 1, + ), + to_key: None, + serialize_fn: Some( + | (id , data) : (hydroflow_plus :: ClusterId < _ > , usize) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < usize > (& data) . unwrap () . into ()) }, + ), + instantiate_fn: , + deserialize_fn: Some( + | res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < usize > (& b) . unwrap ()) }, + ), + input: FlatMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids__free = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_1) } ; | b | ids__free . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), + input: CycleSource { + ident: Ident { + sym: cycle_0, }, + location_kind: Cluster( + 3, + ), }, }, - ), - }, + }, + ), }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }), - input: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f__free = 1usize ; move | num_received | if num_received == f__free + 1 { Some (true) } else { None } }), - input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , usize) , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), - input: Tee { - inner: , - }, + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }), + input: FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f__free = 1usize ; move | num_received | if num_received == f__free + 1 { Some (true) } else { None } }), + input: Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , usize) , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + input: Tee { + inner: , }, }, }, - ), - }, + }, + ), }, }, - ), - }, - ), + }, + ), + }, ), - }, + ), }, }, CycleSink { @@ -1033,7 +1065,7 @@ expression: built.ir() input: FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | (key , res) | match res { Ok (_) => None , Err (e) => Some ((key , e)) , } }), input: Tee { - inner: , + inner: , }, }, }, @@ -1055,7 +1087,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > , usize) , bool > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq > * highest_seq }), input: CrossSingleton( Tee { - inner: : Sort( + inner: : Sort( Chain( Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) >) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), @@ -1085,10 +1117,10 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , ())) , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , ())) > ({ use hydroflow_plus_std :: __staged :: request_response :: * ; | (key , (meta , resp)) | (key , (meta , resp)) }), input: Join( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), }, @@ -1112,14 +1144,14 @@ expression: built.ir() ), }, Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < usize > , core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | v | v }), input: Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | | None }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < usize > , (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > , core :: option :: Option < usize >) , () > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | filled_slot , (sorted_payload , highest_seq) | { let expected_next_slot = std :: cmp :: max (filled_slot . map (| v | v + 1) . unwrap_or (0) , highest_seq . map (| v | v + 1) . unwrap_or (0) ,) ; if sorted_payload . seq == expected_next_slot { * filled_slot = Some (sorted_payload . seq) ; } } }), input: CrossSingleton( Tee { - inner: , + inner: , }, Chain( Map { @@ -1168,23 +1200,23 @@ expression: built.ir() ), input: DeferTick( Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > , core :: option :: Option < usize >) , core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (_kv_store , highest_seq) | highest_seq }), input: Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > , core :: option :: Option < usize >) > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | | (HashMap :: new () , None) }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > , core :: option :: Option < usize >) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > , () > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (kv_store , last_seq) , payload | { if let Some (kv) = payload . kv { kv_store . insert (kv . key , kv . value) ; } debug_assert ! (payload . seq == (last_seq . map (| s | s + 1) . unwrap_or (0)) , "Hole in log between seq {:?} and {}" , * last_seq , payload . seq) ; * last_seq = Some (payload . seq) ; } }), input: Persist( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > , usize) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , _) | { sorted_payload } }), input: Filter { f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > , usize) , bool > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq <= * highest_seq }), input: CrossSingleton( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), }, @@ -1208,7 +1240,7 @@ expression: built.ir() ), input: DeferTick( Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < usize > , usize) , core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; let checkpoint_frequency__free = 1usize ; move | (max_checkpointed_seq , new_highest_seq) | if max_checkpointed_seq . map (| m | new_highest_seq - m >= checkpoint_frequency__free) . unwrap_or (true) { Some (new_highest_seq) } else { None } }), input: CrossSingleton( Chain( @@ -1243,7 +1275,7 @@ expression: built.ir() ), ), Tee { - inner: , + inner: , }, ), }, @@ -1258,7 +1290,7 @@ expression: built.ir() 3, ), input: Tee { - inner: , + inner: , }, }, CycleSink { @@ -1339,7 +1371,7 @@ expression: built.ir() input: DeferTick( AntiJoin( Tee { - inner: : Chain( + inner: : Chain( CycleSource { ident: Ident { sym: cycle_2, @@ -1352,7 +1384,7 @@ expression: built.ir() ), }, Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , ((u32 , u32) , core :: result :: Result < () , () >)) , ((u32 , u32) , core :: result :: Result < () , () >) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( @@ -1375,7 +1407,7 @@ expression: built.ir() input: FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | payload | payload . kv }), input: Tee { - inner: , + inner: , }, }, }, @@ -1385,14 +1417,14 @@ expression: built.ir() ), }, Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < ((u32 , u32) , (usize , usize)) , core :: option :: Option < (u32 , u32) > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), input: Tee { - inner: : FoldKeyed { + inner: : FoldKeyed { init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | | (0 , 0) }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < () , () > , () > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), input: Tee { - inner: , + inner: , }, }, }, @@ -1412,7 +1444,7 @@ expression: built.ir() FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < () , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let CLUSTER_SELF_ID__free = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; let num_clients_per_node__free = 1usize ; move | _ | (0 .. num_clients_per_node__free) . map (move | i | ((CLUSTER_SELF_ID__free . raw_id * (num_clients_per_node__free as u32)) + i as u32 , 0)) }), input: Tee { - inner: : Reduce { + inner: : Reduce { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | * curr = new }), input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | () }), @@ -1428,8 +1460,8 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , u32) , (u32 , u32) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | payload | (payload . 0 , payload . 1 + 1) }), input: Tee { - inner: : Tee { - inner: , + inner: : Tee { + inner: , }, }, }, @@ -1451,7 +1483,7 @@ expression: built.ir() input: Chain( Chain( Tee { - inner: : CycleSource { + inner: : CycleSource { ident: Ident { sym: cycle_3, }, @@ -1468,16 +1500,16 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < () , tokio :: time :: Instant > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | Instant :: now () }), input: Tee { - inner: , + inner: , }, }, }, ), Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , u32) , (usize , tokio :: time :: Instant) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (key , _prev_count) | (key as usize , Instant :: now ()) }), input: Tee { - inner: , + inner: , }, }, }, @@ -1504,10 +1536,10 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (tokio :: time :: Instant , tokio :: time :: Instant)) , core :: option :: Option < core :: time :: Duration > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_virtual_id , (prev_time , curr_time)) | Some (curr_time . duration_since (prev_time)) }), input: Join( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), }, @@ -1515,7 +1547,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , core :: option :: Option < core :: time :: Duration > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | None }), input: Tee { - inner: : Source { + inner: : Source { source: Stream( { use hydroflow_plus :: __staged :: location :: * ; let interval__free = { use crate :: __staged :: cluster :: paxos_bench :: * ; Duration :: from_secs (1) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval (interval__free)) }, ), @@ -1545,7 +1577,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (u32 , u32) , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, Map { @@ -1556,7 +1588,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, }, @@ -1568,7 +1600,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , (usize , bool) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | (0 , true) }), input: Tee { - inner: , + inner: , }, }, ), @@ -1579,7 +1611,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), input: Tee { - inner: , + inner: , }, }, ),