From b948a4f2b6645765729a61e7d839d51dff960ec2 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Fri, 21 Jan 2022 14:39:48 +0000 Subject: [PATCH 1/8] Add `attempt_to_set_isolation_level` function to the database engines. --- synapse/storage/engines/_base.py | 10 +++++++++- synapse/storage/engines/postgres.py | 6 ++++++ synapse/storage/engines/sqlite.py | 5 +++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index 20cd63c3300a..b840c2ecff10 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import abc -from typing import Generic, TypeVar +from typing import Generic, TypeVar, Optional from synapse.storage.types import Connection @@ -109,3 +109,11 @@ def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool): commit/rollback the connections. """ ... + + @abc.abstractmethod + def attempt_to_set_isolation_level(self, conn: Connection, isolation_level: Optional[int]): + """Attempt to set the connections isolation level. + + Note: This has no effect on SQLite3, as transactions are SERIALIZABLE by default. + """ + ... diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 30f948a0f77d..a36ea5259e3e 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +from typing import Optional from synapse.storage.engines._base import BaseDatabaseEngine, IncorrectDatabaseSetup from synapse.storage.types import Connection @@ -175,3 +176,8 @@ def in_transaction(self, conn: Connection) -> bool: def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool): return conn.set_session(autocommit=autocommit) # type: ignore + + def attempt_to_set_isolation_level(self, conn: Connection, isolation_level: Optional[int]): + if isolation_level is None: + isolation_level = self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ + return conn.set_isolation_level(isolation_level) # type: ignore diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 70d17d4f2cd8..67eee40c32bd 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -15,6 +15,7 @@ import struct import threading import typing +from typing import Optional from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.types import Connection @@ -122,6 +123,10 @@ def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool): # set the connection to autocommit mode. pass + def attempt_to_set_isolation_level(self, conn: Connection, isolation_level: Optional[int]): + # All transactions are SERIALIZABLE by default in sqllite + pass + # Following functions taken from: https://github.com/coleifer/peewee From 7cd8e7b5f73b25004a19169714b287c9e70c749f Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Fri, 21 Jan 2022 14:40:29 +0000 Subject: [PATCH 2/8] Add `isolation_level` argument to database run txn methods. --- synapse/storage/database.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 57cc1d76e02f..fda5be42cf2a 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -702,6 +702,7 @@ async def runInteraction( func: Callable[..., R], *args: Any, db_autocommit: bool = False, + isolation_level: Optional[int] = None, **kwargs: Any, ) -> R: """Starts a transaction on the database and runs a given function @@ -724,6 +725,7 @@ async def runInteraction( called multiple times if the transaction is retried, so must correctly handle that case. + isolation_level: Set the server isolation level for this transaction. args: positional args to pass to `func` kwargs: named args to pass to `func` @@ -763,6 +765,7 @@ async def runWithConnection( func: Callable[..., R], *args: Any, db_autocommit: bool = False, + isolation_level: Optional[int] = None, **kwargs: Any, ) -> R: """Wraps the .runWithConnection() method on the underlying db_pool. @@ -775,6 +778,7 @@ async def runWithConnection( db_autocommit: Whether to run the function in "autocommit" mode, i.e. outside of a transaction. This is useful for transaction that are only a single query. Currently only affects postgres. + isolation_level: Set the server isolation level for this transaction. kwargs: named args to pass to `func` Returns: @@ -834,6 +838,10 @@ def inner_func(conn, *args, **kwargs): try: if db_autocommit: self.engine.attempt_to_set_autocommit(conn, True) + if isolation_level: + self.engine.attempt_to_set_isolation_level( + conn, isolation_level + ) db_conn = LoggingDatabaseConnection( conn, self.engine, "runWithConnection" @@ -842,6 +850,8 @@ def inner_func(conn, *args, **kwargs): finally: if db_autocommit: self.engine.attempt_to_set_autocommit(conn, False) + if isolation_level: + self.engine.attempt_to_set_isolation_level(conn, None) return await make_deferred_yieldable( self._db_pool.runWithConnection(inner_func, *args, **kwargs) From 15807d962d7e11c0b80dd62ff835a50453864436 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Fri, 21 Jan 2022 14:53:32 +0000 Subject: [PATCH 3/8] Linting fixes. --- synapse/storage/engines/_base.py | 6 ++++-- synapse/storage/engines/postgres.py | 4 +++- synapse/storage/engines/sqlite.py | 4 +++- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index b840c2ecff10..d66b8c7d9b85 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import abc -from typing import Generic, TypeVar, Optional +from typing import Generic, Optional, TypeVar from synapse.storage.types import Connection @@ -111,7 +111,9 @@ def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool): ... @abc.abstractmethod - def attempt_to_set_isolation_level(self, conn: Connection, isolation_level: Optional[int]): + def attempt_to_set_isolation_level( + self, conn: Connection, isolation_level: Optional[int] + ): """Attempt to set the connections isolation level. Note: This has no effect on SQLite3, as transactions are SERIALIZABLE by default. diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index a36ea5259e3e..f9322efcd957 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -177,7 +177,9 @@ def in_transaction(self, conn: Connection) -> bool: def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool): return conn.set_session(autocommit=autocommit) # type: ignore - def attempt_to_set_isolation_level(self, conn: Connection, isolation_level: Optional[int]): + def attempt_to_set_isolation_level( + self, conn: Connection, isolation_level: Optional[int] + ): if isolation_level is None: isolation_level = self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ return conn.set_isolation_level(isolation_level) # type: ignore diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 67eee40c32bd..6c19e55999bd 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -123,7 +123,9 @@ def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool): # set the connection to autocommit mode. pass - def attempt_to_set_isolation_level(self, conn: Connection, isolation_level: Optional[int]): + def attempt_to_set_isolation_level( + self, conn: Connection, isolation_level: Optional[int] + ): # All transactions are SERIALIZABLE by default in sqllite pass From 389228b7a5618da91d7c26d019076d7617f7f117 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Fri, 21 Jan 2022 14:53:36 +0000 Subject: [PATCH 4/8] Add changelog entry. --- changelog.d/11799.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/11799.misc diff --git a/changelog.d/11799.misc b/changelog.d/11799.misc new file mode 100644 index 000000000000..5c3b2bcaf4e5 --- /dev/null +++ b/changelog.d/11799.misc @@ -0,0 +1 @@ +Preparation for reducing Postgres serialization errors: allow setting transaction isolation level. Contributed by Nick @ Beeper. From 40e6463241d0c6d7055426bc1c90a6bcd75392ec Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Fri, 21 Jan 2022 18:11:12 +0000 Subject: [PATCH 5/8] Add `IsolationLevel` enum for use in DB calls. --- synapse/storage/engines/_base.py | 7 +++++++ synapse/storage/engines/postgres.py | 16 ++++++++++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index d66b8c7d9b85..143cd98ca292 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -12,11 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. import abc +from enum import IntEnum from typing import Generic, Optional, TypeVar from synapse.storage.types import Connection +class IsolationLevel(IntEnum): + READ_COMMITTED: int = 1 + REPEATABLE_READ: int = 2 + SERIALIZABLE: int = 3 + + class IncorrectDatabaseSetup(RuntimeError): pass diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index f9322efcd957..2bb100b0c43b 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -13,9 +13,13 @@ # limitations under the License. import logging -from typing import Optional +from typing import Mapping, Optional -from synapse.storage.engines._base import BaseDatabaseEngine, IncorrectDatabaseSetup +from synapse.storage.engines._base import ( + BaseDatabaseEngine, + IncorrectDatabaseSetup, + IsolationLevel, +) from synapse.storage.types import Connection logger = logging.getLogger(__name__) @@ -35,6 +39,12 @@ def _disable_bytes_adapter(_): self.synchronous_commit = database_config.get("synchronous_commit", True) self._version = None # unknown as yet + self.isolation_level_map: Mapping[int, int] = { + IsolationLevel.READ_COMMITTED: self.module.extensions.ISOLATION_LEVEL_READ_COMMITTED, + IsolationLevel.REPEATABLE_READ: self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ, + IsolationLevel.SERIALIZABLE: self.module.extensions.ISOLATION_LEVEL_SERIALIZABLE, + } + @property def single_threaded(self) -> bool: return False @@ -182,4 +192,6 @@ def attempt_to_set_isolation_level( ): if isolation_level is None: isolation_level = self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ + else: + isolation_level = self.isolation_level_map[isolation_level] return conn.set_isolation_level(isolation_level) # type: ignore From 9bf5eced40e927cd653a4d2d93b4371673ff3962 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Fri, 21 Jan 2022 18:12:03 +0000 Subject: [PATCH 6/8] Move postgres default isolation level into class attribute. --- synapse/storage/engines/postgres.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 2bb100b0c43b..a8e7d222b291 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -44,6 +44,7 @@ def _disable_bytes_adapter(_): IsolationLevel.REPEATABLE_READ: self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ, IsolationLevel.SERIALIZABLE: self.module.extensions.ISOLATION_LEVEL_SERIALIZABLE, } + self.default_isolation_level = self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ @property def single_threaded(self) -> bool: @@ -115,9 +116,7 @@ def convert_param_style(self, sql): return sql.replace("?", "%s") def on_new_connection(self, db_conn): - db_conn.set_isolation_level( - self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ - ) + db_conn.set_isolation_level(self.default_isolation_level) # Set the bytea output to escape, vs the default of hex cursor = db_conn.cursor() @@ -191,7 +190,7 @@ def attempt_to_set_isolation_level( self, conn: Connection, isolation_level: Optional[int] ): if isolation_level is None: - isolation_level = self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ + isolation_level = self.default_isolation_level else: isolation_level = self.isolation_level_map[isolation_level] return conn.set_isolation_level(isolation_level) # type: ignore From af7b5369a2fc80eca201eaf1fc47e3ec962f1144 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Sat, 22 Jan 2022 10:01:34 +0000 Subject: [PATCH 7/8] Linting fix. --- synapse/storage/engines/postgres.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index a8e7d222b291..9635faa1fc40 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -44,7 +44,9 @@ def _disable_bytes_adapter(_): IsolationLevel.REPEATABLE_READ: self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ, IsolationLevel.SERIALIZABLE: self.module.extensions.ISOLATION_LEVEL_SERIALIZABLE, } - self.default_isolation_level = self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ + self.default_isolation_level = ( + self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ + ) @property def single_threaded(self) -> bool: From e7a89a1fce0706f37b641d3ae324db79da386411 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Mon, 24 Jan 2022 19:32:41 +0000 Subject: [PATCH 8/8] Safer check on `None` isolation level. Co-authored-by: Brendan Abolivier --- synapse/storage/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index fda5be42cf2a..7455326ed331 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -838,7 +838,7 @@ def inner_func(conn, *args, **kwargs): try: if db_autocommit: self.engine.attempt_to_set_autocommit(conn, True) - if isolation_level: + if isolation_level is not None: self.engine.attempt_to_set_isolation_level( conn, isolation_level )