Skip to content

Commit

Permalink
chore: No more typing.Optional nor typing.Union (gorakhargosh#1056)
Browse files Browse the repository at this point in the history
* chore: No more `typing.Optional` nor `typing.Union`

* chore: delete missed ruff rule [skip ci]
  • Loading branch information
BoboTiG authored Aug 11, 2024
1 parent 2872c7e commit 3b00a42
Show file tree
Hide file tree
Showing 10 changed files with 19 additions and 26 deletions.
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ ignore = [
"S",
"TD",
"TRY003",
"UP",
]
fixable = ["ALL"]

Expand Down
3 changes: 1 addition & 2 deletions src/watchdog/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@
import os.path
import re
from dataclasses import dataclass, field
from typing import Optional

from watchdog.utils.patterns import match_any_paths

Expand Down Expand Up @@ -450,7 +449,7 @@ def dispatch(self, event: FileSystemEvent) -> None:
class LoggingEventHandler(FileSystemEventHandler):
"""Logs all the events captured."""

def __init__(self, logger: Optional[logging.Logger] = None) -> None:
def __init__(self, logger: logging.Logger | None = None) -> None:
super().__init__()
self.logger = logger or logging.root

Expand Down
4 changes: 2 additions & 2 deletions src/watchdog/observers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class EventEmitter(BaseThread):
:param event_filter:
Collection of event types to emit, or None for no filtering (default).
:type event_filter:
Optional[Iterable[:class:`watchdog.events.FileSystemEvent`]]
Iterable[:class:`watchdog.events.FileSystemEvent`] | None
"""

def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT, event_filter=None):
Expand Down Expand Up @@ -290,7 +290,7 @@ def schedule(self, event_handler, path, recursive=False, event_filter=None):
:param event_filter:
Collection of event types to emit, or None for no filtering (default).
:type event_filter:
Optional[Iterable[:class:`watchdog.events.FileSystemEvent`]]
Iterable[:class:`watchdog.events.FileSystemEvent`] | None
:return:
An :class:`ObservedWatch` object instance representing
a watch.
Expand Down
3 changes: 1 addition & 2 deletions src/watchdog/observers/fsevents2.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import unicodedata
import warnings
from threading import Thread
from typing import Optional

# pyobjc
import AppKit
Expand Down Expand Up @@ -81,7 +80,7 @@ class FSEventsQueue(Thread):

def __init__(self, path):
Thread.__init__(self)
self._queue: queue.Queue[Optional[list[NativeEvent]]] = queue.Queue()
self._queue: queue.Queue[list[NativeEvent] | None] = queue.Queue()
self._run_loop = None

if isinstance(path, bytes):
Expand Down
4 changes: 2 additions & 2 deletions src/watchdog/observers/inotify.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class InotifyEmitter(EventEmitter):
:param event_filter:
Collection of event types to emit, or None for no filtering (default).
:type event_filter:
Optional[Iterable[:class:`watchdog.events.FileSystemEvent`]]
Iterable[:class:`watchdog.events.FileSystemEvent`] | None
"""

def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT, event_filter=None):
Expand Down Expand Up @@ -250,7 +250,7 @@ class InotifyFullEmitter(InotifyEmitter):
:param event_filter:
Collection of event types to emit, or None for no filtering (default).
:type event_filter:
Optional[Iterable[:class:`watchdog.events.FileSystemEvent`]]
Iterable[:class:`watchdog.events.FileSystemEvent`] | None
"""

Expand Down
6 changes: 1 addition & 5 deletions src/watchdog/observers/inotify_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Union

from watchdog.observers.inotify_c import Inotify, InotifyEvent
from watchdog.utils import BaseThread
Expand Down Expand Up @@ -54,7 +53,7 @@ def close(self):

def _group_events(self, event_list):
"""Group any matching move events"""
grouped: list[Union[InotifyEvent, tuple[InotifyEvent, InotifyEvent]]] = []
grouped: list[InotifyEvent | tuple[InotifyEvent, InotifyEvent]] = []
for inotify_event in event_list:
logger.debug("in-event %s", inotify_event)

Expand All @@ -65,9 +64,6 @@ def matching_from_event(event):
# Check if move_from is already in the buffer
for index, event in enumerate(grouped):
if matching_from_event(event):
if TYPE_CHECKING:
# this check is hidden from mypy inside matching_from_event()
assert not isinstance(event, tuple)
grouped[index] = (event, inotify_event)
break
else:
Expand Down
2 changes: 1 addition & 1 deletion src/watchdog/observers/kqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ class KqueueEmitter(EventEmitter):
:param event_filter:
Collection of event types to emit, or None for no filtering (default).
:type event_filter:
Optional[Iterable[:class:`watchdog.events.FileSystemEvent`]]
Iterable[:class:`watchdog.events.FileSystemEvent`] | None
:param stat: stat function. See ``os.stat`` for details.
"""

Expand Down
6 changes: 3 additions & 3 deletions src/watchdog/utils/delayed_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import threading
import time
from collections import deque
from typing import Callable, Generic, Optional, TypeVar
from typing import Callable, Generic, TypeVar

T = TypeVar("T")

Expand Down Expand Up @@ -45,7 +45,7 @@ def close(self):
self._not_empty.notify()
self._not_empty.release()

def get(self) -> Optional[T]:
def get(self) -> T | None:
"""Remove and return an element from the queue, or this queue has been
closed raise the Closed exception.
"""
Expand Down Expand Up @@ -74,7 +74,7 @@ def get(self) -> Optional[T]:
self._queue.popleft()
return head

def remove(self, predicate: Callable[[T], bool]) -> Optional[T]:
def remove(self, predicate: Callable[[T], bool]) -> T | None:
"""Remove and return the first items for which predicate is True,
ignoring delay.
"""
Expand Down
8 changes: 4 additions & 4 deletions src/watchdog/utils/dirsnapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@

if TYPE_CHECKING:
from collections.abc import Iterator
from typing import Any, Callable, Optional
from typing import Any, Callable


class DirectorySnapshotDiff:
Expand Down Expand Up @@ -254,7 +254,7 @@ def __init__(
path: str,
recursive: bool = True,
stat: Callable[[str], os.stat_result] = os.stat,
listdir: Callable[[Optional[str]], Iterator[os.DirEntry]] = os.scandir,
listdir: Callable[[str | None], Iterator[os.DirEntry]] = os.scandir,
ignore_device: bool = False,
):
self.path = path
Expand Down Expand Up @@ -310,7 +310,7 @@ def __init__(
path: str,
recursive: bool = True,
stat: Callable[[str], os.stat_result] = os.stat,
listdir: Callable[[Optional[str]], Iterator[os.DirEntry]] = os.scandir,
listdir: Callable[[str | None], Iterator[os.DirEntry]] = os.scandir,
):
self.recursive = recursive
self.stat = stat
Expand Down Expand Up @@ -359,7 +359,7 @@ def paths(self) -> set[str]:
"""Set of file/directory paths in the snapshot."""
return set(self._stat_info.keys())

def path(self, uid: tuple[int, int]) -> Optional[str]:
def path(self, uid: tuple[int, int]) -> str | None:
"""Returns path for id. None if id is unknown to this snapshot."""
return self._inode_to_path.get(uid)

Expand Down
8 changes: 4 additions & 4 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
import dataclasses
import os
from queue import Empty, Queue
from typing import Optional, Type, Union, Protocol
from typing import Protocol

from watchdog.events import FileSystemEvent
from watchdog.observers.api import EventEmitter, ObservedWatch
from watchdog.utils import platform

Emitter: Type[EventEmitter]
Emitter: type[EventEmitter]

if platform.is_linux():
from watchdog.observers.inotify import InotifyEmitter as Emitter
Expand All @@ -30,7 +30,7 @@ def __call__(self, *args: str) -> str:
class StartWatching(Protocol):
def __call__(
self,
path: Optional[Union[str, bytes]] = ...,
path: bytes | str | None = ...,
use_full_emitter: bool = ...,
recursive: bool = ...,
) -> EventEmitter:
Expand All @@ -56,7 +56,7 @@ def joinpath(self, *args: str) -> str:

def start_watching(
self,
path: Optional[Union[str, bytes]] = None,
path: bytes | str | None = None,
use_full_emitter: bool = False,
recursive: bool = True,
) -> EventEmitter:
Expand Down

0 comments on commit 3b00a42

Please sign in to comment.