This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Merge branch 'release-v1.13.0' into develop
richvdh committed May 6, 2020
2 parents fa0b2bd + aee9130 commit 62ee862
Showing 14 changed files with 264 additions and 150 deletions.
1 change: 1 addition & 0 deletions changelog.d/7420.misc
@@ -0,0 +1 @@
Prevent methods in `synapse.handlers.auth` from polling the homeserver config every request.
1 change: 1 addition & 0 deletions changelog.d/7423.misc
@@ -0,0 +1 @@
Speed up fetching device lists changes when handling `/sync` requests.
1 change: 1 addition & 0 deletions changelog.d/7427.feature
@@ -0,0 +1 @@
Add support for running replication over Redis when using workers.
83 changes: 10 additions & 73 deletions synapse/api/auth.py
@@ -26,16 +26,15 @@
 import synapse.logging.opentracing as opentracing
 import synapse.types
 from synapse import event_auth
-from synapse.api.constants import EventTypes, LimitBlockingTypes, Membership, UserTypes
+from synapse.api.auth_blocking import AuthBlocking
+from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import (
     AuthError,
     Codes,
     InvalidClientTokenError,
     MissingClientTokenError,
-    ResourceLimitError,
 )
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
-from synapse.config.server import is_threepid_reserved
 from synapse.events import EventBase
 from synapse.types import StateMap, UserID
 from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
@@ -77,7 +76,11 @@ def __init__(self, hs):
         self.token_cache = LruCache(CACHE_SIZE_FACTOR * 10000)
         register_cache("cache", "token_cache", self.token_cache)

+        self._auth_blocking = AuthBlocking(self.hs)
+
         self._account_validity = hs.config.account_validity
+        self._track_appservice_user_ips = hs.config.track_appservice_user_ips
+        self._macaroon_secret_key = hs.config.macaroon_secret_key

     @defer.inlineCallbacks
     def check_from_context(self, room_version: str, event, context, do_sig_check=True):
@@ -191,7 +194,7 @@ def get_user_by_req(
             opentracing.set_tag("authenticated_entity", user_id)
             opentracing.set_tag("appservice_id", app_service.id)

-            if ip_addr and self.hs.config.track_appservice_user_ips:
+            if ip_addr and self._track_appservice_user_ips:
                 yield self.store.insert_client_ip(
                     user_id=user_id,
                     access_token=access_token,
@@ -454,7 +457,7 @@ def validate_macaroon(self, macaroon, type_string, user_id):
         # access_tokens include a nonce for uniqueness: any value is acceptable
         v.satisfy_general(lambda c: c.startswith("nonce = "))

-        v.verify(macaroon, self.hs.config.macaroon_secret_key)
+        v.verify(macaroon, self._macaroon_secret_key)

     def _verify_expiry(self, caveat):
         prefix = "time < "
@@ -663,71 +666,5 @@ def check_user_in_room_or_world_readable(
                 % (user_id, room_id),
             )

-    @defer.inlineCallbacks
-    def check_auth_blocking(self, user_id=None, threepid=None, user_type=None):
-        """Checks if the user should be rejected for some external reason,
-        such as monthly active user limiting or global disable flag
-        Args:
-            user_id(str|None): If present, checks for presence against existing
-                MAU cohort
-            threepid(dict|None): If present, checks for presence against configured
-                reserved threepid. Used in cases where the user is trying register
-                with a MAU blocked server, normally they would be rejected but their
-                threepid is on the reserved list. user_id and
-                threepid should never be set at the same time.
-            user_type(str|None): If present, is used to decide whether to check against
-                certain blocking reasons like MAU.
-        """
-
-        # Never fail an auth check for the server notices users or support user
-        # This can be a problem where event creation is prohibited due to blocking
-        if user_id is not None:
-            if user_id == self.hs.config.server_notices_mxid:
-                return
-            if (yield self.store.is_support_user(user_id)):
-                return
-
-        if self.hs.config.hs_disabled:
-            raise ResourceLimitError(
-                403,
-                self.hs.config.hs_disabled_message,
-                errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
-                admin_contact=self.hs.config.admin_contact,
-                limit_type=LimitBlockingTypes.HS_DISABLED,
-            )
-        if self.hs.config.limit_usage_by_mau is True:
-            assert not (user_id and threepid)
-
-            # If the user is already part of the MAU cohort or a trial user
-            if user_id:
-                timestamp = yield self.store.user_last_seen_monthly_active(user_id)
-                if timestamp:
-                    return
-
-                is_trial = yield self.store.is_trial_user(user_id)
-                if is_trial:
-                    return
-            elif threepid:
-                # If the user does not exist yet, but is signing up with a
-                # reserved threepid then pass auth check
-                if is_threepid_reserved(
-                    self.hs.config.mau_limits_reserved_threepids, threepid
-                ):
-                    return
-            elif user_type == UserTypes.SUPPORT:
-                # If the user does not exist yet and is of type "support",
-                # allow registration. Support users are excluded from MAU checks.
-                return
-            # Else if there is no room in the MAU bucket, bail
-            current_mau = yield self.store.get_monthly_active_count()
-            if current_mau >= self.hs.config.max_mau_value:
-                raise ResourceLimitError(
-                    403,
-                    "Monthly Active User Limit Exceeded",
-                    admin_contact=self.hs.config.admin_contact,
-                    errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
-                    limit_type=LimitBlockingTypes.MONTHLY_ACTIVE_USER,
-                )
+    def check_auth_blocking(self, *args, **kwargs):
+        return self._auth_blocking.check_auth_blocking(*args, **kwargs)
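The auth.py diff above has two parts: config values that used to be dereferenced through hs.config on every request are now captured once in __init__, and the blocking logic is delegated to the new AuthBlocking class while the old call sites keep working. A minimal standalone sketch of that pattern follows; FakeConfig, FakeHomeServer and BlockingChecker are stand-ins invented for this example, not Synapse classes.

# Illustrative sketch only: the classes below are invented stand-ins,
# not Synapse's real objects.


class BlockingChecker:
    """Plays the role of AuthBlocking: owns the blocking-related config."""

    def __init__(self, hs):
        self._hs_disabled = hs.config.hs_disabled  # read once, at startup

    def check_auth_blocking(self):
        if self._hs_disabled:
            raise RuntimeError("homeserver disabled")


class Auth:
    def __init__(self, hs):
        # Capture plain attributes instead of reading hs.config.* per request.
        self._track_appservice_user_ips = hs.config.track_appservice_user_ips
        self._blocking = BlockingChecker(hs)

    def check_auth_blocking(self, *args, **kwargs):
        # Thin delegation keeps existing callers unchanged.
        return self._blocking.check_auth_blocking(*args, **kwargs)


class FakeConfig:
    hs_disabled = False
    track_appservice_user_ips = True


class FakeHomeServer:
    config = FakeConfig()


Auth(FakeHomeServer()).check_auth_blocking()  # passes: hs_disabled is False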
104 changes: 104 additions & 0 deletions synapse/api/auth_blocking.py
@@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from twisted.internet import defer

from synapse.api.constants import LimitBlockingTypes, UserTypes
from synapse.api.errors import Codes, ResourceLimitError
from synapse.config.server import is_threepid_reserved

logger = logging.getLogger(__name__)


class AuthBlocking(object):
def __init__(self, hs):
self.store = hs.get_datastore()

self._server_notices_mxid = hs.config.server_notices_mxid
self._hs_disabled = hs.config.hs_disabled
self._hs_disabled_message = hs.config.hs_disabled_message
self._admin_contact = hs.config.admin_contact
self._max_mau_value = hs.config.max_mau_value
self._limit_usage_by_mau = hs.config.limit_usage_by_mau
self._mau_limits_reserved_threepids = hs.config.mau_limits_reserved_threepids

@defer.inlineCallbacks
def check_auth_blocking(self, user_id=None, threepid=None, user_type=None):
"""Checks if the user should be rejected for some external reason,
such as monthly active user limiting or global disable flag
Args:
user_id(str|None): If present, checks for presence against existing
MAU cohort
threepid(dict|None): If present, checks for presence against configured
reserved threepid. Used in cases where the user is trying register
with a MAU blocked server, normally they would be rejected but their
threepid is on the reserved list. user_id and
threepid should never be set at the same time.
user_type(str|None): If present, is used to decide whether to check against
certain blocking reasons like MAU.
"""

# Never fail an auth check for the server notices users or support user
# This can be a problem where event creation is prohibited due to blocking
if user_id is not None:
if user_id == self._server_notices_mxid:
return
if (yield self.store.is_support_user(user_id)):
return

if self._hs_disabled:
raise ResourceLimitError(
403,
self._hs_disabled_message,
errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
admin_contact=self._admin_contact,
limit_type=LimitBlockingTypes.HS_DISABLED,
)
if self._limit_usage_by_mau is True:
assert not (user_id and threepid)

# If the user is already part of the MAU cohort or a trial user
if user_id:
timestamp = yield self.store.user_last_seen_monthly_active(user_id)
if timestamp:
return

is_trial = yield self.store.is_trial_user(user_id)
if is_trial:
return
elif threepid:
# If the user does not exist yet, but is signing up with a
# reserved threepid then pass auth check
if is_threepid_reserved(self._mau_limits_reserved_threepids, threepid):
return
elif user_type == UserTypes.SUPPORT:
# If the user does not exist yet and is of type "support",
# allow registration. Support users are excluded from MAU checks.
return
# Else if there is no room in the MAU bucket, bail
current_mau = yield self.store.get_monthly_active_count()
if current_mau >= self._max_mau_value:
raise ResourceLimitError(
403,
"Monthly Active User Limit Exceeded",
admin_contact=self._admin_contact,
errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
limit_type=LimitBlockingTypes.MONTHLY_ACTIVE_USER,
)
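The decision order in check_auth_blocking above is: exempt users (the server notices user and support users) pass first, then the global disable flag is enforced, then the MAU limit with its three escape hatches (existing or trial user, reserved threepid, support-type signup). Below is a hedged sketch of that ordering as a pure function; every parameter is an invented stand-in for a config value or storage lookup in the real code, and the real method distinguishes user_id/threepid/user_type cases rather than taking flags.

# Pure-function model of the branch ordering; all names are illustrative.

ALLOW = "allow"
BLOCK_HS_DISABLED = "hs_disabled"
BLOCK_MAU_EXCEEDED = "mau_exceeded"


def blocking_decision(
    is_exempt_user,           # server notices user or support user
    hs_disabled,              # hs.config.hs_disabled
    limit_usage_by_mau,       # hs.config.limit_usage_by_mau
    already_active_or_trial,  # user already in MAU cohort, or trial user
    reserved_threepid,        # signing up with a reserved threepid
    support_signup,           # registering a support-type user
    current_mau,              # store.get_monthly_active_count()
    max_mau_value,            # hs.config.max_mau_value
):
    if is_exempt_user:
        return ALLOW  # checked before anything else
    if hs_disabled:
        return BLOCK_HS_DISABLED
    if limit_usage_by_mau:
        if already_active_or_trial or reserved_threepid or support_signup:
            return ALLOW  # the three escape hatches
        if current_mau >= max_mau_value:
            return BLOCK_MAU_EXCEEDED
    return ALLOW


# The MAU cap only bites for genuinely new, non-reserved users:
assert blocking_decision(False, False, True, False, False, False, 10, 10) == BLOCK_MAU_EXCEEDED
assert blocking_decision(False, False, True, True, False, False, 10, 10) == ALLOW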
12 changes: 8 additions & 4 deletions synapse/handlers/sync.py
@@ -1143,10 +1143,14 @@ async def _generate_sync_entry_for_device_list(
             user_id
         )

-        tracked_users = set(users_who_share_room)
-
-        # Always tell the user about their own devices
-        tracked_users.add(user_id)
+        # Always tell the user about their own devices. We check as the user
+        # ID is almost certainly already included (unless they're not in any
+        # rooms) and taking a copy of the set is relatively expensive.
+        if user_id not in users_who_share_room:
+            users_who_share_room = set(users_who_share_room)
+            users_who_share_room.add(user_id)
+
+        tracked_users = users_who_share_room

         # Step 1a, check for changes in devices of users we share a room with
         users_that_have_changed = await self.store.get_users_whose_devices_changed(
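The sync.py change avoids copying users_who_share_room (which can be a very large set) on every /sync when the user's own ID is already present, only paying for a copy in the rare case where it is missing. The same pattern in isolation; ensure_member is an illustrative helper, not Synapse code.

# Illustrative helper: copy a (possibly shared, possibly frozen) set
# only when the element is actually missing.


def ensure_member(users, user_id):
    if user_id in users:
        return users  # common case: no copy, no mutation of shared state
    users = set(users)  # rare case: pay for one copy, then add
    users.add(user_id)
    return users


shared = frozenset({"@alice:example.org", "@bob:example.org"})
assert ensure_member(shared, "@alice:example.org") is shared  # untouched
assert "@carol:example.org" in ensure_member(shared, "@carol:example.org")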
51 changes: 36 additions & 15 deletions synapse/replication/tcp/handler.py
@@ -81,9 +81,6 @@ def __init__(self, hs):
         self._instance_id = hs.get_instance_id()
         self._instance_name = hs.get_instance_name()

-        # Set of streams that we've caught up with.
-        self._streams_connected = set()  # type: Set[str]
-
         self._streams = {
             stream.NAME: stream(hs) for stream in STREAMS_MAP.values()
         }  # type: Dict[str, Stream]
@@ -99,9 +96,13 @@ def __init__(self, hs):
         # The factory used to create connections.
         self._factory = None  # type: Optional[ReconnectingClientFactory]

-        # The currently connected connections.
+        # The currently connected connections. (The list of places we need to send
+        # outgoing replication commands to.)
         self._connections = []  # type: List[AbstractConnection]

+        # For each connection, the incoming streams that are coming from that connection
+        self._streams_by_connection = {}  # type: Dict[AbstractConnection, Set[str]]
+
         LaterGauge(
             "synapse_replication_tcp_resource_total_connections",
             "",
@@ -257,9 +258,11 @@ async def on_RDATA(self, conn: AbstractConnection, cmd: RdataCommand):
             # 2. so we don't race with getting a POSITION command and fetching
             # missing RDATA.
             with await self._position_linearizer.queue(cmd.stream_name):
-                if stream_name not in self._streams_connected:
-                    # If the stream isn't marked as connected then we haven't seen a
-                    # `POSITION` command yet, and so we may have missed some rows.
+                # make sure that we've processed a POSITION for this stream *on this
+                # connection*. (A POSITION on another connection is no good, as there
+                # is no guarantee that we have seen all the intermediate updates.)
+                sbc = self._streams_by_connection.get(conn)
+                if not sbc or stream_name not in sbc:
                     # Let's drop the row for now, on the assumption we'll receive a
                     # `POSITION` soon and we'll catch up correctly then.
                     logger.debug(
@@ -302,21 +305,25 @@ async def on_POSITION(self, conn: AbstractConnection, cmd: PositionCommand):
             # Ignore POSITION that are just our own echoes
             return

-        stream = self._streams.get(cmd.stream_name)
+        logger.info("Handling '%s %s'", cmd.NAME, cmd.to_line())
+
+        stream_name = cmd.stream_name
+        stream = self._streams.get(stream_name)
         if not stream:
-            logger.error("Got POSITION for unknown stream: %s", cmd.stream_name)
+            logger.error("Got POSITION for unknown stream: %s", stream_name)
             return

         # We protect catching up with a linearizer in case the replication
         # connection reconnects under us.
-        with await self._position_linearizer.queue(cmd.stream_name):
+        with await self._position_linearizer.queue(stream_name):
             # We're about to go and catch up with the stream, so remove from set
             # of connected streams.
-            self._streams_connected.discard(cmd.stream_name)
+            for streams in self._streams_by_connection.values():
+                streams.discard(stream_name)

             # We clear the pending batches for the stream as the fetching of the
             # missing updates below will fetch all rows in the batch.
-            self._pending_batches.pop(cmd.stream_name, [])
+            self._pending_batches.pop(stream_name, [])

             # Find where we previously streamed up to.
             current_token = stream.current_token()
@@ -326,6 +333,12 @@ async def on_POSITION(self, conn: AbstractConnection, cmd: PositionCommand):
             # between then and now.
             missing_updates = cmd.token != current_token
             while missing_updates:
+                logger.info(
+                    "Fetching replication rows for '%s' between %i and %i",
+                    stream_name,
+                    current_token,
+                    cmd.token,
+                )
                 (
                     updates,
                     current_token,
@@ -341,16 +354,18 @@ async def on_POSITION(self, conn: AbstractConnection, cmd: PositionCommand):

             for token, rows in _batch_updates(updates):
                 await self.on_rdata(
-                    cmd.stream_name,
+                    stream_name,
                     cmd.instance_name,
                     token,
                     [stream.parse_row(row) for row in rows],
                 )

+            logger.info("Caught up with stream '%s' to %i", stream_name, cmd.token)
+
             # We've now caught up to position sent to us, notify handler.
-            await self._replication_data_handler.on_position(cmd.stream_name, cmd.token)
+            await self._replication_data_handler.on_position(stream_name, cmd.token)

-            self._streams_connected.add(cmd.stream_name)
+            self._streams_by_connection.setdefault(conn, set()).add(stream_name)

     async def on_REMOTE_SERVER_UP(
         self, conn: AbstractConnection, cmd: RemoteServerUpCommand
@@ -408,6 +423,12 @@ def new_connection(self, connection: AbstractConnection):
     def lost_connection(self, connection: AbstractConnection):
         """Called when a connection is closed/lost.
         """
+        # we no longer need _streams_by_connection for this connection.
+        streams = self._streams_by_connection.pop(connection, None)
+        if streams:
+            logger.info(
+                "Lost replication connection; streams now disconnected: %s", streams
+            )
         try:
             self._connections.remove(connection)
         except ValueError:
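The replication handler now records caught-up streams per connection in _streams_by_connection rather than in a single global set: RDATA is only applied once a POSITION has been seen on the same connection, and a dropped connection forgets only its own streams. A standalone sketch of that bookkeeping, with the Twisted and replication machinery stripped out and connection objects reduced to plain strings (all names below are illustrative):

# Hedged sketch of per-connection stream tracking; not Synapse code.
from typing import Dict, Set


class StreamTracker:
    def __init__(self) -> None:
        # conn_id -> streams caught up on that connection
        self._streams_by_connection: Dict[str, Set[str]] = {}

    def on_position(self, conn_id: str, stream_name: str) -> None:
        # A POSITION marks the stream as caught up on this connection only.
        self._streams_by_connection.setdefault(conn_id, set()).add(stream_name)

    def can_process_rdata(self, conn_id: str, stream_name: str) -> bool:
        # RDATA is safe only after a POSITION on the *same* connection.
        sbc = self._streams_by_connection.get(conn_id)
        return bool(sbc) and stream_name in sbc

    def on_lost_connection(self, conn_id: str) -> None:
        # Forget only this connection's streams.
        self._streams_by_connection.pop(conn_id, None)


t = StreamTracker()
assert not t.can_process_rdata("conn1", "events")  # no POSITION yet
t.on_position("conn1", "events")
assert t.can_process_rdata("conn1", "events")
assert not t.can_process_rdata("conn2", "events")  # other connection: no good
t.on_lost_connection("conn1")
assert not t.can_process_rdata("conn1", "events")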