Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Prevent expired events from being filtered out when retention is disabled #12611

Merged
merged 16 commits into from
May 23, 2022
Merged
1 change: 1 addition & 0 deletions changelog.d/12611.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.7.0 that would prevent events from being sent to clients if there's a retention policy in the room when the support for retention policies is disabled.
2 changes: 1 addition & 1 deletion synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ async def purge_history_for_rooms_in_range(
# defined in the server's configuration, we can safely assume that's the
# case and use it for this room.
max_lifetime = (
retention_policy["max_lifetime"] or self._retention_default_max_lifetime
retention_policy.max_lifetime or self._retention_default_max_lifetime
)

# Cap the effective max_lifetime to be within the range allowed in the
Expand Down
45 changes: 25 additions & 20 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import IdGenerator
from synapse.types import JsonDict, ThirdPartyInstanceID
from synapse.types import JsonDict, RetentionPolicy, ThirdPartyInstanceID
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.stringutils import MXC_REGEX
Expand Down Expand Up @@ -699,20 +699,28 @@ def delete_ratelimit_txn(txn: LoggingTransaction) -> None:
await self.db_pool.runInteraction("delete_ratelimit", delete_ratelimit_txn)

@cached()
async def get_retention_policy_for_room(self, room_id: str) -> Dict[str, int]:
async def get_retention_policy_for_room(self, room_id: str) -> RetentionPolicy:
"""Get the retention policy for a given room.

If no retention policy has been found for this room, returns a policy defined
babolivier marked this conversation as resolved.
Show resolved Hide resolved
by the configured default policy (which has None as both the 'min_lifetime' and
the 'max_lifetime' if no default policy has been defined in the server's
configuration).

If support for retention policies is disabled, a policy with a 'min_lifetime' and
'max_lifetime' of None is returned.

Args:
room_id: The ID of the room to get the retention policy of.

Returns:
A dict containing "min_lifetime" and "max_lifetime" for this room.
"""
# If the room retention feature is disabled, return a policy with no minimum nor
# maximum. This prevents incorrectly filtering out events when sending to
# the client.
if not self.config.retention.retention_enabled:
return RetentionPolicy()

def get_retention_policy_for_room_txn(
txn: LoggingTransaction,
Expand All @@ -736,10 +744,10 @@ def get_retention_policy_for_room_txn(
# If we don't know this room ID, ret will be None, in this case return the default
# policy.
if not ret:
return {
"min_lifetime": self.config.retention.retention_default_min_lifetime,
"max_lifetime": self.config.retention.retention_default_max_lifetime,
}
return RetentionPolicy(
min_lifetime=self.config.retention.retention_default_min_lifetime,
max_lifetime=self.config.retention.retention_default_max_lifetime,
)

min_lifetime = ret[0]["min_lifetime"]
max_lifetime = ret[0]["max_lifetime"]
Expand All @@ -754,10 +762,10 @@ def get_retention_policy_for_room_txn(
if max_lifetime is None:
max_lifetime = self.config.retention.retention_default_max_lifetime

return {
"min_lifetime": min_lifetime,
"max_lifetime": max_lifetime,
}
return RetentionPolicy(
min_lifetime=min_lifetime,
max_lifetime=max_lifetime,
)

async def get_media_mxcs_in_room(self, room_id: str) -> Tuple[List[str], List[str]]:
"""Retrieves all the local and remote media MXC URIs in a given room
Expand Down Expand Up @@ -994,7 +1002,7 @@ def _quarantine_media_txn(

async def get_rooms_for_retention_period_in_range(
self, min_ms: Optional[int], max_ms: Optional[int], include_null: bool = False
) -> Dict[str, Dict[str, Optional[int]]]:
) -> Dict[str, RetentionPolicy]:
"""Retrieves all of the rooms within the given retention range.

Optionally includes the rooms which don't have a retention policy.
Expand All @@ -1016,7 +1024,7 @@ async def get_rooms_for_retention_period_in_range(

def get_rooms_for_retention_period_in_range_txn(
txn: LoggingTransaction,
) -> Dict[str, Dict[str, Optional[int]]]:
) -> Dict[str, RetentionPolicy]:
range_conditions = []
args = []

Expand Down Expand Up @@ -1047,10 +1055,10 @@ def get_rooms_for_retention_period_in_range_txn(
rooms_dict = {}

for row in rows:
rooms_dict[row["room_id"]] = {
"min_lifetime": row["min_lifetime"],
"max_lifetime": row["max_lifetime"],
}
rooms_dict[row["room_id"]] = RetentionPolicy(
min_lifetime=row["min_lifetime"],
max_lifetime=row["max_lifetime"],
)

if include_null:
# If required, do a second query that retrieves all of the rooms we know
Expand All @@ -1065,10 +1073,7 @@ def get_rooms_for_retention_period_in_range_txn(
# policy in its state), add it with a null policy.
for row in rows:
if row["room_id"] not in rooms_dict:
rooms_dict[row["room_id"]] = {
"min_lifetime": None,
"max_lifetime": None,
}
rooms_dict[row["room_id"]] = RetentionPolicy()

return rooms_dict

Expand Down
6 changes: 6 additions & 0 deletions synapse/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -916,3 +916,9 @@ class UserProfile(TypedDict):
user_id: str
display_name: Optional[str]
avatar_url: Optional[str]


@attr.s(auto_attribs=True, frozen=True, slots=True)
class RetentionPolicy:
min_lifetime: Optional[int] = None
max_lifetime: Optional[int] = None
6 changes: 3 additions & 3 deletions synapse/visibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from synapse.events.utils import prune_event
from synapse.storage import Storage
from synapse.storage.state import StateFilter
from synapse.types import StateMap, get_domain_from_id
from synapse.types import RetentionPolicy, StateMap, get_domain_from_id

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -94,7 +94,7 @@ async def filter_events_for_client(

if filter_send_to_client:
room_ids = {e.room_id for e in events}
retention_policies = {}
retention_policies: Dict[str, RetentionPolicy] = {}

for room_id in room_ids:
retention_policies[
Expand Down Expand Up @@ -137,7 +137,7 @@ def allowed(event: EventBase) -> Optional[EventBase]:
# events.
if not event.is_state():
retention_policy = retention_policies[event.room_id]
max_lifetime = retention_policy.get("max_lifetime")
max_lifetime = retention_policy.max_lifetime

if max_lifetime is not None:
oldest_allowed_ts = storage.main.clock.time_msec() - max_lifetime
Expand Down
8 changes: 4 additions & 4 deletions tests/rest/client/test_relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,7 @@ def assert_annotations(bundled_aggregations: JsonDict) -> None:
bundled_aggregations,
)

self._test_bundled_aggregations(RelationTypes.ANNOTATION, assert_annotations, 7)
self._test_bundled_aggregations(RelationTypes.ANNOTATION, assert_annotations, 6)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This (and the two other similar changes) is because these tests work by counting database transactions. In these transactions there would be get_retention_policy_for_room because we would always run it when responding with events. This PR changes things so that we skip it if retention is disabled, which mean it's one less transaction happening.


def test_annotation_to_annotation(self) -> None:
"""Any relation to an annotation should be ignored."""
Expand Down Expand Up @@ -1031,7 +1031,7 @@ def assert_annotations(bundled_aggregations: JsonDict) -> None:
bundled_aggregations,
)

self._test_bundled_aggregations(RelationTypes.REFERENCE, assert_annotations, 7)
self._test_bundled_aggregations(RelationTypes.REFERENCE, assert_annotations, 6)

def test_thread(self) -> None:
"""
Expand Down Expand Up @@ -1060,7 +1060,7 @@ def assert_thread(bundled_aggregations: JsonDict) -> None:
bundled_aggregations.get("latest_event"),
)

self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 10)
self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 9)

def test_thread_with_bundled_aggregations_for_latest(self) -> None:
"""
Expand Down Expand Up @@ -1106,7 +1106,7 @@ def assert_thread(bundled_aggregations: JsonDict) -> None:
bundled_aggregations["latest_event"].get("unsigned"),
)

self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 10)
self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 9)

def test_nested_thread(self) -> None:
"""
Expand Down
35 changes: 31 additions & 4 deletions tests/rest/client/test_retention.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Dict
from unittest.mock import Mock

from twisted.test.proto_helpers import MemoryReactor
Expand Down Expand Up @@ -252,16 +253,24 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase):
room.register_servlets,
]

def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
config = self.default_config()
config["retention"] = {
def default_config(self) -> Dict[str, Any]:
config = super().default_config()

retention_config = {
"enabled": True,
}

# Update this config with what's in the default config so that
# override_config works as expected.
retention_config.update(config.get("retention", {}))
config["retention"] = retention_config

return config

def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
mock_federation_client = Mock(spec=["backfill"])

self.hs = self.setup_test_homeserver(
config=config,
federation_client=mock_federation_client,
)
return self.hs
Expand Down Expand Up @@ -295,6 +304,24 @@ def test_state_policy(self) -> None:

self._test_retention(room_id, expected_code_for_first_event=404)

@unittest.override_config({"retention": {"enabled": False}})
def test_visibility_when_disabled(self) -> None:
babolivier marked this conversation as resolved.
Show resolved Hide resolved
"""Retention policies should be ignored when the retention feature is disabled."""
room_id = self.helper.create_room_as(self.user_id, tok=self.token)

self.helper.send_state(
room_id=room_id,
event_type=EventTypes.Retention,
body={"max_lifetime": one_day_ms},
tok=self.token,
)

resp = self.helper.send(room_id=room_id, body="test", tok=self.token)

self.reactor.advance(one_day_ms * 2 / 1000)

self.get_event(room_id, resp["event_id"])

def _test_retention(
self, room_id: str, expected_code_for_first_event: int = 200
) -> None:
Expand Down