Skip to content

Commit

Permalink
Property Executor: Add private dispatch queue
Browse files Browse the repository at this point in the history
* Reproduced crashes by creating multiple async threads that added and removed tags concurrently.
* Added a private dispatch queue to synchronize access to the delta queue and request queue.
* Crashes no longer happened after this change.
* It is possible for the executor to be flushing while a client response is received and modify the request queue.
* Additionally, there are some code paths that enqueue and update request but does not go through the operation repo, such as updating session count at the start of a new session.
  • Loading branch information
nan-li committed Mar 4, 2024
1 parent 3183463 commit cb88aa6
Showing 1 changed file with 68 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ class OSPropertyOperationExecutor: OSOperationExecutor {
var deltaQueue: [OSDelta] = []
var updateRequestQueue: [OSRequestUpdateProperties] = []

// The property executor dispatch queue, serial. This synchronizes access to `deltaQueue` and `updateRequestQueue`.
private let dispatchQueue = DispatchQueue(label: "OneSignal.OSPropertyOperationExecutor", target: .global())

init() {
// Read unfinished deltas from cache, if any...
// Note that we should only have deltas for the current user as old ones are flushed..
Expand Down Expand Up @@ -78,42 +81,49 @@ class OSPropertyOperationExecutor: OSOperationExecutor {
}

func enqueueDelta(_ delta: OSDelta) {
OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSPropertyOperationExecutor enqueue delta\(delta)")
deltaQueue.append(delta)
self.dispatchQueue.async {
OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSPropertyOperationExecutor enqueue delta\(delta)")
self.deltaQueue.append(delta)
}
}

func cacheDeltaQueue() {
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue)
self.dispatchQueue.async {
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue)
}
}

func processDeltaQueue(inBackground: Bool) {
if !deltaQueue.isEmpty {
OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSPropertyOperationExecutor processDeltaQueue with queue: \(deltaQueue)")
}
for delta in deltaQueue {
guard let model = delta.model as? OSPropertiesModel else {
// Log error
continue
self.dispatchQueue.async {
if !self.deltaQueue.isEmpty {
OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSPropertyOperationExecutor processDeltaQueue with queue: \(self.deltaQueue)")
}
for delta in self.deltaQueue {
guard let model = delta.model as? OSPropertiesModel else {
// Log error
continue
}

let request = OSRequestUpdateProperties(
properties: [delta.property: delta.value],
deltas: nil,
refreshDeviceMetadata: false, // Sort this out.
modelToUpdate: model,
identityModel: OneSignalUserManagerImpl.sharedInstance.user.identityModel // TODO: Make sure this is ok
)
updateRequestQueue.append(request)
}
self.deltaQueue = [] // TODO: Check that we can simply clear all the deltas in the deltaQueue
let request = OSRequestUpdateProperties(
properties: [delta.property: delta.value],
deltas: nil,
refreshDeviceMetadata: false, // Sort this out.
modelToUpdate: model,
identityModel: OneSignalUserManagerImpl.sharedInstance.user.identityModel // TODO: Make sure this is ok
)
self.updateRequestQueue.append(request)
}
self.deltaQueue = [] // TODO: Check that we can simply clear all the deltas in the deltaQueue

// persist executor's requests (including new request) to storage
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue)
// persist executor's requests (including new request) to storage
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue)

OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue) // This should be empty, can remove instead?
processRequestQueue(inBackground: inBackground)
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue) // This should be empty, can remove instead?
self.processRequestQueue(inBackground: inBackground)
}
}

// This method is called by `processDeltaQueue` only and does not need to be added to the dispatchQueue.
func processRequestQueue(inBackground: Bool) {
if updateRequestQueue.isEmpty {
return
Expand Down Expand Up @@ -141,38 +151,42 @@ class OSPropertyOperationExecutor: OSOperationExecutor {
OneSignalCore.sharedClient().execute(request) { _ in
// On success, remove request from cache, and we do need to hydrate
// TODO: We need to hydrate after all ? What why ?
self.updateRequestQueue.removeAll(where: { $0 == request})
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue)
if inBackground {
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
self.dispatchQueue.async {
self.updateRequestQueue.removeAll(where: { $0 == request})
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue)
if inBackground {
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
}
}
} onFailure: { error in
OneSignalLog.onesignalLog(.LL_ERROR, message: "OSPropertyOperationExecutor update properties request failed with error: \(error.debugDescription)")
if let nsError = error as? NSError {
let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code)
if responseType == .missing {
// remove from cache and queue
self.updateRequestQueue.removeAll(where: { $0 == request})
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue)
// Logout if the user in the SDK is the same
guard OneSignalUserManagerImpl.sharedInstance.isCurrentUser(request.identityModel)
else {
if inBackground {
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
self.dispatchQueue.async {
if let nsError = error as? NSError {
let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code)
if responseType == .missing {
// remove from cache and queue
self.updateRequestQueue.removeAll(where: { $0 == request})
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue)
// Logout if the user in the SDK is the same
guard OneSignalUserManagerImpl.sharedInstance.isCurrentUser(request.identityModel)
else {
if inBackground {
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
}
return
}
return
// The subscription has been deleted along with the user, so remove the subscription_id but keep the same push subscription model
OneSignalUserManagerImpl.sharedInstance.pushSubscriptionModel?.subscriptionId = nil
OneSignalUserManagerImpl.sharedInstance._logout()
} else if responseType != .retryable {
// Fail, no retry, remove from cache and queue
self.updateRequestQueue.removeAll(where: { $0 == request})
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue)
}
// The subscription has been deleted along with the user, so remove the subscription_id but keep the same push subscription model
OneSignalUserManagerImpl.sharedInstance.pushSubscriptionModel?.subscriptionId = nil
OneSignalUserManagerImpl.sharedInstance._logout()
} else if responseType != .retryable {
// Fail, no retry, remove from cache and queue
self.updateRequestQueue.removeAll(where: { $0 == request})
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue)
}
}
if inBackground {
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
if inBackground {
OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier)
}
}
}
}
Expand Down Expand Up @@ -201,8 +215,10 @@ extension OSPropertyOperationExecutor {
}
}
} else {
updateRequestQueue.append(request)
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue)
self.dispatchQueue.async {
self.updateRequestQueue.append(request)
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue)
}
}
}
}

0 comments on commit cb88aa6

Please sign in to comment.