Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove obsolete order field in send_new_transaction #8245

Merged
merged 3 commits into from
Sep 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8245.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove obsolete `order` field from federation send queues.
7 changes: 1 addition & 6 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,6 @@ def __init__(self, hs: "synapse.server.HomeServer"):
),
)

self._order = 1

self._is_processing = False
self._last_poked_id = -1

Expand Down Expand Up @@ -272,9 +270,6 @@ def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.

order = self._order
self._order += 1

destinations = set(destinations)
destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations))
Expand All @@ -286,7 +281,7 @@ def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
sent_pdus_destination_dist_count.inc()

for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu, order)
self._get_per_destination_queue(destination).send_pdu(pdu)

async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""Send a RR to any other servers in the room
Expand Down
17 changes: 8 additions & 9 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ def __init__(
self._destination = destination
self.transmission_loop_running = False

# a list of tuples of (pending pdu, order)
self._pending_pdus = [] # type: List[Tuple[EventBase, int]]
# a list of pending PDUs
self._pending_pdus = [] # type: List[EventBase]

# XXX this is never actually used: see
# https://github.com/matrix-org/synapse/issues/7549
Expand Down Expand Up @@ -132,14 +132,13 @@ def pending_edu_count(self) -> int:
+ len(self._pending_edus_keyed)
)

def send_pdu(self, pdu: EventBase, order: int) -> None:
def send_pdu(self, pdu: EventBase) -> None:
"""Add a PDU to the queue, and start the transmission loop if necessary

Args:
pdu: pdu to send
order
"""
self._pending_pdus.append((pdu, order))
self._pending_pdus.append(pdu)
self.attempt_new_transaction()

def send_presence(self, states: Iterable[UserPresenceState]) -> None:
Expand Down Expand Up @@ -185,7 +184,7 @@ def attempt_new_transaction(self) -> None:
returns immediately. Otherwise kicks off the process of sending a
transaction in the background.
"""
# list of (pending_pdu, deferred, order)

if self.transmission_loop_running:
# XXX: this can get stuck on by a never-ending
# request at which point pending_pdus just keeps growing.
Expand All @@ -210,7 +209,7 @@ def attempt_new_transaction(self) -> None:
)

async def _transaction_transmission_loop(self) -> None:
pending_pdus = [] # type: List[Tuple[EventBase, int]]
pending_pdus = [] # type: List[EventBase]
try:
self.transmission_loop_running = True

Expand Down Expand Up @@ -373,13 +372,13 @@ async def _transaction_transmission_loop(self) -> None:
"TX [%s] Failed to send transaction: %s", self._destination, e
)

for p, _ in pending_pdus:
for p in pending_pdus:
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)
except Exception:
logger.exception("TX [%s] Failed to send transaction", self._destination)
for p, _ in pending_pdus:
for p in pending_pdus:
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)
Expand Down
26 changes: 13 additions & 13 deletions synapse/federation/sender/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, List, Tuple
from typing import TYPE_CHECKING, List

from synapse.api.errors import HttpResponseException
from synapse.events import EventBase
Expand Down Expand Up @@ -53,11 +53,17 @@ def __init__(self, hs: "synapse.server.HomeServer"):

@measure_func("_send_new_transaction")
async def send_new_transaction(
self,
destination: str,
pending_pdus: List[Tuple[EventBase, int]],
pending_edus: List[Edu],
):
self, destination: str, pdus: List[EventBase], edus: List[Edu],
) -> bool:
"""
Args:
destination: The destination to send to (e.g. 'example.org')
pdus: In-order list of PDUs to send
edus: List of EDUs to send

Returns:
True iff the transaction was successful
"""

# Make a transaction-sending opentracing span. This span follows on from
# all the edus in that transaction. This needs to be done since there is
Expand All @@ -67,20 +73,14 @@ async def send_new_transaction(
span_contexts = []
keep_destination = whitelisted_homeserver(destination)

for edu in pending_edus:
for edu in edus:
context = edu.get_context()
if context:
span_contexts.append(extract_text_map(json_decoder.decode(context)))
if keep_destination:
edu.strip_context()

with start_active_span_follows_from("send_transaction", span_contexts):

# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus]
edus = pending_edus

success = True

logger.debug("TX [%s] _attempt_new_transaction", destination)
Expand Down