Skip to content

Commit

Permalink
RUM-3458: Update time on every processing iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathanmos committed Jun 10, 2024
1 parent dc6dc20 commit e4fa696
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,74 +13,24 @@ import com.datadog.android.api.InternalLogger
import com.datadog.android.core.internal.utils.executeSafe
import com.datadog.android.sessionreplay.internal.processor.RecordedDataProcessor
import com.datadog.android.sessionreplay.internal.processor.RumContextDataHandler
import com.datadog.android.sessionreplay.internal.utils.TimeProvider
import com.datadog.android.sessionreplay.model.MobileSegment
import com.datadog.android.sessionreplay.recorder.SystemInformation
import java.util.Locale
import java.util.Queue
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ExecutorService
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

/**
* Responsible for storing the Snapshot and Interaction events in a queue.
* Allows for asynchronous enrichment, which still preserving the event order.
* The items are added to the queue from the main thread and processed on a background thread.
*/
@Suppress("TooManyFunctions")
internal class RecordedDataQueueHandler : DataQueueHandler {
private var executorService: ExecutorService
private var processor: RecordedDataProcessor
private var rumContextDataHandler: RumContextDataHandler
private var timeProvider: TimeProvider
private val internalLogger: InternalLogger
internal class RecordedDataQueueHandler(
private var processor: RecordedDataProcessor,
private var rumContextDataHandler: RumContextDataHandler,
private val internalLogger: InternalLogger,
private val executorService: ExecutorService,
internal val recordedDataQueue: Queue<RecordedDataQueueItem>

internal constructor(
processor: RecordedDataProcessor,
rumContextDataHandler: RumContextDataHandler,
timeProvider: TimeProvider,
internalLogger: InternalLogger
) : this(
processor = processor,
rumContextDataHandler = rumContextDataHandler,
timeProvider = timeProvider,
internalLogger = internalLogger,

/**
* TODO RUMM-0000 consider change to LoggingThreadPoolExecutor once V2 is merged.
* if we ever decide to make the poolsize greater than 1, we need to ensure
* synchronization works correctly in the triggerProcessingLoop method below
*/
// all parameters are non-negative and queue is not null
executorService = @Suppress("UnsafeThirdPartyFunctionCall") ThreadPoolExecutor(
CORE_DEFAULT_POOL_SIZE,
CORE_DEFAULT_POOL_SIZE,
THREAD_POOL_MAX_KEEP_ALIVE_MS,
TimeUnit.MILLISECONDS,
LinkedBlockingDeque()
),
recordedQueue = ConcurrentLinkedQueue()
)

@VisibleForTesting
internal constructor(
processor: RecordedDataProcessor,
rumContextDataHandler: RumContextDataHandler,
timeProvider: TimeProvider,
executorService: ExecutorService,
internalLogger: InternalLogger,
recordedQueue: Queue<RecordedDataQueueItem> = ConcurrentLinkedQueue()
) {
this.processor = processor
this.rumContextDataHandler = rumContextDataHandler
this.executorService = executorService
this.timeProvider = timeProvider
this.internalLogger = internalLogger
this.recordedDataQueue = recordedQueue
}
) : DataQueueHandler {

@Synchronized
override fun clearAndStopProcessingQueue() {
Expand Down Expand Up @@ -155,11 +105,8 @@ internal class RecordedDataQueueHandler : DataQueueHandler {
return
}

// currentTime needs to be obtained on the uithread
val currentTime = timeProvider.getDeviceTimestamp()

executorService.executeSafe("Recorded Data queue processing", internalLogger) {
triggerProcessingLoop(currentTime)
triggerProcessingLoop()
}
}

Expand All @@ -170,15 +117,15 @@ internal class RecordedDataQueueHandler : DataQueueHandler {
*/
@WorkerThread
@Synchronized
private fun triggerProcessingLoop(currentTime: Long) {
private fun triggerProcessingLoop() {
while (recordedDataQueue.isNotEmpty()) {
// peeking is safe here because we are in a synchronized block
// and we check for isEmpty first
@SuppressWarnings("UnsafeThirdPartyFunctionCall")
val nextItem = recordedDataQueue.peek()

if (nextItem != null) {
val nextItemAge = currentTime - nextItem.recordedQueuedItemContext.timestamp
val nextItemAgeInNs = System.nanoTime() - nextItem.creationTimeStampInNs
if (!nextItem.isValid()) {
internalLogger.log(
InternalLogger.Level.WARN,
Expand All @@ -189,14 +136,14 @@ internal class RecordedDataQueueHandler : DataQueueHandler {
{ ITEM_DROPPED_INVALID_MESSAGE.format(Locale.US, nextItem.javaClass.simpleName) }
)
recordedDataQueue.poll()
} else if (nextItemAge > MAX_DELAY_MS) {
} else if (nextItemAgeInNs > MAX_DELAY_NS) {
internalLogger.log(
InternalLogger.Level.WARN,
listOf(
InternalLogger.Target.MAINTAINER,
InternalLogger.Target.TELEMETRY
),
{ ITEM_DROPPED_EXPIRED_MESSAGE.format(Locale.US, nextItemAge) }
{ ITEM_DROPPED_EXPIRED_MESSAGE.format(Locale.US, nextItemAgeInNs) }
)
recordedDataQueue.poll()
} else if (nextItem.isReady()) {
Expand Down Expand Up @@ -263,12 +210,8 @@ internal class RecordedDataQueueHandler : DataQueueHandler {

internal companion object {
@VisibleForTesting
internal const val MAX_DELAY_MS = 1000L
internal const val MAX_DELAY_NS = 1_000_000_000L // 1 second in ns

private val THREAD_POOL_MAX_KEEP_ALIVE_MS = TimeUnit.SECONDS.toMillis(5)
private const val CORE_DEFAULT_POOL_SIZE = 1 // Only one thread will be kept alive
internal const val FAILED_TO_CONSUME_RECORDS_QUEUE_ERROR_MESSAGE =
"SR RecordedDataQueueHandler: failed to consume records from queue"
internal const val FAILED_TO_ADD_RECORDS_TO_QUEUE_ERROR_MESSAGE =
"SR RecordedDataQueueHandler: failed to add records into the queue"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ package com.datadog.android.sessionreplay.internal.async
import com.datadog.android.sessionreplay.internal.processor.RecordedQueuedItemContext

internal abstract class RecordedDataQueueItem(
internal val recordedQueuedItemContext: RecordedQueuedItemContext
internal val recordedQueuedItemContext: RecordedQueuedItemContext,
internal val creationTimeStampInNs: Long = System.nanoTime()
) {
internal abstract fun isValid(): Boolean

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package com.datadog.android.sessionreplay.internal.recorder
import android.app.Application
import android.os.Handler
import android.os.Looper
import android.text.format.DateUtils
import android.view.Window
import androidx.annotation.MainThread
import androidx.annotation.VisibleForTesting
Expand Down Expand Up @@ -46,6 +47,10 @@ import com.datadog.android.sessionreplay.utils.DefaultViewIdentifierResolver
import com.datadog.android.sessionreplay.utils.DrawableToColorMapper
import com.datadog.android.sessionreplay.utils.ViewBoundsResolver
import com.datadog.android.sessionreplay.utils.ViewIdentifierResolver
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

internal class SessionReplayRecorder : OnWindowRefreshedCallback, Recorder {

Expand Down Expand Up @@ -104,8 +109,25 @@ internal class SessionReplayRecorder : OnWindowRefreshedCallback, Recorder {
this.recordedDataQueueHandler = RecordedDataQueueHandler(
processor = processor,
rumContextDataHandler = rumContextDataHandler,
timeProvider = timeProvider,
internalLogger = internalLogger
internalLogger = internalLogger,

/**
* TODO RUMM-0000 consider change to LoggingThreadPoolExecutor once V2 is merged.
* if we ever decide to make the poolsize greater than 1, we need to ensure
* synchronization works correctly in the triggerProcessingLoop method below
*/
executorService = // all parameters are non-negative and queue is not null
@Suppress("UnsafeThirdPartyFunctionCall")
(
ThreadPoolExecutor(
CORE_DEFAULT_POOL_SIZE,
CORE_DEFAULT_POOL_SIZE,
THREAD_POOL_MAX_KEEP_ALIVE_MS,
TimeUnit.MILLISECONDS,
LinkedBlockingDeque()
)
),
recordedDataQueue = ConcurrentLinkedQueue()
)

val viewIdentifierResolver: ViewIdentifierResolver = DefaultViewIdentifierResolver
Expand Down Expand Up @@ -259,4 +281,9 @@ internal class SessionReplayRecorder : OnWindowRefreshedCallback, Recorder {
viewOnDrawInterceptor.intercept(decorViews, privacy)
}
}

private companion object {
private const val THREAD_POOL_MAX_KEEP_ALIVE_MS = DateUtils.SECOND_IN_MILLIS * 5 // 5000ms
private const val CORE_DEFAULT_POOL_SIZE = 1 // Only one thread will be kept alive
}
}
Loading

0 comments on commit e4fa696

Please sign in to comment.