Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Cleanup opentracing logging for syncs (#10828)
Browse files Browse the repository at this point in the history
We added a bunch of spans in #10704, but this ended up adding a lot of
redundant spans for rooms where nothing changed, so instead we only
start the span if there might be something interesting going on.
  • Loading branch information
erikjohnston authored Sep 15, 2021
1 parent 474edce commit 9a6f4a6
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 126 deletions.
1 change: 1 addition & 0 deletions changelog.d/10828.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added opentrace logging to help debug #9424.
267 changes: 141 additions & 126 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1533,21 +1533,18 @@ async def _generate_sync_entry_for_rooms(
newly_left_rooms = room_changes.newly_left_rooms

async def handle_room_entries(room_entry: "RoomSyncResultBuilder"):
with start_active_span("generate_room_entry"):
set_tag("room_id", room_entry.room_id)
log_kv({"events": len(room_entry.events or [])})
logger.debug("Generating room entry for %s", room_entry.room_id)
res = await self._generate_room_entry(
sync_result_builder,
ignored_users,
room_entry,
ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
tags=tags_by_room.get(room_entry.room_id),
account_data=account_data_by_room.get(room_entry.room_id, {}),
always_include=sync_result_builder.full_state,
)
logger.debug("Generated room entry for %s", room_entry.room_id)
return res
logger.debug("Generating room entry for %s", room_entry.room_id)
res = await self._generate_room_entry(
sync_result_builder,
ignored_users,
room_entry,
ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
tags=tags_by_room.get(room_entry.room_id),
account_data=account_data_by_room.get(room_entry.room_id, {}),
always_include=sync_result_builder.full_state,
)
logger.debug("Generated room entry for %s", room_entry.room_id)
return res

await concurrently_execute(handle_room_entries, room_entries, 10)

Expand Down Expand Up @@ -1960,139 +1957,157 @@ async def _generate_room_entry(
room_id = room_builder.room_id
since_token = room_builder.since_token
upto_token = room_builder.upto_token
log_kv(
{
"since_token": since_token,
"upto_token": upto_token,
}
)

batch = await self._load_filtered_recents(
room_id,
sync_config,
now_token=upto_token,
since_token=since_token,
potential_recents=events,
newly_joined_room=newly_joined,
)
log_kv(
{
"batch_events": len(batch.events),
"prev_batch": batch.prev_batch,
"batch_limited": batch.limited,
}
)
with start_active_span("generate_room_entry"):
set_tag("room_id", room_id)
log_kv({"events": len(events or ())})

# Note: `batch` can be both empty and limited here in the case where
# `_load_filtered_recents` can't find any events the user should see
# (e.g. due to having ignored the sender of the last 50 events).
log_kv(
{
"since_token": since_token,
"upto_token": upto_token,
}
)

if newly_joined:
# debug for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger.debug(
"Timeline events after filtering in newly-joined room %s: %r",
batch = await self._load_filtered_recents(
room_id,
batch,
sync_config,
now_token=upto_token,
since_token=since_token,
potential_recents=events,
newly_joined_room=newly_joined,
)
log_kv(
{
"batch_events": len(batch.events),
"prev_batch": batch.prev_batch,
"batch_limited": batch.limited,
}
)

# When we join the room (or the client requests full_state), we should
# send down any existing tags. Usually the user won't have tags in a
# newly joined room, unless either a) they've joined before or b) the
# tag was added by synapse e.g. for server notice rooms.
if full_state:
user_id = sync_result_builder.sync_config.user.to_string()
tags = await self.store.get_tags_for_room(user_id, room_id)
# Note: `batch` can be both empty and limited here in the case where
# `_load_filtered_recents` can't find any events the user should see
# (e.g. due to having ignored the sender of the last 50 events).

# If there aren't any tags, don't send the empty tags list down
# sync
if not tags:
tags = None
if newly_joined:
# debug for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger.debug(
"Timeline events after filtering in newly-joined room %s: %r",
room_id,
batch,
)

account_data_events = []
if tags is not None:
account_data_events.append({"type": "m.tag", "content": {"tags": tags}})
# When we join the room (or the client requests full_state), we should
# send down any existing tags. Usually the user won't have tags in a
# newly joined room, unless either a) they've joined before or b) the
# tag was added by synapse e.g. for server notice rooms.
if full_state:
user_id = sync_result_builder.sync_config.user.to_string()
tags = await self.store.get_tags_for_room(user_id, room_id)

for account_data_type, content in account_data.items():
account_data_events.append({"type": account_data_type, "content": content})
# If there aren't any tags, don't send the empty tags list down
# sync
if not tags:
tags = None

account_data_events = sync_config.filter_collection.filter_room_account_data(
account_data_events
)
account_data_events = []
if tags is not None:
account_data_events.append({"type": "m.tag", "content": {"tags": tags}})

ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
for account_data_type, content in account_data.items():
account_data_events.append(
{"type": account_data_type, "content": content}
)

if not (
always_include or batch or account_data_events or ephemeral or full_state
):
return
account_data_events = (
sync_config.filter_collection.filter_room_account_data(
account_data_events
)
)

state = await self.compute_state_delta(
room_id, batch, sync_config, since_token, now_token, full_state=full_state
)
ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)

summary: Optional[JsonDict] = {}

# we include a summary in room responses when we're lazy loading
# members (as the client otherwise doesn't have enough info to form
# the name itself).
if sync_config.filter_collection.lazy_load_members() and (
# we recalculate the summary:
# if there are membership changes in the timeline, or
# if membership has changed during a gappy sync, or
# if this is an initial sync.
any(ev.type == EventTypes.Member for ev in batch.events)
or (
# XXX: this may include false positives in the form of LL
# members which have snuck into state
batch.limited
and any(t == EventTypes.Member for (t, k) in state)
)
or since_token is None
):
summary = await self.compute_summary(
room_id, sync_config, batch, state, now_token
)
if not (
always_include
or batch
or account_data_events
or ephemeral
or full_state
):
return

if room_builder.rtype == "joined":
unread_notifications: Dict[str, int] = {}
room_sync = JoinedSyncResult(
room_id=room_id,
timeline=batch,
state=state,
ephemeral=ephemeral,
account_data=account_data_events,
unread_notifications=unread_notifications,
summary=summary,
unread_count=0,
state = await self.compute_state_delta(
room_id,
batch,
sync_config,
since_token,
now_token,
full_state=full_state,
)

if room_sync or always_include:
notifs = await self.unread_notifs_for_room_id(room_id, sync_config)
summary: Optional[JsonDict] = {}

# we include a summary in room responses when we're lazy loading
# members (as the client otherwise doesn't have enough info to form
# the name itself).
if sync_config.filter_collection.lazy_load_members() and (
# we recalculate the summary:
# if there are membership changes in the timeline, or
# if membership has changed during a gappy sync, or
# if this is an initial sync.
any(ev.type == EventTypes.Member for ev in batch.events)
or (
# XXX: this may include false positives in the form of LL
# members which have snuck into state
batch.limited
and any(t == EventTypes.Member for (t, k) in state)
)
or since_token is None
):
summary = await self.compute_summary(
room_id, sync_config, batch, state, now_token
)

if room_builder.rtype == "joined":
unread_notifications: Dict[str, int] = {}
room_sync = JoinedSyncResult(
room_id=room_id,
timeline=batch,
state=state,
ephemeral=ephemeral,
account_data=account_data_events,
unread_notifications=unread_notifications,
summary=summary,
unread_count=0,
)

unread_notifications["notification_count"] = notifs["notify_count"]
unread_notifications["highlight_count"] = notifs["highlight_count"]
if room_sync or always_include:
notifs = await self.unread_notifs_for_room_id(room_id, sync_config)

room_sync.unread_count = notifs["unread_count"]
unread_notifications["notification_count"] = notifs["notify_count"]
unread_notifications["highlight_count"] = notifs["highlight_count"]

sync_result_builder.joined.append(room_sync)
room_sync.unread_count = notifs["unread_count"]

if batch.limited and since_token:
user_id = sync_result_builder.sync_config.user.to_string()
logger.debug(
"Incremental gappy sync of %s for user %s with %d state events"
% (room_id, user_id, len(state))
sync_result_builder.joined.append(room_sync)

if batch.limited and since_token:
user_id = sync_result_builder.sync_config.user.to_string()
logger.debug(
"Incremental gappy sync of %s for user %s with %d state events"
% (room_id, user_id, len(state))
)
elif room_builder.rtype == "archived":
archived_room_sync = ArchivedSyncResult(
room_id=room_id,
timeline=batch,
state=state,
account_data=account_data_events,
)
elif room_builder.rtype == "archived":
archived_room_sync = ArchivedSyncResult(
room_id=room_id,
timeline=batch,
state=state,
account_data=account_data_events,
)
if archived_room_sync or always_include:
sync_result_builder.archived.append(archived_room_sync)
else:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
if archived_room_sync or always_include:
sync_result_builder.archived.append(archived_room_sync)
else:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)

async def get_rooms_for_user_at(
self, user_id: str, room_key: RoomStreamToken
Expand Down

0 comments on commit 9a6f4a6

Please sign in to comment.