Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Catch-up after Federation Outage (split, 3): Add tests for last_successful_stream_ordering #8258

Merged
merged 1 commit into from
Sep 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8258.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Track the `stream_ordering` of the last successfully-sent event to every destination, so we can use this information to 'catch up' a remote server after an outage.
76 changes: 76 additions & 0 deletions tests/federation/test_federation_catch_up.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,24 @@ def prepare(self, reactor, clock, hs):
return_value=make_awaitable(["test", "host2"])
)

# whenever send_transaction is called, record the pdu data
self.pdus = []
self.failed_pdus = []
self.is_online = True
self.hs.get_federation_transport_client().send_transaction.side_effect = (
self.record_transaction
)

async def record_transaction(self, txn, json_cb):
if self.is_online:
data = json_cb()
self.pdus.extend(data["pdus"])
return {}
else:
data = json_cb()
self.failed_pdus.extend(data["pdus"])
raise IOError("Failed to connect because this is a test!")

def get_destination_room(self, room: str, destination: str = "host2") -> dict:
"""
Gets the destination_rooms entry for a (destination, room_id) pair.
Expand Down Expand Up @@ -80,3 +98,61 @@ def test_catch_up_destination_rooms_tracking(self):
self.assertEqual(row_1["event_id"], event_id_1)
self.assertEqual(row_2["event_id"], event_id_2)
self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1)

@override_config({"send_federation": True})
def test_catch_up_last_successful_stream_ordering_tracking(self):
"""
Tests that we populate the `destination_rooms` table as needed.
"""
self.register_user("u1", "you the one")
u1_token = self.login("u1", "you the one")
room = self.helper.create_room_as("u1", tok=u1_token)

# take the remote offline
self.is_online = False

self.get_success(
event_injection.inject_member_event(self.hs, room, "@user:host2", "join")
)

self.helper.send(room, "wombats!", tok=u1_token)
self.pump()

lsso_1 = self.get_success(
self.hs.get_datastore().get_destination_last_successful_stream_ordering(
"host2"
)
)

self.assertIsNone(
lsso_1,
"There should be no last successful stream ordering for an always-offline destination",
)

# bring the remote online
self.is_online = True

event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"]

lsso_2 = self.get_success(
self.hs.get_datastore().get_destination_last_successful_stream_ordering(
"host2"
)
)
row_2 = self.get_destination_room(room)

self.assertEqual(
self.pdus[0]["content"]["body"],
"rabbits!",
"Test fault: didn't receive the right PDU",
)
self.assertEqual(
row_2["event_id"],
event_id_2,
"Test fault: destination_rooms not updated correctly",
)
self.assertEqual(
lsso_2,
row_2["stream_ordering"],
"Send succeeded but not marked as last_successful_stream_ordering",
)