Skip to content

Commit

Permalink
Merge pull request #1182 from DataDog/nogorodnikov/improve-batch-wait…
Browse files Browse the repository at this point in the history
…-timeout-handling-for-the-batch-upload

Improve batch upload wait timeout handling
  • Loading branch information
0xnm authored Dec 9, 2022
2 parents 6c6af82 + c6f156a commit b37b44f
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ internal class DataUploadRunnable(
private val contextProvider: ContextProvider,
private val networkInfoProvider: NetworkInfoProvider,
private val systemInfoProvider: SystemInfoProvider,
uploadFrequency: UploadFrequency
uploadFrequency: UploadFrequency,
private val batchUploadWaitTimeoutMs: Long = CoreFeature.NETWORK_TIMEOUT_MS
) : UploadRunnable {

internal var currentDelayIntervalMs = DEFAULT_DELAY_FACTOR * uploadFrequency.baseStepMs
Expand All @@ -55,18 +56,21 @@ internal class DataUploadRunnable(
lock.countDown()
}
) { batchId, reader ->
val batch = reader.read()
val batchMeta = reader.currentMetadata()

consumeBatch(
context,
batchId,
batch,
batchMeta
)
lock.countDown()
try {
val batch = reader.read()
val batchMeta = reader.currentMetadata()

consumeBatch(
context,
batchId,
batch,
batchMeta
)
} finally {
lock.countDown()
}
}
lock.await(CoreFeature.NETWORK_TIMEOUT_MS, TimeUnit.MILLISECONDS)
lock.await(batchUploadWaitTimeoutMs, TimeUnit.MILLISECONDS)
}

scheduleNextUpload()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import com.datadog.android.v2.core.internal.storage.BatchConfirmation
import com.datadog.android.v2.core.internal.storage.BatchId
import com.datadog.android.v2.core.internal.storage.BatchReader
import com.datadog.android.v2.core.internal.storage.Storage
import com.datadog.tools.unit.forge.aThrowable
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.argumentCaptor
import com.nhaarman.mockitokotlin2.doAnswer
import com.nhaarman.mockitokotlin2.doReturn
import com.nhaarman.mockitokotlin2.doThrow
import com.nhaarman.mockitokotlin2.eq
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.same
Expand Down Expand Up @@ -113,7 +115,8 @@ internal class DataUploadRunnableTest {
mockContextProvider,
mockNetworkInfoProvider,
mockSystemInfoProvider,
fakeUploadFrequency
fakeUploadFrequency,
TEST_BATCH_UPLOAD_WAIT_TIMEOUT_MS
)
}

Expand Down Expand Up @@ -328,7 +331,7 @@ internal class DataUploadRunnableTest {
}

@Test
fun `M not send batch W run() { batteryFullOrCharging, powerSaveMode}`(
fun `M not send batch W run() { batteryFullOrCharging, powerSaveMode }`(
@IntForgery(min = 0, max = 100) batteryLevel: Int
) {
// Given
Expand All @@ -353,7 +356,7 @@ internal class DataUploadRunnableTest {
}

@Test
fun `M not send batch W run() { batteryLeveHigh, powerSaveMode}`(
fun `M not send batch W run() { batteryLeveHigh, powerSaveMode }`(
@IntForgery(min = DataUploadRunnable.LOW_BATTERY_THRESHOLD + 1) batteryLevel: Int
) {
// Given
Expand All @@ -378,7 +381,7 @@ internal class DataUploadRunnableTest {
}

@Test
fun `M not send batch W run() { onExternalPower, powerSaveMode}`(
fun `M not send batch W run() { onExternalPower, powerSaveMode }`(
@IntForgery(min = 0, max = DataUploadRunnable.LOW_BATTERY_THRESHOLD) batteryLevel: Int
) {
// Given
Expand Down Expand Up @@ -880,4 +883,73 @@ internal class DataUploadRunnableTest {
}
}
}

// region async

@Test
fun `𝕄 respect batch wait upload timeout 𝕎 run()`() {
// Given
whenever(mockStorage.readNextBatch(any(), any())) doAnswer {
// imitate async which never completes
}

// When
testedRunnable.run()

// Then
verify(mockThreadPoolExecutor).schedule(
same(testedRunnable),
any(),
eq(TimeUnit.MILLISECONDS)
)
}

@Test
fun `𝕄 stop waiting 𝕎 run() { exception is thrown }`(
@StringForgery batch: List<String>,
@StringForgery batchMeta: String,
forge: Forge
) {
// Given
val batchId = mock<BatchId>()
val batchReader = mock<BatchReader>()
val batchData = batch.map { it.toByteArray() }
val batchMetadata = forge.aNullable { batchMeta.toByteArray() }

whenever(batchReader.read()) doReturn batchData
whenever(batchReader.currentMetadata()) doReturn batchMetadata

whenever(mockStorage.readNextBatch(any(), any())) doAnswer {
Thread {
it.getArgument<(BatchId, BatchReader) -> Unit>(1).invoke(batchId, batchReader)
}.start()
}

whenever(
mockDataUploader.upload(
fakeContext,
batchData,
batchMetadata
)
) doThrow forge.aThrowable()

// When
val start = System.currentTimeMillis()
testedRunnable.run()

// Then
assertThat(System.currentTimeMillis() - start)
.isLessThan(TEST_BATCH_UPLOAD_WAIT_TIMEOUT_MS)
verify(mockThreadPoolExecutor).schedule(
same(testedRunnable),
any(),
eq(TimeUnit.MILLISECONDS)
)
}

// endregion

companion object {
const val TEST_BATCH_UPLOAD_WAIT_TIMEOUT_MS = 100L
}
}

0 comments on commit b37b44f

Please sign in to comment.