diff --git a/features/dd-sdk-android-session-replay/src/main/kotlin/com/datadog/android/sessionreplay/internal/async/RecordedDataQueueHandler.kt b/features/dd-sdk-android-session-replay/src/main/kotlin/com/datadog/android/sessionreplay/internal/async/RecordedDataQueueHandler.kt index 190e4e8382..27047a581b 100644 --- a/features/dd-sdk-android-session-replay/src/main/kotlin/com/datadog/android/sessionreplay/internal/async/RecordedDataQueueHandler.kt +++ b/features/dd-sdk-android-session-replay/src/main/kotlin/com/datadog/android/sessionreplay/internal/async/RecordedDataQueueHandler.kt @@ -13,74 +13,24 @@ import com.datadog.android.api.InternalLogger import com.datadog.android.core.internal.utils.executeSafe import com.datadog.android.sessionreplay.internal.processor.RecordedDataProcessor import com.datadog.android.sessionreplay.internal.processor.RumContextDataHandler -import com.datadog.android.sessionreplay.internal.utils.TimeProvider import com.datadog.android.sessionreplay.model.MobileSegment import com.datadog.android.sessionreplay.recorder.SystemInformation import java.util.Locale import java.util.Queue -import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ExecutorService -import java.util.concurrent.LinkedBlockingDeque -import java.util.concurrent.ThreadPoolExecutor -import java.util.concurrent.TimeUnit /** * Responsible for storing the Snapshot and Interaction events in a queue. * Allows for asynchronous enrichment, which still preserving the event order. * The items are added to the queue from the main thread and processed on a background thread. */ -@Suppress("TooManyFunctions") -internal class RecordedDataQueueHandler : DataQueueHandler { - private var executorService: ExecutorService - private var processor: RecordedDataProcessor - private var rumContextDataHandler: RumContextDataHandler - private var timeProvider: TimeProvider - private val internalLogger: InternalLogger +internal class RecordedDataQueueHandler( + private var processor: RecordedDataProcessor, + private var rumContextDataHandler: RumContextDataHandler, + private val internalLogger: InternalLogger, + private val executorService: ExecutorService, internal val recordedDataQueue: Queue - - internal constructor( - processor: RecordedDataProcessor, - rumContextDataHandler: RumContextDataHandler, - timeProvider: TimeProvider, - internalLogger: InternalLogger - ) : this( - processor = processor, - rumContextDataHandler = rumContextDataHandler, - timeProvider = timeProvider, - internalLogger = internalLogger, - - /** - * TODO RUMM-0000 consider change to LoggingThreadPoolExecutor once V2 is merged. - * if we ever decide to make the poolsize greater than 1, we need to ensure - * synchronization works correctly in the triggerProcessingLoop method below - */ - // all parameters are non-negative and queue is not null - executorService = @Suppress("UnsafeThirdPartyFunctionCall") ThreadPoolExecutor( - CORE_DEFAULT_POOL_SIZE, - CORE_DEFAULT_POOL_SIZE, - THREAD_POOL_MAX_KEEP_ALIVE_MS, - TimeUnit.MILLISECONDS, - LinkedBlockingDeque() - ), - recordedQueue = ConcurrentLinkedQueue() - ) - - @VisibleForTesting - internal constructor( - processor: RecordedDataProcessor, - rumContextDataHandler: RumContextDataHandler, - timeProvider: TimeProvider, - executorService: ExecutorService, - internalLogger: InternalLogger, - recordedQueue: Queue = ConcurrentLinkedQueue() - ) { - this.processor = processor - this.rumContextDataHandler = rumContextDataHandler - this.executorService = executorService - this.timeProvider = timeProvider - this.internalLogger = internalLogger - this.recordedDataQueue = recordedQueue - } +) : DataQueueHandler { @Synchronized override fun clearAndStopProcessingQueue() { @@ -155,11 +105,8 @@ internal class RecordedDataQueueHandler : DataQueueHandler { return } - // currentTime needs to be obtained on the uithread - val currentTime = timeProvider.getDeviceTimestamp() - executorService.executeSafe("Recorded Data queue processing", internalLogger) { - triggerProcessingLoop(currentTime) + triggerProcessingLoop() } } @@ -170,7 +117,7 @@ internal class RecordedDataQueueHandler : DataQueueHandler { */ @WorkerThread @Synchronized - private fun triggerProcessingLoop(currentTime: Long) { + private fun triggerProcessingLoop() { while (recordedDataQueue.isNotEmpty()) { // peeking is safe here because we are in a synchronized block // and we check for isEmpty first @@ -178,7 +125,7 @@ internal class RecordedDataQueueHandler : DataQueueHandler { val nextItem = recordedDataQueue.peek() if (nextItem != null) { - val nextItemAge = currentTime - nextItem.recordedQueuedItemContext.timestamp + val nextItemAgeInNs = System.nanoTime() - nextItem.creationTimeStampInNs if (!nextItem.isValid()) { internalLogger.log( InternalLogger.Level.WARN, @@ -189,14 +136,14 @@ internal class RecordedDataQueueHandler : DataQueueHandler { { ITEM_DROPPED_INVALID_MESSAGE.format(Locale.US, nextItem.javaClass.simpleName) } ) recordedDataQueue.poll() - } else if (nextItemAge > MAX_DELAY_MS) { + } else if (nextItemAgeInNs > MAX_DELAY_NS) { internalLogger.log( InternalLogger.Level.WARN, listOf( InternalLogger.Target.MAINTAINER, InternalLogger.Target.TELEMETRY ), - { ITEM_DROPPED_EXPIRED_MESSAGE.format(Locale.US, nextItemAge) } + { ITEM_DROPPED_EXPIRED_MESSAGE.format(Locale.US, nextItemAgeInNs) } ) recordedDataQueue.poll() } else if (nextItem.isReady()) { @@ -263,12 +210,8 @@ internal class RecordedDataQueueHandler : DataQueueHandler { internal companion object { @VisibleForTesting - internal const val MAX_DELAY_MS = 1000L + internal const val MAX_DELAY_NS = 1_000_000_000L // 1 second in ns - private val THREAD_POOL_MAX_KEEP_ALIVE_MS = TimeUnit.SECONDS.toMillis(5) - private const val CORE_DEFAULT_POOL_SIZE = 1 // Only one thread will be kept alive - internal const val FAILED_TO_CONSUME_RECORDS_QUEUE_ERROR_MESSAGE = - "SR RecordedDataQueueHandler: failed to consume records from queue" internal const val FAILED_TO_ADD_RECORDS_TO_QUEUE_ERROR_MESSAGE = "SR RecordedDataQueueHandler: failed to add records into the queue" diff --git a/features/dd-sdk-android-session-replay/src/main/kotlin/com/datadog/android/sessionreplay/internal/async/RecordedDataQueueItem.kt b/features/dd-sdk-android-session-replay/src/main/kotlin/com/datadog/android/sessionreplay/internal/async/RecordedDataQueueItem.kt index 6f18a57f45..0e02939187 100644 --- a/features/dd-sdk-android-session-replay/src/main/kotlin/com/datadog/android/sessionreplay/internal/async/RecordedDataQueueItem.kt +++ b/features/dd-sdk-android-session-replay/src/main/kotlin/com/datadog/android/sessionreplay/internal/async/RecordedDataQueueItem.kt @@ -9,7 +9,8 @@ package com.datadog.android.sessionreplay.internal.async import com.datadog.android.sessionreplay.internal.processor.RecordedQueuedItemContext internal abstract class RecordedDataQueueItem( - internal val recordedQueuedItemContext: RecordedQueuedItemContext + internal val recordedQueuedItemContext: RecordedQueuedItemContext, + internal val creationTimeStampInNs: Long = System.nanoTime() ) { internal abstract fun isValid(): Boolean diff --git a/features/dd-sdk-android-session-replay/src/main/kotlin/com/datadog/android/sessionreplay/internal/recorder/SessionReplayRecorder.kt b/features/dd-sdk-android-session-replay/src/main/kotlin/com/datadog/android/sessionreplay/internal/recorder/SessionReplayRecorder.kt index 794e814722..ee858c3f5e 100644 --- a/features/dd-sdk-android-session-replay/src/main/kotlin/com/datadog/android/sessionreplay/internal/recorder/SessionReplayRecorder.kt +++ b/features/dd-sdk-android-session-replay/src/main/kotlin/com/datadog/android/sessionreplay/internal/recorder/SessionReplayRecorder.kt @@ -9,6 +9,7 @@ package com.datadog.android.sessionreplay.internal.recorder import android.app.Application import android.os.Handler import android.os.Looper +import android.text.format.DateUtils import android.view.Window import androidx.annotation.MainThread import androidx.annotation.VisibleForTesting @@ -46,6 +47,10 @@ import com.datadog.android.sessionreplay.utils.DefaultViewIdentifierResolver import com.datadog.android.sessionreplay.utils.DrawableToColorMapper import com.datadog.android.sessionreplay.utils.ViewBoundsResolver import com.datadog.android.sessionreplay.utils.ViewIdentifierResolver +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.LinkedBlockingDeque +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit internal class SessionReplayRecorder : OnWindowRefreshedCallback, Recorder { @@ -104,8 +109,25 @@ internal class SessionReplayRecorder : OnWindowRefreshedCallback, Recorder { this.recordedDataQueueHandler = RecordedDataQueueHandler( processor = processor, rumContextDataHandler = rumContextDataHandler, - timeProvider = timeProvider, - internalLogger = internalLogger + internalLogger = internalLogger, + + /** + * TODO RUMM-0000 consider change to LoggingThreadPoolExecutor once V2 is merged. + * if we ever decide to make the poolsize greater than 1, we need to ensure + * synchronization works correctly in the triggerProcessingLoop method below + */ + executorService = // all parameters are non-negative and queue is not null + @Suppress("UnsafeThirdPartyFunctionCall") + ( + ThreadPoolExecutor( + CORE_DEFAULT_POOL_SIZE, + CORE_DEFAULT_POOL_SIZE, + THREAD_POOL_MAX_KEEP_ALIVE_MS, + TimeUnit.MILLISECONDS, + LinkedBlockingDeque() + ) + ), + recordedDataQueue = ConcurrentLinkedQueue() ) val viewIdentifierResolver: ViewIdentifierResolver = DefaultViewIdentifierResolver @@ -259,4 +281,9 @@ internal class SessionReplayRecorder : OnWindowRefreshedCallback, Recorder { viewOnDrawInterceptor.intercept(decorViews, privacy) } } + + private companion object { + private const val THREAD_POOL_MAX_KEEP_ALIVE_MS = DateUtils.SECOND_IN_MILLIS * 5 // 5000ms + private const val CORE_DEFAULT_POOL_SIZE = 1 // Only one thread will be kept alive + } } diff --git a/features/dd-sdk-android-session-replay/src/test/kotlin/com/datadog/android/sessionreplay/internal/async/RecordedDataQueueHandlerTest.kt b/features/dd-sdk-android-session-replay/src/test/kotlin/com/datadog/android/sessionreplay/internal/async/RecordedDataQueueHandlerTest.kt index fdf6c47ce4..6c8510b307 100644 --- a/features/dd-sdk-android-session-replay/src/test/kotlin/com/datadog/android/sessionreplay/internal/async/RecordedDataQueueHandlerTest.kt +++ b/features/dd-sdk-android-session-replay/src/test/kotlin/com/datadog/android/sessionreplay/internal/async/RecordedDataQueueHandlerTest.kt @@ -8,12 +8,12 @@ package com.datadog.android.sessionreplay.internal.async import com.datadog.android.api.InternalLogger import com.datadog.android.sessionreplay.forge.ForgeConfigurator -import com.datadog.android.sessionreplay.internal.async.RecordedDataQueueHandler.Companion.MAX_DELAY_MS +import com.datadog.android.sessionreplay.internal.async.RecordedDataQueueHandler.Companion.ITEM_DROPPED_EXPIRED_MESSAGE +import com.datadog.android.sessionreplay.internal.async.RecordedDataQueueHandler.Companion.ITEM_DROPPED_INVALID_MESSAGE import com.datadog.android.sessionreplay.internal.processor.RecordedDataProcessor import com.datadog.android.sessionreplay.internal.processor.RecordedQueuedItemContext import com.datadog.android.sessionreplay.internal.processor.RumContextDataHandler import com.datadog.android.sessionreplay.internal.recorder.Node -import com.datadog.android.sessionreplay.internal.time.SessionReplayTimeProvider import com.datadog.android.sessionreplay.model.MobileSegment import com.datadog.android.sessionreplay.recorder.SystemInformation import com.datadog.android.utils.verifyLog @@ -47,6 +47,7 @@ import org.mockito.kotlin.verifyNoInteractions import org.mockito.kotlin.verifyNoMoreInteractions import org.mockito.kotlin.whenever import org.mockito.quality.Strictness +import java.util.Locale import java.util.Queue import java.util.UUID import java.util.concurrent.ConcurrentLinkedQueue @@ -76,9 +77,6 @@ internal class RecordedDataQueueHandlerTest { @Mock lateinit var mockSystemInformation: SystemInformation - @Mock - lateinit var mockTimeProvider: SessionReplayTimeProvider - @Mock lateinit var mockInternalLogger: InternalLogger @@ -129,10 +127,9 @@ internal class RecordedDataQueueHandlerTest { testedHandler = RecordedDataQueueHandler( processor = mockProcessor, rumContextDataHandler = mockRumContextDataHandler, - timeProvider = mockTimeProvider, executorService = spyExecutorService, internalLogger = mockInternalLogger, - recordedQueue = fakeRecordedDataQueue + recordedDataQueue = fakeRecordedDataQueue ) } @@ -152,10 +149,9 @@ internal class RecordedDataQueueHandlerTest { testedHandler = RecordedDataQueueHandler( processor = mockProcessor, rumContextDataHandler = mockRumContextDataHandler, - timeProvider = mockTimeProvider, executorService = spyExecutorService, internalLogger = mockInternalLogger, - recordedQueue = mockQueue + recordedDataQueue = mockQueue ) testedHandler.recordedDataQueue.add(fakeSnapshotQueueItem) @@ -279,14 +275,19 @@ internal class RecordedDataQueueHandlerTest { } @Test - fun `M remove item from queue W tryToConsumeItems() { expired item }`() { + fun `M remove item from queue W tryToConsumeItems() { expired item }`( + @Mock mockSnapshotItem: SnapshotRecordedDataQueueItem + ) { // Given - val item = testedHandler.addSnapshotItem(mockSystemInformation) - ?: fail("item is null") - item.nodes = fakeNodeData + mockSnapshotItem.apply { + val expiredTime = System.nanoTime() - RecordedDataQueueHandler.MAX_DELAY_NS + whenever(creationTimeStampInNs).thenReturn(expiredTime) + whenever(isValid()).thenReturn(true) + whenever(isReady()).thenReturn(true) + whenever(nodes).thenReturn(fakeNodeData) + } - whenever(mockTimeProvider.getDeviceTimestamp()) - .thenReturn(item.recordedQueuedItemContext.timestamp + MAX_DELAY_MS + 1) + testedHandler.recordedDataQueue.offer(mockSnapshotItem) // When testedHandler.tryToConsumeItems() @@ -295,13 +296,14 @@ internal class RecordedDataQueueHandlerTest { // Then assertThat(testedHandler.recordedDataQueue.isEmpty()).isTrue + val expectedMessage = ITEM_DROPPED_EXPIRED_MESSAGE.split("=")[0] mockInternalLogger.verifyLog( InternalLogger.Level.WARN, listOf( InternalLogger.Target.MAINTAINER, InternalLogger.Target.TELEMETRY ), - { it.startsWith("SR RecordedDataQueueHandler: dropped item from the queue. age=") } + { it.startsWith(expectedMessage) } ) verifyNoMoreInteractions(mockProcessor) } @@ -317,10 +319,6 @@ internal class RecordedDataQueueHandlerTest { testedHandler.recordedDataQueue.offer(mockSnapshotItem) - val timestamp = mockSnapshotItem.recordedQueuedItemContext.timestamp - whenever(mockTimeProvider.getDeviceTimestamp()) - .thenReturn(timestamp) - // When testedHandler.tryToConsumeItems() spyExecutorService.shutdown() @@ -328,14 +326,15 @@ internal class RecordedDataQueueHandlerTest { // Then assertThat(testedHandler.recordedDataQueue).isEmpty() + val expectedMessage = + ITEM_DROPPED_INVALID_MESSAGE.format(Locale.US, "SnapshotRecordedDataQueueItem") mockInternalLogger.verifyLog( InternalLogger.Level.WARN, listOf( InternalLogger.Target.MAINTAINER, InternalLogger.Target.TELEMETRY ), - "SR RecordedDataQueueHandler: dropped item from the queue. isValid=false, " + - "type=SnapshotRecordedDataQueueItem" + expectedMessage ) verifyNoMoreInteractions(mockProcessor) } @@ -350,10 +349,6 @@ internal class RecordedDataQueueHandlerTest { testedHandler.recordedDataQueue.offer(mockTouchEventItem) - val spyTimestamp = mockTouchEventItem.recordedQueuedItemContext.timestamp - whenever(mockTimeProvider.getDeviceTimestamp()) - .thenReturn(spyTimestamp) - // When testedHandler.tryToConsumeItems() spyExecutorService.shutdown() @@ -361,14 +356,15 @@ internal class RecordedDataQueueHandlerTest { // Then assertThat(testedHandler.recordedDataQueue).isEmpty() + val expectedMessage = + ITEM_DROPPED_INVALID_MESSAGE.format(Locale.US, "TouchEventRecordedDataQueueItem") mockInternalLogger.verifyLog( InternalLogger.Level.WARN, listOf( InternalLogger.Target.MAINTAINER, InternalLogger.Target.TELEMETRY ), - "SR RecordedDataQueueHandler: dropped item from the queue. isValid=false, " + - "type=TouchEventRecordedDataQueueItem" + expectedMessage ) verifyNoMoreInteractions(mockProcessor) } @@ -382,10 +378,6 @@ internal class RecordedDataQueueHandlerTest { whenever(mockResourceItem.recordedQueuedItemContext).thenReturn(fakeRecordedQueuedItemContext) testedHandler.recordedDataQueue.offer(mockResourceItem) - val timestamp = mockResourceItem.recordedQueuedItemContext.timestamp - whenever(mockTimeProvider.getDeviceTimestamp()) - .thenReturn(timestamp) - // When testedHandler.tryToConsumeItems() spyExecutorService.shutdown() @@ -393,14 +385,15 @@ internal class RecordedDataQueueHandlerTest { // Then assertThat(testedHandler.recordedDataQueue).isEmpty() + val expectedMessage = + ITEM_DROPPED_INVALID_MESSAGE.format(Locale.US, "ResourceRecordedDataQueueItem") mockInternalLogger.verifyLog( InternalLogger.Level.WARN, listOf( InternalLogger.Target.MAINTAINER, InternalLogger.Target.TELEMETRY ), - "SR RecordedDataQueueHandler: dropped item from the queue. isValid=false, " + - "type=ResourceRecordedDataQueueItem" + expectedMessage ) verifyNoMoreInteractions(mockProcessor) } @@ -415,9 +408,6 @@ internal class RecordedDataQueueHandlerTest { testedHandler.recordedDataQueue.add(mockSnapshotItem) - whenever(mockTimeProvider.getDeviceTimestamp()) - .thenReturn(fakeRecordedQueuedItemContext.timestamp) - // When testedHandler.tryToConsumeItems() spyExecutorService.shutdown() @@ -434,9 +424,6 @@ internal class RecordedDataQueueHandlerTest { item.nodes = fakeNodeData item.isFinishedTraversal = true - whenever(mockTimeProvider.getDeviceTimestamp()) - .thenReturn(item.recordedQueuedItemContext.timestamp) - // When testedHandler.tryToConsumeItems() spyExecutorService.shutdown() @@ -455,9 +442,6 @@ internal class RecordedDataQueueHandlerTest { // Given val item = testedHandler.addTouchEventItem(fakeTouchData) ?: fail("item is null") - whenever(mockTimeProvider.getDeviceTimestamp()) - .thenReturn(item.recordedQueuedItemContext.timestamp) - // When testedHandler.tryToConsumeItems() spyExecutorService.shutdown() @@ -483,9 +467,6 @@ internal class RecordedDataQueueHandlerTest { fakePayload.toByteArray() ) ?: fail("item is null") - whenever(mockTimeProvider.getDeviceTimestamp()) - .thenReturn(item.recordedQueuedItemContext.timestamp) - // When testedHandler.tryToConsumeItems() spyExecutorService.shutdown() @@ -513,13 +494,9 @@ internal class RecordedDataQueueHandlerTest { item3.isFinishedTraversal = true assertThat(testedHandler.recordedDataQueue.size).isEqualTo(3) - val itemTimestamp = item1.recordedQueuedItemContext.timestamp // When repeat(50) { - whenever(mockTimeProvider.getDeviceTimestamp()) - .thenReturn(itemTimestamp + it) - testedHandler.tryToConsumeItems() spyExecutorService.shutdown() spyExecutorService.awaitTermination(1, TimeUnit.SECONDS) @@ -542,41 +519,37 @@ internal class RecordedDataQueueHandlerTest { ) { // Given // item1 - val item1RumContextData = fakeRecordedQueuedItemContext.copy(timestamp = 1) - - whenever(mockSnapshotItem1.recordedQueuedItemContext).thenReturn(item1RumContextData) - whenever(mockSnapshotItem1.systemInformation).thenReturn(mockSystemInformation) - whenever(mockSnapshotItem1.nodes).thenReturn(fakeNodeData) - doReturn(true).whenever(mockSnapshotItem1).isValid() - doReturn(true).whenever(mockSnapshotItem1).isReady() + mockSnapshotItem1.apply { + whenever(systemInformation).thenReturn(mockSystemInformation) + whenever(nodes).thenReturn(fakeNodeData) + whenever(creationTimeStampInNs).thenReturn(System.nanoTime()) + whenever(isValid()).thenReturn(true) + whenever(isReady()).thenReturn(true) + } // item2 - val item2RumContextData = fakeRecordedQueuedItemContext.copy(timestamp = 2) - - whenever(mockSnapshotItem2.recordedQueuedItemContext).thenReturn(item2RumContextData) - whenever(mockSnapshotItem2.systemInformation).thenReturn(mockSystemInformation) - whenever(mockSnapshotItem2.nodes).thenReturn(emptyList()) - doReturn(true).whenever(mockSnapshotItem2).isValid() - doReturn(false).whenever(mockSnapshotItem2).isReady() + mockSnapshotItem2.apply { + whenever(systemInformation).thenReturn(mockSystemInformation) + whenever(nodes).thenReturn(fakeNodeData) + whenever(creationTimeStampInNs).thenReturn(System.nanoTime()) + whenever(isValid()).thenReturn(true) + whenever(isReady()).thenReturn(false) + } // item3 - val item3RumContextData = fakeRecordedQueuedItemContext.copy(timestamp = 3) - - whenever(mockSnapshotItem3.recordedQueuedItemContext).thenReturn(item3RumContextData) - whenever(mockSnapshotItem3.systemInformation).thenReturn(mockSystemInformation) - whenever(mockSnapshotItem3.nodes).thenReturn(fakeNodeData) - doReturn(true).whenever(mockSnapshotItem3).isValid() - doReturn(true).whenever(mockSnapshotItem3).isReady() + mockSnapshotItem3.apply { + whenever(systemInformation).thenReturn(mockSystemInformation) + whenever(nodes).thenReturn(fakeNodeData) + whenever(creationTimeStampInNs).thenReturn(System.nanoTime()) + whenever(isValid()).thenReturn(true) + whenever(isReady()).thenReturn(true) + } testedHandler.recordedDataQueue.offer(mockSnapshotItem1) testedHandler.recordedDataQueue.offer(mockSnapshotItem2) testedHandler.recordedDataQueue.offer(mockSnapshotItem3) assertThat(testedHandler.recordedDataQueue.size).isEqualTo(3) - val item1Time = mockSnapshotItem1.recordedQueuedItemContext.timestamp - - whenever(mockTimeProvider.getDeviceTimestamp()) - .thenReturn(item1Time + 1) // When repeat(3) { @@ -699,7 +672,7 @@ internal class RecordedDataQueueHandlerTest { private fun createFakeSnapshotItemWithDelayMs(delay: Int): SnapshotRecordedDataQueueItem { val newRumContext = RecordedQueuedItemContext( - timestamp = System.currentTimeMillis() + delay, + timestamp = System.nanoTime() + delay, newRumContext = fakeRecordedQueuedItemContext.newRumContext )