Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix locked upsert on device_lists_remote_cache #4132

Merged
merged 3 commits into from
Nov 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/4132.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix table lock of device_lists_remote_cache which could freeze the application
94 changes: 87 additions & 7 deletions synapse/storage/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,19 @@

from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList

from ._base import Cache, SQLBaseStore, db_to_json
from ._base import Cache, db_to_json

logger = logging.getLogger(__name__)

DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
"drop_device_list_streams_non_unique_indexes"
)

class DeviceStore(SQLBaseStore):

class DeviceStore(BackgroundUpdateStore):
def __init__(self, db_conn, hs):
super(DeviceStore, self).__init__(db_conn, hs)

Expand All @@ -52,6 +57,30 @@ def __init__(self, db_conn, hs):
columns=["user_id", "device_id"],
)

# create a unique index on device_lists_remote_cache
self.register_background_index_update(
"device_lists_remote_cache_unique_idx",
index_name="device_lists_remote_cache_unique_id",
table="device_lists_remote_cache",
columns=["user_id", "device_id"],
unique=True,
)

# And one on device_lists_remote_extremeties
self.register_background_index_update(
"device_lists_remote_extremeties_unique_idx",
index_name="device_lists_remote_extremeties_unique_idx",
table="device_lists_remote_extremeties",
columns=["user_id"],
unique=True,
)

# once they complete, we can remove the old non-unique indexes.
self.register_background_update_handler(
DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES,
self._drop_device_list_streams_non_unique_indexes,
)

@defer.inlineCallbacks
def store_device(self, user_id, device_id,
initial_device_display_name):
Expand Down Expand Up @@ -239,7 +268,19 @@ def mark_remote_user_device_list_as_unsubscribed(self, user_id):

def update_remote_device_list_cache_entry(self, user_id, device_id, content,
stream_id):
"""Updates a single user's device in the cache.
"""Updates a single device in the cache of a remote user's devicelist.

Note: assumes that we are the only thread that can be updating this user's
device list.

Args:
user_id (str): User to update device list for
device_id (str): ID of decivice being updated
content (dict): new data on this device
stream_id (int): the version of the device list

Returns:
Deferred[None]
"""
return self.runInteraction(
"update_remote_device_list_cache_entry",
Expand Down Expand Up @@ -272,7 +313,11 @@ def _update_remote_device_list_cache_entry_txn(self, txn, user_id, device_id,
},
values={
"content": json.dumps(content),
}
},

# we don't need to lock, because we assume we are the only thread
# updating this user's devices.
lock=False,
)

txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,))
Expand All @@ -289,11 +334,26 @@ def _update_remote_device_list_cache_entry_txn(self, txn, user_id, device_id,
},
values={
"stream_id": stream_id,
}
},

# again, we can assume we are the only thread updating this user's
# extremity.
lock=False,
)

def update_remote_device_list_cache(self, user_id, devices, stream_id):
"""Replace the cache of the remote user's devices.
"""Replace the entire cache of the remote user's devices.

Note: assumes that we are the only thread that can be updating this user's
device list.

Args:
user_id (str): User to update device list for
devices (list[dict]): list of device objects supplied over federation
stream_id (int): the version of the device list

Returns:
Deferred[None]
"""
return self.runInteraction(
"update_remote_device_list_cache",
Expand Down Expand Up @@ -338,7 +398,11 @@ def _update_remote_device_list_cache_txn(self, txn, user_id, devices,
},
values={
"stream_id": stream_id,
}
},

# we don't need to lock, because we can assume we are the only thread
# updating this user's extremity.
lock=False,
)

def get_devices_by_remote(self, destination, from_stream_id):
Expand Down Expand Up @@ -722,3 +786,19 @@ def _prune_txn(txn):
"_prune_old_outbound_device_pokes",
_prune_txn,
)

@defer.inlineCallbacks
def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
def f(conn):
txn = conn.cursor()
txn.execute(
"DROP INDEX IF EXISTS device_lists_remote_cache_id"
)
txn.execute(
"DROP INDEX IF EXISTS device_lists_remote_extremeties_id"
)
txn.close()

yield self.runWithConnection(f)
yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
defer.returnValue(1)
9 changes: 5 additions & 4 deletions synapse/storage/schema/delta/40/device_list_streams.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ CREATE TABLE device_lists_remote_cache (
content TEXT NOT NULL
);

CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id);


-- The last update we got for a user. Empty if we're not receiving updates for
-- that user.
CREATE TABLE device_lists_remote_extremeties (
user_id TEXT NOT NULL,
stream_id TEXT NOT NULL
);

CREATE INDEX device_lists_remote_extremeties_id ON device_lists_remote_extremeties(user_id, stream_id);
-- we used to create non-unique indexes on these tables, but as of update 52 we create
-- unique indexes concurrently:
--
-- CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id);
-- CREATE INDEX device_lists_remote_extremeties_id ON device_lists_remote_extremeties(user_id, stream_id);


-- Stream of device lists updates. Includes both local and remotes
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- register a background update which will create a unique index on
-- device_lists_remote_cache
INSERT into background_updates (update_name, progress_json)
VALUES ('device_lists_remote_cache_unique_idx', '{}');

-- and one on device_lists_remote_extremeties
INSERT into background_updates (update_name, progress_json, depends_on)
VALUES (
'device_lists_remote_extremeties_unique_idx', '{}',

-- doesn't really depend on this, but we need to make sure both happen
-- before we drop the old indexes.
'device_lists_remote_cache_unique_idx'
);

-- once they complete, we can drop the old indexes.
INSERT into background_updates (update_name, progress_json, depends_on)
VALUES (
'drop_device_list_streams_non_unique_indexes', '{}',
'device_lists_remote_extremeties_unique_idx'
);