Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read-Your-Write Consistency #2168

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.onesignal.common.consistency

import com.onesignal.common.consistency.enums.IamFetchRywTokenKey
import com.onesignal.common.consistency.models.ICondition
import com.onesignal.common.consistency.models.IConsistencyKeyEnum

/**
* Used for read your write consistency when fetching In-App Messages.
*
* Params:
* key : String - the index of the RYW token map
*/
class IamFetchReadyCondition(
private val key: String,
) : ICondition {
companion object {
const val ID = "IamFetchReadyCondition"
}

override val id: String
get() = ID

override fun isMet(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String>>): Boolean {
val tokenMap = indexedTokens[key] ?: return false
val userUpdateTokenSet = tokenMap[IamFetchRywTokenKey.USER] != null

/**
* We always update the session count so we know we will have a userUpdateToken. We don't
* necessarily make a subscriptionUpdate call on every session. The following logic
* doesn't consider tokenMap[IamFetchRywTokenKey.SUBSCRIPTION] for this reason. This doesn't
* mean it isn't considered if present when doing the token comparison.
*/
return userUpdateTokenSet
}

override fun getNewestToken(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String?>>): String? {
val tokenMap = indexedTokens[key] ?: return null
// maxOrNull compares lexicographically
return listOfNotNull(tokenMap[IamFetchRywTokenKey.USER], tokenMap[IamFetchRywTokenKey.SUBSCRIPTION]).maxOrNull()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.onesignal.common.consistency.enums

import com.onesignal.common.consistency.models.IConsistencyKeyEnum

/**
* Each enum is a key that we use to keep track of read-your-write tokens.
* Although the enums are named with "UPDATE", they serve as keys for tokens from both PATCH & POST
*/
enum class IamFetchRywTokenKey : IConsistencyKeyEnum {
USER,
SUBSCRIPTION,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.onesignal.common.consistency.impl

import com.onesignal.common.consistency.models.ICondition
import com.onesignal.common.consistency.models.IConsistencyKeyEnum
import com.onesignal.common.consistency.models.IConsistencyManager
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

/**
* Manages read-your-write tokens for more accurate segment membership
* calculation. Uses customizable conditions that block retrieval of the newest token until met.
*
* Usage:
* val consistencyManager = ConsistencyManager<MyEnum>()
* val updateConditionDeferred = consistencyManager.registerCondition(MyCustomCondition())
* val rywToken = updateConditionDeferred.await()
*/
class ConsistencyManager : IConsistencyManager {
rgomezp marked this conversation as resolved.
Show resolved Hide resolved
private val mutex = Mutex()
private val indexedTokens: MutableMap<String, MutableMap<IConsistencyKeyEnum, String>> = mutableMapOf()
private val conditions: MutableList<Pair<ICondition, CompletableDeferred<String?>>> =
mutableListOf()

/**
* Set method to update the token based on the key.
* Params:
* id: String - the index of the token map (e.g. onesignalId)
* key: K - corresponds to the operation for which we have a read-your-write token
* value: String? - the token (read-your-write token)
*/
override suspend fun setRywToken(
id: String,
key: IConsistencyKeyEnum,
value: String,
) {
mutex.withLock {
val rywTokens = indexedTokens.getOrPut(id) { mutableMapOf() }
rywTokens[key] = value
checkConditionsAndComplete()
}
}

/**
* Register a condition with its corresponding deferred action. Returns a deferred condition.
*/
override suspend fun registerCondition(condition: ICondition): CompletableDeferred<String?> {
mutex.withLock {
val deferred = CompletableDeferred<String?>()
val pair = Pair(condition, deferred)
conditions.add(pair)
checkConditionsAndComplete()
return deferred
}
}

override suspend fun resolveConditionsWithID(id: String) {
val completedConditions = mutableListOf<Pair<ICondition, CompletableDeferred<String?>>>()

for ((condition, deferred) in conditions) {
if (condition.id == id) {
if (!deferred.isCompleted) {
deferred.complete(null)
}
}
completedConditions.add(Pair(condition, deferred))
}

// Remove completed conditions from the list
conditions.removeAll(completedConditions)
}

/**
* IMPORTANT: calling code should be protected by mutex to avoid potential inconsistencies
*/
private fun checkConditionsAndComplete() {
rgomezp marked this conversation as resolved.
Show resolved Hide resolved
val completedConditions = mutableListOf<Pair<ICondition, CompletableDeferred<String?>>>()

for ((condition, deferred) in conditions) {
jkasten2 marked this conversation as resolved.
Show resolved Hide resolved
if (condition.isMet(indexedTokens)) {
val newestToken = condition.getNewestToken(indexedTokens)
if (!deferred.isCompleted) {
deferred.complete(newestToken)
}
completedConditions.add(Pair(condition, deferred))
}
}

// Remove completed conditions from the list
conditions.removeAll(completedConditions)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.onesignal.common.consistency.models

interface ICondition {
/**
* Every implementation should define a unique ID & make available via a companion object for
* ease of use
*/
val id: String

/**
* Define a condition that "unblocks" execution
* e.g. we have token (A && B) || A
*/
fun isMet(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String>>): Boolean

/**
* Used to process tokens according to their format & return the newest token.
* e.g. numeric strings would be compared differently from JWT tokens
*/
fun getNewestToken(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String?>>): String?
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.onesignal.common.consistency.models

interface IConsistencyKeyEnum
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.onesignal.common.consistency.models

import kotlinx.coroutines.CompletableDeferred

interface IConsistencyManager {
/**
* Set method to update the RYW token based on the key.
* Params:
* id: String - the index of the RYW token map (e.g., onesignalId)
* key: IConsistencyKeyEnum - corresponds to the operation for which we have a read-your-write token
* value: String? - the read-your-write token
*/
suspend fun setRywToken(
id: String,
key: IConsistencyKeyEnum,
value: String,
)

/**
* Register a condition with its corresponding deferred action. Returns a deferred condition.
* Params:
* condition: ICondition - the condition to be registered
* Returns: CompletableDeferred<String?> - a deferred action that completes when the condition is met
*/
suspend fun registerCondition(condition: ICondition): CompletableDeferred<String?>

/**
* Resolve all conditions with a specific ID
*/
suspend fun resolveConditionsWithID(id: String)
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,18 @@ internal class HttpClient(
}
}

if (headers?.rywToken != null) {
con.setRequestProperty("OneSignal-RYW-Token", headers.rywToken.toString())
}

if (headers?.retryCount != null) {
con.setRequestProperty("Onesignal-Retry-Count", headers.retryCount.toString())
}

if (headers?.sessionDuration != null) {
con.setRequestProperty("OneSignal-Session-Duration", headers.sessionDuration.toString())
}

// Network request is made from getResponseCode()
httpResponse = con.responseCode

Expand Down Expand Up @@ -299,9 +311,9 @@ internal class HttpClient(
* Reads the HTTP Retry-Limit from the response.
*/
private fun retryLimitFromResponse(con: HttpURLConnection): Int? {
val retryLimitStr = con.getHeaderField("Retry-Limit")
val retryLimitStr = con.getHeaderField("OneSignal-Retry-Limit")
return if (retryLimitStr != null) {
Logging.debug("HttpClient: Response Retry-After: $retryLimitStr")
Logging.debug("HttpClient: Response OneSignal-Retry-Limit: $retryLimitStr")
retryLimitStr.toIntOrNull()
} else {
null
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
package com.onesignal.core.internal.http.impl

data class OptionalHeaders(
/**
* Used as an E-Tag
*/
val cacheKey: String? = null,
/**
* Used for read your write consistency
*/
val rywToken: String? = null,
/**
* Current retry count
*/
val retryCount: Int? = null,
/**
* Used to track delay between session start and request
*/
val sessionDuration: Long? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ internal class SessionListener(
}

override fun onSessionStarted() {
_operationRepo.enqueue(TrackSessionStartOperation(_configModelStore.model.appId, _identityModelStore.model.onesignalId))
_operationRepo.enqueue(TrackSessionStartOperation(_configModelStore.model.appId, _identityModelStore.model.onesignalId), true)
}

override fun onSessionActive() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.onesignal.user

import com.onesignal.common.consistency.impl.ConsistencyManager
import com.onesignal.common.consistency.models.IConsistencyManager
import com.onesignal.common.modules.IModule
import com.onesignal.common.services.ServiceBuilder
import com.onesignal.core.internal.operations.IOperationExecutor
Expand Down Expand Up @@ -34,6 +36,9 @@ import com.onesignal.user.internal.subscriptions.impl.SubscriptionManager

internal class UserModule : IModule {
override fun register(builder: ServiceBuilder) {
// Consistency
Koji23 marked this conversation as resolved.
Show resolved Hide resolved
builder.register<ConsistencyManager>().provides<IConsistencyManager>()

// Properties
builder.register<PropertiesModelStore>().provides<PropertiesModelStore>()
builder.register<PropertiesModelStoreListener>().provides<IBootstrapService>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ interface ISubscriptionBackendService {
aliasLabel: String,
aliasValue: String,
subscription: SubscriptionObject,
): String?
): Pair<String, String?>?

/**
* Update an existing subscription with the properties provided.
Expand All @@ -34,7 +34,7 @@ interface ISubscriptionBackendService {
appId: String,
subscriptionId: String,
subscription: SubscriptionObject,
)
): String?

/**
* Delete an existing subscription.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ interface IUserBackendService {
properties: PropertiesObject,
refreshDeviceMetadata: Boolean,
propertyiesDelta: PropertiesDeltasObject,
)
): String?

/**
* Retrieve a user from the backend.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ internal class SubscriptionBackendService(
aliasLabel: String,
aliasValue: String,
subscription: SubscriptionObject,
): String? {
): Pair<String, String?>? {
val jsonSubscription = JSONConverter.convertToJSON(subscription)
jsonSubscription.remove("id")
val requestJSON = JSONObject().put("subscription", jsonSubscription)
Expand All @@ -33,14 +33,19 @@ internal class SubscriptionBackendService(
return null
}

return subscriptionJSON.getString("id")
var rywToken: String? = null
if (responseJSON.has("ryw_token")) {
rywToken = responseJSON.getString("ryw_token")
}
rgomezp marked this conversation as resolved.
Show resolved Hide resolved

return Pair(subscriptionJSON.getString("id"), rywToken)
}

override suspend fun updateSubscription(
appId: String,
subscriptionId: String,
subscription: SubscriptionObject,
) {
): String? {
val requestJSON =
JSONObject()
.put("subscription", JSONConverter.convertToJSON(subscription))
Expand All @@ -50,6 +55,13 @@ internal class SubscriptionBackendService(
if (!response.isSuccess) {
throw BackendException(response.statusCode, response.payload, response.retryAfterSeconds)
}

val responseBody = JSONObject(response.payload)
rgomezp marked this conversation as resolved.
Show resolved Hide resolved
return if (responseBody.has("ryw_token")) {
responseBody.getString("ryw_token")
} else {
null
}
}

override suspend fun deleteSubscription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ internal class UserBackendService(
properties: PropertiesObject,
refreshDeviceMetadata: Boolean,
propertyiesDelta: PropertiesDeltasObject,
) {
): String? {
val jsonObject =
JSONObject()
.put("refresh_device_metadata", refreshDeviceMetadata)
Expand All @@ -70,6 +70,13 @@ internal class UserBackendService(
if (!response.isSuccess) {
throw BackendException(response.statusCode, response.payload, response.retryAfterSeconds)
}

val responseBody = JSONObject(response.payload)
rgomezp marked this conversation as resolved.
Show resolved Hide resolved
return if (responseBody.has("ryw_token")) {
responseBody.getString("ryw_token")
} else {
null
}
}

override suspend fun getUser(
Expand Down
Loading
Loading