This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix assertions being thrown by the EventsStream update function #7337

Merged 4 commits on Apr 24, 2020
1 change: 1 addition & 0 deletions changelog.d/7337.bugfix
@@ -0,0 +1 @@
Fix a bug where event updates might not be sent over replication to worker processes after the stream falls behind.
12 changes: 7 additions & 5 deletions synapse/replication/http/streams.py
@@ -28,7 +28,7 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):

The API looks like:

GET /_synapse/replication/get_repl_stream_updates/events?from_token=0&to_token=10&limit=100
GET /_synapse/replication/get_repl_stream_updates/<stream name>?from_token=0&to_token=10

200 OK

@@ -38,6 +38,9 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
limited: False,
}

If there are more rows than can sensibly be returned in one lump, `limited` will be
set to true, and the caller should call again with a new `from_token`.

"""

NAME = "get_repl_stream_updates"
@@ -52,8 +55,8 @@ def __init__(self, hs):
self.streams = hs.get_replication_streamer().get_streams()

@staticmethod
def _serialize_payload(stream_name, from_token, upto_token, limit):
return {"from_token": from_token, "upto_token": upto_token, "limit": limit}
def _serialize_payload(stream_name, from_token, upto_token):
return {"from_token": from_token, "upto_token": upto_token}

async def _handle_request(self, request, stream_name):
stream = self.streams.get(stream_name)
@@ -62,10 +65,9 @@ async def _handle_request(self, request, stream_name):

from_token = parse_integer(request, "from_token", required=True)
upto_token = parse_integer(request, "upto_token", required=True)
limit = parse_integer(request, "limit", required=True)

updates, upto_token, limited = await stream.get_updates_since(
from_token, upto_token, limit
from_token, upto_token
)

return (
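
For illustration only (not part of the diff): a minimal sketch of how a caller might drive this endpoint, repeatedly fetching batches until `limited` is no longer set, as the docstring above describes. The `client` callable is a stand-in for whatever performs the underlying replication HTTP request; its exact interface is an assumption made for this example.

async def fetch_all_updates(client, stream_name, from_token, upto_token):
    """Page through get_repl_stream_updates until we have caught up to upto_token."""
    updates = []
    while True:
        result = await client(
            stream_name=stream_name, from_token=from_token, upto_token=upto_token
        )
        updates.extend(result["updates"])
        from_token = result["upto_token"]
        if not result["limited"]:
            # the server has returned everything up to upto_token
            return updates
        # otherwise, go round again from the token we actually reached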
19 changes: 10 additions & 9 deletions synapse/replication/tcp/streams/_base.py
@@ -24,8 +24,8 @@

logger = logging.getLogger(__name__)


MAX_EVENTS_BEHIND = 500000
# the number of rows to request from an update_function.
_STREAM_UPDATE_TARGET_ROW_COUNT = 100


# Some type aliases to make things a bit easier.
@@ -56,7 +56,11 @@
# * from_token: the previous stream token: the starting point for fetching the
# updates
# * to_token: the new stream token: the point to get updates up to
# * limit: the maximum number of rows to return
# * target_row_count: a target for the number of rows to be returned.
#
# The update_function is expected to return up to _approximately_ target_row_count rows.
# If there are more updates available, it should set `limited` in the result, and
# it will be called again to get the next batch.
#
UpdateFunction = Callable[[Token, Token, int], Awaitable[StreamUpdateResult]]

@@ -138,7 +142,7 @@ async def get_updates(self) -> StreamUpdateResult:
return updates, current_token, limited

async def get_updates_since(
self, from_token: Token, upto_token: Token, limit: int = 100
self, from_token: Token, upto_token: Token
) -> StreamUpdateResult:
"""Like get_updates except allows specifying from when we should
stream updates
@@ -156,7 +160,7 @@ async def get_updates_since(
return [], upto_token, False

updates, upto_token, limited = await self.update_function(
from_token, upto_token, limit,
from_token, upto_token, _STREAM_UPDATE_TARGET_ROW_COUNT,
)
return updates, upto_token, limited

@@ -193,10 +197,7 @@ async def update_function(
from_token: int, upto_token: int, limit: int
) -> StreamUpdateResult:
result = await client(
stream_name=stream_name,
from_token=from_token,
upto_token=upto_token,
limit=limit,
stream_name=stream_name, from_token=from_token, upto_token=upto_token,
)
return result["updates"], result["upto_token"], result["limited"]

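
As an aside, here is a minimal sketch of an update_function that honours the contract described above (return up to roughly target_row_count rows, and set `limited` when there may be more). It is illustrative only: the in-memory `_rows` list is an assumption made for the example, not anything in Synapse.

# hypothetical in-memory source of (stream_id, row) pairs, kept in ascending order
_rows = []


async def example_update_function(from_token, to_token, target_row_count):
    # take everything strictly after from_token, up to and including to_token
    rows = [(sid, row) for (sid, row) in _rows if from_token < sid <= to_token]

    if len(rows) <= target_row_count:
        # the whole range fits in one batch
        return rows, to_token, False

    # otherwise truncate, and report the token we actually reached, so the caller
    # knows to call again from there. A real implementation must also take care
    # not to cut a batch part-way through a group of rows sharing a stream id -
    # which is exactly the subtlety the events stream changes below deal with.
    rows = rows[:target_row_count]
    return rows, rows[-1][0], True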
113 changes: 95 additions & 18 deletions synapse/replication/tcp/streams/events.py
@@ -15,11 +15,12 @@
# limitations under the License.

import heapq
from typing import Iterable, Tuple, Type
from collections import Iterable
from typing import List, Tuple, Type

import attr

from ._base import Stream, Token, db_query_to_update_function
from ._base import Stream, StreamUpdateResult, Token


"""Handling of the 'events' replication stream
@@ -117,30 +118,106 @@ class EventsStream(Stream):
def __init__(self, hs):
self._store = hs.get_datastore()
super().__init__(
self._store.get_current_events_token,
db_query_to_update_function(self._update_function),
self._store.get_current_events_token, self._update_function,
)

async def _update_function(
self, from_token: Token, current_token: Token, limit: int
) -> Iterable[tuple]:
self, from_token: Token, current_token: Token, target_row_count: int
) -> StreamUpdateResult:

# the events stream merges together three separate sources:
# * new events
# * current_state changes
# * events which were previously outliers, but have now been de-outliered.
#
# The merge operation is complicated by the fact that we only have a single
# "stream token" which is supposed to indicate how far we have got through
# all three streams. It's therefore no good to return rows 1-1000 from the
# "new events" table if the state_deltas are limited to rows 1-100 by the
# target_row_count.
Member:

It took me a while to realise the crux here is that: "for each source we must return all rows up to the same token."

Member Author:

Fair. Will add another comment.

#
# In other words: we must pick a new upper limit, and must return *all* rows
# up to that point for each of the three sources.
#
# Start by trying to split the target_row_count up. We expect to have a
# negligible number of ex-outliers, and a rough approximation based on recent
# traffic on sw1v.org shows that there are approximately the same number of
# event rows between a given pair of stream ids as there are state
# updates, so let's split our target_row_count among those two types. The target
# is only an approximation - it doesn't matter if we end up going a bit over it.

target_row_count //= 2

# now we fetch up to that many rows from the events table

event_rows = await self._store.get_all_new_forward_event_rows(
from_token, current_token, limit
)
event_updates = (
(row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows
)
from_token, current_token, target_row_count
) # type: List[Tuple]

# we rely on get_all_new_forward_event_rows strictly honouring the limit, so
# that we know it is safe to just take upper_limit = event_rows[-1][0].
assert (
len(event_rows) <= target_row_count
), "get_all_new_forward_event_rows did not honour row limit"

# if we hit the limit on event_updates, there's no point in going beyond the
# last stream_id in the batch for the other sources.

if len(event_rows) == target_row_count:
limited = True
upper_limit = event_rows[-1][0] # type: int
else:
limited = False
upper_limit = current_token

# next up is the state delta table

state_rows = await self._store.get_all_updated_current_state_deltas(
from_token, current_token, limit
)
state_updates = (
(row[0], EventsStreamCurrentStateRow.TypeId, row[1:]) for row in state_rows
)
from_token, upper_limit, target_row_count
) # type: List[Tuple]

# again, if we've hit the limit there, we'll need to limit the other sources
assert len(state_rows) < target_row_count or state_rows[-1][0] <= upper_limit
if len(state_rows) == target_row_count:
assert state_rows[-1][0] <= upper_limit
upper_limit = state_rows[-1][0]
limited = True
Member:

I don't think this is safe, either:

  1. get_all_updated_current_state_deltas takes a hard limit and may not return all rows with the last stream_id, or
  2. get_all_updated_current_state_deltas always returns all rows for the last stream_id, and so may return more than limit items.

Currently, it looks like we're in the first case, and so we need to discard all rows with the stream_id of state_rows[-1][0].

Member Author:

Right. I think this is the same thing as I was worrying about for the FIXME, below.


# FIXME: is it a given that there is only one row per stream_id in the
# state_deltas table (so that we can be sure that we have got all of the
# rows for upper_limit)?
Member:

No it isn't a given.

Member:

i.e. they can have multiple rows per stream ID afaict

Member Author:

Yeah, I suspected as much. I was rather hoping that this wouldn't be any worse than it has been for years, and that it would therefore be safe to deploy while I thought about it a bit harder.
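
For what it's worth, one way to address the concern discussed in this thread - purely a sketch, not code from this PR - is to discard every row that shares the final stream id whenever the batch is full, so the returned range never ends part-way through a stream id:

if len(state_rows) == target_row_count:
    # the batch may have been cut part-way through the rows for the last
    # stream id, so drop that stream id entirely and only advance as far as
    # the largest id for which we know we have every row
    incomplete_id = state_rows[-1][0]
    state_rows = [row for row in state_rows if row[0] < incomplete_id]
    upper_limit = state_rows[-1][0] if state_rows else from_token
    limited = True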


# finally, fetch the ex-outliers rows. We assume there are few enough of these
# not to bother with the limit.

all_updates = heapq.merge(event_updates, state_updates)
ex_outliers_rows = await self._store.get_ex_outlier_stream_rows(
from_token, upper_limit
) # type: List[Tuple]

return all_updates
# we now need to turn the raw database rows returned into tuples suitable
# for the replication protocol (basically, we add an identifier to
# distinguish the row type). At the same time, we can limit the event_rows
# to the max stream_id from state_rows.

event_updates = (
(stream_id, (EventsStreamEventRow.TypeId, rest))
for (stream_id, *rest) in event_rows
if stream_id <= upper_limit
) # type: Iterable[Tuple[int, Tuple]]

state_updates = (
(stream_id, (EventsStreamCurrentStateRow.TypeId, rest))
for (stream_id, *rest) in state_rows
) # type: Iterable[Tuple[int, Tuple]]

ex_outliers_updates = (
(stream_id, (EventsStreamEventRow.TypeId, rest))
for (stream_id, *rest) in ex_outliers_rows
) # type: Iterable[Tuple[int, Tuple]]

# we need to return a sorted list, so merge them together.
updates = list(heapq.merge(event_updates, state_updates, ex_outliers_updates))
return updates, upper_limit, limited

@classmethod
def parse_row(cls, row):
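
A side note on the heapq.merge call in the new _update_function: it lazily merges several already-sorted iterables into a single sorted iterator, comparing tuples element-wise (so effectively by stream id here). A tiny self-contained illustration, unrelated to the Synapse data:

import heapq

events = [(1, ("event", "a")), (4, ("event", "d"))]
state = [(2, ("state", "b")), (3, ("state", "c"))]

merged = list(heapq.merge(events, state))
# -> [(1, ('event', 'a')), (2, ('state', 'b')), (3, ('state', 'c')), (4, ('event', 'd'))]

Each input must itself already be sorted, which holds here because each of the three row sources is ordered by stream id.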
46 changes: 34 additions & 12 deletions synapse/storage/data_stores/main/events_worker.py
@@ -973,8 +973,18 @@ def get_current_events_token(self):
return self._stream_id_gen.get_current_token()

def get_all_new_forward_event_rows(self, last_id, current_id, limit):
if last_id == current_id:
return defer.succeed([])
Member:

Why has this been removed?

Member Author:

it's redundant: there is another check for the same thing up in get_updates_since, and if in the future we do manage to call get_all_new_forward_event_rows with last_id == current_id, it still behaves correctly.

I'm not a fan of shortcuts like this - they make me suspicious that the algorithm they are short-cutting is incorrect somehow. Obviously if there's a significant efficiency gain from the shortcut, there's an argument for it, but since in practice the shortcut is there in get_updates_since, I don't think it has any merit here.

Member:

Three thoughts here:

  1. I'd rather functions did easy wins in terms of performance to avoid foot guns when changes are made 10 layers up the stack, especially when pretty much all other functions that fetch streams from the DB do this and other optimisations.
  2. Avoiding a DB query is a big win for functions that get called in a loop.
  3. Personally I don't really feel that the presence of a short circuit makes me any more or less inclined to trust the algorithm is right.
"""Returns new events, for the Events replication stream

Args:
last_id: the last stream_id from the previous batch.
current_id: the maximum stream_id to return up to
limit: the maximum number of rows to return

Returns: Deferred[List[Tuple]]
a list of events stream rows. Each tuple consists of a stream id as
the first element, followed by fields suitable for casting into an
EventsStreamRow.
"""

def get_all_new_forward_event_rows(txn):
sql = (
@@ -989,13 +999,26 @@ def get_all_new_forward_event_rows(txn):
" LIMIT ?"
)
txn.execute(sql, (last_id, current_id, limit))
new_event_updates = txn.fetchall()
return txn.fetchall()

if len(new_event_updates) == limit:
upper_bound = new_event_updates[-1][0]
else:
upper_bound = current_id
return self.db.runInteraction(
"get_all_new_forward_event_rows", get_all_new_forward_event_rows
)

def get_ex_outlier_stream_rows(self, last_id, current_id):
"""Returns de-outliered events, for the Events replication stream

Args:
last_id: the last stream_id from the previous batch.
current_id: the maximum stream_id to return up to

Returns: Deferred[List[Tuple]]
a list of events stream rows. Each tuple consists of a stream id as
the first element, followed by fields suitable for casting into an
EventsStreamRow.
"""

def get_ex_outlier_stream_rows_txn(txn):
sql = (
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
" state_key, redacts, relates_to_id"
@@ -1006,15 +1029,14 @@ def get_all_new_forward_event_rows(txn):
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? < event_stream_ordering"
" AND event_stream_ordering <= ?"
" ORDER BY event_stream_ordering DESC"
" ORDER BY event_stream_ordering ASC"
)
txn.execute(sql, (last_id, upper_bound))
new_event_updates.extend(txn)

return new_event_updates
txn.execute(sql, (last_id, current_id))
return txn.fetchall()

return self.db.runInteraction(
"get_all_new_forward_event_rows", get_all_new_forward_event_rows
"get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn
)

def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
Expand Down