Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Use a sequence to generate AS transaction IDs, drop last_txn AS sta…
Browse files Browse the repository at this point in the history
…te (#12209)

Switching to a sequence means there's no need to track `last_txn` on the
AS state table to generate new TXN IDs. This also means that there is
no longer contention between the AS scheduler and AS handler on updates
to the `application_services_state` table, which will prevent serialization
errors during the complete AS txn transaction.
  • Loading branch information
Fizzadar authored Apr 1, 2022
1 parent 2135182 commit 993d90f
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 113 deletions.
1 change: 1 addition & 0 deletions changelog.d/12209.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Switch to using a sequence to generate AS transaction IDs. Contributed by Nick Beeper. If running synapse with a dedicated appservice worker, this MUST be stopped before upgrading the main process and database.
13 changes: 13 additions & 0 deletions docs/upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,19 @@ process, for example:
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
```

# Upgrading to v1.57.0

## Changes to database schema for application services

Synapse v1.57.0 includes a [change](https://github.com/matrix-org/synapse/pull/12209) to the
way transaction IDs are managed for application services. If your deployment uses a dedicated
worker for application service traffic, **it must be stopped** when the database is upgraded
(which normally happens when the main process is upgraded), to ensure the change is made safely
without any risk of reusing transaction IDs.

Deployments which do not use separate worker processes can be upgraded as normal. Similarly,
deployments where no applciation services are in use can be upgraded as normal.

# Upgrading to v1.56.0

## Groups/communities feature has been deprecated
Expand Down
62 changes: 19 additions & 43 deletions synapse/storage/databases/main/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.types import Cursor
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import DeviceListUpdates, JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import _CacheContext, cached
Expand Down Expand Up @@ -72,6 +74,22 @@ def __init__(
)
self.exclusive_user_regex = _make_exclusive_regex(self.services_cache)

def get_max_as_txn_id(txn: Cursor) -> int:
logger.warning("Falling back to slow query, you should port to postgres")
txn.execute(
"SELECT COALESCE(max(txn_id), 0) FROM application_services_txns"
)
return txn.fetchone()[0] # type: ignore

self._as_txn_seq_gen = build_sequence_generator(
db_conn,
database.engine,
get_max_as_txn_id,
"application_services_txn_id_seq",
table="application_services_txns",
id_column="txn_id",
)

super().__init__(database, db_conn, hs)

def get_app_services(self):
Expand Down Expand Up @@ -239,21 +257,7 @@ async def create_appservice_txn(
"""

def _create_appservice_txn(txn):
# work out new txn id (highest txn id for this service += 1)
# The highest id may be the last one sent (in which case it is last_txn)
# or it may be the highest in the txns list (which are waiting to be/are
# being sent)
last_txn_id = self._get_last_txn(txn, service.id)

txn.execute(
"SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
(service.id,),
)
highest_txn_id = txn.fetchone()[0]
if highest_txn_id is None:
highest_txn_id = 0

new_txn_id = max(highest_txn_id, last_txn_id) + 1
new_txn_id = self._as_txn_seq_gen.get_next_id_txn(txn)

# Insert new txn into txn table
event_ids = json_encoder.encode([e.event_id for e in events])
Expand Down Expand Up @@ -286,25 +290,8 @@ async def complete_appservice_txn(
txn_id: The transaction ID being completed.
service: The application service which was sent this transaction.
"""
txn_id = int(txn_id)

def _complete_appservice_txn(txn):
# Debugging query: Make sure the txn being completed is EXACTLY +1 from
# what was there before. If it isn't, we've got problems (e.g. the AS
# has probably missed some events), so whine loudly but still continue,
# since it shouldn't fail completion of the transaction.
last_txn_id = self._get_last_txn(txn, service.id)
if (last_txn_id + 1) != txn_id:
logger.error(
"appservice: Completing a transaction which has an ID > 1 from "
"the last ID sent to this AS. We've either dropped events or "
"sent it to the AS out of order. FIX ME. last_txn=%s "
"completing_txn=%s service_id=%s",
last_txn_id,
txn_id,
service.id,
)

# Set current txn_id for AS to 'txn_id'
self.db_pool.simple_upsert_txn(
txn,
Expand Down Expand Up @@ -376,17 +363,6 @@ def _get_oldest_unsent_txn(txn):
device_list_summary=DeviceListUpdates(),
)

def _get_last_txn(self, txn, service_id: Optional[str]) -> int:
txn.execute(
"SELECT last_txn FROM application_services_state WHERE as_id=?",
(service_id,),
)
last_txn_id = txn.fetchone()
if last_txn_id is None or last_txn_id[0] is None: # no row exists
return 0
else:
return int(last_txn_id[0]) # select 'last_txn' col

async def set_appservice_last_pos(self, pos: int) -> None:
def set_appservice_last_pos_txn(txn):
txn.execute(
Expand Down
5 changes: 4 additions & 1 deletion synapse/storage/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

SCHEMA_VERSION = 68 # remember to update the list below when updating
SCHEMA_VERSION = 69 # remember to update the list below when updating
"""Represents the expectations made by the codebase about the database schema
This should be incremented whenever the codebase changes its requirements on the
Expand Down Expand Up @@ -58,6 +58,9 @@
- event_reference_hashes is no longer read.
- `events` has `state_key` and `rejection_reason` columns, which are populated for
new events.
Changes in SCHEMA_VERSION = 69:
- Use sequence to generate future `application_services_txns.txn_id`s
"""


Expand Down
44 changes: 44 additions & 0 deletions synapse/storage/schema/main/delta/69/01as_txn_seq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright 2022 Beeper
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
Adds a postgres SEQUENCE for generating application service transaction IDs.
"""

from synapse.storage.engines import PostgresEngine


def run_create(cur, database_engine, *args, **kwargs):
if isinstance(database_engine, PostgresEngine):
# If we already have some AS TXNs we want to start from the current
# maximum value. There are two potential places this is stored - the
# actual TXNs themselves *and* the AS state table. At time of migration
# it is possible the TXNs table is empty so we must include the AS state
# last_txn as a potential option, and pick the maximum.

cur.execute("SELECT COALESCE(max(txn_id), 0) FROM application_services_txns")
row = cur.fetchone()
txn_max = row[0]

cur.execute("SELECT COALESCE(max(last_txn), 0) FROM application_services_state")
row = cur.fetchone()
last_txn_max = row[0]

start_val = max(last_txn_max, txn_max) + 1

cur.execute(
"CREATE SEQUENCE application_services_txn_id_seq START WITH %s",
(start_val,),
)
71 changes: 2 additions & 69 deletions tests/storage/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,6 @@ def _insert_txn(self, as_id, txn_id, events):
(as_id, txn_id, json.dumps([e.event_id for e in events])),
)

def _set_last_txn(self, as_id, txn_id):
return self.db_pool.runOperation(
self.engine.convert_param_style(
"INSERT INTO application_services_state(as_id, last_txn, state) "
"VALUES(?,?,?)"
),
(as_id, txn_id, ApplicationServiceState.UP.value),
)

def test_get_appservice_state_none(
self,
) -> None:
Expand Down Expand Up @@ -277,64 +268,6 @@ def test_create_appservice_txn_first(
self.assertEqual(txn.events, events)
self.assertEqual(txn.service, service)

def test_create_appservice_txn_older_last_txn(
self,
) -> None:
service = Mock(id=self.as_list[0]["id"])
events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")])
self.get_success(self._set_last_txn(service.id, 9643)) # AS is falling behind
self.get_success(self._insert_txn(service.id, 9644, events))
self.get_success(self._insert_txn(service.id, 9645, events))
txn = self.get_success(
self.store.create_appservice_txn(
service, events, [], [], {}, {}, DeviceListUpdates()
)
)
self.assertEqual(txn.id, 9646)
self.assertEqual(txn.events, events)
self.assertEqual(txn.service, service)

def test_create_appservice_txn_up_to_date_last_txn(
self,
) -> None:
service = Mock(id=self.as_list[0]["id"])
events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")])
self.get_success(self._set_last_txn(service.id, 9643))
txn = self.get_success(
self.store.create_appservice_txn(
service, events, [], [], {}, {}, DeviceListUpdates()
)
)
self.assertEqual(txn.id, 9644)
self.assertEqual(txn.events, events)
self.assertEqual(txn.service, service)

def test_create_appservice_txn_up_fuzzing(
self,
) -> None:
service = Mock(id=self.as_list[0]["id"])
events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")])
self.get_success(self._set_last_txn(service.id, 9643))

# dump in rows with higher IDs to make sure the queries aren't wrong.
self.get_success(self._set_last_txn(self.as_list[1]["id"], 119643))
self.get_success(self._set_last_txn(self.as_list[2]["id"], 9))
self.get_success(self._set_last_txn(self.as_list[3]["id"], 9643))
self.get_success(self._insert_txn(self.as_list[1]["id"], 119644, events))
self.get_success(self._insert_txn(self.as_list[1]["id"], 119645, events))
self.get_success(self._insert_txn(self.as_list[1]["id"], 119646, events))
self.get_success(self._insert_txn(self.as_list[2]["id"], 10, events))
self.get_success(self._insert_txn(self.as_list[3]["id"], 9643, events))

txn = self.get_success(
self.store.create_appservice_txn(
service, events, [], [], {}, {}, DeviceListUpdates()
)
)
self.assertEqual(txn.id, 9644)
self.assertEqual(txn.events, events)
self.assertEqual(txn.service, service)

def test_complete_appservice_txn_first_txn(
self,
) -> None:
Expand Down Expand Up @@ -368,13 +301,13 @@ def test_complete_appservice_txn_first_txn(
)
self.assertEqual(0, len(res))

def test_complete_appservice_txn_existing_in_state_table(
def test_complete_appservice_txn_updates_last_txn_state(
self,
) -> None:
service = Mock(id=self.as_list[0]["id"])
events = [Mock(event_id="e1"), Mock(event_id="e2")]
txn_id = 5
self.get_success(self._set_last_txn(service.id, 4))
self._set_state(self.as_list[0]["id"], ApplicationServiceState.UP)
self.get_success(self._insert_txn(service.id, txn_id, events))
self.get_success(
self.store.complete_appservice_txn(txn_id=txn_id, service=service)
Expand Down

0 comments on commit 993d90f

Please sign in to comment.