Skip to content

Commit

Permalink
Fixup merge
Browse files Browse the repository at this point in the history
  • Loading branch information
coreyoconnor committed Feb 17, 2022
1 parent 2d30d81 commit c659dfb
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 107 deletions.
18 changes: 9 additions & 9 deletions project/Publish.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ object Publish extends AutoPlugin {

val defaultPublishTo = settingKey[File]("Default publish directory")

override def trigger = allRequirements
override def trigger = allRequirements
override def requires = sbtrelease.ReleasePlugin

override lazy val projectSettings = Seq(
Expand All @@ -27,8 +27,7 @@ object Publish extends AutoPlugin {
homepage := Some(url("https://github.com/akka/akka-persistence-dynamodb")),
publishMavenStyle := true,
pomIncludeRepository := { x => false },
defaultPublishTo := crossTarget.value / "repository",
)
defaultPublishTo := crossTarget.value / "repository")

def akkaPomExtra = {
<developers>
Expand All @@ -41,15 +40,16 @@ object Publish extends AutoPlugin {
</developers>
}

private def akkaPublishTo = Def.setting {
sonatypeRepo(version.value) orElse localRepo(defaultPublishTo.value)
}
private def akkaPublishTo =
Def.setting {
sonatypeRepo(version.value).orElse(localRepo(defaultPublishTo.value))
}

private def sonatypeRepo(version: String): Option[Resolver] =
Option(sys.props("publish.maven.central")) filter (_.toLowerCase == "true") map { _ =>
Option(sys.props("publish.maven.central")).filter(_.toLowerCase == "true").map { _ =>
val nexus = "https://oss.sonatype.org/"
if (version endsWith "-SNAPSHOT") "snapshots" at nexus + "content/repositories/snapshots"
else "releases" at nexus + "service/local/staging/deploy/maven2"
if (version.endsWith("-SNAPSHOT")) "snapshots".at(nexus + "content/repositories/snapshots")
else "releases".at(nexus + "service/local/staging/deploy/maven2")
}

private def localRepo(repository: File) =
Expand Down
18 changes: 9 additions & 9 deletions project/Whitesource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ object Whitesource extends AutoPlugin {
whitesourceProduct := "Lightbend Reactive Platform",
whitesourceAggregateProjectName := {
val projectName = (moduleName in LocalRootProject).value.replace("-root", "")
projectName + "-" + (
if (isSnapshot.value)
if (gitCurrentBranch.value == "master") "master"
else "adhoc"
else CrossVersion.partialVersion((version in LocalRootProject).value)
.map { case (major,minor) => s"$major.$minor-stable" }
.getOrElse("adhoc"))
projectName + "-" + (if (isSnapshot.value)
if (gitCurrentBranch.value == "master") "master"
else "adhoc"
else
CrossVersion
.partialVersion((version in LocalRootProject).value)
.map { case (major, minor) => s"$major.$minor-stable" }
.getOrElse("adhoc"))
},
whitesourceForceCheckAllDependencies := true,
whitesourceFailOnError := true
)
whitesourceFailOnError := true)
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class DynamoDBJournalConfig(c: Config) extends DynamoDBConfig {
val MaxItemSize = c.getInt("aws-api-limits.max-item-size")

object Fixes {
val HighDistrust = c getBoolean "fixes.high-distrust"
val HighDistrust = c.getBoolean("fixes.high-distrust")
}

val client = new DynamoDBClientConfig(c)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,62 +272,63 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>
* for which it was written, all other entries do not update the highest value. Therefore we
* must scan the partition of this Sort=0 entry and find the highest occupied number.
*/
getAllPartitionSequenceNrs(persistenceId, start)
.flatMap { result =>
if (result.getItems.isEmpty) {
/*
* If this comes back empty then that means that all events have been deleted. The only
* reliable way to obtain the previously highest number is to also read the lowest number
* (which is always stored in full), knowing that it will be either highest-1 or zero.
*/
readSequenceNr(persistenceId, highest = false).map { lowest =>
val ret = Math.max(start, lowest - 1)
log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
ret
}
} else if (Fixes.HighDistrust) { // allows recovering from failed high mark setting
// this function will keep on chasing the event source tail
// if HighDistrust is enabled and as long as the partitionMax == PartitionSize - 1
def tailChase(partitionStart: Long, nextResults: QueryResult): Future[Long] = {
if (nextResults.getItems.isEmpty) {
// first iteraton will not pass here, as the query result is not empty
// if the new query result is empty the highest observed is partition -1
Future.successful(partitionStart - 1)
} else {
/*
* `partitionStart` is the Sort=0 entry’s sequence number, so add the maximum sort key.
*/
val partitionMax = nextResults.getItems.asScala.map(_.get(Sort).getN.toLong).max
val ret = partitionStart + partitionMax

if (partitionMax == PartitionSize - 1) {
val nextStart = ret + 1
makeEventQuery(persistenceId, nextStart)
.map { logResult =>
if (!logResult.getItems().isEmpty()) // will only log if a follow-up query produced results
log.warning("readSequenceNr(highest=true persistenceId={}) tail found after {}", persistenceId, ret)
logResult
}
.flatMap(tailChase(nextStart, _))
} else
Future.successful(ret)
}
getAllPartitionSequenceNrs(persistenceId, start).flatMap { result =>
if (result.getItems.isEmpty) {
/*
* If this comes back empty then that means that all events have been deleted. The only
* reliable way to obtain the previously highest number is to also read the lowest number
* (which is always stored in full), knowing that it will be either highest-1 or zero.
*/
readSequenceNr(persistenceId, highest = false).map { lowest =>
val ret = Math.max(start, lowest - 1)
log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
ret
}
} else if (Fixes.HighDistrust) { // allows recovering from failed high mark setting
// this function will keep on chasing the event source tail
// if HighDistrust is enabled and as long as the partitionMax == PartitionSize - 1
def tailChase(partitionStart: Long, nextResults: QueryResult): Future[Long] = {
if (nextResults.getItems.isEmpty) {
// first iteraton will not pass here, as the query result is not empty
// if the new query result is empty the highest observed is partition -1
Future.successful(partitionStart - 1)
} else {
/*
* `partitionStart` is the Sort=0 entry’s sequence number, so add the maximum sort key.
*/
val partitionMax = nextResults.getItems.asScala.map(_.get(Sort).getN.toLong).max
val ret = partitionStart + partitionMax

if (partitionMax == PartitionSize - 1) {
val nextStart = ret + 1
getAllPartitionSequenceNrs(persistenceId, nextStart)
.map { logResult =>
if (!logResult.getItems().isEmpty()) // will only log if a follow-up query produced results
log.warning(
"readSequenceNr(highest=true persistenceId={}) tail found after {}",
persistenceId,
ret)
logResult
}
.flatMap(tailChase(nextStart, _))
} else
Future.successful(ret)
}
}

tailChase(start, result)
.map { ret =>
log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
ret
}
} else {
/*
* `start` is the Sort=0 entry’s sequence number, so add the maximum sort key.
*/
val ret = start + result.getItems.asScala.map(_.get(Sort).getN.toLong).max
tailChase(start, result).map { ret =>
log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
Future.successful(ret)
ret
}
} else {
/*
* `start` is the Sort=0 entry’s sequence number, so add the maximum sort key.
*/
val ret = start + result.getItems.asScala.map(_.get(Sort).getN.toLong).max
log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
Future.successful(ret)
}
}
} else {
log.debug("readSequenceNr(highest=false persistenceId={}) = {}", persistenceId, start)
Future.successful(start)
Expand Down Expand Up @@ -498,8 +499,7 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>

private[dynamodb] def getAllPartitionSequenceNrs(persistenceId: String, sequenceNr: Long) = {
val request = eventQuery(persistenceId, sequenceNr)
dynamo.query(request)
.flatMap(getAllRemainingQueryItems(request, _))
dynamo.query(request).flatMap(getAllRemainingQueryItems(request, _))
}

def batchGetReq(items: JMap[String, KeysAndAttributes]) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ class DeletionSpec
* noisy logging, and I like my build output clean and green.
*/
Thread.sleep(500)
system.terminate().futureValue
client.shutdown()
system.terminate().futureValue
super.afterAll()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import org.scalatest.WordSpecLike
class DynamoPartitionGroupedSpec extends TestKit(ActorSystem("DynamoPartitionGroupedSpec")) with WordSpecLike {
implicit val materializer = ActorMaterializer()

assert(PartitionSize == 100, "This test is only valid with PartitionSize == 100")

"A DynamoPartitionGroup should create the correct PartitionKey outputs" when {
"events 1 thru 250 are presented" in {
val sourceUnderTest =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,30 @@ import com.amazonaws.services.dynamodbv2.model._
import java.util.{ HashMap => JHMap }
import akka.persistence.dynamodb._

class PersistAllConsistencySpec extends TestKit(ActorSystem("PersistAllConsistencySpec"))
class PersistAllConsistencySpec
extends TestKit(ActorSystem("PersistAllConsistencySpec"))
with ImplicitSender
with WordSpecLike
with BeforeAndAfterAll
with Matchers
with ScalaFutures
with ConversionCheckedTripleEquals
with DynamoDBUtils {
with DynamoDBUtils
with IntegSpec {

override def beforeAll(): Unit = {
super.beforeAll()
ensureJournalTableExists()
}

override def beforeAll(): Unit = ensureJournalTableExists()
override def afterAll(): Unit = {
client.shutdown()
system.terminate().futureValue
super.afterAll()
}

override val persistenceId = "PersistAllConsistencySpec"
lazy val journal = Persistence(system).journalFor("")
lazy val journal = Persistence(system).journalFor("")

import settings._

Expand All @@ -42,55 +49,56 @@ class PersistAllConsistencySpec extends TestKit(ActorSystem("PersistAllConsisten
expectMsg(Purged(persistenceId))

val start = nextSeqNr
val end = 10
val end = 10
println(s"start: ${start}; end: ${end}")
val padding = AtomicWrite((start to end).map(i => persistentRepr(f"h-$i"))) :: Nil

journal ! WriteMessages(padding, testActor, 1)
expectMsg(WriteMessagesSuccessful)
(start to end) foreach (i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))
(start to end).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))

journal ! ReplayMessages(start, Long.MaxValue, Long.MaxValue, persistenceId, testActor)
(start to end) foreach (i => expectMsg(ReplayedMessage(generatedMessages(i))))
(start to end).foreach(i => expectMsg(ReplayedMessage(generatedMessages(i))))
expectMsg(RecoverySuccess(end))
}

for (t <- Seq(("last", 3), ("middle", 2), ("first", 1))) s"correctly cross page boundaries with AtomicWrite position ${t._1}" in {
val start1 = nextSeqNr
val end1 = ((start1 / 100) + 1) * 100 - t._2
println(s"start: ${start1}; end: ${end1}")
val padding = AtomicWrite((start1 to end1).map(i => persistentRepr(f"h-$i"))) :: Nil
for (t <- Seq(("last", 3), ("middle", 2), ("first", 1)))
s"correctly cross page boundaries with AtomicWrite position ${t._1}" in {
val start1 = nextSeqNr
val end1 = ((start1 / PartitionSize) + 1) * PartitionSize - t._2
println(s"start: ${start1}; end: ${end1}")
val padding = AtomicWrite((start1 to end1).map(i => persistentRepr(f"h-$i"))) :: Nil

journal ! WriteMessages(padding, testActor, 1)
expectMsg(WriteMessagesSuccessful)
(start1 to end1) foreach (i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))
journal ! WriteMessages(padding, testActor, 1)
expectMsg(WriteMessagesSuccessful)
(start1 to end1).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))

val start2 = nextSeqNr
val end2 = start2 + 2
println(s"start: ${start2}; end: ${end2}")
val subject = AtomicWrite((start2 to end2).map(i => persistentRepr(f"h-$i"))) :: Nil
val start2 = nextSeqNr
val end2 = start2 + 2
println(s"start: ${start2}; end: ${end2}")
val subject = AtomicWrite((start2 to end2).map(i => persistentRepr(f"h-$i"))) :: Nil

journal ! WriteMessages(subject, testActor, 1)
expectMsg(WriteMessagesSuccessful)
(start2 to end2) foreach (i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))
journal ! WriteMessages(subject, testActor, 1)
expectMsg(WriteMessagesSuccessful)
(start2 to end2).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))

journal ! ReplayMessages(start1, Long.MaxValue, Long.MaxValue, persistenceId, testActor)
(start1 to end2) foreach (i => expectMsg(ReplayedMessage(generatedMessages(i))))
expectMsg(RecoverySuccess(end2))
}
journal ! ReplayMessages(start1, Long.MaxValue, Long.MaxValue, persistenceId, testActor)
(start1 to end2).foreach(i => expectMsg(ReplayedMessage(generatedMessages(i))))
expectMsg(RecoverySuccess(end2))
}

"recover correctly when the last partition event ends on 99" in {
s"recover correctly when the last partition event ends on ${PartitionSize - 1}" in {
val start = nextSeqNr
val end = ((start / 100) + 1) * 100 - 1
val end = ((start / PartitionSize) + 1) * PartitionSize - 1
println(s"start: ${start}; end: ${end}")
val padding = AtomicWrite((start to end).map(i => persistentRepr(f"h-$i"))) :: Nil

journal ! WriteMessages(padding, testActor, 1)
expectMsg(WriteMessagesSuccessful)
(start to end) foreach (i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))
(start to end).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))

journal ! ReplayMessages(start, Long.MaxValue, Long.MaxValue, persistenceId, testActor)
(start to end) foreach (i => expectMsg(ReplayedMessage(generatedMessages(i))))
(start to end).foreach(i => expectMsg(ReplayedMessage(generatedMessages(i))))
expectMsg(RecoverySuccess(end))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class RecoveryConsistencySpec

"read correct highest sequence number even if a Sort=0 entry is lost" in {
val start = messages + 19
val end = (start / 100 + 1) * 100
val end = (start / PartitionSize + 1) * PartitionSize
val more = (start to end).map(i => AtomicWrite(persistentRepr(f"e-$i")))
journal ! WriteMessages(more, testActor, 1)
expectMsg(WriteMessagesSuccessful)
Expand All @@ -98,7 +98,8 @@ class RecoveryConsistencySpec

journal ! ListAll(persistenceId, testActor)
val ids = ((1L to (end - 1)).toSet -- Set[Long](2, 4, 12, 15).map(_ + messages)).toSeq.sorted
expectMsg(ListAllResult(persistenceId, Set.empty, (1L to (end / 100)).map(_ * 100).toSet, ids))
expectMsg(
ListAllResult(persistenceId, Set.empty, (1L to (end / PartitionSize)).map(_ * PartitionSize).toSet, ids))

journal ! ReplayMessages(0, Long.MaxValue, 0, persistenceId, testActor)
expectMsg(RecoverySuccess(end))
Expand Down Expand Up @@ -157,8 +158,8 @@ class RecoveryConsistencySpec

private def delete(num: Long) = {
val key: Item = new JHMap
key.put(Key, S(s"$JournalName-P-$persistenceId-${num / 100}"))
key.put(Sort, N(num % 100))
key.put(Key, S(s"$JournalName-P-$persistenceId-${num / PartitionSize}"))
key.put(Sort, N(num % PartitionSize))
client.deleteItem(new DeleteItemRequest().withTableName(JournalTable).withKey(key)).futureValue
}
}

0 comments on commit c659dfb

Please sign in to comment.