Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix deleting pushers when using sharded pushers. (#9465)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston authored Feb 22, 2021
1 parent 1b2d6d5 commit 66f4949
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 67 deletions.
1 change: 1 addition & 0 deletions changelog.d/9465.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix deleting pushers when using sharded pushers.
4 changes: 0 additions & 4 deletions docs/tcp_replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,6 @@ Asks the server for the current position of all streams.

Acknowledge receipt of some federation data

#### REMOVE_PUSHER (C)

Inform the server a pusher should be removed

### REMOTE_SERVER_UP (S, C)

Inform other processes that a remote server may have come back online.
Expand Down
3 changes: 0 additions & 3 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,9 +645,6 @@ def start_listening(self, listeners: Iterable[ListenerConfig]):

self.get_tcp_replication().start_replication(self)

async def remove_pusher(self, app_id, push_key, user_id):
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)

@cache_in_self
def get_replication_data_handler(self):
return GenericWorkerReplicationHandler(self)
Expand Down
3 changes: 2 additions & 1 deletion synapse/push/httppusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def __init__(self, hs: "HomeServer", pusher_config: PusherConfig):
self.timed_call = None
self._is_processing = False
self._group_unread_count_by_room = hs.config.push_group_unread_count_by_room
self._pusherpool = hs.get_pusherpool()

self.data = pusher_config.data
if self.data is None:
Expand Down Expand Up @@ -299,7 +300,7 @@ async def _process_one(self, push_action: dict) -> bool:
)
else:
logger.info("Pushkey %s was rejected: removing", pk)
await self.hs.remove_pusher(self.app_id, pk, self.user_id)
await self._pusherpool.remove_pusher(self.app_id, pk, self.user_id)
return True

async def _build_notification_dict(
Expand Down
23 changes: 17 additions & 6 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
)
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.push.pusher import PusherFactory
from synapse.replication.http.push import ReplicationRemovePusherRestServlet
from synapse.types import JsonDict, RoomStreamToken
from synapse.util.async_helpers import concurrently_execute

Expand Down Expand Up @@ -68,6 +69,13 @@ def __init__(self, hs: "HomeServer"):
self._pusher_shard_config = hs.config.push.pusher_shard_config
self._instance_name = hs.get_instance_name()

# We can only delete pushers on master.
self._remove_pusher_client = None
if hs.config.worker.worker_app:
self._remove_pusher_client = ReplicationRemovePusherRestServlet.make_client(
hs
)

# Record the last stream ID that we were poked about so we can get
# changes since then. We set this to the current max stream ID on
# startup as every individual pusher will have checked for changes on
Expand Down Expand Up @@ -175,9 +183,6 @@ async def remove_pushers_by_access_token(
user_id: user to remove pushers for
access_tokens: access token *ids* to remove pushers for
"""
if not self._pusher_shard_config.should_handle(self._instance_name, user_id):
return

tokens = set(access_tokens)
for p in await self.store.get_pushers_by_user_id(user_id):
if p.access_token in tokens:
Expand Down Expand Up @@ -380,6 +385,12 @@ async def remove_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:

synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec()

await self.store.delete_pusher_by_app_id_pushkey_user_id(
app_id, pushkey, user_id
)
# We can only delete pushers on master.
if self._remove_pusher_client:
await self._remove_pusher_client(
app_id=app_id, pushkey=pushkey, user_id=user_id
)
else:
await self.store.delete_pusher_by_app_id_pushkey_user_id(
app_id, pushkey, user_id
)
2 changes: 2 additions & 0 deletions synapse/replication/http/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
login,
membership,
presence,
push,
register,
send_event,
streams,
Expand All @@ -42,6 +43,7 @@ def register_servlets(self, hs):
membership.register_servlets(hs, self)
streams.register_servlets(hs, self)
account_data.register_servlets(hs, self)
push.register_servlets(hs, self)

# The following can't currently be instantiated on workers.
if hs.config.worker.worker_app is None:
Expand Down
72 changes: 72 additions & 0 deletions synapse/replication/http/push.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import TYPE_CHECKING

from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint

if TYPE_CHECKING:
from synapse.server import HomeServer

logger = logging.getLogger(__name__)


class ReplicationRemovePusherRestServlet(ReplicationEndpoint):
"""Deletes the given pusher.
Request format:
POST /_synapse/replication/remove_pusher/:user_id
{
"app_id": "<some_id>",
"pushkey": "<some_key>"
}
"""

NAME = "add_user_account_data"
PATH_ARGS = ("user_id",)
CACHE = False

def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self.pusher_pool = hs.get_pusherpool()

@staticmethod
async def _serialize_payload(app_id, pushkey, user_id):
payload = {
"app_id": app_id,
"pushkey": pushkey,
}

return payload

async def _handle_request(self, request, user_id):
content = parse_json_object_from_request(request)

app_id = content["app_id"]
pushkey = content["pushkey"]

await self.pusher_pool.remove_pusher(app_id, pushkey, user_id)

return 200, {}


def register_servlets(hs, http_server):
ReplicationRemovePusherRestServlet(hs).register(http_server)
27 changes: 0 additions & 27 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,31 +325,6 @@ def to_line(self):
return "%s %s" % (self.instance_name, self.token)


class RemovePusherCommand(Command):
"""Sent by the client to request the master remove the given pusher.
Format::
REMOVE_PUSHER <app_id> <push_key> <user_id>
"""

NAME = "REMOVE_PUSHER"

def __init__(self, app_id, push_key, user_id):
self.user_id = user_id
self.app_id = app_id
self.push_key = push_key

@classmethod
def from_line(cls, line):
app_id, push_key, user_id = line.split(" ", 2)

return cls(app_id, push_key, user_id)

def to_line(self):
return " ".join((self.app_id, self.push_key, self.user_id))


class UserIpCommand(Command):
"""Sent periodically when a worker sees activity from a client.
Expand Down Expand Up @@ -416,7 +391,6 @@ class RemoteServerUpCommand(_SimpleCommand):
ReplicateCommand,
UserSyncCommand,
FederationAckCommand,
RemovePusherCommand,
UserIpCommand,
RemoteServerUpCommand,
ClearUserSyncsCommand,
Expand All @@ -443,7 +417,6 @@ class RemoteServerUpCommand(_SimpleCommand):
UserSyncCommand.NAME,
ClearUserSyncsCommand.NAME,
FederationAckCommand.NAME,
RemovePusherCommand.NAME,
UserIpCommand.NAME,
ErrorCommand.NAME,
RemoteServerUpCommand.NAME,
Expand Down
23 changes: 0 additions & 23 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
PositionCommand,
RdataCommand,
RemoteServerUpCommand,
RemovePusherCommand,
ReplicateCommand,
UserIpCommand,
UserSyncCommand,
Expand Down Expand Up @@ -373,23 +372,6 @@ def on_FEDERATION_ACK(self, conn: AbstractConnection, cmd: FederationAckCommand)
if self._federation_sender:
self._federation_sender.federation_ack(cmd.instance_name, cmd.token)

def on_REMOVE_PUSHER(
self, conn: AbstractConnection, cmd: RemovePusherCommand
) -> Optional[Awaitable[None]]:
remove_pusher_counter.inc()

if self._is_master:
return self._handle_remove_pusher(cmd)
else:
return None

async def _handle_remove_pusher(self, cmd: RemovePusherCommand):
await self._store.delete_pusher_by_app_id_pushkey_user_id(
app_id=cmd.app_id, pushkey=cmd.push_key, user_id=cmd.user_id
)

self._notifier.on_new_replication_data()

def on_USER_IP(
self, conn: AbstractConnection, cmd: UserIpCommand
) -> Optional[Awaitable[None]]:
Expand Down Expand Up @@ -684,11 +666,6 @@ def send_user_sync(
UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms)
)

def send_remove_pusher(self, app_id: str, push_key: str, user_id: str):
"""Poke the master to remove a pusher for a user"""
cmd = RemovePusherCommand(app_id, push_key, user_id)
self.send_command(cmd)

def send_user_ip(
self,
user_id: str,
Expand Down
3 changes: 0 additions & 3 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -758,9 +758,6 @@ def get_outbound_redis_connection(self) -> Optional["RedisProtocol"]:
reconnect=True,
)

async def remove_pusher(self, app_id: str, push_key: str, user_id: str):
return await self.get_pusherpool().remove_pusher(app_id, push_key, user_id)

def should_send_federation(self) -> bool:
"Should this server be sending federation traffic directly?"
return self.config.send_federation and (
Expand Down

0 comments on commit 66f4949

Please sign in to comment.