From cb88aa60befc548204020addc07d9b038444d80d Mon Sep 17 00:00:00 2001 From: Nan Date: Mon, 4 Mar 2024 00:30:20 -0800 Subject: [PATCH] Property Executor: Add private dispatch queue * Reproduced crashes by creating multiple async threads that added and removed tags concurrently. * Added a private dispatch queue to synchronize access to the delta queue and request queue. * Crashes no longer happened after this change. * It is possible for the executor to be flushing while a client response is received and modify the request queue. * Additionally, there are some code paths that enqueue and update request but does not go through the operation repo, such as updating session count at the start of a new session. --- .../OSPropertyOperationExecutor.swift | 120 ++++++++++-------- 1 file changed, 68 insertions(+), 52 deletions(-) diff --git a/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSPropertyOperationExecutor.swift b/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSPropertyOperationExecutor.swift index f01f60b89..7474284a7 100644 --- a/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSPropertyOperationExecutor.swift +++ b/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSPropertyOperationExecutor.swift @@ -33,6 +33,9 @@ class OSPropertyOperationExecutor: OSOperationExecutor { var deltaQueue: [OSDelta] = [] var updateRequestQueue: [OSRequestUpdateProperties] = [] + // The property executor dispatch queue, serial. This synchronizes access to `deltaQueue` and `updateRequestQueue`. + private let dispatchQueue = DispatchQueue(label: "OneSignal.OSPropertyOperationExecutor", target: .global()) + init() { // Read unfinished deltas from cache, if any... // Note that we should only have deltas for the current user as old ones are flushed.. @@ -78,42 +81,49 @@ class OSPropertyOperationExecutor: OSOperationExecutor { } func enqueueDelta(_ delta: OSDelta) { - OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSPropertyOperationExecutor enqueue delta\(delta)") - deltaQueue.append(delta) + self.dispatchQueue.async { + OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSPropertyOperationExecutor enqueue delta\(delta)") + self.deltaQueue.append(delta) + } } func cacheDeltaQueue() { - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue) + self.dispatchQueue.async { + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue) + } } func processDeltaQueue(inBackground: Bool) { - if !deltaQueue.isEmpty { - OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSPropertyOperationExecutor processDeltaQueue with queue: \(deltaQueue)") - } - for delta in deltaQueue { - guard let model = delta.model as? OSPropertiesModel else { - // Log error - continue + self.dispatchQueue.async { + if !self.deltaQueue.isEmpty { + OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSPropertyOperationExecutor processDeltaQueue with queue: \(self.deltaQueue)") } + for delta in self.deltaQueue { + guard let model = delta.model as? OSPropertiesModel else { + // Log error + continue + } - let request = OSRequestUpdateProperties( - properties: [delta.property: delta.value], - deltas: nil, - refreshDeviceMetadata: false, // Sort this out. - modelToUpdate: model, - identityModel: OneSignalUserManagerImpl.sharedInstance.user.identityModel // TODO: Make sure this is ok - ) - updateRequestQueue.append(request) - } - self.deltaQueue = [] // TODO: Check that we can simply clear all the deltas in the deltaQueue + let request = OSRequestUpdateProperties( + properties: [delta.property: delta.value], + deltas: nil, + refreshDeviceMetadata: false, // Sort this out. + modelToUpdate: model, + identityModel: OneSignalUserManagerImpl.sharedInstance.user.identityModel // TODO: Make sure this is ok + ) + self.updateRequestQueue.append(request) + } + self.deltaQueue = [] // TODO: Check that we can simply clear all the deltas in the deltaQueue - // persist executor's requests (including new request) to storage - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue) + // persist executor's requests (including new request) to storage + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue) // This should be empty, can remove instead? - processRequestQueue(inBackground: inBackground) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue) // This should be empty, can remove instead? + self.processRequestQueue(inBackground: inBackground) + } } + // This method is called by `processDeltaQueue` only and does not need to be added to the dispatchQueue. func processRequestQueue(inBackground: Bool) { if updateRequestQueue.isEmpty { return @@ -141,38 +151,42 @@ class OSPropertyOperationExecutor: OSOperationExecutor { OneSignalCore.sharedClient().execute(request) { _ in // On success, remove request from cache, and we do need to hydrate // TODO: We need to hydrate after all ? What why ? - self.updateRequestQueue.removeAll(where: { $0 == request}) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue) - if inBackground { - OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + self.dispatchQueue.async { + self.updateRequestQueue.removeAll(where: { $0 == request}) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue) + if inBackground { + OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + } } } onFailure: { error in OneSignalLog.onesignalLog(.LL_ERROR, message: "OSPropertyOperationExecutor update properties request failed with error: \(error.debugDescription)") - if let nsError = error as? NSError { - let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code) - if responseType == .missing { - // remove from cache and queue - self.updateRequestQueue.removeAll(where: { $0 == request}) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue) - // Logout if the user in the SDK is the same - guard OneSignalUserManagerImpl.sharedInstance.isCurrentUser(request.identityModel) - else { - if inBackground { - OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + self.dispatchQueue.async { + if let nsError = error as? NSError { + let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code) + if responseType == .missing { + // remove from cache and queue + self.updateRequestQueue.removeAll(where: { $0 == request}) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue) + // Logout if the user in the SDK is the same + guard OneSignalUserManagerImpl.sharedInstance.isCurrentUser(request.identityModel) + else { + if inBackground { + OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + } + return } - return + // The subscription has been deleted along with the user, so remove the subscription_id but keep the same push subscription model + OneSignalUserManagerImpl.sharedInstance.pushSubscriptionModel?.subscriptionId = nil + OneSignalUserManagerImpl.sharedInstance._logout() + } else if responseType != .retryable { + // Fail, no retry, remove from cache and queue + self.updateRequestQueue.removeAll(where: { $0 == request}) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue) } - // The subscription has been deleted along with the user, so remove the subscription_id but keep the same push subscription model - OneSignalUserManagerImpl.sharedInstance.pushSubscriptionModel?.subscriptionId = nil - OneSignalUserManagerImpl.sharedInstance._logout() - } else if responseType != .retryable { - // Fail, no retry, remove from cache and queue - self.updateRequestQueue.removeAll(where: { $0 == request}) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue) } - } - if inBackground { - OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + if inBackground { + OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + } } } } @@ -201,8 +215,10 @@ extension OSPropertyOperationExecutor { } } } else { - updateRequestQueue.append(request) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue) + self.dispatchQueue.async { + self.updateRequestQueue.append(request) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_PROPERTIES_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue) + } } } }