From ae9a07ca11fcf2d3c3831c375e4c1d1177ac4608 Mon Sep 17 00:00:00 2001 From: "Xavier F. Gouchet" Date: Mon, 25 Mar 2024 15:24:53 +0100 Subject: [PATCH 1/4] RUM-3670 Create a BackPressureStrategy --- .../configuration/BackPressureMitigation.kt | 19 + .../configuration/BackPressureStrategy.kt | 20 + .../thread/BackPressureExecutorService.kt | 68 +++ .../thread/BackPressuredBlockingQueue.kt | 91 ++++ .../core/thread/FlushableExecutorService.kt | 44 ++ .../thread/BackPressureExecutorServiceTest.kt | 35 ++ ...ropOldestBackPressuredBlockingQueueTest.kt | 420 ++++++++++++++++++ ...oreNewestBackPressuredBlockingQueueTest.kt | 420 ++++++++++++++++++ 8 files changed, 1117 insertions(+) create mode 100644 dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/configuration/BackPressureMitigation.kt create mode 100644 dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/configuration/BackPressureStrategy.kt create mode 100644 dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/internal/thread/BackPressureExecutorService.kt create mode 100644 dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/internal/thread/BackPressuredBlockingQueue.kt create mode 100644 dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/thread/FlushableExecutorService.kt create mode 100644 dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/BackPressureExecutorServiceTest.kt create mode 100644 dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/DropOldestBackPressuredBlockingQueueTest.kt create mode 100644 dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/IgnoreNewestBackPressuredBlockingQueueTest.kt diff --git a/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/configuration/BackPressureMitigation.kt b/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/configuration/BackPressureMitigation.kt new file mode 100644 index 0000000000..61d2e9c7d1 --- /dev/null +++ b/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/configuration/BackPressureMitigation.kt @@ -0,0 +1,19 @@ +/* + * Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0. + * This product includes software developed at Datadog (https://www.datadoghq.com/). + * Copyright 2016-Present Datadog, Inc. + */ + +package com.datadog.android.core.configuration + +/** + * Defines the mitigation to use when a queue hits the maximum back pressure capacity. + */ +enum class BackPressureMitigation { + + /** Drop the oldest items already in the queue to make room for new ones. */ + DROP_OLDEST, + + /** Ignore newest items that are not yet in the queue. */ + IGNORE_NEWEST +} diff --git a/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/configuration/BackPressureStrategy.kt b/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/configuration/BackPressureStrategy.kt new file mode 100644 index 0000000000..627ec795fc --- /dev/null +++ b/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/configuration/BackPressureStrategy.kt @@ -0,0 +1,20 @@ +/* + * Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0. + * This product includes software developed at Datadog (https://www.datadoghq.com/). + * Copyright 2016-Present Datadog, Inc. + */ + +package com.datadog.android.core.configuration + +/** + * @param capacity the maximum size of the queue + * @param onThresholdReached callback called when the queue reaches full capacity + * @param onItemDropped called when an item is dropped because of this backpressure strategy + * @param backpressureMitigation the mitigation to use when reaching the capacity + */ +data class BackPressureStrategy( + val capacity: Int, + val onThresholdReached: () -> Unit, + val onItemDropped: (Any) -> Unit, + val backpressureMitigation: BackPressureMitigation +) diff --git a/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/internal/thread/BackPressureExecutorService.kt b/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/internal/thread/BackPressureExecutorService.kt new file mode 100644 index 0000000000..ab5dd7bc9b --- /dev/null +++ b/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/internal/thread/BackPressureExecutorService.kt @@ -0,0 +1,68 @@ +/* + * Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0. + * This product includes software developed at Datadog (https://www.datadoghq.com/). + * Copyright 2016-Present Datadog, Inc. + */ + +package com.datadog.android.core.internal.thread + +import com.datadog.android.api.InternalLogger +import com.datadog.android.core.configuration.BackPressureStrategy +import com.datadog.android.core.thread.FlushableExecutorService +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit + +internal class BackPressureExecutorService( + val logger: InternalLogger, + backpressureStrategy: BackPressureStrategy +) : ThreadPoolExecutor( + CORE_POOL_SIZE, + CORE_POOL_SIZE, + THREAD_POOL_MAX_KEEP_ALIVE_MS, + TimeUnit.MILLISECONDS, + BackPressuredBlockingQueue(logger, backpressureStrategy) +), + FlushableExecutorService { + + // region FlushableExecutorService + + @Suppress("TooGenericExceptionCaught") + override fun drainTo(destination: MutableCollection) { + try { + queue.drainTo(destination) + } catch (e: IllegalArgumentException) { + onDrainException(e) + } catch (e: NullPointerException) { + onDrainException(e) + } catch (e: UnsupportedOperationException) { + onDrainException(e) + } catch (e: ClassCastException) { + onDrainException(e) + } + } + + // endregion + + // region ThreadPoolExecutor + + override fun afterExecute(r: Runnable?, t: Throwable?) { + super.afterExecute(r, t) + loggingAfterExecute(r, t, logger) + } + + // endregion + + private fun onDrainException(e: RuntimeException) { + logger.log( + InternalLogger.Level.ERROR, + listOf(InternalLogger.Target.MAINTAINER, InternalLogger.Target.TELEMETRY), + { "Unable to drain BackPressureExecutorService queue" }, + e + ) + } + + companion object { + private const val CORE_POOL_SIZE = 1 + private val THREAD_POOL_MAX_KEEP_ALIVE_MS = TimeUnit.SECONDS.toMillis(5) + } +} diff --git a/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/internal/thread/BackPressuredBlockingQueue.kt b/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/internal/thread/BackPressuredBlockingQueue.kt new file mode 100644 index 0000000000..286e695ba5 --- /dev/null +++ b/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/internal/thread/BackPressuredBlockingQueue.kt @@ -0,0 +1,91 @@ +/* + * Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0. + * This product includes software developed at Datadog (https://www.datadoghq.com/). + * Copyright 2016-Present Datadog, Inc. + */ + +package com.datadog.android.core.internal.thread + +import com.datadog.android.api.InternalLogger +import com.datadog.android.core.configuration.BackPressureMitigation +import com.datadog.android.core.configuration.BackPressureStrategy +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit + +internal class BackPressuredBlockingQueue( + private val logger: InternalLogger, + private val backPressureStrategy: BackPressureStrategy +) : LinkedBlockingQueue( + backPressureStrategy.capacity +) { + override fun offer(e: E): Boolean { + return addWithBackPressure(e) { + @Suppress("UnsafeThirdPartyFunctionCall") // can't have NPE here + super.offer(it) + } + } + + override fun offer(e: E, timeout: Long, unit: TimeUnit?): Boolean { + @Suppress("UnsafeThirdPartyFunctionCall") // can't have NPE here + val accepted = super.offer(e, timeout, unit) + if (!accepted) { + return offer(e) + } else { + if (remainingCapacity() == 0) { + onThresholdReached() + } + return true + } + } + + private fun addWithBackPressure( + e: E, + operation: (E) -> Boolean + ): Boolean { + val remainingCapacity = remainingCapacity() + return if (remainingCapacity == 0) { + when (backPressureStrategy.backpressureMitigation) { + BackPressureMitigation.DROP_OLDEST -> { + val first = take() + onItemDropped(first) + operation(e) + } + + BackPressureMitigation.IGNORE_NEWEST -> { + onItemDropped(e) + true + } + } + } else { + if (remainingCapacity == 1) { + onThresholdReached() + } + operation(e) + } + } + + private fun onThresholdReached() { + backPressureStrategy.onThresholdReached() + logger.log( + level = InternalLogger.Level.WARN, + targets = listOf(InternalLogger.Target.MAINTAINER, InternalLogger.Target.TELEMETRY), + messageBuilder = { "BackPressuredBlockingQueue reached capacity:${backPressureStrategy.capacity}" }, + throwable = null, + onlyOnce = false, + additionalProperties = mapOf("backpressure.capacity" to backPressureStrategy.capacity) + ) + } + + private fun onItemDropped(item: E) { + backPressureStrategy.onItemDropped(item) + + logger.log( + level = InternalLogger.Level.ERROR, + targets = listOf(InternalLogger.Target.MAINTAINER, InternalLogger.Target.TELEMETRY), + messageBuilder = { "Dropped item in BackPressuredBlockingQueue queue: $item" }, + throwable = null, + onlyOnce = false, + additionalProperties = mapOf("backpressure.capacity" to backPressureStrategy.capacity) + ) + } +} diff --git a/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/thread/FlushableExecutorService.kt b/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/thread/FlushableExecutorService.kt new file mode 100644 index 0000000000..e2d0853731 --- /dev/null +++ b/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/thread/FlushableExecutorService.kt @@ -0,0 +1,44 @@ +/* + * Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0. + * This product includes software developed at Datadog (https://www.datadoghq.com/). + * Copyright 2016-Present Datadog, Inc. + */ + +package com.datadog.android.core.thread + +import com.datadog.android.api.InternalLogger +import com.datadog.android.core.configuration.BackPressureStrategy +import java.util.concurrent.ExecutorService + +/** + * An [ExecutorService] which backing queue can be drained to a collection. + * + */ +interface FlushableExecutorService : ExecutorService { + + /** + * Drains the queue backing this [ExecutorService] into the provided mutable collection. + * After this operation, the executor's queue will be empty, and all the runnable entries added + * to the destination won't have run yet. + * + * @param destination the collection into which [Runnable] in the queue should be drained to. + */ + fun drainTo(destination: MutableCollection) + + /** + * A Factory for a [FlushableExecutorService] implementation. + */ + fun interface Factory { + + /** + * Create an instance of [FlushableExecutorService]. + * @param internalLogger the internal logger + * @param backPressureStrategy the strategy to handle back-pressure + * @return the instance + */ + fun create( + internalLogger: InternalLogger, + backPressureStrategy: BackPressureStrategy + ): FlushableExecutorService + } +} diff --git a/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/BackPressureExecutorServiceTest.kt b/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/BackPressureExecutorServiceTest.kt new file mode 100644 index 0000000000..f2d393db1e --- /dev/null +++ b/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/BackPressureExecutorServiceTest.kt @@ -0,0 +1,35 @@ +/* + * Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0. + * This product includes software developed at Datadog (https://www.datadoghq.com/). + * Copyright 2016-Present Datadog, Inc. + */ + +package com.datadog.android.core.internal.thread + +import com.datadog.android.core.configuration.BackPressureMitigation +import com.datadog.android.core.configuration.BackPressureStrategy +import fr.xgouchet.elmyr.annotation.Forgery +import fr.xgouchet.elmyr.annotation.IntForgery +import org.mockito.Mockito.mock + +internal class BackPressureExecutorServiceTest : + AbstractLoggingExecutorServiceTest() { + + @IntForgery(128, 1024) + var fakeBackPressureCapacity: Int = 0 + + @Forgery + lateinit var fakeBackPressureMitigation: BackPressureMitigation + + override fun createTestedExecutorService(): BackPressureExecutorService { + return BackPressureExecutorService( + mockInternalLogger, + BackPressureStrategy( + fakeBackPressureCapacity, + mock(), + mock(), + fakeBackPressureMitigation + ) + ) + } +} diff --git a/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/DropOldestBackPressuredBlockingQueueTest.kt b/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/DropOldestBackPressuredBlockingQueueTest.kt new file mode 100644 index 0000000000..ce6bfb5cac --- /dev/null +++ b/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/DropOldestBackPressuredBlockingQueueTest.kt @@ -0,0 +1,420 @@ +package com.datadog.android.core.internal.thread + +import com.datadog.android.api.InternalLogger +import com.datadog.android.core.configuration.BackPressureMitigation +import com.datadog.android.core.configuration.BackPressureStrategy +import com.datadog.android.utils.forge.Configurator +import fr.xgouchet.elmyr.annotation.IntForgery +import fr.xgouchet.elmyr.annotation.LongForgery +import fr.xgouchet.elmyr.annotation.StringForgery +import fr.xgouchet.elmyr.junit5.ForgeConfiguration +import fr.xgouchet.elmyr.junit5.ForgeExtension +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.api.extension.Extensions +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.junit.jupiter.MockitoSettings +import org.mockito.kotlin.verify +import org.mockito.kotlin.verifyNoInteractions +import org.mockito.quality.Strictness +import java.lang.Thread.sleep +import java.util.concurrent.BlockingQueue +import java.util.concurrent.TimeUnit +import kotlin.math.min + +@Extensions( + ExtendWith(MockitoExtension::class), + ExtendWith(ForgeExtension::class) +) +@MockitoSettings(strictness = Strictness.LENIENT) +@ForgeConfiguration(Configurator::class) +class DropOldestBackPressuredBlockingQueueTest { + + lateinit var testedQueue: BlockingQueue + + @Mock + lateinit var mockLogger: InternalLogger + + @Mock + lateinit var mockOnThresholdReached: () -> Unit + + @Mock + lateinit var mockOnItemsDropped: (Any) -> Unit + + @IntForgery(8, 16) + var fakeBackPressureThreshold: Int = 0 + + @BeforeEach + fun `set up`() { + testedQueue = BackPressuredBlockingQueue( + mockLogger, + BackPressureStrategy( + fakeBackPressureThreshold, + mockOnThresholdReached, + mockOnItemsDropped, + BackPressureMitigation.DROP_OLDEST + ) + ) + } + + // region add(e) + + @Test + fun `M accept item W add() {empty}`( + @StringForgery fakeNewItem: String + ) { + // Given + + // When + val result = testedQueue.add(fakeNewItem) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).contains(fakeNewItem) + verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger) + } + + @Test + fun `M accept item W add() { not reached threshold yet }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String + ) { + // Given + val previousCount = min(fakeBackPressureThreshold / 2, fakeItemList.size) + fakeItemList.take(previousCount).forEach { + testedQueue.add(it) + } + + // When + val result = testedQueue.add(fakeNewItem) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).hasSize(previousCount + 1) + assertThat(testedQueue).contains(fakeNewItem) + verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger) + } + + @Test + fun `M accept item W add() { reaching threshold on last item }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String + ) { + // Given + for (i in 1 until fakeBackPressureThreshold) { + testedQueue.add(fakeItemList[i % fakeItemList.size]) + } + + // When + val result = testedQueue.add(fakeNewItem) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).hasSize(fakeBackPressureThreshold) + assertThat(testedQueue).contains(fakeNewItem) + verify(mockOnThresholdReached).invoke() + verifyNoInteractions(mockOnItemsDropped) + } + + @Test + fun `M drop old item and accept W add() { queue already at threshold }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String + ) { + // Given + for (i in 0 until fakeBackPressureThreshold) { + testedQueue.add(fakeItemList[i % fakeItemList.size]) + } + + // When + val result = testedQueue.add(fakeNewItem) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).hasSize(fakeBackPressureThreshold) + assertThat(testedQueue).contains(fakeNewItem) + verify(mockOnThresholdReached).invoke() + verify(mockOnItemsDropped).invoke(fakeItemList.first()) + } + + // endregion + + // region offer(e) + + @Test + fun `M accept item W offer() {empty}`( + @StringForgery fakeNewItem: String + ) { + // Given + + // When + val result = testedQueue.offer(fakeNewItem) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).contains(fakeNewItem) + verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger) + } + + @Test + fun `M accept item W offer() { not reached threshold yet }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String + ) { + // Given + val previousCount = min(fakeBackPressureThreshold / 2, fakeItemList.size) + fakeItemList.take(previousCount).forEach { + testedQueue.add(it) + } + + // When + val result = testedQueue.offer(fakeNewItem) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).hasSize(previousCount + 1) + assertThat(testedQueue).contains(fakeNewItem) + verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger) + } + + @Test + fun `M accept item W offer() { reaching threshold on last item }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String + ) { + // Given + for (i in 1 until fakeBackPressureThreshold) { + testedQueue.add(fakeItemList[i % fakeItemList.size]) + } + + // When + val result = testedQueue.offer(fakeNewItem) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).hasSize(fakeBackPressureThreshold) + assertThat(testedQueue).contains(fakeNewItem) + verify(mockOnThresholdReached).invoke() + verifyNoInteractions(mockOnItemsDropped) + } + + @Test + fun `M drop old item and accept W offer() { queue already at threshold }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String + ) { + // Given + for (i in 0 until fakeBackPressureThreshold) { + testedQueue.add(fakeItemList[i % fakeItemList.size]) + } + + // When + val result = testedQueue.offer(fakeNewItem) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).hasSize(fakeBackPressureThreshold) + assertThat(testedQueue).contains(fakeNewItem) + verify(mockOnThresholdReached).invoke() + verify(mockOnItemsDropped).invoke(fakeItemList.first()) + } + + // endregion + + // region offer(e, timeout) + + @Test + fun `M accept item W offer() {empty}`( + @StringForgery fakeNewItem: String, + @LongForgery(10, 100) fakeTimeoutMs: Long + ) { + // Given + + // When + val result = testedQueue.offer(fakeNewItem, fakeTimeoutMs, TimeUnit.MILLISECONDS) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).contains(fakeNewItem) + verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger) + } + + @Test + fun `M accept item W offer() { not reached threshold yet }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String, + @LongForgery(10, 100) fakeTimeoutMs: Long + ) { + // Given + val previousCount = min(fakeBackPressureThreshold / 2, fakeItemList.size) + fakeItemList.take(previousCount).forEach { + testedQueue.add(it) + } + + // When + val result = testedQueue.offer(fakeNewItem, fakeTimeoutMs, TimeUnit.MILLISECONDS) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).hasSize(previousCount + 1) + assertThat(testedQueue).contains(fakeNewItem) + verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger) + } + + @Test + fun `M accept item W offer() { reaching threshold on last item }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String, + @LongForgery(10, 100) fakeTimeoutMs: Long + ) { + // Given + for (i in 1 until fakeBackPressureThreshold) { + testedQueue.add(fakeItemList[i % fakeItemList.size]) + } + + // When + val result = testedQueue.offer(fakeNewItem, fakeTimeoutMs, TimeUnit.MILLISECONDS) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).hasSize(fakeBackPressureThreshold) + assertThat(testedQueue).contains(fakeNewItem) + verify(mockOnThresholdReached).invoke() + verifyNoInteractions(mockOnItemsDropped) + } + + @Test + fun `M accept item W offer() { queue already at threshold, waiting for space }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String, + @LongForgery(10, 100) fakeTimeoutMs: Long + ) { + // Given + for (i in 1 until fakeBackPressureThreshold) { + testedQueue.add(fakeItemList[i % fakeItemList.size]) + } + + // When + Thread { + sleep(fakeTimeoutMs - 5) + testedQueue.take() + }.start() + val result = testedQueue.offer(fakeNewItem, fakeTimeoutMs, TimeUnit.MILLISECONDS) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).hasSize(fakeBackPressureThreshold) + assertThat(testedQueue).contains(fakeNewItem) + verify(mockOnThresholdReached).invoke() + verifyNoInteractions(mockOnItemsDropped) + } + + @Test + fun `M drop old item and accept W offer() { queue already at threshold }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String, + @LongForgery(10, 100) fakeTimeoutMs: Long + ) { + // Given + for (i in 0 until fakeBackPressureThreshold) { + testedQueue.add(fakeItemList[i % fakeItemList.size]) + } + + // When + val result = testedQueue.offer(fakeNewItem, fakeTimeoutMs, TimeUnit.MILLISECONDS) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).hasSize(fakeBackPressureThreshold) + assertThat(testedQueue).contains(fakeNewItem) + verify(mockOnThresholdReached).invoke() + verify(mockOnItemsDropped).invoke(fakeItemList.first()) + } + + // endregion + + // region put(e) + + @Test + fun `M accept item W put() {empty}`( + @StringForgery fakeNewItem: String + ) { + // Given + + // When + testedQueue.put(fakeNewItem) + + // Then + assertThat(testedQueue).contains(fakeNewItem) + verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger) + } + + @Test + fun `M accept item W put() { not reached threshold yet }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String + ) { + // Given + val previousCount = min(fakeBackPressureThreshold / 2, fakeItemList.size) + fakeItemList.take(previousCount).forEach { + testedQueue.put(it) + } + + // When + testedQueue.put(fakeNewItem) + + // Then + assertThat(testedQueue).hasSize(previousCount + 1) + assertThat(testedQueue).contains(fakeNewItem) + verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger) + } + + @Test + fun `M accept item W put() { reaching threshold on last item }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String + ) { + // Given + for (i in 1 until fakeBackPressureThreshold) { + testedQueue.put(fakeItemList[i % fakeItemList.size]) + } + + // When + testedQueue.put(fakeNewItem) + + // Then + assertThat(testedQueue).hasSize(fakeBackPressureThreshold) + assertThat(testedQueue).contains(fakeNewItem) + verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger) + } + + @Test + fun `M wait and accept W put() { queue already at threshold }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String + ) { + // Given + for (i in 0 until fakeBackPressureThreshold) { + testedQueue.put(fakeItemList[i % fakeItemList.size]) + } + + // When + Thread { + // PutInserts the specified element into this queue, waiting if necessary for space to become available. + // In order to not wait indefinitely, we need to remove an element + sleep(100) + testedQueue.take() + }.start() + testedQueue.put(fakeNewItem) + + // Then + assertThat(testedQueue).hasSize(fakeBackPressureThreshold) + assertThat(testedQueue).contains(fakeNewItem) + verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger) + } + + // endregion +} diff --git a/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/IgnoreNewestBackPressuredBlockingQueueTest.kt b/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/IgnoreNewestBackPressuredBlockingQueueTest.kt new file mode 100644 index 0000000000..c943ac40d1 --- /dev/null +++ b/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/IgnoreNewestBackPressuredBlockingQueueTest.kt @@ -0,0 +1,420 @@ +package com.datadog.android.core.internal.thread + +import com.datadog.android.api.InternalLogger +import com.datadog.android.core.configuration.BackPressureMitigation +import com.datadog.android.core.configuration.BackPressureStrategy +import com.datadog.android.utils.forge.Configurator +import fr.xgouchet.elmyr.annotation.IntForgery +import fr.xgouchet.elmyr.annotation.LongForgery +import fr.xgouchet.elmyr.annotation.StringForgery +import fr.xgouchet.elmyr.junit5.ForgeConfiguration +import fr.xgouchet.elmyr.junit5.ForgeExtension +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.api.extension.Extensions +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.junit.jupiter.MockitoSettings +import org.mockito.kotlin.verify +import org.mockito.kotlin.verifyNoInteractions +import org.mockito.quality.Strictness +import java.lang.Thread.sleep +import java.util.concurrent.BlockingQueue +import java.util.concurrent.TimeUnit +import kotlin.math.min + +@Extensions( + ExtendWith(MockitoExtension::class), + ExtendWith(ForgeExtension::class) +) +@MockitoSettings(strictness = Strictness.LENIENT) +@ForgeConfiguration(Configurator::class) +class IgnoreNewestBackPressuredBlockingQueueTest { + + lateinit var testedQueue: BlockingQueue + + @Mock + lateinit var mockLogger: InternalLogger + + @Mock + lateinit var mockOnThresholdReached: () -> Unit + + @Mock + lateinit var mockOnItemsDropped: (Any) -> Unit + + @IntForgery(8, 16) + var fakeBackPressureThreshold: Int = 0 + + @BeforeEach + fun `set up`() { + testedQueue = BackPressuredBlockingQueue( + mockLogger, + BackPressureStrategy( + fakeBackPressureThreshold, + mockOnThresholdReached, + mockOnItemsDropped, + BackPressureMitigation.IGNORE_NEWEST + ) + ) + } + + // region add(e) + + @Test + fun `M accept item W add() {empty}`( + @StringForgery fakeNewItem: String + ) { + // Given + + // When + val result = testedQueue.add(fakeNewItem) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).contains(fakeNewItem) + verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger) + } + + @Test + fun `M accept item W add() { not reached threshold yet }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String + ) { + // Given + val previousCount = min(fakeBackPressureThreshold / 2, fakeItemList.size) + fakeItemList.take(previousCount).forEach { + testedQueue.add(it) + } + + // When + val result = testedQueue.add(fakeNewItem) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).hasSize(previousCount + 1) + assertThat(testedQueue).contains(fakeNewItem) + verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger) + } + + @Test + fun `M accept item W add() { reaching threshold on last item }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String + ) { + // Given + for (i in 1 until fakeBackPressureThreshold) { + testedQueue.add(fakeItemList[i % fakeItemList.size]) + } + + // When + val result = testedQueue.add(fakeNewItem) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).hasSize(fakeBackPressureThreshold) + assertThat(testedQueue).contains(fakeNewItem) + verify(mockOnThresholdReached).invoke() + verifyNoInteractions(mockOnItemsDropped) + } + + @Test + fun `M drop old item and accept W add() { queue already at threshold }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String + ) { + // Given + for (i in 0 until fakeBackPressureThreshold) { + testedQueue.add(fakeItemList[i % fakeItemList.size]) + } + + // When + val result = testedQueue.add(fakeNewItem) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).hasSize(fakeBackPressureThreshold) + assertThat(testedQueue).doesNotContain(fakeNewItem) + verify(mockOnThresholdReached).invoke() + verify(mockOnItemsDropped).invoke(fakeNewItem) + } + + // endregion + + // region offer(e) + + @Test + fun `M accept item W offer() {empty}`( + @StringForgery fakeNewItem: String + ) { + // Given + + // When + val result = testedQueue.offer(fakeNewItem) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).contains(fakeNewItem) + verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger) + } + + @Test + fun `M accept item W offer() { not reached threshold yet }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String + ) { + // Given + val previousCount = min(fakeBackPressureThreshold / 2, fakeItemList.size) + fakeItemList.take(previousCount).forEach { + testedQueue.add(it) + } + + // When + val result = testedQueue.offer(fakeNewItem) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).hasSize(previousCount + 1) + assertThat(testedQueue).contains(fakeNewItem) + verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger) + } + + @Test + fun `M accept item W offer() { reaching threshold on last item }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String + ) { + // Given + for (i in 1 until fakeBackPressureThreshold) { + testedQueue.add(fakeItemList[i % fakeItemList.size]) + } + + // When + val result = testedQueue.offer(fakeNewItem) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).hasSize(fakeBackPressureThreshold) + assertThat(testedQueue).contains(fakeNewItem) + verify(mockOnThresholdReached).invoke() + verifyNoInteractions(mockOnItemsDropped) + } + + @Test + fun `M drop old item and accept W offer() { queue already at threshold }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String + ) { + // Given + for (i in 0 until fakeBackPressureThreshold) { + testedQueue.add(fakeItemList[i % fakeItemList.size]) + } + + // When + val result = testedQueue.offer(fakeNewItem) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).hasSize(fakeBackPressureThreshold) + assertThat(testedQueue).doesNotContain(fakeNewItem) + verify(mockOnThresholdReached).invoke() + verify(mockOnItemsDropped).invoke(fakeNewItem) + } + + // endregion + + // region offer(e, timeout) + + @Test + fun `M accept item W offer() {empty}`( + @StringForgery fakeNewItem: String, + @LongForgery(10, 100) fakeTimeoutMs: Long + ) { + // Given + + // When + val result = testedQueue.offer(fakeNewItem, fakeTimeoutMs, TimeUnit.MILLISECONDS) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).contains(fakeNewItem) + verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger) + } + + @Test + fun `M accept item W offer() { not reached threshold yet }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String, + @LongForgery(10, 100) fakeTimeoutMs: Long + ) { + // Given + val previousCount = min(fakeBackPressureThreshold / 2, fakeItemList.size) + fakeItemList.take(previousCount).forEach { + testedQueue.add(it) + } + + // When + val result = testedQueue.offer(fakeNewItem, fakeTimeoutMs, TimeUnit.MILLISECONDS) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).hasSize(previousCount + 1) + assertThat(testedQueue).contains(fakeNewItem) + verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger) + } + + @Test + fun `M accept item W offer() { reaching threshold on last item }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String, + @LongForgery(10, 100) fakeTimeoutMs: Long + ) { + // Given + for (i in 1 until fakeBackPressureThreshold) { + testedQueue.add(fakeItemList[i % fakeItemList.size]) + } + + // When + val result = testedQueue.offer(fakeNewItem, fakeTimeoutMs, TimeUnit.MILLISECONDS) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).hasSize(fakeBackPressureThreshold) + assertThat(testedQueue).contains(fakeNewItem) + verify(mockOnThresholdReached).invoke() + verifyNoInteractions(mockOnItemsDropped) + } + + @Test + fun `M accept item W offer() { queue already at threshold, waiting for space }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String, + @LongForgery(10, 100) fakeTimeoutMs: Long + ) { + // Given + for (i in 1 until fakeBackPressureThreshold) { + testedQueue.add(fakeItemList[i % fakeItemList.size]) + } + + // When + Thread { + sleep(fakeTimeoutMs - 5) + testedQueue.take() + }.start() + val result = testedQueue.offer(fakeNewItem, fakeTimeoutMs, TimeUnit.MILLISECONDS) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).hasSize(fakeBackPressureThreshold) + assertThat(testedQueue).contains(fakeNewItem) + verify(mockOnThresholdReached).invoke() + verifyNoInteractions(mockOnItemsDropped) + } + + @Test + fun `M drop old item and accept W offer() { queue already at threshold }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String, + @LongForgery(10, 100) fakeTimeoutMs: Long + ) { + // Given + for (i in 0 until fakeBackPressureThreshold) { + testedQueue.add(fakeItemList[i % fakeItemList.size]) + } + + // When + val result = testedQueue.offer(fakeNewItem, fakeTimeoutMs, TimeUnit.MILLISECONDS) + + // Then + assertThat(result).isTrue() + assertThat(testedQueue).hasSize(fakeBackPressureThreshold) + assertThat(testedQueue).doesNotContain(fakeNewItem) + verify(mockOnThresholdReached).invoke() + verify(mockOnItemsDropped).invoke(fakeNewItem) + } + + // endregion + + // region put(e) + + @Test + fun `M accept item W put() {empty}`( + @StringForgery fakeNewItem: String + ) { + // Given + + // When + testedQueue.put(fakeNewItem) + + // Then + assertThat(testedQueue).contains(fakeNewItem) + verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger) + } + + @Test + fun `M accept item W put() { not reached threshold yet }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String + ) { + // Given + val previousCount = min(fakeBackPressureThreshold / 2, fakeItemList.size) + fakeItemList.take(previousCount).forEach { + testedQueue.put(it) + } + + // When + testedQueue.put(fakeNewItem) + + // Then + assertThat(testedQueue).hasSize(previousCount + 1) + assertThat(testedQueue).contains(fakeNewItem) + verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger) + } + + @Test + fun `M accept item W put() { reaching threshold on last item }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String + ) { + // Given + for (i in 1 until fakeBackPressureThreshold) { + testedQueue.put(fakeItemList[i % fakeItemList.size]) + } + + // When + testedQueue.put(fakeNewItem) + + // Then + assertThat(testedQueue).hasSize(fakeBackPressureThreshold) + assertThat(testedQueue).contains(fakeNewItem) + verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger) + } + + @Test + fun `M wait and accept W put() { queue already at threshold }`( + @StringForgery fakeItemList: List, + @StringForgery fakeNewItem: String + ) { + // Given + for (i in 0 until fakeBackPressureThreshold) { + testedQueue.put(fakeItemList[i % fakeItemList.size]) + } + + // When + Thread { + // PutInserts the specified element into this queue, waiting if necessary for space to become available. + // In order to not wait indefinitely, we need to remove an element + sleep(100) + testedQueue.take() + }.start() + testedQueue.put(fakeNewItem) + + // Then + assertThat(testedQueue).hasSize(fakeBackPressureThreshold) + assertThat(testedQueue).contains(fakeNewItem) + verifyNoInteractions(mockOnItemsDropped, mockOnThresholdReached, mockLogger) + } + + // endregion +} From ed36cff96cc0e09469e11f5c0430f6f36c7db5f4 Mon Sep 17 00:00:00 2001 From: "Xavier F. Gouchet" Date: Mon, 25 Mar 2024 15:32:33 +0100 Subject: [PATCH 2/4] RUM-3670 Let user set a custom BackPressureStrategy --- .../core/configuration/Configuration.kt | 31 +++++++++++++++++-- .../configuration/ConfigurationBuilderTest.kt | 29 +++++++++-------- .../forge/ConfigurationCoreForgeryFactory.kt | 10 +++++- 3 files changed, 51 insertions(+), 19 deletions(-) diff --git a/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/configuration/Configuration.kt b/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/configuration/Configuration.kt index 502eda3262..35093aea96 100644 --- a/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/configuration/Configuration.kt +++ b/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/configuration/Configuration.kt @@ -41,7 +41,8 @@ internal constructor( val encryption: Encryption?, val site: DatadogSite, val batchProcessingLevel: BatchProcessingLevel, - val persistenceStrategyFactory: PersistenceStrategy.Factory? + val persistenceStrategyFactory: PersistenceStrategy.Factory?, + val backpressureStrategy: BackPressureStrategy ) // region Builder @@ -70,6 +71,7 @@ internal constructor( private var coreConfig = DEFAULT_CORE_CONFIG private var crashReportsEnabled: Boolean = true + private var backpressureStrategy: BackPressureStrategy = DEFAULT_BACKPRESSURE_STRATEGY internal var hostsSanitizer = HostsSanitizer() @@ -236,7 +238,7 @@ internal constructor( } /** - * Allows to control if JVM crashes are tracked or not. Default value is [true]. + * Allows to control if JVM crashes are tracked or not. Default value is `true`. * * @param crashReportsEnabled whether crashes are tracked and sent to Datadog */ @@ -245,6 +247,19 @@ internal constructor( return this } + /** + * Sets the strategy to handle scalability issues. + * Many operations (data processing, event I/O, …) are queued in background threads. + * This configuration lets one decide how to handle the edge case when the queue starts growing, which can lead + * to a lot of memory usage, delayed processing, and possibly OOM or ANR. + * @param backpressureStrategy the backpressure strategy (default strategy ignores new tasks if a queue reaches + * 1024 items) + */ + fun setBackpressureStrategy(backpressureStrategy: BackPressureStrategy): Builder { + this.backpressureStrategy = backpressureStrategy + return this + } + internal fun allowClearTextHttp(): Builder { coreConfig = coreConfig.copy( needsClearTextHttp = true @@ -262,6 +277,15 @@ internal constructor( */ private const val NO_VARIANT: String = "" + private const val DEFAULT_BACKPRESSURE_THRESHOLD = 1024 + + internal val DEFAULT_BACKPRESSURE_STRATEGY = BackPressureStrategy( + DEFAULT_BACKPRESSURE_THRESHOLD, + {}, + {}, + BackPressureMitigation.IGNORE_NEWEST + ) + internal val DEFAULT_CORE_CONFIG = Core( needsClearTextHttp = false, enableDeveloperModeWhenDebuggable = false, @@ -273,7 +297,8 @@ internal constructor( encryption = null, site = DatadogSite.US1, batchProcessingLevel = BatchProcessingLevel.MEDIUM, - persistenceStrategyFactory = null + persistenceStrategyFactory = null, + backpressureStrategy = DEFAULT_BACKPRESSURE_STRATEGY ) internal const val NETWORK_REQUESTS_TRACKING_FEATURE_NAME = "Network requests" diff --git a/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/configuration/ConfigurationBuilderTest.kt b/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/configuration/ConfigurationBuilderTest.kt index a90fe04e4f..5c2b81a12b 100644 --- a/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/configuration/ConfigurationBuilderTest.kt +++ b/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/configuration/ConfigurationBuilderTest.kt @@ -63,21 +63,20 @@ internal class ConfigurationBuilderTest { val config = testedBuilder.build() // Then - assertThat(config.coreConfig).isEqualTo( - Configuration.Core( - needsClearTextHttp = false, - enableDeveloperModeWhenDebuggable = false, - firstPartyHostsWithHeaderTypes = emptyMap(), - batchSize = BatchSize.MEDIUM, - uploadFrequency = UploadFrequency.AVERAGE, - proxy = null, - proxyAuth = Authenticator.NONE, - encryption = null, - site = DatadogSite.US1, - batchProcessingLevel = BatchProcessingLevel.MEDIUM, - persistenceStrategyFactory = null - ) - ) + assertThat(config.coreConfig.needsClearTextHttp).isFalse() + assertThat(config.coreConfig.enableDeveloperModeWhenDebuggable).isFalse() + assertThat(config.coreConfig.firstPartyHostsWithHeaderTypes).isEmpty() + assertThat(config.coreConfig.batchSize).isEqualTo(BatchSize.MEDIUM) + assertThat(config.coreConfig.uploadFrequency).isEqualTo(UploadFrequency.AVERAGE) + assertThat(config.coreConfig.proxy).isNull() + assertThat(config.coreConfig.proxyAuth).isEqualTo(Authenticator.NONE) + assertThat(config.coreConfig.encryption).isNull() + assertThat(config.coreConfig.site).isEqualTo(DatadogSite.US1) + assertThat(config.coreConfig.batchProcessingLevel).isEqualTo(BatchProcessingLevel.MEDIUM) + assertThat(config.coreConfig.persistenceStrategyFactory).isNull() + assertThat(config.coreConfig.backpressureStrategy.backpressureMitigation) + .isEqualTo(BackPressureMitigation.IGNORE_NEWEST) + assertThat(config.coreConfig.backpressureStrategy.capacity).isEqualTo(1024) assertThat(config.crashReportsEnabled).isTrue assertThat(config.additionalConfig).isEmpty() } diff --git a/dd-sdk-android-core/src/test/kotlin/com/datadog/android/utils/forge/ConfigurationCoreForgeryFactory.kt b/dd-sdk-android-core/src/test/kotlin/com/datadog/android/utils/forge/ConfigurationCoreForgeryFactory.kt index 927fba83a5..be22a5f796 100644 --- a/dd-sdk-android-core/src/test/kotlin/com/datadog/android/utils/forge/ConfigurationCoreForgeryFactory.kt +++ b/dd-sdk-android-core/src/test/kotlin/com/datadog/android/utils/forge/ConfigurationCoreForgeryFactory.kt @@ -7,6 +7,8 @@ package com.datadog.android.utils.forge import com.datadog.android.DatadogSite +import com.datadog.android.core.configuration.BackPressureMitigation +import com.datadog.android.core.configuration.BackPressureStrategy import com.datadog.android.core.configuration.Configuration import com.datadog.android.core.persistence.PersistenceStrategy import com.datadog.android.security.NoOpEncryption @@ -51,7 +53,13 @@ internal class ConfigurationCoreForgeryFactory : mock().apply { whenever(create(any(), any(), any())) doReturn mock() } - } + }, + backpressureStrategy = BackPressureStrategy( + forge.aSmallInt(), + mock(), + mock(), + forge.aValueFrom(BackPressureMitigation::class.java) + ) ) } } From a29875180a6934e41edc99eb75527a0468464d35 Mon Sep 17 00:00:00 2001 From: "Xavier F. Gouchet" Date: Mon, 25 Mar 2024 18:14:31 +0100 Subject: [PATCH 3/4] RUM-3670 update detekt --- dd-sdk-android-core/api/apiSurface | 10 ++++++ .../api/dd-sdk-android-core.api | 33 +++++++++++++++++++ detekt_custom.yml | 10 ++++-- 3 files changed, 50 insertions(+), 3 deletions(-) diff --git a/dd-sdk-android-core/api/apiSurface b/dd-sdk-android-core/api/apiSurface index 75291e672d..01a9cda097 100644 --- a/dd-sdk-android-core/api/apiSurface +++ b/dd-sdk-android-core/api/apiSurface @@ -158,6 +158,11 @@ class com.datadog.android.core.SdkReference constructor(String? = null, (com.datadog.android.api.SdkCore) -> Unit = {}) fun get(): com.datadog.android.api.SdkCore? fun allowThreadDiskReads(() -> T): T +enum com.datadog.android.core.configuration.BackPressureMitigation + - DROP_OLDEST + - IGNORE_NEWEST +data class com.datadog.android.core.configuration.BackPressureStrategy + constructor(Int, () -> Unit, (Any) -> Unit, BackPressureMitigation) enum com.datadog.android.core.configuration.BatchProcessingLevel constructor(Int) - LOW @@ -184,6 +189,7 @@ data class com.datadog.android.core.configuration.Configuration fun setEncryption(com.datadog.android.security.Encryption): Builder fun setPersistenceStrategyFactory(com.datadog.android.core.persistence.PersistenceStrategy.Factory?): Builder fun setCrashReportsEnabled(Boolean): Builder + fun setBackpressureStrategy(BackPressureStrategy): Builder companion object class com.datadog.android.core.configuration.HostsSanitizer fun sanitizeHosts(List, String): List @@ -279,6 +285,10 @@ class com.datadog.android.core.sampling.RateBasedSampler : Sampler interface com.datadog.android.core.sampling.Sampler fun sample(): Boolean fun getSampleRate(): Float? +interface com.datadog.android.core.thread.FlushableExecutorService : java.util.concurrent.ExecutorService + fun drainTo(MutableCollection) + interface Factory + fun create(com.datadog.android.api.InternalLogger, com.datadog.android.core.configuration.BackPressureStrategy): FlushableExecutorService interface com.datadog.android.event.EventMapper fun map(T): T? class com.datadog.android.event.MapperSerializer : com.datadog.android.core.persistence.Serializer diff --git a/dd-sdk-android-core/api/dd-sdk-android-core.api b/dd-sdk-android-core/api/dd-sdk-android-core.api index d585a8e1fc..7d3b2123b4 100644 --- a/dd-sdk-android-core/api/dd-sdk-android-core.api +++ b/dd-sdk-android-core/api/dd-sdk-android-core.api @@ -459,6 +459,30 @@ public final class com/datadog/android/core/StrictModeExtKt { public static final fun allowThreadDiskReads (Lkotlin/jvm/functions/Function0;)Ljava/lang/Object; } +public final class com/datadog/android/core/configuration/BackPressureMitigation : java/lang/Enum { + public static final field DROP_OLDEST Lcom/datadog/android/core/configuration/BackPressureMitigation; + public static final field IGNORE_NEWEST Lcom/datadog/android/core/configuration/BackPressureMitigation; + public static fun valueOf (Ljava/lang/String;)Lcom/datadog/android/core/configuration/BackPressureMitigation; + public static fun values ()[Lcom/datadog/android/core/configuration/BackPressureMitigation; +} + +public final class com/datadog/android/core/configuration/BackPressureStrategy { + public fun (ILkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;Lcom/datadog/android/core/configuration/BackPressureMitigation;)V + public final fun component1 ()I + public final fun component2 ()Lkotlin/jvm/functions/Function0; + public final fun component3 ()Lkotlin/jvm/functions/Function1; + public final fun component4 ()Lcom/datadog/android/core/configuration/BackPressureMitigation; + public final fun copy (ILkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;Lcom/datadog/android/core/configuration/BackPressureMitigation;)Lcom/datadog/android/core/configuration/BackPressureStrategy; + public static synthetic fun copy$default (Lcom/datadog/android/core/configuration/BackPressureStrategy;ILkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;Lcom/datadog/android/core/configuration/BackPressureMitigation;ILjava/lang/Object;)Lcom/datadog/android/core/configuration/BackPressureStrategy; + public fun equals (Ljava/lang/Object;)Z + public final fun getBackpressureMitigation ()Lcom/datadog/android/core/configuration/BackPressureMitigation; + public final fun getCapacity ()I + public final fun getOnItemDropped ()Lkotlin/jvm/functions/Function1; + public final fun getOnThresholdReached ()Lkotlin/jvm/functions/Function0; + public fun hashCode ()I + public fun toString ()Ljava/lang/String; +} + public final class com/datadog/android/core/configuration/BatchProcessingLevel : java/lang/Enum { public static final field HIGH Lcom/datadog/android/core/configuration/BatchProcessingLevel; public static final field LOW Lcom/datadog/android/core/configuration/BatchProcessingLevel; @@ -492,6 +516,7 @@ public final class com/datadog/android/core/configuration/Configuration$Builder public synthetic fun (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun build ()Lcom/datadog/android/core/configuration/Configuration; public final fun setAdditionalConfiguration (Ljava/util/Map;)Lcom/datadog/android/core/configuration/Configuration$Builder; + public final fun setBackpressureStrategy (Lcom/datadog/android/core/configuration/BackPressureStrategy;)Lcom/datadog/android/core/configuration/Configuration$Builder; public final fun setBatchProcessingLevel (Lcom/datadog/android/core/configuration/BatchProcessingLevel;)Lcom/datadog/android/core/configuration/Configuration$Builder; public final fun setBatchSize (Lcom/datadog/android/core/configuration/BatchSize;)Lcom/datadog/android/core/configuration/Configuration$Builder; public final fun setCrashReportsEnabled (Z)Lcom/datadog/android/core/configuration/Configuration$Builder; @@ -735,6 +760,14 @@ public abstract interface class com/datadog/android/core/sampling/Sampler { public abstract fun sample ()Z } +public abstract interface class com/datadog/android/core/thread/FlushableExecutorService : java/util/concurrent/ExecutorService { + public abstract fun drainTo (Ljava/util/Collection;)V +} + +public abstract interface class com/datadog/android/core/thread/FlushableExecutorService$Factory { + public abstract fun create (Lcom/datadog/android/api/InternalLogger;Lcom/datadog/android/core/configuration/BackPressureStrategy;)Lcom/datadog/android/core/thread/FlushableExecutorService; +} + public abstract interface class com/datadog/android/event/EventMapper { public abstract fun map (Ljava/lang/Object;)Ljava/lang/Object; } diff --git a/detekt_custom.yml b/detekt_custom.yml index 6da9c16e07..9bc1fe198e 100644 --- a/detekt_custom.yml +++ b/detekt_custom.yml @@ -172,12 +172,13 @@ datadog: - "java.lang.Thread.constructor(java.lang.Runnable, kotlin.String):java.lang.NullPointerException,java.lang.SecurityException,java.lang.IllegalArgumentException" - "java.lang.Thread.constructor(java.lang.Runnable, kotlin.String):java.lang.NullPointerException,java.lang.SecurityException,java.lang.IllegalArgumentException" - "java.lang.Thread.getAllStackTraces():java.lang.SecurityException" - - "java.lang.Thread.sleep(kotlin.Long):java.lang.IllegalArgumentException,java.lang.InterruptedException" - "java.lang.Thread.interrupt():java.lang.SecurityException" + - "java.lang.Thread.sleep(kotlin.Long):java.lang.IllegalArgumentException,java.lang.InterruptedException" - "java.util.concurrent.BlockingQueue.drainTo(kotlin.collections.MutableCollection):java.lang.UnsupportedOperationException,java.lang.ClassCastException,java.lang.NullPointerException,java.lang.IllegalArgumentException" + - "java.util.concurrent.BlockingQueue.drainTo(kotlin.collections.MutableCollection?):java.lang.UnsupportedOperationException,java.lang.ClassCastException,java.lang.NullPointerException,java.lang.IllegalArgumentException" - "java.util.concurrent.Callable.call():java.lang.Exception" - - "java.util.concurrent.ConcurrentLinkedQueue.offer(com.datadog.android.sessionreplay.internal.async.RecordedDataQueueItem):java.lang.NullPointerException" - "java.util.concurrent.ConcurrentHashMap.remove(kotlin.String):java.lang.NullPointerException" + - "java.util.concurrent.ConcurrentLinkedQueue.offer(com.datadog.android.sessionreplay.internal.async.RecordedDataQueueItem):java.lang.NullPointerException" - "java.util.concurrent.CopyOnWriteArraySet.removeAll(kotlin.collections.Collection):java.lang.NullPointerException,java.lang.ClassCastException" - "java.util.concurrent.CountDownLatch.await(kotlin.Long, java.util.concurrent.TimeUnit?):java.lang.InterruptedException" - "java.util.concurrent.CountDownLatch.constructor(kotlin.Int):java.lang.IllegalArgumentException" @@ -186,9 +187,11 @@ datadog: - "java.util.concurrent.ExecutorService.execute(java.lang.Runnable?):java.util.concurrent.RejectedExecutionException,java.lang.NullPointerException" - "java.util.concurrent.ExecutorService.submit(java.lang.Runnable?):java.util.concurrent.RejectedExecutionException,java.lang.NullPointerException" - "java.util.concurrent.Future.get():java.lang.InterruptedException,java.util.concurrent.CancellationException,java.util.concurrent.ExecutionException" + - "java.util.concurrent.LinkedBlockingQueue.offer(kotlin.Any?):java.lang.NullPointerException" + - "java.util.concurrent.LinkedBlockingQueue.offer(kotlin.Any?, kotlin.Long, java.util.concurrent.TimeUnit?):java.lang.NullPointerException" - "java.util.concurrent.ScheduledExecutorService.schedule(java.lang.Runnable, kotlin.Long, java.util.concurrent.TimeUnit):java.util.concurrent.RejectedExecutionException,java.lang.NullPointerException" - - "java.util.concurrent.ScheduledThreadPoolExecutor.constructor(kotlin.Int):java.lang.IllegalArgumentException" - "java.util.concurrent.ScheduledThreadPoolExecutor.awaitTermination(kotlin.Long, java.util.concurrent.TimeUnit?):java.lang.InterruptedException" + - "java.util.concurrent.ScheduledThreadPoolExecutor.constructor(kotlin.Int):java.lang.IllegalArgumentException" - "java.util.concurrent.ScheduledThreadPoolExecutor.schedule(java.lang.Runnable, kotlin.Long, java.util.concurrent.TimeUnit):java.util.concurrent.RejectedExecutionException,java.lang.NullPointerException" - "java.util.concurrent.ThreadPoolExecutor.constructor(kotlin.Int, kotlin.Int, kotlin.Long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue):java.lang.NullPointerException,java.lang.IllegalArgumentException" # endregion @@ -623,6 +626,7 @@ datadog: - "java.util.concurrent.ScheduledThreadPoolExecutor.scheduleSafe(kotlin.String, kotlin.Long, java.util.concurrent.TimeUnit, com.datadog.android.api.InternalLogger, java.lang.Runnable)" - "java.util.concurrent.ScheduledThreadPoolExecutor.shutdown()" - "java.util.concurrent.ScheduledThreadPoolExecutor.shutdownNow()" + - "java.util.concurrent.ThreadPoolExecutor.afterExecute(java.lang.Runnable?, kotlin.Throwable?)" - "java.util.concurrent.ThreadPoolExecutor.isIdle()" - "java.util.concurrent.ThreadPoolExecutor.waitToIdle(kotlin.Long)" - "java.util.concurrent.ThreadPoolExecutor.waitToIdle(kotlin.Long, com.datadog.android.api.InternalLogger)" From 707aa844d7f3798fe5ce39d448f6b452df5e7aaa Mon Sep 17 00:00:00 2001 From: "Xavier F. Gouchet" Date: Tue, 26 Mar 2024 11:30:00 +0100 Subject: [PATCH 4/4] RUM-3670 code review feedbacks --- .../core/internal/thread/BackPressureExecutorService.kt | 3 +++ .../datadog/android/core/thread/FlushableExecutorService.kt | 3 ++- .../thread/DropOldestBackPressuredBlockingQueueTest.kt | 2 +- .../thread/IgnoreNewestBackPressuredBlockingQueueTest.kt | 2 +- 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/internal/thread/BackPressureExecutorService.kt b/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/internal/thread/BackPressureExecutorService.kt index ab5dd7bc9b..ca8e00465a 100644 --- a/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/internal/thread/BackPressureExecutorService.kt +++ b/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/internal/thread/BackPressureExecutorService.kt @@ -12,6 +12,9 @@ import com.datadog.android.core.thread.FlushableExecutorService import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit +/** + * A single threaded executor service using a BackPressureStrategy. + */ internal class BackPressureExecutorService( val logger: InternalLogger, backpressureStrategy: BackPressureStrategy diff --git a/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/thread/FlushableExecutorService.kt b/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/thread/FlushableExecutorService.kt index e2d0853731..531680c961 100644 --- a/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/thread/FlushableExecutorService.kt +++ b/dd-sdk-android-core/src/main/kotlin/com/datadog/android/core/thread/FlushableExecutorService.kt @@ -8,12 +8,13 @@ package com.datadog.android.core.thread import com.datadog.android.api.InternalLogger import com.datadog.android.core.configuration.BackPressureStrategy +import com.datadog.android.lint.InternalApi import java.util.concurrent.ExecutorService /** * An [ExecutorService] which backing queue can be drained to a collection. - * */ +@InternalApi interface FlushableExecutorService : ExecutorService { /** diff --git a/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/DropOldestBackPressuredBlockingQueueTest.kt b/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/DropOldestBackPressuredBlockingQueueTest.kt index ce6bfb5cac..8b093dcb36 100644 --- a/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/DropOldestBackPressuredBlockingQueueTest.kt +++ b/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/DropOldestBackPressuredBlockingQueueTest.kt @@ -403,7 +403,7 @@ class DropOldestBackPressuredBlockingQueueTest { // When Thread { - // PutInserts the specified element into this queue, waiting if necessary for space to become available. + // put() inserts the specified element into this queue, waiting if necessary for space to become available. // In order to not wait indefinitely, we need to remove an element sleep(100) testedQueue.take() diff --git a/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/IgnoreNewestBackPressuredBlockingQueueTest.kt b/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/IgnoreNewestBackPressuredBlockingQueueTest.kt index c943ac40d1..e8e24eddab 100644 --- a/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/IgnoreNewestBackPressuredBlockingQueueTest.kt +++ b/dd-sdk-android-core/src/test/kotlin/com/datadog/android/core/internal/thread/IgnoreNewestBackPressuredBlockingQueueTest.kt @@ -403,7 +403,7 @@ class IgnoreNewestBackPressuredBlockingQueueTest { // When Thread { - // PutInserts the specified element into this queue, waiting if necessary for space to become available. + // put() inserts the specified element into this queue, waiting if necessary for space to become available. // In order to not wait indefinitely, we need to remove an element sleep(100) testedQueue.take()