-
Notifications
You must be signed in to change notification settings - Fork 137
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enabled Event and CapacityLimiter to be instantiated outside an event loop #651
Changes from 1 commit
7c5bf4f
9f9c811
0dae529
3db1677
08c1d7d
7db73e7
d50165a
f214590
d8ba6de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,5 +1,6 @@ | ||||||||||||
from __future__ import annotations | ||||||||||||
|
||||||||||||
import math | ||||||||||||
from collections import deque | ||||||||||||
from dataclasses import dataclass | ||||||||||||
from types import TracebackType | ||||||||||||
|
@@ -76,7 +77,7 @@ class SemaphoreStatistics: | |||||||||||
|
||||||||||||
class Event: | ||||||||||||
def __new__(cls) -> Event: | ||||||||||||
return get_async_backend().create_event() | ||||||||||||
return EventAdapter() | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in the async methods of high-frequency primitives that construct There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With my latest change, they won't be getting the adapters, will they? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yep, implementing #651 (comment) addressed this comment too. (technically |
||||||||||||
|
||||||||||||
def set(self) -> None: | ||||||||||||
"""Set the flag, notifying all listeners.""" | ||||||||||||
|
@@ -101,6 +102,32 @@ def statistics(self) -> EventStatistics: | |||||||||||
raise NotImplementedError | ||||||||||||
|
||||||||||||
|
||||||||||||
class EventAdapter(Event): | ||||||||||||
_internal_event: Event | None = None | ||||||||||||
|
||||||||||||
def __new__(cls) -> EventAdapter: | ||||||||||||
return object.__new__(cls) | ||||||||||||
|
||||||||||||
@property | ||||||||||||
def _event(self) -> Event: | ||||||||||||
if self._internal_event is None: | ||||||||||||
self._internal_event = get_async_backend().create_event() | ||||||||||||
|
||||||||||||
return self._internal_event | ||||||||||||
|
||||||||||||
def set(self) -> None: | ||||||||||||
self._event.set() | ||||||||||||
|
||||||||||||
def is_set(self) -> bool: | ||||||||||||
return self._internal_event is not None and self._internal_event.is_set() | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand that if the event is not created then it cannot be set, but should
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. perhaps
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added. |
||||||||||||
|
||||||||||||
async def wait(self) -> None: | ||||||||||||
await self._event.wait() | ||||||||||||
|
||||||||||||
def statistics(self) -> EventStatistics: | ||||||||||||
return self._event.statistics() | ||||||||||||
|
||||||||||||
|
||||||||||||
class Lock: | ||||||||||||
_owner_task: TaskInfo | None = None | ||||||||||||
|
||||||||||||
|
@@ -372,8 +399,8 @@ def statistics(self) -> SemaphoreStatistics: | |||||||||||
|
||||||||||||
|
||||||||||||
class CapacityLimiter: | ||||||||||||
def __new__(cls, total_tokens: float) -> CapacityLimiter: | ||||||||||||
return get_async_backend().create_capacity_limiter(total_tokens) | ||||||||||||
def __new__(cls, total_tokens: int) -> CapacityLimiter: | ||||||||||||
return CapacityLimiterAdapter(total_tokens) | ||||||||||||
|
||||||||||||
async def __aenter__(self) -> None: | ||||||||||||
raise NotImplementedError | ||||||||||||
|
@@ -482,6 +509,78 @@ def statistics(self) -> CapacityLimiterStatistics: | |||||||||||
raise NotImplementedError | ||||||||||||
|
||||||||||||
|
||||||||||||
class CapacityLimiterAdapter(CapacityLimiter): | ||||||||||||
_internal_limiter: CapacityLimiter | None = None | ||||||||||||
|
||||||||||||
def __new__(cls, total_tokens: int) -> CapacityLimiterAdapter: | ||||||||||||
return object.__new__(cls) | ||||||||||||
|
||||||||||||
def __init__(self, total_tokens: int) -> None: | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line 519 checks if
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In that case, the annotation should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, on the other hand it shouldn't be any float. Only Shouldn't There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, because There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Neither is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ugh. They should just have accepted arbitrary singletons. (I do wonder whether any type checkers do …) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, it's not. Floats are not valid literals according to PEP 586. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It couldn't been done, but I was following Trio's API design which uses |
||||||||||||
if not isinstance(total_tokens, int) and total_tokens is not math.inf: | ||||||||||||
raise TypeError("total_tokens must be an int or math.inf") | ||||||||||||
elif total_tokens < 1: | ||||||||||||
raise ValueError("total_tokens must be >= 1") | ||||||||||||
|
||||||||||||
self._total_tokens = total_tokens | ||||||||||||
|
||||||||||||
@property | ||||||||||||
def _limiter(self) -> CapacityLimiter: | ||||||||||||
if self._internal_limiter is None: | ||||||||||||
self._internal_limiter = get_async_backend().create_capacity_limiter( | ||||||||||||
self._total_tokens | ||||||||||||
) | ||||||||||||
|
||||||||||||
return self._internal_limiter | ||||||||||||
|
||||||||||||
async def __aenter__(self) -> None: | ||||||||||||
await self._limiter.__aenter__() | ||||||||||||
|
||||||||||||
async def __aexit__( | ||||||||||||
self, | ||||||||||||
exc_type: type[BaseException] | None, | ||||||||||||
exc_val: BaseException | None, | ||||||||||||
exc_tb: TracebackType | None, | ||||||||||||
) -> bool | None: | ||||||||||||
return await self._limiter.__aexit__(exc_type, exc_val, exc_tb) | ||||||||||||
|
||||||||||||
@property | ||||||||||||
def total_tokens(self) -> float: | ||||||||||||
return self._limiter.total_tokens | ||||||||||||
|
||||||||||||
@total_tokens.setter | ||||||||||||
def total_tokens(self, value: float) -> None: | ||||||||||||
self._limiter.total_tokens = value | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (obscure/minor comment) could this be delayed too?
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. mhm, i suppose that if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Frankly, the idea that someone would change the adapter's total tokens from a worker thread had not even occurred to me. The whole idea seems pretty bizarre to me. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, i think that it's reasonable to require them to do the mutation in the event loop thread. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll revert my latest commit then? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The more I think about it, the more I'm leaning towards allowing users to mutate There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. huh, does Trio document that? i'm peeking at the Trio code and
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
i'm leaning this way too. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, generally the API should be considered thread-unsafe, save for that parts that were explicitly designed to allow access from worker threads (the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oops—i misinterpreted
as
and was replying to something you didn't say. :p |
||||||||||||
|
||||||||||||
@property | ||||||||||||
def borrowed_tokens(self) -> int: | ||||||||||||
return self._limiter.borrowed_tokens | ||||||||||||
|
||||||||||||
@property | ||||||||||||
def available_tokens(self) -> float: | ||||||||||||
return self._limiter.available_tokens | ||||||||||||
|
||||||||||||
def acquire_nowait(self) -> None: | ||||||||||||
self._limiter.acquire_nowait() | ||||||||||||
|
||||||||||||
def acquire_on_behalf_of_nowait(self, borrower: object) -> None: | ||||||||||||
self._limiter.acquire_on_behalf_of_nowait(borrower) | ||||||||||||
|
||||||||||||
async def acquire(self) -> None: | ||||||||||||
await self._limiter.acquire() | ||||||||||||
|
||||||||||||
async def acquire_on_behalf_of(self, borrower: object) -> None: | ||||||||||||
await self._limiter.acquire_on_behalf_of(borrower) | ||||||||||||
|
||||||||||||
def release(self) -> None: | ||||||||||||
self._limiter.release() | ||||||||||||
|
||||||||||||
def release_on_behalf_of(self, borrower: object) -> None: | ||||||||||||
self._limiter.release_on_behalf_of(borrower) | ||||||||||||
|
||||||||||||
def statistics(self) -> CapacityLimiterStatistics: | ||||||||||||
return self._limiter.statistics() | ||||||||||||
|
||||||||||||
|
||||||||||||
class ResourceGuard: | ||||||||||||
""" | ||||||||||||
A context manager for ensuring that a resource is only used by a single task at a | ||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
from __future__ import annotations | ||
|
||
import asyncio | ||
from typing import Any | ||
|
||
import pytest | ||
|
||
|
@@ -13,6 +14,7 @@ | |
WouldBlock, | ||
create_task_group, | ||
fail_after, | ||
run, | ||
to_thread, | ||
wait_all_tasks_blocked, | ||
) | ||
|
@@ -208,6 +210,18 @@ async def waiter() -> None: | |
|
||
assert event.statistics().tasks_waiting == 0 | ||
|
||
def test_instantiate_outside_event_loop( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. to prevent the other synchronization primitives from regressing on this issue, it could make sense to add an analogous There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
self, anyio_backend_name: str, anyio_backend_options: dict[str, Any] | ||
) -> None: | ||
async def use_event() -> None: | ||
event.set() | ||
await event.wait() | ||
|
||
event = Event() | ||
run( | ||
use_event, backend=anyio_backend_name, backend_options=anyio_backend_options | ||
) | ||
|
||
|
||
class TestCondition: | ||
async def test_contextmanager(self) -> None: | ||
|
@@ -595,3 +609,17 @@ async def worker(entered_event: Event) -> None: | |
|
||
# Allow all tasks to exit | ||
continue_event.set() | ||
|
||
def test_instantiate_outside_event_loop( | ||
self, anyio_backend_name: str, anyio_backend_options: dict[str, Any] | ||
) -> None: | ||
async def use_limiter() -> None: | ||
async with limiter: | ||
pass | ||
|
||
limiter = CapacityLimiter(1) | ||
run( | ||
use_limiter, | ||
backend=anyio_backend_name, | ||
backend_options=anyio_backend_options, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would it be cheaper to avoid using the proxy in the common case?
for the common case (construction under the loop context) this would move the overhead from on-every-attribute-access to on-construction.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.