From b419b7e714d37a0e6ed0c8953c5dae985f23a196 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 29 Mar 2023 15:30:14 +0200 Subject: [PATCH 1/5] to_device updates could be dropped when consuming the replication stream --- synapse/storage/databases/main/deviceinbox.py | 6 +- .../tcp/streams/test_account_data.py | 5 -- .../replication/tcp/streams/test_to_device.py | 89 +++++++++++++++++++ 3 files changed, 93 insertions(+), 7 deletions(-) create mode 100644 tests/replication/tcp/streams/test_to_device.py diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 0d75d9739a70..6b87910f5721 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -639,10 +639,12 @@ def get_all_new_device_messages_txn( # Order by ascending stream ordering updates.sort() - limited = False - upto_token = current_id + upto_token = upper_pos if len(updates) >= limit: upto_token = updates[-1][0] + + limited = False + if upto_token < current_id: limited = True return updates, upto_token, limited diff --git a/tests/replication/tcp/streams/test_account_data.py b/tests/replication/tcp/streams/test_account_data.py index 01df1be0473c..b9075e3f207d 100644 --- a/tests/replication/tcp/streams/test_account_data.py +++ b/tests/replication/tcp/streams/test_account_data.py @@ -37,11 +37,6 @@ def test_update_function_room_account_data_limit(self) -> None: # also one global update self.get_success(store.add_account_data_for_user("test_user", "m.global", {})) - # tell the notifier to catch up to avoid duplicate rows. - # workaround for https://github.com/matrix-org/synapse/issues/7360 - # FIXME remove this when the above is fixed - self.replicate() - # check we're testing what we think we are: no rows should yet have been # received self.assertEqual([], self.test_handler.received_rdata_rows) diff --git a/tests/replication/tcp/streams/test_to_device.py b/tests/replication/tcp/streams/test_to_device.py new file mode 100644 index 000000000000..4de6a40c5738 --- /dev/null +++ b/tests/replication/tcp/streams/test_to_device.py @@ -0,0 +1,89 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +import synapse +from synapse.replication.tcp.streams._base import _STREAM_UPDATE_TARGET_ROW_COUNT +from synapse.types import JsonDict + +from tests.replication._base import BaseStreamTestCase + +logger = logging.getLogger(__name__) + + +class ToDeviceStreamTestCase(BaseStreamTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + synapse.rest.client.login.register_servlets, + ] + + def test_to_device_stream(self) -> None: + store = self.hs.get_datastores().main + + user1 = self.register_user("user1", "pass") + self.login("user1", "pass", "device") + user2 = self.register_user("user2", "pass") + self.login("user2", "pass", "device") + + # connect to pull the updates related to users creation/login + self.reconnect() + self.replicate() + self.test_handler.received_rdata_rows.clear() + # disconnect so we can accumulate the updates without pulling them + self.disconnect() + + msg: JsonDict = {} + msg["sender"] = "@sender:example.org" + msg["type"] = "m.new_device" + + # add messages to the device inbox for user1 up until the + # limit defined for a stream update batch + for i in range(0, _STREAM_UPDATE_TARGET_ROW_COUNT): + msg["content"] = {"device": {"device_id": f"{i}"}} + messages = {user1: {"device": msg}} + + self.get_success( + store.add_messages_from_remote_to_device_inbox( + "example.org", + f"{i}", + messages, + ) + ) + + # add one more message, for user2 this time + # this message would be dropped before fixing #15335 + msg["content"] = {"device": {"device_id": "1"}} + messages = {user2: {"device": msg}} + + self.get_success( + store.add_messages_from_remote_to_device_inbox( + "example.org", + f"{_STREAM_UPDATE_TARGET_ROW_COUNT}", + messages, + ) + ) + + # replication is disconnected so we shouldn't get any updates yet + self.assertEqual([], self.test_handler.received_rdata_rows) + + # now reconnect to pull the updates + self.reconnect() + self.replicate() + + # we should receive the fact that we have to_device updates + # for user1 and user2 + received_rows = self.test_handler.received_rdata_rows + self.assertEqual(len(received_rows), 2) + self.assertEqual(received_rows[0][2].entity, user1) + self.assertEqual(received_rows[1][2].entity, user2) From 6fafd212352198e3038f580822bc0e77d5104f08 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 29 Mar 2023 15:45:31 +0200 Subject: [PATCH 2/5] Add changelog --- changelog.d/15349.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15349.bugfix diff --git a/changelog.d/15349.bugfix b/changelog.d/15349.bugfix new file mode 100644 index 000000000000..65ea7ae7eb1e --- /dev/null +++ b/changelog.d/15349.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where some to_device messages could be dropped when using workers. From abd0af713d7518f6a1128faccf9fe74a8a508902 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 29 Mar 2023 17:07:13 +0200 Subject: [PATCH 3/5] Disable stream replication tests when not using postgres --- tests/replication/_base.py | 4 ++++ tests/replication/tcp/streams/test_to_device.py | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/replication/_base.py b/tests/replication/_base.py index 46a8e2013e41..0f1a8a145f6f 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -54,6 +54,10 @@ class BaseStreamTestCase(unittest.HomeserverTestCase): if not hiredis: skip = "Requires hiredis" + if not USE_POSTGRES_FOR_TESTS: + # Redis replication only takes place on Postgres + skip = "Requires Postgres" + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: # build a replication server server_factory = ReplicationStreamProtocolFactory(hs) diff --git a/tests/replication/tcp/streams/test_to_device.py b/tests/replication/tcp/streams/test_to_device.py index 4de6a40c5738..fb9eac668f19 100644 --- a/tests/replication/tcp/streams/test_to_device.py +++ b/tests/replication/tcp/streams/test_to_device.py @@ -50,7 +50,7 @@ def test_to_device_stream(self) -> None: # add messages to the device inbox for user1 up until the # limit defined for a stream update batch for i in range(0, _STREAM_UPDATE_TARGET_ROW_COUNT): - msg["content"] = {"device": {"device_id": f"{i}"}} + msg["content"] = {"device": {}} messages = {user1: {"device": msg}} self.get_success( @@ -63,7 +63,7 @@ def test_to_device_stream(self) -> None: # add one more message, for user2 this time # this message would be dropped before fixing #15335 - msg["content"] = {"device": {"device_id": "1"}} + msg["content"] = {"device": {}} messages = {user2: {"device": msg}} self.get_success( From c59743f60da4fe226b6eaffb1fdfbefb7d0f18c4 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Thu, 30 Mar 2023 12:56:06 +0200 Subject: [PATCH 4/5] Update synapse/storage/databases/main/deviceinbox.py Co-authored-by: reivilibre --- synapse/storage/databases/main/deviceinbox.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 6b87910f5721..3e96b3f3c5f2 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -643,9 +643,7 @@ def get_all_new_device_messages_txn( if len(updates) >= limit: upto_token = updates[-1][0] - limited = False - if upto_token < current_id: - limited = True + limited = upto_token < current_id return updates, upto_token, limited From e5118c244bd3322e5475b48e75912af3d3558394 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Thu, 30 Mar 2023 16:48:42 +0200 Subject: [PATCH 5/5] Simplify logic --- synapse/storage/databases/main/deviceinbox.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 3e96b3f3c5f2..b471fcb064a2 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -617,14 +617,14 @@ def get_all_new_device_messages_txn( # We limit like this as we might have multiple rows per stream_id, and # we want to make sure we always get all entries for any stream_id # we return. - upper_pos = min(current_id, last_id + limit) + upto_token = min(current_id, last_id + limit) sql = ( "SELECT max(stream_id), user_id" " FROM device_inbox" " WHERE ? < stream_id AND stream_id <= ?" " GROUP BY user_id" ) - txn.execute(sql, (last_id, upper_pos)) + txn.execute(sql, (last_id, upto_token)) updates = [(row[0], row[1:]) for row in txn] sql = ( @@ -633,19 +633,13 @@ def get_all_new_device_messages_txn( " WHERE ? < stream_id AND stream_id <= ?" " GROUP BY destination" ) - txn.execute(sql, (last_id, upper_pos)) + txn.execute(sql, (last_id, upto_token)) updates.extend((row[0], row[1:]) for row in txn) # Order by ascending stream ordering updates.sort() - upto_token = upper_pos - if len(updates) >= limit: - upto_token = updates[-1][0] - - limited = upto_token < current_id - - return updates, upto_token, limited + return updates, upto_token, upto_token < current_id return await self.db_pool.runInteraction( "get_all_new_device_messages", get_all_new_device_messages_txn