Skip to content

Commit

Permalink
chore: adopt ruff (#503)
Browse files Browse the repository at this point in the history
* chore(actions): bump lycheeverse/lychee-action from 1.9.2 to 1.9.3 (#499)

Bumps [lycheeverse/lychee-action](https://github.com/lycheeverse/lychee-action) from 1.9.2 to 1.9.3.
- [Release notes](https://github.com/lycheeverse/lychee-action/releases)
- [Commits](lycheeverse/lychee-action@v1.9.2...v1.9.3)

---
updated-dependencies:
- dependency-name: lycheeverse/lychee-action
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Signed-off-by: Keming <kemingy94@gmail.com>

* chore: adopt ruff

Signed-off-by: Keming <kemingy94@gmail.com>

---------

Signed-off-by: dependabot[bot] <support@github.com>
Signed-off-by: Keming <kemingy94@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
  • Loading branch information
kemingy and dependabot[bot] committed Feb 16, 2024
1 parent 1cbaaa6 commit f16c897
Show file tree
Hide file tree
Showing 27 changed files with 202 additions and 283 deletions.
265 changes: 86 additions & 179 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ async-stream = "0.3.5"
serde = "1.0"
serde_json = "1.0"
utoipa = "4"
utoipa-swagger-ui = { version = "5", features = ["axum"] }
utoipa-swagger-ui = { version = "6", features = ["axum"] }
20 changes: 7 additions & 13 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ doc:

clean:
@cargo clean
@-rm -rf build/ dist/ .eggs/ site/ *.egg-info .pytest_cache .mypy_cache
@-rm -rf build/ dist/ .eggs/ site/ *.egg-info .pytest_cache .mypy_cache .ruff_cache
@-find . -name '*.pyc' -type f -exec rm -rf {} +
@-find . -name '__pycache__' -exec rm -rf {} +

Expand All @@ -55,23 +55,17 @@ publish: package
twine upload dist/*

format:
@autoflake --in-place --recursive ${PY_SOURCE_FILES}
@isort --project=mosec ${PY_SOURCE_FILES}
@black ${PY_SOURCE_FILES}
@ruff check --fix ${PY_SOURCE_FILES}
@ruff format ${PY_SOURCE_FILES}
@cargo +nightly fmt --all

lint:
@pip install -q -e .
isort --check --diff --project=mosec ${PY_SOURCE_FILES}
black --check --diff ${PY_SOURCE_FILES}
pylint -j 8 --recursive=y mosec setup.py
pylint -j 8 --recursive=y --disable=import-error examples --generated-members=numpy.*,torch.*,cv2.*,cv.*
pylint -j 8 --recursive=y tests --disable=unused-argument,missing-function-docstring,missing-class-docstring,redefined-outer-name,too-few-public-methods,consider-using-with
pydocstyle mosec
@ruff check ${PY_SOURCE_FILES}
@-rm mosec/_version.py
pyright --stats
mypy --non-interactive --install-types ${PY_SOURCE_FILES}
cargo +nightly fmt -- --check
@pyright --stats
@mypy --non-interactive --install-types ${PY_SOURCE_FILES}
@cargo +nightly fmt -- --check

semantic_lint:
@cargo clippy -- -D warnings
Expand Down
5 changes: 3 additions & 2 deletions examples/echo.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""Example: Sample structures for using mosec server."""

import time
from types import MappingProxyType as ImmutableDict
from typing import List

from mosec import Server, ValidationError, Worker, get_logger
Expand All @@ -24,7 +25,7 @@
class Preprocess(Worker):
"""Sample Class."""

example = {"time": 0}
example = ImmutableDict({"time": 0})

def forward(self, data: dict) -> float:
logger.debug("pre received %s", data)
Expand All @@ -39,7 +40,7 @@ def forward(self, data: dict) -> float:
class Inference(Worker):
"""Sample Class."""

example = [0, 1e-5, 2e-4]
example = (0, 1e-5, 2e-4)

def forward(self, data: List[float]) -> List[float]:
logger.info("sleeping for %s seconds", max(data))
Expand Down
22 changes: 10 additions & 12 deletions examples/server_side_event/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,15 @@
import httpx
from httpx_sse import connect_sse

with httpx.Client() as client:
with connect_sse(
client, "POST", "http://127.0.0.1:8000/inference", json={"text": "mosec"}
) as event_source:
for sse in event_source.iter_sse():
print(f"Event({sse.event}): {sse.data}")
with httpx.Client() as client, connect_sse(
client, "POST", "http://127.0.0.1:8000/inference", json={"text": "mosec"}
) as event_source:
for sse in event_source.iter_sse():
print(f"Event({sse.event}): {sse.data}")

# error handling
with httpx.Client() as client:
with connect_sse(
client, "POST", "http://127.0.0.1:8000/inference", json={"error": "mosec"}
) as event_source:
for sse in event_source.iter_sse():
print(f"Event({sse.event}): {sse.data}")
with httpx.Client() as client, connect_sse(
client, "POST", "http://127.0.0.1:8000/inference", json={"error": "mosec"}
) as event_source:
for sse in event_source.iter_sse():
print(f"Event({sse.event}): {sse.data}")
2 changes: 2 additions & 0 deletions mosec/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ def parse_arguments() -> argparse.Namespace:
"`--wait` is deprecated and will be removed in v1, please configure"
"the `max_wait_time` on `Server.append_worker`",
DeprecationWarning,
stacklevel=2,
)

if args.debug:
Expand All @@ -155,6 +156,7 @@ def parse_arguments() -> argparse.Namespace:
"`--debug` is deprecated and will be removed in v1, please configure"
"`--log_level=debug`",
DeprecationWarning,
stacklevel=2,
)

if not is_port_available(args.address, args.port):
Expand Down
6 changes: 4 additions & 2 deletions mosec/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def set_mosec_timeout(duration: int):
Args:
duration (float): the duration in seconds before timing out
"""

def handler(signum, frame):
Expand Down Expand Up @@ -114,13 +115,14 @@ def __init__(
Args:
worker: subclass of `mosec.Worker` implemented by users.
max_batch_size: maximum batch size for this worker.
shutdown: `multiprocessing.synchronize.Event` object for shutdown
IPC.
shutdown: `multiprocessing.synchronize.Event` object for shutdown IPC.
shutdown_notify: shutdown notification event.
socket_prefix: prefix for the socket addresses.
stage_name: identification name for this worker stage.
worker_id: identification number for worker processes at the same
stage.
timeout: timeout for the `forward` function.
"""
self.name = f"<{stage_name}|{worker_id}>"
self.timeout = timeout
Expand Down
1 change: 1 addition & 0 deletions mosec/dry_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def __init__(self, process_context: SpawnContext, shutdown_notify: Event):
Args:
process_context: server context of spawn process
shutdown_notify: event of server will shutdown
"""
self.process_context = process_context
self.shutdown_notify = shutdown_notify
Expand Down
8 changes: 5 additions & 3 deletions mosec/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ def get_env_namespace(prefix: str = MOSEC_ENV_PREFIX) -> Namespace:
warnings.warn(
f"failed to convert env {var}={value} to type {converter} {err}, "
"will skip this one",
RuntimeWarning,
stacklevel=2,
)
else:
setattr(namespace, name, val)
Expand Down Expand Up @@ -97,8 +99,8 @@ def validate_env(env: Union[Any, List[Dict[str, str]]], num: int):
return
assert len(env) == num, "len(env) must equal to num"
valid = True
if not isinstance(env, List):
valid = False
elif not all(isinstance(x, Dict) and validate_str_dict(x) for x in env):
if not isinstance(env, List) or not all(
isinstance(x, Dict) and validate_str_dict(x) for x in env
):
valid = False
assert valid, "env must be a list of string dictionary"
2 changes: 2 additions & 0 deletions mosec/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def __init__(
fmt (str): logging message format (% style)
datefmt (str): datetime format
prefix (str): prefix of target
"""
# partially align with rust tracing_subscriber
self.colors = {
Expand Down Expand Up @@ -109,6 +110,7 @@ def __init__(
fmt (str): logging message format (% style)
datefmt (str): datetime format
prefix (str): prefix of target
"""
super().__init__(fmt, datefmt, "%")
self.prefix = prefix
Expand Down
2 changes: 2 additions & 0 deletions mosec/mixin/msgpack_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def serialize(self, data: Any) -> bytes:
Raises:
EncodingError: if the data cannot be serialized with msgpack
"""
import msgpack # type: ignore

Expand All @@ -67,6 +68,7 @@ def deserialize(self, data: bytes) -> Any:
Raises:
DecodingError: if the data cannot be deserialized with msgpack
"""
import msgpack

Expand Down
2 changes: 2 additions & 0 deletions mosec/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def __init__(
name (str): name of its belonging coordinator.
addr (str): Unix domain socket address in file system's namespace.
timeout (float, optional): socket timeout. Defaults to 2.0 seconds.
"""
self.socket = socket.socket(
socket.AF_UNIX,
Expand Down Expand Up @@ -117,6 +118,7 @@ def receive(self) -> Tuple[bytes, Sequence[bytes], Sequence[int], Sequence[bytes
f"IPC data ({total_bytes} bytes) is large, "
"which may affect performance",
RuntimeWarning,
stacklevel=2,
)
return flag, ids, states, payloads

Expand Down
6 changes: 6 additions & 0 deletions mosec/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def __init__(
timeout (int): timeout (second) for the `forward` function.
start_method: the process starting method ("spawn" or "fork")
env: the environment variables to set before starting the process
"""
self.worker = worker
self.num = num
Expand Down Expand Up @@ -145,6 +146,7 @@ def _check(
Returns:
Whether the worker is started successfully
"""
# for every sequential stage
self._pool = [p if self._process_healthy(p) else None for p in self._pool]
Expand Down Expand Up @@ -188,6 +190,7 @@ def __init__(self, work_path: str, shutdown: Event, shutdown_notify: Event):
work_path: path of working directory
shutdown: Event of server shutdown
shutdown_notify: Event of server will shutdown
"""
self.runtimes: List[Runtime] = []

Expand All @@ -214,6 +217,7 @@ def check_and_start(self, init: bool) -> Union[Runtime, None]:
Args:
init: whether the worker is tried to start at the first time
"""
for worker_runtime in self.runtimes:
if not worker_runtime._check( # pylint: disable=protected-access
Expand All @@ -231,6 +235,7 @@ def __init__(self, timeout: int):
Args:
timeout: service timeout in milliseconds
"""
self.process: Optional[subprocess.Popen] = None

Expand Down Expand Up @@ -267,6 +272,7 @@ def start(self, config_path: Path) -> subprocess.Popen:
Args:
config_path: configuration path of mosec
"""
# pylint: disable=consider-using-with
self.process = subprocess.Popen([self.server_path, config_path])
Expand Down
24 changes: 13 additions & 11 deletions mosec/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@
model serving.
Dynamic Batching
----------------
The user may enable the dynamic batching feature for any stage when the
corresponding worker is appended, by setting the
:py:meth:`append_worker(max_batch_size) <Server.append_worker>`.
Multiprocessing
---------------
The user may spawn multiple processes for any stage when the
corresponding worker is appended, by setting the
Expand Down Expand Up @@ -178,6 +176,7 @@ def register_daemon(self, name: str, proc: subprocess.Popen):
Args:
name: the name of this daemon
proc: the process handle of the daemon
"""
assert isinstance(name, str), "daemon name should be a string"
assert isinstance(
Expand Down Expand Up @@ -215,6 +214,7 @@ def append_worker(
route: the route path for this worker. If not configured, will use the
default route path `/inference`. If a list is provided, different
route paths will share the same worker.
"""
timeout = timeout if timeout >= 1 else self._configs["timeout"] // 1000
max_wait_time = max_wait_time if max_wait_time >= 1 else self._configs["wait"]
Expand Down Expand Up @@ -296,15 +296,17 @@ def make_body(description, mime, schema):
request_worker_cls.resp_mime_type,
input_schema,
),
"responses": None
if not return_schema
else {
"200": make_body(
"Mosec Inference Result",
response_worker_cls.resp_mime_type,
return_schema,
)
},
"responses": (
None
if not return_schema
else {
"200": make_body(
"Mosec Inference Result",
response_worker_cls.resp_mime_type,
return_schema,
)
}
),
"schemas": {**input_components, **return_components},
"mime": response_worker_cls.resp_mime_type,
}
12 changes: 11 additions & 1 deletion mosec/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,14 @@ def __init__(self):
This method doesn't require the child class to override.
"""
super().__init__()

def serialize_ipc(self, data: Any) -> bytes:
"""Define IPC serialization method.
Args:
data: returned data from :py:meth:`forward`
"""
return pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL)

Expand All @@ -88,6 +90,7 @@ def deserialize_ipc(self, data: bytes) -> Any:
Args:
data: input data for :py:meth:`forward`
"""
return pickle.loads(data)

Expand Down Expand Up @@ -139,6 +142,7 @@ def serialize(self, data: Any) -> bytes:
Raises:
EncodingError: if the data cannot be serialized with JSON
"""
try:
data_bytes = json.dumps(data, indent=2).encode()
Expand All @@ -161,6 +165,7 @@ def deserialize(self, data: bytes) -> Any:
Raises:
DecodingError: if the data cannot be deserialized with JSON
"""
try:
data_json = json.loads(data) if data else {}
Expand Down Expand Up @@ -193,12 +198,15 @@ def forward(self, data: Any) -> Any:
- for a multi-stage worker that is neither `ingress` nor `egress`, data
will go through ``<deserialize_ipc> -> <forward> -> <serialize_ipc>``
"""
raise NotImplementedError

@classmethod
def get_forward_json_schema(
cls, target: ParseTarget, ref_template: str # pylint: disable=unused-argument
cls,
target: ParseTarget,
ref_template: str, # pylint: disable=unused-argument
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
"""Retrieve the JSON schema for the `forward` method of the class.
Expand All @@ -225,6 +233,7 @@ def get_forward_json_schema(
The :py:const:`MOSEC_REF_TEMPLATE` constant should be used as a reference
template according to openapi standards.
"""
return {}, {}

Expand All @@ -244,6 +253,7 @@ def send_stream_event(self, text: str, index: int = 0):
index: the index of the stream event. For the single request, this will
always be 0. For dynamic batch request, this should be the index of
the request in this batch.
"""
if self._stream_queue is None or self._stream_semaphore is None:
raise RuntimeError("the worker stream or semaphore is not initialized")
Expand Down
Loading

0 comments on commit f16c897

Please sign in to comment.