diff --git a/akka-projection-dynamodb/src/main/resources/reference.conf b/akka-projection-dynamodb/src/main/resources/reference.conf index 03b2db60d..68e13edb8 100644 --- a/akka-projection-dynamodb/src/main/resources/reference.conf +++ b/akka-projection-dynamodb/src/main/resources/reference.conf @@ -22,6 +22,12 @@ akka.projection.dynamodb { # Trying to batch insert offsets in batches of this size. offset-batch-size = 20 + + # Number of slices to read offsets simultaneously for. The underlying Dynamo + # client must be able to handle (`http.max-concurrency` plus `http.max-pending-connection-acquires`) + # at least this number of concurrent requests. Defaults to 1024 (all slices simultaneously), + # but may be reduced. + offset-slice-read-parallelism = 1024 } # By default it shares DynamoDB client with akka-persistence-dynamodb (write side). diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/DynamoDBProjectionSettings.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/DynamoDBProjectionSettings.scala index b9090a73d..2c07be1d6 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/DynamoDBProjectionSettings.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/DynamoDBProjectionSettings.scala @@ -44,6 +44,7 @@ object DynamoDBProjectionSettings { evictInterval = config.getDuration("offset-store.evict-interval"), warnAboutFilteredEventsInFlow = config.getBoolean("warn-about-filtered-events-in-flow"), offsetBatchSize = config.getInt("offset-store.offset-batch-size"), + offsetSliceReadParallelism = config.getInt("offset-store.offset-slice-read-parallelism"), timeToLiveSettings = TimeToLiveSettings(config.getConfig("time-to-live"))) } @@ -63,6 +64,7 @@ final class DynamoDBProjectionSettings private ( val evictInterval: JDuration, val warnAboutFilteredEventsInFlow: Boolean, val offsetBatchSize: Int, + val offsetSliceReadParallelism: Int, val timeToLiveSettings: TimeToLiveSettings) { def withTimestampOffsetTable(timestampOffsetTable: String): DynamoDBProjectionSettings = @@ -92,6 +94,9 @@ final class DynamoDBProjectionSettings private ( def withOffsetBatchSize(offsetBatchSize: Int): DynamoDBProjectionSettings = copy(offsetBatchSize = offsetBatchSize) + def withOffsetSliceReadParallelism(offsetSliceReadParallelism: Int): DynamoDBProjectionSettings = + copy(offsetSliceReadParallelism = offsetSliceReadParallelism) + def withTimeToLiveSettings(timeToLiveSettings: TimeToLiveSettings): DynamoDBProjectionSettings = copy(timeToLiveSettings = timeToLiveSettings) @@ -103,6 +108,7 @@ final class DynamoDBProjectionSettings private ( evictInterval: JDuration = evictInterval, warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow, offsetBatchSize: Int = offsetBatchSize, + offsetSliceReadParallelism: Int = offsetSliceReadParallelism, timeToLiveSettings: TimeToLiveSettings = timeToLiveSettings) = new DynamoDBProjectionSettings( timestampOffsetTable, @@ -112,6 +118,7 @@ final class DynamoDBProjectionSettings private ( evictInterval, warnAboutFilteredEventsInFlow, offsetBatchSize, + offsetSliceReadParallelism, timeToLiveSettings) override def toString = diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala index 3123d54fb..fc91051bd 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala @@ -28,6 +28,9 @@ import akka.projection.BySlicesSourceProvider import akka.projection.ProjectionId import akka.projection.dynamodb.DynamoDBProjectionSettings import akka.projection.internal.ManagementState +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.stream.Materializer.matFromSystem import org.slf4j.LoggerFactory import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem @@ -248,9 +251,20 @@ private[projection] class DynamoDBOffsetStore( private def readTimestampOffset(): Future[TimestampOffsetBySlice] = { val oldState = state.get() // retrieve latest timestamp for each slice, and use the earliest - val futTimestamps = - (minSlice to maxSlice).map(slice => dao.loadTimestampOffset(slice).map(optTimestamp => slice -> optTimestamp)) - val offsetBySliceFut = Future.sequence(futTimestamps).map(_.collect { case (slice, Some(ts)) => slice -> ts }.toMap) + val offsetBySliceFut = + Source(minSlice to maxSlice) + .mapAsyncUnordered(settings.offsetSliceReadParallelism) { slice => + dao + .loadTimestampOffset(slice) + .map { optTimestampOffset => + optTimestampOffset.map { timestampOffset => slice -> timestampOffset } + }(ExecutionContext.parasitic) + } + .mapConcat(identity) + .runWith(Sink.fold(Map.empty[Int, TimestampOffset]) { (offsetMap, sliceAndOffset: (Int, TimestampOffset)) => + offsetMap + sliceAndOffset + })(matFromSystem(system)) + offsetBySliceFut.map { offsetBySlice => val newState = State(offsetBySlice) logger.debug(