Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Reduce serialization errors in MultiWriterIdGen
Browse files Browse the repository at this point in the history
We call `_update_stream_positions_table_txn` a lot, which is an UPSERT
that can conflict in `REPEATABLE READ` isolation level. Instead of doing
a transaction consisting of a single query we may as well run it outside
of a transaction.
  • Loading branch information
erikjohnston committed Oct 2, 2020
1 parent e3debf9 commit 9d56999
Showing 1 changed file with 21 additions and 5 deletions.
26 changes: 21 additions & 5 deletions synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
from typing_extensions import Deque

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.util.sequence import PostgresSequenceGenerator

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -548,7 +552,7 @@ def _add_persisted_position(self, new_id: int):
# do.
break

def _update_stream_positions_table_txn(self, txn):
def _update_stream_positions_table_txn(self, txn: LoggingTransaction):
"""Update the `stream_positions` table with newly persisted position.
"""

Expand All @@ -570,6 +574,19 @@ def _update_stream_positions_table_txn(self, txn):
pos = (self.get_current_token_for_writer(self._instance_name),)
txn.execute(sql, (self._stream_name, self._instance_name, pos))

def _update_stream_positions_table_conn(self, conn: LoggingDatabaseConnection):
# We use autocommit here so that we don't have to go through a
# transaction dance, which a) adds latency and b) runs the risk of
# serialization errors.
try:
assert hasattr(conn.conn, "autocommit")
conn.conn.autocommit = True # type: ignore

with conn.cursor(txn_name="MultiWriterIdGenerator._update_table") as cur:
self._update_stream_positions_table_txn(cur)
finally:
conn.conn.autocommit = False # type: ignore


@attr.s(slots=True)
class _AsyncCtxManagerWrapper:
Expand Down Expand Up @@ -633,9 +650,8 @@ async def __aexit__(self, exc_type, exc, tb):
# We only do this on the success path so that the persisted current
# position points to a persisted row with the correct instance name.
if self.id_gen._writers:
await self.id_gen._db.runInteraction(
"MultiWriterIdGenerator._update_table",
self.id_gen._update_stream_positions_table_txn,
await self.id_gen._db.runWithConnection(
self.id_gen._update_stream_positions_table_conn,
)

return False

0 comments on commit 9d56999

Please sign in to comment.