Skip to content

Commit

Permalink
chore: Log more details if GrpcReadJournal keeps failing to connect
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Oct 21, 2024
1 parent e956248 commit 98b5f89
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import akka.projection.grpc.consumer.scaladsl
import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal.withChannelBuilderOverrides
import akka.projection.grpc.internal.ProjectionGrpcSerialization
import akka.projection.grpc.internal.ConnectionException
import akka.projection.grpc.internal.DetailedConnectionException
import akka.projection.grpc.internal.ProtoAnySerialization
import akka.projection.grpc.internal.ProtobufProtocolConversions
import akka.projection.grpc.internal.proto.Event
Expand Down Expand Up @@ -64,6 +65,8 @@ import akka.projection.grpc.internal.proto.ReplayPersistenceId
import akka.projection.grpc.internal.proto.ReplicaInfo
import akka.projection.grpc.replication.scaladsl.ReplicationSettings

import java.util.concurrent.atomic.AtomicInteger

object GrpcReadJournal {
val Identifier = "akka.projection.grpc.consumer"

Expand Down Expand Up @@ -291,6 +294,7 @@ final class GrpcReadJournal private (
case _ => offset
})

val consecutiveFailCounter = new AtomicInteger(0)
def sliceHandledByThisStream(pid: String): Boolean = {
val slice = persistenceExt.sliceForPersistenceId(pid)
minSlice <= slice && slice <= maxSlice
Expand Down Expand Up @@ -374,16 +378,30 @@ final class GrpcReadJournal private (
val streamOut: Source[StreamOut, NotUsed] =
addRequestHeaders(client.eventsBySlices())
.invoke(streamIn)
.statefulMap(() => false)({ (seenElement, out) =>
// reset counter on first element
if (!seenElement) consecutiveFailCounter.set(0)
(true, out)
}, _ => None)
.recover {
case ex: akka.grpc.GrpcServiceException if ex.status.getCode == Status.Code.UNAVAILABLE =>
// this means we couldn't connect, will be retried, relatively common, so make it less noisy
throw new ConnectionException(
clientSettings.serviceName,
clientSettings.servicePortName.getOrElse(clientSettings.defaultPort.toString),
streamId)
if (consecutiveFailCounter.incrementAndGet() < 5) {
// this means we couldn't connect, will be retried (and should succeed), relatively common, so make it less noisy
throw new ConnectionException(
clientSettings.serviceName,
clientSettings.servicePortName.getOrElse(clientSettings.defaultPort.toString),
streamId)
} else {
// we couldn't connect, it failed several times, include cause
throw new DetailedConnectionException(
clientSettings.serviceName,
clientSettings.servicePortName.getOrElse(clientSettings.defaultPort.toString),
streamId,
ex)
}

case th: Throwable =>
throw new RuntimeException(s"Failure to consume gRPC event stream for [${streamId}]", th)
throw new RuntimeException(s"Failure to consume gRPC event stream for [$streamId]", th)
}

streamOut.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,10 @@ import scala.util.control.NoStackTrace
private[akka] final class ConnectionException(host: String, port: String, streamId: String)
extends RuntimeException(s"Connection to $host:$port for stream id $streamId failed or lost")
with NoStackTrace

/**
* INTERNAL API
*/
@InternalApi
private[akka] final class DetailedConnectionException(host: String, port: String, streamId: String, cause: Throwable)
extends RuntimeException(s"Connection to $host:$port for stream id $streamId failed or lost", cause)

0 comments on commit 98b5f89

Please sign in to comment.