diff --git a/changelog.d/15349.bugfix b/changelog.d/15349.bugfix new file mode 100644 index 000000000000..65ea7ae7eb1e --- /dev/null +++ b/changelog.d/15349.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where some to_device messages could be dropped when using workers. diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 0d75d9739a70..b471fcb064a2 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -617,14 +617,14 @@ def get_all_new_device_messages_txn( # We limit like this as we might have multiple rows per stream_id, and # we want to make sure we always get all entries for any stream_id # we return. - upper_pos = min(current_id, last_id + limit) + upto_token = min(current_id, last_id + limit) sql = ( "SELECT max(stream_id), user_id" " FROM device_inbox" " WHERE ? < stream_id AND stream_id <= ?" " GROUP BY user_id" ) - txn.execute(sql, (last_id, upper_pos)) + txn.execute(sql, (last_id, upto_token)) updates = [(row[0], row[1:]) for row in txn] sql = ( @@ -633,19 +633,13 @@ def get_all_new_device_messages_txn( " WHERE ? < stream_id AND stream_id <= ?" " GROUP BY destination" ) - txn.execute(sql, (last_id, upper_pos)) + txn.execute(sql, (last_id, upto_token)) updates.extend((row[0], row[1:]) for row in txn) # Order by ascending stream ordering updates.sort() - limited = False - upto_token = current_id - if len(updates) >= limit: - upto_token = updates[-1][0] - limited = True - - return updates, upto_token, limited + return updates, upto_token, upto_token < current_id return await self.db_pool.runInteraction( "get_all_new_device_messages", get_all_new_device_messages_txn diff --git a/tests/replication/_base.py b/tests/replication/_base.py index 46a8e2013e41..0f1a8a145f6f 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -54,6 +54,10 @@ class BaseStreamTestCase(unittest.HomeserverTestCase): if not hiredis: skip = "Requires hiredis" + if not USE_POSTGRES_FOR_TESTS: + # Redis replication only takes place on Postgres + skip = "Requires Postgres" + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: # build a replication server server_factory = ReplicationStreamProtocolFactory(hs) diff --git a/tests/replication/tcp/streams/test_account_data.py b/tests/replication/tcp/streams/test_account_data.py index 01df1be0473c..b9075e3f207d 100644 --- a/tests/replication/tcp/streams/test_account_data.py +++ b/tests/replication/tcp/streams/test_account_data.py @@ -37,11 +37,6 @@ def test_update_function_room_account_data_limit(self) -> None: # also one global update self.get_success(store.add_account_data_for_user("test_user", "m.global", {})) - # tell the notifier to catch up to avoid duplicate rows. - # workaround for https://github.com/matrix-org/synapse/issues/7360 - # FIXME remove this when the above is fixed - self.replicate() - # check we're testing what we think we are: no rows should yet have been # received self.assertEqual([], self.test_handler.received_rdata_rows) diff --git a/tests/replication/tcp/streams/test_to_device.py b/tests/replication/tcp/streams/test_to_device.py new file mode 100644 index 000000000000..fb9eac668f19 --- /dev/null +++ b/tests/replication/tcp/streams/test_to_device.py @@ -0,0 +1,89 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +import synapse +from synapse.replication.tcp.streams._base import _STREAM_UPDATE_TARGET_ROW_COUNT +from synapse.types import JsonDict + +from tests.replication._base import BaseStreamTestCase + +logger = logging.getLogger(__name__) + + +class ToDeviceStreamTestCase(BaseStreamTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + synapse.rest.client.login.register_servlets, + ] + + def test_to_device_stream(self) -> None: + store = self.hs.get_datastores().main + + user1 = self.register_user("user1", "pass") + self.login("user1", "pass", "device") + user2 = self.register_user("user2", "pass") + self.login("user2", "pass", "device") + + # connect to pull the updates related to users creation/login + self.reconnect() + self.replicate() + self.test_handler.received_rdata_rows.clear() + # disconnect so we can accumulate the updates without pulling them + self.disconnect() + + msg: JsonDict = {} + msg["sender"] = "@sender:example.org" + msg["type"] = "m.new_device" + + # add messages to the device inbox for user1 up until the + # limit defined for a stream update batch + for i in range(0, _STREAM_UPDATE_TARGET_ROW_COUNT): + msg["content"] = {"device": {}} + messages = {user1: {"device": msg}} + + self.get_success( + store.add_messages_from_remote_to_device_inbox( + "example.org", + f"{i}", + messages, + ) + ) + + # add one more message, for user2 this time + # this message would be dropped before fixing #15335 + msg["content"] = {"device": {}} + messages = {user2: {"device": msg}} + + self.get_success( + store.add_messages_from_remote_to_device_inbox( + "example.org", + f"{_STREAM_UPDATE_TARGET_ROW_COUNT}", + messages, + ) + ) + + # replication is disconnected so we shouldn't get any updates yet + self.assertEqual([], self.test_handler.received_rdata_rows) + + # now reconnect to pull the updates + self.reconnect() + self.replicate() + + # we should receive the fact that we have to_device updates + # for user1 and user2 + received_rows = self.test_handler.received_rdata_rows + self.assertEqual(len(received_rows), 2) + self.assertEqual(received_rows[0][2].entity, user1) + self.assertEqual(received_rows[1][2].entity, user2)