Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] grouping skipping opRepoPostCreateDelay, causing operations being applied out of order when multiple login operations are pending. (fixes issue since 5.1.10) #2087

Merged
merged 7 commits into from
May 16, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,12 @@ internal class OperationRepo(
flush: Boolean,
addToStore: Boolean,
index: Int? = null,
): Boolean {
) {
synchronized(queue) {
val hasExisting = queue.any { it.operation.id == queueItem.operation.id }
if (hasExisting) {
Logging.debug("OperationRepo: internalEnqueue - operation.id: ${queueItem.operation.id} already exists in the queue.")
return false
return
}

if (index != null) {
Expand All @@ -153,7 +153,6 @@ internal class OperationRepo(
}

waiter.wake(LoopWaiterMessage(flush, 0))
return true
}

/**
Expand Down Expand Up @@ -236,12 +235,17 @@ internal class OperationRepo(
queue.forEach { it.operation.translateIds(response.idTranslations) }
}
response.idTranslations.values.forEach { _newRecordState.add(it) }
coroutineScope.launch {
val waitTime = _configModelStore.model.opRepoPostCreateDelay
delay(waitTime)
synchronized(queue) {
if (queue.isNotEmpty()) waiter.wake(LoopWaiterMessage(false, waitTime))
}
// Stall processing the queue so the backend's DB has to time
// reflect the change before we do any other operations to it.
// NOTE: Future: We could run this logic in a
// coroutineScope.launch() block so other operations not
// effecting this these id's can still be done in parallel,
// however other parts of the system don't currently account
// for this so this is not safe to do.
val waitTime = _configModelStore.model.opRepoPostCreateDelay
delay(waitTime)
synchronized(queue) {
if (queue.isNotEmpty()) waiter.wake(LoopWaiterMessage(false, waitTime))
}
}

Expand Down Expand Up @@ -359,8 +363,7 @@ internal class OperationRepo(
* THIS SHOULD BE CALLED WHILE THE QUEUE IS SYNCHRONIZED!!
*/
private fun getGroupableOperations(startingOp: OperationQueueItem): List<OperationQueueItem> {
val ops = mutableListOf<OperationQueueItem>()
ops.add(startingOp)
val ops = mutableListOf(startingOp)

if (startingOp.operation.groupComparisonType == GroupComparisonType.NONE) {
return ops
Expand All @@ -373,23 +376,25 @@ internal class OperationRepo(
startingOp.operation.modifyComparisonKey
}

if (queue.isNotEmpty()) {
for (item in queue.toList()) {
val itemKey =
if (startingOp.operation.groupComparisonType == GroupComparisonType.CREATE) {
item.operation.createComparisonKey
} else {
item.operation.modifyComparisonKey
}

if (itemKey == "" && startingKey == "") {
throw Exception("Both comparison keys can not be blank!")
for (item in queue.toList()) {
val itemKey =
if (startingOp.operation.groupComparisonType == GroupComparisonType.CREATE) {
item.operation.createComparisonKey
} else {
item.operation.modifyComparisonKey
}

if (itemKey == startingKey) {
queue.remove(item)
ops.add(item)
}
if (itemKey == "" && startingKey == "") {
throw Exception("Both comparison keys can not be blank!")
}

if (!_newRecordState.canAccess(item.operation.applyToRecordId)) {
continue
}

if (itemKey == startingKey) {
queue.remove(item)
ops.add(item)
}
}

Expand All @@ -398,25 +403,18 @@ internal class OperationRepo(

/**
* Load saved operations from preference service and add them into the queue
* WARNING: Make sure queue.remove is NEVER called while this method is
* running, as internalEnqueue will throw IndexOutOfBounds or put things
* out of order if what was removed was something added by this method.
* - This never happens now, but is a landmine to be aware of!
* NOTE: Sometimes the loading might take longer than expected due to I/O reads from disk
* Any I/O implies executing time will vary greatly.
* NOTE: Sometimes the loading might take longer than expected due to I/O reads from disk,
* so always ensure this is call off the main thread.
*/
internal fun loadSavedOperations() {
_operationModelStore.loadOperations()
var successfulIndex = 0
for (operation in _operationModelStore.list()) {
val successful =
internalEnqueue(
OperationQueueItem(operation, bucket = enqueueIntoBucket),
flush = false,
addToStore = false,
index = successfulIndex,
)
if (successful) successfulIndex++
for (operation in _operationModelStore.list().reversed()) {
internalEnqueue(
OperationQueueItem(operation, bucket = enqueueIntoBucket),
flush = false,
addToStore = false,
index = 0,
)
}
initialized.complete(Unit)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class NewRecordsState(

fun canAccess(key: String): Boolean {
val timeLastMovedOrCreated = records[key] ?: return true
return _time.currentTimeMillis - timeLastMovedOrCreated > _configModelStore.model.opRepoPostCreateDelay
return _time.currentTimeMillis - timeLastMovedOrCreated >= _configModelStore.model.opRepoPostCreateDelay
}

fun isInMissingRetryWindow(key: String): Boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.time.withTimeout
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.coroutines.yield
Expand Down Expand Up @@ -547,31 +546,42 @@ class OperationRepoTests : FunSpec({
mocks.operationRepo.start()
mocks.operationRepo.enqueue(operation1)
val job = launch { mocks.operationRepo.enqueueAndWait(operation2) }.also { yield() }
mocks.operationRepo.enqueue(operation3)
mocks.operationRepo.enqueueAndWait(operation3)
job.join()

// Then
coVerifyOrder {
mocks.executor.execute(
withArg {
it.count() shouldBe 1
it[0] shouldBe operation1
},
)
mocks.executor.execute(listOf(operation1))
operation2.translateIds(mapOf("local-id1" to "id2"))
mocks.executor.execute(
withArg {
it.count() shouldBe 1
it[0] shouldBe operation3
},
)
// Ensure operation2 runs after operation3 as it has to wait for the create delay
mocks.executor.execute(
withArg {
it.count() shouldBe 1
it[0] shouldBe operation2
},
)
mocks.executor.execute(listOf(operation2))
mocks.executor.execute(listOf(operation3))
}
}

// This tests the same logic as above, but makes sure the delay also
// applies to grouping operations.
test("execution of an operation with translation IDs delays follow up operations, including grouping") {
// Given
val mocks = Mocks()
mocks.configModelStore.model.opRepoPostCreateDelay = 100
val operation1 = mockOperation(groupComparisonType = GroupComparisonType.NONE)
val operation2 = mockOperation(groupComparisonType = GroupComparisonType.CREATE)
val operation3 = mockOperation(groupComparisonType = GroupComparisonType.CREATE, applyToRecordId = "id2")
coEvery {
mocks.executor.execute(listOf(operation1))
} returns ExecutionResponse(ExecutionResult.SUCCESS, mapOf("local-id1" to "id2"))

// When
mocks.operationRepo.start()
mocks.operationRepo.enqueue(operation1)
mocks.operationRepo.enqueue(operation2)
mocks.operationRepo.enqueueAndWait(operation3)

// Then
coVerifyOrder {
mocks.executor.execute(listOf(operation1))
operation2.translateIds(mapOf("local-id1" to "id2"))
mocks.executor.execute(listOf(operation2, operation3))
}
}

Expand Down
Loading