Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Use attrs for appservice namespaces.
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep committed Nov 16, 2021
1 parent 2f0912c commit 9e46385
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 66 deletions.
83 changes: 52 additions & 31 deletions synapse/appservice/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
# limitations under the License.
import logging
import re
from typing import TYPE_CHECKING, Iterable, List, Match, Optional, Pattern
from typing import TYPE_CHECKING, Iterable, List, Optional, Pattern

import attr
from netaddr import IPSet

from synapse.api.constants import EventTypes
Expand All @@ -34,6 +35,20 @@ class ApplicationServiceState:
UP = "up"


@attr.s(slots=True, frozen=True, auto_attribs=True)
class Namespace:
exclusive: bool
group_id: Optional[str]
regex: Pattern


@attr.s(slots=True, frozen=True, auto_attribs=True)
class Namespaces:
users: List[Namespace]
aliases: List[Namespace]
rooms: List[Namespace]


class ApplicationService:
"""Defines an application service. This definition is mostly what is
provided to the /register AS API.
Expand Down Expand Up @@ -86,27 +101,30 @@ def __init__(

self.rate_limited = rate_limited

def _check_namespaces(self, namespaces):
def _check_namespaces(self, namespaces: Optional[JsonDict]) -> Namespaces:
# Sanity check that it is of the form:
# {
# users: [ {regex: "[A-z]+.*", exclusive: true}, ...],
# aliases: [ {regex: "[A-z]+.*", exclusive: true}, ...],
# rooms: [ {regex: "[A-z]+.*", exclusive: true}, ...],
# }
result = Namespaces([], [], [])
if not namespaces:
namespaces = {}
return result

for ns in ApplicationService.NS_LIST:
if ns not in namespaces:
namespaces[ns] = []
continue

if type(namespaces[ns]) != list:
namespace: List[Namespace] = getattr(result, ns)

if not isinstance(namespaces[ns], list):
raise ValueError("Bad namespace value for '%s'" % ns)
for regex_obj in namespaces[ns]:
if not isinstance(regex_obj, dict):
raise ValueError("Expected dict regex for ns '%s'" % ns)
if not isinstance(regex_obj.get("exclusive"), bool):
exclusive = regex_obj.get("exclusive")
if not isinstance(exclusive, bool):
raise ValueError("Expected bool for 'exclusive' in ns '%s'" % ns)
group_id = regex_obj.get("group_id")
if group_id:
Expand All @@ -127,22 +145,26 @@ def _check_namespaces(self, namespaces):
)

regex = regex_obj.get("regex")
if isinstance(regex, str):
regex_obj["regex"] = re.compile(regex) # Pre-compile regex
else:
if not isinstance(regex, str):
raise ValueError("Expected string for 'regex' in ns '%s'" % ns)
return namespaces

def _matches_regex(self, namespace_key: str, test_string: str) -> Optional[Match]:
for regex_obj in self.namespaces[namespace_key]:
if regex_obj["regex"].match(test_string):
return regex_obj
# Pre-compile regex.
namespace.append(Namespace(exclusive, group_id, re.compile(regex)))

return result

def _matches_regex(
self, namespaces: List[Namespace], test_string: str
) -> Optional[Namespace]:
for namespace in namespaces:
if namespace.regex.match(test_string):
return namespace
return None

def _is_exclusive(self, namespace_key: str, test_string: str) -> bool:
regex_obj = self._matches_regex(namespace_key, test_string)
if regex_obj:
return regex_obj["exclusive"]
def _is_exclusive(self, namespaces: List[Namespace], test_string: str) -> bool:
namespace = self._matches_regex(namespaces, test_string)
if namespace:
return namespace.exclusive
return False

async def _matches_user(
Expand Down Expand Up @@ -261,39 +283,38 @@ async def is_interested_in_presence(

def is_interested_in_user(self, user_id: str) -> bool:
return (
bool(self._matches_regex(ApplicationService.NS_USERS), user_id)
bool(self._matches_regex(self.namespaces.users, user_id))
or user_id == self.sender
)

def is_interested_in_alias(self, alias: str) -> bool:
return bool(self._matches_regex(ApplicationService.NS_ALIASES), alias)
return bool(self._matches_regex(self.namespaces.aliases, alias))

def is_interested_in_room(self, room_id: str) -> bool:
return bool(self._matches_regex(ApplicationService.NS_ROOMS), room_id)
return bool(self._matches_regex(self.namespaces.rooms, room_id))

def is_exclusive_user(self, user_id: str) -> bool:
return (
self._is_exclusive(ApplicationService.NS_USERS, user_id)
or user_id == self.sender
self._is_exclusive(self.namespaces.users, user_id) or user_id == self.sender
)

def is_interested_in_protocol(self, protocol: str) -> bool:
return protocol in self.protocols

def is_exclusive_alias(self, alias: str) -> bool:
return self._is_exclusive(ApplicationService.NS_ALIASES, alias)
return self._is_exclusive(self.namespaces.aliases, alias)

def is_exclusive_room(self, room_id: str) -> bool:
return self._is_exclusive(ApplicationService.NS_ROOMS, room_id)
return self._is_exclusive(self.namespaces.rooms, room_id)

def get_exclusive_user_regexes(self) -> List[Pattern]:
"""Get the list of regexes used to determine if a user is exclusively
registered by the AS
"""
return [
regex_obj["regex"]
for regex_obj in self.namespaces[ApplicationService.NS_USERS]
if regex_obj["exclusive"]
namespace.regex
for namespace in self.namespaces.users
if namespace.exclusive
]

def get_groups_for_user(self, user_id: str) -> Iterable[str]:
Expand All @@ -306,9 +327,9 @@ def get_groups_for_user(self, user_id: str) -> Iterable[str]:
An iterable that yields group_id strings.
"""
return (
regex_obj["group_id"]
for regex_obj in self.namespaces[ApplicationService.NS_USERS]
if "group_id" in regex_obj and regex_obj["regex"].match(user_id)
namespace.group_id
for namespace in self.namespaces.users
if namespace.group_id and namespace.regex.match(user_id)
)

def is_rate_limited(self) -> bool:
Expand Down
51 changes: 20 additions & 31 deletions tests/appservice/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

from twisted.internet import defer

from synapse.appservice import ApplicationService
from synapse.appservice import ApplicationService, Namespace

from tests import unittest


def _regex(regex, exclusive=True):
return {"regex": re.compile(regex), "exclusive": exclusive}
def _regex(regex: str, exclusive: bool = True) -> Namespace:
return Namespace(exclusive, None, re.compile(regex))


class ApplicationServiceTestCase(unittest.TestCase):
Expand All @@ -33,11 +33,6 @@ def setUp(self):
url="some_url",
token="some_token",
hostname="matrix.org", # only used by get_groups_for_user
namespaces={
ApplicationService.NS_USERS: [],
ApplicationService.NS_ROOMS: [],
ApplicationService.NS_ALIASES: [],
},
)
self.event = Mock(
type="m.something", room_id="!foo:bar", sender="@someone:somewhere"
Expand All @@ -47,23 +42,23 @@ def setUp(self):

@defer.inlineCallbacks
def test_regex_user_id_prefix_match(self):
self.service.namespaces[ApplicationService.NS_USERS].append(_regex("@irc_.*"))
self.service.namespaces.users.append(_regex("@irc_.*"))
self.event.sender = "@irc_foobar:matrix.org"
self.assertTrue(
(yield defer.ensureDeferred(self.service.is_interested(self.event)))
)

@defer.inlineCallbacks
def test_regex_user_id_prefix_no_match(self):
self.service.namespaces[ApplicationService.NS_USERS].append(_regex("@irc_.*"))
self.service.namespaces.users.append(_regex("@irc_.*"))
self.event.sender = "@someone_else:matrix.org"
self.assertFalse(
(yield defer.ensureDeferred(self.service.is_interested(self.event)))
)

@defer.inlineCallbacks
def test_regex_room_member_is_checked(self):
self.service.namespaces[ApplicationService.NS_USERS].append(_regex("@irc_.*"))
self.service.namespaces.users.append(_regex("@irc_.*"))
self.event.sender = "@someone_else:matrix.org"
self.event.type = "m.room.member"
self.event.state_key = "@irc_foobar:matrix.org"
Expand All @@ -73,7 +68,7 @@ def test_regex_room_member_is_checked(self):

@defer.inlineCallbacks
def test_regex_room_id_match(self):
self.service.namespaces[ApplicationService.NS_ROOMS].append(
self.service.namespaces.rooms.append(
_regex("!some_prefix.*some_suffix:matrix.org")
)
self.event.room_id = "!some_prefixs0m3th1nGsome_suffix:matrix.org"
Expand All @@ -83,7 +78,7 @@ def test_regex_room_id_match(self):

@defer.inlineCallbacks
def test_regex_room_id_no_match(self):
self.service.namespaces[ApplicationService.NS_ROOMS].append(
self.service.namespaces.rooms.append(
_regex("!some_prefix.*some_suffix:matrix.org")
)
self.event.room_id = "!XqBunHwQIXUiqCaoxq:matrix.org"
Expand All @@ -93,9 +88,7 @@ def test_regex_room_id_no_match(self):

@defer.inlineCallbacks
def test_regex_alias_match(self):
self.service.namespaces[ApplicationService.NS_ALIASES].append(
_regex("#irc_.*:matrix.org")
)
self.service.namespaces.aliases.append(_regex("#irc_.*:matrix.org"))
self.store.get_aliases_for_room.return_value = defer.succeed(
["#irc_foobar:matrix.org", "#athing:matrix.org"]
)
Expand All @@ -109,46 +102,44 @@ def test_regex_alias_match(self):
)

def test_non_exclusive_alias(self):
self.service.namespaces[ApplicationService.NS_ALIASES].append(
self.service.namespaces.aliases.append(
_regex("#irc_.*:matrix.org", exclusive=False)
)
self.assertFalse(self.service.is_exclusive_alias("#irc_foobar:matrix.org"))

def test_non_exclusive_room(self):
self.service.namespaces[ApplicationService.NS_ROOMS].append(
self.service.namespaces.rooms.append(
_regex("!irc_.*:matrix.org", exclusive=False)
)
self.assertFalse(self.service.is_exclusive_room("!irc_foobar:matrix.org"))

def test_non_exclusive_user(self):
self.service.namespaces[ApplicationService.NS_USERS].append(
self.service.namespaces.users.append(
_regex("@irc_.*:matrix.org", exclusive=False)
)
self.assertFalse(self.service.is_exclusive_user("@irc_foobar:matrix.org"))

def test_exclusive_alias(self):
self.service.namespaces[ApplicationService.NS_ALIASES].append(
self.service.namespaces.aliases.append(
_regex("#irc_.*:matrix.org", exclusive=True)
)
self.assertTrue(self.service.is_exclusive_alias("#irc_foobar:matrix.org"))

def test_exclusive_user(self):
self.service.namespaces[ApplicationService.NS_USERS].append(
self.service.namespaces.users.append(
_regex("@irc_.*:matrix.org", exclusive=True)
)
self.assertTrue(self.service.is_exclusive_user("@irc_foobar:matrix.org"))

def test_exclusive_room(self):
self.service.namespaces[ApplicationService.NS_ROOMS].append(
self.service.namespaces.rooms.append(
_regex("!irc_.*:matrix.org", exclusive=True)
)
self.assertTrue(self.service.is_exclusive_room("!irc_foobar:matrix.org"))

@defer.inlineCallbacks
def test_regex_alias_no_match(self):
self.service.namespaces[ApplicationService.NS_ALIASES].append(
_regex("#irc_.*:matrix.org")
)
self.service.namespaces.aliases.append(_regex("#irc_.*:matrix.org"))
self.store.get_aliases_for_room.return_value = defer.succeed(
["#xmpp_foobar:matrix.org", "#athing:matrix.org"]
)
Expand All @@ -163,10 +154,8 @@ def test_regex_alias_no_match(self):

@defer.inlineCallbacks
def test_regex_multiple_matches(self):
self.service.namespaces[ApplicationService.NS_ALIASES].append(
_regex("#irc_.*:matrix.org")
)
self.service.namespaces[ApplicationService.NS_USERS].append(_regex("@irc_.*"))
self.service.namespaces.aliases.append(_regex("#irc_.*:matrix.org"))
self.service.namespaces.users.append(_regex("@irc_.*"))
self.event.sender = "@irc_foobar:matrix.org"
self.store.get_aliases_for_room.return_value = defer.succeed(
["#irc_barfoo:matrix.org"]
Expand All @@ -184,7 +173,7 @@ def test_regex_multiple_matches(self):
def test_interested_in_self(self):
# make sure invites get through
self.service.sender = "@appservice:name"
self.service.namespaces[ApplicationService.NS_USERS].append(_regex("@irc_.*"))
self.service.namespaces.users.append(_regex("@irc_.*"))
self.event.type = "m.room.member"
self.event.content = {"membership": "invite"}
self.event.state_key = self.service.sender
Expand All @@ -194,7 +183,7 @@ def test_interested_in_self(self):

@defer.inlineCallbacks
def test_member_list_match(self):
self.service.namespaces[ApplicationService.NS_USERS].append(_regex("@irc_.*"))
self.service.namespaces.users.append(_regex("@irc_.*"))
# Note that @irc_fo:here is the AS user.
self.store.get_users_in_room.return_value = defer.succeed(
["@alice:here", "@irc_fo:here", "@bob:here"]
Expand Down
8 changes: 4 additions & 4 deletions tests/storage/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from twisted.internet import defer

from synapse.appservice import ApplicationService, ApplicationServiceState
from synapse.appservice import ApplicationServiceState
from synapse.config._base import ConfigError
from synapse.storage.database import DatabasePool, make_conn
from synapse.storage.databases.main.appservice import (
Expand Down Expand Up @@ -89,9 +89,9 @@ def test_retrieval_of_service(self):
self.assertEquals(stored_service.token, self.as_token)
self.assertEquals(stored_service.id, self.as_id)
self.assertEquals(stored_service.url, self.as_url)
self.assertEquals(stored_service.namespaces[ApplicationService.NS_ALIASES], [])
self.assertEquals(stored_service.namespaces[ApplicationService.NS_ROOMS], [])
self.assertEquals(stored_service.namespaces[ApplicationService.NS_USERS], [])
self.assertEquals(stored_service.namespaces.aliases, [])
self.assertEquals(stored_service.namespaces.rooms, [])
self.assertEquals(stored_service.namespaces.users, [])

def test_retrieval_of_all_services(self):
services = self.store.get_app_services()
Expand Down

0 comments on commit 9e46385

Please sign in to comment.