Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 412a215

Browse files
committed
Merge commit 'be16ee59a' into anoa/dinsic_release_1_21_x
* commit 'be16ee59a': Add type hints to more handlers (#8244) Remove obsolete order field in `send_new_transaction` (#8245) Split fetching device keys and signatures into two transactions (#8233)
2 parents 255860b + be16ee5 commit 412a215

File tree

11 files changed

+199
-151
lines changed

11 files changed

+199
-151
lines changed

changelog.d/8233.misc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Refactor queries for device keys and cross-signatures.

changelog.d/8244.misc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add type hints to pagination, initial sync and events handlers.

changelog.d/8245.misc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Remove obsolete `order` field from federation send queues.

mypy.ini

+3
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@ files =
1717
synapse/handlers/auth.py,
1818
synapse/handlers/cas_handler.py,
1919
synapse/handlers/directory.py,
20+
synapse/handlers/events.py,
2021
synapse/handlers/federation.py,
2122
synapse/handlers/identity.py,
23+
synapse/handlers/initial_sync.py,
2224
synapse/handlers/message.py,
2325
synapse/handlers/oidc_handler.py,
26+
synapse/handlers/pagination.py,
2427
synapse/handlers/presence.py,
2528
synapse/handlers/room.py,
2629
synapse/handlers/room_member.py,

synapse/federation/sender/__init__.py

+1-6
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,6 @@ def __init__(self, hs: "synapse.server.HomeServer"):
108108
),
109109
)
110110

111-
self._order = 1
112-
113111
self._is_processing = False
114112
self._last_poked_id = -1
115113

@@ -272,9 +270,6 @@ def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
272270
# a transaction in progress. If we do, stick it in the pending_pdus
273271
# table and we'll get back to it later.
274272

275-
order = self._order
276-
self._order += 1
277-
278273
destinations = set(destinations)
279274
destinations.discard(self.server_name)
280275
logger.debug("Sending to: %s", str(destinations))
@@ -286,7 +281,7 @@ def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
286281
sent_pdus_destination_dist_count.inc()
287282

288283
for destination in destinations:
289-
self._get_per_destination_queue(destination).send_pdu(pdu, order)
284+
self._get_per_destination_queue(destination).send_pdu(pdu)
290285

291286
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
292287
"""Send a RR to any other servers in the room

synapse/federation/sender/per_destination_queue.py

+8-9
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ def __init__(
9292
self._destination = destination
9393
self.transmission_loop_running = False
9494

95-
# a list of tuples of (pending pdu, order)
96-
self._pending_pdus = [] # type: List[Tuple[EventBase, int]]
95+
# a list of pending PDUs
96+
self._pending_pdus = [] # type: List[EventBase]
9797

9898
# XXX this is never actually used: see
9999
# https://github.com/matrix-org/synapse/issues/7549
@@ -132,14 +132,13 @@ def pending_edu_count(self) -> int:
132132
+ len(self._pending_edus_keyed)
133133
)
134134

135-
def send_pdu(self, pdu: EventBase, order: int) -> None:
135+
def send_pdu(self, pdu: EventBase) -> None:
136136
"""Add a PDU to the queue, and start the transmission loop if necessary
137137
138138
Args:
139139
pdu: pdu to send
140-
order
141140
"""
142-
self._pending_pdus.append((pdu, order))
141+
self._pending_pdus.append(pdu)
143142
self.attempt_new_transaction()
144143

145144
def send_presence(self, states: Iterable[UserPresenceState]) -> None:
@@ -185,7 +184,7 @@ def attempt_new_transaction(self) -> None:
185184
returns immediately. Otherwise kicks off the process of sending a
186185
transaction in the background.
187186
"""
188-
# list of (pending_pdu, deferred, order)
187+
189188
if self.transmission_loop_running:
190189
# XXX: this can get stuck on by a never-ending
191190
# request at which point pending_pdus just keeps growing.
@@ -210,7 +209,7 @@ def attempt_new_transaction(self) -> None:
210209
)
211210

212211
async def _transaction_transmission_loop(self) -> None:
213-
pending_pdus = [] # type: List[Tuple[EventBase, int]]
212+
pending_pdus = [] # type: List[EventBase]
214213
try:
215214
self.transmission_loop_running = True
216215

@@ -373,13 +372,13 @@ async def _transaction_transmission_loop(self) -> None:
373372
"TX [%s] Failed to send transaction: %s", self._destination, e
374373
)
375374

376-
for p, _ in pending_pdus:
375+
for p in pending_pdus:
377376
logger.info(
378377
"Failed to send event %s to %s", p.event_id, self._destination
379378
)
380379
except Exception:
381380
logger.exception("TX [%s] Failed to send transaction", self._destination)
382-
for p, _ in pending_pdus:
381+
for p in pending_pdus:
383382
logger.info(
384383
"Failed to send event %s to %s", p.event_id, self._destination
385384
)

synapse/federation/sender/transaction_manager.py

+13-13
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515
import logging
16-
from typing import TYPE_CHECKING, List, Tuple
16+
from typing import TYPE_CHECKING, List
1717

1818
from synapse.api.errors import HttpResponseException
1919
from synapse.events import EventBase
@@ -53,11 +53,17 @@ def __init__(self, hs: "synapse.server.HomeServer"):
5353

5454
@measure_func("_send_new_transaction")
5555
async def send_new_transaction(
56-
self,
57-
destination: str,
58-
pending_pdus: List[Tuple[EventBase, int]],
59-
pending_edus: List[Edu],
60-
):
56+
self, destination: str, pdus: List[EventBase], edus: List[Edu],
57+
) -> bool:
58+
"""
59+
Args:
60+
destination: The destination to send to (e.g. 'example.org')
61+
pdus: In-order list of PDUs to send
62+
edus: List of EDUs to send
63+
64+
Returns:
65+
True iff the transaction was successful
66+
"""
6167

6268
# Make a transaction-sending opentracing span. This span follows on from
6369
# all the edus in that transaction. This needs to be done since there is
@@ -67,20 +73,14 @@ async def send_new_transaction(
6773
span_contexts = []
6874
keep_destination = whitelisted_homeserver(destination)
6975

70-
for edu in pending_edus:
76+
for edu in edus:
7177
context = edu.get_context()
7278
if context:
7379
span_contexts.append(extract_text_map(json_decoder.decode(context)))
7480
if keep_destination:
7581
edu.strip_context()
7682

7783
with start_active_span_follows_from("send_transaction", span_contexts):
78-
79-
# Sort based on the order field
80-
pending_pdus.sort(key=lambda t: t[1])
81-
pdus = [x[0] for x in pending_pdus]
82-
edus = pending_edus
83-
8484
success = True
8585

8686
logger.debug("TX [%s] _attempt_new_transaction", destination)

synapse/handlers/events.py

+26-23
Original file line numberDiff line numberDiff line change
@@ -15,29 +15,30 @@
1515

1616
import logging
1717
import random
18+
from typing import TYPE_CHECKING, Iterable, List, Optional
1819

1920
from synapse.api.constants import EventTypes, Membership
2021
from synapse.api.errors import AuthError, SynapseError
2122
from synapse.events import EventBase
2223
from synapse.handlers.presence import format_user_presence_state
2324
from synapse.logging.utils import log_function
24-
from synapse.types import UserID
25+
from synapse.streams.config import PaginationConfig
26+
from synapse.types import JsonDict, UserID
2527
from synapse.visibility import filter_events_for_client
2628

2729
from ._base import BaseHandler
2830

31+
if TYPE_CHECKING:
32+
from synapse.server import HomeServer
33+
34+
2935
logger = logging.getLogger(__name__)
3036

3137

3238
class EventStreamHandler(BaseHandler):
33-
def __init__(self, hs):
39+
def __init__(self, hs: "HomeServer"):
3440
super(EventStreamHandler, self).__init__(hs)
3541

36-
# Count of active streams per user
37-
self._streams_per_user = {}
38-
# Grace timers per user to delay the "stopped" signal
39-
self._stop_timer_per_user = {}
40-
4142
self.distributor = hs.get_distributor()
4243
self.distributor.declare("started_user_eventstream")
4344
self.distributor.declare("stopped_user_eventstream")
@@ -52,14 +53,14 @@ def __init__(self, hs):
5253
@log_function
5354
async def get_stream(
5455
self,
55-
auth_user_id,
56-
pagin_config,
57-
timeout=0,
58-
as_client_event=True,
59-
affect_presence=True,
60-
room_id=None,
61-
is_guest=False,
62-
):
56+
auth_user_id: str,
57+
pagin_config: PaginationConfig,
58+
timeout: int = 0,
59+
as_client_event: bool = True,
60+
affect_presence: bool = True,
61+
room_id: Optional[str] = None,
62+
is_guest: bool = False,
63+
) -> JsonDict:
6364
"""Fetches the events stream for a given user.
6465
"""
6566

@@ -98,7 +99,7 @@ async def get_stream(
9899

99100
# When the user joins a new room, or another user joins a currently
100101
# joined room, we need to send down presence for those users.
101-
to_add = []
102+
to_add = [] # type: List[JsonDict]
102103
for event in events:
103104
if not isinstance(event, EventBase):
104105
continue
@@ -110,7 +111,7 @@ async def get_stream(
110111
# Send down presence for everyone in the room.
111112
users = await self.state.get_current_users_in_room(
112113
event.room_id
113-
)
114+
) # type: Iterable[str]
114115
else:
115116
users = [event.state_key]
116117

@@ -144,20 +145,22 @@ async def get_stream(
144145

145146

146147
class EventHandler(BaseHandler):
147-
def __init__(self, hs):
148+
def __init__(self, hs: "HomeServer"):
148149
super(EventHandler, self).__init__(hs)
149150
self.storage = hs.get_storage()
150151

151-
async def get_event(self, user, room_id, event_id):
152+
async def get_event(
153+
self, user: UserID, room_id: Optional[str], event_id: str
154+
) -> Optional[EventBase]:
152155
"""Retrieve a single specified event.
153156
154157
Args:
155-
user (synapse.types.UserID): The user requesting the event
156-
room_id (str|None): The expected room id. We'll return None if the
158+
user: The user requesting the event
159+
room_id: The expected room id. We'll return None if the
157160
event's room does not match.
158-
event_id (str): The event ID to obtain.
161+
event_id: The event ID to obtain.
159162
Returns:
160-
dict: An event, or None if there is no event matching this ID.
163+
An event, or None if there is no event matching this ID.
161164
Raises:
162165
SynapseError if there was a problem retrieving this event, or
163166
AuthError if the user does not have the rights to inspect this

0 commit comments

Comments
 (0)