Skip to content

Commit

Permalink
Prevent zombie presence events (#208)
Browse files Browse the repository at this point in the history
  • Loading branch information
7hong13 authored Jun 18, 2024
1 parent 1367862 commit 60ea15d
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 20 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ jobs:
- uses: actions/download-artifact@v2
- uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: true
verbose: false
token: ${{ secrets.CODECOV_TOKEN }}

microbenchmarks:
name: Microbenchmarks
Expand Down
47 changes: 46 additions & 1 deletion yorkie/src/androidTest/kotlin/dev/yorkie/core/PresenceTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,6 @@ class PresenceTest {
}
}

assertEquals(3, d1Events.size)
assertIs<Others.Watched>(d1Events.first())
assertIs<Others.PresenceChanged>(d1Events[1])
assertIs<Others.Unwatched>(d1Events.last())
Expand Down Expand Up @@ -827,5 +826,51 @@ class PresenceTest {
}
}

@Test
fun test_whether_presence_event_queue_is_empty_after_consecutive_presence_changes() {
withTwoClientsAndDocuments { c1, _, d1, d2, _ ->
val d1PresenceEvents = mutableListOf<MyPresence.PresenceChanged>()
val d2PresenceEvents = mutableListOf<Others.PresenceChanged>()
val jobs = listOf(
launch(start = CoroutineStart.UNDISPATCHED) {
d1.events.filterIsInstance<MyPresence.PresenceChanged>()
.collect(d1PresenceEvents::add)
},
launch(start = CoroutineStart.UNDISPATCHED) {
d2.events.filterIsInstance<Others.PresenceChanged>()
.collect(d2PresenceEvents::add)
},
)

d1.updateAsync { _, presence ->
repeat(10) {
presence.put(mapOf("a" to "${it + 1}"))
}
}.await()

val lastD1PresenceEvent = MyPresence.PresenceChanged(
PresenceInfo(c1.requireClientId(), mapOf("a" to "10")),
)
val lastD2PresenceEvent = Others.PresenceChanged(
PresenceInfo(c1.requireClientId(), mapOf("a" to "10")),
)

withTimeout(GENERAL_TIMEOUT) {
while (lastD1PresenceEvent !in d1PresenceEvents ||
lastD2PresenceEvent !in d2PresenceEvents
) {
delay(50)
}
}

assertEquals(lastD1PresenceEvent, d1PresenceEvents.last())
assertTrue(d1.presenceEventQueue.isEmpty())
assertEquals(lastD2PresenceEvent, d2PresenceEvents.last())
assertTrue(d2.presenceEventQueue.isEmpty())

jobs.forEach(Job::cancel)
}
}

private data class Cursor(val x: Int, val y: Int)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import dev.yorkie.TreeTest
import dev.yorkie.core.Client.SyncMode.Manual
import dev.yorkie.core.withTwoClientsAndDocuments
import dev.yorkie.document.json.JsonTreeTest.Companion.rootTree
import dev.yorkie.document.json.JsonTreeTest.Companion.updateAndSync
import kotlin.test.assertEquals
import org.junit.Test
import org.junit.runner.RunWith
Expand All @@ -16,7 +17,7 @@ class JsonTreeSplitMergeTest {
@Test
fun test_contained_split_and_split_at_the_same_position() {
withTwoClientsAndDocuments(syncMode = Manual) { c1, c2, d1, d2, _ ->
JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.setNewTree(
"t",
Expand All @@ -31,7 +32,7 @@ class JsonTreeSplitMergeTest {
)
JsonTreeTest.assertTreesXmlEquals("<r><p>ab</p></r>", d1, d2)

JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.rootTree().edit(2, 2, 1)
},
Expand All @@ -48,7 +49,7 @@ class JsonTreeSplitMergeTest {
@Test
fun test_contained_split_and_split_at_different_positions_on_the_same_node() {
withTwoClientsAndDocuments(syncMode = Manual) { c1, c2, d1, d2, _ ->
JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.setNewTree(
"t",
Expand All @@ -63,7 +64,7 @@ class JsonTreeSplitMergeTest {
)
JsonTreeTest.assertTreesXmlEquals("<r><p>abc</p></r>", d1, d2)

JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.rootTree().edit(2, 2, 1)
},
Expand All @@ -81,7 +82,7 @@ class JsonTreeSplitMergeTest {
@Test
fun test_contained_split_and_insert_into_the_split_position() {
withTwoClientsAndDocuments(syncMode = Manual) { c1, c2, d1, d2, _ ->
JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.setNewTree(
"t",
Expand All @@ -96,7 +97,7 @@ class JsonTreeSplitMergeTest {
)
JsonTreeTest.assertTreesXmlEquals("<r><p>ab</p></r>", d1, d2)

JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.rootTree().edit(2, 2, 1)
},
Expand All @@ -114,7 +115,7 @@ class JsonTreeSplitMergeTest {
@Test
fun test_contained_split_and_insert_into_original_node() {
withTwoClientsAndDocuments(syncMode = Manual) { c1, c2, d1, d2, _ ->
JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.setNewTree(
"t",
Expand All @@ -129,7 +130,7 @@ class JsonTreeSplitMergeTest {
)
JsonTreeTest.assertTreesXmlEquals("<r><p>ab</p></r>", d1, d2)

JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.rootTree().edit(2, 2, 1)
},
Expand All @@ -147,7 +148,7 @@ class JsonTreeSplitMergeTest {
@Test
fun test_contained_split_and_insert_into_split_node() {
withTwoClientsAndDocuments(syncMode = Manual) { c1, c2, d1, d2, _ ->
JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.setNewTree(
"t",
Expand All @@ -162,7 +163,7 @@ class JsonTreeSplitMergeTest {
)
JsonTreeTest.assertTreesXmlEquals("<r><p>ab</p></r>", d1, d2)

JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.rootTree().edit(2, 2, 1)
},
Expand All @@ -180,7 +181,7 @@ class JsonTreeSplitMergeTest {
@Test
fun test_contained_split_and_delete_contents_in_split_node() {
withTwoClientsAndDocuments(syncMode = Manual) { c1, c2, d1, d2, _ ->
JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.setNewTree(
"t",
Expand All @@ -195,7 +196,7 @@ class JsonTreeSplitMergeTest {
)
JsonTreeTest.assertTreesXmlEquals("<r><p>ab</p></r>", d1, d2)

JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.rootTree().edit(2, 2, 1)
},
Expand Down Expand Up @@ -232,7 +233,7 @@ class JsonTreeSplitMergeTest {
}.await()
JsonTreeTest.assertTreesXmlEquals("<doc><p></p><p>ab</p></doc>", d1)

JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.rootTree().edit(1, 3)
},
Expand Down Expand Up @@ -268,7 +269,7 @@ class JsonTreeSplitMergeTest {
d1.getRoot().rootTree().toXml(),
)

JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.rootTree().edit(2, 6)
},
Expand Down Expand Up @@ -309,7 +310,7 @@ class JsonTreeSplitMergeTest {
}.await()
assertEquals("<doc><p>a</p><p>c</p><p>b</p></doc>", d1.getRoot().rootTree().toXml())

JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.rootTree().edit(2, 7)
},
Expand Down
14 changes: 11 additions & 3 deletions yorkie/src/main/kotlin/dev/yorkie/document/Document.kt
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public class Document(
public val garbageLength: Int
get() = root.garbageLength

private val presenceEventQueue = mutableListOf<PresenceChanged>()
@VisibleForTesting
internal val presenceEventQueue = mutableListOf<PresenceChanged>()
private val pendingPresenceEvents = mutableListOf<PresenceChanged>()

private val onlineClients = MutableStateFlow(setOf<ActorID>())
Expand Down Expand Up @@ -360,6 +361,7 @@ public class Document(
*/
private suspend fun publishPresenceEvent(presences: Presences) {
val iterator = presenceEventQueue.listIterator()
var clearPresenceEventQueue = false
while (iterator.hasNext()) {
val event = iterator.next()
if (event is Others && event.changed.actorID == changeID.actor) {
Expand All @@ -368,10 +370,16 @@ public class Document(
}

if (presenceEventReadyToBePublished(event, presences)) {
if (presenceEventQueue.first() != event) {
clearPresenceEventQueue = true
}
eventStream.emit(event)
iterator.remove()
}
}
if (clearPresenceEventQueue) {
presenceEventQueue.clear()
}
}

private fun presenceEventReadyToBePublished(
Expand All @@ -382,14 +390,14 @@ public class Document(
is MyPresence.Initialized -> presences.keys.containsAll(event.initialized.keys)
is MyPresence.PresenceChanged -> {
val actorID = event.changed.actorID
actorID !in presences || event.changed.presence == presences[actorID]
event.changed.presence == presences[actorID]
}

is Others.Watched -> event.changed.actorID in presences
is Others.Unwatched -> event.changed.actorID !in presences
is Others.PresenceChanged -> {
val actorID = event.changed.actorID
actorID !in presences || event.changed.presence == presences[actorID]
event.changed.presence == presences[actorID]
}
}
}
Expand Down

0 comments on commit 60ea15d

Please sign in to comment.