Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make optional types explicit #255

Merged
merged 16 commits into from
Feb 7, 2022
Merged
2 changes: 1 addition & 1 deletion docs/includes/settingref.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1436,7 +1436,7 @@ consider doing retries (send to separate topic):
main_topic = app.topic('main')
deadletter_topic = app.topic('main_deadletter')

async def send_request(value, timeout: float = None) -> None:
async def send_request(value, timeout: Optional[float] = None) -> None:
await app.http_client.get('http://foo.com', timeout=timeout)

@app.agent(main_topic)
Expand Down
4 changes: 2 additions & 2 deletions docs/userguide/application.rst
Original file line number Diff line number Diff line change
Expand Up @@ -856,8 +856,8 @@ This can be used to attach custom headers to Kafka messages:
sender: AppT,
key: bytes = None,
value: bytes = None,
partition: int = None,
timestamp: float = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: List[Tuple[str, bytes]] = None,
**kwargs: Any) -> None:
test = current_test()
Expand Down
4 changes: 2 additions & 2 deletions docs/userguide/models.rst
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,8 @@ time and last modified time:
.. sourcecode:: python

class MyBaseRecord(Record, abstract=True):
time_created: float = None
time_modified: float = None
time_created: Optional[float] = None
time_modified: Optional[float] = None

Inherit from this model to create a new model
having the fields by default:
Expand Down
6 changes: 3 additions & 3 deletions faust/agents/actor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Actor - Individual Agent instances."""
from typing import Any, AsyncGenerator, AsyncIterator, Coroutine, Set, cast
from typing import Any, AsyncGenerator, AsyncIterator, Coroutine, Optional, Set, cast

from mode import Service
from mode.utils.tracebacks import format_agen_stack, format_coro_stack
Expand All @@ -22,8 +22,8 @@ def __init__(
agent: AgentT,
stream: StreamT,
it: _T,
index: int = None,
active_partitions: Set[TP] = None,
index: Optional[int] = None,
active_partitions: Optional[Set[TP]] = None,
**kwargs: Any,
) -> None:
self.agent = agent
Expand Down
75 changes: 39 additions & 36 deletions faust/agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,18 +183,18 @@ def __init__(
fun: AgentFun,
*,
app: AppT,
name: str = None,
name: Optional[str] = None,
channel: Union[str, ChannelT] = None,
concurrency: int = 1,
sink: Iterable[SinkT] = None,
on_error: AgentErrorHandler = None,
supervisor_strategy: Type[SupervisorStrategyT] = None,
help: str = None,
schema: SchemaT = None,
help: Optional[str] = None,
schema: Optional[SchemaT] = None,
key_type: ModelArg = None,
value_type: ModelArg = None,
isolated_partitions: bool = False,
use_reply_headers: bool = None,
use_reply_headers: Optional[bool] = None,
**kwargs: Any,
) -> None:
self.app = app
Expand Down Expand Up @@ -242,8 +242,8 @@ async def _start_one(
*,
index: Optional[int] = None,
active_partitions: Optional[Set[TP]] = None,
stream: StreamT = None,
channel: ChannelT = None,
stream: Optional[StreamT] = None,
channel: Optional[ChannelT] = None,
) -> ActorT:
# an index of None means there's only one instance,
# and `index is None` is used as a test by functions that
Expand All @@ -261,7 +261,7 @@ async def _start_one_supervised(
self,
index: Optional[int] = None,
active_partitions: Optional[Set[TP]] = None,
stream: StreamT = None,
stream: Optional[StreamT] = None,
) -> ActorT:
aref = await self._start_one(
index=index,
Expand Down Expand Up @@ -456,7 +456,7 @@ def clone(self, *, cls: Type[AgentT] = None, **kwargs: Any) -> AgentT:

def test_context(
self,
channel: ChannelT = None,
channel: Optional[ChannelT] = None,
supervisor_strategy: SupervisorStrategyT = None,
on_error: AgentErrorHandler = None,
**kwargs: Any,
Expand Down Expand Up @@ -487,7 +487,7 @@ def _prepare_channel(
self,
channel: Union[str, ChannelT] = None,
internal: bool = True,
schema: SchemaT = None,
schema: Optional[SchemaT] = None,
key_type: ModelArg = None,
value_type: ModelArg = None,
**kwargs: Any,
Expand All @@ -514,10 +514,10 @@ def _prepare_channel(
def __call__(
self,
*,
index: int = None,
active_partitions: Set[TP] = None,
stream: StreamT = None,
channel: ChannelT = None,
index: Optional[int] = None,
active_partitions: Optional[Set[TP]] = None,
stream: Optional[StreamT] = None,
channel: Optional[ChannelT] = None,
) -> ActorRefT:
"""Create new actor instance for this agent."""
# The agent function can be reused by other agents/tasks.
Expand Down Expand Up @@ -545,9 +545,9 @@ def actor_from_stream(
self,
stream: Optional[StreamT],
*,
index: int = None,
active_partitions: Set[TP] = None,
channel: ChannelT = None,
index: Optional[int] = None,
active_partitions: Optional[Set[TP]] = None,
channel: Optional[ChannelT] = None,
) -> ActorRefT:
"""Create new actor from stream."""
we_created_stream = False
Expand Down Expand Up @@ -601,7 +601,10 @@ def add_sink(self, sink: SinkT) -> None:
self._sinks.append(sink)

def stream(
self, channel: ChannelT = None, active_partitions: Set[TP] = None, **kwargs: Any
self,
channel: Optional[ChannelT] = None,
active_partitions: Optional[Set[TP]] = None,
**kwargs: Any,
) -> StreamT:
"""Create underlying stream used by this agent."""
if channel is None:
Expand Down Expand Up @@ -631,9 +634,9 @@ async def _start_task(
*,
index: Optional[int],
active_partitions: Optional[Set[TP]] = None,
stream: StreamT = None,
channel: ChannelT = None,
beacon: NodeT = None,
stream: Optional[StreamT] = None,
channel: Optional[ChannelT] = None,
beacon: Optional[NodeT] = None,
) -> ActorRefT:
# If the agent is an async function we simply start it,
# if it returns an AsyncIterable/AsyncGenerator we start a task
Expand Down Expand Up @@ -750,8 +753,8 @@ async def cast(
value: V = None,
*,
key: K = None,
partition: int = None,
timestamp: float = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: HeadersArg = None,
) -> None:
"""RPC operation: like :meth:`ask` but do not expect reply.
Expand All @@ -772,11 +775,11 @@ async def ask(
value: V = None,
*,
key: K = None,
partition: int = None,
timestamp: float = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: HeadersArg = None,
reply_to: ReplyToArg = None,
correlation_id: str = None,
correlation_id: Optional[str] = None,
) -> Any:
"""RPC operation: ask agent for result of processing value.

Expand All @@ -803,11 +806,11 @@ async def ask_nowait(
value: V = None,
*,
key: K = None,
partition: int = None,
timestamp: float = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: HeadersArg = None,
reply_to: ReplyToArg = None,
correlation_id: str = None,
correlation_id: Optional[str] = None,
force: bool = False,
) -> ReplyPromise:
"""RPC operation: ask agent for result of processing value.
Expand Down Expand Up @@ -835,7 +838,7 @@ def _create_req(
key: K = None,
value: V = None,
reply_to: ReplyToArg = None,
correlation_id: str = None,
correlation_id: Optional[str] = None,
headers: HeadersArg = None,
) -> Tuple[V, Optional[HeadersArg]]:
if reply_to is None:
Expand Down Expand Up @@ -871,14 +874,14 @@ async def send(
*,
key: K = None,
value: V = None,
partition: int = None,
timestamp: float = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: HeadersArg = None,
key_serializer: CodecArg = None,
value_serializer: CodecArg = None,
callback: MessageSentCallback = None,
callback: Optional[MessageSentCallback] = None,
reply_to: ReplyToArg = None,
correlation_id: str = None,
correlation_id: Optional[str] = None,
force: bool = False,
) -> Awaitable[RecordMetadata]:
"""Send message to topic used by agent."""
Expand Down Expand Up @@ -1091,7 +1094,7 @@ class AgentTestWrapper(Agent, AgentTestWrapperT): # pragma: no cover
_stream: StreamT

def __init__(
self, *args: Any, original_channel: ChannelT = None, **kwargs: Any
self, *args: Any, original_channel: Optional[ChannelT] = None, **kwargs: Any
) -> None:
super().__init__(*args, **kwargs)
self.results = {}
Expand Down Expand Up @@ -1131,7 +1134,7 @@ async def put(
value_serializer: CodecArg = None,
*,
reply_to: ReplyToArg = None,
correlation_id: str = None,
correlation_id: Optional[str] = None,
wait: bool = True,
) -> EventT:
if reply_to:
Expand Down Expand Up @@ -1164,7 +1167,7 @@ def to_message(
*,
partition: Optional[int] = None,
offset: int = 0,
timestamp: float = None,
timestamp: Optional[float] = None,
timestamp_type: int = 0,
headers: HeadersArg = None,
) -> Message:
Expand Down
17 changes: 9 additions & 8 deletions faust/app/_attached.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
List,
MutableMapping,
NamedTuple,
Optional,
Union,
cast,
)
Expand Down Expand Up @@ -91,13 +92,13 @@ async def maybe_put(
channel: Union[ChannelT, str],
key: K = None,
value: V = None,
partition: int = None,
timestamp: float = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: HeadersArg = None,
schema: SchemaT = None,
schema: Optional[SchemaT] = None,
key_serializer: CodecArg = None,
value_serializer: CodecArg = None,
callback: MessageSentCallback = None,
callback: Optional[MessageSentCallback] = None,
force: bool = False,
) -> Awaitable[RecordMetadata]:
"""Attach message to source topic offset.
Expand Down Expand Up @@ -145,13 +146,13 @@ def put(
channel: Union[str, ChannelT],
key: K,
value: V,
partition: int = None,
timestamp: float = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: HeadersArg = None,
schema: SchemaT = None,
schema: Optional[SchemaT] = None,
key_serializer: CodecArg = None,
value_serializer: CodecArg = None,
callback: MessageSentCallback = None,
callback: Optional[MessageSentCallback] = None,
) -> Awaitable[RecordMetadata]:
"""Attach message to source topic offset."""
# This attaches message to be published when source message' is
Expand Down
Loading