From 522c29bfc7532f5fb9a4c4af86c9d9c9901c1a0e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 22 Jul 2022 22:29:18 -0500 Subject: [PATCH 01/18] Instrument /messages for understandable traces --- synapse/api/auth.py | 8 +++++++- synapse/federation/federation_client.py | 2 ++ synapse/handlers/federation.py | 2 ++ synapse/handlers/federation_event.py | 5 +++++ synapse/handlers/pagination.py | 2 ++ synapse/handlers/relations.py | 2 ++ synapse/storage/controllers/persist_events.py | 2 +- synapse/storage/controllers/state.py | 5 +++++ synapse/storage/databases/main/stream.py | 2 ++ synapse/visibility.py | 2 ++ 10 files changed, 30 insertions(+), 2 deletions(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 6e6eaf3805bd..c6f8733e6017 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -30,7 +30,12 @@ from synapse.appservice import ApplicationService from synapse.http import get_request_user_agent from synapse.http.site import SynapseRequest -from synapse.logging.opentracing import active_span, force_tracing, start_active_span +from synapse.logging.opentracing import ( + active_span, + force_tracing, + start_active_span, + trace, +) from synapse.storage.databases.main.registration import TokenLookupResult from synapse.types import Requester, UserID, create_requester @@ -563,6 +568,7 @@ def get_access_token_from_request(request: Request) -> str: return query_params[0].decode("ascii") + @trace async def check_user_in_room_or_world_readable( self, room_id: str, user_id: str, allow_departed_users: bool = False ) -> Tuple[str, Optional[str]]: diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 842f5327c227..c1f96328b44a 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -61,6 +61,7 @@ ) from synapse.federation.transport.client import SendJoinResponse from synapse.http.types import QueryParams +from synapse.logging.opentracing import trace from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache @@ -233,6 +234,7 @@ async def claim_client_keys( destination, content, timeout ) + @trace async def backfill( self, dest: str, room_id: str, limit: int, extremities: Collection[str] ) -> Optional[List[EventBase]]: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3b5eaf515624..59580ef93e55 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -59,6 +59,7 @@ from synapse.federation.federation_client import InvalidResponseError from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import nested_logging_context +from synapse.logging.opentracing import trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.module_api import NOT_SPAM from synapse.replication.http.federation import ( @@ -180,6 +181,7 @@ def __init__(self, hs: "HomeServer"): "resume_sync_partial_state_room", self._resume_sync_partial_state_room ) + @trace async def maybe_backfill( self, room_id: str, current_depth: int, limit: int ) -> bool: diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 16f20c8be7bc..4429319265db 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -59,6 +59,7 @@ from synapse.events.snapshot import EventContext from synapse.federation.federation_client import InvalidResponseError from synapse.logging.context import nested_logging_context +from synapse.logging.opentracing import trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.replication.http.federation import ( @@ -560,6 +561,7 @@ async def update_state_for_partial_state_event( event.event_id ) + @trace async def backfill( self, dest: str, room_id: str, limit: int, extremities: Collection[str] ) -> None: @@ -604,6 +606,7 @@ async def backfill( backfilled=True, ) + @trace async def _get_missing_events_for_pdu( self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int ) -> None: @@ -704,6 +707,7 @@ async def _get_missing_events_for_pdu( logger.info("Got %d prev_events", len(missing_events)) await self._process_pulled_events(origin, missing_events, backfilled=False) + @trace async def _process_pulled_events( self, origin: str, events: Iterable[EventBase], backfilled: bool ) -> None: @@ -742,6 +746,7 @@ async def _process_pulled_events( with nested_logging_context(ev.event_id): await self._process_pulled_event(origin, ev, backfilled=backfilled) + @trace async def _process_pulled_event( self, origin: str, event: EventBase, backfilled: bool ) -> None: diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 6262a35822f3..e1e34e3b16ba 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -24,6 +24,7 @@ from synapse.api.filtering import Filter from synapse.events.utils import SerializeEventConfig from synapse.handlers.room import ShutdownRoomResponse +from synapse.logging.opentracing import trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.state import StateFilter from synapse.streams.config import PaginationConfig @@ -416,6 +417,7 @@ async def purge_room(self, room_id: str, force: bool = False) -> None: await self._storage_controllers.purge_events.purge_room(room_id) + @trace async def get_messages( self, requester: Requester, diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 0b63cd218615..a02fc45e710d 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -19,6 +19,7 @@ from synapse.api.constants import RelationTypes from synapse.api.errors import SynapseError from synapse.events import EventBase, relation_from_event +from synapse.logging.opentracing import trace from synapse.storage.databases.main.relations import _RelatedEvent from synapse.types import JsonDict, Requester, StreamToken, UserID from synapse.visibility import filter_events_for_client @@ -364,6 +365,7 @@ async def _get_threads_for_events( return results + @trace async def get_bundled_aggregations( self, events: Iterable[EventBase], user_id: str ) -> Dict[str, BundledAggregations]: diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index cf98b0ab48f8..b4b904ff1d0a 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -778,7 +778,7 @@ async def _calculate_new_extremities( stale_forward_extremities_counter.observe(len(stale)) return result - + async def _get_new_state_after_events( self, room_id: str, diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index e08f956e6ef7..f584e6c92e2c 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -29,6 +29,7 @@ from synapse.api.constants import EventTypes from synapse.events import EventBase +from synapse.logging.opentracing import trace from synapse.storage.state import StateFilter from synapse.storage.util.partial_state_events_tracker import ( PartialCurrentStateTracker, @@ -175,6 +176,7 @@ def _get_state_groups_from_groups( return self.stores.state._get_state_groups_from_groups(groups, state_filter) + @trace async def get_state_for_events( self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None ) -> Dict[str, StateMap[EventBase]]: @@ -221,6 +223,7 @@ async def get_state_for_events( return {event: event_to_state[event] for event in event_ids} + @trace async def get_state_ids_for_events( self, event_ids: Collection[str], @@ -283,6 +286,7 @@ async def get_state_for_event( ) return state_map[event_id] + @trace async def get_state_ids_for_event( self, event_id: str, state_filter: Optional[StateFilter] = None ) -> StateMap[str]: @@ -323,6 +327,7 @@ def get_state_for_groups( groups, state_filter or StateFilter.all() ) + @trace async def get_state_group_for_events( self, event_ids: Collection[str], diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 2590b52f7352..a347430aa7e3 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -58,6 +58,7 @@ from synapse.api.filtering import Filter from synapse.events import EventBase from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.logging.opentracing import trace from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -1346,6 +1347,7 @@ def _paginate_room_events_txn( return rows, next_token + @trace async def paginate_room_events( self, room_id: str, diff --git a/synapse/visibility.py b/synapse/visibility.py index 9abbaa5a6464..d947edde66dd 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -23,6 +23,7 @@ from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.events.utils import prune_event +from synapse.logging.opentracing import trace from synapse.storage.controllers import StorageControllers from synapse.storage.databases.main import DataStore from synapse.storage.state import StateFilter @@ -51,6 +52,7 @@ _HISTORY_VIS_KEY: Final[Tuple[str, str]] = (EventTypes.RoomHistoryVisibility, "") +@trace async def filter_events_for_client( storage: StorageControllers, user_id: str, From b6a18d2822fe79b157c4a17d75575adfbe6d44d1 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 2 Aug 2022 20:11:07 -0500 Subject: [PATCH 02/18] Trace in Complement --- docker/conf/homeserver.yaml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/docker/conf/homeserver.yaml b/docker/conf/homeserver.yaml index f10f78a48cd2..55977017e354 100644 --- a/docker/conf/homeserver.yaml +++ b/docker/conf/homeserver.yaml @@ -184,3 +184,21 @@ trusted_key_servers: password_config: enabled: true + + +# foo +tracing: + enabled: true + sample_rate: 1 + jaeger_exporter_config: + agent_host_name: host.docker.internal + agent_port: 6831 + # Split UDP packets (UDP_PACKET_MAX_LENGTH is set to 65k in OpenTelemetry) + udp_split_oversized_batches: true + # If you define a collector, it will communicate directly to the collector, + # bypassing the agent + # + # It does not seem like the agent can keep up with the massive UDP load + # (1065 spans in one trace) so lets just use the HTTP collector endpoint + # instead which seems to work. + collector_endpoint: "http://host.docker.internal:14268/api/traces?format=jaeger.thrift" From 9cd63206fbbd3e4a94491fab6fde2a235363f4d8 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 2 Aug 2022 20:23:14 -0500 Subject: [PATCH 03/18] Fix imports after OTEL changes --- synapse/federation/federation_client.py | 2 +- synapse/handlers/federation.py | 2 +- synapse/handlers/federation_event.py | 2 +- synapse/handlers/pagination.py | 2 +- synapse/handlers/relations.py | 2 +- synapse/storage/controllers/state.py | 2 +- synapse/storage/databases/main/stream.py | 2 +- synapse/visibility.py | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 54ffbd817095..dfd0bc414e6c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -61,7 +61,7 @@ ) from synapse.federation.transport.client import SendJoinResponse from synapse.http.types import QueryParams -from synapse.logging.opentracing import trace +from synapse.logging.tracing import trace from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 30f1585a8594..0aa4878f5b8b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -59,7 +59,7 @@ from synapse.federation.federation_client import InvalidResponseError from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import nested_logging_context -from synapse.logging.opentracing import trace +from synapse.logging.tracing import trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.module_api import NOT_SPAM from synapse.replication.http.federation import ( diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 8968b705d43a..740a04aad61d 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -59,7 +59,7 @@ from synapse.events.snapshot import EventContext from synapse.federation.federation_client import InvalidResponseError from synapse.logging.context import nested_logging_context -from synapse.logging.opentracing import trace +from synapse.logging.tracing import trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.replication.http.federation import ( diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index e1e34e3b16ba..14ba19d4f88c 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -24,7 +24,7 @@ from synapse.api.filtering import Filter from synapse.events.utils import SerializeEventConfig from synapse.handlers.room import ShutdownRoomResponse -from synapse.logging.opentracing import trace +from synapse.logging.tracing import trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.state import StateFilter from synapse.streams.config import PaginationConfig diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 72d25df8c88c..41c64d62c8ba 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -19,7 +19,7 @@ from synapse.api.constants import RelationTypes from synapse.api.errors import SynapseError from synapse.events import EventBase, relation_from_event -from synapse.logging.opentracing import trace +from synapse.logging.tracing import trace from synapse.storage.databases.main.relations import _RelatedEvent from synapse.types import JsonDict, Requester, StreamToken, UserID from synapse.visibility import filter_events_for_client diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index 0d480f101432..7b2084664cdf 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -29,7 +29,7 @@ from synapse.api.constants import EventTypes from synapse.events import EventBase -from synapse.logging.opentracing import trace +from synapse.logging.tracing import trace from synapse.storage.state import StateFilter from synapse.storage.util.partial_state_events_tracker import ( PartialCurrentStateTracker, diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index a347430aa7e3..f61f29054761 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -58,7 +58,7 @@ from synapse.api.filtering import Filter from synapse.events import EventBase from synapse.logging.context import make_deferred_yieldable, run_in_background -from synapse.logging.opentracing import trace +from synapse.logging.tracing import trace from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, diff --git a/synapse/visibility.py b/synapse/visibility.py index d947edde66dd..813fe1a15586 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -23,7 +23,7 @@ from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.events.utils import prune_event -from synapse.logging.opentracing import trace +from synapse.logging.tracing import trace from synapse.storage.controllers import StorageControllers from synapse.storage.databases.main import DataStore from synapse.storage.state import StateFilter From 9f691824d9e2a14a121c67ead4e174dcc9ae787d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 3 Aug 2022 17:18:15 -0500 Subject: [PATCH 04/18] Move Twisted git install where it was before --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 989d1b3a6908..6c81ab8efc41 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -119,6 +119,7 @@ signedjson = "^1.1.0" service-identity = ">=18.1.0" # Twisted 18.9 introduces some logger improvements that the structured # logger utilises +twisted = {git = "https://github.com/twisted/twisted.git", rev = "trunk"} treq = ">=15.1" # Twisted has required pyopenssl 16.0 since about Twisted 16.6. pyOpenSSL = ">=16.0.0" @@ -182,7 +183,6 @@ idna = { version = ">=2.5", optional = true } opentelemetry-api = {version = "^1.11.1", optional = true} opentelemetry-sdk = {version = "^1.11.1", optional = true} opentelemetry-exporter-jaeger = {version = "^1.11.1", optional = true} -twisted = {git = "https://github.com/twisted/twisted.git", rev = "trunk"} [tool.poetry.extras] # NB: Packages that should be part of `pip install matrix-synapse[all]` need to be specified From 2f752877fc80443013e2d74c7346681811221ecd Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 3 Aug 2022 18:30:23 -0500 Subject: [PATCH 05/18] Fix @tag_args being one-off (ahead) --- synapse/logging/tracing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/logging/tracing.py b/synapse/logging/tracing.py index e3a1a010a251..109ae185e10b 100644 --- a/synapse/logging/tracing.py +++ b/synapse/logging/tracing.py @@ -876,7 +876,7 @@ def tag_args(func: Callable[P, R]) -> Callable[P, R]: def _tag_args_inner(*args: P.args, **kwargs: P.kwargs) -> R: argspec = inspect.getfullargspec(func) for i, arg in enumerate(argspec.args[1:]): - set_attribute("ARG_" + arg, str(args[i])) # type: ignore[index] + set_attribute("ARG_" + arg, str(args[i + 1])) # type: ignore[index] set_attribute("args", str(args[len(argspec.args) :])) # type: ignore[index] set_attribute("kwargs", str(kwargs)) return func(*args, **kwargs) From fdce1c2ec317b8ecafde8c1f272f7062bccac85a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 3 Aug 2022 20:50:44 -0500 Subject: [PATCH 06/18] Allow @trace and @tag_args to be used together --- synapse/logging/tracing.py | 144 +++++++++++------- .../databases/main/event_federation.py | 3 + 2 files changed, 96 insertions(+), 51 deletions(-) diff --git a/synapse/logging/tracing.py b/synapse/logging/tracing.py index 109ae185e10b..552f5f8504f5 100644 --- a/synapse/logging/tracing.py +++ b/synapse/logging/tracing.py @@ -166,6 +166,7 @@ def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"): from typing import ( TYPE_CHECKING, Any, + Awaitable, Callable, ContextManager, Dict, @@ -789,67 +790,108 @@ def extract_text_map( # Tracing decorators -def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]]: +def create_decorator( + func: Callable[P, R], + # TODO: What is the correct type for these `Any`? `P.args, P.kwargs` isn't allowed here + wrapping_logic: Callable[[Callable[P, R], Any, Any], ContextManager[None]], +) -> Callable[P, R]: """ - Decorator to trace a function with a custom opname. + Creates a decorator that is able to handle sync functions, async functions + (coroutines), and inlineDeferred from Twisted. + + Example usage: + ```py + # Decorator to time the functiona and log it out + def duration(func: Callable[P, R]) -> Callable[P, R]: + @contextlib.contextmanager + def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs): + start_ts = time.time() + yield + end_ts = time.time() + duration = end_ts - start_ts + logger.info("%s took %s seconds", func.__name__, duration) - See the module's doc string for usage examples. + return create_decorator(func, _wrapping_logic) + ``` + Args: + func: The function to be decorated + wrapping_logic: The business logic of your custom decorator. + This should be a ContextManager so you are able to run your logic + before/after the function as desired. """ - def decorator(func: Callable[P, R]) -> Callable[P, R]: - if opentelemetry is None: - return func # type: ignore[unreachable] - + @wraps(func) + async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: if inspect.iscoroutinefunction(func): - - @wraps(func) - async def _trace_inner(*args: P.args, **kwargs: P.kwargs) -> R: - with start_active_span(opname): - return await func(*args, **kwargs) # type: ignore[misc] - + with wrapping_logic(func, *args, **kwargs): + return await func(*args, **kwargs) else: # The other case here handles both sync functions and those # decorated with inlineDeferred. - @wraps(func) - def _trace_inner(*args: P.args, **kwargs: P.kwargs) -> R: - scope = start_active_span(opname) - scope.__enter__() - - try: - result = func(*args, **kwargs) - if isinstance(result, defer.Deferred): - - def call_back(result: R) -> R: - scope.__exit__(None, None, None) - return result - - def err_back(result: R) -> R: - scope.__exit__(None, None, None) - return result - - result.addCallbacks(call_back, err_back) - - else: - if inspect.isawaitable(result): - logger.error( - "@trace may not have wrapped %s correctly! " - "The function is not async but returned a %s.", - func.__qualname__, - type(result).__name__, - ) + scope = wrapping_logic(func, *args, **kwargs) + scope.__enter__() + + try: + result = func(*args, **kwargs) + if isinstance(result, defer.Deferred): + + def call_back(result: R) -> R: + scope.__exit__(None, None, None) + return result + def err_back(result: R) -> R: scope.__exit__(None, None, None) + return result - return result + result.addCallbacks(call_back, err_back) - except Exception as e: - scope.__exit__(type(e), None, e.__traceback__) - raise + else: + if inspect.isawaitable(result): + logger.error( + "@trace may not have wrapped %s correctly! " + "The function is not async but returned a %s.", + func.__qualname__, + type(result).__name__, + ) - return _trace_inner # type: ignore[return-value] + scope.__exit__(None, None, None) - return decorator + return result + + except Exception as e: + scope.__exit__(type(e), None, e.__traceback__) + raise + + return _wrapper # type: ignore[return-value] + + +def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]]: + """ + Decorator to trace a function with a custom opname. + + See the module's doc string for usage examples. + """ + + @contextlib.contextmanager + def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs): + if opentelemetry is None: + return None + + scope = start_active_span(opname) + scope.__enter__() + try: + yield + except Exception as e: + scope.__exit__(type(e), None, e.__traceback__) + raise + finally: + scope.__exit__(None, None, None) + + def _decorator(func: Callable[P, R]): + return create_decorator(func, _wrapping_logic) + + return _decorator def trace(func: Callable[P, R]) -> Callable[P, R]: @@ -866,22 +908,22 @@ def trace(func: Callable[P, R]) -> Callable[P, R]: def tag_args(func: Callable[P, R]) -> Callable[P, R]: """ - Tags all of the args to the active span. + Decorator to tag all of the args to the active span. """ if not opentelemetry: return func - @wraps(func) - def _tag_args_inner(*args: P.args, **kwargs: P.kwargs) -> R: + @contextlib.contextmanager + def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs): argspec = inspect.getfullargspec(func) for i, arg in enumerate(argspec.args[1:]): set_attribute("ARG_" + arg, str(args[i + 1])) # type: ignore[index] set_attribute("args", str(args[len(argspec.args) :])) # type: ignore[index] set_attribute("kwargs", str(kwargs)) - return func(*args, **kwargs) + yield - return _tag_args_inner + return create_decorator(func, _wrapping_logic) @contextlib.contextmanager diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index eec55b647857..22e72a31de4e 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -33,6 +33,7 @@ from synapse.api.errors import StoreError from synapse.api.room_versions import EventFormatVersions, RoomVersion from synapse.events import EventBase, make_event_from_dict +from synapse.logging.tracing import tag_args, trace from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import ( @@ -709,6 +710,8 @@ def _get_auth_chain_difference_txn( # Return all events where not all sets can reach them. return {eid for eid, n in event_to_missing_sets.items() if n} + @trace + @tag_args async def get_oldest_event_ids_with_depth_in_room( self, room_id: str ) -> List[Tuple[str, int]]: From a7eabb78a2b22ca4ce5fd5838b77a84530d109f4 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 4 Aug 2022 01:24:58 -0500 Subject: [PATCH 07/18] Trace more --- synapse/federation/federation_client.py | 3 ++- synapse/handlers/federation.py | 5 +++-- synapse/logging/tracing.py | 4 ++-- synapse/storage/controllers/state.py | 1 + synapse/storage/databases/main/event_federation.py | 3 +++ 5 files changed, 11 insertions(+), 5 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index dfd0bc414e6c..76b39408ec09 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -61,7 +61,7 @@ ) from synapse.federation.transport.client import SendJoinResponse from synapse.http.types import QueryParams -from synapse.logging.tracing import trace +from synapse.logging.tracing import trace, tag_args from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache @@ -235,6 +235,7 @@ async def claim_client_keys( ) @trace + @tag_args async def backfill( self, dest: str, room_id: str, limit: int, extremities: Collection[str] ) -> Optional[List[EventBase]]: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0aa4878f5b8b..0cd9597dc544 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -59,7 +59,7 @@ from synapse.federation.federation_client import InvalidResponseError from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import nested_logging_context -from synapse.logging.tracing import trace +from synapse.logging.tracing import trace, set_attribute from synapse.metrics.background_process_metrics import run_as_background_process from synapse.module_api import NOT_SPAM from synapse.replication.http.federation import ( @@ -319,7 +319,8 @@ async def _maybe_backfill_inner( # attempting to paginate before backfill reached the visible history. extremities_to_request: List[str] = [] - for bp in sorted_backfill_points: + for i, bp in enumerate(sorted_backfill_points): + set_attribute("backfill_point" + str(i), str(bp)) if len(extremities_to_request) >= 5: break diff --git a/synapse/logging/tracing.py b/synapse/logging/tracing.py index 552f5f8504f5..2f4555cc2eb0 100644 --- a/synapse/logging/tracing.py +++ b/synapse/logging/tracing.py @@ -917,8 +917,8 @@ def tag_args(func: Callable[P, R]) -> Callable[P, R]: @contextlib.contextmanager def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs): argspec = inspect.getfullargspec(func) - for i, arg in enumerate(argspec.args[1:]): - set_attribute("ARG_" + arg, str(args[i + 1])) # type: ignore[index] + for i, arg in enumerate(args[1:]): + set_attribute("ARG_" + argspec.args[i + 1], str(arg)) # type: ignore[index] set_attribute("args", str(args[len(argspec.args) :])) # type: ignore[index] set_attribute("kwargs", str(kwargs)) yield diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index 7b2084664cdf..7fb12f4df6cc 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -473,6 +473,7 @@ async def get_current_state_deltas( prev_stream_id, max_stream_id ) + @trace async def get_current_state( self, room_id: str, state_filter: Optional[StateFilter] = None ) -> StateMap[EventBase]: diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 22e72a31de4e..3fcb2069df36 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -770,6 +770,7 @@ def get_oldest_event_ids_with_depth_in_room_txn( room_id, ) + @trace async def get_insertion_event_backward_extremities_in_room( self, room_id: str ) -> List[Tuple[str, int]]: @@ -1342,6 +1343,8 @@ def _get_missing_events( event_results.reverse() return event_results + @trace + @tag_args async def get_successor_events(self, event_id: str) -> List[str]: """Fetch all events that have the given event as a prev event From 13855c59160b472de2eabd5f9363e9cfbf3e3439 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 5 Aug 2022 20:44:21 -0500 Subject: [PATCH 08/18] More tracing for federated side --- poetry.lock | 28 +++--- pyproject.toml | 2 +- synapse/handlers/federation_event.py | 28 +++++- synapse/logging/tracing.py | 85 ++++++++++--------- synapse/storage/controllers/persist_events.py | 17 +++- .../databases/main/event_federation.py | 1 + synapse/storage/databases/main/events.py | 2 + .../storage/databases/main/events_worker.py | 5 ++ 8 files changed, 116 insertions(+), 52 deletions(-) diff --git a/poetry.lock b/poetry.lock index a78ceb0ae5f6..5fa6db6bad27 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1267,17 +1267,22 @@ telegram = ["requests"] [[package]] name = "treq" -version = "15.1.0" -description = "A requests-like API built on top of twisted.web's Agent" +version = "22.2.0" +description = "High-level Twisted HTTP Client API" category = "main" optional = false -python-versions = "*" +python-versions = ">=3.6" [package.dependencies] -pyOpenSSL = {version = ">=0.15.1", markers = "python_version > \"3.0\""} +attrs = "*" +hyperlink = ">=21.0.0" +incremental = "*" requests = ">=2.1.0" -service_identity = ">=14.0.0" -Twisted = {version = ">=15.5.0", markers = "python_version > \"3.0\""} +Twisted = {version = ">=18.7.0", extras = ["tls"]} + +[package.extras] +dev = ["pep8", "pyflakes", "httpbin (==0.5.0)"] +docs = ["sphinx (>=1.4.8)"] [[package]] name = "twine" @@ -1313,7 +1318,10 @@ attrs = ">=19.2.0" Automat = ">=0.8.0" constantly = ">=15.1" hyperlink = ">=17.1.1" +idna = {version = ">=2.4", optional = true, markers = "extra == \"tls\""} incremental = ">=21.3.0" +pyopenssl = {version = ">=21.0.0", optional = true, markers = "extra == \"tls\""} +service-identity = {version = ">=18.1.0", optional = true, markers = "extra == \"tls\""} twisted-iocpsupport = {version = ">=1.0.2,<2", markers = "platform_system == \"Windows\""} typing-extensions = ">=3.6.5" "zope.interface" = ">=4.4.2" @@ -1339,7 +1347,7 @@ windows_platform = ["pywin32 (!=226)", "cython-test-exception-raiser (>=1.0.2,<2 type = "git" url = "https://github.com/twisted/twisted.git" reference = "trunk" -resolved_reference = "ff2ea6181f7ca4887adbaf4158b2fe0891e17ef9" +resolved_reference = "b249a121afffefa3d9d9ab5b7a1315c5a1bb454d" [[package]] name = "twisted-iocpsupport" @@ -1615,7 +1623,7 @@ url_preview = ["lxml"] [metadata] lock-version = "1.1" python-versions = "^3.7.1" -content-hash = "c2cfbb348a49e088c404148c1b682fc5af5abb6278cf4479c6a51fff1656328c" +content-hash = "94116a568c9ab41174ec66c60cb0cb783e349bf586352b1fab08c714e5191665" [metadata.files] attrs = [ @@ -2642,8 +2650,8 @@ tqdm = [ {file = "tqdm-4.63.0.tar.gz", hash = "sha256:1d9835ede8e394bb8c9dcbffbca02d717217113adc679236873eeaac5bc0b3cd"}, ] treq = [ - {file = "treq-15.1.0-py2.py3-none-any.whl", hash = "sha256:1ad1ba89ddc62ae877084b290bd327755b13f6e7bc7076dc4d8e2efb701bfd63"}, - {file = "treq-15.1.0.tar.gz", hash = "sha256:425a47d5d52a993d51211028fb6ade252e5fbea094e878bb4b644096a7322de8"}, + {file = "treq-22.2.0-py3-none-any.whl", hash = "sha256:27d95b07c5c14be3e7b280416139b036087617ad5595be913b1f9b3ce981b9b2"}, + {file = "treq-22.2.0.tar.gz", hash = "sha256:df757e3f141fc782ede076a604521194ffcb40fa2645cf48e5a37060307f52ec"}, ] twine = [ {file = "twine-3.8.0-py3-none-any.whl", hash = "sha256:d0550fca9dc19f3d5e8eadfce0c227294df0a2a951251a4385797c8a6198b7c8"}, diff --git a/pyproject.toml b/pyproject.toml index 6c81ab8efc41..a6138957cab6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -119,7 +119,7 @@ signedjson = "^1.1.0" service-identity = ">=18.1.0" # Twisted 18.9 introduces some logger improvements that the structured # logger utilises -twisted = {git = "https://github.com/twisted/twisted.git", rev = "trunk"} +twisted = {git = "https://github.com/twisted/twisted.git", rev = "trunk", extras = ["tls"]} treq = ">=15.1" # Twisted has required pyopenssl 16.0 since about Twisted 16.6. pyOpenSSL = ">=16.0.0" diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 740a04aad61d..60436fe497ff 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -59,7 +59,7 @@ from synapse.events.snapshot import EventContext from synapse.federation.federation_client import InvalidResponseError from synapse.logging.context import nested_logging_context -from synapse.logging.tracing import trace +from synapse.logging.tracing import trace, tag_args, set_attribute, SynapseTags from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.replication.http.federation import ( @@ -410,6 +410,7 @@ async def check_join_restrictions( prev_member_event, ) + @trace async def process_remote_join( self, origin: str, @@ -753,6 +754,7 @@ async def _process_pulled_events( await self._process_pulled_event(origin, ev, backfilled=backfilled) @trace + @tag_args async def _process_pulled_event( self, origin: str, event: EventBase, backfilled: bool ) -> None: @@ -854,6 +856,7 @@ async def _process_pulled_event( else: raise + @trace async def _compute_event_context_with_maybe_missing_prevs( self, dest: str, event: EventBase ) -> EventContext: @@ -970,6 +973,8 @@ async def _compute_event_context_with_maybe_missing_prevs( event, state_ids_before_event=state_map, partial_state=partial_state ) + @trace + @tag_args async def _get_state_ids_after_missing_prev_event( self, destination: str, @@ -1112,6 +1117,8 @@ async def _get_state_ids_after_missing_prev_event( return state_map + @trace + @tag_args async def _get_state_and_persist( self, destination: str, room_id: str, event_id: str ) -> None: @@ -1133,6 +1140,7 @@ async def _get_state_and_persist( destination=destination, room_id=room_id, event_ids=(event_id,) ) + @trace async def _process_received_pdu( self, origin: str, @@ -1283,6 +1291,7 @@ async def _resync_device(self, sender: str) -> None: except Exception: logger.exception("Failed to resync device for %s", sender) + @trace async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None: """Handles backfilling the insertion event when we receive a marker event that points to one. @@ -1414,6 +1423,8 @@ async def backfill_event_id( return event_from_response + @trace + @tag_args async def _get_events_and_persist( self, destination: str, room_id: str, event_ids: Collection[str] ) -> None: @@ -1459,6 +1470,7 @@ async def get_event(event_id: str) -> None: logger.info("Fetched %i events of %i requested", len(events), len(event_ids)) await self._auth_and_persist_outliers(room_id, events) + @trace async def _auth_and_persist_outliers( self, room_id: str, events: Iterable[EventBase] ) -> None: @@ -1477,6 +1489,11 @@ async def _auth_and_persist_outliers( """ event_map = {event.event_id: event for event in events} + set_attribute( + SynapseTags.FUNC_ARG_PREFIX + "event_ids", + event_map.keys(), + ) + # filter out any events we have already seen. This might happen because # the events were eagerly pushed to us (eg, during a room join), or because # another thread has raced against us since we decided to request the event. @@ -1593,6 +1610,7 @@ async def prep(event: EventBase) -> None: backfilled=True, ) + @trace async def _check_event_auth( self, origin: Optional[str], event: EventBase, context: EventContext ) -> None: @@ -1631,6 +1649,9 @@ async def _check_event_auth( claimed_auth_events = await self._load_or_fetch_auth_events_for_event( origin, event ) + set_attribute( + "claimed_auth_events", [ev.event_id for ev in claimed_auth_events] + ) # ... and check that the event passes auth at those auth events. # https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu: @@ -1728,6 +1749,7 @@ async def _check_event_auth( ) context.rejected = RejectedReason.AUTH_ERROR + @trace async def _maybe_kick_guest_users(self, event: EventBase) -> None: if event.type != EventTypes.GuestAccess: return @@ -1935,6 +1957,8 @@ async def _load_or_fetch_auth_events_for_event( # instead we raise an AuthError, which will make the caller ignore it. raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found") + @trace + @tag_args async def _get_remote_auth_chain_for_event( self, destination: str, room_id: str, event_id: str ) -> None: @@ -1963,6 +1987,7 @@ async def _get_remote_auth_chain_for_event( await self._auth_and_persist_outliers(room_id, remote_auth_events) + @trace async def _run_push_actions_and_persist_event( self, event: EventBase, context: EventContext, backfilled: bool = False ) -> None: @@ -2008,6 +2033,7 @@ async def _run_push_actions_and_persist_event( await self._store.remove_push_actions_from_staging(event.event_id) raise + @trace async def persist_events_and_notify( self, room_id: str, diff --git a/synapse/logging/tracing.py b/synapse/logging/tracing.py index 2f4555cc2eb0..cb557a147dd4 100644 --- a/synapse/logging/tracing.py +++ b/synapse/logging/tracing.py @@ -281,6 +281,16 @@ class SynapseTags: # The name of the external cache CACHE_NAME = "cache.name" + # Used to tag function arguments + # + # Tag a named arg. The name of the argument should be appended to this + # prefix + FUNC_ARG_PREFIX = "ARG." + # Tag extra variadic number of positional arguments (`def foo(first, second, *extras)`) + FUNC_ARGS = "args" + # Tag keyword args + FUNC_KWARGS = "kwargs" + class SynapseBaggage: FORCE_TRACING = "synapse-force-tracing" @@ -790,30 +800,28 @@ def extract_text_map( # Tracing decorators -def create_decorator( +def _custom_sync_async_decorator( func: Callable[P, R], - # TODO: What is the correct type for these `Any`? `P.args, P.kwargs` isn't allowed here wrapping_logic: Callable[[Callable[P, R], Any, Any], ContextManager[None]], ) -> Callable[P, R]: """ - Creates a decorator that is able to handle sync functions, async functions - (coroutines), and inlineDeferred from Twisted. - + Decorates a function that is sync or async (coroutines), or that returns a Twisted + `Deferred`. The custom business logic of the decorator goes in `wrapping_logic`. Example usage: ```py - # Decorator to time the functiona and log it out + # Decorator to time the function and log it out def duration(func: Callable[P, R]) -> Callable[P, R]: @contextlib.contextmanager - def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs): + def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> Generator[None, None, None]: start_ts = time.time() - yield - end_ts = time.time() - duration = end_ts - start_ts - logger.info("%s took %s seconds", func.__name__, duration) - - return create_decorator(func, _wrapping_logic) + try: + yield + finally: + end_ts = time.time() + duration = end_ts - start_ts + logger.info("%s took %s seconds", func.__name__, duration) + return _custom_sync_async_decorator(func, _wrapping_logic) ``` - Args: func: The function to be decorated wrapping_logic: The business logic of your custom decorator. @@ -821,14 +829,18 @@ def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs): before/after the function as desired. """ - @wraps(func) - async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: - if inspect.iscoroutinefunction(func): + if inspect.iscoroutinefunction(func): + + @wraps(func) + async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: with wrapping_logic(func, *args, **kwargs): - return await func(*args, **kwargs) - else: - # The other case here handles both sync functions and those - # decorated with inlineDeferred. + return await func(*args, **kwargs) # type: ignore[misc] + + else: + # The other case here handles both sync functions and those + # decorated with inlineDeferred. + @wraps(func) + def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: scope = wrapping_logic(func, *args, **kwargs) scope.__enter__() @@ -866,7 +878,11 @@ def err_back(result: R) -> R: return _wrapper # type: ignore[return-value] -def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]]: +def trace_with_opname( + opname: str, + *, + tracer: Optional["opentelemetry.trace.Tracer"] = None, +) -> Callable[[Callable[P, R]], Callable[P, R]]: """ Decorator to trace a function with a custom opname. @@ -875,21 +891,14 @@ def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]] @contextlib.contextmanager def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs): - if opentelemetry is None: - return None - - scope = start_active_span(opname) - scope.__enter__() - try: + with start_active_span(opname, tracer=tracer): yield - except Exception as e: - scope.__exit__(type(e), None, e.__traceback__) - raise - finally: - scope.__exit__(None, None, None) def _decorator(func: Callable[P, R]): - return create_decorator(func, _wrapping_logic) + if not opentelemetry: + return func + + return _custom_sync_async_decorator(func, _wrapping_logic) return _decorator @@ -918,12 +927,12 @@ def tag_args(func: Callable[P, R]) -> Callable[P, R]: def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs): argspec = inspect.getfullargspec(func) for i, arg in enumerate(args[1:]): - set_attribute("ARG_" + argspec.args[i + 1], str(arg)) # type: ignore[index] - set_attribute("args", str(args[len(argspec.args) :])) # type: ignore[index] - set_attribute("kwargs", str(kwargs)) + set_attribute(SynapseTags.FUNC_ARG_PREFIX + argspec.args[i + 1], str(arg)) # type: ignore[index] + set_attribute(SynapseTags.FUNC_ARGS, str(args[len(argspec.args) :])) # type: ignore[index] + set_attribute(SynapseTags.FUNC_KWARGS, str(kwargs)) yield - return create_decorator(func, _wrapping_logic) + return _custom_sync_async_decorator(func, _wrapping_logic) @contextlib.contextmanager diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index f87f5098a5d1..0bdb213286a8 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -46,7 +46,14 @@ from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable -from synapse.logging.tracing import Link, get_active_span, start_active_span, trace +from synapse.logging.tracing import ( + Link, + get_active_span, + set_attribute, + start_active_span, + trace, + SynapseTags, +) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.controllers.state import StateStorageController from synapse.storage.databases import Databases @@ -383,6 +390,12 @@ async def persist_events( PartialStateConflictError: if attempting to persist a partial state event in a room that has been un-partial stated. """ + set_attribute( + SynapseTags.FUNC_ARG_PREFIX + "event_ids", + [e.event_id for e, _ in events_and_contexts], + ) + set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled)) + partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {} for event, ctx in events_and_contexts: partitioned.setdefault(event.room_id, []).append((event, ctx)) @@ -781,7 +794,7 @@ async def _calculate_new_extremities( stale_forward_extremities_counter.observe(len(stale)) return result - + async def _get_new_state_after_events( self, room_id: str, diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 3fcb2069df36..178536b10fde 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1381,6 +1381,7 @@ def _delete_old_forward_extrem_cache_txn(txn: LoggingTransaction) -> None: _delete_old_forward_extrem_cache_txn, ) + @trace async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None: await self.db_pool.simple_upsert( table="insertion_event_extremities", diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 1f600f119029..21ba7a540efb 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -40,6 +40,7 @@ from synapse.api.room_versions import RoomVersions from synapse.events import EventBase, relation_from_event from synapse.events.snapshot import EventContext +from synapse.logging.tracing import trace from synapse.storage._base import db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, @@ -145,6 +146,7 @@ def __init__( self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen + @trace async def _persist_events_and_state_updates( self, events_and_contexts: List[Tuple[EventBase, EventContext]], diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 29c99c635735..449cb03276a4 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -54,6 +54,7 @@ current_context, make_deferred_yieldable, ) +from synapse.logging.tracing import trace, tag_args from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, @@ -394,6 +395,8 @@ async def get_event( return event + @trace + @tag_args async def get_events( self, event_ids: Collection[str], @@ -1363,6 +1366,8 @@ async def have_events_in_timeline(self, event_ids: Iterable[str]) -> Set[str]: return {r["event_id"] for r in rows} + @trace + @tag_args async def have_seen_events( self, room_id: str, event_ids: Iterable[str] ) -> Set[str]: From 552b7f13b6f27e5a6f1c7b3bebe38fb2ef81c0bc Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Sat, 6 Aug 2022 01:06:48 -0500 Subject: [PATCH 09/18] More tracing for federation --- synapse/federation/federation_client.py | 7 ++++++- synapse/handlers/federation.py | 2 +- synapse/handlers/federation_event.py | 20 ++++++++++++++----- synapse/storage/controllers/persist_events.py | 4 ++-- .../storage/databases/main/events_worker.py | 2 +- 5 files changed, 25 insertions(+), 10 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 76b39408ec09..54c3478a822c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -61,7 +61,7 @@ ) from synapse.federation.transport.client import SendJoinResponse from synapse.http.types import QueryParams -from synapse.logging.tracing import trace, tag_args +from synapse.logging.tracing import tag_args, trace from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache @@ -338,6 +338,8 @@ async def get_pdu_from_destination_raw( return None + @trace + @tag_args async def get_pdu( self, destinations: Iterable[str], @@ -475,6 +477,8 @@ async def get_room_state_ids( return state_event_ids, auth_event_ids + @trace + @tag_args async def get_room_state( self, destination: str, @@ -534,6 +538,7 @@ async def get_room_state( return valid_state_events, valid_auth_events + @trace async def _check_sigs_and_hash_and_fetch( self, origin: str, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0cd9597dc544..d77a0adc3dd9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -59,7 +59,7 @@ from synapse.federation.federation_client import InvalidResponseError from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import nested_logging_context -from synapse.logging.tracing import trace, set_attribute +from synapse.logging.tracing import set_attribute, trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.module_api import NOT_SPAM from synapse.replication.http.federation import ( diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 60436fe497ff..7fa80439b297 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -59,7 +59,13 @@ from synapse.events.snapshot import EventContext from synapse.federation.federation_client import InvalidResponseError from synapse.logging.context import nested_logging_context -from synapse.logging.tracing import trace, tag_args, set_attribute, SynapseTags +from synapse.logging.tracing import ( + SynapseTags, + start_active_span, + set_attribute, + tag_args, + trace, +) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.replication.http.federation import ( @@ -1491,7 +1497,7 @@ async def _auth_and_persist_outliers( set_attribute( SynapseTags.FUNC_ARG_PREFIX + "event_ids", - event_map.keys(), + str(event_map.keys()), ) # filter out any events we have already seen. This might happen because @@ -2033,7 +2039,6 @@ async def _run_push_actions_and_persist_event( await self._store.remove_push_actions_from_staging(event.event_id) raise - @trace async def persist_events_and_notify( self, room_id: str, @@ -2097,8 +2102,13 @@ async def persist_events_and_notify( self._message_handler.maybe_schedule_expiry(event) if not backfilled: # Never notify for backfilled events - for event in events: - await self._notify_persisted_event(event, max_stream_token) + with start_active_span("notify_persisted_events"): + set_attribute( + SynapseTags.FUNC_ARG_PREFIX + "event_ids", + str([ev.event_id for ev in events]), + ) + for event in events: + await self._notify_persisted_event(event, max_stream_token) return max_stream_token.stream diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index 0bdb213286a8..72b4dcef5940 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -48,11 +48,11 @@ from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.logging.tracing import ( Link, + SynapseTags, get_active_span, set_attribute, start_active_span, trace, - SynapseTags, ) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.controllers.state import StateStorageController @@ -392,7 +392,7 @@ async def persist_events( """ set_attribute( SynapseTags.FUNC_ARG_PREFIX + "event_ids", - [e.event_id for e, _ in events_and_contexts], + str([e.event_id for e, _ in events_and_contexts]), ) set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled)) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 449cb03276a4..00f9298a9f1b 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -54,7 +54,7 @@ current_context, make_deferred_yieldable, ) -from synapse.logging.tracing import trace, tag_args +from synapse.logging.tracing import tag_args, trace from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, From c51883e509abde8e2d34da3e8120f20b47b40aaf Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Sat, 6 Aug 2022 01:37:36 -0500 Subject: [PATCH 10/18] Add length to the list of events --- synapse/handlers/federation_event.py | 12 +++++++++--- synapse/storage/controllers/persist_events.py | 2 +- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 7fa80439b297..61882fb40b0f 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -737,6 +737,11 @@ async def _process_pulled_events( backfilled: True if this is part of a historical batch of events (inhibits notification to clients, and validation of device keys.) """ + set_attribute( + SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(events)})", + str([event.event_id for event in events]), + ) + set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled)) logger.debug( "processing pulled backfilled=%s events=%s", backfilled, @@ -1495,9 +1500,10 @@ async def _auth_and_persist_outliers( """ event_map = {event.event_id: event for event in events} + event_ids = event_map.keys() set_attribute( - SynapseTags.FUNC_ARG_PREFIX + "event_ids", - str(event_map.keys()), + SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(event_ids)})", + str(event_ids), ) # filter out any events we have already seen. This might happen because @@ -2104,7 +2110,7 @@ async def persist_events_and_notify( if not backfilled: # Never notify for backfilled events with start_active_span("notify_persisted_events"): set_attribute( - SynapseTags.FUNC_ARG_PREFIX + "event_ids", + SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(events)})", str([ev.event_id for ev in events]), ) for event in events: diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index 72b4dcef5940..7cb69fa4f36b 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -391,7 +391,7 @@ async def persist_events( a room that has been un-partial stated. """ set_attribute( - SynapseTags.FUNC_ARG_PREFIX + "event_ids", + SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(events_and_contexts)})", str([e.event_id for e, _ in events_and_contexts]), ) set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled)) From ee465f993b1e9d16ff79839bdf30abff6126a216 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Sat, 6 Aug 2022 02:24:48 -0500 Subject: [PATCH 11/18] Fix some lints (mistakes) and better trace when fetching events --- synapse/handlers/federation_event.py | 6 +++--- synapse/storage/controllers/persist_events.py | 13 ++++++++----- .../storage/databases/main/events_worker.py | 19 ++++++++++++++++--- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 61882fb40b0f..578b4941855d 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -61,8 +61,8 @@ from synapse.logging.context import nested_logging_context from synapse.logging.tracing import ( SynapseTags, - start_active_span, set_attribute, + start_active_span, tag_args, trace, ) @@ -722,7 +722,7 @@ async def _get_missing_events_for_pdu( @trace async def _process_pulled_events( - self, origin: str, events: Iterable[EventBase], backfilled: bool + self, origin: str, events: List[EventBase], backfilled: bool ) -> None: """Process a batch of events we have pulled from a remote server @@ -1662,7 +1662,7 @@ async def _check_event_auth( origin, event ) set_attribute( - "claimed_auth_events", [ev.event_id for ev in claimed_auth_events] + "claimed_auth_events", str([ev.event_id for ev in claimed_auth_events]) ) # ... and check that the event passes auth at those auth events. diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index 7cb69fa4f36b..8a039dbc8358 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -390,15 +390,18 @@ async def persist_events( PartialStateConflictError: if attempting to persist a partial state event in a room that has been un-partial stated. """ - set_attribute( - SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(events_and_contexts)})", - str([e.event_id for e, _ in events_and_contexts]), - ) - set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled)) + event_ids: List[str] = [] partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {} for event, ctx in events_and_contexts: partitioned.setdefault(event.room_id, []).append((event, ctx)) + event_ids.append(event.event_id) + + set_attribute( + SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(event_ids)})", + str(event_ids), + ) + set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled)) async def enqueue( item: Tuple[str, List[Tuple[EventBase, EventContext]]] diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 00f9298a9f1b..c547ba7afd8a 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -54,7 +54,7 @@ current_context, make_deferred_yieldable, ) -from synapse.logging.tracing import tag_args, trace +from synapse.logging.tracing import start_active_span, tag_args, trace from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, @@ -395,8 +395,6 @@ async def get_event( return event - @trace - @tag_args async def get_events( self, event_ids: Collection[str], @@ -433,6 +431,8 @@ async def get_events( return {e.event_id: e for e in events} + @trace + @tag_args async def get_events_as_list( self, event_ids: Collection[str], @@ -1034,6 +1034,11 @@ async def _get_events_from_db( fetched_events: Dict[str, _EventRow] = {} events_to_fetch = event_ids + is_recording_redaction_trace = False + fetching_redactions_tracing_span_cm = start_active_span( + "recursively fetching redactions" + ) + while events_to_fetch: row_map = await self._enqueue_events(events_to_fetch) @@ -1049,6 +1054,14 @@ async def _get_events_from_db( events_to_fetch = redaction_ids.difference(fetched_event_ids) if events_to_fetch: logger.debug("Also fetching redaction events %s", events_to_fetch) + # Start tracing how long it takes for us to get all of the redactions + if not is_recording_redaction_trace: + fetching_redactions_tracing_span_cm.__enter__() + is_recording_redaction_trace = True + + # Only stop recording if we were recording in the first place + if is_recording_redaction_trace: + fetching_redactions_tracing_span_cm.__exit__(None, None, None) # build a map from event_id to EventBase event_map: Dict[str, EventBase] = {} From aa5e92506bd374cf9d07a45d6b9e2b8e03d8b65d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Aug 2022 20:37:05 -0500 Subject: [PATCH 12/18] Only set attribute if going forward --- synapse/handlers/federation.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d77a0adc3dd9..a27f9f6246f8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -320,10 +320,11 @@ async def _maybe_backfill_inner( extremities_to_request: List[str] = [] for i, bp in enumerate(sorted_backfill_points): - set_attribute("backfill_point" + str(i), str(bp)) if len(extremities_to_request) >= 5: break + set_attribute("backfill_point" + str(i), str(bp)) + # For regular backwards extremities, we don't have the extremity events # themselves, so we need to actually check the events that reference them - # their "successor" events. From 597c3f276ebf5de01923383b4bcaca2fab7578b4 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 9 Aug 2022 16:39:29 -0500 Subject: [PATCH 13/18] Trace some results --- synapse/federation/federation_client.py | 13 +++++- synapse/handlers/federation.py | 6 ++- synapse/handlers/federation_event.py | 42 +++++++++++++------ synapse/logging/tracing.py | 4 +- synapse/storage/controllers/state.py | 4 +- .../util/partial_state_events_tracker.py | 3 ++ 6 files changed, 55 insertions(+), 17 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 54c3478a822c..e8003a2b7cad 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -61,7 +61,7 @@ ) from synapse.federation.transport.client import SendJoinResponse from synapse.http.types import QueryParams -from synapse.logging.tracing import tag_args, trace +from synapse.logging.tracing import SynapseTags, set_attribute, tag_args, trace from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache @@ -451,6 +451,8 @@ async def get_pdu( return event_copy + @trace + @tag_args async def get_room_state_ids( self, destination: str, room_id: str, event_id: str ) -> Tuple[List[str], List[str]]: @@ -470,6 +472,15 @@ async def get_room_state_ids( state_event_ids = result["pdu_ids"] auth_event_ids = result.get("auth_chain_ids", []) + set_attribute( + SynapseTags.RESULT_PREFIX + f"state_event_ids ({len(state_event_ids)})", + str(state_event_ids), + ) + set_attribute( + SynapseTags.RESULT_PREFIX + f"auth_event_ids ({len(auth_event_ids)})", + str(auth_event_ids), + ) + if not isinstance(state_event_ids, list) or not isinstance( auth_event_ids, list ): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 39be7829373d..0851ea4bc76d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -59,7 +59,7 @@ from synapse.federation.federation_client import InvalidResponseError from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import nested_logging_context -from synapse.logging.tracing import set_attribute, trace +from synapse.logging.tracing import SynapseTags, set_attribute, trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.module_api import NOT_SPAM from synapse.replication.http.federation import ( @@ -323,7 +323,9 @@ async def _maybe_backfill_inner( if len(extremities_to_request) >= 5: break - set_attribute("backfill_point" + str(i), str(bp)) + set_attribute( + SynapseTags.RESULT_PREFIX + "backfill_point" + str(i), str(bp) + ) # For regular backwards extremities, we don't have the extremity events # themselves, so we need to actually check the events that reference them - diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 578b4941855d..efecdd1e306c 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -1025,10 +1025,10 @@ async def _get_state_ids_after_missing_prev_event( logger.debug("Fetching %i events from cache/store", len(desired_events)) have_events = await self._store.have_seen_events(room_id, desired_events) - missing_desired_events = desired_events - have_events + missing_desired_event_ids = desired_events - have_events logger.debug( "We are missing %i events (got %i)", - len(missing_desired_events), + len(missing_desired_event_ids), len(have_events), ) @@ -1040,13 +1040,24 @@ async def _get_state_ids_after_missing_prev_event( # already have a bunch of the state events. It would be nice if the # federation api gave us a way of finding out which we actually need. - missing_auth_events = set(auth_event_ids) - have_events - missing_auth_events.difference_update( - await self._store.have_seen_events(room_id, missing_auth_events) + missing_auth_event_ids = set(auth_event_ids) - have_events + missing_auth_event_ids.difference_update( + await self._store.have_seen_events(room_id, missing_auth_event_ids) ) - logger.debug("We are also missing %i auth events", len(missing_auth_events)) + logger.debug("We are also missing %i auth events", len(missing_auth_event_ids)) - missing_events = missing_desired_events | missing_auth_events + missing_event_ids = missing_desired_event_ids | missing_auth_event_ids + + set_attribute( + SynapseTags.RESULT_PREFIX + + f"missing_auth_event_ids ({len(missing_auth_event_ids)})", + str(missing_auth_event_ids), + ) + set_attribute( + SynapseTags.RESULT_PREFIX + + f"missing_desired_event_ids ({len(missing_desired_event_ids)})", + str(missing_desired_event_ids), + ) # Making an individual request for each of 1000s of events has a lot of # overhead. On the other hand, we don't really want to fetch all of the events @@ -1057,13 +1068,13 @@ async def _get_state_ids_after_missing_prev_event( # # TODO: might it be better to have an API which lets us do an aggregate event # request - if (len(missing_events) * 10) >= len(auth_event_ids) + len(state_event_ids): + if (len(missing_event_ids) * 10) >= len(auth_event_ids) + len(state_event_ids): logger.debug("Requesting complete state from remote") await self._get_state_and_persist(destination, room_id, event_id) else: - logger.debug("Fetching %i events from remote", len(missing_events)) + logger.debug("Fetching %i events from remote", len(missing_event_ids)) await self._get_events_and_persist( - destination=destination, room_id=room_id, event_ids=missing_events + destination=destination, room_id=room_id, event_ids=missing_event_ids ) # We now need to fill out the state map, which involves fetching the @@ -1121,6 +1132,11 @@ async def _get_state_ids_after_missing_prev_event( failed_to_fetch, ) + set_attribute( + SynapseTags.RESULT_PREFIX + f"failed_to_fetch ({len(failed_to_fetch)})", + str(failed_to_fetch), + ) + if remote_event.is_state() and remote_event.rejected_reason is None: state_map[ (remote_event.type, remote_event.state_key) @@ -1662,7 +1678,9 @@ async def _check_event_auth( origin, event ) set_attribute( - "claimed_auth_events", str([ev.event_id for ev in claimed_auth_events]) + SynapseTags.RESULT_PREFIX + + f"claimed_auth_events ({len(claimed_auth_events)})", + str([ev.event_id for ev in claimed_auth_events]), ) # ... and check that the event passes auth at those auth events. @@ -2110,7 +2128,7 @@ async def persist_events_and_notify( if not backfilled: # Never notify for backfilled events with start_active_span("notify_persisted_events"): set_attribute( - SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(events)})", + SynapseTags.RESULT_PREFIX + f"event_ids ({len(events)})", str([ev.event_id for ev in events]), ) for event in events: diff --git a/synapse/logging/tracing.py b/synapse/logging/tracing.py index 38521d18dff6..e629548a04aa 100644 --- a/synapse/logging/tracing.py +++ b/synapse/logging/tracing.py @@ -166,7 +166,6 @@ def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"): from typing import ( TYPE_CHECKING, Any, - Awaitable, Callable, ContextManager, Dict, @@ -291,6 +290,9 @@ class SynapseTags: # Tag keyword args FUNC_KWARGS = "kwargs" + # Some intermediate result that's interesting to the function + RESULT_PREFIX = "RESULT." + class SynapseBaggage: FORCE_TRACING = "synapse-force-tracing" diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index 7fb12f4df6cc..8f82d5a8edff 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -29,7 +29,7 @@ from synapse.api.constants import EventTypes from synapse.events import EventBase -from synapse.logging.tracing import trace +from synapse.logging.tracing import tag_args, trace from synapse.storage.state import StateFilter from synapse.storage.util.partial_state_events_tracker import ( PartialCurrentStateTracker, @@ -228,6 +228,7 @@ async def get_state_for_events( return {event: event_to_state[event] for event in event_ids} @trace + @tag_args async def get_state_ids_for_events( self, event_ids: Collection[str], @@ -332,6 +333,7 @@ def get_state_for_groups( ) @trace + @tag_args async def get_state_group_for_events( self, event_ids: Collection[str], diff --git a/synapse/storage/util/partial_state_events_tracker.py b/synapse/storage/util/partial_state_events_tracker.py index 466e5137f2d3..55a275392c24 100644 --- a/synapse/storage/util/partial_state_events_tracker.py +++ b/synapse/storage/util/partial_state_events_tracker.py @@ -20,6 +20,7 @@ from twisted.internet.defer import Deferred from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable +from synapse.logging.tracing import trace from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.room import RoomWorkerStore from synapse.util import unwrapFirstError @@ -58,6 +59,7 @@ def notify_un_partial_stated(self, event_id: str) -> None: for o in observers: o.callback(None) + @trace async def await_full_state(self, event_ids: Collection[str]) -> None: """Wait for all the given events to have full state. @@ -151,6 +153,7 @@ def notify_un_partial_stated(self, room_id: str) -> None: for o in observers: o.callback(None) + @trace async def await_full_state(self, room_id: str) -> None: # We add the deferred immediately so that the DB call to check for # partial state doesn't race when we unpartial the room. From f4ec9d1f74a8b1e759a9a58715e0bdbffd65020e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Aug 2022 18:13:16 -0500 Subject: [PATCH 14/18] Instrument FederationStateIdsServlet --- docker/conf/homeserver.yaml | 2 ++ synapse/federation/federation_server.py | 6 +++++- synapse/handlers/federation.py | 4 +++- synapse/rest/client/room.py | 2 ++ synapse/storage/databases/main/event_federation.py | 2 ++ synapse/util/ratelimitutils.py | 5 +++++ 6 files changed, 19 insertions(+), 2 deletions(-) diff --git a/docker/conf/homeserver.yaml b/docker/conf/homeserver.yaml index 55977017e354..d35be7650933 100644 --- a/docker/conf/homeserver.yaml +++ b/docker/conf/homeserver.yaml @@ -190,6 +190,8 @@ password_config: tracing: enabled: true sample_rate: 1 + homeserver_whitelist: + - ".*" jaeger_exporter_config: agent_host_name: host.docker.internal agent_port: 6831 diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index c5b3b3cedf2d..716cbc5cefe1 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -61,7 +61,7 @@ nested_logging_context, run_in_background, ) -from synapse.logging.tracing import log_kv, start_active_span_from_edu, trace +from synapse.logging.tracing import log_kv, start_active_span_from_edu, trace, tag_args from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.replication.http.federation import ( ReplicationFederationSendEduRestServlet, @@ -547,6 +547,8 @@ async def on_room_state_request( return 200, resp + @trace + @tag_args async def on_state_ids_request( self, origin: str, room_id: str, event_id: str ) -> Tuple[int, JsonDict]: @@ -569,6 +571,8 @@ async def on_state_ids_request( return 200, resp + @trace + @tag_args async def _on_state_ids_request_compute( self, room_id: str, event_id: str ) -> JsonDict: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0851ea4bc76d..45d56402ecf8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -59,7 +59,7 @@ from synapse.federation.federation_client import InvalidResponseError from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import nested_logging_context -from synapse.logging.tracing import SynapseTags, set_attribute, trace +from synapse.logging.tracing import SynapseTags, set_attribute, trace, tag_args from synapse.metrics.background_process_metrics import run_as_background_process from synapse.module_api import NOT_SPAM from synapse.replication.http.federation import ( @@ -1085,6 +1085,8 @@ async def on_make_knock_request( return event + @trace + @tag_args async def get_state_ids_for_pdu(self, room_id: str, event_id: str) -> List[str]: """Returns the state at the event. i.e. not including said event.""" event = await self.store.get_event(event_id, check_room_id=room_id) diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 3880846e9a55..94fc6bba84a2 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -563,6 +563,7 @@ def __init__(self, hs: "HomeServer"): async def on_GET( self, request: SynapseRequest, room_id: str ) -> Tuple[int, JsonDict]: + logger.info("RoomMessageListRestServlet afwefaewfewfew") requester = await self.auth.get_user_by_req(request, allow_guest=True) pagination_config = await PaginationConfig.from_request( self.store, request, default_limit=10 @@ -592,6 +593,7 @@ async def on_GET( as_client_event=as_client_event, event_filter=event_filter, ) + logger.info("RoomMessageListRestServlet afwefaewfewfew got msgs") return 200, msgs diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 178536b10fde..41b015dba133 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -127,6 +127,8 @@ async def get_auth_chain( ) return await self.get_events_as_list(event_ids) + @trace + @tag_args async def get_auth_chain_ids( self, room_id: str, diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 6394cc39ac02..b04e440f589a 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -21,6 +21,7 @@ from twisted.internet import defer from synapse.api.errors import LimitExceededError +from synapse.logging.tracing import start_active_span from synapse.config.ratelimiting import FederationRatelimitSettings from synapse.logging.context import ( PreserveLoggingContext, @@ -110,6 +111,9 @@ def ratelimit(self) -> "Iterator[defer.Deferred[None]]": def _on_enter(self, request_id: object) -> "defer.Deferred[None]": time_now = self.clock.time_msec() + wait_span_cm = start_active_span("ratelimit wait") + wait_span_cm.__enter__() + # remove any entries from request_times which aren't within the window self.request_times[:] = [ r for r in self.request_times if time_now - r < self.window_size @@ -162,6 +166,7 @@ def on_wait_finished(_: Any) -> "defer.Deferred[None]": def on_start(r: object) -> object: logger.debug("Ratelimit [%s]: Processing req", id(request_id)) + wait_span_cm.__exit__(None, None, None) self.current_processing.add(request_id) return r From 898ba0effe3d2563764c2a057df22f901f836d23 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 11 Aug 2022 10:27:55 -0500 Subject: [PATCH 15/18] More tracing --- synapse/federation/federation_server.py | 2 +- synapse/handlers/federation.py | 21 +++++++++++++------ synapse/handlers/relations.py | 10 ++++++++- synapse/logging/tracing.py | 1 - synapse/rest/client/room.py | 2 -- synapse/storage/databases/main/relations.py | 5 +++++ .../util/partial_state_events_tracker.py | 6 +++--- synapse/util/ratelimitutils.py | 2 +- tests/logging/test_tracing.py | 2 +- 9 files changed, 35 insertions(+), 16 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 716cbc5cefe1..dd9aeba9c86f 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -61,7 +61,7 @@ nested_logging_context, run_in_background, ) -from synapse.logging.tracing import log_kv, start_active_span_from_edu, trace, tag_args +from synapse.logging.tracing import log_kv, start_active_span_from_edu, tag_args, trace from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.replication.http.federation import ( ReplicationFederationSendEduRestServlet, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 45d56402ecf8..54c23c1eb22c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -59,7 +59,13 @@ from synapse.federation.federation_client import InvalidResponseError from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import nested_logging_context -from synapse.logging.tracing import SynapseTags, set_attribute, trace, tag_args +from synapse.logging.tracing import ( + SynapseTags, + set_attribute, + start_active_span, + tag_args, + trace, +) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.module_api import NOT_SPAM from synapse.replication.http.federation import ( @@ -380,13 +386,16 @@ async def _maybe_backfill_inner( # First we try hosts that are already in the room # TODO: HEURISTIC ALERT. - curr_state = await self._storage_controllers.state.get_current_state(room_id) + with start_active_span("getting likely_domains"): + curr_state = await self._storage_controllers.state.get_current_state( + room_id + ) - curr_domains = get_domains_from_state(curr_state) + curr_domains = get_domains_from_state(curr_state) - likely_domains = [ - domain for domain, depth in curr_domains if domain != self.server_name - ] + likely_domains = [ + domain for domain, depth in curr_domains if domain != self.server_name + ] async def try_backfill(domains: List[str]) -> bool: # TODO: Should we try multiple of these at a time? diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 41c64d62c8ba..70497353a349 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -19,7 +19,7 @@ from synapse.api.constants import RelationTypes from synapse.api.errors import SynapseError from synapse.events import EventBase, relation_from_event -from synapse.logging.tracing import trace +from synapse.logging.tracing import SynapseTags, set_attribute, trace from synapse.storage.databases.main.relations import _RelatedEvent from synapse.types import JsonDict, Requester, StreamToken, UserID from synapse.visibility import filter_events_for_client @@ -166,6 +166,7 @@ async def get_relations( return return_value + @trace async def get_relations_for_event( self, event_id: str, @@ -200,6 +201,7 @@ async def get_relations_for_event( return related_events, next_token + @trace async def get_annotations_for_event( self, event_id: str, @@ -245,6 +247,7 @@ async def get_annotations_for_event( return filtered_results + @trace async def _get_threads_for_events( self, events_by_id: Dict[str, EventBase], @@ -406,6 +409,11 @@ async def get_bundled_aggregations( # The event should get bundled aggregations. events_by_id[event.event_id] = event + set_attribute( + SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(events_by_id)})", + str(events_by_id.keys()), + ) + # event ID -> bundled aggregation in non-serialized form. results: Dict[str, BundledAggregations] = {} diff --git a/synapse/logging/tracing.py b/synapse/logging/tracing.py index e629548a04aa..86ff8fc637a4 100644 --- a/synapse/logging/tracing.py +++ b/synapse/logging/tracing.py @@ -912,7 +912,6 @@ def trace(func: Callable[P, R]) -> Callable[P, R]: Sets the operation name to that of the function's name. See the module's doc string for usage examples. """ - return trace_with_opname(func.__name__)(func) diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 94fc6bba84a2..3880846e9a55 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -563,7 +563,6 @@ def __init__(self, hs: "HomeServer"): async def on_GET( self, request: SynapseRequest, room_id: str ) -> Tuple[int, JsonDict]: - logger.info("RoomMessageListRestServlet afwefaewfewfew") requester = await self.auth.get_user_by_req(request, allow_guest=True) pagination_config = await PaginationConfig.from_request( self.store, request, default_limit=10 @@ -593,7 +592,6 @@ async def on_GET( as_client_event=as_client_event, event_filter=event_filter, ) - logger.info("RoomMessageListRestServlet afwefaewfewfew got msgs") return 200, msgs diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 7bd27790ebfe..255854cd66ed 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -30,6 +30,7 @@ from synapse.api.constants import RelationTypes from synapse.events import EventBase +from synapse.logging.tracing import trace from synapse.storage._base import SQLBaseStore from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause from synapse.storage.databases.main.stream import generate_pagination_where_clause @@ -349,6 +350,10 @@ def _get_aggregation_groups_for_users_txn( def get_applicable_edit(self, event_id: str) -> Optional[EventBase]: raise NotImplementedError() + # TODO: What's the proper way to fix this so we can stack @trace on top of + # @cachedList + # + # @trace @cachedList(cached_method_name="get_applicable_edit", list_name="event_ids") async def get_applicable_edits( self, event_ids: Collection[str] diff --git a/synapse/storage/util/partial_state_events_tracker.py b/synapse/storage/util/partial_state_events_tracker.py index 55a275392c24..07af89ee317d 100644 --- a/synapse/storage/util/partial_state_events_tracker.py +++ b/synapse/storage/util/partial_state_events_tracker.py @@ -20,7 +20,7 @@ from twisted.internet.defer import Deferred from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable -from synapse.logging.tracing import trace +from synapse.logging.tracing import trace_with_opname from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.room import RoomWorkerStore from synapse.util import unwrapFirstError @@ -59,7 +59,7 @@ def notify_un_partial_stated(self, event_id: str) -> None: for o in observers: o.callback(None) - @trace + @trace_with_opname("PartialStateEventsTracker.await_full_state") async def await_full_state(self, event_ids: Collection[str]) -> None: """Wait for all the given events to have full state. @@ -153,7 +153,7 @@ def notify_un_partial_stated(self, room_id: str) -> None: for o in observers: o.callback(None) - @trace + @trace_with_opname("PartialCurrentStateTracker.await_full_state") async def await_full_state(self, room_id: str) -> None: # We add the deferred immediately so that the DB call to check for # partial state doesn't race when we unpartial the room. diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index b04e440f589a..2c0c6d53de86 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -21,13 +21,13 @@ from twisted.internet import defer from synapse.api.errors import LimitExceededError -from synapse.logging.tracing import start_active_span from synapse.config.ratelimiting import FederationRatelimitSettings from synapse.logging.context import ( PreserveLoggingContext, make_deferred_yieldable, run_in_background, ) +from synapse.logging.tracing import start_active_span from synapse.util import Clock if typing.TYPE_CHECKING: diff --git a/tests/logging/test_tracing.py b/tests/logging/test_tracing.py index cb0ebb08443b..337796bccfd8 100644 --- a/tests/logging/test_tracing.py +++ b/tests/logging/test_tracing.py @@ -16,7 +16,7 @@ from twisted.test.proto_helpers import MemoryReactorClock from synapse.logging.context import make_deferred_yieldable, run_in_background -from synapse.logging.tracing import start_active_span, trace_with_opname, tag_args +from synapse.logging.tracing import start_active_span, tag_args, trace_with_opname from synapse.util import Clock from tests.unittest import TestCase From 53b8453a9914a396906748870d01f60aa4da7e81 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 15 Aug 2022 12:25:28 -0500 Subject: [PATCH 16/18] Refactor from feedback From feedback in https://github.com/matrix-org/synapse/pull/13499 --- synapse/util/ratelimitutils.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 2c0c6d53de86..7d48e7264dff 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -111,9 +111,6 @@ def ratelimit(self) -> "Iterator[defer.Deferred[None]]": def _on_enter(self, request_id: object) -> "defer.Deferred[None]": time_now = self.clock.time_msec() - wait_span_cm = start_active_span("ratelimit wait") - wait_span_cm.__enter__() - # remove any entries from request_times which aren't within the window self.request_times[:] = [ r for r in self.request_times if time_now - r < self.window_size @@ -166,7 +163,6 @@ def on_wait_finished(_: Any) -> "defer.Deferred[None]": def on_start(r: object) -> object: logger.debug("Ratelimit [%s]: Processing req", id(request_id)) - wait_span_cm.__exit__(None, None, None) self.current_processing.add(request_id) return r @@ -181,8 +177,11 @@ def on_both(r: object) -> object: # Ensure that we've properly cleaned up. self.sleeping_requests.discard(request_id) self.ready_request_queue.pop(request_id, None) + wait_span_cm.__exit__(None, None, None) return r + wait_span_cm = start_active_span("ratelimit wait") + wait_span_cm.__enter__() ret_defer.addCallbacks(on_start, on_err) ret_defer.addBoth(on_both) return make_deferred_yieldable(ret_defer) From db04b16060cf18c248c92e6c995ea7d1449cad03 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 18 Aug 2022 16:55:11 -0500 Subject: [PATCH 17/18] Some cleanup --- synapse/handlers/federation.py | 4 ++++ synapse/logging/tracing.py | 2 ++ synapse/util/ratelimitutils.py | 3 --- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ad4e09c8f892..9c583720fc41 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -494,6 +494,10 @@ async def try_backfill(domains: List[str]) -> bool: return False processing_end_time = self.clock.time_msec() + logger.info( + "backfill_processing_before_timer asfd=%s", + (processing_start_time - processing_end_time) / 1000, + ) backfill_processing_before_timer.observe( (processing_start_time - processing_end_time) / 1000 ) diff --git a/synapse/logging/tracing.py b/synapse/logging/tracing.py index c8cb17c84e31..838b56f807b8 100644 --- a/synapse/logging/tracing.py +++ b/synapse/logging/tracing.py @@ -910,7 +910,9 @@ def _decorator(func: Callable[P, R]) -> Callable[P, R]: def trace(func: Callable[P, R]) -> Callable[P, R]: """ Decorator to trace a function. + Sets the operation name to that of the function's name. + See the module's doc string for usage examples. """ return trace_with_opname(func.__name__)(func) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index e2acd7859525..603570c10f8d 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -263,11 +263,8 @@ def on_both(r: object) -> object: # Ensure that we've properly cleaned up. self.sleeping_requests.discard(request_id) self.ready_request_queue.pop(request_id, None) - wait_span_cm.__exit__(None, None, None) return r - wait_span_cm = start_active_span("ratelimit wait") - wait_span_cm.__enter__() ret_defer.addCallbacks(on_start, on_err) ret_defer.addBoth(on_both) return make_deferred_yieldable(ret_defer) From 4168ba53eecb00f45adc8ca4906f820b8ef15ff1 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 18 Aug 2022 22:11:47 -0500 Subject: [PATCH 18/18] Remove debug logs --- synapse/handlers/federation.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 9c583720fc41..ad4e09c8f892 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -494,10 +494,6 @@ async def try_backfill(domains: List[str]) -> bool: return False processing_end_time = self.clock.time_msec() - logger.info( - "backfill_processing_before_timer asfd=%s", - (processing_start_time - processing_end_time) / 1000, - ) backfill_processing_before_timer.observe( (processing_start_time - processing_end_time) / 1000 )