diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala index f222a1773..ca9920fd0 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala @@ -29,6 +29,7 @@ import akka.projection.grpc.consumer.scaladsl import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal.withChannelBuilderOverrides import akka.projection.grpc.internal.ProjectionGrpcSerialization import akka.projection.grpc.internal.ConnectionException +import akka.projection.grpc.internal.DetailedConnectionException import akka.projection.grpc.internal.ProtoAnySerialization import akka.projection.grpc.internal.ProtobufProtocolConversions import akka.projection.grpc.internal.proto.Event @@ -64,6 +65,8 @@ import akka.projection.grpc.internal.proto.ReplayPersistenceId import akka.projection.grpc.internal.proto.ReplicaInfo import akka.projection.grpc.replication.scaladsl.ReplicationSettings +import java.util.concurrent.atomic.AtomicInteger + object GrpcReadJournal { val Identifier = "akka.projection.grpc.consumer" @@ -291,6 +294,7 @@ final class GrpcReadJournal private ( case _ => offset }) + val consecutiveFailCounter = new AtomicInteger(0) def sliceHandledByThisStream(pid: String): Boolean = { val slice = persistenceExt.sliceForPersistenceId(pid) minSlice <= slice && slice <= maxSlice @@ -374,16 +378,30 @@ final class GrpcReadJournal private ( val streamOut: Source[StreamOut, NotUsed] = addRequestHeaders(client.eventsBySlices()) .invoke(streamIn) + .statefulMap(() => false)({ (seenElement, out) => + // reset counter on first element + if (!seenElement) consecutiveFailCounter.set(0) + (true, out) + }, _ => None) .recover { case ex: akka.grpc.GrpcServiceException if ex.status.getCode == Status.Code.UNAVAILABLE => - // this means we couldn't connect, will be retried, relatively common, so make it less noisy - throw new ConnectionException( - clientSettings.serviceName, - clientSettings.servicePortName.getOrElse(clientSettings.defaultPort.toString), - streamId) + if (consecutiveFailCounter.incrementAndGet() < 5) { + // this means we couldn't connect, will be retried (and should succeed), relatively common, so make it less noisy + throw new ConnectionException( + clientSettings.serviceName, + clientSettings.servicePortName.getOrElse(clientSettings.defaultPort.toString), + streamId) + } else { + // we couldn't connect, it failed several times, include cause + throw new DetailedConnectionException( + clientSettings.serviceName, + clientSettings.servicePortName.getOrElse(clientSettings.defaultPort.toString), + streamId, + ex) + } case th: Throwable => - throw new RuntimeException(s"Failure to consume gRPC event stream for [${streamId}]", th) + throw new RuntimeException(s"Failure to consume gRPC event stream for [$streamId]", th) } streamOut.map { diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConnectionException.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConnectionException.scala index f360d7f22..43159dafd 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConnectionException.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConnectionException.scala @@ -15,3 +15,10 @@ import scala.util.control.NoStackTrace private[akka] final class ConnectionException(host: String, port: String, streamId: String) extends RuntimeException(s"Connection to $host:$port for stream id $streamId failed or lost") with NoStackTrace + +/** + * INTERNAL API + */ +@InternalApi +private[akka] final class DetailedConnectionException(host: String, port: String, streamId: String, cause: Throwable) + extends RuntimeException(s"Connection to $host:$port for stream id $streamId failed or lost", cause)