
Configurable parallelism in initial offset store query
leviramsey committed Oct 27, 2024
1 parent 014f5bb commit 62d10cc
Showing 3 changed files with 30 additions and 3 deletions.
6 changes: 6 additions & 0 deletions akka-projection-dynamodb/src/main/resources/reference.conf
@@ -22,6 +22,12 @@ akka.projection.dynamodb {

# Trying to batch insert offsets in batches of this size.
offset-batch-size = 20
+
+ # Number of slices to read offsets for simultaneously. The underlying DynamoDB
+ # client must be able to handle at least this number of concurrent requests
+ # (`http.max-concurrency` plus `http.max-pending-connection-acquires`).
+ # Defaults to 1024 (all slices simultaneously), but may be reduced.
+ offset-slice-read-parallelism = 1024
}

# By default it shares DynamoDB client with akka-persistence-dynamodb (write side).
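As a usage note: the setting lives under `akka.projection.dynamodb.offset-store` (the `offset-store.` prefix is visible in the settings loader below). A deployment whose DynamoDB client is tuned for fewer concurrent connections might lower it in `application.conf`; a minimal sketch, with 64 as an illustrative value:

    akka.projection.dynamodb {
      offset-store {
        # keep at or below the client's http.max-concurrency
        # plus http.max-pending-connection-acquires
        offset-slice-read-parallelism = 64
      }
    }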
7 changes: 7 additions & 0 deletions DynamoDBProjectionSettings.scala
@@ -44,6 +44,7 @@ object DynamoDBProjectionSettings {
evictInterval = config.getDuration("offset-store.evict-interval"),
warnAboutFilteredEventsInFlow = config.getBoolean("warn-about-filtered-events-in-flow"),
offsetBatchSize = config.getInt("offset-store.offset-batch-size"),
+ offsetSliceReadParallelism = config.getInt("offset-store.offset-slice-read-parallelism"),
timeToLiveSettings = TimeToLiveSettings(config.getConfig("time-to-live")))
}

@@ -63,6 +64,7 @@ final class DynamoDBProjectionSettings private (
val evictInterval: JDuration,
val warnAboutFilteredEventsInFlow: Boolean,
val offsetBatchSize: Int,
+ val offsetSliceReadParallelism: Int,
val timeToLiveSettings: TimeToLiveSettings) {

def withTimestampOffsetTable(timestampOffsetTable: String): DynamoDBProjectionSettings =
@@ -92,6 +94,9 @@ final class DynamoDBProjectionSettings private (
def withOffsetBatchSize(offsetBatchSize: Int): DynamoDBProjectionSettings =
copy(offsetBatchSize = offsetBatchSize)

+ def withOffsetSliceReadParallelism(offsetSliceReadParallelism: Int): DynamoDBProjectionSettings =
+   copy(offsetSliceReadParallelism = offsetSliceReadParallelism)
+
def withTimeToLiveSettings(timeToLiveSettings: TimeToLiveSettings): DynamoDBProjectionSettings =
copy(timeToLiveSettings = timeToLiveSettings)

@@ -103,6 +108,7 @@ final class DynamoDBProjectionSettings private (
evictInterval: JDuration = evictInterval,
warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow,
offsetBatchSize: Int = offsetBatchSize,
+ offsetSliceReadParallelism: Int = offsetSliceReadParallelism,
timeToLiveSettings: TimeToLiveSettings = timeToLiveSettings) =
new DynamoDBProjectionSettings(
timestampOffsetTable,
@@ -112,6 +118,7 @@ final class DynamoDBProjectionSettings private (
evictInterval,
warnAboutFilteredEventsInFlow,
offsetBatchSize,
+ offsetSliceReadParallelism,
timeToLiveSettings)

override def toString =
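The same knob is available programmatically through the new `with` method. A minimal sketch, assuming the companion's `apply(system: ActorSystem[_])` factory that loads the defaults from `akka.projection.dynamodb` (that factory is not shown in this diff), with the result then passed to whichever projection factory is in use:

    import akka.actor.typed.ActorSystem
    import akka.projection.dynamodb.DynamoDBProjectionSettings

    def customSettings(system: ActorSystem[_]): DynamoDBProjectionSettings =
      // start from the config-derived defaults, then cap concurrent slice reads
      DynamoDBProjectionSettings(system).withOffsetSliceReadParallelism(64)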
20 changes: 17 additions & 3 deletions DynamoDBOffsetStore.scala
@@ -28,6 +28,9 @@ import akka.projection.BySlicesSourceProvider
import akka.projection.ProjectionId
import akka.projection.dynamodb.DynamoDBProjectionSettings
import akka.projection.internal.ManagementState
+ import akka.stream.scaladsl.Sink
+ import akka.stream.scaladsl.Source
+ import akka.stream.Materializer.matFromSystem
import org.slf4j.LoggerFactory
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem
@@ -248,9 +251,20 @@ private[projection] class DynamoDBOffsetStore(
private def readTimestampOffset(): Future[TimestampOffsetBySlice] = {
val oldState = state.get()
// retrieve latest timestamp for each slice, and use the earliest
- val futTimestamps =
-   (minSlice to maxSlice).map(slice => dao.loadTimestampOffset(slice).map(optTimestamp => slice -> optTimestamp))
- val offsetBySliceFut = Future.sequence(futTimestamps).map(_.collect { case (slice, Some(ts)) => slice -> ts }.toMap)
+ val offsetBySliceFut =
+   Source(minSlice to maxSlice)
+     .mapAsyncUnordered(settings.offsetSliceReadParallelism) { slice =>
+       dao
+         .loadTimestampOffset(slice)
+         .map { optTimestampOffset =>
+           optTimestampOffset.map { timestampOffset => slice -> timestampOffset }
+         }(ExecutionContext.parasitic)
+     }
+     .mapConcat(identity)
+     .runWith(Sink.fold(Map.empty[Int, TimestampOffset]) { (offsetMap, sliceAndOffset: (Int, TimestampOffset)) =>
+       offsetMap + sliceAndOffset
+     })(matFromSystem(system))
+
offsetBySliceFut.map { offsetBySlice =>
val newState = State(offsetBySlice)
logger.debug(
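The change swaps an eager `Future.sequence` (which fired a read for every slice at once) for a `mapAsyncUnordered` stage whose parallelism argument bounds how many reads are in flight; unordered completion is safe because each result carries its slice number. A self-contained sketch of the same technique with hypothetical stand-ins (`loadOffset` simulates a per-slice query; nothing here is the projection's API):

    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{ Sink, Source }
    import scala.concurrent.duration._
    import scala.concurrent.{ Await, ExecutionContext, Future }

    object BoundedSliceReads extends App {
      implicit val system: ActorSystem = ActorSystem("example")

      // Hypothetical stand-in for a per-slice offset lookup; only even slices
      // have a stored offset in this simulation.
      def loadOffset(slice: Int): Future[Option[Long]] =
        Future.successful(if (slice % 2 == 0) Some(slice.toLong) else None)

      val offsetsBySlice: Future[Map[Int, Long]] =
        Source(0 to 1023)
          // at most 64 lookups in flight at any moment
          .mapAsyncUnordered(parallelism = 64) { slice =>
            loadOffset(slice).map(_.map(slice -> _).toList)(ExecutionContext.parasitic)
          }
          .mapConcat(identity) // drop slices without a stored offset
          .runWith(Sink.fold(Map.empty[Int, Long]) { (acc, kv: (Int, Long)) => acc + kv })

      println(s"loaded ${Await.result(offsetsBySlice, 10.seconds).size} offsets")
      system.terminate()
    }

As in the commit, the folded map does not depend on arrival order, which is what makes `mapAsyncUnordered` preferable to `mapAsync` here: a slow slice read never holds back results from faster ones.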
