|
16 | 16 | from typing import Any, Dict, List, Tuple
|
17 | 17 |
|
18 | 18 | from synapse.storage._base import SQLBaseStore
|
| 19 | +from synapse.storage.database import LoggingTransaction |
| 20 | +from synapse.util.caches.stream_change_cache import StreamChangeCache |
19 | 21 |
|
20 | 22 | logger = logging.getLogger(__name__)
|
21 | 23 |
|
22 | 24 |
|
23 | 25 | class StateDeltasStore(SQLBaseStore):
|
| 26 | + # This class must be mixed in with a child class which provides the following |
| 27 | + # attribute. TODO: can we get static analysis to enforce this? |
| 28 | + _curr_state_delta_stream_cache: StreamChangeCache |
| 29 | + |
24 | 30 | async def get_current_state_deltas(
|
25 | 31 | self, prev_stream_id: int, max_stream_id: int
|
26 | 32 | ) -> Tuple[int, List[Dict[str, Any]]]:
|
@@ -60,7 +66,9 @@ async def get_current_state_deltas(
|
60 | 66 | # max_stream_id.
|
61 | 67 | return max_stream_id, []
|
62 | 68 |
|
63 |
| - def get_current_state_deltas_txn(txn): |
| 69 | + def get_current_state_deltas_txn( |
| 70 | + txn: LoggingTransaction, |
| 71 | + ) -> Tuple[int, List[Dict[str, Any]]]: |
64 | 72 | # First we calculate the max stream id that will give us less than
|
65 | 73 | # N results.
|
66 | 74 | # We arbitrarily limit to 100 stream_id entries to ensure we don't
|
@@ -106,15 +114,17 @@ def get_current_state_deltas_txn(txn):
|
106 | 114 | "get_current_state_deltas", get_current_state_deltas_txn
|
107 | 115 | )
|
108 | 116 |
|
109 |
| - def _get_max_stream_id_in_current_state_deltas_txn(self, txn): |
| 117 | + def _get_max_stream_id_in_current_state_deltas_txn( |
| 118 | + self, txn: LoggingTransaction |
| 119 | + ) -> int: |
110 | 120 | return self.db_pool.simple_select_one_onecol_txn(
|
111 | 121 | txn,
|
112 | 122 | table="current_state_delta_stream",
|
113 | 123 | keyvalues={},
|
114 | 124 | retcol="COALESCE(MAX(stream_id), -1)",
|
115 | 125 | )
|
116 | 126 |
|
117 |
| - async def get_max_stream_id_in_current_state_deltas(self): |
| 127 | + async def get_max_stream_id_in_current_state_deltas(self) -> int: |
118 | 128 | return await self.db_pool.runInteraction(
|
119 | 129 | "get_max_stream_id_in_current_state_deltas",
|
120 | 130 | self._get_max_stream_id_in_current_state_deltas_txn,
|
|
0 commit comments