This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Prevent large ephemeral event traffic to appservices upon startup #10836

Open
anoadragon453 opened this issue Sep 16, 2021 · 2 comments
Labels
A-Application-Service Related to AS support T-Enhancement New features, changes in functionality, improvements in performance, or user-facing enhancements. Z-Help-Wanted We know exactly how to fix this issue, and would be grateful for any contribution

Comments

@anoadragon453
Member

@Half-Shot mentioned that upon start up, Synapse will send all missed ephemeral events to application services that have opted into receiving them (see MSC2409). There are some performance issues present with the feature's implementation in a couple of different circumstances.

Some background: a "stream token" is an incrementing integer. Every occurrence (a new typing event, a new read receipt, etc.) gets one of these IDs, and they're (mostly strictly) ordered this way. Thus we can simply keep track of the last successfully sent stream token per appservice to know what the AS still needs to receive.
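That bookkeeping can be sketched as follows. This is a minimal, hypothetical model for illustration — `StreamTracker` and its methods are made up here and are not Synapse's actual storage layer:

```python
# Minimal sketch of per-appservice stream-token bookkeeping.
# `StreamTracker` is hypothetical, for illustration only.

class StreamTracker:
    def __init__(self):
        # Maps (as_id, stream_type) -> last stream token successfully sent.
        self._last_sent: dict[tuple[str, str], int] = {}

    def get_last_sent(self, as_id: str, stream_type: str) -> int:
        # A missing entry behaves like token 0: "never received anything".
        return self._last_sent.get((as_id, stream_type), 0)

    def mark_sent(self, as_id: str, stream_type: str, token: int) -> None:
        # Tokens only ever move forward.
        current = self.get_last_sent(as_id, stream_type)
        self._last_sent[(as_id, stream_type)] = max(current, token)


tracker = StreamTracker()
assert tracker.get_last_sent("my_bridge", "read_receipt") == 0  # brand-new AS
tracker.mark_sent("my_bridge", "read_receipt", 1053)
# Everything with a token in (0, 1053] is now considered delivered.
```

The "brand-new AS looks like token 0" behaviour is exactly what causes problem 1 below.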

  1. When the bridge is first added to the homeserver: it is assumed to have a last-successful stream token of 0:

    async def get_type_stream_id_for_appservice(
        self, service: ApplicationService, type: str
    ) -> int:
        if type not in ("read_receipt", "presence"):
            raise ValueError(
                "Expected type to be a valid application stream id type, got %s"
                % (type,)
            )

        def get_type_stream_id_for_appservice_txn(txn):
            stream_id_type = "%s_stream_id" % type
            txn.execute(
                # We do NOT want to escape `stream_id_type`.
                "SELECT %s FROM application_services_state WHERE as_id=?"
                % stream_id_type,
                (service.id,),
            )
            last_stream_id = txn.fetchone()
            if last_stream_id is None or last_stream_id[0] is None:  # no row exists
                return 0
            else:
                return int(last_stream_id[0])

        return await self.db_pool.runInteraction(
            "get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn
        )

When it comes time to send out the ephemeral events the appservice has missed, Synapse will send all read receipts and presence updates that it knows about and that are relevant to the namespaces the appservice has registered:

    from_key = await self.store.get_type_stream_id_for_appservice(
        service, "read_receipt"
    )
    receipts_source = self.event_sources.sources["receipt"]
    receipts, _ = await receipts_source.get_new_events_as(
        service=service, from_key=from_key
    )
    return receipts

This historical data isn't very useful to the appservice as it has just been registered.

  2. When a bridge has been offline for a while and comes back: it will receive all missed, relevant EDUs since it went offline. Historical EDUs are typically much less useful than the current state of things.

I'd be tempted to implement a time bound on the EDUs we send to appservices, such that very old EDUs are no longer considered.
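Such a time bound could look roughly like this. This is a sketch only: `MAX_EDU_AGE_MS`, the helper name, and the assumption that each EDU carries an `origin_server_ts` are all illustrative, not existing Synapse config or code:

```python
import time

# Hypothetical cutoff: drop EDUs older than one hour. Not a real Synapse option.
MAX_EDU_AGE_MS = 60 * 60 * 1000


def filter_stale_edus(edus, now_ms=None):
    """Keep only EDUs whose (assumed) `origin_server_ts` is recent enough."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return [
        e for e in edus
        if now_ms - e.get("origin_server_ts", 0) <= MAX_EDU_AGE_MS
    ]


now = 1_700_000_000_000
edus = [
    {"type": "m.receipt", "origin_server_ts": now - 5_000},                # fresh
    {"type": "m.presence", "origin_server_ts": now - 2 * 60 * 60 * 1000},  # stale
]
assert [e["type"] for e in filter_stale_edus(edus, now_ms=now)] == ["m.receipt"]
```

Applied before sending, this would cap the catch-up burst regardless of how far behind the appservice's stream token is.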

Fixing both of these would reduce the flood of EDUs that can overload both the appservice and Synapse itself when it tries to pull all of that information out of the database.

Finally, note that the above problems only seem to apply to the implementation of read receipts and presence events. Historical typing events are flat out not sent to appservices:

    # We don't persist the token for typing_key for performance reasons

@anoadragon453 anoadragon453 added Z-Help-Wanted We know exactly how to fix this issue, and would be grateful for any contribution T-Enhancement New features, changes in functionality, improvements in performance, or user-facing enhancements. labels Sep 16, 2021
@anoadragon453
Member Author

After some investigation by Scott Weber and myself, it appears that only the most recent 100 read receipts will be sent to the appservice:

    async def get_new_events_as(
        self, from_key: int, service: ApplicationService
    ) -> Tuple[List[JsonDict], int]:
        """Returns a set of new receipt events that an appservice
        may be interested in.

        Args:
            from_key: the stream position at which events should be fetched from
            service: The appservice which may be interested
        """
        from_key = int(from_key)
        to_key = self.get_current_key()

        if from_key == to_key:
            return [], to_key

        # Fetch all read receipts for all rooms, up to a limit of 100. This is ordered
        # by most recent.
        rooms_to_events = await self.store.get_linearized_receipts_for_all_rooms(
            from_key=from_key, to_key=to_key
        )

        # Then filter down to rooms that the AS can read
        events = []
        for room_id, event in rooms_to_events.items():
            if not await service.matches_user_in_member_list(room_id, self.store):
                continue
            events.append(event)

        return (events, to_key)

In contrast, presence events don't appear to be bounded. When a new presence update occurs, that event and the users that may potentially be interested in it are passed to _notify_app_services_ephemeral:

synapse/notifier.py (lines 408 to 468 in ac22650):

    def on_new_event(
        self,
        stream_key: str,
        new_token: Union[int, RoomStreamToken],
        users: Optional[Collection[Union[str, UserID]]] = None,
        rooms: Optional[Collection[str]] = None,
    ):
        """Used to inform listeners that something has happened event wise.

        Will wake up all listeners for the given users and rooms.

        Args:
            stream_key: The stream the event came from.
            new_token: The value of the new stream token.
            users: The users that should be informed of the new event.
            rooms: A collection of room IDs for which each joined member will be
                informed of the new event.
        """
        users = users or []
        rooms = rooms or []

        with Measure(self.clock, "on_new_event"):
            user_streams = set()

            log_kv(
                {
                    "waking_up_explicit_users": len(users),
                    "waking_up_explicit_rooms": len(rooms),
                }
            )

            for user in users:
                user_stream = self.user_to_user_stream.get(str(user))
                if user_stream is not None:
                    user_streams.add(user_stream)

            for room in rooms:
                user_streams |= self.room_to_user_streams.get(room, set())

            if stream_key == "to_device_key":
                issue9533_logger.debug(
                    "to-device messages stream id %s, awaking streams for %s",
                    new_token,
                    users,
                )

            time_now_ms = self.clock.time_msec()
            for user_stream in user_streams:
                try:
                    user_stream.notify(stream_key, new_token, time_now_ms)
                except Exception:
                    logger.exception("Failed to notify listener")

            self.notify_replication()

            # Notify appservices
            self._notify_app_services_ephemeral(
                stream_key,
                new_token,
                users,
            )

A few levels down, ApplicationServicesHandler._handle_presence is called with each of the homeserver's registered application services and those users that were interested in the presence update:

    async def _handle_presence(
        self, service: ApplicationService, users: Collection[Union[str, UserID]]
    ) -> List[JsonDict]:
        events: List[JsonDict] = []
        presence_source = self.event_sources.sources["presence"]
        from_key = await self.store.get_type_stream_id_for_appservice(
            service, "presence"
        )
        for user in users:
            if isinstance(user, str):
                user = UserID.from_string(user)

            interested = await service.is_interested_in_presence(user, self.store)
            if not interested:
                continue

            presence_events, _ = await presence_source.get_new_events(
                user=user,
                service=service,
                from_key=from_key,
            )
            time_now = self.clock.time_msec()
            events.extend(
                {
                    "type": "m.presence",
                    "sender": event.user_id,
                    "content": format_user_presence_state(
                        event, time_now, include_user_id=False
                    ),
                }
                for event in presence_events
            )

        return events

At this point, a few things happen:

  1. We retrieve the latest stream token up to which this application service has received presence updates. Note, per the above, that this may be 0.
  2. For each user who was interested in the presence update, we fetch all presence updates destined for that user since the stream token. This is the same process that occurs when a user syncs with a token and receives all presence updates since their last sync. Note that multiple presence updates from the same user are deduplicated here.
  3. Finally, the presence updates destined for every relevant user are appended together and sent off to the application service.
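The appending in step 3 is where duplicates creep in: two bridge users interested in the same update each contribute their own copy. A small sketch (all names hypothetical) of the naive behaviour versus a deduplicating variant:

```python
def collect_presence_naive(interested_users, updates_for_user):
    """Mimics appending every update for every interested user, as in step 3."""
    events = []
    for user in interested_users:
        events.extend(updates_for_user(user))
    return events


def collect_presence_deduped(interested_users, updates_for_user):
    """Same, but deduplicating by (sender, timestamp) before sending."""
    seen = set()
    events = []
    for user in interested_users:
        for update in updates_for_user(user):
            key = (update["sender"], update["ts"])
            if key not in seen:
                seen.add(key)
                events.append(update)
    return events


# Both bridge users are handed the same presence update from @alice:
update = {"sender": "@alice:example.org", "ts": 123}


def per_user(user):
    return [dict(update)]


users = ["@bridge_a:example.org", "@bridge_b:example.org"]
assert len(collect_presence_naive(users, per_user)) == 2    # duplicated
assert len(collect_presence_deduped(users, per_user)) == 1  # sent once
```

This is the duplication described in problem 2 below.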

There are a couple of problems with this process:

  1. If the application service has just been registered (its stream token is 0) or has been offline for a while (its stream token is old), then a large number of presence updates may be sent to the application service.
    • This could be solved by placing a limit on the number of presence updates that can be sent out at any given time to an application service, much like the implementation of read receipts above.
    • Presence state for a user is refreshed roughly every 25 minutes, and whenever it changes as well. So even with a limit, the application service will end up with a fully up-to-date picture of presence state, even if not immediately. That seems worth the trade-off against sending all presence state at once on startup.
  2. If the application service has multiple users, then it's possible (and likely) that each of those separate users is set to receive the same presence update. The application service will receive all of the copies, without deduplication.
    • This could be partially solved by deduplicating updates before they're sent out. But what about deduplication across multiple calls of _handle_presence? That is mostly fine, as the stream token for the application service is updated after _handle_presence has finished executing.
    • But if multiple calls to _handle_presence occur simultaneously, duplicate updates can still be sent. That would probably be fixed by adding a Linearizer, as mentioned here, with some extra logic to ensure that subsequent calls to notify_interested_services_ephemeral always have a higher stream token value than the last.

@anoadragon453
Member Author

@turt2live notes that normal events can come in large bursts as well when an appservice has been offline.
