This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Draft: Add support for MSC2716 marker events #10420

Closed
Changes from 3 commits

Commits (45 total)
164e32b
Add support for MSC2716 marker events
MadLittleMods Jul 16, 2021
435f074
Process markers when we receive it over federation
MadLittleMods Jul 16, 2021
e0e1bd0
WIP: make hs2 backfill historical messages after marker event
MadLittleMods Jul 17, 2021
d63c34c
hs2 to better ask for insertion event extremity
MadLittleMods Jul 17, 2021
2196ba5
Add insertion_event_extremities table
MadLittleMods Jul 17, 2021
8ebbc5f
Merge branch 'madlittlemods/2716-backfill-historical-events-for-feder…
MadLittleMods Jul 20, 2021
187ab28
Messy: Fix undefined state_group for federated historical events
MadLittleMods Jul 21, 2021
9d70e95
Revert "Messy: Fix undefined state_group for federated historical eve…
MadLittleMods Jul 21, 2021
7da24b9
Always send device_one_time_keys_count (#10457)
dbkr Jul 22, 2021
283bb5c
1.38.1
erikjohnston Jul 22, 2021
4565063
Merge commit '7da24b975dfb10c277cf963dfddb88f55b1ca598' into release-…
erikjohnston Jul 22, 2021
f76f8c1
1.39.0rc2
erikjohnston Jul 22, 2021
4c3fdfc
Fix an error in the docker workflow (#10461)
richvdh Jul 22, 2021
016f085
Merge tag 'v1.38.1'
richvdh Jul 22, 2021
347a3e1
Merge branch 'madlittlemods/2716-backfill-historical-events-for-feder…
MadLittleMods Jul 23, 2021
683deee
Merge branch 'master' into develop
erikjohnston Jul 23, 2021
c39a417
Merge tag 'v1.39.0rc2' into develop
erikjohnston Jul 23, 2021
6e22756
Merge tag 'v1.38.1' into release-v1.39
erikjohnston Jul 23, 2021
ab82fd6
Merge branch 'release-v1.39' into develop
erikjohnston Jul 23, 2021
f22252d
Enable docker image caching for the deb build (#10431)
richvdh Jul 26, 2021
4fb92d9
Add type hints to synapse.federation.transport.client. (#10408)
clokep Jul 26, 2021
228decf
Update the MSC3083 support to verify if joins are from an authorized …
clokep Jul 26, 2021
b7186c6
Add type hints to state handler. (#10482)
clokep Jul 26, 2021
b3a757e
Support MSC2033: Device ID on whoami (#9918)
turt2live Jul 27, 2021
92a8822
Change release script to update debian changelog for RCs (#10465)
erikjohnston Jul 27, 2021
2476d53
Mitigate media repo XSSs on IE11. (#10468)
dkasak Jul 27, 2021
1394467
Use new go test running syntax for complement. (#10488)
clokep Jul 27, 2021
e16eab2
Add a PeriodicallyFlushingMemoryHandler to prevent logging silence (#…
reivilibre Jul 27, 2021
74d09a4
Always communicate device OTK counts to clients (#10485)
anoadragon453 Jul 27, 2021
10dcfae
Fix typo that causes R30v2 to actually be old R30 (#10486)
reivilibre Jul 27, 2021
31c6b30
Fix import of the default SAML mapping provider. (#10477)
jaywink Jul 27, 2021
076dead
allow specifying https:// proxy (#10411)
dklimpel Jul 27, 2021
5b22d5e
Fix `oldest_pdu_in_federation_staging` (#10455)
erikjohnston Jul 27, 2021
8e1febc
Support underscores (in addition to hyphens) for charset detection. (…
srividyut Jul 27, 2021
0489683
Document Complement dev usage (#10483)
MadLittleMods Jul 27, 2021
c3b0377
Support for MSC2285 (hidden read receipts) (#10413)
SimonBrandner Jul 28, 2021
752fe0c
Restricted rooms (MSC3083) should not have their allow key redacted. …
clokep Jul 28, 2021
9643dfd
improve typing annotations in CachedCall (#10450)
richvdh Jul 28, 2021
d9cb658
Fix up type hints for Twisted 21.7 (#10490)
richvdh Jul 28, 2021
5146e19
1.39.0rc3
erikjohnston Jul 28, 2021
2254e67
Fixup changelog
erikjohnston Jul 28, 2021
8c201c9
Merge tag 'v1.39.0rc3' into develop
erikjohnston Jul 28, 2021
d0b294a
Make historical events discoverable from backfill for servers without…
MadLittleMods Jul 28, 2021
858363d
Generics for `ObservableDeferred` (#10491)
richvdh Jul 28, 2021
97fb158
Merge branch 'develop' into madlittlemods/2716-marker-events
MadLittleMods Jul 28, 2021
2 changes: 1 addition & 1 deletion scripts-dev/complement.sh
@@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then
fi

# Run the tests!
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2716,msc2403 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2716,msc2403 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests -run TestBackfillingHistory/parallel/Historical_messages_are_visible_when_already_joined_on_federated_server
75 changes: 72 additions & 3 deletions synapse/handlers/federation.py
@@ -42,6 +42,7 @@

from synapse import event_auth
from synapse.api.constants import (
EventContentFields,
EventTypes,
Membership,
RejectedReason,
@@ -263,6 +264,7 @@ async def on_receive_pdu(
state = None

# Get missing pdus if necessary.
# We don't need to worry about outliers because TODO!
@MadLittleMods (Contributor, Author) Jul 16, 2021:


Unrelated to the PR, but I just want to clarify why we skip outliers in this step. Any insight here?

  • Outliers are outside of the normal DAG
  • We don't have state for outliers
  • We trust their auth events over calculating them

if not pdu.internal_metadata.is_outlier():
# We only backfill backwards to the min depth.
min_depth = await self.get_min_depth_for_context(pdu.room_id)
@@ -889,6 +891,63 @@ async def _process_received_pdu(
"resync_device_due_to_pdu", self._resync_device, event.sender
)

await self._handle_marker_event(origin, event)

async def _handle_marker_event(self, origin: str, marker_event: EventBase):
"""Handles backfilling the insertion event when we receive a marker
event that points to one

Args:
origin: Origin of the event; this server will be asked for the insertion event
marker_event: The marker event to process
"""

if marker_event.type != EventTypes.MSC2716_MARKER:
# Not a marker event
return

logger.info("_handle_marker_event: received %s", marker_event)

insertion_event_id = marker_event.content.get(
EventContentFields.MSC2716_MARKER_INSERTION
)

if insertion_event_id is None:
# Nothing to retrieve then (invalid marker)
return

logger.info(
"_handle_marker_event: backfilling insertion event %s", insertion_event_id
)

await self._get_events_and_persist(
origin,
marker_event.room_id,
[insertion_event_id],
)

insertion_event = await self.store.get_event(
insertion_event_id, allow_none=True
)
if insertion_event is None:
logger.warning(
"_handle_marker_event: server %s didn't return insertion event %s for marker %s",
origin,
insertion_event_id,
marker_event.event_id,
)
return

logger.info(
"_handle_marker_event: Succesfully backfilled insertion event %s from marker event %s",
insertion_event,
marker_event,
)

await self.store.insert_backward_extremity(
insertion_event_id, marker_event.room_id
)
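
For reference, a rough sketch of the marker event shape this handler expects. The event/room IDs are invented, and the unstable MSC2716 field names are an assumption about what backs the constants used above:

    # Hypothetical marker event, shown as a plain dict:
    marker_event_dict = {
        "type": "org.matrix.msc2716.marker",  # EventTypes.MSC2716_MARKER
        "room_id": "!historical:hs1",
        "sender": "@appservice:hs1",
        "content": {
            # EventContentFields.MSC2716_MARKER_INSERTION
            "org.matrix.msc2716.marker.insertion": "$abc123insertionevent",
        },
    }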

async def _resync_device(self, sender: str) -> None:
"""We have detected that the device list for the given user may be out
of sync, so we try and resync them.
@@ -1057,7 +1116,12 @@ async def maybe_backfill(
async def _maybe_backfill_inner(
self, room_id: str, current_depth: int, limit: int
) -> bool:
extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)
oldest_events = await self.store.get_oldest_events_with_depth_in_room(room_id)
insertion_events_to_be_backfilled = (
await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
)
extremities = {**oldest_events, **insertion_events_to_be_backfilled}
logger.info("_maybe_backfill_inner: extremities %s", extremities)

if not extremities:
logger.debug("Not backfilling as no extremeties found.")
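
Both lookups feeding the merge above are assumed to return a mapping of event ID to depth, so a plain dict merge combines them; a small illustration with invented values:

    # Shapes assumed from the two storage calls: {event_id: max_depth}
    oldest_events = {"$some_backward_extremity": 10}
    insertion_events_to_be_backfilled = {"$some_insertion_event": 12}
    # On any shared key, the insertion-event entry would win the merge.
    extremities = {**oldest_events, **insertion_events_to_be_backfilled}
    assert extremities == {
        "$some_backward_extremity": 10,
        "$some_insertion_event": 12,
    }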
@@ -1090,12 +1154,14 @@ async def _maybe_backfill_inner(
# types have.

forward_events = await self.store.get_successor_events(list(extremities))
logger.info("_maybe_backfill_inner: forward_events %s", forward_events)

extremities_events = await self.store.get_events(
forward_events,
redact_behaviour=EventRedactBehaviour.AS_IS,
get_prev_content=False,
)
logger.info("_maybe_backfill_inner: extremities_events %s", extremities_events)

# We set `check_history_visibility_only` as we might otherwise get false
# positives from users having been erased.
@@ -1106,6 +1172,9 @@
redact=False,
check_history_visibility_only=True,
)
logger.info(
"_maybe_backfill_inner: filtered_extremities %s", filtered_extremities
)

if not filtered_extremities:
return False
@@ -1124,15 +1193,15 @@
# much larger factor will result in triggering a backfill request much
# earlier than necessary.
if current_depth - 2 * limit > max_depth:
logger.debug(
logger.info(
"Not backfilling as we don't need to. %d < %d - 2 * %d",
max_depth,
current_depth,
limit,
)
return False

logger.debug(
logger.info(
"room_id: %s, backfill: current_depth: %s, max_depth: %s, extrems: %s",
room_id,
current_depth,
71 changes: 54 additions & 17 deletions synapse/storage/databases/main/event_federation.py
@@ -666,26 +666,48 @@ def _get_auth_chain_difference_txn(
return {eid for eid, n in event_to_missing_sets.items() if n}

async def get_oldest_events_with_depth_in_room(self, room_id):
def get_oldest_events_with_depth_in_room_txn(txn, room_id):
sql = (
"SELECT b.event_id, MAX(e.depth) FROM events as e"
" INNER JOIN event_edges as g"
" ON g.event_id = e.event_id"
" INNER JOIN event_backward_extremities as b"
" ON g.prev_event_id = b.event_id"
" WHERE b.room_id = ? AND g.is_state is ?"
" GROUP BY b.event_id"
)

txn.execute(sql, (room_id, False))

return dict(txn)

return await self.db_pool.runInteraction(
"get_oldest_events_with_depth_in_room",
self.get_oldest_events_with_depth_in_room_txn,
get_oldest_events_with_depth_in_room_txn,
room_id,
)

def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
sql = (
"SELECT b.event_id, MAX(e.depth) FROM events as e"
" INNER JOIN event_edges as g"
" ON g.event_id = e.event_id"
" INNER JOIN event_backward_extremities as b"
" ON g.prev_event_id = b.event_id"
" WHERE b.room_id = ? AND g.is_state is ?"
" GROUP BY b.event_id"
)
async def get_insertion_event_backwards_extremities_in_room(self, room_id):
def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id):
sql = """
SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
/* We only want insertion events that are also marked as backwards extremities */
INNER JOIN event_backward_extremities as b USING (event_id)
/* Get the depth of the insertion event from the events table */
INNER JOIN events AS e USING (event_id)
WHERE b.room_id = ?
GROUP BY b.event_id
"""

txn.execute(sql, (room_id, False))
txn.execute(sql, (room_id,))

return dict(txn)
return dict(txn)

return await self.db_pool.runInteraction(
"get_insertion_event_backwards_extremities_in_room",
get_insertion_event_backwards_extremities_in_room_txn,
room_id,
)
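
Assuming the same result shape as get_oldest_events_with_depth_in_room, a sketch of a round trip (the store handle, IDs, and depths are invented):

    backfill_points = await store.get_insertion_event_backwards_extremities_in_room(
        "!room:hs1"
    )
    # e.g. {"$insertion_event_a": 5, "$insertion_event_b": 9}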

async def get_max_depth_of(self, event_ids: List[str]) -> Tuple[str, int]:
"""Returns the event ID and depth for the event that has the max depth from a set of event IDs
@@ -929,7 +951,7 @@ async def get_backfill_events(self, room_id: str, event_list: list, limit: int):
return sorted(events, key=lambda e: -e.depth)

def _get_backfill_events(self, txn, room_id, event_list, limit):
logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit)
logger.info("_get_backfill_events: %s, %r, %s", room_id, event_list, limit)

event_results = set()

@@ -1005,7 +1027,7 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
connected_insertion_event_query, (event_id, limit - len(event_results))
)
connected_insertion_event_id_results = txn.fetchall()
logger.debug(
logger.info(
"_get_backfill_events: connected_insertion_event_query %s",
connected_insertion_event_id_results,
)
@@ -1020,7 +1042,7 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
(connected_insertion_event, limit - len(event_results)),
)
chunk_start_event_id_results = txn.fetchall()
logger.debug(
logger.info(
"_get_backfill_events: chunk_start_event_id_results %s",
chunk_start_event_id_results,
)
@@ -1030,7 +1052,7 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):

txn.execute(query, (event_id, False, limit - len(event_results)))
prev_event_id_results = txn.fetchall()
logger.debug(
logger.info(
"_get_backfill_events: prev_event_ids %s", prev_event_id_results
)
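
To keep the traversal above straight, a rough picture of the MSC2716 linkage being walked (IDs and chunk IDs are invented; the unstable field names are an assumption based on the constants used elsewhere in this PR):

    # [insertion event]    content["org.matrix.msc2716.next_chunk_id"] = "abc"
    #        ^
    #        | connected_insertion_event_query: finds insertion events that
    #        | point at the current event via event_edges
    # [current DAG event]
    #
    # [chunk start event]  content["org.matrix.msc2716.chunk_id"] = "abc"
    #        found via chunk_start_event_id_results by matching the insertion
    #        event's next_chunk_id, letting backfill recurse into the
    #        historical chunk as well as following the normal prev_event edges.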

@@ -1122,6 +1144,21 @@ def _delete_old_forward_extrem_cache_txn(txn):
_delete_old_forward_extrem_cache_txn,
)

async def insert_backward_extremity(self, event_id: str, room_id: str) -> None:
def _insert_backward_extremity_txn(txn):
self.db_pool.simple_insert_txn(
txn,
table="event_backward_extremities",
values={
"event_id": event_id,
"room_id": room_id,
},
)

await self.db_pool.runInteraction(
"_insert_backward_extremity_txn", _insert_backward_extremity_txn
)
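
A minimal usage sketch, mirroring the marker-handling path above (the store handle and IDs are invented):

    # After fetching the insertion event a marker points to, record it as a
    # backwards extremity so the next backfill pass will reach for it:
    await store.insert_backward_extremity("$insertion_event_id", "!room:hs1")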

async def insert_received_event_to_staging(
self, origin: str, event: EventBase
) -> None:
17 changes: 11 additions & 6 deletions synapse/storage/databases/main/events.py
@@ -1777,7 +1777,7 @@ def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase):
# Invalid insertion event without next chunk ID
return

logger.debug(
logger.info(
"_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event
)

@@ -1818,7 +1818,7 @@ def _handle_chunk_id(self, txn: LoggingTransaction, event: EventBase):
# No chunk connection to persist
return

logger.debug("_handle_chunk_id %s %s", chunk_id, event)
logger.info("_handle_chunk_id %s %s", chunk_id, event)

# Keep track of the insertion event and the chunk ID
self.db_pool.simple_insert_txn(
@@ -2091,18 +2091,21 @@ def _update_backward_extremeties(self, txn, events):
for ev in events:
events_by_room.setdefault(ev.room_id, []).append(ev)

# From the events passed in, add all of their prev events as backwards extremities.
# Skip any prev event that is already a backwards extremity, or that we have
# already persisted as a non-outlier (in which case there is no gap to fill).
query = (
"INSERT INTO event_backward_extremities (event_id, room_id)"
" SELECT ?, ? WHERE NOT EXISTS ("
" SELECT 1 FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
" SELECT 1 FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
" )"
" AND NOT EXISTS ("
" SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
" AND outlier = ?"
" SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
" AND outlier = ?"
" )"
)

logger.info("_update_backward_extremeties %s", events)
txn.execute_batch(
query,
[
@@ -2113,6 +2116,8 @@ def _update_backward_extremeties(self, txn, events):
],
)

# Delete the events being persisted from the backwards extremities table:
# now that we have them, they are no longer extremities themselves.
query = (
"DELETE FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
13 changes: 12 additions & 1 deletion synapse/visibility.py
@@ -366,13 +366,21 @@ def check_event_is_visible(event: EventBase, state: StateMap[EventBase]) -> bool
if erased_senders:
to_return = []
for e in events:
if not is_sender_erased(e, erased_senders):
erased = is_sender_erased(e, erased_senders)
logger.info(
"filter_events_for_server: (all_open) %s erased=%s", e, erased
)
if not erased:
@MadLittleMods (Contributor, Author):

TODO: can revert

to_return.append(e)
elif redact:
to_return.append(prune_event(e))

logger.info("filter_events_for_server: (all_open) to_return=%s", to_return)
return to_return

logger.info(
"filter_events_for_server: all_open and no erased senders %s", events
)
# If there are no erased users then we can just return the given list
# of events without having to copy it.
return events
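
A condensed illustration of the erasure bookkeeping behind these branches (user IDs and the mapping shape are assumed for illustration):

    # erased_senders is taken to map sender -> whether that user was erased:
    erased_senders = {"@alice:hs1": True, "@bob:hs1": False}
    # An event from @alice:hs1 is dropped, or swapped for prune_event(e) when
    # redact=True; an event from @bob:hs1 is returned unchanged.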
@@ -429,6 +437,9 @@ def include(typ, state_key):
for e in events:
erased = is_sender_erased(e, erased_senders)
visible = check_event_is_visible(e, event_to_state[e.event_id])
logger.info(
"filter_events_for_server: %s erased=%s visible=%s", e, erased, visible
)
if visible and not erased:
to_return.append(e)
elif redact: