diff --git a/changelog.d/10591.misc b/changelog.d/10591.misc new file mode 100644 index 000000000000..9a765435dbe4 --- /dev/null +++ b/changelog.d/10591.misc @@ -0,0 +1 @@ +Clean up some of the federation event authentication code for clarity. diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 0298af4c02d7..a730c1719a95 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -396,10 +396,11 @@ def __str__(self): return self.__repr__() def __repr__(self): - return "" % ( + return "" % ( self.get("event_id", None), self.get("type", None), self.get("state_key", None), + self.internal_metadata.is_outlier(), ) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 78d5aac6afd2..afd8f8580a2f 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -1003,6 +1003,7 @@ async def _process_incoming_pdus_in_room_inner( # has started processing). while True: async with lock: + logger.info("handling received PDU: %s", event) try: await self.handler.on_receive_pdu( origin, event, sent_to_us_directly=True diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 9a5e7265330c..c0e13bdaac1d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -220,8 +220,6 @@ async def on_receive_pdu( room_id = pdu.room_id event_id = pdu.event_id - logger.info("handling received PDU: %s", pdu) - # We reprocess pdus when we have seen them only as outliers existing = await self.store.get_event( event_id, allow_none=True, allow_rejected=True @@ -229,14 +227,19 @@ async def on_receive_pdu( # FIXME: Currently we fetch an event again when we already have it # if it has been marked as an outlier. - - already_seen = existing and ( - not existing.internal_metadata.is_outlier() - or pdu.internal_metadata.is_outlier() - ) - if already_seen: - logger.debug("Already seen pdu") - return + if existing: + if not existing.internal_metadata.is_outlier(): + logger.info( + "Ignoring received event %s which we have already seen", event_id + ) + return + if pdu.internal_metadata.is_outlier(): + logger.info( + "Ignoring received outlier %s which we already have as an outlier", + event_id, + ) + return + logger.info("De-outliering event %s", event_id) # do some initial sanity-checking of the event. In particular, make # sure it doesn't have hundreds of prev_events or auth_events, which @@ -331,7 +334,8 @@ async def on_receive_pdu( "Found all missing prev_events", ) - if prevs - seen: + missing_prevs = prevs - seen + if missing_prevs: # We've still not been able to get all of the prev_events for this event. # # In this case, we need to fall back to asking another server in the @@ -359,8 +363,8 @@ async def on_receive_pdu( if sent_to_us_directly: logger.warning( "Rejecting: failed to fetch %d prev events: %s", - len(prevs - seen), - shortstr(prevs - seen), + len(missing_prevs), + shortstr(missing_prevs), ) raise FederationError( "ERROR", @@ -373,9 +377,10 @@ async def on_receive_pdu( ) logger.info( - "Event %s is missing prev_events: calculating state for a " + "Event %s is missing prev_events %s: calculating state for a " "backwards extremity", event_id, + shortstr(missing_prevs), ) # Calculate the state after each of the previous events, and @@ -393,7 +398,7 @@ async def on_receive_pdu( # Ask the remote server for the states we don't # know about - for p in prevs - seen: + for p in missing_prevs: logger.info("Requesting state after missing prev_event %s", p) with nested_logging_context(p): @@ -556,21 +561,14 @@ async def _get_missing_events_for_pdu( logger.warning("Failed to get prev_events: %s", e) return - logger.info( - "Got %d prev_events: %s", - len(missing_events), - shortstr(missing_events), - ) + logger.info("Got %d prev_events", len(missing_events)) # We want to sort these by depth so we process them and # tell clients about them in order. missing_events.sort(key=lambda x: x.depth) for ev in missing_events: - logger.info( - "Handling received prev_event %s", - ev.event_id, - ) + logger.info("Handling received prev_event %s", ev) with nested_logging_context(ev.event_id): try: await self.on_receive_pdu(origin, ev, sent_to_us_directly=False) @@ -1762,10 +1760,8 @@ async def _handle_queued_pdus( for p, origin in room_queue: try: logger.info( - "Processing queued PDU %s which was received " - "while we were joining %s", - p.event_id, - p.room_id, + "Processing queued PDU %s which was received while we were joining", + p, ) with nested_logging_context(p.event_id): await self.on_receive_pdu(origin, p, sent_to_us_directly=True)