Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve timeouts handling in conversation handlers #2417

Merged
merged 19 commits into from
Apr 30, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 104 additions & 39 deletions telegram/ext/conversationhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

import logging
import warnings
import functools
import datetime
from threading import Lock
from typing import TYPE_CHECKING, Dict, List, NoReturn, Optional, Tuple, cast, ClassVar
from typing import TYPE_CHECKING, Dict, List, NoReturn, Optional, Union, Tuple, cast, ClassVar

from telegram import Update
from telegram.ext import (
Expand Down Expand Up @@ -143,6 +145,13 @@ class ConversationHandler(Handler[Update]):
received update and the corresponding ``context`` will be handled by ALL the handler's
who's :attr:`check_update` method returns :obj:`True` that are in the state
:attr:`ConversationHandler.TIMEOUT`.

Note:
Using `conversation_timeout` with nested conversations is currently not
supported. You can still try to use it, but it will likely behave differently
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
supported. You can still try to use it, but it will likely behave differently
supported. You can try to use it, but it will likely behave differently

from what you expect.


name (:obj:`str`, optional): The name for this conversationhandler. Required for
persistence.
persistent (:obj:`bool`, optional): If the conversations dict for this handler should be
Expand Down Expand Up @@ -215,7 +224,7 @@ def __init__(
per_chat: bool = True,
per_user: bool = True,
per_message: bool = False,
conversation_timeout: int = None,
conversation_timeout: Union[float, datetime.timedelta] = None,
name: str = None,
persistent: bool = False,
map_to_parent: Dict[object, object] = None,
Expand Down Expand Up @@ -291,6 +300,16 @@ def __init__(
)
break

if self.conversation_timeout:
for handler in all_handlers:
if isinstance(handler, self.__class__):
warnings.warn(
"Using `conversation_timeout` with nested conversations is currently not "
"supported. You can still try to use it, but it will likely behave "
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"supported. You can still try to use it, but it will likely behave "
"supported. You can try to use it, but it will likely behave "

"differently from what you expect."
)
break

if self.run_async:
for handler in all_handlers:
handler.run_async = True
Expand Down Expand Up @@ -352,7 +371,9 @@ def per_message(self, value: object) -> NoReturn:
raise ValueError('You can not assign a new value to per_message after initialization.')

@property
def conversation_timeout(self) -> Optional[int]:
def conversation_timeout(
self,
) -> Optional[Union[float, datetime.timedelta]]:
return self._conversation_timeout

@conversation_timeout.setter
Expand Down Expand Up @@ -423,6 +444,45 @@ def _get_key(self, update: Update) -> Tuple[int, ...]:

return tuple(key)

def _resolve_promise(self, state: Tuple) -> object:
Bibo-Joshi marked this conversation as resolved.
Show resolved Hide resolved
old_state, new_state = state
try:
res = new_state.result(0)
res = res if res is not None else old_state
except Exception as exc:
self.logger.exception("Promise function raised exception")
self.logger.exception("%s", exc)
res = old_state
finally:
if res is None and old_state is None:
res = self.END
return res

def _schedule_job(
self,
new_state: object,
dispatcher: 'Dispatcher',
update: Update,
context: Optional[CallbackContext],
conversation_key: Tuple[int, ...],
) -> None:
if new_state != self.END:
try:
# both job_queue & conversation_timeout are checked before calling _schedule_job
j_queue = dispatcher.job_queue
self.timeout_jobs[conversation_key] = j_queue.run_once( # type: ignore[union-attr]
self._trigger_timeout,
self.conversation_timeout, # type: ignore[arg-type]
context=_ConversationTimeoutContext(
conversation_key, update, dispatcher, context
),
)
except Exception as exc:
self.logger.exception(
"Failed to schedule timeout job due to the following exception:"
)
self.logger.exception("%s", exc)

def check_update(self, update: object) -> CheckUpdateType: # pylint: disable=R0911
"""
Determines whether an update should be handled by this conversationhandler, and if so in
Expand Down Expand Up @@ -455,21 +515,14 @@ def check_update(self, update: object) -> CheckUpdateType: # pylint: disable=R0
if isinstance(state, tuple) and len(state) == 2 and isinstance(state[1], Promise):
self.logger.debug('waiting for promise...')

old_state, new_state = state
if new_state.done.wait(0):
try:
res = new_state.result(0)
res = res if res is not None else old_state
except Exception as exc:
self.logger.exception("Promise function raised exception")
self.logger.exception("%s", exc)
res = old_state
finally:
if res is None and old_state is None:
res = self.END
self.update_state(res, key)
with self._conversations_lock:
state = self.conversations.get(key)
# check if promise is finished or not
Bibo-Joshi marked this conversation as resolved.
Show resolved Hide resolved
if state[1].done.wait(0):
res = self._resolve_promise(state)
self.update_state(res, key)
with self._conversations_lock:
state = self.conversations.get(key)

# if not then handle WAITING state instead
else:
hdlrs = self.states.get(self.WAITING, [])
for hdlr in hdlrs:
Expand Down Expand Up @@ -551,15 +604,27 @@ def handle_update( # type: ignore[override]
new_state = exception.state
raise_dp_handler_stop = True
with self._timeout_jobs_lock:
if self.conversation_timeout and new_state != self.END and dispatcher.job_queue:
# Add the new timeout job
self.timeout_jobs[conversation_key] = dispatcher.job_queue.run_once(
self._trigger_timeout, # type: ignore[arg-type]
self.conversation_timeout,
context=_ConversationTimeoutContext(
conversation_key, update, dispatcher, context
),
)
if self.conversation_timeout:
if dispatcher.job_queue is not None:
# Add the new timeout job
if isinstance(new_state, Promise):
new_state.add_done_callback(
functools.partial(
self._schedule_job,
dispatcher=dispatcher,
update=update,
context=context,
conversation_key=conversation_key,
)
)
else:
Bibo-Joshi marked this conversation as resolved.
Show resolved Hide resolved
self._schedule_job(
new_state, dispatcher, update, context, conversation_key
)
else:
self.logger.warning(
"Ignoring `conversation_timeout` because the Dispatcher has no JobQueue."
)

if isinstance(self.map_to_parent, dict) and new_state in self.map_to_parent:
self.update_state(self.END, conversation_key)
Expand Down Expand Up @@ -597,35 +662,35 @@ def update_state(self, new_state: object, key: Tuple[int, ...]) -> None:
if self.persistent and self.persistence and self.name:
self.persistence.update_conversation(self.name, key, new_state)

def _trigger_timeout(self, context: _ConversationTimeoutContext, job: 'Job' = None) -> None:
def _trigger_timeout(self, context: CallbackContext, job: 'Job' = None) -> None:
self.logger.debug('conversation timeout was triggered!')

# Backward compatibility with bots that do not use CallbackContext
callback_context = None
if isinstance(context, CallbackContext):
job = context.job
ctxt = cast(_ConversationTimeoutContext, job.context) # type: ignore[union-attr]
else:
ctxt = cast(_ConversationTimeoutContext, job.context)

context = job.context # type:ignore[union-attr,assignment]
callback_context = context.callback_context
callback_context = ctxt.callback_context

with self._timeout_jobs_lock:
found_job = self.timeout_jobs[context.conversation_key]
found_job = self.timeout_jobs[ctxt.conversation_key]
if found_job is not job:
# The timeout has been canceled in handle_update
# The timeout has been cancelled in handle_update
starry-shivam marked this conversation as resolved.
Show resolved Hide resolved
return
del self.timeout_jobs[context.conversation_key]
del self.timeout_jobs[ctxt.conversation_key]

handlers = self.states.get(self.TIMEOUT, [])
for handler in handlers:
check = handler.check_update(context.update)
check = handler.check_update(ctxt.update)
if check is not None and check is not False:
try:
handler.handle_update(
context.update, context.dispatcher, check, callback_context
)
handler.handle_update(ctxt.update, ctxt.dispatcher, check, callback_context)
except DispatcherHandlerStop:
self.logger.warning(
'DispatcherHandlerStop in TIMEOUT state of '
'ConversationHandler has no effect. Ignoring.'
)
self.update_state(self.END, context.conversation_key)

self.update_state(self.END, ctxt.conversation_key)
20 changes: 20 additions & 0 deletions telegram/ext/utils/promise.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(
self.update = update
self.error_handling = error_handling
self.done = Event()
self._done_callback: Optional[Callable] = None
self._result: Optional[RT] = None
self._exception: Optional[Exception] = None

Expand All @@ -83,6 +84,11 @@ def run(self) -> None:

finally:
self.done.set()
if self._done_callback:
starry-shivam marked this conversation as resolved.
Show resolved Hide resolved
try:
self._done_callback(self.result())
except Exception:
pass
Bibo-Joshi marked this conversation as resolved.
Show resolved Hide resolved

def __call__(self) -> None:
self.run()
Expand All @@ -106,6 +112,20 @@ def result(self, timeout: float = None) -> Optional[RT]:
raise self._exception # pylint: disable=raising-bad-type
return self._result

def add_done_callback(self, callback: Callable) -> None:
"""
Callback to be run when :class:`telegram.ext.utils.promise.Promise` becomes done.
starry-shivam marked this conversation as resolved.
Show resolved Hide resolved

Args:
callback (:obj:`callable`): The callable that will be called when promise is done.
callback will be called by passing ``Promise.result()`` as only positional argument.

"""
if self.done.wait(0):
callback(self.result())
else:
self._done_callback = callback

@property
def exception(self) -> Optional[Exception]:
"""The exception raised by :attr:`pooled_function` or ``None`` if no exception has been
Expand Down
Loading