Skip to content

Commit

Permalink
RUM-2014 Use the synchronous equivalent of readNextBatch API into the…
Browse files Browse the repository at this point in the history
… uploader
  • Loading branch information
mariusc83 committed Dec 19, 2023
1 parent e609161 commit 6e89ab6
Show file tree
Hide file tree
Showing 14 changed files with 346 additions and 694 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import com.datadog.android.core.internal.metrics.RemovalReason
import com.datadog.android.core.internal.utils.unboundInternalLogger
import java.util.LinkedList
import java.util.Queue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

internal class UploadWorker(
appContext: Context,
Expand Down Expand Up @@ -87,32 +85,22 @@ internal class UploadWorker(

val storage = feature.storage
val uploader = feature.uploader

// storage APIs may be async, so we need to block current thread to keep Worker alive
@Suppress("UnsafeThirdPartyFunctionCall") // safe to create, argument is not negative
val lock = CountDownLatch(1)

storage.readNextBatch(noBatchCallback = {
lock.countDown()
}) { batchId, reader ->
val batch = reader.read()
val batchMeta = reader.currentMetadata()

val uploadStatus = consumeBatch(context, batch, batchMeta, uploader)
val nextBatchData = storage.readNextBatch()
if (nextBatchData != null) {
val uploadStatus = consumeBatch(
context,
nextBatchData.data,
nextBatchData.metadata,
uploader
)
storage.confirmBatchRead(
batchId,
RemovalReason.IntakeCode(uploadStatus.code)
) { confirmation ->
confirmation.markAsRead(deleteBatch = !uploadStatus.shouldRetry)
@Suppress("UnsafeThirdPartyFunctionCall") // safe to add
taskQueue.offer(UploadNextBatchTask(taskQueue, sdkCore, feature))
lock.countDown()
}
nextBatchData.id,
RemovalReason.IntakeCode(uploadStatus.code),
deleteBatch = !uploadStatus.shouldRetry
)
@Suppress("UnsafeThirdPartyFunctionCall") // safe to add
taskQueue.offer(UploadNextBatchTask(taskQueue, sdkCore, feature))
}

@Suppress("UnsafeThirdPartyFunctionCall") // if interrupt happens, WorkManager
// will handle it
lock.await(LOCK_AWAIT_SECONDS, TimeUnit.SECONDS)
}

private fun consumeBatch(
Expand All @@ -128,7 +116,6 @@ internal class UploadWorker(
// endregion

companion object {
const val LOCK_AWAIT_SECONDS = 30L

const val MESSAGE_NOT_INITIALIZED = "Datadog has not been initialized."

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import com.datadog.android.api.context.DatadogContext
import com.datadog.android.api.context.NetworkInfo
import com.datadog.android.api.storage.RawBatchEvent
import com.datadog.android.core.internal.ContextProvider
import com.datadog.android.core.internal.CoreFeature
import com.datadog.android.core.internal.configuration.DataUploadConfiguration
import com.datadog.android.core.internal.data.upload.UploadRunnable
import com.datadog.android.core.internal.data.upload.UploadStatus
Expand All @@ -22,7 +21,6 @@ import com.datadog.android.core.internal.persistence.BatchId
import com.datadog.android.core.internal.persistence.Storage
import com.datadog.android.core.internal.system.SystemInfoProvider
import com.datadog.android.core.internal.utils.scheduleSafe
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.TimeUnit
import kotlin.math.max
Expand All @@ -37,7 +35,6 @@ internal class DataUploadRunnable(
private val networkInfoProvider: NetworkInfoProvider,
private val systemInfoProvider: SystemInfoProvider,
uploadConfiguration: DataUploadConfiguration,
private val batchUploadWaitTimeoutMs: Long = CoreFeature.NETWORK_TIMEOUT_MS,
private val internalLogger: InternalLogger
) : UploadRunnable {

Expand All @@ -52,8 +49,6 @@ internal class DataUploadRunnable(
override fun run() {
if (isNetworkAvailable() && isSystemReady()) {
val context = contextProvider.context
// TODO RUMM-0000 it should be already on the worker thread and if readNextBatch is async,
// we should wait until it completes before scheduling further
var batchConsumerAvailableAttempts = maxBatchesPerJob
var lastBatchUploadStatus: UploadStatus?
do {
Expand Down Expand Up @@ -90,27 +85,15 @@ internal class DataUploadRunnable(
@Suppress("UnsafeThirdPartyFunctionCall") // called inside a dedicated executor
private fun handleNextBatch(context: DatadogContext): UploadStatus? {
var uploadStatus: UploadStatus? = null
val lock = CountDownLatch(1)
storage.readNextBatch(
noBatchCallback = {
lock.countDown()
}
) { batchId, reader ->
try {
val batch = reader.read()
val batchMeta = reader.currentMetadata()

uploadStatus = consumeBatch(
context,
batchId,
batch,
batchMeta
)
} finally {
lock.countDown()
}
val nextBatchData = storage.readNextBatch()
if (nextBatchData != null) {
uploadStatus = consumeBatch(
context,
nextBatchData.id,
nextBatchData.data,
nextBatchData.metadata
)
}
lock.await(batchUploadWaitTimeoutMs, TimeUnit.MILLISECONDS)
return uploadStatus
}

Expand Down Expand Up @@ -151,9 +134,7 @@ internal class DataUploadRunnable(
} else {
RemovalReason.IntakeCode(status.code)
}
storage.confirmBatchRead(batchId, removalReason) {
it.markAsRead(deleteBatch = !status.shouldRetry)
}
storage.confirmBatchRead(batchId, removalReason, deleteBatch = !status.shouldRetry)
return status
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,48 +86,27 @@ internal class AbstractStorage(
}

@WorkerThread
override fun readNextBatch(
noBatchCallback: () -> Unit,
readBatchCallback: (BatchId, BatchReader) -> Unit
) {
val batch = grantedPersistenceStrategy.lockAndReadNext()
if (batch == null) {
noBatchCallback()
} else {
val batchId = BatchId(batch.batchId)
val reader = object : BatchReader {

@WorkerThread
override fun currentMetadata(): ByteArray? {
return batch.metadata
}

@WorkerThread
override fun read(): List<RawBatchEvent> {
return batch.events
}
}
readBatchCallback.invoke(batchId, reader)
override fun readNextBatch(): BatchData? {
return grantedPersistenceStrategy.lockAndReadNext()?.let {
BatchData(
id = BatchId(it.batchId),
data = it.events,
metadata = it.metadata
)
}
}

@WorkerThread
override fun confirmBatchRead(
batchId: BatchId,
removalReason: RemovalReason,
callback: (BatchConfirmation) -> Unit
deleteBatch: Boolean
) {
val confirmation = object : BatchConfirmation {
@WorkerThread
override fun markAsRead(deleteBatch: Boolean) {
if (deleteBatch) {
grantedPersistenceStrategy.unlockAndDelete(batchId.id)
} else {
grantedPersistenceStrategy.unlockAndKeep(batchId.id)
}
}
if (deleteBatch) {
grantedPersistenceStrategy.unlockAndDelete(batchId.id)
} else {
grantedPersistenceStrategy.unlockAndKeep(batchId.id)
}
callback(confirmation)
}

override fun dropAll() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
* This product includes software developed at Datadog (https://www.datadoghq.com/).
* Copyright 2016-Present Datadog, Inc.
*/

package com.datadog.android.core.internal.persistence

import com.datadog.android.api.storage.RawBatchEvent

internal data class BatchData(
val id: BatchId,
val data: List<RawBatchEvent>,
val metadata: ByteArray? = null
) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false

other as BatchData

if (id != other.id) return false
if (data != other.data) return false
if (metadata != null) {
if (other.metadata == null) return false
if (!metadata.contentEquals(other.metadata)) return false
} else if (other.metadata != null) return false

return true
}

override fun hashCode(): Int {
var result = id.hashCode()
result = 31 * result + data.hashCode()
result = 31 * result + (metadata?.contentHashCode() ?: 0)
return result
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import androidx.annotation.WorkerThread
import com.datadog.android.api.InternalLogger
import com.datadog.android.api.context.DatadogContext
import com.datadog.android.api.storage.EventBatchWriter
import com.datadog.android.api.storage.RawBatchEvent
import com.datadog.android.core.internal.metrics.MetricsDispatcher
import com.datadog.android.core.internal.metrics.RemovalReason
import com.datadog.android.core.internal.persistence.file.FileMover
Expand Down Expand Up @@ -84,63 +83,44 @@ internal class ConsentAwareStorage(

/** @inheritdoc */
@WorkerThread
override fun readNextBatch(
noBatchCallback: () -> Unit,
readBatchCallback: (BatchId, BatchReader) -> Unit
) {
override fun readNextBatch(): BatchData? {
val (batchFile, metaFile) = synchronized(lockedBatches) {
val batchFile = grantedOrchestrator
.getReadableFile(lockedBatches.map { it.file }.toSet())
if (batchFile == null) {
noBatchCallback()
return
}
.getReadableFile(lockedBatches.map { it.file }.toSet()) ?: return null

val metaFile = grantedOrchestrator.getMetadataFile(batchFile)
lockedBatches.add(Batch(batchFile, metaFile))
batchFile to metaFile
}

val batchId = BatchId.fromFile(batchFile)
val reader = object : BatchReader {

@WorkerThread
override fun currentMetadata(): ByteArray? {
if (metaFile == null || !metaFile.existsSafe(internalLogger)) return null

return batchMetadataReaderWriter.readData(metaFile)
}

@WorkerThread
override fun read(): List<RawBatchEvent> {
return batchEventsReaderWriter.readData(batchFile)
}
val batchMetadata = if (metaFile == null || !metaFile.existsSafe(internalLogger)) {
null
} else {
batchMetadataReaderWriter.readData(metaFile)
}
readBatchCallback(batchId, reader)
val batchData = batchEventsReaderWriter.readData(batchFile)

return BatchData(id = batchId, data = batchData, metadata = batchMetadata)
}

/** @inheritdoc */
@WorkerThread
override fun confirmBatchRead(
batchId: BatchId,
removalReason: RemovalReason,
callback: (BatchConfirmation) -> Unit
deleteBatch: Boolean
) {
val batch = synchronized(lockedBatches) {
lockedBatches.firstOrNull { batchId.matchesFile(it.file) }
} ?: return
val confirmation = object : BatchConfirmation {
@WorkerThread
override fun markAsRead(deleteBatch: Boolean) {
if (deleteBatch) {
deleteBatch(batch, removalReason)
}
synchronized(lockedBatches) {
lockedBatches.remove(batch)
}
}

if (deleteBatch) {
deleteBatch(batch, removalReason)
}
synchronized(lockedBatches) {
lockedBatches.remove(batch)
}
callback(confirmation)
}

/** @inheritdoc */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,23 @@ internal interface Storage {
)

/**
* Utility to read a batch, asynchronously.
* @param noBatchCallback an optional callback which is called when there is no batch available to read.
* @param readBatchCallback an operation to perform with a [BatchId] and [BatchReader] that will target
* the next readable Batch
* Utility to read a batch, synchronously.
*/
@WorkerThread
fun readNextBatch(
noBatchCallback: () -> Unit = {},
readBatchCallback: (BatchId, BatchReader) -> Unit
)
fun readNextBatch(): BatchData?

/**
* Utility to update the state of a batch, asynchronously.
* Utility to update the state of a batch, synchronously.
* @param batchId the id of the Batch to confirm
* @param removalReason the reason why the batch is being removed
* @param callback an operation to perform with a [BatchConfirmation]
* @param deleteBatch if `true` the batch will be deleted, otherwise it will be marked as
* not readable.
*/
@WorkerThread
fun confirmBatchRead(
batchId: BatchId,
removalReason: RemovalReason,
callback: (BatchConfirmation) -> Unit
deleteBatch: Boolean
)

/**
Expand Down
Loading

0 comments on commit 6e89ab6

Please sign in to comment.