Skip to content

Commit

Permalink
RUM-1983 Introduce the BatchProcessingLevel API
Browse files Browse the repository at this point in the history
 RUM-1983 Introduce the BatchProcessingLevel API
RUM-1983 Introduce the BatchProcessingLevel API
  • Loading branch information
mariusc83 committed Nov 9, 2023
1 parent fd22c1f commit 7dbfa0a
Show file tree
Hide file tree
Showing 17 changed files with 473 additions and 110 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ tasks.register("buildNdkIntegrationTestsArtifacts") {
dependsOn(":instrumented:integration:assembleDebug")
}

nightlyTestsCoverageConfig(threshold = 0.86f)
nightlyTestsCoverageConfig(threshold = 0.85f)

tasks.register("printSdkDebugRuntimeClasspath") {
val fileTreeClassPathCollector = UnionFileTree(
Expand Down
8 changes: 7 additions & 1 deletion dd-sdk-android-core/api/apiSurface
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ interface com.datadog.android.api.storage.EventBatchWriter
fun currentMetadata(): ByteArray?
fun write(RawBatchEvent, ByteArray?): Boolean
data class com.datadog.android.api.storage.FeatureStorageConfiguration
constructor(Long, Int, Long, Long, com.datadog.android.core.configuration.UploadFrequency?, com.datadog.android.core.configuration.BatchSize?)
constructor(Long, Int, Long, Long, com.datadog.android.core.configuration.UploadFrequency?, com.datadog.android.core.configuration.BatchSize?, com.datadog.android.core.configuration.BatchProcessingLevel?)
companion object
val DEFAULT: FeatureStorageConfiguration
data class com.datadog.android.api.storage.RawBatchEvent
Expand All @@ -153,6 +153,11 @@ class com.datadog.android.core.SdkReference
constructor(String? = null, (com.datadog.android.api.SdkCore) -> Unit = {})
fun get(): com.datadog.android.api.SdkCore?
fun <T> allowThreadDiskReads(() -> T): T
enum com.datadog.android.core.configuration.BatchProcessingLevel
constructor(Int)
- LOW
- MEDIUM
- HIGH
enum com.datadog.android.core.configuration.BatchSize
constructor(Long)
- SMALL
Expand All @@ -168,6 +173,7 @@ data class com.datadog.android.core.configuration.Configuration
fun useSite(com.datadog.android.DatadogSite): Builder
fun setBatchSize(BatchSize): Builder
fun setUploadFrequency(UploadFrequency): Builder
fun setBatchProcessingLevel(BatchProcessingLevel): Builder
fun setAdditionalConfiguration(Map<String, Any>): Builder
fun setProxy(java.net.Proxy, okhttp3.Authenticator?): Builder
fun setEncryption(com.datadog.android.security.Encryption): Builder
Expand Down
18 changes: 15 additions & 3 deletions dd-sdk-android-core/api/dd-sdk-android-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -395,16 +395,18 @@ public abstract interface class com/datadog/android/api/storage/EventBatchWriter

public final class com/datadog/android/api/storage/FeatureStorageConfiguration {
public static final field Companion Lcom/datadog/android/api/storage/FeatureStorageConfiguration$Companion;
public fun <init> (JIJJLcom/datadog/android/core/configuration/UploadFrequency;Lcom/datadog/android/core/configuration/BatchSize;)V
public fun <init> (JIJJLcom/datadog/android/core/configuration/UploadFrequency;Lcom/datadog/android/core/configuration/BatchSize;Lcom/datadog/android/core/configuration/BatchProcessingLevel;)V
public final fun component1 ()J
public final fun component2 ()I
public final fun component3 ()J
public final fun component4 ()J
public final fun component5 ()Lcom/datadog/android/core/configuration/UploadFrequency;
public final fun component6 ()Lcom/datadog/android/core/configuration/BatchSize;
public final fun copy (JIJJLcom/datadog/android/core/configuration/UploadFrequency;Lcom/datadog/android/core/configuration/BatchSize;)Lcom/datadog/android/api/storage/FeatureStorageConfiguration;
public static synthetic fun copy$default (Lcom/datadog/android/api/storage/FeatureStorageConfiguration;JIJJLcom/datadog/android/core/configuration/UploadFrequency;Lcom/datadog/android/core/configuration/BatchSize;ILjava/lang/Object;)Lcom/datadog/android/api/storage/FeatureStorageConfiguration;
public final fun component7 ()Lcom/datadog/android/core/configuration/BatchProcessingLevel;
public final fun copy (JIJJLcom/datadog/android/core/configuration/UploadFrequency;Lcom/datadog/android/core/configuration/BatchSize;Lcom/datadog/android/core/configuration/BatchProcessingLevel;)Lcom/datadog/android/api/storage/FeatureStorageConfiguration;
public static synthetic fun copy$default (Lcom/datadog/android/api/storage/FeatureStorageConfiguration;JIJJLcom/datadog/android/core/configuration/UploadFrequency;Lcom/datadog/android/core/configuration/BatchSize;Lcom/datadog/android/core/configuration/BatchProcessingLevel;ILjava/lang/Object;)Lcom/datadog/android/api/storage/FeatureStorageConfiguration;
public fun equals (Ljava/lang/Object;)Z
public final fun getBatchProcessingLevel ()Lcom/datadog/android/core/configuration/BatchProcessingLevel;
public final fun getBatchSize ()Lcom/datadog/android/core/configuration/BatchSize;
public final fun getMaxBatchSize ()J
public final fun getMaxItemSize ()J
Expand Down Expand Up @@ -457,6 +459,15 @@ public final class com/datadog/android/core/StrictModeExtKt {
public static final fun allowThreadDiskReads (Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
}

public final class com/datadog/android/core/configuration/BatchProcessingLevel : java/lang/Enum {
public static final field HIGH Lcom/datadog/android/core/configuration/BatchProcessingLevel;
public static final field LOW Lcom/datadog/android/core/configuration/BatchProcessingLevel;
public static final field MEDIUM Lcom/datadog/android/core/configuration/BatchProcessingLevel;
public final fun getMaxBatchesPerUploadJob ()I
public static fun valueOf (Ljava/lang/String;)Lcom/datadog/android/core/configuration/BatchProcessingLevel;
public static fun values ()[Lcom/datadog/android/core/configuration/BatchProcessingLevel;
}

public final class com/datadog/android/core/configuration/BatchSize : java/lang/Enum {
public static final field LARGE Lcom/datadog/android/core/configuration/BatchSize;
public static final field MEDIUM Lcom/datadog/android/core/configuration/BatchSize;
Expand All @@ -481,6 +492,7 @@ public final class com/datadog/android/core/configuration/Configuration$Builder
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun build ()Lcom/datadog/android/core/configuration/Configuration;
public final fun setAdditionalConfiguration (Ljava/util/Map;)Lcom/datadog/android/core/configuration/Configuration$Builder;
public final fun setBatchProcessingLevel (Lcom/datadog/android/core/configuration/BatchProcessingLevel;)Lcom/datadog/android/core/configuration/Configuration$Builder;
public final fun setBatchSize (Lcom/datadog/android/core/configuration/BatchSize;)Lcom/datadog/android/core/configuration/Configuration$Builder;
public final fun setCrashReportsEnabled (Z)Lcom/datadog/android/core/configuration/Configuration$Builder;
public final fun setEncryption (Lcom/datadog/android/security/Encryption;)Lcom/datadog/android/core/configuration/Configuration$Builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package com.datadog.android.api.storage

import com.datadog.android.core.configuration.BatchProcessingLevel
import com.datadog.android.core.configuration.BatchSize
import com.datadog.android.core.configuration.UploadFrequency

Expand All @@ -18,16 +19,19 @@ import com.datadog.android.core.configuration.UploadFrequency
* old to be uploaded (usually because it'll be discarded at ingestion by the backend)
* @property uploadFrequency the desired upload frequency policy. If not explicitly provided this
* value will be taken from core configuration.
* @property batchSize the desired batch size policy.If not explicitly provided this
* @property batchSize the desired batch size policy. If not explicitly provided this
* value will be taken from core configuration.
* @property batchProcessingLevel the desired batch processing level policy.
* If not explicitly provided this value will be taken from core configuration.
*/
data class FeatureStorageConfiguration(
val maxItemSize: Long,
val maxItemsPerBatch: Int,
val maxBatchSize: Long,
val oldBatchThreshold: Long,
val uploadFrequency: UploadFrequency?,
val batchSize: BatchSize?
val batchSize: BatchSize?,
val batchProcessingLevel: BatchProcessingLevel?
) {
companion object {

Expand All @@ -47,7 +51,8 @@ data class FeatureStorageConfiguration(
// 18 hours
oldBatchThreshold = 18L * 60L * 60L * 1000L,
uploadFrequency = null,
batchSize = null
batchSize = null,
batchProcessingLevel = null
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
* This product includes software developed at Datadog (https://www.datadoghq.com/).
* Copyright 2016-Present Datadog, Inc.
*/

package com.datadog.android.core.configuration

/**
* Defines the policy for sending the batches.
* High level will mean that more data will be sent in a single upload cycle but more CPU and memory
* will be used to process the data.
* Low level will mean that less data will be sent in a single upload cycle but less CPU and memory
* will be used to process the data.
* @param maxBatchesPerUploadJob the maximum number of batches that will be sent in a single upload
* cycle.
*/
@Suppress("MagicNumber")
enum class BatchProcessingLevel(val maxBatchesPerUploadJob: Int) {
/**
* Only 1 batch will be sent in a single upload cycle.
*/
LOW(maxBatchesPerUploadJob = 1),

/**
* 10 batches will be sent in a single upload cycle.
*/
MEDIUM(maxBatchesPerUploadJob = 10),

/**
* 100 batches will be sent in a single upload cycle.
*/
HIGH(maxBatchesPerUploadJob = 100)
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ internal constructor(
val proxy: Proxy?,
val proxyAuth: Authenticator,
val encryption: Encryption?,
val site: DatadogSite
val site: DatadogSite,
val batchProcessingLevel: BatchProcessingLevel
)

// region Builder
Expand Down Expand Up @@ -166,6 +167,18 @@ internal constructor(
return this
}

/**
* Defines the Batch processing level, defining the maximum number of batches processed
* sequentially without a delay within one reading/uploading cycle.
* @param batchProcessingLevel the desired batch processing level. By default it's set to
* [BatchProcessingLevel.MEDIUM].
* @see BatchProcessingLevel
*/
fun setBatchProcessingLevel(batchProcessingLevel: BatchProcessingLevel): Builder {
coreConfig = coreConfig.copy(batchProcessingLevel = batchProcessingLevel)
return this
}

/**
* Allows to provide additional configuration values which can be used by the SDK.
* @param additionalConfig Additional configuration values.
Expand Down Expand Up @@ -238,7 +251,8 @@ internal constructor(
proxy = null,
proxyAuth = Authenticator.NONE,
encryption = null,
site = DatadogSite.US1
site = DatadogSite.US1,
batchProcessingLevel = BatchProcessingLevel.MEDIUM
)

internal const val NETWORK_REQUESTS_TRACKING_FEATURE_NAME = "Network requests"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.datadog.android.BuildConfig
import com.datadog.android.DatadogSite
import com.datadog.android.api.InternalLogger
import com.datadog.android.core.allowThreadDiskReads
import com.datadog.android.core.configuration.BatchProcessingLevel
import com.datadog.android.core.configuration.BatchSize
import com.datadog.android.core.configuration.Configuration
import com.datadog.android.core.configuration.UploadFrequency
Expand Down Expand Up @@ -129,6 +130,7 @@ internal class CoreFeature(
internal var variant: String = ""
internal var batchSize: BatchSize = BatchSize.MEDIUM
internal var uploadFrequency: UploadFrequency = UploadFrequency.AVERAGE
internal var batchProcessingLevel: BatchProcessingLevel = BatchProcessingLevel.MEDIUM
internal var ndkCrashHandler: NdkCrashHandler = NoOpNdkCrashHandler()
internal var site: DatadogSite = DatadogSite.US1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import com.datadog.android.api.feature.StorageBackedFeature
import com.datadog.android.api.net.RequestFactory
import com.datadog.android.api.storage.EventBatchWriter
import com.datadog.android.api.storage.FeatureStorageConfiguration
import com.datadog.android.core.configuration.BatchProcessingLevel
import com.datadog.android.core.configuration.UploadFrequency
import com.datadog.android.core.internal.configuration.DataUploadConfiguration
import com.datadog.android.core.internal.data.upload.DataOkHttpUploader
Expand Down Expand Up @@ -71,7 +72,11 @@ internal class SdkFeature(
var dataUploadConfiguration: DataUploadConfiguration? = null
if (wrappedFeature is StorageBackedFeature) {
val uploadFrequency = resolveUploadFrequency()
dataUploadConfiguration = DataUploadConfiguration(uploadFrequency)
val batchProcessingLevel = resolveBatchProcessingLevel()
dataUploadConfiguration = DataUploadConfiguration(
uploadFrequency,
batchProcessingLevel.maxBatchesPerUploadJob
)
val storageConfiguration = wrappedFeature.storageConfiguration
val recentDelayMs = resolveBatchingDelay(coreFeature, storageConfiguration)
val filePersistenceConfig = coreFeature.buildFilePersistenceConfig().copy(
Expand Down Expand Up @@ -204,6 +209,14 @@ internal class SdkFeature(
coreFeature.uploadFrequency
}
}
private fun resolveBatchProcessingLevel(): BatchProcessingLevel {
return if (wrappedFeature is StorageBackedFeature) {
wrappedFeature.storageConfiguration.batchProcessingLevel
?: coreFeature.batchProcessingLevel
} else {
coreFeature.batchProcessingLevel
}
}

private fun setupUploader(
requestFactory: RequestFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ package com.datadog.android.core.internal.configuration

import com.datadog.android.core.configuration.UploadFrequency

internal data class DataUploadConfiguration(internal val frequency: UploadFrequency) {
internal data class DataUploadConfiguration(
internal val frequency: UploadFrequency,
internal val maxBatchesPerUploadJob: Int
) {
internal val minDelayMs = MIN_DELAY_FACTOR * frequency.baseStepMs
internal val maxDelayMs = MAX_DELAY_FACTOR * frequency.baseStepMs
internal val defaultDelayMs = DEFAULT_DELAY_FACTOR * frequency.baseStepMs

companion object {
internal const val MIN_DELAY_FACTOR = 1
internal const val MAX_DELAY_FACTOR = 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,38 +44,31 @@ internal class DataUploadRunnable(
internal var currentDelayIntervalMs = uploadConfiguration.defaultDelayMs
internal val minDelayMs = uploadConfiguration.minDelayMs
internal val maxDelayMs = uploadConfiguration.maxDelayMs
internal val maxBatchesPerJob = uploadConfiguration.maxBatchesPerUploadJob

// region Runnable

@WorkerThread
@Suppress("UnsafeThirdPartyFunctionCall") // called inside a dedicated executor
override fun run() {
if (isNetworkAvailable() && isSystemReady()) {
val context = contextProvider.context
// TODO RUMM-0000 it should be already on the worker thread and if readNextBatch is async,
// we should wait until it completes before scheduling further
val lock = CountDownLatch(1)
storage.readNextBatch(
noBatchCallback = {
increaseInterval()
lock.countDown()
}
) { batchId, reader ->
try {
val batch = reader.read()
val batchMeta = reader.currentMetadata()

consumeBatch(
context,
batchId,
batch,
batchMeta
)
} finally {
lock.countDown()
}
var batchConsumerAvailableAttempts = maxBatchesPerJob
var lastBatchUploadStatus: UploadStatus?
do {
batchConsumerAvailableAttempts--
lastBatchUploadStatus = handleNextBatch(context)
} while (batchConsumerAvailableAttempts > 0 &&
lastBatchUploadStatus is UploadStatus.Success
)
if (lastBatchUploadStatus != null) {
handleBatchConsumingJobFrequency(lastBatchUploadStatus)
} else {
// there was no batch left or there was a problem reading the next batch
// in the storage so we increase the interval
increaseInterval()
}
lock.await(batchUploadWaitTimeoutMs, TimeUnit.MILLISECONDS)
}

scheduleNextUpload()
Expand All @@ -85,6 +78,42 @@ internal class DataUploadRunnable(

// region Internal

private fun handleBatchConsumingJobFrequency(lastBatchUploadStatus: UploadStatus) {
if (lastBatchUploadStatus.shouldRetry) {
increaseInterval()
} else {
decreaseInterval()
}
}

@WorkerThread
@Suppress("UnsafeThirdPartyFunctionCall") // called inside a dedicated executor
private fun handleNextBatch(context: DatadogContext): UploadStatus? {
var uploadStatus: UploadStatus? = null
val lock = CountDownLatch(1)
storage.readNextBatch(
noBatchCallback = {
lock.countDown()
}
) { batchId, reader ->
try {
val batch = reader.read()
val batchMeta = reader.currentMetadata()

uploadStatus = consumeBatch(
context,
batchId,
batch,
batchMeta
)
} finally {
lock.countDown()
}
}
lock.await(batchUploadWaitTimeoutMs, TimeUnit.MILLISECONDS)
return uploadStatus
}

private fun isNetworkAvailable(): Boolean {
val networkInfo = networkInfoProvider.getLatestNetworkInfo()
return networkInfo.connectivity != NetworkInfo.Connectivity.NETWORK_NOT_CONNECTED
Expand Down Expand Up @@ -115,22 +144,17 @@ internal class DataUploadRunnable(
batchId: BatchId,
batch: List<RawBatchEvent>,
batchMeta: ByteArray?
) {
): UploadStatus {
val status = dataUploader.upload(context, batch, batchMeta)
val removalReason = if (status is UploadStatus.RequestCreationError) {
RemovalReason.Invalid
} else {
RemovalReason.IntakeCode(status.code)
}
storage.confirmBatchRead(batchId, removalReason) {
if (status.shouldRetry) {
it.markAsRead(false)
increaseInterval()
} else {
it.markAsRead(true)
decreaseInterval()
}
it.markAsRead(deleteBatch = !status.shouldRetry)
}
return status
}

@Suppress("UnsafeThirdPartyFunctionCall") // rounded Double isn't NaN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ internal class ConfigurationBuilderTest {
proxy = null,
proxyAuth = Authenticator.NONE,
encryption = null,
site = DatadogSite.US1
site = DatadogSite.US1,
batchProcessingLevel = BatchProcessingLevel.MEDIUM
)
)
assertThat(config.crashReportsEnabled).isTrue
Expand Down
Loading

0 comments on commit 7dbfa0a

Please sign in to comment.