diff --git a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt index 88c13b03da..42adaae1da 100644 --- a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt +++ b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt @@ -134,12 +134,12 @@ internal class OperationRepo( flush: Boolean, addToStore: Boolean, index: Int? = null, - ): Boolean { + ) { synchronized(queue) { val hasExisting = queue.any { it.operation.id == queueItem.operation.id } if (hasExisting) { Logging.debug("OperationRepo: internalEnqueue - operation.id: ${queueItem.operation.id} already exists in the queue.") - return false + return } if (index != null) { @@ -153,7 +153,6 @@ internal class OperationRepo( } waiter.wake(LoopWaiterMessage(flush, 0)) - return true } /** @@ -236,12 +235,17 @@ internal class OperationRepo( queue.forEach { it.operation.translateIds(response.idTranslations) } } response.idTranslations.values.forEach { _newRecordState.add(it) } - coroutineScope.launch { - val waitTime = _configModelStore.model.opRepoPostCreateDelay - delay(waitTime) - synchronized(queue) { - if (queue.isNotEmpty()) waiter.wake(LoopWaiterMessage(false, waitTime)) - } + // Stall processing the queue so the backend's DB has to time + // reflect the change before we do any other operations to it. + // NOTE: Future: We could run this logic in a + // coroutineScope.launch() block so other operations not + // effecting this these id's can still be done in parallel, + // however other parts of the system don't currently account + // for this so this is not safe to do. + val waitTime = _configModelStore.model.opRepoPostCreateDelay + delay(waitTime) + synchronized(queue) { + if (queue.isNotEmpty()) waiter.wake(LoopWaiterMessage(false, waitTime)) } } @@ -359,8 +363,7 @@ internal class OperationRepo( * THIS SHOULD BE CALLED WHILE THE QUEUE IS SYNCHRONIZED!! */ private fun getGroupableOperations(startingOp: OperationQueueItem): List { - val ops = mutableListOf() - ops.add(startingOp) + val ops = mutableListOf(startingOp) if (startingOp.operation.groupComparisonType == GroupComparisonType.NONE) { return ops @@ -373,23 +376,25 @@ internal class OperationRepo( startingOp.operation.modifyComparisonKey } - if (queue.isNotEmpty()) { - for (item in queue.toList()) { - val itemKey = - if (startingOp.operation.groupComparisonType == GroupComparisonType.CREATE) { - item.operation.createComparisonKey - } else { - item.operation.modifyComparisonKey - } - - if (itemKey == "" && startingKey == "") { - throw Exception("Both comparison keys can not be blank!") + for (item in queue.toList()) { + val itemKey = + if (startingOp.operation.groupComparisonType == GroupComparisonType.CREATE) { + item.operation.createComparisonKey + } else { + item.operation.modifyComparisonKey } - if (itemKey == startingKey) { - queue.remove(item) - ops.add(item) - } + if (itemKey == "" && startingKey == "") { + throw Exception("Both comparison keys can not be blank!") + } + + if (!_newRecordState.canAccess(item.operation.applyToRecordId)) { + continue + } + + if (itemKey == startingKey) { + queue.remove(item) + ops.add(item) } } @@ -398,25 +403,18 @@ internal class OperationRepo( /** * Load saved operations from preference service and add them into the queue - * WARNING: Make sure queue.remove is NEVER called while this method is - * running, as internalEnqueue will throw IndexOutOfBounds or put things - * out of order if what was removed was something added by this method. - * - This never happens now, but is a landmine to be aware of! - * NOTE: Sometimes the loading might take longer than expected due to I/O reads from disk - * Any I/O implies executing time will vary greatly. + * NOTE: Sometimes the loading might take longer than expected due to I/O reads from disk, + * so always ensure this is call off the main thread. */ internal fun loadSavedOperations() { _operationModelStore.loadOperations() - var successfulIndex = 0 - for (operation in _operationModelStore.list()) { - val successful = - internalEnqueue( - OperationQueueItem(operation, bucket = enqueueIntoBucket), - flush = false, - addToStore = false, - index = successfulIndex, - ) - if (successful) successfulIndex++ + for (operation in _operationModelStore.list().reversed()) { + internalEnqueue( + OperationQueueItem(operation, bucket = enqueueIntoBucket), + flush = false, + addToStore = false, + index = 0, + ) } initialized.complete(Unit) } diff --git a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/user/internal/operations/impl/states/NewRecordsState.kt b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/user/internal/operations/impl/states/NewRecordsState.kt index e884fc7f6d..cdc44a4c38 100644 --- a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/user/internal/operations/impl/states/NewRecordsState.kt +++ b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/user/internal/operations/impl/states/NewRecordsState.kt @@ -23,7 +23,7 @@ class NewRecordsState( fun canAccess(key: String): Boolean { val timeLastMovedOrCreated = records[key] ?: return true - return _time.currentTimeMillis - timeLastMovedOrCreated > _configModelStore.model.opRepoPostCreateDelay + return _time.currentTimeMillis - timeLastMovedOrCreated >= _configModelStore.model.opRepoPostCreateDelay } fun isInMissingRetryWindow(key: String): Boolean { diff --git a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt index aada5b40fd..1094b06671 100644 --- a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt +++ b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt @@ -25,7 +25,6 @@ import io.mockk.spyk import io.mockk.verify import kotlinx.coroutines.delay import kotlinx.coroutines.launch -import kotlinx.coroutines.time.withTimeout import kotlinx.coroutines.withTimeout import kotlinx.coroutines.withTimeoutOrNull import kotlinx.coroutines.yield @@ -547,31 +546,42 @@ class OperationRepoTests : FunSpec({ mocks.operationRepo.start() mocks.operationRepo.enqueue(operation1) val job = launch { mocks.operationRepo.enqueueAndWait(operation2) }.also { yield() } - mocks.operationRepo.enqueue(operation3) + mocks.operationRepo.enqueueAndWait(operation3) job.join() // Then coVerifyOrder { - mocks.executor.execute( - withArg { - it.count() shouldBe 1 - it[0] shouldBe operation1 - }, - ) + mocks.executor.execute(listOf(operation1)) operation2.translateIds(mapOf("local-id1" to "id2")) - mocks.executor.execute( - withArg { - it.count() shouldBe 1 - it[0] shouldBe operation3 - }, - ) - // Ensure operation2 runs after operation3 as it has to wait for the create delay - mocks.executor.execute( - withArg { - it.count() shouldBe 1 - it[0] shouldBe operation2 - }, - ) + mocks.executor.execute(listOf(operation2)) + mocks.executor.execute(listOf(operation3)) + } + } + + // This tests the same logic as above, but makes sure the delay also + // applies to grouping operations. + test("execution of an operation with translation IDs delays follow up operations, including grouping") { + // Given + val mocks = Mocks() + mocks.configModelStore.model.opRepoPostCreateDelay = 100 + val operation1 = mockOperation(groupComparisonType = GroupComparisonType.NONE) + val operation2 = mockOperation(groupComparisonType = GroupComparisonType.CREATE) + val operation3 = mockOperation(groupComparisonType = GroupComparisonType.CREATE, applyToRecordId = "id2") + coEvery { + mocks.executor.execute(listOf(operation1)) + } returns ExecutionResponse(ExecutionResult.SUCCESS, mapOf("local-id1" to "id2")) + + // When + mocks.operationRepo.start() + mocks.operationRepo.enqueue(operation1) + mocks.operationRepo.enqueue(operation2) + mocks.operationRepo.enqueueAndWait(operation3) + + // Then + coVerifyOrder { + mocks.executor.execute(listOf(operation1)) + operation2.translateIds(mapOf("local-id1" to "id2")) + mocks.executor.execute(listOf(operation2, operation3)) } }