diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConsumerFilterRegistry.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConsumerFilterRegistry.scala index 750d125cf..cc804cb95 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConsumerFilterRegistry.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConsumerFilterRegistry.scala @@ -7,11 +7,13 @@ package akka.projection.grpc.internal import scala.concurrent.duration._ import java.net.URLEncoder import java.nio.charset.StandardCharsets +import java.util.UUID import scala.collection.immutable import scala.util.Failure import scala.util.Success +import akka.actor.InvalidActorNameException import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.ActorContext @@ -61,9 +63,17 @@ import akka.util.Timeout stores.get(streamId) match { case Some(store) => store case None => - context.spawn( - ConsumerFilterStore(context.system, settings, streamId, context.self), - URLEncoder.encode(streamId, StandardCharsets.UTF_8.name)) + val encodedStreamId = URLEncoder.encode(streamId, StandardCharsets.UTF_8.name) + try { + context.spawn(ConsumerFilterStore(context.system, settings, streamId, context.self), encodedStreamId) + } catch { + case _: InvalidActorNameException => + // There could be a race condition from SubscriberTerminated where the child is stopped, + // but not removed yet. The actor name isn't important, but could be useful for debugging. + context.spawn( + ConsumerFilterStore(context.system, settings, streamId, context.self), + s"$encodedStreamId-${UUID.randomUUID()}") + } } }