Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix UPSERT check #4459

Merged
merged 3 commits into from
Jan 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/4459.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Synapse will now take advantage of native UPSERT functionality in PostgreSQL 9.5+ and SQLite 3.24+.
33 changes: 28 additions & 5 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from twisted.internet import defer

from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.engines import PostgresEngine
from synapse.util.caches.descriptors import Cache
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
Expand Down Expand Up @@ -198,7 +199,12 @@ def __init__(self, db_conn, hs):
if self.database_engine.can_native_upsert:
# Check ASAP (and then later, every 1s) to see if we have finished
# background updates of tables that aren't safe to update.
self._clock.call_later(0.0, self._check_safe_to_upsert)
self._clock.call_later(
0.0,
run_as_background_process,
"upsert_safety_check",
self._check_safe_to_upsert
)

@defer.inlineCallbacks
def _check_safe_to_upsert(self):
Expand All @@ -208,7 +214,7 @@ def _check_safe_to_upsert(self):
If there are background updates, we will need to wait, as they may be
the addition of indexes that set the UNIQUE constraint that we require.

If the background updates have not completed, wait a second and check again.
If the background updates have not completed, wait 15 sec and check again.
"""
updates = yield self._simple_select_list(
"background_updates",
Expand All @@ -221,11 +227,16 @@ def _check_safe_to_upsert(self):
# The User IPs table in schema #53 was missing a unique index, which we
# run as a background update.
if "user_ips_device_unique_index" not in updates:
self._unsafe_to_upsert_tables.discard("user_id")
self._unsafe_to_upsert_tables.discard("user_ips")

# If there's any tables left to check, reschedule to run.
if self._unsafe_to_upsert_tables:
self._clock.call_later(1.0, self._check_safe_to_upsert)
self._clock.call_later(
15.0,
run_as_background_process,
"upsert_safety_check",
self._check_safe_to_upsert
)

def start_profiling(self):
self._previous_loop_ts = self._clock.time_msec()
Expand Down Expand Up @@ -609,7 +620,7 @@ def _simple_upsert_txn(
inserting
lock (bool): True to lock the table when doing the upsert.
Returns:
Deferred(None or bool): Native upserts always return None. Emulated
None or bool: Native upserts always return None. Emulated
upserts return True if a new entry was created, False if an existing
one was updated.
"""
Expand Down Expand Up @@ -637,6 +648,18 @@ def _simple_upsert_txn(
def _simple_upsert_txn_emulated(
self, txn, table, keyvalues, values, insertion_values={}, lock=True
):
"""
Args:
table (str): The table to upsert into
keyvalues (dict): The unique key tables and their new values
values (dict): The nonunique columns and their new values
insertion_values (dict): additional key/values to use only when
inserting
lock (bool): True to lock the table when doing the upsert.
Returns:
bool: Return True if a new entry was created, False if an existing
one was updated.
"""
# We need to lock the table :(, unless we're *really* careful
if lock:
self.database_engine.lock_table(txn, table)
Expand Down