Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Fix DesktopSyncManager exit exception issue #1683

Merged
merged 1 commit into from
Aug 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import io.realm.kotlin.notifications.UpdatedResults
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
Expand Down Expand Up @@ -62,76 +63,79 @@ class DesktopSyncManager(

override var refreshing by mutableStateOf(false)

init {
realTimeSyncScope.launch(CoroutineName("SyncManagerListenChanger")) {
val syncRuntimeInfos = syncRuntimeInfoDao.getAllSyncRuntimeInfos()
internalSyncHandlers.putAll(
syncRuntimeInfos.map { syncRuntimeInfo ->
syncRuntimeInfo.appInstanceId to
DesktopSyncHandler(
syncRuntimeInfo,
tokenCache,
telnetUtils,
syncInfoFactory,
syncClientApi,
signalProtocolStore,
signalProcessorCache,
syncRuntimeInfoDao,
signalDao,
realTimeSyncScope,
)
},
)
withContext(mainDispatcher) {
realTimeSyncRuntimeInfos.addAll(syncRuntimeInfos)
refreshWaitToVerifySyncRuntimeInfo()
deviceManager.refresh()
resolveSyncs()
}
val syncRuntimeInfosFlow = syncRuntimeInfos.asFlow()
syncRuntimeInfosFlow.collect { changes: ResultsChange<SyncRuntimeInfo> ->
when (changes) {
is UpdatedResults -> {
for (deletion in changes.deletions) {
val deletionSyncRuntimeInfo = realTimeSyncRuntimeInfos[deletion]
internalSyncHandlers.remove(deletionSyncRuntimeInfo.appInstanceId)!!
.clearContext()
}
private val syncManagerListenJob: Job

for (insertion in changes.insertions) {
val insertionSyncRuntimeInfo = changes.list[insertion]
internalSyncHandlers[insertionSyncRuntimeInfo.appInstanceId] =
DesktopSyncHandler(
insertionSyncRuntimeInfo,
tokenCache,
telnetUtils,
syncInfoFactory,
syncClientApi,
signalProtocolStore,
signalProcessorCache,
syncRuntimeInfoDao,
signalDao,
realTimeSyncScope,
)
init {
syncManagerListenJob =
realTimeSyncScope.launch(CoroutineName("SyncManagerListenChanger")) {
val syncRuntimeInfos = syncRuntimeInfoDao.getAllSyncRuntimeInfos()
internalSyncHandlers.putAll(
syncRuntimeInfos.map { syncRuntimeInfo ->
syncRuntimeInfo.appInstanceId to
DesktopSyncHandler(
syncRuntimeInfo,
tokenCache,
telnetUtils,
syncInfoFactory,
syncClientApi,
signalProtocolStore,
signalProcessorCache,
syncRuntimeInfoDao,
signalDao,
realTimeSyncScope,
)
},
)
withContext(mainDispatcher) {
realTimeSyncRuntimeInfos.addAll(syncRuntimeInfos)
refreshWaitToVerifySyncRuntimeInfo()
deviceManager.refresh()
resolveSyncs()
}
val syncRuntimeInfosFlow = syncRuntimeInfos.asFlow()
syncRuntimeInfosFlow.collect { changes: ResultsChange<SyncRuntimeInfo> ->
when (changes) {
is UpdatedResults -> {
for (deletion in changes.deletions) {
val deletionSyncRuntimeInfo = realTimeSyncRuntimeInfos[deletion]
internalSyncHandlers.remove(deletionSyncRuntimeInfo.appInstanceId)!!
.clearContext()
}

for (insertion in changes.insertions) {
val insertionSyncRuntimeInfo = changes.list[insertion]
internalSyncHandlers[insertionSyncRuntimeInfo.appInstanceId] =
DesktopSyncHandler(
insertionSyncRuntimeInfo,
tokenCache,
telnetUtils,
syncInfoFactory,
syncClientApi,
signalProtocolStore,
signalProcessorCache,
syncRuntimeInfoDao,
signalDao,
realTimeSyncScope,
)
}

// When the synchronization parameters change,
// we will force resolve, so we do not need to process it redundantly here
// for (change in changes.changes) { }

withContext(Dispatchers.Main) {
realTimeSyncRuntimeInfos.clear()
realTimeSyncRuntimeInfos.addAll(changes.list)
refreshWaitToVerifySyncRuntimeInfo()
deviceManager.refresh()
}
}

// When the synchronization parameters change,
// we will force resolve, so we do not need to process it redundantly here
// for (change in changes.changes) { }

withContext(Dispatchers.Main) {
realTimeSyncRuntimeInfos.clear()
realTimeSyncRuntimeInfos.addAll(changes.list)
refreshWaitToVerifySyncRuntimeInfo()
deviceManager.refresh()
else -> {
// types other than UpdatedResults are not changes -- ignore them
}
}
else -> {
// types other than UpdatedResults are not changes -- ignore them
}
}
}
}
}

override fun refreshWaitToVerifySyncRuntimeInfo() {
Expand Down Expand Up @@ -198,7 +202,7 @@ class DesktopSyncManager(
}

override fun notifyExit() {
realTimeJob.cancel()
syncManagerListenJob.cancel()
internalSyncHandlers.values.forEach { syncHandler ->
// Ensure that the notification is completed before exiting
runBlocking { syncHandler.notifyExit() }
Expand Down
Loading