-
-
Notifications
You must be signed in to change notification settings - Fork 41
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #173 from python-discord/file-scan-recursion-fix
Fix recursion error during file attachment parsing of deep nested paths
- Loading branch information
Showing
5 changed files
with
117 additions
and
41 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,37 +1,34 @@ | ||
"""Calling functions with time limits.""" | ||
import multiprocessing | ||
from collections.abc import Callable, Iterable, Mapping | ||
from typing import Any, TypeVar | ||
import signal | ||
from collections.abc import Generator | ||
from contextlib import contextmanager | ||
from typing import TypeVar | ||
|
||
_T = TypeVar("_T") | ||
_V = TypeVar("_V") | ||
|
||
__all__ = ("timed",) | ||
__all__ = ("time_limit",) | ||
|
||
|
||
def timed( | ||
func: Callable[[_T], _V], | ||
args: Iterable = (), | ||
kwds: Mapping[str, Any] | None = None, | ||
timeout: float | None = None, | ||
) -> _V: | ||
@contextmanager | ||
def time_limit(timeout: int | None = None) -> Generator[None, None, None]: | ||
""" | ||
Call a function with a time limit. | ||
Decorator to call a function with a time limit. | ||
Args: | ||
func: Function to call. | ||
args: Arguments for function. | ||
kwds: Keyword arguments for function. | ||
timeout: Timeout limit in seconds. | ||
Raises: | ||
TimeoutError: If the function call takes longer than `timeout` seconds. | ||
""" | ||
if kwds is None: | ||
kwds = {} | ||
with multiprocessing.Pool(1) as pool: | ||
result = pool.apply_async(func, args, kwds) | ||
try: | ||
return result.get(timeout) | ||
except multiprocessing.TimeoutError as e: | ||
raise TimeoutError(f"Call to {func.__name__} timed out after {timeout} seconds.") from e | ||
|
||
def signal_handler(_signum, _frame): | ||
raise TimeoutError(f"time_limit call timed out after {timeout} seconds.") | ||
|
||
signal.signal(signal.SIGALRM, signal_handler) | ||
signal.alarm(timeout) | ||
|
||
try: | ||
yield | ||
finally: | ||
signal.alarm(0) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
import math | ||
import time | ||
from unittest import TestCase | ||
|
||
from snekbox.utils.timed import time_limit | ||
|
||
|
||
class TimedTests(TestCase): | ||
def test_sleep(self): | ||
"""Test that a sleep can be interrupted.""" | ||
_finished = False | ||
start = time.perf_counter() | ||
with self.assertRaises(TimeoutError): | ||
with time_limit(1): | ||
time.sleep(2) | ||
_finished = True | ||
end = time.perf_counter() | ||
self.assertLess(end - start, 2) | ||
self.assertFalse(_finished) | ||
|
||
def test_iter(self): | ||
"""Test that a long-running built-in function can be interrupted.""" | ||
_result = 0 | ||
start = time.perf_counter() | ||
with self.assertRaises(TimeoutError): | ||
with time_limit(1): | ||
_result = math.factorial(2**30) | ||
end = time.perf_counter() | ||
self.assertEqual(_result, 0) | ||
self.assertLess(end - start, 2) |