Skip to content

Commit

Permalink
Always recreate stream on failure. (#205)
Browse files Browse the repository at this point in the history
  • Loading branch information
skhugh authored Jun 13, 2024
1 parent da17b52 commit 971eb30
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 26 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ android.suppressUnsupportedOptionWarnings=android.suppressUnsupportedOptionWarni
kotlin.code.style=official
kotlin.mpp.stability.nowarn=true
GROUP=dev.yorkie
VERSION_NAME=0.4.24-rc
VERSION_NAME=0.4.24-rc2
POM_DESCRIPTION=Document store for building collaborative editing applications.
POM_INCEPTION_YEAR=2022
POM_URL=https://github.com/yorkie-team/yorkie-android-sdk
Expand Down
60 changes: 35 additions & 25 deletions yorkie/src/main/kotlin/dev/yorkie/core/Client.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import dev.yorkie.util.createSingleThreadDispatcher
import java.io.Closeable
import java.io.InterruptedIOException
import java.util.UUID
import java.util.concurrent.TimeoutException
import kotlin.collections.Map.Entry
import kotlin.coroutines.coroutineContext
import kotlin.time.Duration
Expand Down Expand Up @@ -75,6 +76,7 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull
import okhttp3.OkHttpClient

/**
Expand Down Expand Up @@ -310,32 +312,39 @@ public class Client @VisibleForTesting internal constructor(
while (true) {
ensureActive()
latestStream.safeClose()
val stream = service.watchDocument(
attachment.document.key.documentBasedRequestHeader,
).also {
latestStream = it
}
val stream = withTimeoutOrNull(1_000) {
service.watchDocument(
attachment.document.key.documentBasedRequestHeader,
).also {
latestStream = it
}
} ?: continue
val streamJob = launch(start = CoroutineStart.UNDISPATCHED) {
val channel = stream.responseChannel()
var retry = 0
while (!stream.isReceiveClosed() && !channel.isClosedForReceive) {
val receiveResult = channel.receiveCatching()
receiveResult.onSuccess {
attachment.document.publishEvent(StreamConnectionChanged.Connected)
handleWatchDocumentsResponse(attachment.document.key, it)
retry = 0
}.onFailure {
if (receiveResult.isClosed) {
return@onFailure
withTimeoutOrNull(60_000) {
val receiveResult = channel.receiveCatching()
receiveResult.onSuccess {
attachment.document.publishEvent(StreamConnectionChanged.Connected)
handleWatchDocumentsResponse(attachment.document.key, it)
}.onFailure {
if (receiveResult.isClosed) {
stream.safeClose()
return@onFailure
}
handleWatchStreamFailure(attachment.document, stream, it)
}.onClosed {
handleWatchStreamFailure(
attachment.document,
stream,
it ?: ClosedReceiveChannelException("Channel was closed"),
)
}
retry++
handleWatchStreamFailure(attachment.document, stream, it, retry > 3)
}.onClosed {
} ?: run {
handleWatchStreamFailure(
attachment.document,
stream,
it ?: ClosedReceiveChannelException("Channel was closed"),
true,
TimeoutException("channel timed out"),
)
}
}
Expand All @@ -362,12 +371,10 @@ public class Client @VisibleForTesting internal constructor(
document: Document,
stream: ServerOnlyStreamInterface<*, *>,
cause: Throwable?,
closeStream: Boolean,
) {
onWatchStreamCanceled(document)
if (closeStream) {
stream.safeClose()
}
stream.safeClose()

cause?.let(::sendWatchStreamException)
coroutineContext.ensureActive()
delay(options.reconnectStreamDelay.inWholeMilliseconds)
Expand Down Expand Up @@ -409,11 +416,14 @@ public class Client @VisibleForTesting internal constructor(
}

private suspend fun ServerOnlyStreamInterface<*, *>?.safeClose() {
if (this == null || isReceiveClosed()) {
if (this == null) {
return
}
withContext(NonCancellable) {
receiveClose()
runCatching {
responseChannel().cancel()
receiveClose()
}
}
}

Expand Down

0 comments on commit 971eb30

Please sign in to comment.