Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] external_id is skipped and updates stop if something updates the User (such as addTag) shortly before login is called #2046

Merged
merged 7 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.onesignal.core.internal.operations

import kotlin.reflect.KClass

/**
* The operation queue provides a mechanism to queue one or more [Operation] with the promise
* it will be executed in a background thread at some point in the future. Operations are
Expand Down Expand Up @@ -31,4 +33,13 @@ interface IOperationRepo {
operation: Operation,
flush: Boolean = false,
): Boolean

/**
* Check if the queue contains a specific operation type
*/
fun <T : Operation> containsInstanceOf(type: KClass<T>): Boolean
}

// Extension function so the syntax containsInstanceOf<Operation>() can be used over
// containsInstanceOf(Operation::class)
inline fun <reified T : Operation> IOperationRepo.containsInstanceOf(): Boolean = containsInstanceOf(T::class)
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import com.onesignal.debug.internal.logging.Logging
import kotlinx.coroutines.delay
import kotlinx.coroutines.withTimeoutOrNull
import java.util.UUID
import kotlin.reflect.KClass

internal class OperationRepo(
executors: List<IOperationExecutor>,
Expand All @@ -26,7 +27,11 @@ internal class OperationRepo(
val operation: Operation,
val waiter: WaiterWithValue<Boolean>? = null,
var retries: Int = 0,
)
) {
override fun toString(): String {
return Pair(operation.toString(), retries).toString() + "\n"
}
}

private val executorsMap: Map<String, IOperationExecutor>
private val queue = mutableListOf<OperationQueueItem>()
Expand All @@ -48,6 +53,12 @@ internal class OperationRepo(
}
}

override fun <T : Operation> containsInstanceOf(type: KClass<T>): Boolean {
synchronized(queue) {
return queue.any { type.isInstance(it.operation) }
}
}

override fun start() {
paused = false
suspendifyOnThread(name = "OpRepo") {
Expand Down Expand Up @@ -105,7 +116,7 @@ internal class OperationRepo(
}

val ops = getNextOps()
Logging.debug("processQueueForever:ops:$ops")
Logging.debug("processQueueForever:ops:\n$ops")

if (ops != null) {
executeOperations(ops)
Expand Down Expand Up @@ -274,6 +285,10 @@ internal class OperationRepo(
val itemKey =
if (startingOp.operation.groupComparisonType == GroupComparisonType.CREATE) item.operation.createComparisonKey else item.operation.modifyComparisonKey

if (itemKey == "" && startingKey == "") {
throw Exception("Both comparison keys can not be blank!")
}

if (itemKey == startingKey) {
queue.remove(item)
ops.add(item)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import com.onesignal.user.internal.backend.impl.UserBackendService
import com.onesignal.user.internal.builduser.IRebuildUserService
import com.onesignal.user.internal.builduser.impl.RebuildUserService
import com.onesignal.user.internal.identity.IdentityModelStore
import com.onesignal.user.internal.migrations.RecoverFromDroppedLoginBug
import com.onesignal.user.internal.operations.impl.executors.IdentityOperationExecutor
import com.onesignal.user.internal.operations.impl.executors.LoginUserFromSubscriptionOperationExecutor
import com.onesignal.user.internal.operations.impl.executors.LoginUserOperationExecutor
Expand Down Expand Up @@ -65,5 +66,7 @@ internal class UserModule : IModule {
builder.register<UserManager>().provides<IUserManager>()

builder.register<UserRefreshService>().provides<IStartableService>()

builder.register<RecoverFromDroppedLoginBug>().provides<IStartableService>()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package com.onesignal.user.internal.migrations

import com.onesignal.common.IDManager
import com.onesignal.core.internal.config.ConfigModelStore
import com.onesignal.core.internal.operations.IOperationRepo
import com.onesignal.core.internal.operations.containsInstanceOf
import com.onesignal.core.internal.startup.IStartableService
import com.onesignal.debug.internal.logging.Logging
import com.onesignal.user.internal.identity.IdentityModelStore
import com.onesignal.user.internal.operations.LoginUserOperation

/**
* Purpose: Automatically recovers a stalled User in the OperationRepo due
* to a bug in the SDK from 5.0.0 to 5.1.7.
*
* Issue: Some calls to OneSignal.login() would not be reflected on the
* backend and would stall the the queue for that User. This would result
* in User and Subscription operations to not be processed by
* OperationRepo.
* See PR #2046 for more details.
*
* Even if the developer called OneSignal.login() again with the same
* externalId it would not correct the stalled User.
* - Only calling logout() or login() with different externalId would
* have un-stalled the OperationRepo. And then only after logging
* back to the stalled user would it have recover all the unsent
* operations they may exist.
*/
class RecoverFromDroppedLoginBug(
private val _operationRepo: IOperationRepo,
private val _identityModelStore: IdentityModelStore,
private val _configModelStore: ConfigModelStore,
) : IStartableService {
override fun start() {
if (isInBadState()) {
Logging.warn(
"User with externalId:" +
"${_identityModelStore.model.externalId} " +
"was in a bad state, causing it to not update on OneSignal's " +
"backend! We are recovering and replaying all unsent " +
"operations now.",
)
recoverByAddingBackDroppedLoginOperation()
}
}

// We are in the bad state if ALL are true:
// 1. externalId is set (because OneSignal.login was called)
// 2. We don't have a real yet onesignalId
// - We haven't made a successful user create call yet.
// 3. There is no attempt to create the User left in the
// OperationRepo's queue.
private fun isInBadState(): Boolean {
val externalId = _identityModelStore.model.externalId
val onesignalId = _identityModelStore.model.onesignalId

// NOTE: We are not accounting a more rare (and less important case)
// where a previously logged in User was never created.
// That is, if another user already logged in successfully, but
// the last user still has stuck pending operations due to the
// User never being created on the OneSignal's backend.
return externalId != null &&
IDManager.isLocalId(onesignalId) &&
!_operationRepo.containsInstanceOf<LoginUserOperation>()
}

private fun recoverByAddingBackDroppedLoginOperation() {
// This is the operation that was dropped by mistake in some cases,
// once it is added to the queue all and it gets executed, all
// operations waiting on it will be sent.

// This enqueues at the end, however this is ok, since
// the OperationRepo is designed find the first operation that is
// executable.
_operationRepo.enqueue(
LoginUserOperation(
_configModelStore.model.appId,
_identityModelStore.model.onesignalId,
_identityModelStore.model.externalId,
null,
),
true,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class DeleteTagOperation() : Operation(UpdateUserOperationExecutor.DELETE_TAG) {
}

override val createComparisonKey: String get() = ""
override val modifyComparisonKey: String get() = createComparisonKey
override val modifyComparisonKey: String get() = "$appId.User.$onesignalId"
override val groupComparisonType: GroupComparisonType = GroupComparisonType.ALTER
override val canStartExecute: Boolean get() = !IDManager.isLocalId(onesignalId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class SetPropertyOperation() : Operation(UpdateUserOperationExecutor.SET_PROPERT
}

override val createComparisonKey: String get() = ""
override val modifyComparisonKey: String get() = createComparisonKey
override val modifyComparisonKey: String get() = "$appId.User.$onesignalId"
override val groupComparisonType: GroupComparisonType = GroupComparisonType.ALTER
override val canStartExecute: Boolean get() = !IDManager.isLocalId(onesignalId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class SetTagOperation() : Operation(UpdateUserOperationExecutor.SET_TAG) {
}

override val createComparisonKey: String get() = ""
override val modifyComparisonKey: String get() = createComparisonKey
override val modifyComparisonKey: String get() = "$appId.User.$onesignalId"
override val groupComparisonType: GroupComparisonType = GroupComparisonType.ALTER
override val canStartExecute: Boolean get() = !IDManager.isLocalId(onesignalId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ internal class IdentityOperationExecutor(
override suspend fun execute(operations: List<Operation>): ExecutionResponse {
Logging.debug("IdentityOperationExecutor(operations: $operations)")

if (operations.any { it !is SetAliasOperation && it !is DeleteAliasOperation }) {
throw Exception("Unrecognized operation(s)! Attempted operations:\n$operations")
}

if (operations.any { it is SetAliasOperation } &&
operations.any { it is DeleteAliasOperation }
) {
throw Exception("Can't process SetAliasOperation and DeleteAliasOperation at the same time.")
}

// An alias group is an appId/onesignalId/aliasLabel combo, so we only care
// about the last operation in the group, as that will be the effective end
// state to this specific alias for this user.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ internal class LoginUserFromSubscriptionOperationExecutor(
override suspend fun execute(operations: List<Operation>): ExecutionResponse {
Logging.debug("LoginUserFromSubscriptionOperationExecutor(operation: $operations)")

if (operations.size > 1) {
throw Exception("Only supports one operation! Attempted operations:\n$operations")
}

val startingOp = operations.first()

if (startingOp is LoginUserFromSubscriptionOperation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ internal class LoginUserOperationExecutor(
val startingOp = operations.first()

if (startingOp is LoginUserOperation) {
return loginUser(startingOp, operations)
return loginUser(startingOp, operations.drop(1))
}

throw Exception("Unrecognized operation: $startingOp")
Expand Down Expand Up @@ -153,6 +153,7 @@ internal class LoginUserOperationExecutor(
is TransferSubscriptionOperation -> subscriptions = createSubscriptionsFromOperation(operation, subscriptions)
is UpdateSubscriptionOperation -> subscriptions = createSubscriptionsFromOperation(operation, subscriptions)
is DeleteSubscriptionOperation -> subscriptions = createSubscriptionsFromOperation(operation, subscriptions)
else -> throw Exception("Unrecognized operation: $operation")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ internal class RefreshUserOperationExecutor(
override suspend fun execute(operations: List<Operation>): ExecutionResponse {
Logging.log(LogLevel.DEBUG, "RefreshUserOperationExecutor(operation: $operations)")

if (operations.any { it !is RefreshUserOperation }) {
throw Exception("Unrecognized operation(s)! Attempted operations:\n$operations")
}

val startingOp = operations.first()
if (startingOp is RefreshUserOperation) {
return getUser(startingOp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,17 @@ internal class SubscriptionOperationExecutor(
return if (startingOp is CreateSubscriptionOperation) {
createSubscription(startingOp, operations)
} else if (operations.any { it is DeleteSubscriptionOperation }) {
deleteSubscription(operations.first { it is DeleteSubscriptionOperation } as DeleteSubscriptionOperation)
if (operations.size > 1) {
throw Exception("Only supports one operation! Attempted operations:\n$operations")
}
val deleteSubOps = operations.filterIsInstance<DeleteSubscriptionOperation>()
deleteSubscription(deleteSubOps.first())
} else if (startingOp is UpdateSubscriptionOperation) {
updateSubscription(startingOp, operations)
} else if (startingOp is TransferSubscriptionOperation) {
if (operations.size > 1) {
throw Exception("TransferSubscriptionOperation only supports one operation! Attempted operations:\n$operations")
}
transferSubscription(startingOp)
} else {
throw Exception("Unrecognized operation: $startingOp")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ internal class UpdateUserOperationExecutor(

deltasObject = PropertiesDeltasObject(deltasObject.sessionTime, deltasObject.sessionCount, amountSpent, purchasesArray)
}
else -> throw Exception("Unrecognized operation: $operation")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,28 @@ class OperationRepoTests : FunSpec({
Logging.logLevel = LogLevel.NONE
}

test("containsInstanceOf") {
// Given
val operationRepo = Mocks().operationRepo

open class MyOperation : Operation("MyOp") {
override val createComparisonKey = ""
override val modifyComparisonKey = ""
override val groupComparisonType = GroupComparisonType.NONE
override val canStartExecute = false
}

class MyOperation2 : MyOperation()

// When
operationRepo.start()
operationRepo.enqueue(MyOperation())

// Then
operationRepo.containsInstanceOf<MyOperation>() shouldBe true
operationRepo.containsInstanceOf<MyOperation2>() shouldBe false
}

// Ensures we are not continuously waking the CPU
test("ensure processQueueForever suspends when queue is empty") {
// Given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,6 @@ class LoginUserOperationExecutorTests : FunSpec({
val operations =
listOf<Operation>(
LoginUserOperation(appId, localOneSignalId, null, null),
SetAliasOperation(appId, localOneSignalId, "aliasLabel1", "aliasValue1-1"),
SetAliasOperation(appId, localOneSignalId, "aliasLabel1", "aliasValue1-2"),
SetAliasOperation(appId, localOneSignalId, "aliasLabel2", "aliasValue2"),
DeleteAliasOperation(appId, localOneSignalId, "aliasLabel2"),
CreateSubscriptionOperation(
appId,
localOneSignalId,
Expand Down Expand Up @@ -365,20 +361,6 @@ class LoginUserOperationExecutorTests : FunSpec({
SubscriptionStatus.SUBSCRIBED,
),
DeleteSubscriptionOperation(appId, localOneSignalId, "subscriptionId2"),
SetTagOperation(appId, localOneSignalId, "tagKey1", "tagValue1-1"),
SetTagOperation(appId, localOneSignalId, "tagKey1", "tagValue1-2"),
SetTagOperation(appId, localOneSignalId, "tagKey2", "tagValue2"),
DeleteTagOperation(appId, localOneSignalId, "tagKey2"),
SetPropertyOperation(appId, localOneSignalId, PropertiesModel::language.name, "lang1"),
SetPropertyOperation(appId, localOneSignalId, PropertiesModel::language.name, "lang2"),
SetPropertyOperation(appId, localOneSignalId, PropertiesModel::timezone.name, "timezone"),
SetPropertyOperation(appId, localOneSignalId, PropertiesModel::country.name, "country"),
SetPropertyOperation(appId, localOneSignalId, PropertiesModel::locationLatitude.name, 123.45),
SetPropertyOperation(appId, localOneSignalId, PropertiesModel::locationLongitude.name, 678.90),
SetPropertyOperation(appId, localOneSignalId, PropertiesModel::locationType.name, 1),
SetPropertyOperation(appId, localOneSignalId, PropertiesModel::locationAccuracy.name, 0.15),
SetPropertyOperation(appId, localOneSignalId, PropertiesModel::locationBackground.name, true),
SetPropertyOperation(appId, localOneSignalId, PropertiesModel::locationTimestamp.name, 1111L),
)

// When
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,15 +484,6 @@ class SubscriptionOperationExecutorTests : FunSpec({

val operations =
listOf<Operation>(
UpdateSubscriptionOperation(
appId,
remoteOneSignalId,
remoteSubscriptionId,
SubscriptionType.PUSH,
true,
"pushToken2",
SubscriptionStatus.SUBSCRIBED,
),
DeleteSubscriptionOperation(appId, remoteOneSignalId, remoteSubscriptionId),
)

Expand Down
Loading