Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix up _filter_results
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Oct 6, 2020
1 parent e91f0e9 commit 604e33f
Showing 1 changed file with 56 additions and 38 deletions.
94 changes: 56 additions & 38 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,44 +211,50 @@ def _make_generic_sql_bound(


def _filter_results(
direction: str,
from_token: Optional[RoomStreamToken],
to_token: Optional[RoomStreamToken],
lower_token: Optional[RoomStreamToken],
upper_token: Optional[RoomStreamToken],
instance_name: str,
topological_ordering: int,
stream_ordering: int,
) -> bool:
"""Filter results from fetching events in the DB against the given tokens.
This is necessary to handle the case where the tokens include position
maps, which we handle by fetching more than necessary from the DB and then
filtering (rather than attempting to construct a complicated SQL query).
"""Returns True if the event persisted by the given instance at the given
topological/stream_ordering falls between the two tokens (taking a None
token to mean unbounded).
Used to filter results from fetching events in the DB against the given
tokens. This is necessary to handle the case where the tokens include
position maps, which we handle by fetching more than necessary from the DB
and then filtering (rather than attempting to construct a complicated SQL
query).
"""

# We will have already filtered by the topological tokens, so we don't
# bother checking topological token bounds again.
if from_token and from_token.topological:
from_token = None

if to_token and to_token.topological:
to_token = None

lower_bound = None
if direction == "f" and from_token:
lower_bound = from_token.get_stream_pos_for_instance(instance_name)
elif direction == "b" and to_token:
lower_bound = to_token.get_stream_pos_for_instance(instance_name)

if lower_bound and stream_ordering <= lower_bound:
return False
event_historical_tuple = (
topological_ordering,
stream_ordering,
)

upper_bound = None
if direction == "b" and from_token:
upper_bound = from_token.get_stream_pos_for_instance(instance_name)
elif direction == "f" and to_token:
upper_bound = to_token.get_stream_pos_for_instance(instance_name)
if lower_token:
if lower_token.topological:
# If these are historical tokens we compare the `(topological, stream)`
# tuples.
if event_historical_tuple <= lower_token.as_historical_tuple():
return False

if upper_bound and upper_bound < stream_ordering:
return False
else:
# If these are live tokens we compare the stream ordering against the
# writers stream position.
if stream_ordering <= lower_token.get_stream_pos_for_instance(
instance_name
):
return False

if upper_token:
if upper_token.topological:
if upper_token.as_historical_tuple() < event_historical_tuple:
return False
else:
if upper_token.get_stream_pos_for_instance(instance_name) < stream_ordering:
return False

return True

Expand Down Expand Up @@ -482,7 +488,7 @@ def f(txn):
max_to_id = to_key.get_max_stream_pos()

sql = """
SELECT event_id, instance_name, stream_ordering
SELECT event_id, instance_name, topological_ordering, stream_ordering
FROM events
WHERE
room_id = ?
Expand All @@ -496,9 +502,13 @@ def f(txn):

rows = [
_EventDictReturn(event_id, None, stream_ordering)
for event_id, instance_name, stream_ordering in txn
for event_id, instance_name, topological_ordering, stream_ordering in txn
if _filter_results(
"f", from_key, to_key, instance_name, stream_ordering
from_key,
to_key,
instance_name,
topological_ordering,
stream_ordering,
)
][:limit]
return rows
Expand Down Expand Up @@ -543,7 +553,7 @@ def f(txn):
max_to_id = to_key.get_max_stream_pos()

sql = """
SELECT m.event_id, instance_name, stream_ordering
SELECT m.event_id, instance_name, topological_ordering, stream_ordering
FROM events AS e, room_memberships AS m
WHERE e.event_id = m.event_id
AND m.user_id = ?
Expand All @@ -554,9 +564,13 @@ def f(txn):

rows = [
_EventDictReturn(event_id, None, stream_ordering)
for event_id, instance_name, stream_ordering in txn
for event_id, instance_name, topological_ordering, stream_ordering in txn
if _filter_results(
"f", from_key, to_key, instance_name, stream_ordering
from_key,
to_key,
instance_name,
topological_ordering,
stream_ordering,
)
]

Expand Down Expand Up @@ -1159,7 +1173,11 @@ def _paginate_room_events_txn(
_EventDictReturn(event_id, topological_ordering, stream_ordering)
for event_id, instance_name, topological_ordering, stream_ordering in txn
if _filter_results(
direction, from_token, to_token, instance_name, stream_ordering
lower_token=to_token if direction == "b" else from_token,
upper_token=from_token if direction == "b" else to_token,
instance_name=instance_name,
topological_ordering=topological_ordering,
stream_ordering=stream_ordering,
)
][:limit]

Expand Down

0 comments on commit 604e33f

Please sign in to comment.