Skip to content

Commit

Permalink
RUM-2014 Use the synchronous equivalent of readNextBatch API into the…
Browse files Browse the repository at this point in the history
… uploader
  • Loading branch information
mariusc83 committed Dec 15, 2023
1 parent b628685 commit 1183b5e
Show file tree
Hide file tree
Showing 13 changed files with 313 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import com.datadog.android.api.context.DatadogContext
import com.datadog.android.api.context.NetworkInfo
import com.datadog.android.api.storage.RawBatchEvent
import com.datadog.android.core.internal.ContextProvider
import com.datadog.android.core.internal.CoreFeature
import com.datadog.android.core.internal.configuration.DataUploadConfiguration
import com.datadog.android.core.internal.data.upload.UploadRunnable
import com.datadog.android.core.internal.data.upload.UploadStatus
Expand All @@ -22,7 +21,6 @@ import com.datadog.android.core.internal.persistence.BatchId
import com.datadog.android.core.internal.persistence.Storage
import com.datadog.android.core.internal.system.SystemInfoProvider
import com.datadog.android.core.internal.utils.scheduleSafe
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.TimeUnit
import kotlin.math.max
Expand All @@ -37,7 +35,6 @@ internal class DataUploadRunnable(
private val networkInfoProvider: NetworkInfoProvider,
private val systemInfoProvider: SystemInfoProvider,
uploadConfiguration: DataUploadConfiguration,
private val batchUploadWaitTimeoutMs: Long = CoreFeature.NETWORK_TIMEOUT_MS,
private val internalLogger: InternalLogger
) : UploadRunnable {

Expand Down Expand Up @@ -90,27 +87,15 @@ internal class DataUploadRunnable(
@Suppress("UnsafeThirdPartyFunctionCall") // called inside a dedicated executor
private fun handleNextBatch(context: DatadogContext): UploadStatus? {
var uploadStatus: UploadStatus? = null
val lock = CountDownLatch(1)
storage.readNextBatch(
noBatchCallback = {
lock.countDown()
}
) { batchId, reader ->
try {
val batch = reader.read()
val batchMeta = reader.currentMetadata()

uploadStatus = consumeBatch(
context,
batchId,
batch,
batchMeta
)
} finally {
lock.countDown()
}
val nextBatchData = storage.readNextBatch()
if (nextBatchData != null) {
uploadStatus = consumeBatch(
context,
nextBatchData.id,
nextBatchData.data,
nextBatchData.metadata
)
}
lock.await(batchUploadWaitTimeoutMs, TimeUnit.MILLISECONDS)
return uploadStatus
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,17 @@ internal class AbstractStorage(
}
}

@WorkerThread
override fun readNextBatch(): BatchData? {
return grantedPersistenceStrategy.lockAndReadNext()?.let {
BatchData(
id = BatchId(it.batchId),
data = it.events,
metadata = it.metadata
)
}
}

@WorkerThread
override fun confirmBatchRead(
batchId: BatchId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
* This product includes software developed at Datadog (https://www.datadoghq.com/).
* Copyright 2016-Present Datadog, Inc.
*/

package com.datadog.android.core.internal.persistence

import com.datadog.android.api.storage.RawBatchEvent

internal data class BatchData(
val id: BatchId,
val data: List<RawBatchEvent>,
val metadata: ByteArray? = null
) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false

other as BatchData

if (id != other.id) return false
if (data != other.data) return false
if (metadata != null) {
if (other.metadata == null) return false
if (!metadata.contentEquals(other.metadata)) return false
} else if (other.metadata != null) return false

return true
}

override fun hashCode(): Int {
var result = id.hashCode()
result = 31 * result + data.hashCode()
result = 31 * result + (metadata?.contentHashCode() ?: 0)
return result
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,28 @@ internal class ConsentAwareStorage(
readBatchCallback(batchId, reader)
}

@WorkerThread
override fun readNextBatch(): BatchData? {
val (batchFile, metaFile) = synchronized(lockedBatches) {
val batchFile = grantedOrchestrator
.getReadableFile(lockedBatches.map { it.file }.toSet()) ?: return null

val metaFile = grantedOrchestrator.getMetadataFile(batchFile)
lockedBatches.add(Batch(batchFile, metaFile))
batchFile to metaFile
}

val batchId = BatchId.fromFile(batchFile)
val batchMetadata = if (metaFile == null || !metaFile.existsSafe(internalLogger)) {
null
} else {
batchMetadataReaderWriter.readData(metaFile)
}
val batchData = batchEventsReaderWriter.readData(batchFile)

return BatchData(id = batchId, data = batchData, metadata = batchMetadata)
}

/** @inheritdoc */
@WorkerThread
override fun confirmBatchRead(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ internal interface Storage {
readBatchCallback: (BatchId, BatchReader) -> Unit
)

/**
* Utility to read a batch, synchronously.
*/
@WorkerThread
fun readNextBatch(): BatchData?

/**
* Utility to update the state of a batch, asynchronously.
* @param batchId the id of the Batch to confirm
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.datadog.android.core.internal.SdkFeature
import com.datadog.android.core.internal.data.upload.v2.DataUploader
import com.datadog.android.core.internal.metrics.RemovalReason
import com.datadog.android.core.internal.persistence.BatchConfirmation
import com.datadog.android.core.internal.persistence.BatchData
import com.datadog.android.core.internal.persistence.BatchId
import com.datadog.android.core.internal.persistence.BatchReader
import com.datadog.android.core.internal.persistence.Storage
Expand Down Expand Up @@ -717,6 +718,10 @@ internal class UploadWorkerTest {
}
}

override fun readNextBatch(): BatchData? {
return delegate.readNextBatch()
}

override fun confirmBatchRead(
batchId: BatchId,
removalReason: RemovalReason,
Expand Down
Loading

0 comments on commit 1183b5e

Please sign in to comment.