Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add a module API to send an HTTP push notification #15387

Merged
merged 21 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15387.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a module API to send an HTTP push notification.
46 changes: 46 additions & 0 deletions synapse/module_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
USER_MAY_SEND_3PID_INVITE_CALLBACK,
SpamCheckerModuleApiCallbacks,
)
from synapse.push.httppusher import HttpPusher
from synapse.rest.client.login import LoginResponse
from synapse.storage import DataStore
from synapse.storage.background_updates import (
Expand Down Expand Up @@ -248,6 +249,7 @@ def __init__(self, hs: "HomeServer", auth_handler: AuthHandler) -> None:
self._registration_handler = hs.get_registration_handler()
self._send_email_handler = hs.get_send_email_handler()
self._push_rules_handler = hs.get_push_rules_handler()
self._pusherpool = hs.get_pusherpool()
self._device_handler = hs.get_device_handler()
self.custom_template_dir = hs.config.server.custom_template_directory
self._callbacks = hs.get_module_api_callbacks()
Expand Down Expand Up @@ -1225,6 +1227,50 @@ async def sleep(self, seconds: float) -> None:

await self._clock.sleep(seconds)

async def send_http_push_notification(
self,
user_id: str,
device_id: Optional[str],
content: JsonDict,
tweaks: Optional[JsonMapping] = None,
default_payload: Optional[JsonMapping] = None,
) -> Dict[str, bool]:
"""Send an HTTP push notification that is forwarded to the registered push gateway
for the specified user/device.

Added in Synapse v1.82.0.

Args:
user_id: The user ID to send the push notification to.
device_id: The device ID of the device where to send the push notification. If `None`,
the notification will be sent to all registered HTTP pushers of the user.
content: A dict of values that will be put in the `notification` field of the push
(cf Push Gateway spec). `devices` field will be overrided if included.
tweaks: A dict of `tweaks` that will be inserted in the `devices` section, cf spec.
default_payload: default payload to add in `devices[0].data.default_payload`.
This will be merged (and override if some matching values already exist there)
with existing `default_payload`.

Returns:
a dict reprensenting the status of the push per device ID
"""
status = {}
if user_id in self._pusherpool.pushers:
for p in self._pusherpool.pushers[user_id].values():
if isinstance(p, HttpPusher) and (
not device_id or p.device_id == device_id
):
res = await p.dispatch_push(content, tweaks, default_payload)
# Check if the push was successful and no pushers were rejected.
sent = res is not False and not res

# This is mainly to accomodate mypy
# device_id should never be empty after the `set_device_id_for_pushers`
# background job has been properly run.
if p.device_id:
status[p.device_id] = sent
return status

async def send_mail(
self,
recipient: str,
Expand Down
190 changes: 113 additions & 77 deletions synapse/push/httppusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
import logging
import urllib.parse
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Union
from typing import TYPE_CHECKING, Dict, List, Optional, Union

from prometheus_client import Counter

Expand All @@ -27,6 +27,7 @@
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.storage.databases.main.event_push_actions import HttpPushAction
from synapse.types import JsonDict, JsonMapping

from . import push_tools

Expand Down Expand Up @@ -56,7 +57,7 @@
)


def tweaks_for_actions(actions: List[Union[str, Dict]]) -> Dict[str, Any]:
def tweaks_for_actions(actions: List[Union[str, Dict]]) -> JsonMapping:
"""
Converts a list of actions into a `tweaks` dict (which can then be passed to
the push gateway).
Expand Down Expand Up @@ -101,6 +102,7 @@ def __init__(self, hs: "HomeServer", pusher_config: PusherConfig):
self._storage_controllers = self.hs.get_storage_controllers()
self.app_display_name = pusher_config.app_display_name
self.device_display_name = pusher_config.device_display_name
self.device_id = pusher_config.device_id
MatMaul marked this conversation as resolved.
Show resolved Hide resolved
self.pushkey_ts = pusher_config.ts
self.data = pusher_config.data
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
Expand Down Expand Up @@ -324,7 +326,7 @@ async def _process_one(self, push_action: HttpPushAction) -> bool:
event = await self.store.get_event(push_action.event_id, allow_none=True)
if event is None:
return True # It's been redacted
rejected = await self.dispatch_push(event, tweaks, badge)
rejected = await self.dispatch_push_event(event, tweaks, badge)
if rejected is False:
return False

Expand All @@ -342,9 +344,83 @@ async def _process_one(self, push_action: HttpPushAction) -> bool:
await self._pusherpool.remove_pusher(self.app_id, pk, self.user_id)
return True

async def _build_notification_dict(
self, event: EventBase, tweaks: Dict[str, bool], badge: int
) -> Dict[str, Any]:
async def dispatch_push(
self,
content: JsonDict,
tweaks: Optional[JsonMapping] = None,
default_payload: Optional[JsonMapping] = None,
) -> Union[bool, List[str]]:
"""Send a notification to the registered push gateway, with `content` being
the content of the `notification` top property specified in the spec.
Note that the `devices` property will be added with device-specific
information for this pusher.

Args:
content: the content
tweaks: tweaks to add into the `devices` section
default_payload: default payload to add in `devices[0].data.default_payload`.
This will be merged (and override if some matching values already exist there)
with existing `default_payload`.

Returns:
False if an error occured when calling the push gateway, or an array of
rejected push keys otherwise. If this array is empty, the push fully
succeeded.
"""
content = content.copy()

data = self.data_minus_url.copy()
if default_payload:
data.setdefault("default_payload", {}).update(default_payload)

device = {
"app_id": self.app_id,
"pushkey": self.pushkey,
"pushkey_ts": int(self.pushkey_ts / 1000),
"data": data,
}
if tweaks:
device["tweaks"] = tweaks
MatMaul marked this conversation as resolved.
Show resolved Hide resolved

content["devices"] = [device]
MatMaul marked this conversation as resolved.
Show resolved Hide resolved

try:
resp = await self.http_client.post_json_get_json(
self.url, {"notification": content}
)
except Exception as e:
logger.warning(
"Failed to push data to %s: %s %s",
self.name,
type(e),
e,
)
return False
rejected = []
if "rejected" in resp:
rejected = resp["rejected"]
return rejected

async def dispatch_push_event(
self,
event: EventBase,
tweaks: JsonMapping,
badge: int,
) -> Union[bool, List[str]]:
"""Send a notification to the registered push gateway by building it
from an event.

Args:
event: the event
tweaks: tweaks to add into the `devices` section, used to decide the
push priority
badge: unread count to send with the push notification

Returns:
False if an error occured when calling the push gateway, or an array of
rejected push keys otherwise. If this array is empty, the push fully
succeeded.
"""
priority = "low"
if (
event.type == EventTypes.Encrypted
Expand All @@ -358,30 +434,20 @@ async def _build_notification_dict(
# This was checked in the __init__, but mypy doesn't seem to know that.
assert self.data is not None
if self.data.get("format") == "event_id_only":
d: Dict[str, Any] = {
"notification": {
"event_id": event.event_id,
"room_id": event.room_id,
"counts": {"unread": badge},
"prio": priority,
"devices": [
{
"app_id": self.app_id,
"pushkey": self.pushkey,
"pushkey_ts": int(self.pushkey_ts / 1000),
"data": self.data_minus_url,
}
],
}
content: JsonDict = {
"event_id": event.event_id,
"room_id": event.room_id,
"counts": {"unread": badge},
"prio": priority,
}
return d

ctx = await push_tools.get_context_for_event(
self._storage_controllers, event, self.user_id
)
# event_id_only doesn't include the tweaks, so override them.
tweaks = {}
MatMaul marked this conversation as resolved.
Show resolved Hide resolved
else:
ctx = await push_tools.get_context_for_event(
self._storage_controllers, event, self.user_id
)

d = {
"notification": {
content = {
"id": event.event_id, # deprecated: remove soon
"event_id": event.event_id,
"room_id": event.room_id,
Expand All @@ -392,57 +458,27 @@ async def _build_notification_dict(
"unread": badge,
# 'missed_calls': 2
},
"devices": [
{
"app_id": self.app_id,
"pushkey": self.pushkey,
"pushkey_ts": int(self.pushkey_ts / 1000),
"data": self.data_minus_url,
"tweaks": tweaks,
}
],
}
}
if event.type == "m.room.member" and event.is_state():
d["notification"]["membership"] = event.content["membership"]
d["notification"]["user_is_target"] = event.state_key == self.user_id
if self.hs.config.push.push_include_content and event.content:
d["notification"]["content"] = event.content

# We no longer send aliases separately, instead, we send the human
# readable name of the room, which may be an alias.
if "sender_display_name" in ctx and len(ctx["sender_display_name"]) > 0:
d["notification"]["sender_display_name"] = ctx["sender_display_name"]
if "name" in ctx and len(ctx["name"]) > 0:
d["notification"]["room_name"] = ctx["name"]

return d

async def dispatch_push(
self, event: EventBase, tweaks: Dict[str, bool], badge: int
) -> Union[bool, Iterable[str]]:
notification_dict = await self._build_notification_dict(event, tweaks, badge)
if not notification_dict:
return []
try:
resp = await self.http_client.post_json_get_json(
self.url, notification_dict
)
except Exception as e:
logger.warning(
"Failed to push event %s to %s: %s %s",
event.event_id,
self.name,
type(e),
e,
)
return False
rejected = []
if "rejected" in resp:
rejected = resp["rejected"]
if not rejected:
if event.type == "m.room.member" and event.is_state():
content["membership"] = event.content["membership"]
content["user_is_target"] = event.state_key == self.user_id
if self.hs.config.push.push_include_content and event.content:
content["content"] = event.content

# We no longer send aliases separately, instead, we send the human
# readable name of the room, which may be an alias.
if "sender_display_name" in ctx and len(ctx["sender_display_name"]) > 0:
content["sender_display_name"] = ctx["sender_display_name"]
if "name" in ctx and len(ctx["name"]) > 0:
content["room_name"] = ctx["name"]

res = await self.dispatch_push(content, tweaks)

# If the push is successful and none are rejected, update the badge count.
if res is not False and not res:
self.badge_count_last_call = badge
return rejected

return res

async def _send_badge(self, badge: int) -> None:
"""
Expand Down