Skip to content

Commit

Permalink
fix: add docs
Browse files Browse the repository at this point in the history
  • Loading branch information
phi-friday committed Feb 24, 2024
1 parent 9402d68 commit 85d309e
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 4 deletions.
10 changes: 8 additions & 2 deletions src/async_wrapper/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,13 @@ class QueueRestrictedError(QueueError):
"""queue is restricted but used"""


class PipeError(Exception): ...
class PipeError(Exception):
"""
Base exception for pipe-related errors.
This exception serves as the base class for various pipe-related exceptions.
"""


class PipeAlreadyDisposedError(PipeError): ...
class PipeAlreadyDisposedError(PipeError):
"""Indicates that an attempt was made to use a pipe that has already been disposed of.""" # noqa: E501
44 changes: 42 additions & 2 deletions src/async_wrapper/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,44 @@ class Synchronization(TypedDict, total=False):

@runtime_checkable
class Disposable(Protocol[InputT, OutputT]):
async def next(self, value: InputT) -> OutputT: ...
async def dispose(self) -> Any: ...
"""
Defines the interface for a disposable resource.
Type Parameters:
InputT: The type of input data.
OutputT: The type of output data.
"""

async def next(self, value: InputT) -> OutputT:
"""
Processes the next input value and produces an output value.
Args:
value: The input value.
Returns:
The output value.
"""
...

async def dispose(self) -> Any:
"""Disposes the resource and releases any associated resources."""


class Pipe(Disposable[InputT, OutputT], Generic[InputT, OutputT]):
"""
Implements a pipe that can be used to communicate data between coroutines.
Type Parameters:
InputT: The type of input data.
OutputT: The type of output data.
Args:
listener: The function that will be called to process each input value.
context: An optional synchronization context to use.
dispose: An optional function that will be called to dispose the pipe.
"""

_context: Synchronization
_listener: Callable[[InputT], Awaitable[OutputT]]
_listeners: deque[tuple[Disposable[OutputT, Any], bool]]
Expand Down Expand Up @@ -104,6 +137,13 @@ def subscribe(
*,
dispose: bool = True,
) -> None:
"""
Subscribes a listener to the pipe.
Args:
listener: The listener to subscribe.
dispose: Whether to dispose the listener when the pipe is disposed.
"""
if self._is_disposed:
raise PipeAlreadyDisposedError("pipe already disposed")

Expand Down

0 comments on commit 85d309e

Please sign in to comment.