Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Block clients from sending server ACLs that lock the local server out. #8708

Merged
merged 6 commits into from
Nov 3, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions synapse/events/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from synapse.events import EventBase
from synapse.events.builder import EventBuilder
from synapse.events.utils import validate_canonicaljson
from synapse.federation.federation_server import server_matches_acl_event
from synapse.types import EventID, RoomID, UserID


Expand Down Expand Up @@ -81,6 +82,12 @@ def validate_new(self, event: EventBase, config: HomeServerConfig):
if event.type == EventTypes.Retention:
self._validate_retention(event)

if event.type == EventTypes.ServerACL:
if not server_matches_acl_event(config.server_name, event):
raise SynapseError(
400, "Can't create an ACL event that denies the local server"
)

def _validate_retention(self, event: EventBase):
"""Checks that an event that defines the retention policy for a room respects the
format enforced by the spec.
Expand Down
36 changes: 36 additions & 0 deletions tests/handlers/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,39 @@ def test_duplicated_txn_id_one_call(self):
# Check that we've deduplicated the events.
self.assertEqual(len(events), 2)
self.assertEqual(events[0].event_id, events[1].event_id)


class ServerAclValidationTestCase(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets,
login.register_servlets,
room.register_servlets,
]

def prepare(self, reactor, clock, hs):
self.user_id = self.register_user("tester", "foobar")
self.access_token = self.login("tester", "foobar")
self.room_id = self.helper.create_room_as(self.user_id, tok=self.access_token)

def test_allow_server_acl(self):
"""Test that sending an ACL that blocks everyone but ourselves work.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
"""

self.helper.send_state(
self.room_id,
EventTypes.ServerACL,
body={"allow": [self.hs.hostname]},
tok=self.access_token,
expect_code=200,
)

def test_deny_server_acl_block_outselves(self):
"""Test that sending an ACL that blocks ourselves does not work.
"""
self.helper.send_state(
self.room_id,
EventTypes.ServerACL,
body={},
tok=self.access_token,
expect_code=400,
)