Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: CassandraProjectionSpec race conditions #756

Merged
merged 2 commits into from
Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package akka.projection.cassandra
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference

import scala.annotation.tailrec
import scala.collection.immutable
import scala.compat.java8.FutureConverters._
Expand All @@ -16,7 +15,6 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration._

import akka.Done
import akka.NotUsed
import akka.actor.testkit.typed.TestException
Expand Down Expand Up @@ -52,6 +50,8 @@ import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.scaladsl.TestSource
import org.scalatest.wordspec.AnyWordSpecLike

import scala.concurrent.Promise

object CassandraProjectionSpec {
case class Envelope(id: String, offset: Long, message: String)

Expand Down Expand Up @@ -191,20 +191,40 @@ class CassandraProjectionSpec

private val concatHandlerFail4Msg = "fail on fourth envelope"

class ConcatHandlerFail4 extends Handler[Envelope] {
private val _attempts = new AtomicInteger()
def attempts: Int = _attempts.get
private case class ImmediateFailOn4() extends Handler[Envelope] {

@volatile var attempts = 0

override def process(envelope: Envelope): Future[Done] = {
if (envelope.offset == 4L) {
_attempts.incrementAndGet()
attempts += 1
throw TestException(concatHandlerFail4Msg + s" after $attempts attempts")
} else {
repository.concatToText(envelope.id, envelope.message)
}
repository.concatToText(envelope.id, envelope.message)
}
}

private def concatHandlerFail4(): ConcatHandlerFail4 = new ConcatHandlerFail4
private case class ControlledHandlerFailOn4(saw4Probe: ActorRef[Promise[Done]]) extends Handler[Envelope] {

private var attempts = 0

override def process(envelope: Envelope): Future[Done] = {
if (envelope.offset == 4L) {
attempts += 1
val continue = Promise[Done]()
// We can't just throw immediately here because we don't know if the downstream offset commit completed
// yet, and most tests depend on that, instead, give the test a chance to observe we reached offset 4
// do some assertions and then fail in a more deterministic fashion
saw4Probe ! continue
continue.future.map { _ =>
throw TestException(concatHandlerFail4Msg + s" after $attempts attempts")
}
} else {
repository.concatToText(envelope.id, envelope.message)
}
}
}

def offsetShouldBe(projectionId: ProjectionId, expectedOffset: Long) = {
eventually {
Expand Down Expand Up @@ -242,9 +262,13 @@ class CassandraProjectionSpec
val entityId = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()

val failProbe = createTestProbe[Promise[Done]]()
val failingProjection =
CassandraProjection
.atLeastOnce[Long, Envelope](projectionId, sourceProvider(entityId), () => concatHandlerFail4())
.atLeastOnce[Long, Envelope](
projectionId,
sourceProvider(entityId),
() => ControlledHandlerFailOn4(failProbe.ref))
.withSaveOffset(1, Duration.Zero)

withClue("check - offset is empty") {
Expand All @@ -255,11 +279,13 @@ class CassandraProjectionSpec
withClue("check: projection failed with stream failure") {
projectionTestKit.runWithTestSink(failingProjection) { sinkProbe =>
sinkProbe.request(1000)
val fail = failProbe.receiveMessage() // handler saw 4
offsetShouldBe(projectionId, 3L) // saving each offset, we should see offset 3 written
fail.success(Done) // then we fail
eventuallyExpectError(sinkProbe).getMessage should startWith(concatHandlerFail4Msg)
}
}

offsetShouldBe(projectionId, 3L)
withClue("check: projection is consumed up to third") {
val concatStr = repository.findById(entityId).futureValue
concatStr should matchPattern {
Expand All @@ -286,19 +312,27 @@ class CassandraProjectionSpec
val entityId = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()

val failProbe = createTestProbe[Promise[Done]]()
val failingProjection =
CassandraProjection
.atLeastOnce[Long, Envelope](projectionId, sourceProvider(entityId), () => concatHandlerFail4())
.atLeastOnce[Long, Envelope](
projectionId,
sourceProvider(entityId),
() => ControlledHandlerFailOn4(failProbe.ref))
.withSaveOffset(2, 1.minute)

withClue("check - offset is empty") {
val offsetOpt = offsetStore.readOffset[Long](projectionId).futureValue
offsetOpt shouldBe empty
}

withClue("check: projection failed with stream failure") {
projectionTestKit.runWithTestSink(failingProjection) { sinkProbe =>
sinkProbe.request(1000)

val fail = failProbe.receiveMessage() // handler saw 4
offsetShouldBe(projectionId, 2L) // observe 2 committed
fail.success(Done) // then fail

eventuallyExpectError(sinkProbe).getMessage should startWith(concatHandlerFail4Msg)
}
}
Expand All @@ -312,8 +346,6 @@ class CassandraProjectionSpec
}
}

offsetShouldBe(projectionId, 2L)

// re-run projection without failing function
val projection =
CassandraProjection
Expand Down Expand Up @@ -411,7 +443,7 @@ class CassandraProjectionSpec

val projection =
CassandraProjection
.atLeastOnce[Long, Envelope](projectionId, sourceProvider(entityId), () => concatHandlerFail4())
.atLeastOnce[Long, Envelope](projectionId, sourceProvider(entityId), () => ImmediateFailOn4())
.withSaveOffset(2, 1.minute)
.withRecoveryStrategy(HandlerRecoveryStrategy.skip)

Expand All @@ -430,7 +462,7 @@ class CassandraProjectionSpec
val entityId = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()

val handler = concatHandlerFail4()
val handler = ImmediateFailOn4() // share the instance

val statusProbe = createTestProbe[TestStatusObserver.Status]()
val statusObserver = new TestStatusObserver[Envelope](statusProbe.ref)
Expand Down Expand Up @@ -472,7 +504,7 @@ class CassandraProjectionSpec
val entityId = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()

val handler = concatHandlerFail4()
val handler = ImmediateFailOn4()

val statusProbe = createTestProbe[TestStatusObserver.Status]()
val statusObserver = new TestStatusObserver[Envelope](statusProbe.ref, lifecycle = true)
Expand Down Expand Up @@ -715,9 +747,13 @@ class CassandraProjectionSpec
val entityId = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()

val failProbe = createTestProbe[Promise[Done]]()
val failingProjection =
CassandraProjection
.atMostOnce[Long, Envelope](projectionId, sourceProvider(entityId), () => concatHandlerFail4())
.atMostOnce[Long, Envelope](
projectionId,
sourceProvider(entityId),
() => ControlledHandlerFailOn4(failProbe.ref))

withClue("check - offset is empty") {
val offsetOpt = offsetStore.readOffset[Long](projectionId).futureValue
Expand All @@ -727,6 +763,7 @@ class CassandraProjectionSpec
withClue("check: projection failed with stream failure") {
projectionTestKit.runWithTestSink(failingProjection) { sinkProbe =>
sinkProbe.request(1000)
failProbe.receiveMessage().success(Done)
johanandren marked this conversation as resolved.
Show resolved Hide resolved
eventuallyExpectError(sinkProbe).getMessage should startWith(concatHandlerFail4Msg)
}
}
Expand Down
6 changes: 5 additions & 1 deletion project/Common.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ object Common extends AutoPlugin {
},
description := "Akka Projection.",
excludeLintKeys += scmInfo,
excludeLintKeys += mimaPreviousArtifacts)
excludeLintKeys += mimaPreviousArtifacts,
excludeLintKeys += testOptions,
excludeLintKeys += logBuffered)

override lazy val projectSettings = Seq(
projectInfoVersion := (if (isSnapshot.value) "snapshot" else version.value),
Expand All @@ -65,11 +67,13 @@ object Common extends AutoPlugin {
apiURL := Some(url(s"https://doc.akka.io/api/akka-projection/${projectInfoVersion.value}")),
// show full stack traces and test case durations
Test / testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-oDF"),
IntegrationTest / testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-oDF"),
// -a Show stack traces and exception class name for AssertionErrors.
// -v Log "test run started" / "test started" / "test run finished" events on log level "info" instead of "debug".
// -q Suppress stdout for successful tests.
Test / testOptions += Tests.Argument(TestFrameworks.JUnit, "-a", "-v", "-q"),
Test / logBuffered := false,
IntegrationTest / logBuffered := false,
mimaPreviousArtifacts :=
Set(
organization.value %% moduleName.value % previousStableVersion.value
Expand Down