Skip to content

Commit

Permalink
refactor: use py38 plugin host
Browse files Browse the repository at this point in the history
Signed-off-by: Jack Cherng <jfcherng@gmail.com>
  • Loading branch information
jfcherng committed Apr 19, 2024
1 parent f14f1d1 commit d29894e
Show file tree
Hide file tree
Showing 16 changed files with 453 additions and 937 deletions.
1 change: 1 addition & 0 deletions .python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.8
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2020-2023 Jack Cherng <jfcherng@gmail.com>
Copyright (c) 2020-2024 Jack Cherng <jfcherng@gmail.com>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
33 changes: 21 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
UV_INSTALL_FLAGS :=

.PHONY: all
all: fix
all:

.PHONY: install
install:
pip install -U -r requirements.txt
uv pip install $(UV_INSTALL_FLAGS) -r requirements.txt

.PHONY: pip-compile
pip-compile:
uv pip compile --upgrade requirements.in -o requirements.txt

.PHONY: check
check:
.PHONY: ci-check
ci-check:
@echo "========== check: mypy =========="
mypy -p plugin
flake8 .
black --check --diff --preview .
isort --check --diff .
@echo "========== check: ruff (lint) =========="
ruff check --diff .
@echo "========== check: ruff (format) =========="
ruff format --diff .

.PHONY: fix
fix:
autoflake --in-place .
black --preview .
isort .
.PHONY: ci-fix
ci-fix:
@echo "========== fix: ruff (lint) =========="
ruff check --fix .
@echo "========== fix: ruff (format) =========="
ruff format .
7 changes: 5 additions & 2 deletions boot.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from __future__ import annotations


def reload_plugin() -> None:
import sys

# remove all previously loaded plugin modules
prefix = __package__ + "."
prefix = f"{__package__}."
for module_name in tuple(filter(lambda m: m.startswith(prefix) and m != __name__, sys.modules)):
del sys.modules[module_name]


reload_plugin()

from .plugin import * # noqa: E402, F401, F403
from .plugin import * # noqa: E402, F403
2 changes: 2 additions & 0 deletions plugin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

# import all listeners and commands
from .commands import (
LspIntelephensePatcherOpenServerBinaryDirCommand,
Expand Down
43 changes: 31 additions & 12 deletions plugin/commands.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,41 @@
from __future__ import annotations

import importlib
import os
from collections.abc import Callable
from types import ModuleType
from typing import Any, Callable, Dict, List, Tuple, cast
from typing import Any, Dict, Tuple, cast

import sublime
import sublime_plugin
from lsp_utils import ServerNpmResource

from .patcher import AlreadyPatchedException, Patcher, PatcherUnsupportedException, json_dumps, restore_directory
from .patcher import (
AlreadyPatchedException,
Patcher,
PatcherUnsupportedException,
json_dumps,
restore_directory,
)
from .plugin_message import console_msg, error_box, info_box
from .utils import get_command_name


def restart_intelephense_server() -> None:
view = sublime.active_window().active_view()
if view:
if view := sublime.active_window().active_view():
view.run_command("lsp_restart_server", {"config_name": "LSP-intelephense"})


def st_command_run_precheck(func: Callable) -> Callable:
def wrapped(self: sublime_plugin.Command, *args, **kwargs) -> None:
def checker() -> Tuple[ModuleType, ServerNpmResource]:
def checker() -> tuple[ModuleType, ServerNpmResource]:
try:
plugin_module = importlib.import_module("LSP-intelephense.plugin")
lsp_plugin = plugin_module.LspIntelephensePlugin # type: ignore
except (ImportError, AttributeError):
raise RuntimeError("LSP-intelephense is not installed...")

server_resource = lsp_plugin.get_server() # type: ServerNpmResource
server_resource: ServerNpmResource = lsp_plugin.get_server()

if not os.path.isfile(server_resource.binary_path):
raise RuntimeError(
Expand Down Expand Up @@ -62,9 +70,12 @@ def run(

try:
is_success, occurrences = Patcher.patch_file(binary_path, allow_unsupported)

if is_success and occurrences > 0:
info_box('[{_}] "{}" is patched with {} occurrences!', binary_path, occurrences)
info_box(
'[{_}] "{}" is patched with {} occurrences!',
binary_path,
occurrences,
)
else:
error_box("[{_}] Unfortunately, somehow the patching failed.")
except AlreadyPatchedException:
Expand Down Expand Up @@ -133,14 +144,22 @@ def run(self, server_resource: ServerNpmResource) -> None:


class LspIntelephensePatcherShowMenuCommand(sublime_plugin.WindowCommand):
menu_items = [
menu_items: list[tuple[str, type, dict[str, Any]]] = [
# title, cmd_class, cmd_arg
("Patch Intelephense", LspIntelephensePatcherPatchCommand, {}),
("Patch Intelephense (Allow Unsupported)", LspIntelephensePatcherPatchCommand, {"allow_unsupported": True}),
(
"Patch Intelephense (Allow Unsupported)",
LspIntelephensePatcherPatchCommand,
{"allow_unsupported": True},
),
("Un-patch Intelephense", LspIntelephensePatcherUnpatchCommand, {}),
("Re-patch Intelephense", LspIntelephensePatcherRepatchCommand, {}),
("Open Server Binary Directory", LspIntelephensePatcherOpenServerBinaryDirCommand, {}),
] # type: List[Tuple[str, type, Dict[str, Any]]]
(
"Open Server Binary Directory",
LspIntelephensePatcherOpenServerBinaryDirCommand,
{},
),
]

def run(self) -> None:
titles, cmd_classes, cmd_args = cast(
Expand Down
68 changes: 35 additions & 33 deletions plugin/patcher.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
from __future__ import annotations

import datetime
import io
import json
import operator
import os
import re
import shutil
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
from collections.abc import Callable, Iterable
from typing import Any


def now_isoformat() -> str:
return datetime.datetime.now(datetime.timezone.utc).astimezone().isoformat()


def backup_files(files: Iterable[str], force_overwrite: bool = False) -> List[str]:
def backup_files(files: Iterable[str], force_overwrite: bool = False) -> list[str]:
ok_files = []

for file in files:
file_backup = file + ".bak"
file_backup = f"{file}.bak"

if os.path.isdir(file_backup):
continue
Expand All @@ -28,7 +30,7 @@ def backup_files(files: Iterable[str], force_overwrite: bool = False) -> List[st
return ok_files


def restore_directory(path: str) -> List[str]:
def restore_directory(path: str) -> list[str]:
if not os.path.isdir(path):
return []

Expand All @@ -52,17 +54,17 @@ def json_dumps(value: Any, **kwargs) -> str:
return json.dumps(value, ensure_ascii=False, sort_keys=True, **kwargs)


def file_get_content(path: str, **kwargs) -> Optional[str]:
def file_get_content(path: str, **kwargs) -> str | None:
try:
with io.open(path, "r", encoding="utf-8", **kwargs) as f:
with open(path, encoding="utf-8", **kwargs) as f:
return f.read()
except Exception:
return None


def file_set_content(path: str, content: str, **kwargs) -> bool:
try:
with io.open(path, "w", encoding="utf-8", newline="\n", **kwargs) as f:
with open(path, "w", encoding="utf-8", newline="\n", **kwargs) as f:
f.write(content)
return True
except Exception:
Expand Down Expand Up @@ -110,10 +112,8 @@ def _compare_2(self, other: Any, comparator: Callable[[Any, Any], bool]) -> bool
raise ValueError("SchemaVersion can only be compared with itself or str.")

@staticmethod
def from_str(v_str: str) -> "SchemaVersion":
m = re.search(r"^(\d+)(?:\.(\d+))?(?:\.(\d+))?", v_str.strip())

if not m:
def from_str(v_str: str) -> SchemaVersion:
if not (m := re.search(r"^(\d+)(?:\.(\d+))?(?:\.(\d+))?", v_str.strip())):
raise ValueError("The input is not a valid version string...")

major = int(m.group(1))
Expand All @@ -131,22 +131,26 @@ def __init__(self) -> None:
class PatcherUnsupportedException(Exception):
def __init__(
self,
version: Union[str, SchemaVersion],
supported_versions: Iterable[Union[str, SchemaVersion]] = [],
version: str | SchemaVersion,
supported_versions: Iterable[str | SchemaVersion] = [],
) -> None:
msg = '"intelephense" v{} is probably unsupported (or untested) by the patcher...'.format(version)

v_versions = ["v" + str(v) for v in supported_versions]
if v_versions:
msg += " The patcher supports {}".format(", ".join(v_versions))
msg = f'"intelephense" v{version} is probably unsupported (or untested) by the patcher...'

if v_versions := [f"v{v}" for v in supported_versions]:
msg += f' The patcher supports {", ".join(v_versions)}'
super().__init__(msg)


class PatchPattern:
__slots__ = ("search", "replace", "flags", "count")

def __init__(self, search: str = "", replace: str = "", flags: int = re.UNICODE, count: int = 0) -> None:
def __init__(
self,
search: str = "",
replace: str = "",
flags: int = re.UNICODE,
count: int = 0,
) -> None:
self.search = search
self.replace = replace
self.flags = flags
Expand All @@ -168,14 +172,12 @@ class Patcher:
PATCH_INFO_MARK_PAIR = ("--- PATCH_INFO_BEGIN ---", "--- PATCH_INFO_END ---")
PATCHED_MARK_DETECTION = "/** FILE HAS BEEN PATCHED **/"

PATCHED_MARK = "\n".join(
[
# indicates file has been patched
PATCHED_MARK_DETECTION,
# patch info
"/** " + " {info} ".join(PATCH_INFO_MARK_PAIR) + " **/",
]
)
PATCHED_MARK = "\n".join([
# indicates file has been patched
PATCHED_MARK_DETECTION,
# patch info
"/** " + " {info} ".join(PATCH_INFO_MARK_PAIR) + " **/",
])

LICENSE_OBJECT = {
"message": {
Expand All @@ -189,7 +191,7 @@ class Patcher:
}

@classmethod
def patch_file(cls, path: str, allow_unsupported: bool = False) -> Tuple[bool, int]:
def patch_file(cls, path: str, allow_unsupported: bool = False) -> tuple[bool, int]:
if not path or not os.path.isfile(path):
return (False, 0)

Expand All @@ -206,7 +208,7 @@ def patch_file(cls, path: str, allow_unsupported: bool = False) -> Tuple[bool, i
return (is_success, occurrences)

@classmethod
def patch_str(cls, content: str, allow_unsupported: bool = False) -> Tuple[str, int]:
def patch_str(cls, content: str, allow_unsupported: bool = False) -> tuple[str, int]:
if content.rfind(cls.PATCHED_MARK_DETECTION) > -1:
raise AlreadyPatchedException()

Expand All @@ -225,7 +227,7 @@ def patch_str(cls, content: str, allow_unsupported: bool = False) -> Tuple[str,
)

@classmethod
def get_patch_patterns(cls) -> List[PatchPattern]:
def get_patch_patterns(cls) -> list[PatchPattern]:
LICENSE_OBJECT_JS = json_dumps(cls.LICENSE_OBJECT)

return [
Expand Down Expand Up @@ -270,7 +272,7 @@ def generate_patch_marker(cls, occurrences: int = 0) -> str:
)

@classmethod
def extract_patch_info(cls, path_or_content: str) -> Dict[str, Any]:
def extract_patch_info(cls, path_or_content: str) -> dict[str, Any]:
content = file_get_content(path_or_content) or path_or_content
region = [content.rfind(mark) for mark in cls.PATCH_INFO_MARK_PAIR]

Expand All @@ -282,7 +284,7 @@ def extract_patch_info(cls, path_or_content: str) -> Dict[str, Any]:
return json.loads(content[slice(*region)].strip())

@classmethod
def extract_intelephense_version(cls, path_or_content: str) -> "SchemaVersion":
def extract_intelephense_version(cls, path_or_content: str) -> SchemaVersion:
content = file_get_content(path_or_content) or path_or_content
m = re.search(r'\bVERSION=["\'](\d+(?:\.\d+)?(?:\.\d+)?)', content)

Expand Down
5 changes: 4 additions & 1 deletion plugin/plugin_message.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from __future__ import annotations

import sublime


def pluginfy_msg(msg: str, *args, **kwargs) -> str:
package_name = __package__.split(".")[0]
assert __package__
package_name = __package__.partition(".")[0]

return msg.format(*args, _=package_name, **kwargs)

Expand Down
18 changes: 4 additions & 14 deletions plugin/utils.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,12 @@
from __future__ import annotations

import re
from typing import Iterable, Iterator, TypeVar, Union
from typing import TypeVar

T = TypeVar("T")


def unique(it: Iterable[T], stable: bool = False) -> Iterator[T]:
"""
Generates a collection of unique items from the iterable.
@param stable If True, returned items are garanteed to be in their original relative ordering.
"""

from collections import OrderedDict

return (OrderedDict.fromkeys(it).keys() if stable else set(it)).__iter__()


def get_command_name(var: Union[type, str]) -> str:
def get_command_name(var: type | str) -> str:
name = var.__name__ if isinstance(var, type) else str(var)

name = re.sub(r"Command$", "", name)
Expand Down
Loading

0 comments on commit d29894e

Please sign in to comment.