Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert a grab bag of database code to async/await #8195

Merged
merged 8 commits into from
Aug 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8195.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
19 changes: 11 additions & 8 deletions synapse/appservice/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@
# limitations under the License.
import logging
import re
from typing import TYPE_CHECKING

from synapse.api.constants import EventTypes
from synapse.appservice.api import ApplicationServiceApi
from synapse.types import GroupID, get_domain_from_id
from synapse.util.caches.descriptors import cached

if TYPE_CHECKING:
from synapse.storage.databases.main import DataStore

logger = logging.getLogger(__name__)


Expand All @@ -35,30 +40,28 @@ def __init__(self, service, id, events):
self.id = id
self.events = events

def send(self, as_api):
async def send(self, as_api: ApplicationServiceApi) -> bool:
"""Sends this transaction using the provided AS API interface.

Args:
as_api(ApplicationServiceApi): The API to use to send.
as_api: The API to use to send.
Returns:
An Awaitable which resolves to True if the transaction was sent.
True if the transaction was sent.
"""
return as_api.push_bulk(
return await as_api.push_bulk(
service=self.service, events=self.events, txn_id=self.id
)

def complete(self, store):
async def complete(self, store: "DataStore") -> None:
"""Completes this transaction as successful.

Marks this transaction ID on the application service and removes the
transaction contents from the database.

Args:
store: The database store to operate on.
Returns:
A Deferred which resolves to True if the transaction was completed.
"""
return store.complete_appservice_txn(service=self.service, txn_id=self.id)
await store.complete_appservice_txn(service=self.service, txn_id=self.id)


class ApplicationService(object):
Expand Down
19 changes: 11 additions & 8 deletions synapse/federation/persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"""

import logging
from typing import Optional, Tuple

from synapse.federation.units import Transaction
from synapse.logging.utils import log_function
Expand All @@ -36,25 +37,27 @@ def __init__(self, datastore):
self.store = datastore

@log_function
def have_responded(self, origin, transaction):
""" Have we already responded to a transaction with the same id and
async def have_responded(
self, origin: str, transaction: Transaction
) -> Optional[Tuple[int, JsonDict]]:
"""Have we already responded to a transaction with the same id and
origin?

Returns:
Deferred: Results in `None` if we have not previously responded to
this transaction or a 2-tuple of `(int, dict)` representing the
response code and response body.
`None` if we have not previously responded to this transaction or a
2-tuple of `(int, dict)` representing the response code and response body.
"""
if not transaction.transaction_id:
transaction_id = transaction.transaction_id # type: ignore
if not transaction_id:
raise RuntimeError("Cannot persist a transaction with no transaction_id")

return self.store.get_received_txn_response(transaction.transaction_id, origin)
return await self.store.get_received_txn_response(transaction_id, origin)

@log_function
async def set_response(
self, origin: str, transaction: Transaction, code: int, response: JsonDict
) -> None:
""" Persist how we responded to a transaction.
"""Persist how we responded to a transaction.
"""
transaction_id = transaction.transaction_id # type: ignore
if not transaction_id:
Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1873,8 +1873,8 @@ async def get_persisted_pdu(
else:
return None

def get_min_depth_for_context(self, context):
return self.store.get_min_depth(context)
async def get_min_depth_for_context(self, context):
return await self.store.get_min_depth(context)

async def _handle_new_event(
self, origin, event, state=None, auth_events=None, backfilled=False
Expand Down
15 changes: 6 additions & 9 deletions synapse/storage/databases/main/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ async def set_appservice_state(self, service, state) -> None:
"application_services_state", {"as_id": service.id}, {"state": state}
)

def create_appservice_txn(self, service, events):
async def create_appservice_txn(self, service, events):
"""Atomically creates a new transaction for this application service
with the given list of events.

Expand Down Expand Up @@ -209,20 +209,17 @@ def _create_appservice_txn(txn):
)
return AppServiceTransaction(service=service, id=new_txn_id, events=events)

return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"create_appservice_txn", _create_appservice_txn
)

def complete_appservice_txn(self, txn_id, service):
async def complete_appservice_txn(self, txn_id, service) -> None:
"""Completes an application service transaction.

Args:
txn_id(str): The transaction ID being completed.
service(ApplicationService): The application service which was sent
this transaction.
Returns:
A Deferred which resolves if this transaction was stored
successfully.
"""
txn_id = int(txn_id)

Expand Down Expand Up @@ -258,7 +255,7 @@ def _complete_appservice_txn(txn):
{"txn_id": txn_id, "as_id": service.id},
)

return self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"complete_appservice_txn", _complete_appservice_txn
)

Expand Down Expand Up @@ -312,13 +309,13 @@ def _get_last_txn(self, txn, service_id):
else:
return int(last_txn_id[0]) # select 'last_txn' col

def set_appservice_last_pos(self, pos):
async def set_appservice_last_pos(self, pos) -> None:
def set_appservice_last_pos_txn(txn):
txn.execute(
"UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
)

return self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"set_appservice_last_pos", set_appservice_last_pos_txn
)

Expand Down
12 changes: 6 additions & 6 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,15 @@ def get_new_messages_for_remote_destination_txn(txn):
)

@trace
def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
async def delete_device_msgs_for_remote(
self, destination: str, up_to_stream_id: int
) -> None:
"""Used to delete messages when the remote destination acknowledges
their receipt.

Args:
destination(str): The destination server_name
up_to_stream_id(int): Where to delete messages up to.
Returns:
A deferred that resolves when the messages have been deleted.
destination: The destination server_name
up_to_stream_id: Where to delete messages up to.
"""

def delete_messages_for_remote_destination_txn(txn):
Expand All @@ -209,7 +209,7 @@ def delete_messages_for_remote_destination_txn(txn):
)
txn.execute(sql, (destination, up_to_stream_id))

return self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"delete_device_msgs_for_remote", delete_messages_for_remote_destination_txn
)

Expand Down
30 changes: 16 additions & 14 deletions synapse/storage/databases/main/e2e_room_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ async def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=Non

return sessions

def get_e2e_room_keys_multi(self, user_id, version, room_keys):
async def get_e2e_room_keys_multi(self, user_id, version, room_keys):
"""Get multiple room keys at a time. The difference between this function and
get_e2e_room_keys is that this function can be used to retrieve
multiple specific keys at a time, whereas get_e2e_room_keys is used for
Expand All @@ -166,10 +166,10 @@ def get_e2e_room_keys_multi(self, user_id, version, room_keys):
that we want to query

Returns:
Deferred[dict[str, dict[str, dict]]]: a map of room IDs to session IDs to room key
dict[str, dict[str, dict]]: a map of room IDs to session IDs to room key
"""

return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"get_e2e_room_keys_multi",
self._get_e2e_room_keys_multi_txn,
user_id,
Expand Down Expand Up @@ -283,7 +283,7 @@ def _get_current_version(txn, user_id):
raise StoreError(404, "No current backup version")
return row[0]

def get_e2e_room_keys_version_info(self, user_id, version=None):
async def get_e2e_room_keys_version_info(self, user_id, version=None):
"""Get info metadata about a version of our room_keys backup.

Args:
Expand All @@ -293,7 +293,7 @@ def get_e2e_room_keys_version_info(self, user_id, version=None):
Raises:
StoreError: with code 404 if there are no e2e_room_keys_versions present
Returns:
A deferred dict giving the info metadata for this backup version, with
A dict giving the info metadata for this backup version, with
fields including:
version(str)
algorithm(str)
Expand Down Expand Up @@ -324,12 +324,12 @@ def _get_e2e_room_keys_version_info_txn(txn):
result["etag"] = 0
return result

return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn
)

@trace
def create_e2e_room_keys_version(self, user_id, info):
async def create_e2e_room_keys_version(self, user_id: str, info: dict) -> str:
"""Atomically creates a new version of this user's e2e_room_keys store
with the given version info.

Expand All @@ -338,7 +338,7 @@ def create_e2e_room_keys_version(self, user_id, info):
info(dict): the info about the backup version to be created

Returns:
A deferred string for the newly created version ID
The newly created version ID
"""

def _create_e2e_room_keys_version_txn(txn):
Expand All @@ -365,7 +365,7 @@ def _create_e2e_room_keys_version_txn(txn):

return new_version

return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
)

Expand Down Expand Up @@ -403,13 +403,15 @@ async def update_e2e_room_keys_version(
)

@trace
def delete_e2e_room_keys_version(self, user_id, version=None):
async def delete_e2e_room_keys_version(
self, user_id: str, version: Optional[str] = None
) -> None:
"""Delete a given backup version of the user's room keys.
Doesn't delete their actual key data.

Args:
user_id(str): the user whose backup version we're deleting
version(str): Optional. the version ID of the backup version we're deleting
user_id: the user whose backup version we're deleting
version: Optional. the version ID of the backup version we're deleting
If missing, we delete the current backup version info.
Raises:
StoreError: with code 404 if there are no e2e_room_keys_versions present,
Expand All @@ -430,13 +432,13 @@ def _delete_e2e_room_keys_version_txn(txn):
keyvalues={"user_id": user_id, "version": this_version},
)

return self.db_pool.simple_update_one_txn(
self.db_pool.simple_update_one_txn(
txn,
table="e2e_room_keys_versions",
keyvalues={"user_id": user_id, "version": this_version},
updatevalues={"deleted": 1},
)

return self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"delete_e2e_room_keys_version", _delete_e2e_room_keys_version_txn
)
Loading