Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement asynchronous notify() method #397

Merged
merged 20 commits into from
Jul 1, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
c56bcf2
refactor: remove async ThreadPoolExecutor
YoRyan Jun 17, 2021
68b9826
refactor: break out notification sending logic
YoRyan Jun 17, 2021
0a908e5
chore: delete unused import
YoRyan Jun 17, 2021
c1326f4
refactor: eliminate redundant peekable object
YoRyan Jun 17, 2021
26a594a
fix: restore Python 2 compatibility
YoRyan Jun 17, 2021
caffdd5
chore: fix whitespace
YoRyan Jun 17, 2021
e8f859f
feat: implement new Apprise.async_notify() method
YoRyan Jun 18, 2021
20f4ae8
chore: use more specific import name for notify() to avoid clashes
YoRyan Jun 18, 2021
a507f5e
fix: fix incorrect keyword argument to asyncio.gather()
YoRyan Jun 18, 2021
abc7ffc
test: add async Apprise object tests
YoRyan Jun 18, 2021
5c6e44d
refactor: consolidate sync-to-async code
YoRyan Jun 19, 2021
ca9fff0
refactor: avoid blocking the event loop with non-async code
YoRyan Jun 19, 2021
f2c17c6
test: cover the case with .async_notify() with an invalid tag
YoRyan Jun 19, 2021
b7ffa1b
refactor: consolidate async notification dispatch code
YoRyan Jun 19, 2021
d05ed94
chore: reinstate note about noqa flag
YoRyan Jun 19, 2021
f4387d1
fix: send all synchronous notifications even if one of them has failed
YoRyan Jun 19, 2021
9375bed
refactor: handle fatal errors that result in only some notifications …
YoRyan Jun 22, 2021
4aaaf61
style: it's not necessary to wrap async functions in the py3 library
YoRyan Jun 22, 2021
4373877
refactor: eliminate the code paths for partial notification sending
YoRyan Jun 23, 2021
90d8d2a
chore: move asyncio debug flag to proper location
YoRyan Jun 23, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 128 additions & 64 deletions apprise/Apprise.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import six
from markdown import markdown
from itertools import chain
from more_itertools import peekable
from .common import NotifyType
from .common import NotifyFormat
from .common import MATCH_ALL_TAG
Expand Down Expand Up @@ -319,16 +320,122 @@ def notify(self, body, title='', notify_type=NotifyType.INFO,
such as turning a \n into an actual new line, etc.
"""

if len(self) == 0:
# Nothing to notify
if ASYNCIO_SUPPORT:
return py3compat.asyncio.tosync(
self.async_notify(
body, title,
notify_type=notify_type, body_format=body_format,
tag=tag, attach=attach,
interpret_escapes=interpret_escapes,
)
)

else:
try:
results = peekable(
YoRyan marked this conversation as resolved.
Show resolved Hide resolved
self._notifyall(
Apprise._notifyhandler,
body, title,
notify_type=notify_type, body_format=body_format,
tag=tag, attach=attach,
interpret_escapes=interpret_escapes,
)
)

assigned = results.peek(None) is not None
if not assigned:
return None

else:
# Return False if any notification fails.
return all(results)

except TypeError:
# These our our internally thrown notifications
return False

def async_notify(self, *args, **kwargs):
"""
Send a notification to all of the plugins previously loaded, for
asynchronous callers. This method is an async method that should be
awaited on, even if it is missing the async keyword in its signature.
(This is omitted to preserve syntax compatibility with Python 2.)

The arguments are identical to those of Apprise.notify(). This method
is not available in Python 2.
"""

try:
coroutines = list(
self._notifyall(
Apprise._notifyhandlerasync, *args, **kwargs))

assigned = len(coroutines) > 0
if not assigned:
return py3compat.asyncio.toasyncwrap(None)

else:
return py3compat.asyncio.notify(
coroutines, debug=self.debug)

except TypeError:
# These our our internally thrown notifications
return py3compat.asyncio.toasyncwrap(False)

@staticmethod
def _notifyhandler(server, **kwargs):
"""
The synchronous notification sender. Returns True if the notification
sent successfully.
"""

try:
# Send notification
return server.notify(**kwargs)

except TypeError:
# These our our internally thrown notifications
return False

# Initialize our return result which only turns to True if we send
# at least one valid notification
status = None
except Exception:
# A catch all so we don't have to abort early
# just because one of our plugins has a bug in it.
logger.exception("Notification Exception")
return False

@staticmethod
def _notifyhandlerasync(server, **kwargs):
"""
The asynchronous notification sender. Returns a coroutine that yields
True if the notification sent successfully.
"""

if server.asset.async_mode:
return server.async_notify(**kwargs)

else:
# Send the notification immediately, and wrap the result in a
# coroutine.
status = Apprise._notifyhandler(server, **kwargs)
return py3compat.asyncio.toasyncwrap(status)

def _notifyall(self, handler, body, title='', notify_type=NotifyType.INFO,
body_format=None, tag=MATCH_ALL_TAG, attach=None,
interpret_escapes=None):
"""
Creates notifications for all of the plugins loaded.

Returns a generator that calls handler for each notification. The first
and only argument supplied to handler is the server, and the keyword
arguments are exactly as they would be passed to server.notify().
"""

if len(self) == 0:
# Nothing to notify
raise TypeError

if not (title or body):
return False
caronc marked this conversation as resolved.
Show resolved Hide resolved
raise TypeError

if six.PY2:
# Python 2.7.x Unicode Character Handling
Expand All @@ -344,13 +451,8 @@ def notify(self, body, title='', notify_type=NotifyType.INFO,

# Prepare attachments if required
if attach is not None and not isinstance(attach, AppriseAttachment):
try:
attach = AppriseAttachment(
attach, asset=self.asset, location=self.location)

except TypeError:
# bad attachments
return False
attach = AppriseAttachment(
caronc marked this conversation as resolved.
Show resolved Hide resolved
attach, asset=self.asset, location=self.location)

# Allow Asset default value
body_format = self.asset.body_format \
Expand All @@ -360,17 +462,8 @@ def notify(self, body, title='', notify_type=NotifyType.INFO,
interpret_escapes = self.asset.interpret_escapes \
if interpret_escapes is None else interpret_escapes

# for asyncio support; we track a list of our servers to notify
coroutines = []

# Iterate over our loaded plugins
for server in self.find(tag):
if status is None:
# We have at least one server to notify; change status
# to be a default value of True from now (purely an
# initialiation at this point)
status = True

# If our code reaches here, we either did not define a tag (it
# was set to None), or we did define a tag and the logic above
# determined we need to notify the service it's associated with
Expand Down Expand Up @@ -443,7 +536,7 @@ def notify(self, body, title='', notify_type=NotifyType.INFO,
except AttributeError:
# Must be of string type
logger.error('Failed to escape message body')
return False
raise TypeError
caronc marked this conversation as resolved.
Show resolved Hide resolved

try:
# Added overhead required due to Python 3 Encoding Bug
Expand All @@ -460,48 +553,15 @@ def notify(self, body, title='', notify_type=NotifyType.INFO,
except AttributeError:
# Must be of string type
logger.error('Failed to escape message title')
return False

if ASYNCIO_SUPPORT and server.asset.async_mode:
# Build a list of servers requiring notification
# that will be triggered asynchronously afterwards
coroutines.append(server.async_notify(
body=conversion_map[server.notify_format],
title=title,
notify_type=notify_type,
attach=attach))

# We gather at this point and notify at the end
continue
raise TypeError

try:
# Send notification
if not server.notify(
body=conversion_map[server.notify_format],
title=title,
notify_type=notify_type,
attach=attach):

# Toggle our return status flag
status = False

except TypeError:
# These our our internally thrown notifications
status = False

except Exception:
# A catch all so we don't have to abort early
# just because one of our plugins has a bug in it.
logger.exception("Notification Exception")
status = False

if coroutines:
# perform our async notification(s)
if not py3compat.asyncio.notify(coroutines, debug=self.debug):
# Toggle our status only if we had a failure
status = False

return status
yield handler(
server,
body=conversion_map[server.notify_format],
title=title,
notify_type=notify_type,
attach=attach
)

def details(self, lang=None):
"""
Expand Down Expand Up @@ -658,3 +718,7 @@ def __len__(self):
"""
return sum([1 if not isinstance(s, (ConfigBase, AppriseConfig))
else len(s.servers()) for s in self.servers])


if six.PY2:
del Apprise.async_notify
84 changes: 38 additions & 46 deletions apprise/py3compat/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import sys
import asyncio
from concurrent.futures import ThreadPoolExecutor
caronc marked this conversation as resolved.
Show resolved Hide resolved
from functools import partial
from ..URLBase import URLBase
from ..logger import logger
Expand All @@ -37,69 +36,63 @@
(sys.version_info.major == 3 and sys.version_info.minor >= 7)


def notify(coroutines, debug=False):
# async reference produces a SyntaxError (E999) in Python v2.7
# For this reason we turn on the noqa flag
async def notify(coroutines, debug=False): # noqa: E999
"""
A Wrapper to the AsyncNotifyBase.async_notify() calls allowing us
An async wrapper to the AsyncNotifyBase.async_notify() calls allowing us
to call gather() and collect the responses
"""

# Create log entry
logger.info(
'Notifying {} service(s) asynchronously.'.format(len(coroutines)))

results = await asyncio.gather(*coroutines, return_exceptions=True)

# Returns True if all notifications succeeded, otherwise False is
# returned.
failed = any(not status or isinstance(status, Exception)
for status in results)
return not failed


def tosync(cor, debug=False):
"""
Await a coroutine from non-async code.
"""

if ASYNCIO_RUN_SUPPORT:
# async reference produces a SyntaxError (E999) in Python v2.7
# For this reason we turn on the noqa flag
async def main(results, coroutines): # noqa: E999
"""
Task: Notify all servers specified and return our result set
through a mutable object.
"""
# send our notifications and store our result set into
# our results dictionary
results['response'] = \
await asyncio.gather(*coroutines, return_exceptions=True)

# Initialize a mutable object we can populate with our notification
# responses
results = {}

# Send our notifications
asyncio.run(main(results, coroutines), debug=debug)

# Acquire our return status
status = next((s for s in results['response'] if s is False), True)
return asyncio.run(cor, debug=debug)

else:
#
# The Deprecated Way (<= Python v3.6)
#

try:
# acquire access to our event loop
loop = asyncio.get_event_loop()

except RuntimeError:
# This happens if we're inside a thread of another application
# where there is no running event_loop(). Pythong v3.7 and higher
# automatically take care of this case for us. But for the lower
# versions we need to do the following:
# where there is no running event_loop(). Pythong v3.7 and
# higher automatically take care of this case for us. But for
# the lower versions we need to do the following:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

if debug:
# Enable debug mode
loop.set_debug(1)
# Enable debug mode
loop.set_debug(debug)

# Send our notifications and acquire our status
results = loop.run_until_complete(asyncio.gather(*coroutines))
return loop.run_until_complete(cor)

# Acquire our return status
status = next((r for r in results if r is False), True)

# Returns True if all notifications succeeded, otherwise False is
# returned.
return status
def toasyncwrap(v):
"""
Create a coroutine that, when run, returns the provided value.
"""

async def cor(): # noqa: E999
return v
return cor()


class AsyncNotifyBase(URLBase):
Expand All @@ -111,13 +104,12 @@ async def async_notify(self, *args, **kwargs): # noqa: E999
"""
Async Notification Wrapper
"""

loop = asyncio.get_event_loop()

try:
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as executor:
return await loop.run_in_executor(
executor,
partial(self.notify, *args, **kwargs),
)
return await loop.run_in_executor(
None, partial(self.notify, *args, **kwargs))

except TypeError:
# These our our internally thrown notifications
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ six
click >= 5.0
markdown
PyYAML
more_itertools
Loading