Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Allow moving account data and receipts streams off master #9104

Merged
merged 14 commits into from
Jan 18, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 144 additions & 0 deletions synapse/handlers/account_data.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -12,14 +13,157 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
from typing import TYPE_CHECKING, List, Tuple

from synapse.replication.http.account_data import (
ReplicationAddTagRestServlet,
ReplicationRemoveTagRestServlet,
ReplicationRoomAccountDataRestServlet,
ReplicationUserAccountDataRestServlet,
)
from synapse.types import JsonDict, UserID

if TYPE_CHECKING:
from synapse.app.homeserver import HomeServer


class AccountDataHandler:
def __init__(self, hs: "HomeServer"):
self._store = hs.get_datastore()
self._instance_name = hs.get_instance_name()
self._notifier = hs.get_notifier()

self._user_data_client = ReplicationUserAccountDataRestServlet.make_client(hs)
self._room_data_client = ReplicationRoomAccountDataRestServlet.make_client(hs)
self._add_tag_client = ReplicationAddTagRestServlet.make_client(hs)
self._remove_tag_client = ReplicationRemoveTagRestServlet.make_client(hs)
self._account_data_writers = hs.config.worker.writers.account_data

async def add_account_data_to_room(
self, user_id: str, room_id: str, account_data_type: str, content: JsonDict
) -> int:
"""Add some account_data to a room for a user.

Args:
user_id: The user to add a tag for.
room_id: The room to add a tag for.
account_data_type: The type of account_data to add.
content: A json object to associate with the tag.

Returns:
The maximum stream ID.
"""
if self._instance_name in self._account_data_writers:
max_stream_id = await self._store.add_account_data_to_room(
user_id, room_id, account_data_type, content
)

self._notifier.on_new_event(
"account_data_key", max_stream_id, users=[user_id]
)

return max_stream_id
else:
response = await self._room_data_client(
instance_name=random.choice(self._account_data_writers),
user_id=user_id,
room_id=room_id,
account_data_type=account_data_type,
content=content,
)
return response["max_stream_id"]

async def add_account_data_for_user(
self, user_id: str, account_data_type: str, content: JsonDict
) -> int:
"""Add some account_data to a room for a user.

Args:
user_id: The user to add a tag for.
account_data_type: The type of account_data to add.
content: A json object to associate with the tag.

Returns:
The maximum stream ID.
"""

if self._instance_name in self._account_data_writers:
max_stream_id = await self._store.add_account_data_for_user(
user_id, account_data_type, content
)

self._notifier.on_new_event(
"account_data_key", max_stream_id, users=[user_id]
)
return max_stream_id
else:
response = await self._user_data_client(
instance_name=random.choice(self._account_data_writers),
user_id=user_id,
account_data_type=account_data_type,
content=content,
)
return response["max_stream_id"]

async def add_tag_to_room(
self, user_id: str, room_id: str, tag: str, content: JsonDict
) -> int:
"""Add a tag to a room for a user.

Args:
user_id: The user to add a tag for.
room_id: The room to add a tag for.
tag: The tag name to add.
content: A json object to associate with the tag.

Returns:
The next account data ID.
"""
if self._instance_name in self._account_data_writers:
max_stream_id = await self._store.add_tag_to_room(
user_id, room_id, tag, content
)

self._notifier.on_new_event(
"account_data_key", max_stream_id, users=[user_id]
)
return max_stream_id
else:
response = await self._add_tag_client(
instance_name=random.choice(self._account_data_writers),
user_id=user_id,
room_id=room_id,
tag=tag,
content=content,
)
return response["max_stream_id"]

async def remove_tag_from_room(self, user_id: str, room_id: str, tag: str) -> int:
"""Remove a tag from a room for a user.

Returns:
The next account data ID.
"""
if self._instance_name in self._account_data_writers:
max_stream_id = await self._store.remove_tag_from_room(
user_id, room_id, tag
)

self._notifier.on_new_event(
"account_data_key", max_stream_id, users=[user_id]
)
return max_stream_id
else:
response = await self._remove_tag_client(
instance_name=random.choice(self._account_data_writers),
user_id=user_id,
room_id=room_id,
tag=tag,
)
return response["max_stream_id"]


class AccountDataEventSource:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
Expand Down
7 changes: 5 additions & 2 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(self, hs: "HomeServer"):
self.registration_handler = hs.get_registration_handler()
self.profile_handler = hs.get_profile_handler()
self.event_creation_handler = hs.get_event_creation_handler()
self.account_data_handler = hs.get_account_data_handler()

self.member_linearizer = Linearizer(name="member")

Expand Down Expand Up @@ -253,7 +254,7 @@ async def copy_room_tags_and_direct_to_room(
direct_rooms[key].append(new_room_id)

# Save back to user's m.direct account data
await self.store.add_account_data_for_user(
await self.account_data_handler.add_account_data_for_user(
user_id, AccountDataTypes.DIRECT, direct_rooms
)
break
Expand All @@ -263,7 +264,9 @@ async def copy_room_tags_and_direct_to_room(

# Copy each room tag to the new room
for tag, tag_content in room_tags.items():
await self.store.add_tag_to_room(user_id, new_room_id, tag, tag_content)
await self.account_data_handler.add_tag_to_room(
user_id, new_room_id, tag, tag_content
)

async def update_membership(
self,
Expand Down
2 changes: 2 additions & 0 deletions synapse/replication/http/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from synapse.http.server import JsonResource
from synapse.replication.http import (
account_data,
devices,
federation,
login,
Expand All @@ -40,6 +41,7 @@ def register_servlets(self, hs):
presence.register_servlets(hs, self)
membership.register_servlets(hs, self)
streams.register_servlets(hs, self)
account_data.register_servlets(hs, self)

# The following can't currently be instantiated on workers.
if hs.config.worker.worker_app is None:
Expand Down
189 changes: 189 additions & 0 deletions synapse/replication/http/account_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
# -*- coding: utf-8 -*-
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint

logger = logging.getLogger(__name__)


class ReplicationUserAccountDataRestServlet(ReplicationEndpoint):
"""Add user account data on the appropriate account data worker.

Request format:

POST /_synapse/replication/add_user_account_data/:user_id/:type

{
"account_data_type": "m.direct",
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
"content": { ... },
}

"""

NAME = "add_user_account_data"
PATH_ARGS = ("user_id", "account_data_type")
CACHE = False

def __init__(self, hs):
super().__init__(hs)

self.handler = hs.get_account_data_handler()
self.clock = hs.get_clock()

@staticmethod
async def _serialize_payload(user_id, account_data_type, content):
payload = {
"content": content,
}

return payload

async def _handle_request(self, request, user_id, account_data_type):
content = parse_json_object_from_request(request)

max_stream_id = await self.handler.add_account_data_for_user(
user_id, account_data_type, content["content"]
)

return 200, {"max_stream_id": max_stream_id}


class ReplicationRoomAccountDataRestServlet(ReplicationEndpoint):
"""Add room account data on the appropriate account data worker.

Request format:

POST /_synapse/replication/add_room_account_data/:user_id/:room_id/:account_data_type

{
"account_data_type": "m.read_marker",
"content": { ... },
}

"""

NAME = "add_room_account_data"
PATH_ARGS = ("user_id", "room_id", "account_data_type")
CACHE = False

def __init__(self, hs):
super().__init__(hs)

self.handler = hs.get_account_data_handler()
self.clock = hs.get_clock()

@staticmethod
async def _serialize_payload(user_id, room_id, account_data_type, content):
payload = {
"content": content,
}

return payload

async def _handle_request(self, request, user_id, room_id, account_data_type):
content = parse_json_object_from_request(request)

max_stream_id = await self.handler.add_account_data_to_room(
user_id, room_id, account_data_type, content["content"]
)

return 200, {"max_stream_id": max_stream_id}


class ReplicationAddTagRestServlet(ReplicationEndpoint):
"""Add tag on the appropriate account data worker.

Request format:

POST /_synapse/replication/add_tag/:user_id/:room_id/:tag

{
"content": { ... },
}

"""

NAME = "add_tag"
PATH_ARGS = ("user_id", "room_id", "tag")
CACHE = False

def __init__(self, hs):
super().__init__(hs)

self.handler = hs.get_account_data_handler()
self.clock = hs.get_clock()

@staticmethod
async def _serialize_payload(user_id, room_id, tag, content):
payload = {
"content": content,
}

return payload

async def _handle_request(self, request, user_id, room_id, tag):
content = parse_json_object_from_request(request)

max_stream_id = await self.handler.add_tag_to_room(
user_id, room_id, tag, content["content"]
)

return 200, {"max_stream_id": max_stream_id}


class ReplicationRemoveTagRestServlet(ReplicationEndpoint):
"""Remove tag on the appropriate account data worker.

Request format:

POST /_synapse/replication/remove_tag/:user_id/:room_id/:tag

{}

"""

NAME = "remove_tag"
PATH_ARGS = (
"user_id",
"room_id",
"tag",
)
CACHE = False

def __init__(self, hs):
super().__init__(hs)

self.handler = hs.get_account_data_handler()
self.clock = hs.get_clock()

@staticmethod
async def _serialize_payload(user_id, room_id, tag):

return {}

async def _handle_request(self, request, user_id, room_id, tag):
max_stream_id = await self.handler.remove_tag_from_room(user_id, room_id, tag,)

return 200, {"max_stream_id": max_stream_id}


def register_servlets(hs, http_server):
ReplicationUserAccountDataRestServlet(hs).register(http_server)
ReplicationRoomAccountDataRestServlet(hs).register(http_server)
ReplicationAddTagRestServlet(hs).register(http_server)
ReplicationRemoveTagRestServlet(hs).register(http_server)
Loading