Skip to content

Commit

Permalink
fix(paxos): Proposers don't re-commit entries before the latest checkpoint (#1615)
Browse files Browse the repository at this point in the history

Paxos proposers assign no-ops to holes in the log after receiving a
quorum of p1bs. Acceptors delete log entries before the latest
checkpoint, which means the deleted entries will show up as holes in
the proposer's log. The proposer should therefore not propose any
operations (recommitted entries or hole-filling no-ops) for slots at or
before the most recent checkpoint reported in a quorum of p1bs.
  • Loading branch information
davidchuyaya authored Jan 9, 2025
1 parent b1514be commit 487e8fa
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 166 deletions.
60 changes: 40 additions & 20 deletions hydro_test/src/cluster/paxos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,11 @@ unsafe fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>(
i_am_leader_check_timeout: u64,
i_am_leader_check_timeout_delay_multiplier: usize,
p_received_p2b_ballots: Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
a_log: Singleton<L, Tick<Cluster<'a, Acceptor>>, Bounded>,
a_log: Singleton<(Option<usize>, L), Tick<Cluster<'a, Acceptor>>, Bounded>,
) -> (
Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
Stream<L, Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
Stream<(Option<usize>, L), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
) {
let (p1b_fail_complete, p1b_fail) =
Expand Down Expand Up @@ -372,11 +372,11 @@ unsafe fn p_leader_heartbeat<'a>(
fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>(
acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
p_to_acceptors_p1a: Stream<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded, NoOrder>,
a_log: Singleton<L, Tick<Cluster<'a, Acceptor>>, Bounded>,
a_log: Singleton<(Option<usize>, L), Tick<Cluster<'a, Acceptor>>, Bounded>,
proposers: &Cluster<'a, Proposer>,
) -> (
Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
Stream<(Ballot, Result<L, Ballot>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
Stream<(Ballot, Result<(Option<usize>, L), Ballot>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
) {
let a_max_ballot = p_to_acceptors_p1a
.clone()
Expand Down Expand Up @@ -414,7 +414,7 @@ fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>(
fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>(
proposer_tick: &Tick<Cluster<'a, Proposer>>,
a_to_proposers_p1b: Stream<
(Ballot, Result<P, Ballot>),
(Ballot, Result<(Option<usize>, P), Ballot>),
Cluster<'a, Proposer>,
Unbounded,
NoOrder,
Expand All @@ -424,7 +424,7 @@ fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>(
f: usize,
) -> (
Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
Stream<P, Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
Stream<(Option<usize>, P), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
Stream<Ballot, Timestamped<Cluster<'a, Proposer>>, Unbounded, NoOrder>,
) {
let (quorums, fails) = collect_quorum_with_response(
Expand Down Expand Up @@ -472,7 +472,7 @@ fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>(
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn recommit_after_leader_election<'a, P: PaxosPayload>(
accepted_logs: Stream<
HashMap<usize, LogValue<P>>,
(Option<usize>, HashMap<usize, LogValue<P>>),
Tick<Cluster<'a, Proposer>>,
Bounded,
NoOrder,
Expand All @@ -483,7 +483,12 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>(
Stream<P2a<P>, Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
Optional<usize, Tick<Cluster<'a, Proposer>>, Bounded>,
) {
let p_p1b_max_checkpoint = accepted_logs
.clone()
.map(q!(|(checkpoint, _log)| checkpoint))
.max();
let p_p1b_highest_entries_and_count = accepted_logs
.map(q!(|(_checkpoint, log)| log))
.flatten_unordered() // Convert HashMap log back to stream
.fold_keyed_commutative::<(usize, Option<LogValue<P>>), _, _>(q!(|| (0, None)), q!(|curr_entry, new_entry| {
if let Some(curr_entry_payload) = &mut curr_entry.1 {
Expand All @@ -510,16 +515,20 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>(
let p_log_to_try_commit = p_p1b_highest_entries_and_count
.clone()
.cross_singleton(p_ballot.clone())
.filter_map(q!(move |((slot, (count, entry)), ballot)| {
if count <= f {
Some(P2a {
ballot,
slot,
value: entry.value,
})
} else {
None
.cross_singleton(p_p1b_max_checkpoint.clone())
.filter_map(q!(move |(((slot, (count, entry)), ballot), checkpoint)| {
if count > f {
return None;
} else if let Some(checkpoint) = checkpoint {
if slot <= checkpoint {
return None;
}
}
Some(P2a {
ballot,
slot,
value: entry.value,
})
}));
let p_max_slot = p_p1b_highest_entries_and_count
.clone()
Expand All @@ -530,7 +539,14 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>(
.map(q!(|(slot, _)| slot));
let p_log_holes = p_max_slot
.clone()
.flat_map_ordered(q!(|max_slot| 0..max_slot))
.zip(p_p1b_max_checkpoint)
.flat_map_ordered(q!(|(max_slot, checkpoint)| {
if let Some(checkpoint) = checkpoint {
(checkpoint + 1)..max_slot
} else {
0..max_slot
}
}))
.filter_not_in(p_proposed_slots)
.cross_singleton(p_ballot.clone())
.map(q!(|(slot, ballot)| P2a {
Expand Down Expand Up @@ -564,7 +580,7 @@ unsafe fn sequence_payload<'a, P: PaxosPayload, R>(
p_is_leader: Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,

p_relevant_p1bs: Stream<
HashMap<usize, LogValue<P>>,
(Option<usize>, HashMap<usize, LogValue<P>>),
Tick<Cluster<'a, Proposer>>,
Bounded,
NoOrder,
Expand All @@ -574,7 +590,11 @@ unsafe fn sequence_payload<'a, P: PaxosPayload, R>(
a_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
) -> (
Stream<(usize, Option<P>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
Singleton<HashMap<usize, LogValue<P>>, Timestamped<Cluster<'a, Acceptor>>, Unbounded>,
Singleton<
(Option<usize>, HashMap<usize, LogValue<P>>),
Timestamped<Cluster<'a, Acceptor>>,
Unbounded,
>,
Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
) {
let (p_log_to_recommit, p_max_slot) =
Expand Down Expand Up @@ -635,7 +655,7 @@ unsafe fn sequence_payload<'a, P: PaxosPayload, R>(
p_to_replicas
.map(q!(|((slot, _ballot), (value, _))| (slot, value)))
.drop_timestamp(),
a_log.map(q!(|(_ckpnt, log)| log)),
a_log,
fails.map(q!(|(_, ballot)| ballot)).drop_timestamp(),
)
}
Expand Down
Loading

0 comments on commit 487e8fa

Please sign in to comment.