Skip to content

Commit

Permalink
simplify multipaxos quite a lot and remove some inconsistencies
Browse files Browse the repository at this point in the history
  • Loading branch information
haaase committed Jan 16, 2025
1 parent e9f2dd7 commit 9075f47
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 102 deletions.
8 changes: 5 additions & 3 deletions Modules/RDTs/.jvm/src/test/scala/MultiPaxosSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import org.scalacheck.Arbitrary.arbitrary
import org.scalacheck.Prop.propBoolean
import org.scalacheck.{Arbitrary, Gen, Prop}
import rdts.base.{Lattice, LocalUid}
import rdts.datatypes.experiments.protocols.MultipaxosPhase.LeaderElection
import rdts.datatypes.experiments.protocols.{MultiPaxos, Participants}

import scala.util.Try
Expand All @@ -10,7 +11,7 @@ extension [A](s1: Seq[A])
def isPrefix(s2: Seq[A]): Boolean = s1.indexOfSlice(s2) == 0

class MultiPaxosSuite extends munit.ScalaCheckSuite {
// override def scalaCheckInitialSeed = "6Y9lv63LraBdJTHwHFLm3ItFEF7sm6Ok2D3S22VQcTO="
// override def scalaCheckInitialSeed = "UyoN51e59jSOQrryl2a6RnndAVJ0m290IRZ8JjXL9rJ="

override def scalaCheckTestParameters =
super.scalaCheckTestParameters
Expand All @@ -19,7 +20,7 @@ class MultiPaxosSuite extends munit.ScalaCheckSuite {
.withMaxSize(500)

property("Multipaxos")(MultiPaxosSpec[Int](
logging = false,
logging = true,
minDevices = 3,
maxDevices = 5,
proposeFreq = 5,
Expand Down Expand Up @@ -88,7 +89,8 @@ class MultiPaxosSpec[A: Arbitrary](
log1
)) :| s"every log is a prefix of another log or vice versa, but we had:\n${multipaxos1.rounds.counter}${multipaxos1.read}\n${multipaxos2.rounds.counter}${multipaxos2.read}" &&
// ((multipaxos1.rounds.counter != multipaxos2.rounds.counter) || multipaxos1.leader.isEmpty || multipaxos2.leader.isEmpty || (multipaxos1.leader == multipaxos2.leader)) :| s"There can only ever be one leader for a given epoch but we got:\n${multipaxos1.leader}\n${multipaxos2.leader}" &&
(log1.isPrefix(oldLog1) && log2.isPrefix(oldLog2)) :| s"logs never shrink"
(log1.isPrefix(oldLog1) && log2.isPrefix(oldLog2)) :| s"logs never shrink" &&
(multipaxos1.phase != LeaderElection || multipaxos1.leader.isEmpty) && (multipaxos1.phase == LeaderElection || multipaxos1.leader.nonEmpty) :| s"leader is only undefined during leader election, got ${multipaxos1.leader} in phase ${multipaxos1.phase}"
}

case class Propose(proposer: LocalUid, value: A) extends ACommand(proposer):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,139 +1,85 @@
package rdts.datatypes.experiments.protocols

import rdts.base.{Bottom, Lattice, LocalUid, Uid}
import rdts.datatypes.Epoch
import rdts.datatypes.experiments.protocols.Paxos.given
import rdts.datatypes.{Epoch, LastWriterWins}

enum MultipaxosPhase:
case LeaderElection
case Voting
case Idle

case class MultiPaxosRound[A](
leader: LastWriterWins[Option[Uid]] = LastWriterWins.empty,
phase: LastWriterWins[MultipaxosPhase] = LastWriterWins.fallback(MultipaxosPhase.LeaderElection),
paxos: Paxos[A] = Paxos[A]()
)

case class MultiPaxos[A](
rounds: Epoch[MultiPaxosRound[A]] = Epoch.empty[MultiPaxosRound[A]],
rounds: Epoch[Paxos[A]] = Epoch.empty[Paxos[A]],
log: Map[Long, A] = Map.empty
):

// private helper functions
private def currentPaxos = rounds.value.paxos
private def currentPaxos = rounds.value

// public API
def leader: Option[Uid] = rounds.value.leader.value
def paxosLeader(using Participants): Option[Uid] = currentPaxos.currentRound match
def leader(using Participants): Option[Uid] = currentPaxos.currentRound match
case Some((_, PaxosRound(leaderElection, _))) => leaderElection.result
case None => None

def phase: MultipaxosPhase = rounds.value.phase.value
def read: List[A] = log.toList.sortBy(_._1).map(_._2)
def phase(using Participants): MultipaxosPhase =
currentPaxos.currentRound match
case None => MultipaxosPhase.LeaderElection
case Some((_, PaxosRound(leaderElection, _))) if leaderElection.result.isEmpty => MultipaxosPhase.LeaderElection
case Some((_, PaxosRound(leaderElection, proposals)))
if leaderElection.result.nonEmpty && proposals.votes.nonEmpty => MultipaxosPhase.Voting
case Some((_, PaxosRound(leaderElection, proposals)))
if leaderElection.result.nonEmpty && proposals.votes.isEmpty => MultipaxosPhase.Idle
case _ => throw new Error("Inconsistent Paxos State")

def read: List[A] = log.toList.sortBy(_._1).map(_._2)

def startLeaderElection(using LocalUid, Participants): MultiPaxos[A] =
MultiPaxos(
rounds =
// transition to new epoch
rounds.epocheWrite(MultiPaxosRound(
leader = rounds.value.leader.write(None), // set leader to none
paxos = Paxos().phase1a, // start new Paxos round with self proposed as leader
phase = rounds.value.phase.write(MultipaxosPhase.LeaderElection)
))
)
MultiPaxos(rounds.write(currentPaxos.phase1a)) // start new Paxos round with self proposed as leader

def proposeIfLeader(value: A)(using LocalUid, Participants): MultiPaxos[A] =
val afterProposal = currentPaxos.phase2a(value)
// check if proposing does anything, otherwise return empty delta
if currentPaxos.subsumes(afterProposal) then
MultiPaxos()
else
MultiPaxos(
rounds = rounds.write(MultiPaxosRound(
paxos = afterProposal, // phase 2a already checks if I am the leader
phase = rounds.value.phase.write(MultipaxosPhase.Voting)
))
)
MultiPaxos(rounds = rounds.write(afterProposal)) // phase 2a already checks if I am the leader

def upkeep(using LocalUid, Participants): MultiPaxos[A] = {
// perform upkeep in Paxos
val deltaPaxos = currentPaxos.upkeep()
val newPaxos = currentPaxos.merge(deltaPaxos)

rounds.value.phase.value match
case MultipaxosPhase.LeaderElection =>
// we are in a leader election
// check if this process was elected as a leader
newPaxos.currentRound match
case Some((_, PaxosRound(leaderElection, _)))
if leaderElection.result.nonEmpty // if leaderElection.result == Some(replicaId)
=>
// there is a result
MultiPaxos(rounds =
rounds.write(MultiPaxosRound(
leader = rounds.value.leader.write(leaderElection.result),
paxos = deltaPaxos,
phase = rounds.value.phase.write(MultipaxosPhase.Idle)
))
)
case _ =>
// there is no result, return upkeep result
MultiPaxos(rounds =
rounds.write(MultiPaxosRound(
paxos = deltaPaxos
))
)
case MultipaxosPhase.Voting =>
// we are voting on proposals
// check if there is a decision
(newPaxos.decision, newPaxos.decidedLeaderElection) match
case (Some(decision), Some((ballotNum, leaderElection))) =>
// append log
val newLog = Map(rounds.counter -> decision)
// create new Paxos where leader is already elected
val newPaxos = Paxos(rounds =
Map(ballotNum -> PaxosRound(
leaderElection = leaderElection,
proposals = Voting[A]()
))
)
// return new Multipaxos with: appended log, set leader, phase as idle,
MultiPaxos(
rounds = rounds.epocheWrite(MultiPaxosRound(
leader = rounds.value.leader.write(paxosLeader),
phase = rounds.value.phase.write(MultipaxosPhase.Idle),
paxos = newPaxos
)),
log = newLog
)
case _ =>
// no decision yet, just send upkeep result
MultiPaxos(rounds =
rounds.write(MultiPaxosRound(
paxos = deltaPaxos
))
)
case MultipaxosPhase.Idle =>
// nothing to do
MultiPaxos()
(newPaxos.decision, newPaxos.newestBallotWithLeader) match
case (Some(decision), Some((ballotNum, PaxosRound(leaderElection, _)))) =>
// we are voting on proposals and there is a decision

val newLog = Map(rounds.counter -> decision) // append log
// create new Paxos where leader is already elected
val newPaxos = Paxos(rounds =
Map(ballotNum -> PaxosRound(
leaderElection = leaderElection,
proposals = Voting[A]()
))
)
// return new Multipaxos with: appended log
MultiPaxos(
rounds = rounds.epocheWrite(newPaxos),
log = newLog
)
case _ =>
// nothing to do, return upkeep result
MultiPaxos(rounds = rounds.write(deltaPaxos))
}

override def toString: String =
lazy val s = s"MultiPaxos(epoch: ${rounds.counter}, phase: $phase, log: $read)"
lazy val s = s"MultiPaxos(epoch: ${rounds.counter}, log: $read)"
s

object MultiPaxos:
given [A]: Bottom[MultiPaxosRound[A]] = Bottom.provide(MultiPaxosRound())
given [A]: Lattice[MultiPaxosRound[A]] = Lattice.derived

// for the log
given [A]: Lattice[Map[Long, A]] =
given Lattice[A] = Lattice.assertEquals
Lattice.mapLattice

given [A]: Lattice[MultiPaxos[A]] = Lattice.derived

// given consensus: Consensus[MultiPaxos] =
// extension [A](c: MultiPaxos[A])
// override def propose
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package rdts.datatypes.experiments.protocols

import rdts.base.LocalUid.replicaId
import rdts.base.{Lattice, LocalUid, Uid}
import rdts.base.{Bottom, Lattice, LocalUid, Uid}
import rdts.datatypes.LastWriterWins
import rdts.datatypes.experiments.protocols.Paxos.given
import rdts.datatypes.experiments.protocols.{Consensus, Participants}
Expand All @@ -24,7 +24,7 @@ case class Paxos[A](
// query functions
def nextBallotNum(using LocalUid): BallotNum =
val maxCounter: Long = rounds
.filter((b, _) => b.uid == replicaId)
// .filter((b, _) => b.uid == replicaId)
.map((b, _) => b.counter)
.maxOption
.getOrElse(-1)
Expand Down Expand Up @@ -100,6 +100,8 @@ object Paxos:
given [A]: Lattice[PaxosRound[A]] = Lattice.derived
given l[A]: Lattice[Paxos[A]] = Lattice.derived

given [A]: Bottom[Paxos[A]] = Bottom.provide(Paxos())

given [A]: Ordering[(BallotNum, PaxosRound[A])] with
override def compare(x: (BallotNum, PaxosRound[A]), y: (BallotNum, PaxosRound[A])): Int = (x, y) match
case ((x, _), (y, _)) => Ordering[BallotNum].compare(x, y)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ class MultiPaxosTest extends munit.FunSuite {
val id1 = LocalUid.gen()
val id2 = LocalUid.gen()
val id3 = LocalUid.gen()
val id4 = LocalUid.gen()

given Participants = Participants(Set(id1, id2, id3).map(_.uid))

val emptyPaxosObject: MultiPaxos[Int] = MultiPaxos()
test("happy path") {
var testPaxosObject = emptyPaxosObject

assertEquals(testPaxosObject.paxosLeader, None)
assertEquals(testPaxosObject.leader, None)
assertEquals(testPaxosObject.phase, MultipaxosPhase.LeaderElection, "multipaxos starts in leader election phase")

val proposeValue = 1
Expand All @@ -27,7 +28,7 @@ class MultiPaxosTest extends munit.FunSuite {
testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep(using id3))
testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep(using id1))

assertEquals(testPaxosObject.paxosLeader, Some(id1.uid), "id1 should be the leader")
assertEquals(testPaxosObject.leader, Some(id1.uid), "id1 should be the leader")

// replica 2 tries to write
val afterwrite = testPaxosObject.merge(testPaxosObject.proposeIfLeader(2)(using id2))
Expand Down Expand Up @@ -56,14 +57,14 @@ class MultiPaxosTest extends munit.FunSuite {
testPaxosObject = testPaxosObject.merge(testPaxosObject.startLeaderElection(using id3))
assertEquals(testPaxosObject.log.values.toList.sorted, List(1, 12), "log survives new leader election")
assertEquals(testPaxosObject.phase, MultipaxosPhase.LeaderElection)
assertEquals(testPaxosObject.paxosLeader, None)
assertEquals(testPaxosObject.leader, None)
testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep(using id3))
assertEquals(testPaxosObject.phase, MultipaxosPhase.LeaderElection)
assertEquals(testPaxosObject.paxosLeader, None)
assertEquals(testPaxosObject.leader, None)
testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep(using id2))
testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep(using id3))
assertEquals(testPaxosObject.phase, MultipaxosPhase.Idle)
assertEquals(testPaxosObject.paxosLeader, Some(id3.uid))
assertEquals(testPaxosObject.leader, Some(id3.uid))
}

test("conflicting proposals") {
Expand Down Expand Up @@ -92,6 +93,43 @@ class MultiPaxosTest extends munit.FunSuite {
assert(testPaxosObject.log.values.head == 1 || testPaxosObject.log.values.head == 2)
}

// test("leaders consistent") {
// var rep1 = emptyPaxosObject // Te
// var rep2 = emptyPaxosObject // Se
// var rep3 = emptyPaxosObject // Ve
// var rep4 = emptyPaxosObject // ue
//
// // id1 leader election
// rep1 = rep1.merge(rep1.startLeaderElection(using id1))
// rep1 = rep1.merge(rep1.upkeep(using id1))
//
// // merge id3 <- id1
// rep3 = rep3.merge(rep1)
// rep3 = rep3.merge(rep3.upkeep(using id3))
//
// // 3. merge id2 <- id3
// rep2 = rep2.merge(rep3)
// rep2 = rep2.merge(rep2.upkeep(using id2))
//
// // 4. id2 leader election
// rep2 = rep2.merge(rep2.startLeaderElection(using id2))
// rep2 = rep2.merge(rep2.upkeep(using id2))
//
// // 5. id1 <- id2
// rep1 = rep1.merge(rep2)
// rep1 = rep1.merge(rep1.upkeep(using id1))
//
// // 6. id4 <- id1
// rep4 = rep4.merge(rep1)
// rep4 = rep4.merge(rep4.upkeep(using id4))
//
// assertEquals(rep2.leader, rep2.paxosLeader)
// assertEquals(rep3.leader, rep3.paxosLeader)
// assertEquals(rep1.leader, rep1.paxosLeader)
// assertEquals(rep4.leader, rep4.paxosLeader)
//
// }

test("counterexample?") {
var rep1 = emptyPaxosObject
var rep2 = emptyPaxosObject
Expand Down

0 comments on commit 9075f47

Please sign in to comment.