diff --git a/changelog.d/17127.bugfix b/changelog.d/17127.bugfix new file mode 100644 index 00000000000..93c7314098b --- /dev/null +++ b/changelog.d/17127.bugfix @@ -0,0 +1 @@ +Fix a bug which meant that to-device messages received over federation could be dropped when the server was under load, or when networking problems disrupted communication between Synapse processes or with the database. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 65d3a661fe1..7ffc650aa1e 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -546,7 +546,25 @@ async def _process_edu(edu_dict: JsonDict) -> None: edu_type=edu_dict["edu_type"], content=edu_dict["content"], ) - await self.registry.on_edu(edu.edu_type, origin, edu.content) + try: + await self.registry.on_edu(edu.edu_type, origin, edu.content) + except Exception: + # If there was an error handling the EDU, we must reject the + # transaction. + # + # Some EDU types (notably, to-device messages) are, despite their name, + # expected to be reliable; if we weren't able to do something with it, + # we have to tell the sender that, and the only way the protocol gives + # us to do so is by sending an HTTP error back on the transaction. + # + # We log the exception now, and then raise a new SynapseError to cause + # the transaction to be failed. + logger.exception("Error handling EDU of type %s", edu.edu_type) + raise SynapseError(500, f"Error handing EDU of type {edu.edu_type}") + + # TODO: if the first EDU fails, we should probably abort the whole + # thing rather than carrying on with the rest of them. That would + # probably be best done inside `concurrently_execute`. 
await concurrently_execute( _process_edu, @@ -1414,12 +1432,7 @@ async def on_edu(self, edu_type: str, origin: str, content: dict) -> None: handler = self.edu_handlers.get(edu_type) if handler: with start_active_span_from_edu(content, "handle_edu"): - try: - await handler(origin, content) - except SynapseError as e: - logger.info("Failed to handle edu %r: %r", edu_type, e) - except Exception: - logger.exception("Failed to handle edu %r", edu_type) + await handler(origin, content) return # Check if we can route it somewhere else that isn't us @@ -1428,17 +1441,12 @@ async def on_edu(self, edu_type: str, origin: str, content: dict) -> None: # Pick an instance randomly so that we don't overload one. route_to = random.choice(instances) - try: - await self._send_edu( - instance_name=route_to, - edu_type=edu_type, - origin=origin, - content=content, - ) - except SynapseError as e: - logger.info("Failed to handle edu %r: %r", edu_type, e) - except Exception: - logger.exception("Failed to handle edu %r", edu_type) + await self._send_edu( + instance_name=route_to, + edu_type=edu_type, + origin=origin, + content=content, + ) return # Oh well, let's just log and move on. diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 2b034dcbb7f..79be7c97c8e 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -104,6 +104,9 @@ async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None: """ Handle receiving to-device messages from remote homeservers. + Note that any errors thrown from this method will cause the federation /send + request to receive an error response. + Args: origin: The remote homeserver. content: The JSON dictionary containing the to-device messages. 
diff --git a/tests/federation/test_federation_server.py b/tests/federation/test_federation_server.py index 36684c2c915..88261450b1f 100644 --- a/tests/federation/test_federation_server.py +++ b/tests/federation/test_federation_server.py @@ -67,6 +67,23 @@ def test_bad_request(self, query_content: bytes) -> None: self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, channel.result) self.assertEqual(channel.json_body["errcode"], "M_NOT_JSON") + def test_failed_edu_causes_500(self) -> None: + """If the EDU handler fails, /send should return a 500.""" + + async def failing_handler(_origin: str, _content: JsonDict) -> None: + raise Exception("bleh") + + self.hs.get_federation_registry().register_edu_handler( + "FAIL_EDU_TYPE", failing_handler + ) + + channel = self.make_signed_federation_request( + "PUT", + "/_matrix/federation/v1/send/txn", + {"edus": [{"edu_type": "FAIL_EDU_TYPE", "content": {}}]}, + ) + self.assertEqual(500, channel.code, channel.result) + class ServerACLsTestCase(unittest.TestCase): def test_blocked_server(self) -> None: diff --git a/tests/federation/transport/test_server.py b/tests/federation/transport/test_server.py index 190b79bf26c..02373699986 100644 --- a/tests/federation/transport/test_server.py +++ b/tests/federation/transport/test_server.py @@ -59,7 +59,14 @@ def test_edu_debugging_doesnt_explode(self) -> None: "/_matrix/federation/v1/send/txn_id_1234/", content={ "edus": [ - {"edu_type": EduTypes.DEVICE_LIST_UPDATE, "content": {"foo": "bar"}} + { + "edu_type": EduTypes.DEVICE_LIST_UPDATE, + "content": { + "device_id": "QBUAZIFURK", + "stream_id": 0, + "user_id": "@user:id", + }, + }, ], "pdus": [], },