diff --git a/docs/includes/settingref.txt b/docs/includes/settingref.txt
index a9590b676..5ae599b61 100644
--- a/docs/includes/settingref.txt
+++ b/docs/includes/settingref.txt
@@ -1436,7 +1436,7 @@ consider doing retries (send to separate topic):
     main_topic = app.topic('main')
     deadletter_topic = app.topic('main_deadletter')
 
-    async def send_request(value, timeout: float = None) -> None:
+    async def send_request(value, timeout: Optional[float] = None) -> None:
         await app.http_client.get('http://foo.com', timeout=timeout)
 
     @app.agent(main_topic)
diff --git a/docs/userguide/application.rst b/docs/userguide/application.rst
index 75775da15..4071d99b6 100644
--- a/docs/userguide/application.rst
+++ b/docs/userguide/application.rst
@@ -856,8 +856,8 @@ This can be used to attach custom headers to Kafka messages:
             sender: AppT,
             key: bytes = None,
             value: bytes = None,
-            partition: int = None,
-            timestamp: float = None,
+            partition: Optional[int] = None,
+            timestamp: Optional[float] = None,
             headers: List[Tuple[str, bytes]] = None,
             **kwargs: Any) -> None:
         test = current_test()
diff --git a/docs/userguide/models.rst b/docs/userguide/models.rst
index 53fa6fe59..7945764df 100644
--- a/docs/userguide/models.rst
+++ b/docs/userguide/models.rst
@@ -601,8 +601,8 @@ time and last modified time:
 .. sourcecode:: python
 
     class MyBaseRecord(Record, abstract=True):
-        time_created: float = None
-        time_modified: float = None
+        time_created: Optional[float] = None
+        time_modified: Optional[float] = None
 
 Inherit from this model to create a new model having the fields by default:
diff --git a/faust/agents/actor.py b/faust/agents/actor.py
index 6637d8cde..c8d743233 100644
--- a/faust/agents/actor.py
+++ b/faust/agents/actor.py
@@ -1,5 +1,5 @@
 """Actor - Individual Agent instances."""
-from typing import Any, AsyncGenerator, AsyncIterator, Coroutine, Set, cast
+from typing import Any, AsyncGenerator, AsyncIterator, Coroutine, Optional, Set, cast
 
 from mode import Service
 from mode.utils.tracebacks import format_agen_stack, format_coro_stack
@@ -22,8 +22,8 @@ def __init__(
         agent: AgentT,
         stream: StreamT,
         it: _T,
-        index: int = None,
-        active_partitions: Set[TP] = None,
+        index: Optional[int] = None,
+        active_partitions: Optional[Set[TP]] = None,
         **kwargs: Any,
     ) -> None:
         self.agent = agent
diff --git a/faust/agents/agent.py b/faust/agents/agent.py
index d4b6e9e21..1c367a8a0 100644
--- a/faust/agents/agent.py
+++ b/faust/agents/agent.py
@@ -183,18 +183,18 @@ def __init__(
         fun: AgentFun,
         *,
         app: AppT,
-        name: str = None,
+        name: Optional[str] = None,
         channel: Union[str, ChannelT] = None,
         concurrency: int = 1,
         sink: Iterable[SinkT] = None,
         on_error: AgentErrorHandler = None,
         supervisor_strategy: Type[SupervisorStrategyT] = None,
-        help: str = None,
-        schema: SchemaT = None,
+        help: Optional[str] = None,
+        schema: Optional[SchemaT] = None,
         key_type: ModelArg = None,
         value_type: ModelArg = None,
         isolated_partitions: bool = False,
-        use_reply_headers: bool = None,
+        use_reply_headers: Optional[bool] = None,
         **kwargs: Any,
     ) -> None:
         self.app = app
@@ -242,8 +242,8 @@ async def _start_one(
         *,
         index: Optional[int] = None,
         active_partitions: Optional[Set[TP]] = None,
-        stream: StreamT = None,
-        channel: ChannelT = None,
+        stream: Optional[StreamT] = None,
+        channel: Optional[ChannelT] = None,
     ) -> ActorT:
         # an index of None means there's only one instance,
         # and `index is None` is used as a test by functions that
@@ -261,7 +261,7 @@ async def _start_one_supervised(
         self,
         index: Optional[int] = None,
         active_partitions: Optional[Set[TP]] = None,
-        stream: StreamT = None,
+        stream: Optional[StreamT] = None,
     ) -> ActorT:
         aref = await self._start_one(
             index=index,
@@ -456,7 +456,7 @@ def clone(self, *, cls: Type[AgentT] = None, **kwargs: Any) -> AgentT:
 
     def test_context(
         self,
-        channel: ChannelT = None,
+        channel: Optional[ChannelT] = None,
         supervisor_strategy: SupervisorStrategyT = None,
         on_error: AgentErrorHandler = None,
         **kwargs: Any,
@@ -487,7 +487,7 @@ def _prepare_channel(
         self,
         channel: Union[str, ChannelT] = None,
         internal: bool = True,
-        schema: SchemaT = None,
+        schema: Optional[SchemaT] = None,
         key_type: ModelArg = None,
         value_type: ModelArg = None,
         **kwargs: Any,
@@ -514,10 +514,10 @@ def _prepare_channel(
     def __call__(
         self,
         *,
-        index: int = None,
-        active_partitions: Set[TP] = None,
-        stream: StreamT = None,
-        channel: ChannelT = None,
+        index: Optional[int] = None,
+        active_partitions: Optional[Set[TP]] = None,
+        stream: Optional[StreamT] = None,
+        channel: Optional[ChannelT] = None,
     ) -> ActorRefT:
         """Create new actor instance for this agent."""
         # The agent function can be reused by other agents/tasks.
@@ -545,9 +545,9 @@ def actor_from_stream(
         self,
         stream: Optional[StreamT],
         *,
-        index: int = None,
-        active_partitions: Set[TP] = None,
-        channel: ChannelT = None,
+        index: Optional[int] = None,
+        active_partitions: Optional[Set[TP]] = None,
+        channel: Optional[ChannelT] = None,
     ) -> ActorRefT:
         """Create new actor from stream."""
         we_created_stream = False
@@ -601,7 +601,10 @@ def add_sink(self, sink: SinkT) -> None:
             self._sinks.append(sink)
 
     def stream(
-        self, channel: ChannelT = None, active_partitions: Set[TP] = None, **kwargs: Any
+        self,
+        channel: Optional[ChannelT] = None,
+        active_partitions: Optional[Set[TP]] = None,
+        **kwargs: Any,
     ) -> StreamT:
         """Create underlying stream used by this agent."""
         if channel is None:
@@ -631,9 +634,9 @@ async def _start_task(
         *,
         index: Optional[int],
         active_partitions: Optional[Set[TP]] = None,
-        stream: StreamT = None,
-        channel: ChannelT = None,
-        beacon: NodeT = None,
+        stream: Optional[StreamT] = None,
+        channel: Optional[ChannelT] = None,
+        beacon: Optional[NodeT] = None,
     ) -> ActorRefT:
         # If the agent is an async function we simply start it,
         # if it returns an AsyncIterable/AsyncGenerator we start a task
@@ -750,8 +753,8 @@ async def cast(
         value: V = None,
         *,
         key: K = None,
-        partition: int = None,
-        timestamp: float = None,
+        partition: Optional[int] = None,
+        timestamp: Optional[float] = None,
         headers: HeadersArg = None,
     ) -> None:
         """RPC operation: like :meth:`ask` but do not expect reply.
@@ -772,11 +775,11 @@ async def ask(
         value: V = None,
         *,
         key: K = None,
-        partition: int = None,
-        timestamp: float = None,
+        partition: Optional[int] = None,
+        timestamp: Optional[float] = None,
         headers: HeadersArg = None,
         reply_to: ReplyToArg = None,
-        correlation_id: str = None,
+        correlation_id: Optional[str] = None,
     ) -> Any:
         """RPC operation: ask agent for result of processing value.
@@ -803,11 +806,11 @@ async def ask_nowait(
         value: V = None,
         *,
         key: K = None,
-        partition: int = None,
-        timestamp: float = None,
+        partition: Optional[int] = None,
+        timestamp: Optional[float] = None,
         headers: HeadersArg = None,
         reply_to: ReplyToArg = None,
-        correlation_id: str = None,
+        correlation_id: Optional[str] = None,
         force: bool = False,
     ) -> ReplyPromise:
         """RPC operation: ask agent for result of processing value.
@@ -835,7 +838,7 @@ def _create_req(
         key: K = None,
         value: V = None,
         reply_to: ReplyToArg = None,
-        correlation_id: str = None,
+        correlation_id: Optional[str] = None,
         headers: HeadersArg = None,
     ) -> Tuple[V, Optional[HeadersArg]]:
         if reply_to is None:
@@ -871,14 +874,14 @@ async def send(
         *,
         key: K = None,
         value: V = None,
-        partition: int = None,
-        timestamp: float = None,
+        partition: Optional[int] = None,
+        timestamp: Optional[float] = None,
         headers: HeadersArg = None,
         key_serializer: CodecArg = None,
         value_serializer: CodecArg = None,
-        callback: MessageSentCallback = None,
+        callback: Optional[MessageSentCallback] = None,
         reply_to: ReplyToArg = None,
-        correlation_id: str = None,
+        correlation_id: Optional[str] = None,
         force: bool = False,
     ) -> Awaitable[RecordMetadata]:
         """Send message to topic used by agent."""
@@ -1091,7 +1094,7 @@ class AgentTestWrapper(Agent, AgentTestWrapperT):  # pragma: no cover
     _stream: StreamT
 
     def __init__(
-        self, *args: Any, original_channel: ChannelT = None, **kwargs: Any
+        self, *args: Any, original_channel: Optional[ChannelT] = None, **kwargs: Any
     ) -> None:
         super().__init__(*args, **kwargs)
         self.results = {}
@@ -1131,7 +1134,7 @@ async def put(
         value_serializer: CodecArg = None,
         *,
         reply_to: ReplyToArg = None,
-        correlation_id: str = None,
+        correlation_id: Optional[str] = None,
         wait: bool = True,
     ) -> EventT:
         if reply_to:
@@ -1164,7 +1167,7 @@ def to_message(
         *,
         partition: Optional[int] = None,
         offset: int = 0,
-        timestamp: float = None,
+        timestamp: Optional[float] = None,
         timestamp_type: int = 0,
         headers: HeadersArg = None,
     ) -> Message:
diff --git a/faust/app/_attached.py b/faust/app/_attached.py
index 734f97aa5..dd5af835e 100644
--- a/faust/app/_attached.py
+++ b/faust/app/_attached.py
@@ -13,6 +13,7 @@
     List,
     MutableMapping,
     NamedTuple,
+    Optional,
     Union,
     cast,
 )
@@ -91,13 +92,13 @@ async def maybe_put(
         channel: Union[ChannelT, str],
         key: K = None,
         value: V = None,
-        partition: int = None,
-        timestamp: float = None,
+        partition: Optional[int] = None,
+        timestamp: Optional[float] = None,
         headers: HeadersArg = None,
-        schema: SchemaT = None,
+        schema: Optional[SchemaT] = None,
         key_serializer: CodecArg = None,
         value_serializer: CodecArg = None,
-        callback: MessageSentCallback = None,
+        callback: Optional[MessageSentCallback] = None,
         force: bool = False,
     ) -> Awaitable[RecordMetadata]:
         """Attach message to source topic offset.
@@ -145,13 +146,13 @@ def put(
         channel: Union[str, ChannelT],
         key: K,
         value: V,
-        partition: int = None,
-        timestamp: float = None,
+        partition: Optional[int] = None,
+        timestamp: Optional[float] = None,
         headers: HeadersArg = None,
-        schema: SchemaT = None,
+        schema: Optional[SchemaT] = None,
         key_serializer: CodecArg = None,
         value_serializer: CodecArg = None,
-        callback: MessageSentCallback = None,
+        callback: Optional[MessageSentCallback] = None,
     ) -> Awaitable[RecordMetadata]:
         """Attach message to source topic offset."""
         # This attaches message to be published when source message' is
diff --git a/faust/app/base.py b/faust/app/base.py
index 1bfb3c5f2..d0fa8c551 100644
--- a/faust/app/base.py
+++ b/faust/app/base.py
@@ -244,11 +244,11 @@ def __init__(
         self,
         app: AppT,
         *,
-        enable_web: bool = None,
-        enable_kafka: bool = None,
-        enable_kafka_producer: bool = None,
-        enable_kafka_consumer: bool = None,
-        enable_sensors: bool = None,
+        enable_web: Optional[bool] = None,
+        enable_kafka: Optional[bool] = None,
+        enable_kafka_producer: Optional[bool] = None,
+        enable_kafka_consumer: Optional[bool] = None,
+        enable_sensors: Optional[bool] = None,
     ) -> None:
         self.app = app
@@ -458,8 +458,8 @@ def __init__(
         *,
         monitor: Monitor = None,
         config_source: Any = None,
-        loop: asyncio.AbstractEventLoop = None,
-        beacon: NodeT = None,
+        loop: Optional[asyncio.AbstractEventLoop] = None,
+        beacon: Optional[NodeT] = None,
         **options: Any,
     ) -> None:
         # This is passed to the configuration in self.conf
@@ -765,23 +765,23 @@ def topic(
         self,
         *topics: str,
         pattern: Union[str, Pattern] = None,
-        schema: SchemaT = None,
-        key_type: ModelArg = None,
-        value_type: ModelArg = None,
+        schema: Optional[SchemaT] = None,
+        key_type: Optional[ModelArg] = None,
+        value_type: Optional[ModelArg] = None,
         key_serializer: CodecArg = None,
         value_serializer: CodecArg = None,
-        partitions: int = None,
-        retention: Seconds = None,
-        compacting: bool = None,
-        deleting: bool = None,
-        replicas: int = None,
+        partitions: Optional[int] = None,
+        retention: Optional[Seconds] = None,
+        compacting: Optional[bool] = None,
+        deleting: Optional[bool] = None,
+        replicas: Optional[int] = None,
         acks: bool = True,
         internal: bool = False,
-        config: Mapping[str, Any] = None,
-        maxsize: int = None,
+        config: Optional[Mapping[str, Any]] = None,
+        maxsize: Optional[int] = None,
         allow_empty: bool = False,
         has_prefix: bool = False,
-        loop: asyncio.AbstractEventLoop = None,
+        loop: Optional[asyncio.AbstractEventLoop] = None,
     ) -> TopicT:
         """Create topic description.
@@ -820,11 +820,11 @@ def topic(
     def channel(
         self,
         *,
-        schema: SchemaT = None,
-        key_type: ModelArg = None,
-        value_type: ModelArg = None,
-        maxsize: int = None,
-        loop: asyncio.AbstractEventLoop = None,
+        schema: Optional[SchemaT] = None,
+        key_type: Optional[ModelArg] = None,
+        value_type: Optional[ModelArg] = None,
+        maxsize: Optional[int] = None,
+        loop: Optional[asyncio.AbstractEventLoop] = None,
     ) -> ChannelT:
         """Create new channel.
@@ -849,7 +849,7 @@ def agent(
         self,
         channel: Union[str, ChannelT[_T]] = None,
         *,
-        name: str = None,
+        name: Optional[str] = None,
         concurrency: int = 1,
         supervisor_strategy: Type[SupervisorStrategyT] = None,
         sink: Iterable[SinkT] = None,
@@ -978,7 +978,7 @@ def timer(
         interval: Seconds,
         on_leader: bool = False,
         traced: bool = True,
-        name: str = None,
+        name: Optional[str] = None,
         max_drift_correction: float = 0.1,
     ) -> Callable:
         """Define an async def function to be run at periodic intervals.
@@ -1109,7 +1109,7 @@ def is_leader(self) -> bool:
     def stream(
         self,
         channel: Union[AsyncIterable, Iterable],
-        beacon: NodeT = None,
+        beacon: Optional[NodeT] = None,
         **kwargs: Any,
     ) -> StreamT:
         """Create new stream from channel/topic/iterable/async iterable.
@@ -1138,9 +1138,9 @@ def Table(
         name: str,
         *,
         default: Callable[[], Any] = None,
-        window: WindowT = None,
-        partitions: int = None,
-        help: str = None,
+        window: Optional[WindowT] = None,
+        partitions: Optional[int] = None,
+        help: Optional[str] = None,
         **kwargs: Any,
     ) -> TableT:
         """Define new table.
@@ -1183,9 +1183,9 @@ def GlobalTable(
         name: str,
         *,
         default: Callable[[], Any] = None,
-        window: WindowT = None,
-        partitions: int = None,
-        help: str = None,
+        window: Optional[WindowT] = None,
+        partitions: Optional[int] = None,
+        help: Optional[str] = None,
         **kwargs: Any,
     ) -> GlobalTableT:
         """Define new global table.
@@ -1231,10 +1231,10 @@ def SetTable(
         self,
         name: str,
         *,
-        window: WindowT = None,
-        partitions: int = None,
+        window: Optional[WindowT] = None,
+        partitions: Optional[int] = None,
         start_manager: bool = False,
-        help: str = None,
+        help: Optional[str] = None,
         **kwargs: Any,
     ) -> TableT:
         """Table of sets."""
@@ -1258,10 +1258,10 @@ def SetGlobalTable(
         self,
         name: str,
         *,
-        window: WindowT = None,
-        partitions: int = None,
+        window: Optional[WindowT] = None,
+        partitions: Optional[int] = None,
         start_manager: bool = False,
-        help: str = None,
+        help: Optional[str] = None,
         **kwargs: Any,
     ) -> TableT:
         """Table of sets (global)."""
@@ -1287,7 +1287,7 @@ def page(
         *,
         base: Type[View] = View,
         cors_options: Mapping[str, ResourceOptions] = None,
-        name: str = None,
+        name: Optional[str] = None,
     ) -> Callable[[PageArg], Type[View]]:
         """Decorate view to be included in the web server."""
         view_base: Type[View] = base if base is not None else View
@@ -1313,11 +1313,11 @@ def _decorator(fun: PageArg) -> Type[View]:
     def table_route(
         self,
         table: CollectionT,
-        shard_param: str = None,
+        shard_param: Optional[str] = None,
         *,
-        query_param: str = None,
-        match_info: str = None,
-        exact_key: str = None,
+        query_param: Optional[str] = None,
+        match_info: Optional[str] = None,
+        exact_key: Optional[str] = None,
     ) -> ViewDecorator:
         """Decorate view method to route request to table key destination."""
@@ -1357,11 +1357,11 @@ async def get(
     def topic_route(
         self,
         topic: CollectionT,
-        shard_param: str = None,
+        shard_param: Optional[str] = None,
         *,
-        query_param: str = None,
-        match_info: str = None,
-        exact_key: str = None,
+        query_param: Optional[str] = None,
+        match_info: Optional[str] = None,
+        exact_key: Optional[str] = None,
     ) -> ViewDecorator:
         """Decorate view method to route request to a topic partition destination."""
@@ -1453,7 +1453,11 @@ def trace(
         return self.tracer.trace(name=name, **extra_context)
 
     def traced(
-        self, fun: Callable, name: str = None, sample_rate: float = 1.0, **context: Any
+        self,
+        fun: Callable,
+        name: Optional[str] = None,
+        sample_rate: float = 1.0,
+        **context: Any,
     ) -> Callable:
         """Decorate function to be traced using the OpenTracing API."""
         assert fun
@@ -1485,13 +1489,13 @@ async def send(
         channel: Union[ChannelT, str],
         key: K = None,
         value: V = None,
-        partition: int = None,
-        timestamp: float = None,
+        partition: Optional[int] = None,
+        timestamp: Optional[float] = None,
         headers: HeadersArg = None,
-        schema: SchemaT = None,
+        schema: Optional[SchemaT] = None,
         key_serializer: CodecArg = None,
         value_serializer: CodecArg = None,
-        callback: MessageSentCallback = None,
+        callback: Optional[MessageSentCallback] = None,
     ) -> Awaitable[RecordMetadata]:
         """Send event to channel/topic.
@@ -1815,10 +1819,10 @@ def _new_cache_backend(self) -> CacheBackendT:
 
     def FlowControlQueue(
         self,
-        maxsize: int = None,
+        maxsize: Optional[int] = None,
         *,
         clear_on_resume: bool = False,
-        loop: asyncio.AbstractEventLoop = None,
+        loop: Optional[asyncio.AbstractEventLoop] = None,
     ) -> ThrowableQueue:
         """Like :class:`asyncio.Queue`, but can be suspended/resumed."""
         return ThrowableQueue(
diff --git a/faust/assignor/copartitioned_assignor.py b/faust/assignor/copartitioned_assignor.py
index 9f14769e0..a06c8d8f4 100644
--- a/faust/assignor/copartitioned_assignor.py
+++ b/faust/assignor/copartitioned_assignor.py
@@ -45,7 +45,7 @@ def __init__(
         cluster_asgn: MutableMapping[str, CopartitionedAssignment],
         num_partitions: int,
         replicas: int,
-        capacity: int = None,
+        capacity: Optional[int] = None,
     ) -> None:
         self._num_clients = len(cluster_asgn)
         assert self._num_clients, "Should assign to at least 1 client"
@@ -154,7 +154,7 @@ def _client_exhausted(
         self,
         assignemnt: CopartitionedAssignment,
         active: bool,
-        client_limit: int = None,
+        client_limit: Optional[int] = None,
     ) -> bool:
         if client_limit is None:
             client_limit = self._get_client_limit(active)
diff --git a/faust/auth.py b/faust/auth.py
index a72873d6e..22c4e859f 100644
--- a/faust/auth.py
+++ b/faust/auth.py
@@ -30,8 +30,8 @@ class SASLCredentials(Credentials):
     def __init__(
         self,
         *,
-        username: str = None,
-        password: str = None,
+        username: Optional[str] = None,
+        password: Optional[str] = None,
         ssl_context: ssl.SSLContext = None,
         mechanism: Union[str, SASLMechanism] = None,
     ) -> None:
@@ -61,7 +61,7 @@ def __init__(
         self,
         *,
         kerberos_service_name: str = "kafka",
-        kerberos_domain_name: str = None,
+        kerberos_domain_name: Optional[str] = None,
         ssl_context: ssl.SSLContext = None,
         mechanism: Union[str, SASLMechanism] = None,
     ) -> None:
diff --git a/faust/channels.py b/faust/channels.py
index 937f6d86b..3ac34fdb6 100644
--- a/faust/channels.py
+++ b/faust/channels.py
@@ -90,15 +90,15 @@ def __init__(
         self,
         app: AppT,
         *,
-        schema: SchemaT = None,
+        schema: Optional[SchemaT] = None,
         key_type: ModelArg = None,
         value_type: ModelArg = None,
         is_iterator: bool = False,
-        queue: ThrowableQueue = None,
-        maxsize: int = None,
-        root: ChannelT = None,
-        active_partitions: Set[TP] = None,
-        loop: asyncio.AbstractEventLoop = None,
+        queue: Optional[ThrowableQueue] = None,
+        maxsize: Optional[int] = None,
+        root: Optional[ChannelT] = None,
+        active_partitions: Optional[Set[TP]] = None,
+        loop: Optional[asyncio.AbstractEventLoop] = None,
     ) -> None:
         self.app = app
         self.loop = loop
@@ -146,7 +146,9 @@ def queue(self) -> ThrowableQueue:
             )
         return self._queue
 
-    def clone(self, *, is_iterator: bool = None, **kwargs: Any) -> ChannelT[T]:
+    def clone(
+        self, *, is_iterator: Optional[bool] = None, **kwargs: Any
+    ) -> ChannelT[T]:
         """Create clone of this channel.
 
         Arguments:
@@ -201,13 +203,13 @@ async def send(
         *,
         key: K = None,
         value: V = None,
-        partition: int = None,
-        timestamp: float = None,
+        partition: Optional[int] = None,
+        timestamp: Optional[float] = None,
         headers: HeadersArg = None,
-        schema: SchemaT = None,
+        schema: Optional[SchemaT] = None,
         key_serializer: CodecArg = None,
         value_serializer: CodecArg = None,
-        callback: MessageSentCallback = None,
+        callback: Optional[MessageSentCallback] = None,
         force: bool = False,
     ) -> Awaitable[RecordMetadata]:
         """Send message to channel."""
@@ -228,13 +230,13 @@ def send_soon(
         *,
         key: K = None,
         value: V = None,
-        partition: int = None,
-        timestamp: float = None,
+        partition: Optional[int] = None,
+        timestamp: Optional[float] = None,
         headers: HeadersArg = None,
-        schema: SchemaT = None,
+        schema: Optional[SchemaT] = None,
         key_serializer: CodecArg = None,
         value_serializer: CodecArg = None,
-        callback: MessageSentCallback = None,
+        callback: Optional[MessageSentCallback] = None,
         force: bool = False,
         eager_partitioning: bool = False,
     ) -> FutureMessage:
@@ -251,13 +253,13 @@ def as_future_message(
         self,
         key: K = None,
         value: V = None,
-        partition: int = None,
-        timestamp: float = None,
+        partition: Optional[int] = None,
+        timestamp: Optional[float] = None,
         headers: HeadersArg = None,
-        schema: SchemaT = None,
+        schema: Optional[SchemaT] = None,
         key_serializer: CodecArg = None,
         value_serializer: CodecArg = None,
-        callback: MessageSentCallback = None,
+        callback: Optional[MessageSentCallback] = None,
         eager_partitioning: bool = False,
     ) -> FutureMessage:
         """Create promise that message will be transmitted."""
@@ -302,13 +304,13 @@ async def _send_now(
         self,
         key: K = None,
         value: V = None,
-        partition: int = None,
-        timestamp: float = None,
+        partition: Optional[int] = None,
+        timestamp: Optional[float] = None,
         headers: HeadersArg = None,
-        schema: SchemaT = None,
+        schema: Optional[SchemaT] = None,
         key_serializer: CodecArg = None,
         value_serializer: CodecArg = None,
-        callback: MessageSentCallback = None,
+        callback: Optional[MessageSentCallback] = None,
     ) -> Awaitable[RecordMetadata]:
         return await self.publish_message(
             self.as_future_message(
@@ -388,7 +390,7 @@ def prepare_key(
         self,
         key: K,
         key_serializer: CodecArg,
-        schema: SchemaT = None,
+        schema: Optional[SchemaT] = None,
         headers: OpenHeadersArg = None,
     ) -> Tuple[Any, OpenHeadersArg]:
         """Prepare key before it is sent to this channel.
@@ -402,7 +404,7 @@ def prepare_value(
         self,
         value: V,
         value_serializer: CodecArg,
-        schema: SchemaT = None,
+        schema: Optional[SchemaT] = None,
         headers: OpenHeadersArg = None,
     ) -> Tuple[Any, OpenHeadersArg]:
         """Prepare value before it is sent to this channel.
@@ -450,7 +452,7 @@ async def put(self, value: EventT[T_contra]) -> None:
         for subscriber in root._subscribers:
             await subscriber.queue.put(value)
 
-    async def get(self, *, timeout: Seconds = None) -> EventT[T]:
+    async def get(self, *, timeout: Optional[Seconds] = None) -> EventT[T]:
         """Get the next :class:`~faust.Event` received on this channel."""
         timeout_: float = want_seconds(timeout)
         if timeout_:
@@ -591,12 +593,12 @@ def __init__(
         self,
         app: AppT,
         *,
-        schema: SchemaT = None,
+        schema: Optional[SchemaT] = None,
         key_type: ModelArg = None,
         value_type: ModelArg = None,
         key_serializer: CodecArg = None,
         value_serializer: CodecArg = None,
-        allow_empty: bool = None,
+        allow_empty: Optional[bool] = None,
         **kwargs: Any,
     ) -> None:
         self.app = app  # need to set early
@@ -633,7 +635,7 @@ def _contribute_to_schema(
         value_type: ModelArg = None,
         key_serializer: CodecArg = None,
         value_serializer: CodecArg = None,
-        allow_empty: bool = None,
+        allow_empty: Optional[bool] = None,
     ) -> None:
         # Update schema and take compat attributes
         # from passed schema.
@@ -651,7 +653,7 @@ def _get_default_schema(
         value_type: ModelArg = None,
         key_serializer: CodecArg = None,
         value_serializer: CodecArg = None,
-        allow_empty: bool = None,
+        allow_empty: Optional[bool] = None,
     ) -> SchemaT:
         return cast(
             SchemaT,
@@ -678,7 +680,7 @@ def prepare_key(
         self,
         key: K,
         key_serializer: CodecArg,
-        schema: SchemaT = None,
+        schema: Optional[SchemaT] = None,
         headers: OpenHeadersArg = None,
     ) -> Tuple[Any, OpenHeadersArg]:
         """Serialize key to format suitable for transport."""
@@ -694,7 +696,7 @@ def prepare_value(
         self,
         value: V,
         value_serializer: CodecArg,
-        schema: SchemaT = None,
+        schema: Optional[SchemaT] = None,
         headers: OpenHeadersArg = None,
     ) -> Tuple[Any, OpenHeadersArg]:
         """Serialize value to format suitable for transport."""
diff --git a/faust/cli/base.py b/faust/cli/base.py
index 456c701b7..d4cf47687 100644
--- a/faust/cli/base.py
+++ b/faust/cli/base.py
@@ -412,7 +412,7 @@ def make_context(
         self,
         info_name: str,
         args: str,
-        app: AppT = None,
+        app: Optional[AppT] = None,
         parent: click.Context = None,
         stdout: IO = None,
         stderr: IO = None,
@@ -644,7 +644,7 @@ def as_service(
         )
 
     def worker_for_service(
-        self, service: ServiceT, loop: asyncio.AbstractEventLoop = None
+        self, service: ServiceT, loop: Optional[asyncio.AbstractEventLoop] = None
     ) -> Worker:
         """Create :class:`faust.Worker` instance for this command."""
         return self._Worker(
@@ -670,7 +670,7 @@ def _Worker(self) -> Type[Worker]:
     def tabulate(
         self,
         data: terminal.TableDataT,
-        headers: Sequence[str] = None,
+        headers: Optional[Sequence[str]] = None,
         wrap_last_row: bool = True,
         title: str = "",
         **kwargs: Any,
@@ -698,7 +698,7 @@ def tabulate(
         return table.table
 
     def _tabulate_json(
-        self, data: terminal.TableDataT, headers: Sequence[str] = None
+        self, data: terminal.TableDataT, headers: Optional[Sequence[str]] = None
     ) -> str:
         if headers:
             return json.dumps([dict(zip(headers, row)) for row in data])
@@ -836,7 +836,7 @@ def _finalize_app(self, app: AppT) -> AppT:
         else:
             return self._app_from_str(self.state.app)
 
-    def _app_from_str(self, appstr: str = None) -> Optional[AppT]:
+    def _app_from_str(self, appstr: Optional[str] = None) -> Optional[AppT]:
         if appstr:
             return find_app(appstr)
         else:
diff --git a/faust/cli/model.py b/faust/cli/model.py
index 81dd9d6ff..fde0fe530 100644
--- a/faust/cli/model.py
+++ b/faust/cli/model.py
@@ -1,6 +1,6 @@
 """Program ``faust model`` used to list details about a model."""
 from datetime import datetime
-from typing import Any, Sequence, Type
+from typing import Any, Optional, Sequence, Type
 
 import click
 from mode.utils import text
@@ -51,7 +51,9 @@ async def run(self, name: str) -> None:
                 )
             )
 
-    def _unknown_model(self, name: str, *, lookup: str = None) -> click.UsageError:
+    def _unknown_model(
+        self, name: str, *, lookup: Optional[str] = None
+    ) -> click.UsageError:
         lookup = lookup or name
         alt = text.didyoumean(
             registry,
diff --git a/faust/cli/send.py b/faust/cli/send.py
index 0a751a7fa..a52cd4c9e 100644
--- a/faust/cli/send.py
+++ b/faust/cli/send.py
@@ -1,7 +1,7 @@
 """Program ``faust send`` used to send events to agents and topics."""
 import asyncio
 import random
-from typing import Any
+from typing import Any, Optional
 
 from faust.types import CodecArg, K, RecordMetadata, V
@@ -51,13 +51,13 @@ async def run(
         entity: str,
         value: str,
         *args: Any,
-        key: str = None,
-        key_type: str = None,
-        key_serializer: str = None,
-        value_type: str = None,
-        value_serializer: str = None,
+        key: Optional[str] = None,
+        key_type: Optional[str] = None,
+        key_serializer: Optional[str] = None,
+        value_type: Optional[str] = None,
+        value_serializer: Optional[str] = None,
         partition: int = 1,
-        timestamp: float = None,
+        timestamp: Optional[float] = None,
         repeat: int = 1,
         min_latency: float = 0.0,
         max_latency: float = 0.0,
diff --git a/faust/contrib/sentry.py b/faust/contrib/sentry.py
index 407d3aa9d..404595cd4 100644
--- a/faust/contrib/sentry.py
+++ b/faust/contrib/sentry.py
@@ -91,10 +91,10 @@ def carp(self, obj: Any) -> None:
 
 def handler_from_dsn(
-    dsn: str = None,
+    dsn: Optional[str] = None,
     workers: int = 5,
     include_paths: Iterable[str] = None,
-    loglevel: int = None,
+    loglevel: Optional[int] = None,
     qsize: int = 1000,
     **kwargs: Any
 ) -> Optional[logging.Handler]:
@@ -126,10 +126,10 @@ def handler_from_dsn(
 def setup(
     app: AppT,
     *,
-    dsn: str = None,
+    dsn: Optional[str] = None,
     workers: int = 4,
     max_queue_size: int = 1000,
-    loglevel: int = None
+    loglevel: Optional[int] = None
 ) -> None:
     sentry_handler = handler_from_dsn(
         dsn=dsn,
diff --git a/faust/events.py b/faust/events.py
index 18c6dbd0b..3e218a16c 100644
--- a/faust/events.py
+++ b/faust/events.py
@@ -134,13 +134,13 @@ async def send(
         channel: Union[str, ChannelT],
         key: K = USE_EXISTING_KEY,
         value: V = USE_EXISTING_VALUE,
-        partition: int = None,
-        timestamp: float = None,
+        partition: Optional[int] = None,
+        timestamp: Optional[float] = None,
         headers: Any = USE_EXISTING_HEADERS,
-        schema: SchemaT = None,
+        schema: Optional[SchemaT] = None,
         key_serializer: CodecArg = None,
         value_serializer: CodecArg = None,
-        callback: MessageSentCallback = None,
+        callback: Optional[MessageSentCallback] = None,
         force: bool = False,
     ) -> Awaitable[RecordMetadata]:
         """Send object to channel."""
@@ -169,13 +169,13 @@ async def forward(
         channel: Union[str, ChannelT],
         key: K = USE_EXISTING_KEY,
         value: V = USE_EXISTING_VALUE,
-        partition: int = None,
-        timestamp: float = None,
+        partition: Optional[int] = None,
+        timestamp: Optional[float] = None,
         headers: Any = USE_EXISTING_HEADERS,
-        schema: SchemaT = None,
+        schema: Optional[SchemaT] = None,
         key_serializer: CodecArg = None,
         value_serializer: CodecArg = None,
-        callback: MessageSentCallback = None,
+        callback: Optional[MessageSentCallback] = None,
         force: bool = False,
     ) -> Awaitable[RecordMetadata]:
         """Forward original message (will not be reserialized)."""
@@ -206,13 +206,13 @@ async def _send(
         channel: Union[str, ChannelT],
         key: K = None,
         value: V = None,
-        partition: int = None,
-        timestamp: float = None,
+        partition: Optional[int] = None,
+        timestamp: Optional[float] = None,
         headers: HeadersArg = None,
-        schema: SchemaT = None,
+        schema: Optional[SchemaT] = None,
         key_serializer: CodecArg = None,
         value_serializer: CodecArg = None,
-        callback: MessageSentCallback = None,
+        callback: Optional[MessageSentCallback] = None,
         force: bool = False,
     ) -> Awaitable[RecordMetadata]:
         return await cast(_App, self.app)._attachments.maybe_put(
@@ -234,13 +234,13 @@ def _attach(
         channel: Union[ChannelT, str],
         key: K = None,
         value: V = None,
-        partition: int = None,
-        timestamp: float = None,
+        partition: Optional[int] = None,
+        timestamp: Optional[float] = None,
         headers: HeadersArg = None,
-        schema: SchemaT = None,
+        schema: Optional[SchemaT] = None,
         key_serializer: CodecArg = None,
         value_serializer: CodecArg = None,
-        callback: MessageSentCallback = None,
+        callback: Optional[MessageSentCallback] = None,
     ) -> Awaitable[RecordMetadata]:
         return cast(_App, self.app)._attachments.put(
             self.message,
diff --git a/faust/livecheck/app.py b/faust/livecheck/app.py
index 3ddc1764b..35126a0ad 100644
--- a/faust/livecheck/app.py
+++ b/faust/livecheck/app.py
@@ -100,12 +100,12 @@ def for_app(
         *,
         prefix: str = "livecheck-",
         web_port: int = 9999,
-        test_topic_name: str = None,
-        bus_topic_name: str = None,
-        report_topic_name: str = None,
-        bus_concurrency: int = None,
-        test_concurrency: int = None,
-        send_reports: bool = None,
+        test_topic_name: Optional[str] = None,
+        bus_topic_name: Optional[str] = None,
+        report_topic_name: Optional[str] = None,
+        bus_concurrency: Optional[int] = None,
+        test_concurrency: Optional[int] = None,
+        send_reports: Optional[bool] = None,
         **kwargs: Any,
     ) -> "LiveCheck":
         """Create LiveCheck application targeting specific app.
@@ -143,12 +143,12 @@ def __init__(
         self,
         id: str,
         *,
-        test_topic_name: str = None,
-        bus_topic_name: str = None,
-        report_topic_name: str = None,
-        bus_concurrency: int = None,
-        test_concurrency: int = None,
-        send_reports: bool = None,
+        test_topic_name: Optional[str] = None,
+        bus_topic_name: Optional[str] = None,
+        report_topic_name: Optional[str] = None,
+        bus_concurrency: Optional[int] = None,
+        test_concurrency: Optional[int] = None,
+        send_reports: Optional[bool] = None,
         **kwargs: Any,
     ) -> None:
         super().__init__(id, **kwargs)
@@ -194,10 +194,10 @@ def on_produce_attach_test_headers(
         sender: AppT,
         key: bytes = None,
         value: bytes = None,
-        partition: int = None,
-        timestamp: float = None,
+        partition: Optional[int] = None,
+        timestamp: Optional[float] = None,
         headers: List[Tuple[str, bytes]] = None,
-        signal: BaseSignalT = None,
+        signal: Optional[BaseSignalT] = None,
         **kwargs: Any,
     ) -> None:
         """Attach test headers to Kafka produce requests."""
@@ -210,20 +210,20 @@ def on_produce_attach_test_headers(
     def case(
         self,
         *,
-        name: str = None,
-        probability: float = None,
+        name: Optional[str] = None,
+        probability: Optional[float] = None,
         warn_stalled_after: Seconds = WARN_STALLED_AFTER_DEFAULT,
-        active: bool = None,
-        test_expires: Seconds = None,
-        frequency: Seconds = None,
-        max_history: int = None,
-        max_consecutive_failures: int = None,
-        url_timeout_total: float = None,
-        url_timeout_connect: float = None,
-        url_error_retries: float = None,
-        url_error_delay_min: float = None,
-        url_error_delay_backoff: float = None,
-        url_error_delay_max: float = None,
+        active: Optional[bool] = None,
+        test_expires: Optional[Seconds] = None,
+        frequency: Optional[Seconds] = None,
+        max_history: Optional[int] = None,
+        max_consecutive_failures: Optional[int] = None,
+        url_timeout_total: Optional[float] = None,
+        url_timeout_connect: Optional[float] = None,
+        url_error_retries: Optional[float] = None,
+        url_error_delay_min: Optional[float] = None,
+        url_error_delay_backoff: Optional[float] = None,
+        url_error_delay_max: Optional[float] = None,
         base: Type[_Case] = Case,
     ) -> Callable[[Type], _Case]:
         """Decorate class to be used as a test case.
diff --git a/faust/livecheck/case.py b/faust/livecheck/case.py
index 12858bc68..c71c0b51f 100644
--- a/faust/livecheck/case.py
+++ b/faust/livecheck/case.py
@@ -112,21 +112,21 @@ def __init__(
         *,
         app: _LiveCheck,
         name: str,
-        probability: float = None,
-        warn_stalled_after: Seconds = None,
-        active: bool = None,
+        probability: Optional[float] = None,
+        warn_stalled_after: Optional[Seconds] = None,
+        active: Optional[bool] = None,
         signals: Iterable[BaseSignal] = None,
-        test_expires: Seconds = None,
-        frequency: Seconds = None,
-        realtime_logs: bool = None,
-        max_history: int = None,
-        max_consecutive_failures: int = None,
-        url_timeout_total: float = None,
-        url_timeout_connect: float = None,
-        url_error_retries: int = None,
-        url_error_delay_min: float = None,
-        url_error_delay_backoff: float = None,
-        url_error_delay_max: float = None,
+        test_expires: Optional[Seconds] = None,
+        frequency: Optional[Seconds] = None,
+        realtime_logs: Optional[bool] = None,
+        max_history: Optional[int] = None,
+        max_consecutive_failures: Optional[int] = None,
+        url_timeout_total: Optional[float] = None,
+        url_timeout_connect: Optional[float] = None,
+        url_error_retries: Optional[int] = None,
+        url_error_delay_min: Optional[float] = None,
+        url_error_delay_backoff: Optional[float] = None,
+        url_error_delay_max: Optional[float] = None,
         **kwargs: Any,
     ) -> None:
         self.app = app
@@ -199,7 +199,7 @@ async def _sample(self) -> None:
 
     @asynccontextmanager
     async def maybe_trigger(
-        self, id: str = None, *args: Any, **kwargs: Any
+        self, id: Optional[str] = None, *args: Any, **kwargs: Any
     ) -> AsyncGenerator[Optional[TestExecution], None]:
         """Schedule test execution, or not, based on probability setting."""
         execution: Optional[TestExecution] = None
@@ -209,7 +209,9 @@ async def maybe_trigger(
             exit_stack.enter_context(current_test_stack.push(execution))
             yield execution
 
-    async def trigger(self, id: str = None, *args: Any, **kwargs: Any) -> TestExecution:
+    async def trigger(
+        self, id: Optional[str] = None, *args: Any, **kwargs: Any
+    ) -> TestExecution:
         """Schedule test execution ASAP."""
         id = id or uuid()
         execution = TestExecution(
diff --git a/faust/livecheck/signals.py b/faust/livecheck/signals.py
index 4f86f9ac3..d7aa9b974 100644
--- a/faust/livecheck/signals.py
+++ b/faust/livecheck/signals.py
@@ -2,7 +2,7 @@
 import asyncio
 import typing
 from time import monotonic
-from typing import Any, Dict, Generic, Tuple, Type, TypeVar, cast
+from typing import Any, Dict, Generic, Optional, Tuple, Type, TypeVar, cast
 
 from mode import Seconds, want_seconds
@@ -43,7 +43,7 @@ async def send(
         """Notify test that this signal is now complete."""
         raise NotImplementedError()
 
-    async def wait(self, *, key: Any = None, timeout: Seconds = None) -> VT:
+    async def wait(self, *, key: Any = None, timeout: Optional[Seconds] = None) -> VT:
         """Wait for signal to be completed."""
         raise NotImplementedError()
@@ -59,7 +59,7 @@ def __set_name__(self, owner: Type, name: str) -> None:
     def _wakeup_resolvers(self) -> None:
         self.case.app._can_resolve.set()
 
-    async def _wait_for_resolved(self, *, timeout: float = None) -> None:
+    async def _wait_for_resolved(self, *, timeout: Optional[float] = None) -> None:
         app = self.case.app
         app._can_resolve.clear()
         await app.wait(app._can_resolve, timeout=timeout)
@@ -116,7 +116,7 @@ async def send(
             ),
         )
 
-    async def wait(self, *, key: Any = None, timeout: Seconds = None) -> VT:
+    async def wait(self, *, key: Any = None, timeout: Optional[Seconds] = None) -> VT:
         """Wait for signal to be completed."""
         # wait for key to arrive in consumer
         runner = self.case.current_execution
@@ -144,7 +144,7 @@ def _verify_event(self, ev: SignalEvent, key: Any, name: str, case: str) -> None
         assert ev.case_name == case, f"{ev.case_name!r} == {case!r}"
 
     async def _wait_for_message_by_key(
-        self, key: Any, *, timeout: float = None, max_interval: float = 2.0
+        self, key: Any, *, timeout: Optional[float] = None, max_interval: float = 2.0
     ) -> SignalEvent:
         app = self.case.app
         time_start = monotonic()
diff --git a/faust/models/base.py b/faust/models/base.py
index 5cae52844..3f7ec65c5 100644
--- a/faust/models/base.py
+++ b/faust/models/base.py
@@ -250,17 +250,17 @@ def loads(
 
     def __init_subclass__(
         self,
-        serializer: str = None,
-        namespace: str = None,
-        include_metadata: bool = None,
-        isodates: bool = None,
+        serializer: Optional[str] = None,
+        namespace: Optional[str] = None,
+        include_metadata: Optional[bool] = None,
+        isodates: Optional[bool] = None,
         abstract: bool = False,
-        allow_blessed_key: bool = None,
-        decimals: bool = None,
-        coerce: bool = None,
+        allow_blessed_key: Optional[bool] = None,
+        decimals: Optional[bool] = None,
+        coerce: Optional[bool] = None,
         coercions: CoercionMapping = None,
-        polymorphic_fields: bool = None,
-        validation: bool = None,
+        polymorphic_fields: Optional[bool] = None,
+        validation: Optional[bool] = None,
         date_parser: Callable[[Any], datetime] = None,
         lazy_creation: bool = False,
         **kwargs: Any,
@@ -305,17 +305,17 @@ def make_final(cls) -> None:
     @classmethod
     def _init_subclass(
         cls,
-        serializer: str = None,
-        namespace: str = None,
-        include_metadata: bool = None,
-        isodates: bool = None,
+        serializer: Optional[str] = None,
+        namespace: Optional[str] = None,
+        include_metadata: Optional[bool] = None,
+        isodates: Optional[bool] = None,
         abstract: bool = False,
-        allow_blessed_key: bool = None,
-        decimals: bool = None,
-        coerce: bool = None,
+        allow_blessed_key: Optional[bool] = None,
+        decimals: Optional[bool] = None,
+        coerce: Optional[bool] = None,
         coercions: CoercionMapping = None,
-        polymorphic_fields: bool = None,
-        validation: bool = None,
+        polymorphic_fields: Optional[bool] = None,
+        validation: Optional[bool] = None,
         date_parser: Callable[[Any], datetime] = None,
     ) -> None:
         # Can set serializer/namespace/etc. using:
@@ -421,7 +421,10 @@ def _contribute_methods(cls) -> None:  # pragma: no cover
     @classmethod
     @abc.abstractmethod
     def _contribute_field_descriptors(
-        cls, target: Type, options: ModelOptions, parent: FieldDescriptorT = None
+        cls,
+        target: Type,
+        options: ModelOptions,
+        parent: Optional[FieldDescriptorT] = None,
     ) -> FieldMap:  # pragma: no cover
         ...
diff --git a/faust/models/fields.py b/faust/models/fields.py
index 6daff80ce..a51db66e0 100644
--- a/faust/models/fields.py
+++ b/faust/models/fields.py
@@ -132,16 +132,16 @@ class FieldDescriptor(FieldDescriptorT[T]):
     def __init__(
         self,
         *,
-        field: str = None,
-        input_name: str = None,
-        output_name: str = None,
+        field: Optional[str] = None,
+        input_name: Optional[str] = None,
+        output_name: Optional[str] = None,
         type: Type[T] = None,
         model: Type[ModelT] = None,
         required: bool = True,
         default: T = None,
-        parent: FieldDescriptorT = None,
-        coerce: bool = None,
-        exclude: bool = None,
+        parent: Optional[FieldDescriptorT] = None,
+        coerce: Optional[bool] = None,
+        exclude: Optional[bool] = None,
         date_parser: Callable[[Any], datetime] = None,
         tag: Type[Tag] = None,
         **options: Any,
@@ -239,7 +239,9 @@ def to_python(self, value: Any) -> Optional[T]:
             value = to_python(value)
         return self.prepare_value(value)
 
-    def prepare_value(self, value: Any, *, coerce: bool = None) -> Optional[T]:
+    def prepare_value(
+        self, value: Any, *, coerce: Optional[bool] = None
+    ) -> Optional[T]:
         return cast(T, value)
 
     def _copy_descriptors(self, typ: Type = None) -> None:
@@ -264,7 +266,7 @@ def __get__(self, instance: Any, owner: Type) -> Any:
                 evaluated_fields.add(field)
         return value
 
-    def should_coerce(self, value: Any, coerce: bool = None) -> bool:
+    def should_coerce(self, value: Any, coerce: Optional[bool] = None) -> bool:
         c = coerce if coerce is not None else self.coerce
         return c and (self.required or value is not None)
@@ -320,7 +322,9 @@ def validate(self, value: T) -> Iterable[ValidationError]:
                 f"{self.field} must be True or False, of type bool"
             )
 
-    def prepare_value(self, value: Any, *, coerce: bool = None) -> Optional[bool]:
+    def prepare_value(
+        self, value: Any, *, coerce: Optional[bool] = None
+    ) -> Optional[bool]:
         if self.should_coerce(value, coerce):
             return True if value else False
         return value
@@ -332,7 +336,11 @@ class NumberField(FieldDescriptor[T]):
     min_value: Optional[int]
 
     def __init__(
-        self, *, max_value: int = None, min_value: int = None, **kwargs: Any
+        self,
+        *,
+        max_value: Optional[int] = None,
+        min_value: Optional[int] = None,
+        **kwargs: Any,
     ) -> None:
         self.max_value = max_value
         self.min_value = min_value
@@ -358,12 +366,16 @@ def validate(self, value: T) -> Iterable[ValidationError]:
 
 class IntegerField(NumberField[int]):
-    def prepare_value(self, value: Any, *, coerce: bool = None) -> Optional[int]:
+    def prepare_value(
+        self, value: Any, *, coerce: Optional[bool] = None
+    ) -> Optional[int]:
         return int(value) if self.should_coerce(value, coerce) else value
 
 
 class FloatField(NumberField[float]):
-    def prepare_value(self, value: Any, *, coerce: bool = None) -> Optional[float]:
+    def prepare_value(
+        self, value: Any, *, coerce: Optional[bool] = None
+    ) -> Optional[float]:
         return float(value) if self.should_coerce(value, coerce) else value
@@ -372,7 +384,11 @@ class DecimalField(NumberField[Decimal]):
     max_decimal_places: Optional[int] = None
 
     def __init__(
-        self, *, max_digits: int = None, max_decimal_places: int = None, **kwargs: Any
+        self,
+        *,
+        max_digits: Optional[int] = None,
+        max_decimal_places: Optional[int] = None,
+        **kwargs: Any,
     ) -> None:
         self.max_digits = max_digits
         self.max_decimal_places = max_decimal_places
@@ -393,7 +409,9 @@ def to_python(self, value: Any) -> Any:
         else:
             return self._to_python(value)
 
-    def prepare_value(self, value: Any, *, coerce: bool = None) -> Optional[Decimal]:
+    def prepare_value(
+        self, value: Any, *, coerce: Optional[bool] = None
+    ) -> Optional[Decimal]:
         return Decimal(value) if self.should_coerce(value, coerce) else value
 
     def validate(self, value: Decimal) -> Iterable[ValidationError]:
@@ -430,8 +448,8 @@ class CharField(FieldDescriptor[CharacterType]):
     def __init__(
         self,
         *,
-        max_length: int = None,
-        min_length: int = None,
+        max_length: Optional[int] = None,
+        min_length: Optional[int] = None,
         trim_whitespace: bool = False,
         allow_blank: bool = False,
         **kwargs: Any,
@@ -472,7 +490,9 @@ def validate(self, value: CharacterType) -> Iterable[ValidationError]:
 
 class StringField(CharField[str]):
-    def prepare_value(self, value: Any, *, coerce: bool = None) -> Optional[str]:
+    def prepare_value(
+        self, value: Any, *, coerce: Optional[bool] = None
+    ) -> Optional[str]:
         if self.should_coerce(value, coerce):
             val = str(value) if not isinstance(value, str) else value
             if self.trim_whitespace:
@@ -491,7 +511,9 @@ def to_python(self, value: Any) -> Any:
         else:
             return self._to_python(value)
 
-    def prepare_value(self, value: Any, *, coerce: bool = None) -> Optional[datetime]:
+    def prepare_value(
+        self, value: Any, *, coerce: Optional[bool] = None
+    ) -> Optional[datetime]:
         if self.should_coerce(value, coerce):
             if value is not None and not isinstance(value, datetime):
                 return self.date_parser(value)
@@ -506,7 +528,11 @@ class BytesField(CharField[bytes]):
     errors: str = "strict"
 
     def __init__(
-        self, *, encoding: str = None, errors: str = None, **kwargs: Any
+        self,
+        *,
+        encoding: Optional[str] = None,
+        errors: Optional[str] = None,
+        **kwargs: Any,
     ) -> None:
         if encoding is not None:
             self.encoding = encoding
@@ -518,7 +544,9 @@ def __init__(
             **kwargs,
         )
 
-    def prepare_value(self, value: Any, *, coerce: bool = None) -> Optional[bytes]:
+    def prepare_value(
+        self, value: Any, *, coerce: Optional[bool] = None
+    ) -> Optional[bytes]:
         if self.should_coerce(value, coerce):
             if isinstance(value, bytes):
                 val = value
diff --git a/faust/models/record.py b/faust/models/record.py
index 2d9af32b9..2dedb96fa 100644
--- a/faust/models/record.py
+++ b/faust/models/record.py
@@ -10,6 +10,7 @@
     List,
     Mapping,
     MutableMapping,
+    Optional,
     Set,
     Tuple,
     Type,
@@ -92,17 +93,17 @@ class Record(Model, abstract=True):  # type: ignore
 
     def __init_subclass__(
         cls,
-        serializer: str = None,
-        namespace: str = None,
-        include_metadata: bool = None,
-        isodates: bool = None,
+        serializer: Optional[str] = None,
+        namespace: Optional[str] = None,
+        include_metadata: Optional[bool] = None,
+        isodates: Optional[bool] = None,
         abstract: bool = False,
-        allow_blessed_key: bool = None,
-        decimals: bool = None,
-        coerce: bool = None,
+        allow_blessed_key: Optional[bool] = None,
+        decimals: Optional[bool] = None,
+        coerce: Optional[bool] = None,
         coercions: CoercionMapping = None,
-        polymorphic_fields: bool = None,
-        validation: bool = None,
+        polymorphic_fields: Optional[bool] = None,
+        validation: Optional[bool] = None,
         date_parser: Callable[[Any], datetime] = None,
         lazy_creation: bool = False,
         **kwargs: Any,
@@ -192,7 +193,10 @@ def _contribute_methods(cls) -> None:
     @classmethod
     def _contribute_field_descriptors(
-        cls, target: Type, options: ModelOptions, parent: FieldDescriptorT = None
+        cls,
+        target: Type,
+        options: ModelOptions,
+        parent: Optional[FieldDescriptorT] = None,
     ) -> FieldMap:
         fields = options.fields
         defaults = options.defaults
diff --git a/faust/models/tags.py b/faust/models/tags.py
index 3c005b695..ac87b0466 100644
--- a/faust/models/tags.py
+++ b/faust/models/tags.py
@@ -31,7 +31,7 @@ class Tag(Generic[T]):
     is_sensitive: bool = False
     is_personal: bool = False
 
-    def __init__(self, value: T, *, field: str = None):
+    def __init__(self, value: T, *, field: Optional[str] = None):
         if isinstance(value, Tag):
             raise SecurityError("Cannot wrap: value is already tagged")
         self._value = value
diff --git a/faust/models/typing.py b/faust/models/typing.py
index ff4252f60..eaf9ee4a6 100644
--- a/faust/models/typing.py
+++ b/faust/models/typing.py
@@ -183,7 +183,9 @@ def __repr__(self) -> str:
     def __getitem__(self, name: Any) -> "Variable":
         return self.clone(getitem=name)
 
-    def clone(self, *, name: str = None, getitem: Any = MISSING) -> "Variable":
+    def clone(
+        self, *, name: Optional[str] = None, getitem: Any = MISSING
+    ) -> "Variable":
         return type(self)(
             name=name if name is not None else self.name,
             getitem=getitem if getitem is not MISSING else self.getitem,
diff --git a/faust/sensors/datadog.py b/faust/sensors/datadog.py
index 5011392ae..ed8bf57b9 100644
--- a/faust/sensors/datadog.py
+++ b/faust/sensors/datadog.py
@@ -90,7 +90,10 @@ def timing(self, metric: str, value: float, labels: Dict = None) -> None:
         )
 
     def timed(
-        self, metric: str = None, labels: Dict = None, use_ms: bool = None
+        self,
+        metric: Optional[str] = None,
+        labels: Dict = None,
+        use_ms: Optional[bool] = None,
     ) -> float:
         return self.client.timed(  # type: ignore
             metric=metric,
diff --git a/faust/sensors/monitor.py b/faust/sensors/monitor.py
index 638765342..c334bbc6e 100644
--- a/faust/sensors/monitor.py
+++ b/faust/sensors/monitor.py
@@ -211,10 +211,10 @@ class Monitor(Sensor, KeywordReduce):
     def __init__(
         self,
         *,
-        max_avg_history: int = None,
-        max_commit_latency_history: int = None,
-        max_send_latency_history: int = None,
-        max_assignment_latency_history: int = None,
+        max_avg_history: Optional[int] = None,
+        max_commit_latency_history: Optional[int] = None,
+        max_send_latency_history: Optional[int] = None,
+        max_assignment_latency_history: Optional[int] = None,
         messages_sent: int = 0,
         tables: MutableMapping[str, TableState] = None,
         messages_active: int = 0,
@@ -232,7 +232,7 @@ def __init__(
         messages_s: int = 0,
         events_runtime_avg: float = 0.0,
         topic_buffer_full: Counter[TP] = None,
-        rebalances: int = None,
+        rebalances: Optional[int] = None,
         rebalance_return_latency: Deque[float] = None,
         rebalance_end_latency: Deque[float] = None,
         rebalance_return_avg: float = 0.0,
diff --git a/faust/sensors/prometheus.py b/faust/sensors/prometheus.py
index 5284b9d8e..66829fef7 100644
--- a/faust/sensors/prometheus.py
+++ b/faust/sensors/prometheus.py
@@ -1,6 +1,6 @@
 """Monitor using Prometheus."""
 import typing
-from typing import Any, NamedTuple, cast
+from typing import Any, NamedTuple, Optional, cast
 
 from aiohttp.web import Response
@@ -43,7 +43,7 @@ def setup_prometheus_sensors(
     app: AppT,
     pattern: str = "/metrics",
     registry: CollectorRegistry = REGISTRY,
-    name_prefix: str = None,
+    name_prefix: Optional[str] = None,
 ) -> None:
     """
     A utility function which sets up prometheus and attaches the config to the app.
diff --git a/faust/serializers/schemas.py b/faust/serializers/schemas.py
index b4cf4ab31..8192cd2b1 100644
--- a/faust/serializers/schemas.py
+++ b/faust/serializers/schemas.py
@@ -39,7 +39,7 @@ def __init__(
         value_type: ModelArg = None,
         key_serializer: CodecArg = None,
         value_serializer: CodecArg = None,
-        allow_empty: bool = None,
+        allow_empty: Optional[bool] = None,
     ) -> None:
         self.update(
             key_type=key_type,
@@ -56,7 +56,7 @@ def update(
         value_type: ModelArg = None,
         key_serializer: CodecArg = None,
         value_serializer: CodecArg = None,
-        allow_empty: bool = None,
+        allow_empty: Optional[bool] = None,
     ) -> None:
         if key_type is not None:
             self.key_type = key_type
@@ -78,7 +78,7 @@ def loads_key(
         app: AppT,
         message: Message,
         *,
-        loads: Callable = None,
+        loads: Optional[Callable] = None,
         serializer: CodecArg = None,
     ) -> KT:
         if loads is None:
@@ -97,7 +97,7 @@ def loads_value(
         app: AppT,
         message: Message,
         *,
-        loads: Callable = None,
+        loads: Optional[Callable] = None,
         serializer: CodecArg = None,
     ) -> VT:
         if loads is None:
diff --git a/faust/stores/base.py b/faust/stores/base.py
index c684b8511..62b55fda1 100644
--- a/faust/stores/base.py
+++ b/faust/stores/base.py
@@ -37,7 +37,7 @@ def __init__(
         value_type: ModelArg = None,
         key_serializer: CodecArg = None,
         value_serializer: CodecArg = None,
-        options: Mapping[str, Any] = None,
+        options: Optional[Mapping[str, Any]] = None,
         **kwargs: Any,
     ) -> None:
         Service.__init__(self, **kwargs)
diff --git a/faust/stores/rocksdb.py b/faust/stores/rocksdb.py
index 94440ad75..e7150d120 100644
--- a/faust/stores/rocksdb.py
+++ b/faust/stores/rocksdb.py
@@ -41,8 +41,8 @@
 DEFAULT_WRITE_BUFFER_SIZE = 67108864
 DEFAULT_MAX_WRITE_BUFFER_NUMBER = 3
 DEFAULT_TARGET_FILE_SIZE_BASE = 67108864
-DEFAULT_BLOCK_CACHE_SIZE = 2 * 1024 ** 3
-DEFAULT_BLOCK_CACHE_COMPRESSED_SIZE = 500 * 1024 ** 2
+DEFAULT_BLOCK_CACHE_SIZE = 2 * 1024**3
+DEFAULT_BLOCK_CACHE_COMPRESSED_SIZE = 500 * 1024**2
 DEFAULT_BLOOM_FILTER_SIZE = 3
 
 try:  # pragma: no cover
@@ -87,13 +87,13 @@ class RocksDBOptions:
 
     def __init__(
         self,
-        max_open_files: int = None,
-        write_buffer_size: int = None,
-        max_write_buffer_number: int = None,
-        target_file_size_base: int = None,
-        block_cache_size: int = None,
-        block_cache_compressed_size: int = None,
-        bloom_filter_size: int = None,
+        max_open_files: Optional[int] = None,
+        write_buffer_size: Optional[int] = None,
+        max_write_buffer_number: Optional[int] = None,
+        target_file_size_base: Optional[int] = None,
+        block_cache_size: Optional[int] = None,
+        block_cache_compressed_size: Optional[int] = None,
+        bloom_filter_size: Optional[int] = None,
         **kwargs: Any,
     ) -> None:
         if max_open_files is not None:
@@ -157,8 +157,8 @@ def __init__(
         app: AppT,
         table: CollectionT,
         *,
-        key_index_size: int = None,
-        options: Mapping[str, Any] = None,
+        key_index_size: Optional[int] = None,
+        options: Optional[Mapping[str, Any]] = None,
         **kwargs: Any,
     ) -> None:
         if rocksdb is None:
diff --git a/faust/streams.py b/faust/streams.py
index 99855104c..2275a3fec 100644
--- a/faust/streams.py
+++ b/faust/streams.py
@@ -119,15 +119,15 @@ def __init__(
         app: AppT,
         processors: Iterable[Processor[T]] = None,
         combined: List[JoinableT] = None,
-        on_start: Callable = None,
-        join_strategy: JoinT = None,
-        beacon: NodeT = None,
-        concurrency_index: int = None,
-        prev: StreamT = None,
-        active_partitions: Set[TP] = None,
+        on_start: Optional[Callable] = None,
+        join_strategy: Optional[JoinT] = None,
+        beacon: Optional[NodeT] = None,
+        concurrency_index: Optional[int] = None,
+        prev: Optional[StreamT] = None,
+        active_partitions: Optional[Set[TP]] = None,
         enable_acks: bool = True,
         prefix: str = "",
-        loop: asyncio.AbstractEventLoop = None,
+        loop: Optional[asyncio.AbstractEventLoop] = None,
     ) -> None:
         Service.__init__(self, loop=loop, beacon=beacon)
         self.app = app
@@ -703,9 +703,9 @@ def group_by(
         self,
         key: GroupByKeyArg,
         *,
-        name: str = None,
-        topic: TopicT = None,
-        partitions: int = None,
+        name: Optional[str] = None,
+        topic: Optional[TopicT] = None,
+        partitions: Optional[int] = None,
     ) -> StreamT:
         """Create new stream that repartitions the stream using a new key.
@@ -837,7 +837,7 @@ def derive_topic(
         self,
         name: str,
         *,
-        schema: SchemaT = None,
+        schema: Optional[SchemaT] = None,
         key_type: ModelArg = None,
         value_type: ModelArg = None,
         prefix: str = "",
@@ -988,7 +988,7 @@ async def _c_aiter(self) -> AsyncIterator[T_co]:  # pragma: no cover
             await self.stop()
             self.service_reset()
 
-    def _set_current_event(self, event: EventT = None) -> None:
+    def _set_current_event(self, event: Optional[EventT] = None) -> None:
         if event is None:
             _current_event.set(None)
         else:
diff --git a/faust/tables/base.py b/faust/tables/base.py
index fe52dda56..9ef83a10e 100644
--- a/faust/tables/base.py
+++ b/faust/tables/base.py
@@ -103,25 +103,25 @@ def __init__(
         self,
         app: AppT,
         *,
-        name: str = None,
+        name: Optional[str] = None,
         default: Callable[[], Any] = None,
         store: Union[str, URL] = None,
-        schema: SchemaT = None,
+        schema: Optional[SchemaT] = None,
         key_type: ModelArg = None,
         value_type: ModelArg = None,
-        partitions: int = None,
-        window: WindowT = None,
-        changelog_topic: TopicT = None,
-        help: str = None,
+        partitions: Optional[int] = None,
+        window: Optional[WindowT] = None,
+        changelog_topic: Optional[TopicT] = None,
+        help: Optional[str] = None,
         on_recover: RecoverCallback = None,
-        on_changelog_event: ChangelogEventCallback = None,
+        on_changelog_event: Optional[ChangelogEventCallback] = None,
         recovery_buffer_size: int = 1000,
-        standby_buffer_size: int = None,
-        extra_topic_configs: Mapping[str, Any] = None,
+        standby_buffer_size: Optional[int] = None,
+        extra_topic_configs: Optional[Mapping[str, Any]] = None,
         recover_callbacks: Set[RecoverCallback] = None,
-        options: Mapping[str, Any] = None,
+        options: Optional[Mapping[str, Any]] = None,
         use_partitioner: bool = False,
-        on_window_close: WindowCloseCallback = None,
+        on_window_close: Optional[WindowCloseCallback] = None,
         is_global: bool = False,
         **kwargs: Any,
     ) -> None:
@@ -454,9 +454,9 @@ async def remove_from_stream(self, stream: StreamT) -> None:
     def _new_changelog_topic(
         self,
         *,
-        retention: Seconds = None,
-        compacting: bool = None,
-        deleting: bool = None,
+        retention: Optional[Seconds] = None,
+        compacting: Optional[bool] = None,
+        deleting: Optional[bool] = None,
     ) -> TopicT:
         if compacting is None:
             compacting = self._changelog_compacting
@@ -511,14 +511,14 @@ def _window_ranges(self, timestamp: float) -> Iterator[WindowRange]:
         for window_range in window.ranges(timestamp):
             yield window_range
 
-    def _relative_now(self, event: EventT = None) -> float:
+    def _relative_now(self, event: Optional[EventT] = None) -> float:
         # get current timestamp
         event = event if event is not None else current_event()
         if event is None:
             return time.time()
         return self._partition_latest_timestamp[event.message.partition]
 
-    def _relative_event(self, event: EventT = None) -> float:
+    def _relative_event(self, event: Optional[EventT] = None) -> float:
         event = event if event is not None else current_event()
         # get event timestamp
         if event is None:
@@ -526,7 +526,7 @@ def _relative_event(self, event: EventT = None) -> float:
         return event.message.timestamp
 
     def _relative_field(self, field: FieldDescriptorT) -> RelativeHandler:
-        def to_value(event: EventT = None) -> Union[float, datetime]:
+        def to_value(event: Optional[EventT] = None) -> Union[float, datetime]:
             if event is None:
                 raise RuntimeError("Operation outside of stream iteration")
             return field.getattr(cast(ModelT, event.value))
@@ -534,7 +534,7 @@ def to_value(event: EventT = None) -> Union[float, datetime]:
         return to_value
 
     def _relative_timestamp(self, timestamp: float) -> RelativeHandler:
-        def handler(event: EventT = None) -> Union[float, datetime]:
+        def handler(event: Optional[EventT] = None) -> Union[float, datetime]:
             return timestamp
 
         return handler
@@ -551,7 +551,9 @@ def _windowed_contains(self, key: Any, timestamp: float) -> bool:
         window = cast(WindowT, self.window)
         return self._has_key((key, window.current(timestamp)))
 
-    def _windowed_delta(self, key: Any, d: Seconds, event: EventT = None) -> Any:
+    def _windowed_delta(
+        self, key: Any, d: Seconds, event: Optional[EventT] = None
+    ) -> Any:
         window = cast(WindowT, self.window)
         return self._get_key(
             (key, window.delta(self._relative_event(event), d)),
diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py
index 6038c8fc8..74a729699 100644
--- a/faust/tables/recovery.py
+++ b/faust/tables/recovery.py
@@ -580,7 +580,7 @@ def _estimated_active_remaining_secs(self, remaining: float) -> Optional[float]:
         else:
             return None
 
-    async def _wait(self, coro: WaitArgT, timeout: int = None) -> None:
+    async def _wait(self, coro: WaitArgT, timeout: Optional[int] = None) -> None:
         signal = self.signal_recovery_start
         wait_result = await self.wait_first(coro, signal, timeout=timeout)
         if wait_result.stopped:
diff --git a/faust/tables/sets.py b/faust/tables/sets.py
index 055aab3c8..1cca3679d 100644
--- a/faust/tables/sets.py
+++ b/faust/tables/sets.py
@@ -39,13 +39,15 @@ class SetWindowSet(wrappers.WindowSet):
     """A windowed set."""
 
-    def add(self, element: Any, *, event: EventT = None) -> None:
+    def add(self, element: Any, *, event: Optional[EventT] = None) -> None:
         self._apply_set_operation("add", element, event)
 
-    def discard(self, element: Any, *, event: EventT = None) -> None:
+    def discard(self, element: Any, *, event: Optional[EventT] = None) -> None:
         self._apply_set_operation("discard", element, event)
 
-    def _apply_set_operation(self, op: str, element: Any, event: EventT = None) -> None:
+    def _apply_set_operation(
+        self, op: str, element: Any, event: Optional[EventT] = None
+    ) -> None:
         table = cast(Table, self.table)
         timestamp = self.wrapper.get_timestamp(event or self.event)
         key = self.key
@@ -275,8 +277,8 @@ def __init__(
         app: AppT,
         *,
         start_manager: bool = False,
-        manager_topic_name: str = None,
-        manager_topic_suffix: str = None,
+        manager_topic_name: Optional[str] = None,
+        manager_topic_suffix: Optional[str] = None,
         **kwargs: Any,
     ) -> None:
         super().__init__(app, **kwargs)
diff --git a/faust/tables/table.py b/faust/tables/table.py
index 4eabd8a67..a38d32859 100644
--- a/faust/tables/table.py
+++ b/faust/tables/table.py
@@ -1,5 +1,5 @@
 """Table (key/value changelog stream)."""
-from typing import Any, ClassVar, Type
+from typing import Any, ClassVar, Optional, Type
 
 from mode import Seconds
@@ -33,7 +33,7 @@ def hopping(
         self,
         size: Seconds,
         step: Seconds,
-        expires: Seconds = None,
+        expires: Optional[Seconds] = None,
         key_index: bool = False,
     ) -> WindowWrapperT:
         """Wrap table in a hopping window."""
         return self.using_window(
@@ -43,7 +43,7 @@ def hopping(
         )
 
     def tumbling(
-        self, size: Seconds, expires: Seconds = None, key_index: bool = False
+        self, size: Seconds, expires: Optional[Seconds] = None, key_index: bool = False
     ) -> WindowWrapperT:
         """Wrap table in a tumbling window."""
         return self.using_window(
diff --git a/faust/tables/wrappers.py b/faust/tables/wrappers.py
index 0b45b1d0f..4dfb18d08 100644
--- a/faust/tables/wrappers.py
+++ b/faust/tables/wrappers.py
@@ -56,7 +56,7 @@ class _Table:
 class WindowedKeysView(KeysView):
     """The object returned by ``windowed_table.keys()``."""
 
-    def __init__(self, mapping: WindowWrapperT, event: EventT = None) -> None:
+    def __init__(self, mapping: WindowWrapperT, event: Optional[EventT] = None) -> None:
         self._mapping = mapping
         self.event = event
@@ -78,13 +78,13 @@ def now(self) -> Iterator[Any]:
         for key, _ in wrapper._items_now():
             yield key
 
-    def current(self, event: EventT = None) -> Iterator[Any]:
+    def current(self, event: Optional[EventT] = None) -> Iterator[Any]:
         """Return all keys present in window closest to stream time."""
         wrapper = cast(WindowWrapper, self._mapping)
         for key, _ in wrapper._items_current(event or self.event):
             yield key
 
-    def delta(self, d: Seconds, event: EventT = None) -> Iterator[Any]:
+    def delta(self, d: Seconds, event: Optional[EventT] = None) -> Iterator[Any]:
         """Return all keys present in window ±n seconds ago."""
         wrapper = cast(WindowWrapper, self._mapping)
         for key, _ in wrapper._items_delta(d, event or self.event):
             yield key
@@ -94,7 +94,7 @@ def delta(self, d: Seconds, event: EventT = None) -> Iterator[Any]:
 class WindowedItemsView(WindowedItemsViewT):
     """The object returned by ``windowed_table.items()``."""
 
-    def __init__(self, mapping: WindowWrapperT, event: EventT = None) -> None:
+    def __init__(self, mapping: WindowWrapperT, event: Optional[EventT] = None) -> None:
         self._mapping = mapping
         self.event = event
@@ -111,12 +111,14 @@ def now(self) -> Iterator[Tuple[Any, Any]]:
         wrapper = cast(WindowWrapper, self._mapping)
         return wrapper._items_now()
 
-    def current(self, event: EventT = None) -> Iterator[Tuple[Any, Any]]:
+    def current(self, event: Optional[EventT] = None) -> Iterator[Tuple[Any, Any]]:
         """Return all items present in window closest to stream time."""
         wrapper = cast(WindowWrapper, self._mapping)
         return wrapper._items_current(event or self.event)
 
-    def delta(self, d: Seconds, event: EventT = None) -> Iterator[Tuple[Any, Any]]:
+    def delta(
+        self, d: Seconds, event: Optional[EventT] = None
+    ) -> Iterator[Tuple[Any, Any]]:
         """Return all items present in window ±n seconds ago."""
         wrapper = cast(WindowWrapper, self._mapping)
         return wrapper._items_delta(d, event or self.event)
@@ -125,7 +127,7 @@ def delta(self, d: Seconds, event: EventT = None) -> Iterator[Tuple[Any, Any]]:
 class WindowedValuesView(WindowedValuesViewT):
     """The object returned by ``windowed_table.values()``."""
 
-    def __init__(self, mapping: WindowWrapperT, event: EventT = None) -> None:
+    def __init__(self, mapping: WindowWrapperT, event: Optional[EventT] = None) -> None:
         self._mapping = mapping
         self.event = event
@@ -144,13 +146,13 @@ def now(self) -> Iterator[Any]:
         for _, value in wrapper._items_now():
             yield value
 
-    def current(self, event: EventT = None) -> Iterator[Any]:
+    def current(self, event: Optional[EventT] = None) -> Iterator[Any]:
         """Return all values present in window closest to stream time."""
         wrapper = cast(WindowWrapper, self._mapping)
         for _, value in wrapper._items_current(event or self.event):
             yield value
 
-    def delta(self, d: Seconds, event: EventT = None) -> Iterator[Any]:
+    def delta(self, d: Seconds, event: Optional[EventT] = None) -> Iterator[Any]:
         """Return all values present in window ±n seconds ago."""
         wrapper = cast(WindowWrapper, self._mapping)
         for _, value in wrapper._items_delta(d, event or self.event):
@@ -183,7 +185,11 @@ class WindowSet(WindowSetT[KT, VT]):
     """
 
     def __init__(
-        self, key: KT, table: TableT, wrapper: WindowWrapperT, event: EventT = None
+        self,
+        key: KT,
+        table: TableT,
+        wrapper: WindowWrapperT,
+        event: Optional[EventT] = None,
     ) -> None:
         self.key = key
         self.table = cast(_Table, table)
@@ -192,7 +198,7 @@ def __init__(
         self.data = table  # provides underlying mapping in FastUserDict
 
     def apply(
-        self, op: Callable[[VT, VT], VT], value: VT, event: EventT = None
+        self, op: Callable[[VT, VT], VT], value: VT, event: Optional[EventT] = None
     ) -> WindowSetT[KT, VT]:
         """Apply operation to all affected windows."""
         table = cast(_Table, self.table)
@@ -202,7 +208,7 @@ def apply(
             table._apply_window_op(op, self.key, value, timestamp)
         return self
 
-    def value(self, event: EventT = None) -> VT:
+    def value(self, event: Optional[EventT] = None) -> VT:
         """Return current value.
 
         The selected window depends on the current time-relativity
@@ -217,12 +223,12 @@ def now(self) -> VT:
         """Return current value, using the current system time."""
         return cast(_Table, self.table)._windowed_now(self.key)
 
-    def current(self, event: EventT = None) -> VT:
+    def current(self, event: Optional[EventT] = None) -> VT:
         """Return current value, using stream time-relativity."""
         t = cast(_Table, self.table)
         return t._windowed_timestamp(self.key, t._relative_event(event or self.event))
 
-    def delta(self, d: Seconds, event: EventT = None) -> VT:
+    def delta(self, d: Seconds, event: Optional[EventT] = None) -> VT:
         """Return value as it was ±n seconds ago."""
         table = cast(_Table, self.table)
         return table._windowed_delta(self.key, d, event or self.event)
@@ -322,7 +328,7 @@ def __init__(
         *,
         relative_to: RelativeArg = None,
         key_index: bool = False,
-        key_index_table: TableT = None,
+        key_index_table: Optional[TableT] = None,
     ) -> None:
         self.table = table
         self.key_index = key_index
@@ -389,7 +395,7 @@ def relative_to_stream(self) -> WindowWrapperT:
         """
         return self.clone(relative_to=self.table._relative_event)
 
-    def get_timestamp(self, event: EventT = None) -> float:
+    def get_timestamp(self, event: Optional[EventT] = None) -> float:
         """Get timestamp from event."""
         event = event or current_event()
         get_relative_timestamp = self.get_relative_timestamp
@@ -471,15 +477,15 @@ def _keys(self) -> Iterator:
             "support .keys/.items/.values"
         )
 
-    def values(self, event: EventT = None) -> ValuesView:
+    def values(self, event: Optional[EventT] = None) -> ValuesView:
         """Return table values view: iterate over values in this table."""
         return WindowedValuesView(self, event or current_event())
 
-    def items(self, event: EventT = None) -> ItemsView:
+    def items(self, event: Optional[EventT] = None) -> ItemsView:
         """Return table items view: iterate over ``(key, value)`` pairs."""
         return WindowedItemsView(self, event or current_event())
 
-    def _items(self, event: EventT = None) -> Iterator[Tuple[Any, Any]]:
+    def _items(self, event: Optional[EventT] = None) -> Iterator[Tuple[Any, Any]]:
         table = cast(_Table, self.table)
         timestamp = self.get_timestamp(event)
         for key in self._keys():
@@ -496,7 +502,9 @@ def _items_now(self) -> Iterator[Tuple[Any, Any]]:
             except KeyError:
                 pass
 
-    def _items_current(self, event: EventT = None) -> Iterator[Tuple[Any, Any]]:
+    def _items_current(
+        self, event: Optional[EventT] = None
+ ) -> Iterator[Tuple[Any, Any]]: table = cast(_Table, self.table) timestamp = table._relative_event(event) for key in self._keys(): @@ -505,7 +513,7 @@ def _items_current(self, event: EventT = None) -> Iterator[Tuple[Any, Any]]: except KeyError: pass - def _items_delta(self, d: Seconds, event: EventT = None) -> Iterator[Any]: + def _items_delta(self, d: Seconds, event: Optional[EventT] = None) -> Iterator[Any]: table = cast(_Table, self.table) for key in self._keys(): try: diff --git a/faust/topics.py b/faust/topics.py index 826038696..76eb46ebc 100644 --- a/faust/topics.py +++ b/faust/topics.py @@ -96,29 +96,29 @@ def __init__( self, app: AppT, *, - topics: Sequence[str] = None, + topics: Optional[Sequence[str]] = None, pattern: Union[str, Pattern] = None, - schema: SchemaT = None, + schema: Optional[SchemaT] = None, key_type: ModelArg = None, value_type: ModelArg = None, is_iterator: bool = False, - partitions: int = None, - retention: Seconds = None, - compacting: bool = None, - deleting: bool = None, - replicas: int = None, + partitions: Optional[int] = None, + retention: Optional[Seconds] = None, + compacting: Optional[bool] = None, + deleting: Optional[bool] = None, + replicas: Optional[int] = None, acks: bool = True, internal: bool = False, - config: Mapping[str, Any] = None, - queue: ThrowableQueue = None, + config: Optional[Mapping[str, Any]] = None, + queue: Optional[ThrowableQueue] = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, - maxsize: int = None, - root: ChannelT = None, - active_partitions: Set[TP] = None, - allow_empty: bool = None, + maxsize: Optional[int] = None, + root: Optional[ChannelT] = None, + active_partitions: Optional[Set[TP]] = None, + allow_empty: Optional[bool] = None, has_prefix: bool = False, - loop: asyncio.AbstractEventLoop = None, + loop: Optional[asyncio.AbstractEventLoop] = None, ) -> None: super().__init__( app, @@ -160,13 +160,13 @@ async def send( *, key: K = None, value: V = None, - partition: int = None, - timestamp: float = None, + partition: Optional[int] = None, + timestamp: Optional[float] = None, headers: HeadersArg = None, - schema: SchemaT = None, + schema: Optional[SchemaT] = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, - callback: MessageSentCallback = None, + callback: Optional[MessageSentCallback] = None, force: bool = False, ) -> Awaitable[RecordMetadata]: """Send message to topic.""" @@ -203,13 +203,13 @@ def send_soon( *, key: K = None, value: V = None, - partition: int = None, - timestamp: float = None, + partition: Optional[int] = None, + timestamp: Optional[float] = None, headers: HeadersArg = None, - schema: SchemaT = None, + schema: Optional[SchemaT] = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, - callback: MessageSentCallback = None, + callback: Optional[MessageSentCallback] = None, force: bool = False, eager_partitioning: bool = False, ) -> FutureMessage: @@ -321,18 +321,18 @@ def derive(self, **kwargs: Any) -> ChannelT: def derive_topic( self, *, - topics: Sequence[str] = None, - schema: SchemaT = None, + topics: Optional[Sequence[str]] = None, + schema: Optional[SchemaT] = None, key_type: ModelArg = None, value_type: ModelArg = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, - partitions: int = None, - retention: Seconds = None, - compacting: bool = None, - deleting: bool = None, - internal: bool = None, - config: Mapping[str, Any] = None, + partitions: Optional[int] = None, + retention: Optional[Seconds] = 
None, + compacting: Optional[bool] = None, + deleting: Optional[bool] = None, + internal: Optional[bool] = None, + config: Optional[Mapping[str, Any]] = None, prefix: str = "", suffix: str = "", **kwargs: Any, diff --git a/faust/transport/base.py b/faust/transport/base.py index 9ecd9be68..6b5d173ca 100644 --- a/faust/transport/base.py +++ b/faust/transport/base.py @@ -9,7 +9,7 @@ :file:`faust/transport/drivers/aiokafka.py` """ import asyncio -from typing import Any, ClassVar, List, Type +from typing import Any, ClassVar, List, Optional, Type from mode.services import ServiceT from yarl import URL @@ -54,7 +54,10 @@ class Transport(TransportT): driver_version: str def __init__( - self, url: List[URL], app: AppT, loop: asyncio.AbstractEventLoop = None + self, + url: List[URL], + app: AppT, + loop: Optional[asyncio.AbstractEventLoop] = None, ) -> None: self.url = url self.app = app diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index 3be0ffb3b..4531da79c 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -284,7 +284,7 @@ async def send( timestamp: Optional[float], headers: Optional[HeadersArg], *, - transactional_id: str = None, + transactional_id: Optional[str] = None, ) -> Awaitable[RecordMetadata]: """Schedule message to be sent by producer.""" group = transactional_id = None @@ -314,7 +314,7 @@ async def send_and_wait( timestamp: Optional[float], headers: Optional[HeadersArg], *, - transactional_id: str = None, + transactional_id: Optional[str] = None, ) -> RecordMetadata: """Send message and wait for it to be transmitted.""" fut = await self.send(topic, key, value, partition, timestamp, headers) @@ -355,11 +355,11 @@ async def create_topic( partitions: int, replication: int, *, - config: Mapping[str, Any] = None, + config: Optional[Mapping[str, Any]] = None, timeout: Seconds = 30.0, - retention: Seconds = None, - compacting: bool = None, - deleting: bool = None, + retention: Optional[Seconds] = None, + compacting: Optional[bool] = None, + deleting: Optional[bool] = None, ensure_created: bool = False, ) -> None: """Create/declare topic on server.""" @@ -445,9 +445,9 @@ def __init__( on_partitions_revoked: PartitionsRevokedCallback, on_partitions_assigned: PartitionsAssignedCallback, *, - commit_interval: float = None, - commit_livelock_soft_timeout: float = None, - loop: asyncio.AbstractEventLoop = None, + commit_interval: Optional[float] = None, + commit_livelock_soft_timeout: Optional[float] = None, + loop: Optional[asyncio.AbstractEventLoop] = None, **kwargs: Any, ) -> None: assert callback is not None @@ -1299,11 +1299,11 @@ async def create_topic( partitions: int, replication: int, *, - config: Mapping[str, Any] = None, + config: Optional[Mapping[str, Any]] = None, timeout: Seconds = 30.0, - retention: Seconds = None, - compacting: bool = None, - deleting: bool = None, + retention: Optional[Seconds] = None, + compacting: Optional[bool] = None, + deleting: Optional[bool] = None, ensure_created: bool = False, ) -> None: """Create/declare topic on server.""" @@ -1323,7 +1323,7 @@ async def on_partitions_assigned( @abc.abstractmethod def key_partition( - self, topic: str, key: Optional[bytes], partition: int = None + self, topic: str, key: Optional[bytes], partition: Optional[int] = None ) -> Optional[int]: """Hash key to determine partition number.""" ... 
@@ -1435,7 +1435,7 @@ def close(self) -> None: self._thread.close() def key_partition( - self, topic: str, key: Optional[bytes], partition: int = None + self, topic: str, key: Optional[bytes], partition: Optional[int] = None ) -> Optional[int]: """Hash key to determine partition number.""" return self._thread.key_partition(topic, key, partition=partition) diff --git a/faust/transport/drivers/aiokafka.py b/faust/transport/drivers/aiokafka.py index c15751c55..3ac986ea0 100644 --- a/faust/transport/drivers/aiokafka.py +++ b/faust/transport/drivers/aiokafka.py @@ -226,11 +226,11 @@ async def create_topic( partitions: int, replication: int, *, - config: Mapping[str, Any] = None, + config: Optional[Mapping[str, Any]] = None, timeout: Seconds = 30.0, - retention: Seconds = None, - compacting: bool = None, - deleting: bool = None, + retention: Optional[Seconds] = None, + compacting: Optional[bool] = None, + deleting: Optional[bool] = None, ensure_created: bool = False, ) -> None: """Create/declare topic on server.""" @@ -292,8 +292,8 @@ def __init__( app, *, executor: Any = None, - loop: asyncio.AbstractEventLoop = None, - thread_loop: asyncio.AbstractEventLoop = None, + loop: Optional[asyncio.AbstractEventLoop] = None, + thread_loop: Optional[asyncio.AbstractEventLoop] = None, Worker: Type[WorkerThread] = None, **kwargs: Any, ) -> None: @@ -319,7 +319,9 @@ async def flush(self) -> None: if self._producer is not None: await self._producer.flush() - def _new_producer(self, transactional_id: str = None) -> aiokafka.AIOKafkaProducer: + def _new_producer( + self, transactional_id: Optional[str] = None + ) -> aiokafka.AIOKafkaProducer: return aiokafka.AIOKafkaProducer( loop=self.thread_loop, **{ @@ -975,8 +977,8 @@ async def _fetch_records( self, consumer: aiokafka.AIOKafkaConsumer, active_partitions: Set[TP], - timeout: float = None, - max_records: int = None, + timeout: Optional[float] = None, + max_records: Optional[int] = None, ) -> RecordMap: if not self.consumer.flow_active: return {} @@ -999,11 +1001,11 @@ async def create_topic( partitions: int, replication: int, *, - config: Mapping[str, Any] = None, + config: Optional[Mapping[str, Any]] = None, timeout: Seconds = 30.0, - retention: Seconds = None, - compacting: bool = None, - deleting: bool = None, + retention: Optional[Seconds] = None, + compacting: Optional[bool] = None, + deleting: Optional[bool] = None, ensure_created: bool = False, ) -> None: """Create/declare topic on server.""" @@ -1030,7 +1032,7 @@ async def create_topic( ) def key_partition( - self, topic: str, key: Optional[bytes], partition: int = None + self, topic: str, key: Optional[bytes], partition: Optional[int] = None ) -> Optional[int]: """Hash key to determine partition destination.""" consumer = self._ensure_consumer() @@ -1207,7 +1209,9 @@ def _settings_extra(self) -> Mapping[str, Any]: return {"acks": "all", "enable_idempotence": True} return {} - def _new_producer(self, transactional_id: str = None) -> aiokafka.AIOKafkaProducer: + def _new_producer( + self, transactional_id: Optional[str] = None + ) -> aiokafka.AIOKafkaProducer: return self._producer_type( loop=self.loop, **{ @@ -1228,11 +1232,11 @@ async def create_topic( partitions: int, replication: int, *, - config: Mapping[str, Any] = None, + config: Optional[Mapping[str, Any]] = None, timeout: Seconds = 20.0, - retention: Seconds = None, - compacting: bool = None, - deleting: bool = None, + retention: Optional[Seconds] = None, + compacting: Optional[bool] = None, + deleting: Optional[bool] = None, ensure_created: 
bool = False, ) -> None: """Create/declare topic on server.""" @@ -1285,7 +1289,7 @@ async def send( timestamp: Optional[float], headers: Optional[HeadersArg], *, - transactional_id: str = None, + transactional_id: Optional[str] = None, ) -> Awaitable[RecordMetadata]: """Schedule message to be transmitted by producer.""" @@ -1349,7 +1353,7 @@ async def send_and_wait( timestamp: Optional[float], headers: Optional[HeadersArg], *, - transactional_id: str = None, + transactional_id: Optional[str] = None, ) -> RecordMetadata: """Send message and wait for it to be transmitted.""" fut = await self.send( @@ -1411,7 +1415,10 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self._topic_waiters = {} def _topic_config( - self, retention: int = None, compacting: bool = None, deleting: bool = None + self, + retention: Optional[int] = None, + compacting: Optional[bool] = None, + deleting: Optional[bool] = None, ) -> MutableMapping[str, Any]: config: MutableMapping[str, Any] = {} cleanup_flags: Set[str] = set() @@ -1481,11 +1488,11 @@ async def _really_create_topic( partitions: int, replication: int, *, - config: Mapping[str, Any] = None, + config: Optional[Mapping[str, Any]] = None, timeout: int = 30000, - retention: int = None, - compacting: bool = None, - deleting: bool = None, + retention: Optional[int] = None, + compacting: Optional[bool] = None, + deleting: Optional[bool] = None, ensure_created: bool = False, ) -> None: # pragma: no cover owner.log.info("Creating topic %r", topic) @@ -1543,7 +1550,7 @@ async def _really_create_topic( def credentials_to_aiokafka_auth( - credentials: CredentialsT = None, ssl_context: Any = None + credentials: Optional[CredentialsT] = None, ssl_context: Any = None ) -> Mapping: if credentials is not None: if isinstance(credentials, SSLCredentials): diff --git a/faust/transport/producer.py b/faust/transport/producer.py index 82d8c6b06..abf32bb0d 100644 --- a/faust/transport/producer.py +++ b/faust/transport/producer.py @@ -22,7 +22,7 @@ class ProducerBuffer(Service, ProducerBufferT): - app: AppT = None + app: Optional[AppT] = None max_messages = 100 queue: Optional[asyncio.Queue] = None @@ -123,7 +123,7 @@ class Producer(Service, ProducerT): def __init__( self, transport: TransportT, - loop: asyncio.AbstractEventLoop = None, + loop: Optional[asyncio.AbstractEventLoop] = None, **kwargs: Any, ) -> None: self.transport = transport @@ -160,7 +160,7 @@ async def send( timestamp: Optional[float], headers: Optional[HeadersArg], *, - transactional_id: str = None, + transactional_id: Optional[str] = None, ) -> Awaitable[RecordMetadata]: """Schedule message to be sent by producer.""" raise NotImplementedError() @@ -177,7 +177,7 @@ async def send_and_wait( timestamp: Optional[float], headers: Optional[HeadersArg], *, - transactional_id: str = None, + transactional_id: Optional[str] = None, ) -> RecordMetadata: """Send message and wait for it to be transmitted.""" raise NotImplementedError() @@ -193,11 +193,11 @@ async def create_topic( partitions: int, replication: int, *, - config: Mapping[str, Any] = None, + config: Optional[Mapping[str, Any]] = None, timeout: Seconds = 1000.0, - retention: Seconds = None, - compacting: bool = None, - deleting: bool = None, + retention: Optional[Seconds] = None, + compacting: Optional[bool] = None, + deleting: Optional[bool] = None, ensure_created: bool = False, ) -> None: """Create/declare topic on server.""" diff --git a/faust/types/agents.py b/faust/types/agents.py index 7c9c2d76e..0e59c4a5d 100644 --- a/faust/types/agents.py +++ 
b/faust/types/agents.py @@ -88,7 +88,7 @@ def __init__( agent: "AgentT", stream: StreamT, it: _T, - active_partitions: Set[TP] = None, + active_partitions: Optional[Set[TP]] = None, **kwargs: Any ) -> None: ... @@ -135,15 +135,15 @@ def __init__( self, fun: AgentFun, *, - name: str = None, - app: _AppT = None, + name: Optional[str] = None, + app: Optional[_AppT] = None, channel: Union[str, ChannelT] = None, concurrency: int = 1, sink: Iterable[SinkT] = None, on_error: AgentErrorHandler = None, supervisor_strategy: Type[SupervisorStrategyT] = None, - help: str = None, - schema: SchemaT = None, + help: Optional[str] = None, + schema: Optional[SchemaT] = None, key_type: ModelArg = None, value_type: ModelArg = None, isolated_partitions: bool = False, @@ -159,17 +159,17 @@ def actor_tracebacks(self) -> List[str]: def __call__( self, *, - index: int = None, - active_partitions: Set[TP] = None, - stream: StreamT = None, - channel: ChannelT = None + index: Optional[int] = None, + active_partitions: Optional[Set[TP]] = None, + stream: Optional[StreamT] = None, + channel: Optional[ChannelT] = None ) -> ActorRefT: ... @abc.abstractmethod def test_context( self, - channel: ChannelT = None, + channel: Optional[ChannelT] = None, supervisor_strategy: SupervisorStrategyT = None, **kwargs: Any ) -> "AgentTestWrapperT": @@ -197,8 +197,8 @@ async def cast( value: V = None, *, key: K = None, - partition: int = None, - timestamp: float = None, + partition: Optional[int] = None, + timestamp: Optional[float] = None, headers: HeadersArg = None ) -> None: ... @@ -209,11 +209,11 @@ async def ask( value: V = None, *, key: K = None, - partition: int = None, - timestamp: float = None, + partition: Optional[int] = None, + timestamp: Optional[float] = None, headers: HeadersArg = None, reply_to: ReplyToArg = None, - correlation_id: str = None + correlation_id: Optional[str] = None ) -> Any: ... @@ -223,13 +223,13 @@ async def send( *, key: K = None, value: V = None, - partition: int = None, - timestamp: float = None, + partition: Optional[int] = None, + timestamp: Optional[float] = None, headers: HeadersArg = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, reply_to: ReplyToArg = None, - correlation_id: str = None + correlation_id: Optional[str] = None ) -> Awaitable[RecordMetadata]: ... @@ -334,7 +334,7 @@ class AgentTestWrapperT(AgentT, AsyncIterable): @abc.abstractmethod def __init__( - self, *args: Any, original_channel: ChannelT = None, **kwargs: Any + self, *args: Any, original_channel: Optional[ChannelT] = None, **kwargs: Any ) -> None: ... @@ -343,14 +343,14 @@ async def put( self, value: V = None, key: K = None, - partition: int = None, - timestamp: float = None, + partition: Optional[int] = None, + timestamp: Optional[float] = None, headers: HeadersArg = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, *, reply_to: ReplyToArg = None, - correlation_id: str = None, + correlation_id: Optional[str] = None, wait: bool = True ) -> EventT: ... 
@@ -363,7 +363,7 @@ def to_message( *, partition: int = 0, offset: int = 0, - timestamp: float = None, + timestamp: Optional[float] = None, timestamp_type: int = 0, headers: HeadersArg = None ) -> Message: diff --git a/faust/types/app.py b/faust/types/app.py index f036931b2..be8a5e2ca 100644 --- a/faust/types/app.py +++ b/faust/types/app.py @@ -108,7 +108,7 @@ def default_tracer(self) -> opentracing.Tracer: @abc.abstractmethod def trace( - self, name: str, sample_rate: float = None, **extra_context: Any + self, name: str, sample_rate: Optional[float] = None, **extra_context: Any ) -> ContextManager: ... @@ -137,10 +137,10 @@ def __init__( self, app: "AppT", *, - enable_web: bool = None, + enable_web: Optional[bool] = None, enable_kafka: bool = True, - enable_kafka_producer: bool = None, - enable_kafka_consumer: bool = None, + enable_kafka_producer: Optional[bool] = None, + enable_kafka_consumer: Optional[bool] = None, enable_sensors: bool = True ) -> None: ... @@ -257,23 +257,23 @@ def topic( self, *topics: str, pattern: Union[str, Pattern] = None, - schema: _SchemaT = None, + schema: Optional[_SchemaT] = None, key_type: _ModelArg = None, value_type: _ModelArg = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, - partitions: int = None, - retention: Seconds = None, - compacting: bool = None, - deleting: bool = None, - replicas: int = None, + partitions: Optional[int] = None, + retention: Optional[Seconds] = None, + compacting: Optional[bool] = None, + deleting: Optional[bool] = None, + replicas: Optional[int] = None, acks: bool = True, internal: bool = False, - config: Mapping[str, Any] = None, - maxsize: int = None, + config: Optional[Mapping[str, Any]] = None, + maxsize: Optional[int] = None, allow_empty: bool = False, has_prefix: bool = False, - loop: asyncio.AbstractEventLoop = None + loop: Optional[asyncio.AbstractEventLoop] = None ) -> TopicT: ... @@ -281,11 +281,11 @@ def topic( def channel( self, *, - schema: _SchemaT = None, + schema: Optional[_SchemaT] = None, key_type: _ModelArg = None, value_type: _ModelArg = None, - maxsize: int = None, - loop: asyncio.AbstractEventLoop = None + maxsize: Optional[int] = None, + loop: Optional[asyncio.AbstractEventLoop] = None ) -> ChannelT: ... @@ -294,7 +294,7 @@ def agent( self, channel: Union[str, ChannelT[_T]] = None, *, - name: str = None, + name: Optional[str] = None, concurrency: int = 1, supervisor_strategy: Type[SupervisorStrategyT] = None, sink: Iterable[SinkT] = None, @@ -317,7 +317,7 @@ def timer( interval: Seconds, on_leader: bool = False, traced: bool = True, - name: str = None, + name: Optional[str] = None, max_drift_correction: float = 0.1, ) -> Callable: ... @@ -339,7 +339,7 @@ def service(self, cls: Type[ServiceT]) -> Type[ServiceT]: @abc.abstractmethod def stream( - self, channel: AsyncIterable, beacon: NodeT = None, **kwargs: Any + self, channel: AsyncIterable, beacon: Optional[NodeT] = None, **kwargs: Any ) -> StreamT: ... @@ -349,9 +349,9 @@ def Table( name: str, *, default: Callable[[], Any] = None, - window: WindowT = None, - partitions: int = None, - help: str = None, + window: Optional[WindowT] = None, + partitions: Optional[int] = None, + help: Optional[str] = None, **kwargs: Any ) -> TableT: ... @@ -362,9 +362,9 @@ def GlobalTable( name: str, *, default: Callable[[], Any] = None, - window: WindowT = None, - partitions: int = None, - help: str = None, + window: Optional[WindowT] = None, + partitions: Optional[int] = None, + help: Optional[str] = None, **kwargs: Any ) -> TableT: ... 
@@ -374,10 +374,10 @@ def SetTable( self, name: str, *, - window: WindowT = None, - partitions: int = None, + window: Optional[WindowT] = None, + partitions: Optional[int] = None, start_manager: bool = False, - help: str = None, + help: Optional[str] = None, **kwargs: Any ) -> TableT: ... @@ -387,10 +387,10 @@ def SetGlobalTable( self, name: str, *, - window: WindowT = None, - partitions: int = None, + window: Optional[WindowT] = None, + partitions: Optional[int] = None, start_manager: bool = False, - help: str = None, + help: Optional[str] = None, **kwargs: Any ) -> TableT: ... @@ -402,7 +402,7 @@ def page( *, base: Type[View] = View, cors_options: Mapping[str, ResourceOptions] = None, - name: str = None + name: Optional[str] = None ) -> Callable[[PageArg], Type[View]]: ... @@ -410,11 +410,11 @@ def page( def table_route( self, table: CollectionT, - shard_param: str = None, + shard_param: Optional[str] = None, *, - query_param: str = None, - match_info: str = None, - exact_key: str = None + query_param: Optional[str] = None, + match_info: Optional[str] = None, + exact_key: Optional[str] = None ) -> ViewDecorator: ... @@ -450,13 +450,13 @@ async def send( channel: Union[ChannelT, str], key: K = None, value: V = None, - partition: int = None, - timestamp: float = None, + partition: Optional[int] = None, + timestamp: Optional[float] = None, headers: HeadersArg = None, - schema: _SchemaT = None, + schema: Optional[_SchemaT] = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, - callback: MessageSentCallback = None, + callback: Optional[MessageSentCallback] = None, ) -> Awaitable[RecordMetadata]: ... @@ -476,10 +476,10 @@ def is_leader(self) -> bool: @abc.abstractmethod def FlowControlQueue( self, - maxsize: int = None, + maxsize: Optional[int] = None, *, clear_on_resume: bool = False, - loop: asyncio.AbstractEventLoop = None + loop: Optional[asyncio.AbstractEventLoop] = None ) -> ThrowableQueue: ... diff --git a/faust/types/channels.py b/faust/types/channels.py index 4a1beb617..f8468f692 100644 --- a/faust/types/channels.py +++ b/faust/types/channels.py @@ -52,20 +52,22 @@ def __init__( self, app: _AppT, *, - schema: _SchemaT = None, + schema: Optional[_SchemaT] = None, key_type: _ModelArg = None, value_type: _ModelArg = None, is_iterator: bool = False, - queue: ThrowableQueue = None, - maxsize: int = None, + queue: Optional[ThrowableQueue] = None, + maxsize: Optional[int] = None, root: "ChannelT" = None, - active_partitions: Set[TP] = None, - loop: asyncio.AbstractEventLoop = None + active_partitions: Optional[Set[TP]] = None, + loop: Optional[asyncio.AbstractEventLoop] = None ) -> None: ... @abc.abstractmethod - def clone(self, *, is_iterator: bool = None, **kwargs: Any) -> "ChannelT[_T]": + def clone( + self, *, is_iterator: Optional[bool] = None, **kwargs: Any + ) -> "ChannelT[_T]": ... @abc.abstractmethod @@ -86,13 +88,13 @@ async def send( *, key: K = None, value: V = None, - partition: int = None, - timestamp: float = None, + partition: Optional[int] = None, + timestamp: Optional[float] = None, headers: HeadersArg = None, - schema: _SchemaT = None, + schema: Optional[_SchemaT] = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, - callback: MessageSentCallback = None, + callback: Optional[MessageSentCallback] = None, force: bool = False ) -> Awaitable[RecordMetadata]: ... 
@@ -103,13 +105,13 @@ def send_soon( *, key: K = None, value: V = None, - partition: int = None, - timestamp: float = None, + partition: Optional[int] = None, + timestamp: Optional[float] = None, headers: HeadersArg = None, - schema: _SchemaT = None, + schema: Optional[_SchemaT] = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, - callback: MessageSentCallback = None, + callback: Optional[MessageSentCallback] = None, force: bool = False, eager_partitioning: bool = False ) -> FutureMessage: @@ -120,13 +122,13 @@ def as_future_message( self, key: K = None, value: V = None, - partition: int = None, - timestamp: float = None, + partition: Optional[int] = None, + timestamp: Optional[float] = None, headers: HeadersArg = None, - schema: _SchemaT = None, + schema: Optional[_SchemaT] = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, - callback: MessageSentCallback = None, + callback: Optional[MessageSentCallback] = None, eager_partitioning: bool = False, ) -> FutureMessage: ... @@ -148,13 +150,13 @@ async def declare(self) -> None: @abc.abstractmethod def prepare_key( - self, key: K, key_serializer: CodecArg, schema: _SchemaT = None + self, key: K, key_serializer: CodecArg, schema: Optional[_SchemaT] = None ) -> Any: ... @abc.abstractmethod def prepare_value( - self, value: V, value_serializer: CodecArg, schema: _SchemaT = None + self, value: V, value_serializer: CodecArg, schema: Optional[_SchemaT] = None ) -> Any: ... @@ -171,7 +173,7 @@ async def put(self, value: _EventT[_T]) -> None: ... @abc.abstractmethod - async def get(self, *, timeout: Seconds = None) -> _EventT[_T]: + async def get(self, *, timeout: Optional[Seconds] = None) -> _EventT[_T]: ... @abc.abstractmethod diff --git a/faust/types/events.py b/faust/types/events.py index 491971c57..6894137dc 100644 --- a/faust/types/events.py +++ b/faust/types/events.py @@ -55,13 +55,13 @@ async def send( channel: Union[str, _ChannelT], key: K = None, value: V = None, - partition: int = None, - timestamp: float = None, + partition: Optional[int] = None, + timestamp: Optional[float] = None, headers: HeadersArg = None, - schema: _SchemaT = None, + schema: Optional[_SchemaT] = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, - callback: MessageSentCallback = None, + callback: Optional[MessageSentCallback] = None, force: bool = False, ) -> Awaitable[RecordMetadata]: ... @@ -72,13 +72,13 @@ async def forward( channel: Union[str, _ChannelT], key: Any = None, value: Any = None, - partition: int = None, - timestamp: float = None, + partition: Optional[int] = None, + timestamp: Optional[float] = None, headers: HeadersArg = None, - schema: _SchemaT = None, + schema: Optional[_SchemaT] = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, - callback: MessageSentCallback = None, + callback: Optional[MessageSentCallback] = None, force: bool = False, ) -> Awaitable[RecordMetadata]: ... 
diff --git a/faust/types/models.py b/faust/types/models.py index 3cba768bf..12737eec7 100644 --- a/faust/types/models.py +++ b/faust/types/models.py @@ -209,15 +209,15 @@ class FieldDescriptorT(Generic[T]): def __init__( self, *, - field: str = None, - input_name: str = None, - output_name: str = None, - type: Type[T] = None, - model: Type[ModelT] = None, + field: Optional[str] = None, + input_name: Optional[str] = None, + output_name: Optional[str] = None, + type: Optional[Type[T]] = None, + model: Optional[Type[ModelT]] = None, required: bool = True, default: T = None, parent: "FieldDescriptorT" = None, - exclude: bool = None, + exclude: Optional[bool] = None, date_parser: Callable[[Any], datetime] = None, **kwargs: Any ) -> None: diff --git a/faust/types/serializers.py b/faust/types/serializers.py index 8a77eb6de..4b434f31c 100644 --- a/faust/types/serializers.py +++ b/faust/types/serializers.py @@ -88,7 +88,7 @@ def __init__( value_type: _ModelArg = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, - allow_empty: bool = None + allow_empty: Optional[bool] = None ) -> None: ... @@ -100,7 +100,7 @@ def update( value_type: _ModelArg = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, - allow_empty: bool = None + allow_empty: Optional[bool] = None ) -> None: ... @@ -110,7 +110,7 @@ def loads_key( app: _AppT, message: _Message, *, - loads: Callable = None, + loads: Optional[Callable] = None, serializer: CodecArg = None ) -> KT: ... @@ -121,7 +121,7 @@ def loads_value( app: _AppT, message: _Message, *, - loads: Callable = None, + loads: Optional[Callable] = None, serializer: CodecArg = None ) -> VT: ... diff --git a/faust/types/settings/params.py b/faust/types/settings/params.py index 7d24e4363..61d1c59f1 100644 --- a/faust/types/settings/params.py +++ b/faust/types/settings/params.py @@ -295,21 +295,21 @@ def __init__( self, *, name: str, - env_name: str = None, + env_name: Optional[str] = None, default: IT = None, - default_alias: str = None, - default_template: str = None, - allow_none: bool = None, - ignore_default: bool = None, + default_alias: Optional[str] = None, + default_template: Optional[str] = None, + allow_none: Optional[bool] = None, + ignore_default: Optional[bool] = None, section: _Section = None, - version_introduced: str = None, - version_deprecated: str = None, - version_removed: str = None, + version_introduced: Optional[str] = None, + version_deprecated: Optional[str] = None, + version_removed: Optional[str] = None, version_changed: Mapping[str, str] = None, - deprecation_reason: str = None, + deprecation_reason: Optional[str] = None, related_cli_options: Mapping[str, List[str]] = None, related_settings: List[Any] = None, - help: str = None, + help: Optional[str] = None, **kwargs: Any, ) -> None: assert name @@ -522,8 +522,8 @@ class Number(Param[IT, OT]): def _init_options( self, - min_value: int = None, - max_value: int = None, + min_value: Optional[int] = None, + max_value: Optional[int] = None, number_aliases: Mapping[IT, OT] = None, **kwargs: Any, ) -> None: diff --git a/faust/types/settings/sections.py b/faust/types/settings/sections.py index 920f475bf..4c5cd3cae 100644 --- a/faust/types/settings/sections.py +++ b/faust/types/settings/sections.py @@ -56,7 +56,12 @@ class Section: content: Optional[str] def __init__( - self, type: SectionType, title: str, refid: str, *, content: str = None + self, + type: SectionType, + title: str, + refid: str, + *, + content: Optional[str] = None, ) -> None: self.type = type 
self.title = title diff --git a/faust/types/settings/settings.py b/faust/types/settings/settings.py index c78d1e5c5..69cdb0676 100644 --- a/faust/types/settings/settings.py +++ b/faust/types/settings/settings.py @@ -81,37 +81,37 @@ def __init__( autodiscover: AutodiscoverArg = None, datadir: typing.Union[str, Path] = None, tabledir: typing.Union[str, Path] = None, - debug: bool = None, - env_prefix: str = None, - id_format: str = None, - origin: str = None, + debug: Optional[bool] = None, + env_prefix: Optional[str] = None, + id_format: Optional[str] = None, + origin: Optional[str] = None, timezone: typing.Union[str, tzinfo] = None, - version: int = None, + version: Optional[int] = None, # Agent settings: agent_supervisor: SymbolArg[Type[SupervisorStrategyT]] = None, # Broker settings: broker: BrokerArg = None, broker_consumer: BrokerArg = None, broker_producer: BrokerArg = None, - broker_api_version: str = None, - broker_check_crcs: bool = None, - broker_client_id: str = None, - broker_commit_every: int = None, - broker_commit_interval: Seconds = None, - broker_commit_livelock_soft_timeout: Seconds = None, + broker_api_version: Optional[str] = None, + broker_check_crcs: Optional[bool] = None, + broker_client_id: Optional[str] = None, + broker_commit_every: Optional[int] = None, + broker_commit_interval: Optional[Seconds] = None, + broker_commit_livelock_soft_timeout: Optional[Seconds] = None, broker_credentials: CredentialsArg = None, - broker_heartbeat_interval: Seconds = None, - broker_max_poll_interval: Seconds = None, - broker_max_poll_records: int = None, - broker_rebalance_timeout: Seconds = None, - broker_request_timeout: Seconds = None, - broker_session_timeout: Seconds = None, + broker_heartbeat_interval: Optional[Seconds] = None, + broker_max_poll_interval: Optional[Seconds] = None, + broker_max_poll_records: Optional[int] = None, + broker_rebalance_timeout: Optional[Seconds] = None, + broker_request_timeout: Optional[Seconds] = None, + broker_session_timeout: Optional[Seconds] = None, ssl_context: ssl.SSLContext = None, # Consumer settings: - consumer_api_version: str = None, - consumer_max_fetch_size: int = None, - consumer_auto_offset_reset: str = None, - consumer_group_instance_id: str = None, + consumer_api_version: Optional[str] = None, + consumer_max_fetch_size: Optional[int] = None, + consumer_auto_offset_reset: Optional[str] = None, + consumer_group_instance_id: Optional[str] = None, # Topic serialization settings: key_serializer: CodecArg = None, value_serializer: CodecArg = None, @@ -119,51 +119,51 @@ def __init__( logging_config: Mapping = None, loghandlers: List[logging.Handler] = None, # Producer settings: - producer_acks: int = None, - producer_api_version: str = None, - producer_compression_type: str = None, - producer_linger_ms: int = None, - producer_max_batch_size: int = None, - producer_max_request_size: int = None, + producer_acks: Optional[int] = None, + producer_api_version: Optional[str] = None, + producer_compression_type: Optional[str] = None, + producer_linger_ms: Optional[int] = None, + producer_max_batch_size: Optional[int] = None, + producer_max_request_size: Optional[int] = None, producer_partitioner: SymbolArg[PartitionerT] = None, - producer_request_timeout: Seconds = None, + producer_request_timeout: Optional[Seconds] = None, producer_threaded: bool = False, # RPC settings: - reply_create_topic: bool = None, - reply_expires: Seconds = None, - reply_to: str = None, - reply_to_prefix: str = None, + reply_create_topic: Optional[bool] = None, + 
reply_expires: Optional[Seconds] = None, + reply_to: Optional[str] = None, + reply_to_prefix: Optional[str] = None, # Stream settings: processing_guarantee: Union[str, ProcessingGuarantee] = None, - stream_buffer_maxsize: int = None, - stream_processing_timeout: Seconds = None, - stream_publish_on_commit: bool = None, - stream_recovery_delay: Seconds = None, - stream_wait_empty: bool = None, + stream_buffer_maxsize: Optional[int] = None, + stream_processing_timeout: Optional[Seconds] = None, + stream_publish_on_commit: Optional[bool] = None, + stream_recovery_delay: Optional[Seconds] = None, + stream_wait_empty: Optional[bool] = None, # Table settings: store: URLArg = None, - table_cleanup_interval: Seconds = None, - table_key_index_size: int = None, - table_standby_replicas: int = None, + table_cleanup_interval: Optional[Seconds] = None, + table_key_index_size: Optional[int] = None, + table_standby_replicas: Optional[int] = None, # Topic settings: - topic_allow_declare: bool = None, - topic_disable_leader: bool = None, - topic_partitions: int = None, - topic_replication_factor: int = None, + topic_allow_declare: Optional[bool] = None, + topic_disable_leader: Optional[bool] = None, + topic_partitions: Optional[int] = None, + topic_replication_factor: Optional[int] = None, # Web server settings: cache: URLArg = None, canonical_url: URLArg = None, web: URLArg = None, - web_bind: str = None, + web_bind: Optional[str] = None, web_cors_options: typing.Mapping[str, ResourceOptions] = None, - web_enabled: bool = None, - web_host: str = None, - web_in_thread: bool = None, - web_port: int = None, + web_enabled: Optional[bool] = None, + web_host: Optional[str] = None, + web_in_thread: Optional[bool] = None, + web_port: Optional[int] = None, web_ssl_context: ssl.SSLContext = None, web_transport: URLArg = None, # Worker settings: - worker_redirect_stdouts: bool = None, + worker_redirect_stdouts: Optional[bool] = None, worker_redirect_stdouts_level: Severity = None, # Extension settings: Agent: SymbolArg[Type[AgentT]] = None, @@ -185,8 +185,8 @@ def __init__( HttpClient: SymbolArg[Type[HttpClientT]] = None, Monitor: SymbolArg[Type[SensorT]] = None, # Deprecated settings: - stream_ack_cancelled_tasks: bool = None, - stream_ack_exceptions: bool = None, + stream_ack_cancelled_tasks: Optional[bool] = None, + stream_ack_exceptions: Optional[bool] = None, url: URLArg = None, **kwargs: Any, ) -> None: @@ -202,7 +202,10 @@ def on_init(self, id: str, **kwargs: Any) -> None: self.id = id def _init_env_prefix( - self, env: Mapping[str, str] = None, env_prefix: str = None, **kwargs: Any + self, + env: Mapping[str, str] = None, + env_prefix: Optional[str] = None, + **kwargs: Any, ) -> None: if env is None: env = os.environ @@ -1060,7 +1063,7 @@ def consumer_api_version(self) -> str: params.UnsignedInt, version_introduced="1.4", env_name="CONSUMER_MAX_FETCH_SIZE", - default=1024 ** 2, + default=1024**2, ) def consumer_max_fetch_size(self) -> int: """Consumer max fetch size. 
@@ -1543,7 +1546,7 @@ def stream_processing_timeout(self) -> float: main_topic = app.topic('main') deadletter_topic = app.topic('main_deadletter') - async def send_request(value, timeout: float = None) -> None: + async def send_request(value, timeout: Optional[float] = None) -> None: await app.http_client.get('http://foo.com', timeout=timeout) @app.agent(main_topic) diff --git a/faust/types/stores.py b/faust/types/stores.py index a9cbac723..238c8cee5 100644 --- a/faust/types/stores.py +++ b/faust/types/stores.py @@ -56,7 +56,7 @@ def __init__( value_type: _ModelArg = None, key_serializer: CodecArg = "", value_serializer: CodecArg = "", - options: Mapping[str, Any] = None, + options: Optional[Mapping[str, Any]] = None, **kwargs: Any ) -> None: ... diff --git a/faust/types/streams.py b/faust/types/streams.py index dd02942dd..48875292d 100644 --- a/faust/types/streams.py +++ b/faust/types/streams.py @@ -135,18 +135,18 @@ def __init__( self, channel: AsyncIterator[T_co] = None, *, - app: _AppT = None, + app: Optional[_AppT] = None, processors: Iterable[Processor[T]] = None, combined: List[JoinableT] = None, - on_start: Callable = None, + on_start: Optional[Callable] = None, join_strategy: _JoinT = None, - beacon: NodeT = None, - concurrency_index: int = None, + beacon: Optional[NodeT] = None, + concurrency_index: Optional[int] = None, prev: "StreamT" = None, - active_partitions: Set[TP] = None, + active_partitions: Optional[Set[TP]] = None, enable_acks: bool = True, prefix: str = "", - loop: asyncio.AbstractEventLoop = None + loop: Optional[asyncio.AbstractEventLoop] = None ) -> None: ... @@ -195,7 +195,11 @@ def echo(self, *channels: Union[str, ChannelT]) -> "StreamT": @abc.abstractmethod def group_by( - self, key: GroupByKeyArg, *, name: str = None, topic: TopicT = None + self, + key: GroupByKeyArg, + *, + name: Optional[str] = None, + topic: Optional[TopicT] = None ) -> "StreamT": ... @@ -204,7 +208,7 @@ def derive_topic( self, name: str, *, - schema: _SchemaT = None, + schema: Optional[_SchemaT] = None, key_type: ModelArg = None, value_type: ModelArg = None, prefix: str = "", diff --git a/faust/types/tables.py b/faust/types/tables.py index b7feb8f7d..6d16e03eb 100644 --- a/faust/types/tables.py +++ b/faust/types/tables.py @@ -107,24 +107,24 @@ def __init__( self, app: _AppT, *, - name: str = None, + name: Optional[str] = None, default: Callable[[], Any] = None, store: Union[str, URL] = None, - schema: _SchemaT = None, + schema: Optional[_SchemaT] = None, key_type: _ModelArg = None, value_type: _ModelArg = None, - partitions: int = None, - window: WindowT = None, - changelog_topic: TopicT = None, - help: str = None, + partitions: Optional[int] = None, + window: Optional[WindowT] = None, + changelog_topic: Optional[TopicT] = None, + help: Optional[str] = None, on_recover: RecoverCallback = None, - on_changelog_event: ChangelogEventCallback = None, + on_changelog_event: Optional[ChangelogEventCallback] = None, recovery_buffer_size: int = 1000, - standby_buffer_size: int = None, - extra_topic_configs: Mapping[str, Any] = None, - options: Mapping[str, Any] = None, + standby_buffer_size: Optional[int] = None, + extra_topic_configs: Optional[Mapping[str, Any]] = None, + options: Optional[Mapping[str, Any]] = None, use_partitioner: bool = False, - on_window_close: WindowCloseCallback = None, + on_window_close: Optional[WindowCloseCallback] = None, **kwargs: Any ) -> None: ... 
@@ -220,14 +220,14 @@ def hopping( self, size: Seconds, step: Seconds, - expires: Seconds = None, + expires: Optional[Seconds] = None, key_index: bool = False, ) -> "WindowWrapperT": ... @abc.abstractmethod def tumbling( - self, size: Seconds, expires: Seconds = None, key_index: bool = False + self, size: Seconds, expires: Optional[Seconds] = None, key_index: bool = False ) -> "WindowWrapperT": ... @@ -236,11 +236,11 @@ def as_ansitable(self, **kwargs: Any) -> str: ... @abc.abstractmethod - def _relative_now(self, event: EventT = None) -> float: + def _relative_now(self, event: Optional[EventT] = None) -> float: ... @abc.abstractmethod - def _relative_event(self, event: EventT = None) -> float: + def _relative_event(self, event: Optional[EventT] = None) -> float: ... @abc.abstractmethod @@ -325,22 +325,26 @@ class WindowSetT(FastUserDict[KT, VT]): @abc.abstractmethod def __init__( - self, key: KT, table: TableT, wrapper: "WindowWrapperT", event: EventT = None + self, + key: KT, + table: TableT, + wrapper: "WindowWrapperT", + event: Optional[EventT] = None, ) -> None: ... @abc.abstractmethod def apply( - self, op: Callable[[VT, VT], VT], value: VT, event: EventT = None + self, op: Callable[[VT, VT], VT], value: VT, event: Optional[EventT] = None ) -> "WindowSetT": ... @abc.abstractmethod - def value(self, event: EventT = None) -> VT: + def value(self, event: Optional[EventT] = None) -> VT: ... @abc.abstractmethod - def current(self, event: EventT = None) -> VT: + def current(self, event: Optional[EventT] = None) -> VT: ... @abc.abstractmethod @@ -348,7 +352,7 @@ def now(self) -> VT: ... @abc.abstractmethod - def delta(self, d: Seconds, event: EventT = None) -> VT: + def delta(self, d: Seconds, event: Optional[EventT] = None) -> VT: ... @abc.abstractmethod @@ -402,7 +406,9 @@ def __ior__(self, other: VT) -> "WindowSetT": class WindowedItemsViewT(ItemsView): @abc.abstractmethod - def __init__(self, mapping: "WindowWrapperT", event: EventT = None) -> None: + def __init__( + self, mapping: "WindowWrapperT", event: Optional[EventT] = None + ) -> None: ... @abc.abstractmethod @@ -414,17 +420,21 @@ def now(self) -> Iterator[Tuple[Any, Any]]: ... @abc.abstractmethod - def current(self, event: EventT = None) -> Iterator[Tuple[Any, Any]]: + def current(self, event: Optional[EventT] = None) -> Iterator[Tuple[Any, Any]]: ... @abc.abstractmethod - def delta(self, d: Seconds, event: EventT = None) -> Iterator[Tuple[Any, Any]]: + def delta( + self, d: Seconds, event: Optional[EventT] = None + ) -> Iterator[Tuple[Any, Any]]: ... class WindowedValuesViewT(ValuesView): @abc.abstractmethod - def __init__(self, mapping: "WindowWrapperT", event: EventT = None) -> None: + def __init__( + self, mapping: "WindowWrapperT", event: Optional[EventT] = None + ) -> None: ... @abc.abstractmethod @@ -436,11 +446,11 @@ def now(self) -> Iterator[Any]: ... @abc.abstractmethod - def current(self, event: EventT = None) -> Iterator[Any]: + def current(self, event: Optional[EventT] = None) -> Iterator[Any]: ... @abc.abstractmethod - def delta(self, d: Seconds, event: EventT = None) -> Iterator[Any]: + def delta(self, d: Seconds, event: Optional[EventT] = None) -> Iterator[Any]: ... @@ -454,7 +464,7 @@ def __init__( *, relative_to: RelativeArg = None, key_index: bool = False, - key_index_table: TableT = None + key_index_table: Optional[TableT] = None ) -> None: ... @@ -480,7 +490,7 @@ def relative_to_stream(self) -> "WindowWrapperT": ... 
@abc.abstractmethod - def get_timestamp(self, event: EventT = None) -> float: + def get_timestamp(self, event: Optional[EventT] = None) -> float: ... @abc.abstractmethod diff --git a/faust/types/topics.py b/faust/types/topics.py index d50aee7b4..9d9e390df 100644 --- a/faust/types/topics.py +++ b/faust/types/topics.py @@ -67,29 +67,29 @@ def __init__( self, app: _AppT, *, - topics: Sequence[str] = None, + topics: Optional[Sequence[str]] = None, pattern: Union[str, Pattern] = None, - schema: _SchemaT = None, + schema: Optional[_SchemaT] = None, key_type: _ModelArg = None, value_type: _ModelArg = None, is_iterator: bool = False, - partitions: int = None, - retention: Seconds = None, - compacting: bool = None, - deleting: bool = None, - replicas: int = None, + partitions: Optional[int] = None, + retention: Optional[Seconds] = None, + compacting: Optional[bool] = None, + deleting: Optional[bool] = None, + replicas: Optional[int] = None, acks: bool = True, internal: bool = False, - config: Mapping[str, Any] = None, - queue: ThrowableQueue = None, + config: Optional[Mapping[str, Any]] = None, + queue: Optional[ThrowableQueue] = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, - maxsize: int = None, - root: ChannelT = None, - active_partitions: Set[TP] = None, + maxsize: Optional[int] = None, + root: Optional[ChannelT] = None, + active_partitions: Optional[Set[TP]] = None, allow_empty: bool = False, has_prefix: bool = False, - loop: asyncio.AbstractEventLoop = None + loop: Optional[asyncio.AbstractEventLoop] = None ) -> None: ... @@ -119,16 +119,16 @@ def derive(self, **kwargs: Any) -> ChannelT: def derive_topic( self, *, - topics: Sequence[str] = None, - schema: _SchemaT = None, + topics: Optional[Sequence[str]] = None, + schema: Optional[_SchemaT] = None, key_type: _ModelArg = None, value_type: _ModelArg = None, - partitions: int = None, - retention: Seconds = None, - compacting: bool = None, - deleting: bool = None, + partitions: Optional[int] = None, + retention: Optional[Seconds] = None, + compacting: Optional[bool] = None, + deleting: Optional[bool] = None, internal: bool = False, - config: Mapping[str, Any] = None, + config: Optional[Mapping[str, Any]] = None, prefix: str = "", suffix: str = "", **kwargs: Any diff --git a/faust/types/transports.py b/faust/types/transports.py index 67b8e2a80..9911b8b3a 100644 --- a/faust/types/transports.py +++ b/faust/types/transports.py @@ -126,7 +126,7 @@ class ProducerT(ServiceT): def __init__( self, transport: "TransportT", - loop: asyncio.AbstractEventLoop = None, + loop: Optional[asyncio.AbstractEventLoop] = None, **kwargs: Any ) -> None: ... @@ -141,7 +141,7 @@ async def send( timestamp: Optional[float], headers: Optional[HeadersArg], *, - transactional_id: str = None + transactional_id: Optional[str] = None ) -> Awaitable[RecordMetadata]: ... @@ -159,7 +159,7 @@ async def send_and_wait( timestamp: Optional[float], headers: Optional[HeadersArg], *, - transactional_id: str = None + transactional_id: Optional[str] = None ) -> RecordMetadata: ... @@ -170,11 +170,11 @@ async def create_topic( partitions: int, replication: int, *, - config: Mapping[str, Any] = None, + config: Optional[Mapping[str, Any]] = None, timeout: Seconds = 1000.0, - retention: Seconds = None, - compacting: bool = None, - deleting: bool = None, + retention: Optional[Seconds] = None, + compacting: Optional[bool] = None, + deleting: Optional[bool] = None, ensure_created: bool = False ) -> None: ... 
@@ -230,7 +230,7 @@ class TransactionManagerT(ProducerT): def __init__( self, transport: "TransportT", - loop: asyncio.AbstractEventLoop = None, + loop: Optional[asyncio.AbstractEventLoop] = None, *, consumer: "ConsumerT", producer: "ProducerT", @@ -316,8 +316,8 @@ def __init__( on_partitions_revoked: PartitionsRevokedCallback, on_partitions_assigned: PartitionsAssignedCallback, *, - commit_interval: float = None, - loop: asyncio.AbstractEventLoop = None, + commit_interval: Optional[float] = None, + loop: Optional[asyncio.AbstractEventLoop] = None, **kwargs: Any ) -> None: self._on_partitions_revoked: PartitionsRevokedCallback @@ -330,11 +330,11 @@ async def create_topic( partitions: int, replication: int, *, - config: Mapping[str, Any] = None, + config: Optional[Mapping[str, Any]] = None, timeout: Seconds = 1000.0, - retention: Seconds = None, - compacting: bool = None, - deleting: bool = None, + retention: Optional[Seconds] = None, + compacting: Optional[bool] = None, + deleting: Optional[bool] = None, ensure_created: bool = False ) -> None: ... @@ -424,7 +424,7 @@ def topic_partitions(self, topic: str) -> Optional[int]: @abc.abstractmethod def key_partition( - self, topic: str, key: Optional[bytes], partition: int = None + self, topic: str, key: Optional[bytes], partition: Optional[int] = None ) -> Optional[int]: ... @@ -522,7 +522,10 @@ class TransportT(abc.ABC): @abc.abstractmethod def __init__( - self, url: List[URL], app: _AppT, loop: asyncio.AbstractEventLoop = None + self, + url: List[URL], + app: _AppT, + loop: Optional[asyncio.AbstractEventLoop] = None, ) -> None: ... diff --git a/faust/types/tuples.py b/faust/types/tuples.py index aabe6c217..bbd46c481 100644 --- a/faust/types/tuples.py +++ b/faust/types/tuples.py @@ -151,13 +151,13 @@ def __init__( key: Optional[bytes], value: Optional[bytes], checksum: Optional[bytes], - serialized_key_size: int = None, - serialized_value_size: int = None, + serialized_key_size: Optional[int] = None, + serialized_value_size: Optional[int] = None, tp: TP = None, - time_in: float = None, - time_out: float = None, - time_total: float = None, - generation_id: int = None, + time_in: Optional[float] = None, + time_out: Optional[float] = None, + time_total: Optional[float] = None, + generation_id: Optional[int] = None, ) -> None: self.topic: str = topic self.partition: int = partition diff --git a/faust/types/web.py b/faust/types/web.py index d39b18a5c..f42e7bd2e 100644 --- a/faust/types/web.py +++ b/faust/types/web.py @@ -110,7 +110,9 @@ async def get(self, key: str) -> Optional[bytes]: ... @abc.abstractmethod - async def set(self, key: str, value: bytes, timeout: float = None) -> None: + async def set( + self, key: str, value: bytes, timeout: Optional[float] = None + ) -> None: ... @abc.abstractmethod @@ -128,8 +130,8 @@ class CacheT(abc.ABC): @abc.abstractmethod def __init__( self, - timeout: Seconds = None, - key_prefix: str = None, + timeout: Optional[Seconds] = None, + key_prefix: Optional[str] = None, backend: Union[Type[CacheBackendT], str] = None, **kwargs: Any ) -> None: @@ -138,9 +140,9 @@ def __init__( @abc.abstractmethod def view( self, - timeout: Seconds = None, + timeout: Optional[Seconds] = None, include_headers: bool = False, - key_prefix: str = None, + key_prefix: Optional[str] = None, **kwargs: Any ) -> Callable[[Callable], Callable]: ... 
@@ -153,9 +155,9 @@ class BlueprintT(abc.ABC): @abc.abstractmethod def cache( self, - timeout: Seconds = None, + timeout: Optional[Seconds] = None, include_headers: bool = False, - key_prefix: str = None, + key_prefix: Optional[str] = None, backend: Union[Type[CacheBackendT], str] = None, ) -> CacheT: ... diff --git a/faust/utils/terminal/tables.py b/faust/utils/terminal/tables.py index 02c489c4c..7acf7ef79 100644 --- a/faust/utils/terminal/tables.py +++ b/faust/utils/terminal/tables.py @@ -1,7 +1,18 @@ """Using :pypi:`terminaltables` to draw ANSI tables.""" import sys from operator import itemgetter -from typing import IO, Any, Callable, Iterable, List, Mapping, Sequence, Type, cast +from typing import ( + IO, + Any, + Callable, + Iterable, + List, + Mapping, + Optional, + Sequence, + Type, + cast, +) from mode.utils import logging, text from mode.utils.compat import isatty @@ -14,7 +25,12 @@ def table( - data: TableDataT, *, title: str, target: IO = None, tty: bool = None, **kwargs: Any + data: TableDataT, + *, + title: str, + target: IO = None, + tty: Optional[bool] = None, + **kwargs: Any ) -> Table: """Create suitable :pypi:`terminaltables` table for target. @@ -41,8 +57,8 @@ def logtable( *, title: str, target: IO = None, - tty: bool = None, - headers: Sequence[str] = None, + tty: Optional[bool] = None, + headers: Optional[Sequence[str]] = None, **kwargs: Any ) -> str: """Prepare table for logging. @@ -71,7 +87,7 @@ def dict_as_ansitable( sort: bool = False, sortkey: Callable[[Any], Any] = DEFAULT_SORT_KEY, target: IO = sys.stdout, - title: str = None + title: Optional[str] = None ) -> str: header = [text.title(key), text.title(value)] data = cast(Iterable[List[str]], d.items()) diff --git a/faust/utils/tracing.py b/faust/utils/tracing.py index 1970cc165..f942f6ba3 100644 --- a/faust/utils/tracing.py +++ b/faust/utils/tracing.py @@ -66,7 +66,7 @@ def operation_name_from_fun(fun: Any) -> str: def traced_from_parent_span( parent_span: opentracing.Span = None, - callback: Callable = None, + callback: Optional[Callable] = None, **extra_context: Any, ) -> Callable: """Decorate function to be traced from parent span.""" diff --git a/faust/utils/urls.py b/faust/utils/urls.py index 0207115f5..8d565411e 100644 --- a/faust/utils/urls.py +++ b/faust/utils/urls.py @@ -6,7 +6,7 @@ URIListArg = Union[str, URL, List[URL], List[str]] -def urllist(arg: URIListArg, *, default_scheme: str = None) -> List[URL]: +def urllist(arg: URIListArg, *, default_scheme: Optional[str] = None) -> List[URL]: """Create list of URLs. 
You can pass in a comma-separated string, or an actual list @@ -43,7 +43,7 @@ def urllist(arg: URIListArg, *, default_scheme: str = None) -> List[URL]: def _find_first_actual_scheme( - urls: List[str], default_scheme: str = None + urls: List[str], default_scheme: Optional[str] = None ) -> Optional[str]: for url in urls: scheme, sep, rest = url.partition("://") @@ -52,7 +52,7 @@ def _find_first_actual_scheme( return default_scheme -def _prepare_str_url(s: str, default_scheme: str = None) -> str: +def _prepare_str_url(s: str, default_scheme: Optional[str] = None) -> str: # yarl.URL parses b:9092 into scheme=b,port=9092 # where we would expect it to be scheme=None,host=b,port=9092 if default_scheme: diff --git a/faust/web/base.py b/faust/web/base.py index c613bc9cd..79b21453d 100644 --- a/faust/web/base.py +++ b/faust/web/base.py @@ -193,9 +193,9 @@ def text( self, value: str, *, - content_type: str = None, + content_type: Optional[str] = None, status: int = 200, - reason: str = None, + reason: Optional[str] = None, headers: MutableMapping = None, ) -> Response: """Create text response, using "text/plain" content-type.""" @@ -206,9 +206,9 @@ def html( self, value: str, *, - content_type: str = None, + content_type: Optional[str] = None, status: int = 200, - reason: str = None, + reason: Optional[str] = None, headers: MutableMapping = None, ) -> Response: """Create HTML response from string, ``text/html`` content-type.""" @@ -219,9 +219,9 @@ def json( self, value: Any, *, - content_type: str = None, + content_type: Optional[str] = None, status: int = 200, - reason: str = None, + reason: Optional[str] = None, headers: MutableMapping = None, ) -> Response: """Create new JSON response. @@ -238,9 +238,9 @@ def bytes( self, value: _bytes, *, - content_type: str = None, + content_type: Optional[str] = None, status: int = 200, - reason: str = None, + reason: Optional[str] = None, headers: MutableMapping = None, ) -> Response: """Create new ``bytes`` response - for binary data.""" diff --git a/faust/web/blueprints.py b/faust/web/blueprints.py index b5510baed..ed24689d9 100644 --- a/faust/web/blueprints.py +++ b/faust/web/blueprints.py @@ -107,9 +107,9 @@ def __init__(self, name: str, *, url_prefix: Optional[str] = None) -> None: def cache( self, - timeout: Seconds = None, + timeout: Optional[Seconds] = None, include_headers: bool = False, - key_prefix: str = None, + key_prefix: Optional[str] = None, backend: Union[Type[CacheBackendT], str] = None, ) -> CacheT: """Cache API.""" @@ -189,7 +189,7 @@ def on_webserver_init(self, web: Web) -> None: """Call when web server starts.""" ... - def _url_with_prefix(self, url: str, prefix: str = None) -> str: + def _url_with_prefix(self, url: str, prefix: Optional[str] = None) -> str: if prefix: return prefix.rstrip("/") + "/" + url.lstrip("/") return url diff --git a/faust/web/cache/backends/base.py b/faust/web/cache/backends/base.py index 4688ccf7b..27a48042b 100644 --- a/faust/web/cache/backends/base.py +++ b/faust/web/cache/backends/base.py @@ -43,7 +43,9 @@ async def _get(self, key: str) -> Optional[bytes]: ... @abc.abstractmethod - async def _set(self, key: str, value: bytes, timeout: float = None) -> None: + async def _set( + self, key: str, value: bytes, timeout: Optional[float] = None + ) -> None: ... 
@abc.abstractmethod @@ -55,7 +57,9 @@ async def get(self, key: str) -> Optional[bytes]: async with self._recovery_context(key): return await self._get(key) - async def set(self, key: str, value: bytes, timeout: float = None) -> None: + async def set( + self, key: str, value: bytes, timeout: Optional[float] = None + ) -> None: """Set cached-value by key.""" async with self._recovery_context(key): await self._set(key, value, timeout) diff --git a/faust/web/cache/backends/memory.py b/faust/web/cache/backends/memory.py index ab47627ec..94b8be391 100644 --- a/faust/web/cache/backends/memory.py +++ b/faust/web/cache/backends/memory.py @@ -88,7 +88,9 @@ def __post_init__(self) -> None: async def _get(self, key: str) -> Optional[bytes]: return self.storage.get(key) - async def _set(self, key: str, value: bytes, timeout: float = None) -> None: + async def _set( + self, key: str, value: bytes, timeout: Optional[float] = None + ) -> None: if timeout is not None: self.storage.setex(key, timeout, want_bytes(value)) else: diff --git a/faust/web/cache/backends/redis.py b/faust/web/cache/backends/redis.py index cf2684d77..3e664084c 100644 --- a/faust/web/cache/backends/redis.py +++ b/faust/web/cache/backends/redis.py @@ -67,10 +67,10 @@ def __init__( app: AppT, url: Union[URL, str], *, - connect_timeout: float = None, - stream_timeout: float = None, - max_connections: int = None, - max_connections_per_node: int = None, + connect_timeout: Optional[float] = None, + stream_timeout: Optional[float] = None, + max_connections: Optional[int] = None, + max_connections_per_node: Optional[int] = None, **kwargs: Any, ) -> None: super().__init__(app, url, **kwargs) @@ -95,7 +95,9 @@ async def _get(self, key: str) -> Optional[bytes]: return want_bytes(value) return None - async def _set(self, key: str, value: bytes, timeout: float = None) -> None: + async def _set( + self, key: str, value: bytes, timeout: Optional[float] = None + ) -> None: if timeout is not None: await self.client.setex(key, int(timeout), value) else: @@ -125,10 +127,10 @@ def _client_from_url_and_query( self, url: URL, *, - connect_timeout: str = None, - stream_timeout: str = None, - max_connections: str = None, - max_connections_per_node: str = None, + connect_timeout: Optional[str] = None, + stream_timeout: Optional[str] = None, + max_connections: Optional[str] = None, + max_connections_per_node: Optional[str] = None, **kwargs: Any, ) -> _RedisClientT: Client = self._client_by_scheme[url.scheme] @@ -160,15 +162,17 @@ def _prepare_client_kwargs(self, url: URL, **kwargs: Any) -> Mapping: return self._as_cluster_kwargs(**kwargs) return kwargs - def _as_cluster_kwargs(self, db: str = None, **kwargs: Any) -> Mapping: + def _as_cluster_kwargs(self, db: Optional[str] = None, **kwargs: Any) -> Mapping: # Redis Cluster does not support db as argument. 
return kwargs - def _int_from_str(self, val: str = None, default: int = None) -> Optional[int]: + def _int_from_str( + self, val: Optional[str] = None, default: Optional[int] = None + ) -> Optional[int]: return int(val) if val else default def _float_from_str( - self, val: str = None, default: float = None + self, val: Optional[str] = None, default: Optional[float] = None ) -> Optional[float]: return float(val) if val else default diff --git a/faust/web/cache/cache.py b/faust/web/cache/cache.py index 950afc006..f40278739 100644 --- a/faust/web/cache/cache.py +++ b/faust/web/cache/cache.py @@ -23,9 +23,9 @@ class Cache(CacheT): def __init__( self, - timeout: Seconds = None, + timeout: Optional[Seconds] = None, include_headers: bool = False, - key_prefix: str = None, + key_prefix: Optional[str] = None, backend: Union[Type[CacheBackendT], str] = None, **kwargs: Any, ) -> None: @@ -36,9 +36,9 @@ def __init__( def view( self, - timeout: Seconds = None, + timeout: Optional[Seconds] = None, include_headers: bool = False, - key_prefix: str = None, + key_prefix: Optional[str] = None, **kwargs: Any, ) -> Callable[[Callable], Callable]: """Decorate view to be cached.""" @@ -99,7 +99,11 @@ def _view_backend(self, view: View) -> CacheBackendT: return cast(CacheBackendT, self.backend or view.app.cache) async def set_view( - self, key: str, view: View, response: Response, timeout: Seconds = None + self, + key: str, + view: View, + response: Response, + timeout: Optional[Seconds] = None, ) -> None: """Set cached value for HTTP view request.""" backend = self._view_backend(view) @@ -122,8 +126,8 @@ def can_cache_response(self, request: Request, response: Response) -> bool: def key_for_request( self, request: Request, - prefix: str = None, - method: str = None, + prefix: Optional[str] = None, + method: Optional[str] = None, include_headers: bool = False, ) -> str: """Return a cache key created from web request.""" diff --git a/faust/web/drivers/aiohttp.py b/faust/web/drivers/aiohttp.py index bd6d62dcf..389637d75 100644 --- a/faust/web/drivers/aiohttp.py +++ b/faust/web/drivers/aiohttp.py @@ -128,9 +128,9 @@ def text( self, value: str, *, - content_type: str = None, + content_type: Optional[str] = None, status: int = 200, - reason: str = None, + reason: Optional[str] = None, headers: MutableMapping = None, ) -> base.Response: """Create text response, using "text/plain" content-type.""" @@ -147,9 +147,9 @@ def html( self, value: str, *, - content_type: str = None, + content_type: Optional[str] = None, status: int = 200, - reason: str = None, + reason: Optional[str] = None, headers: MutableMapping = None, ) -> base.Response: """Create HTML response from string, ``text/html`` content-type.""" @@ -165,9 +165,9 @@ def json( self, value: Any, *, - content_type: str = None, + content_type: Optional[str] = None, status: int = 200, - reason: str = None, + reason: Optional[str] = None, headers: MutableMapping = None, ) -> Any: """Create new JSON response. 
@@ -201,9 +201,9 @@ def bytes(
         self,
         value: _bytes,
         *,
-        content_type: str = None,
+        content_type: Optional[str] = None,
         status: int = 200,
-        reason: str = None,
+        reason: Optional[str] = None,
         headers: MutableMapping = None,
     ) -> base.Response:
         """Create new ``bytes`` response - for binary data."""
diff --git a/faust/web/exceptions.py b/faust/web/exceptions.py
index 53c2da9a4..db78319f6 100644
--- a/faust/web/exceptions.py
+++ b/faust/web/exceptions.py
@@ -1,6 +1,6 @@
 """HTTP and related errors."""
 import http
-from typing import Any, Dict, cast
+from typing import Any, Dict, Optional, cast
 
 from faust.exceptions import FaustError
 
@@ -34,7 +34,11 @@ class WebError(FaustError):
     extra_context: Dict
 
     def __init__(
-        self, detail: str = None, *, code: int = None, **extra_context: Any
+        self,
+        detail: Optional[str] = None,
+        *,
+        code: Optional[int] = None,
+        **extra_context: Any
     ) -> None:
         if detail:
             self.detail = detail
diff --git a/faust/web/views.py b/faust/web/views.py
index 8533475b8..7430f900e 100644
--- a/faust/web/views.py
+++ b/faust/web/views.py
@@ -197,9 +197,9 @@ def text(
         self,
         value: str,
         *,
-        content_type: str = None,
+        content_type: Optional[str] = None,
         status: int = 200,
-        reason: str = None,
+        reason: Optional[str] = None,
         headers: MutableMapping = None,
     ) -> Response:
         """Create text response, using "text/plain" content-type."""
@@ -215,9 +215,9 @@ def html(
         self,
         value: str,
         *,
-        content_type: str = None,
+        content_type: Optional[str] = None,
         status: int = 200,
-        reason: str = None,
+        reason: Optional[str] = None,
         headers: MutableMapping = None,
     ) -> Response:
         """Create HTML response from string, ``text/html`` content-type."""
@@ -233,9 +233,9 @@ def json(
         self,
         value: Any,
         *,
-        content_type: str = None,
+        content_type: Optional[str] = None,
         status: int = 200,
-        reason: str = None,
+        reason: Optional[str] = None,
         headers: MutableMapping = None,
     ) -> Response:
         """Create new JSON response.
@@ -257,9 +257,9 @@ def bytes(
         self,
         value: _bytes,
         *,
-        content_type: str = None,
+        content_type: Optional[str] = None,
         status: int = 200,
-        reason: str = None,
+        reason: Optional[str] = None,
         headers: MutableMapping = None,
     ) -> Response:
         """Create new ``bytes`` response - for binary data."""
diff --git a/faust/windows.py b/faust/windows.py
index 8c9f24637..6e956f34e 100644
--- a/faust/windows.py
+++ b/faust/windows.py
@@ -2,7 +2,7 @@
 import os
 import typing
 from math import floor
-from typing import List, Type, cast
+from typing import List, Optional, Type, cast
 
 from mode import Seconds, want_seconds
 
@@ -31,7 +31,9 @@ class _PyHoppingWindow(Window):
     size: float
     step: float
 
-    def __init__(self, size: Seconds, step: Seconds, expires: Seconds = None) -> None:
+    def __init__(
+        self, size: Seconds, step: Seconds, expires: Optional[Seconds] = None
+    ) -> None:
         self.size = want_seconds(size)
         self.step = want_seconds(step)
         self.expires = want_seconds(expires) if expires else None
@@ -94,7 +96,7 @@ class TumblingWindow(HoppingWindow):
     Fixed-size, non-overlapping, gap-less windows.
 
     """
-    def __init__(self, size: Seconds, expires: Seconds = None) -> None:
+    def __init__(self, size: Seconds, expires: Optional[Seconds] = None) -> None:
         super(TumblingWindow, self).__init__(size, size, expires)
 
diff --git a/faust/worker.py b/faust/worker.py
index bef5554b0..d5366e001 100644
--- a/faust/worker.py
+++ b/faust/worker.py
@@ -216,20 +216,20 @@ def __init__(
         self,
         app: AppT,
         *services: ServiceT,
-        sensors: Iterable[SensorT] = None,
+        sensors: Optional[Iterable[SensorT]] = None,
         debug: bool = DEBUG,
         quiet: bool = False,
-        loglevel: Union[str, int] = None,
-        logfile: Union[str, IO] = None,
+        loglevel: Union[str, int, None] = None,
+        logfile: Union[str, IO, None] = None,
         stdout: IO = sys.stdout,
         stderr: IO = sys.stderr,
-        blocking_timeout: float = None,
-        workdir: Union[Path, str] = None,
+        blocking_timeout: Optional[float] = None,
+        workdir: Union[Path, str, None] = None,
         console_port: int = CONSOLE_PORT,
-        loop: asyncio.AbstractEventLoop = None,
-        redirect_stdouts: bool = None,
-        redirect_stdouts_level: Severity = None,
-        logging_config: Dict = None,
+        loop: Optional[asyncio.AbstractEventLoop] = None,
+        redirect_stdouts: Optional[bool] = None,
+        redirect_stdouts_level: Optional[Severity] = None,
+        logging_config: Optional[Dict] = None,
         **kwargs: Any,
     ) -> None:
         self.app = app
diff --git a/tests/conftest.py b/tests/conftest.py
index 5c45e4191..508650e43 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2,7 +2,7 @@
 import threading
 import time
 from http import HTTPStatus
-from typing import Any, NamedTuple
+from typing import Any, NamedTuple, Optional
 
 import pytest
 from _pytest.assertion.util import _compare_eq_dict, _compare_eq_set
@@ -77,8 +77,8 @@ def setitem(self, dic, name, value=sentinel, new=MagicMock, **kwargs):
 
 
 class TimeMarks(NamedTuple):
-    time: float = None
-    monotonic: float = None
+    time: Optional[float] = None
+    monotonic: Optional[float] = None
 
 
 @pytest.fixture()
diff --git a/tests/functional/agents/helpers.py b/tests/functional/agents/helpers.py
index 1e7ab6efa..3834fd5d6 100644
--- a/tests/functional/agents/helpers.py
+++ b/tests/functional/agents/helpers.py
@@ -43,7 +43,7 @@ def __init__(
         value_serializer: CodecT = "raw",
         num_messages: int = 100,
         isolated_partitions: bool = False,
-        name: str = None,
+        name: Optional[str] = None,
         **kwargs: Any,
     ):
         self.app = app
@@ -169,8 +169,8 @@ async def put(
     def Message(
         self,
         tp: TP = None,
-        offset: int = None,
-        timestamp: float = None,
+        offset: Optional[int] = None,
+        timestamp: Optional[float] = None,
         timestamp_type: int = 1,
         headers: Optional[HeadersArg] = None,
         key: Optional[bytes] = None,
diff --git a/tests/functional/test_models.py b/tests/functional/test_models.py
index bd4845562..d029d64df 100644
--- a/tests/functional/test_models.py
+++ b/tests/functional/test_models.py
@@ -80,7 +80,7 @@ def test_parameters():
     assert not account2.active
 
     class Account3(Account):
-        foo: int = None
+        foo: Optional[int] = None
 
     account3 = Account3("id", "name", False, "foo")
     assert account3.id == "id"
@@ -662,10 +662,10 @@ def test_dumps(record):
 def test_subclass_default_values():
     class X(Record):
         x: int
-        y: int = None
+        y: Optional[int] = None
 
     class Z(X):
-        z: int = None
+        z: Optional[int] = None
 
     assert X(x=None).y is None
     assert X(x=None, y=303).y == 303
@@ -678,7 +678,7 @@ class Z(X):
 def test_subclass_preserves_required_values():
     class X(Record):
         x: int
-        z: int = None
+        z: Optional[int] = None
 
     class Y(X):
         y: int
@@ -946,22 +946,22 @@ class AdjustData(Record):
     tracker: str
     reftag: str
     nonce: str
-    campaign_name: str = None
-    adgroup_name: str = None
-    creative_name: str = None
-    click_referer: str = None
-    is_organic: str = None
-    reattribution_attribution_window: str = None
-    impression_attribution_window: str = None
-    store: str = None
-    match_type: str = None
-    platform_adid: str = None
-    search_term: str = None
-    event_name: str = None
-    installed_at: str = None
-    engagement_time: str = None
-    deeplink: str = None
-    source_user: str = None
+    campaign_name: Optional[str] = None
+    adgroup_name: Optional[str] = None
+    creative_name: Optional[str] = None
+    click_referer: Optional[str] = None
+    is_organic: Optional[str] = None
+    reattribution_attribution_window: Optional[str] = None
+    impression_attribution_window: Optional[str] = None
+    store: Optional[str] = None
+    match_type: Optional[str] = None
+    platform_adid: Optional[str] = None
+    search_term: Optional[str] = None
+    event_name: Optional[str] = None
+    installed_at: Optional[str] = None
+    engagement_time: Optional[str] = None
+    deeplink: Optional[str] = None
+    source_user: Optional[str] = None
 
 class User(Record):
     username: str
@@ -988,8 +988,8 @@ class AdjustRecord(BaseAttribution):
     app: App
     event: Event
     timestamp: str
-    client_ip: str = None
-    event_hash: str = None
+    client_ip: Optional[str] = None
+    event_hash: Optional[str] = None
 
     def __post_init__(self) -> None:
         self.data_store = None
@@ -1008,8 +1008,8 @@ def asdict(self):
 
 def test_prepare_dict():
     class Quote(Record):
-        ask_price: float = None
-        bid_price: float = None
+        ask_price: Optional[float] = None
+        bid_price: Optional[float] = None
 
         def _prepare_dict(self, payload):
             return {k: v for k, v in payload.items() if v is not None}
diff --git a/tests/stress/models.py b/tests/stress/models.py
index 4844aef26..8c9a919d6 100644
--- a/tests/stress/models.py
+++ b/tests/stress/models.py
@@ -1,6 +1,7 @@
 import random
 from datetime import datetime, timezone
 from itertools import count
+from typing import Optional
 
 import faust
 
@@ -9,15 +10,15 @@ class Withdrawal(faust.Record, isodates=True, serializer="json"):
     user: str
     country: str
     amount: float
-    date: datetime = None
+    date: Optional[datetime] = None
 
 
-def generate_withdrawals(n: int = None):
+def generate_withdrawals(n: Optional[int] = None):
     for d in generate_withdrawals_dict(n):
         yield Withdrawal(**d)
 
 
-def generate_withdrawals_dict(n: int = None):
+def generate_withdrawals_dict(n: Optional[int] = None):
     num_countries = 5
     countries = [f"country_{i}" for i in range(num_countries)]
     country_dist = [0.9] + ([0.10 / num_countries] * (num_countries - 1))
diff --git a/tests/stress/reports/models.py b/tests/stress/reports/models.py
index ba24afc18..d04482695 100644
--- a/tests/stress/reports/models.py
+++ b/tests/stress/reports/models.py
@@ -1,3 +1,5 @@
+from typing import Optional
+
 import faust
 
 __all__ = ["Error", "Status"]
@@ -11,29 +13,29 @@ class Error(faust.Record):
     #: Format (sent to logging, e.g. 'the %s for %s did %r'
     format: str  # noqa
 
-    hostname: str = None
+    hostname: Optional[str] = None
 
     #: Traceback (if any)
-    traceback: str = None
+    traceback: Optional[str] = None
 
     #: Name of the origin logger.
-    logger: str = None
+    logger: Optional[str] = None
 
     #: Path to the file logging this.
-    filename: str = None
+    filename: Optional[str] = None
 
     #: Name of module logging this.
-    module: str = None
+    module: Optional[str] = None
 
     #: Line number
-    lineno: int = None
+    lineno: Optional[int] = None
 
     #: Logging severity
     severity: str = "ERROR"
 
-    timestamp: float = None
+    timestamp: Optional[float] = None
 
-    app_id: str = None
+    app_id: Optional[str] = None
 
 
 class Status(faust.Record):