Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use execute_values more in PostgreSQL #10754

Merged
merged 4 commits into from
Sep 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10754.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Minor speed ups when joining large rooms over federation.
61 changes: 42 additions & 19 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,18 +280,18 @@ def execute_batch(self, sql: str, args: Iterable[Iterable[Any]]) -> None:
else:
self.executemany(sql, args)

def execute_values(self, sql: str, *args: Any) -> List[Tuple]:
def execute_values(self, sql: str, *args: Any, fetch: bool = True) -> List[Tuple]:
"""Corresponds to psycopg2.extras.execute_values. Only available when
using postgres.

Always sets fetch=True when caling `execute_values`, so will return the
results.
The `fetch` parameter must be set to False if the query does not return
rows (e.g. INSERTs).
"""
assert isinstance(self.database_engine, PostgresEngine)
from psycopg2.extras import execute_values # type: ignore

return self._do_execute(
lambda *x: execute_values(self.txn, *x, fetch=True), sql, *args
lambda *x: execute_values(self.txn, *x, fetch=fetch), sql, *args
)

def execute(self, sql: str, *args: Any) -> None:
Expand Down Expand Up @@ -920,13 +920,23 @@ def simple_insert_many_txn(
if k != keys[0]:
raise RuntimeError("All items must have the same keys")

sql = "INSERT INTO %s (%s) VALUES(%s)" % (
table,
", ".join(k for k in keys[0]),
", ".join("?" for _ in keys[0]),
)
if isinstance(txn.database_engine, PostgresEngine):
# We use `execute_values` as it can be a lot faster than `execute_batch`,
# but it's only available on postgres.
sql = "INSERT INTO %s (%s) VALUES ?" % (
table,
", ".join(k for k in keys[0]),
)

txn.execute_batch(sql, vals)
txn.execute_values(sql, vals, fetch=False)
else:
sql = "INSERT INTO %s (%s) VALUES(%s)" % (
table,
", ".join(k for k in keys[0]),
", ".join("?" for _ in keys[0]),
)

txn.execute_batch(sql, vals)

async def simple_upsert(
self,
Expand Down Expand Up @@ -1281,20 +1291,33 @@ def simple_upsert_many_txn_native_upsert(
k + "=EXCLUDED." + k for k in value_names
)

sql = "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s" % (
table,
", ".join(k for k in allnames),
", ".join("?" for _ in allnames),
", ".join(key_names),
latter,
)

args = []

for x, y in zip(key_values, value_values):
args.append(tuple(x) + tuple(y))

return txn.execute_batch(sql, args)
if isinstance(txn.database_engine, PostgresEngine):
# We use `execute_values` as it can be a lot faster than `execute_batch`,
# but it's only available on postgres.
sql = "INSERT INTO %s (%s) VALUES ? ON CONFLICT (%s) DO %s" % (
table,
", ".join(k for k in allnames),
", ".join(key_names),
latter,
)

txn.execute_values(sql, args, fetch=False)

else:
clokep marked this conversation as resolved.
Show resolved Hide resolved
sql = "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s" % (
table,
", ".join(k for k in allnames),
", ".join("?" for _ in allnames),
", ".join(key_names),
latter,
)

return txn.execute_batch(sql, args)

@overload
async def simple_select_one(
Expand Down