-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathhelpers.py
336 lines (275 loc) · 11.9 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
from __future__ import annotations
import itertools
import os
import re
import threading
import time
from operator import itemgetter
from pathlib import Path
from typing import Any, Callable, Literal, Sequence, cast
import sublime
from LSP.plugin.core.protocol import Position as LspPosition
from LSP.plugin.core.protocol import Range as LspRange
from LSP.plugin.core.url import filename_to_uri
from more_itertools import duplicates_everseen, first_true
from wcmatch import glob
from .constants import COPILOT_WINDOW_SETTINGS_PREFIX, PACKAGE_NAME
from .log import log_error
from .settings import get_plugin_setting_dotted
from .types import (
CopilotConversationTemplates,
CopilotDocType,
CopilotGitHubWebSearch,
CopilotPayloadCompletion,
CopilotPayloadPanelSolution,
CopilotRequestConversationTurn,
CopilotRequestConversationTurnReference,
CopilotUserDefinedPromptTemplates,
)
from .utils import (
all_views,
all_windows,
drop_falsy,
erase_copilot_setting,
erase_copilot_view_setting,
get_copilot_setting,
get_project_relative_path,
get_view_language_id,
set_copilot_setting,
simple_urlopen,
)
class ActivityIndicator:
    """Drive a small spinner animation from a background thread.

    While running, ``callback`` (if given) is called about every 0.1 seconds
    with ``{"is_waiting": <animation frame>}``. When the indicator is stopped,
    ``callback`` is called one final time with ``{"is_waiting": ""}``.
    """

    def __init__(self, callback: Callable[[dict[str, Any]], None] | None = None) -> None:
        """
        :param callback: Receives the status dict for every animation frame; may be ``None``.
        """
        self.thread: threading.Thread | None = None
        self.animation = ["⣷", "⣯", "⣟", "⡿", "⢿", "⣻", "⣽", "⣾"]  # taken from Package Control
        self.animation_cycled = itertools.cycle(self.animation)
        self.callback = callback
        self.stop_event = threading.Event()

    def start(self) -> None:
        """Start the animation thread unless one is already alive."""
        if not (self.thread and self.thread.is_alive()):
            self.stop_event.clear()
            self.thread = threading.Thread(target=self._run, daemon=True)
            self.thread.start()

    def stop(self) -> None:
        """Stop the animation thread (if any) and report an empty waiting state."""
        if self.thread:
            self.stop_event.set()
            self.thread.join()
        if self.callback:
            self.callback({"is_waiting": ""})

    def _run(self) -> None:
        """Thread body: emit one frame, then pause until the next frame or a stop request."""
        while not self.stop_event.is_set():
            if self.callback:
                self.callback({"is_waiting": next(self.animation_cycled)})
            # Wait on the event rather than sleeping so that `stop()` does not
            # have to sit in `join()` for the remainder of the frame interval.
            self.stop_event.wait(0.1)
class GithubInfo:
    """Helpers around the cached GitHub avatar image of the signed-in user."""

    # Location of the cached avatar on disk and the matching resource URL.
    AVATAR_PATH = Path(sublime.cache_path()) / f"{PACKAGE_NAME}/avatar.png"
    AVATAR_RESOURCE_URL = f"res://Cache/{PACKAGE_NAME}/avatar.png"

    @classmethod
    def get_avatar_img_src(cls) -> str:
        """Return the avatar resource URL, or an empty string if no avatar file is cached."""
        if cls.AVATAR_PATH.is_file():
            return cls.AVATAR_RESOURCE_URL
        return ""

    @classmethod
    def fetch_avatar(cls, username: str, *, size: int = 64) -> None:
        """Download the avatar of ``username`` into the cache.

        An empty ``username`` is logged as an error and nothing else happens.
        Note that the avatar is downloaded on every call; an existing cached
        file is not checked here.
        """
        if username:
            cls.update_avatar(username, size=size)
        else:
            log_error("No username provided for fetching avatar.")

    @classmethod
    def update_avatar(cls, username: str, *, size: int = 64) -> None:
        """Refresh the cached avatar from GitHub.

        The cached file is removed when ``username`` is empty or when the
        download fails (the failure is logged).
        """
        if not username:
            cls.clear_avatar()
            return

        try:
            data = simple_urlopen(f"https://github.com/{username}.png?size={size}")
        except Exception as e:
            log_error(f'Failed to fetch avatar for "{username}" because: {e}')
            cls.clear_avatar()
        else:
            cache_dir = cls.AVATAR_PATH.parent
            cache_dir.mkdir(parents=True, exist_ok=True)
            cls.AVATAR_PATH.write_bytes(data)

    @classmethod
    def clear_avatar(cls) -> None:
        """Delete the cached avatar file; a missing file is not an error."""
        cls.AVATAR_PATH.unlink(missing_ok=True)
class CopilotIgnore:
    """Load and evaluate ``.copilotignore`` patterns for the folders of one window.

    Patterns are kept in ``self.patterns`` (folder path -> list of patterns)
    and mirrored into the window's Copilot settings under
    ``"copilotignore.patterns"``.
    """

    def __init__(self, window: sublime.Window) -> None:
        self.window = window
        self.patterns: dict[str, list[str]] = {}
        self.load_patterns()

    @classmethod
    def cleanup(cls) -> None:
        """Erase the stored patterns from every window and the ignored flag from every view."""
        for window in all_windows():
            erase_copilot_setting(window, COPILOT_WINDOW_SETTINGS_PREFIX, "copilotignore.patterns")
        for view in all_views():
            erase_copilot_view_setting(view, "is_copilot_ignored")

    def unload_patterns(self) -> None:
        """Forget all patterns, both in memory and in the window settings."""
        self.patterns.clear()
        erase_copilot_setting(self.window, COPILOT_WINDOW_SETTINGS_PREFIX, "copilotignore.patterns")

    def load_patterns(self) -> None:
        """Re-read ``.copilotignore`` from every window folder and store the result."""
        self.patterns.clear()

        # Load workspace patterns
        for folder in self.window.folders():
            ignore_file = os.path.join(folder, ".copilotignore")
            self.add_patterns_from_file(ignore_file, folder)

        set_copilot_setting(self.window, COPILOT_WINDOW_SETTINGS_PREFIX, "copilotignore.patterns", self.patterns)

    def read_ignore_patterns(self, file_path: str) -> list[str]:
        """Return the stripped, non-empty lines of ``file_path``; ``[]`` if it is not a file."""
        if not os.path.isfile(file_path):
            return []

        with open(file_path, encoding="utf-8") as f:
            stripped_lines = map(str.strip, f)
            # Materialize while the file is still open.
            return list(drop_falsy(stripped_lines))

    def add_patterns_from_file(self, file_path: str, folder: str) -> None:
        """Register the patterns of ``file_path`` for ``folder`` when there are any."""
        patterns = self.read_ignore_patterns(file_path)
        if patterns:
            self.patterns[folder] = patterns

    def matches_any_pattern(self, file_path: str | Path) -> bool:
        """Tell whether ``file_path`` matches a pattern of a folder that contains it."""
        target = Path(file_path)
        loaded_patterns: dict[str, list[str]] = get_copilot_setting(
            self.window,
            COPILOT_WINDOW_SETTINGS_PREFIX,
            "copilotignore.patterns",
            self.patterns,
        )

        for folder, patterns in loaded_patterns.items():
            try:
                relative = target.relative_to(folder)
            except ValueError:
                # The file does not live under this folder.
                continue
            if glob.globmatch(relative.as_posix(), patterns, flags=glob.GLOBSTAR):
                return True

        return False

    def trigger(self, view: sublime.View) -> bool:
        """Tell whether the file of ``view`` is ignored; ``False`` without patterns or file name."""
        if not self.patterns:
            return False

        file = view.file_name()
        if not file:
            return False
        return self.matches_any_pattern(file)
def st_point_to_lsp_position(point: int, view: sublime.View) -> LspPosition:
    """Convert a text point of ``view`` into an LSP position via ``view.rowcol_utf16``."""
    line, character = view.rowcol_utf16(point)
    position = {"line": line, "character": character}
    return position
def lsp_position_to_st_point(position: LspPosition, view: sublime.View) -> int:
    """Convert an LSP position into a text point of ``view`` via ``view.text_point_utf16``."""
    line = position["line"]
    character = position["character"]
    return view.text_point_utf16(line, character)
def st_region_to_lsp_range(region: sublime.Region, view: sublime.View) -> LspRange:
    """Convert ``region`` of ``view`` into an LSP range (``begin()`` -> start, ``end()`` -> end)."""
    start = st_point_to_lsp_position(region.begin(), view)
    end = st_point_to_lsp_position(region.end(), view)
    return {"start": start, "end": end}
def lsp_range_to_st_region(range_: LspRange, view: sublime.View) -> sublime.Region:
    """Convert an LSP range into a ``sublime.Region`` of ``view``."""
    begin_point = lsp_position_to_st_point(range_["start"], view)
    end_point = lsp_position_to_st_point(range_["end"], view)
    return sublime.Region(begin_point, end_point)
def prepare_completion_request_doc(view: sublime.View) -> CopilotDocType | None:
    """Build the ``doc`` payload describing ``view`` for a Copilot request.

    :param view: The view whose content and first selection are described.
    :returns: The document payload, or ``None`` when the view has no file name
              or has no selection to take the position from.
    """
    file_path = view.file_name()
    if not file_path:
        return None

    selections = view.sel()
    # A view can have no selection at all; indexing `[0]` would raise IndexError.
    # `len()` is used explicitly rather than relying on the truthiness of the selection object.
    if len(selections) == 0:
        return None
    selection = selections[0]

    return {
        "source": view.substr(sublime.Region(0, view.size())),
        "tabSize": cast(int, view.settings().get("tab_size")),
        "indentSize": 1,  # there is no such concept in ST
        "insertSpaces": cast(bool, view.settings().get("translate_tabs_to_spaces")),
        "path": file_path,
        # The "buffer:" branch is not reachable for unsaved views because of the
        # early return above; it is kept so the payload is unchanged for saved files.
        "uri": file_path if file_path.startswith("buffer:") else filename_to_uri(file_path),
        "relativePath": get_project_relative_path(file_path),
        "languageId": get_view_language_id(view),
        "position": st_point_to_lsp_position(selection.begin(), view),
        # Buffer Version. Generally this is handled by LSP, but we need to handle it here
        # Will need to test getting the version from LSP
        "version": view.change_count(),
    }
def prepare_conversation_turn_request(
    conversation_id: str,
    window_id: int,
    message: str,
    view: sublime.View,
    views: list[sublime.View],
    source: Literal["panel", "inline"] = "panel",
) -> CopilotRequestConversationTurn | None:
    """Build the payload for one conversation turn.

    :param conversation_id: Identifier of the conversation the turn belongs to.
    :param window_id: Used to build the ``workDoneToken`` (``copilot_chat://<window_id>``).
    :param message: The user message of this turn.
    :param view: The view the request is made from; also contributes a reference.
    :param views: Additional views whose first selection is sent as a reference.
                  This list is not modified.
    :param source: Where the turn originates from.
    :returns: The request payload, or ``None`` when no ``doc`` could be prepared for ``view``.
    """
    if not (doc := prepare_completion_request_doc(view)):
        return None

    # References can technically be across multiple files
    # TODO: Support references across multiple files
    references: list[CopilotRequestConversationTurnReference | CopilotGitHubWebSearch] = []

    # Iterate over a new list so the caller's `views` argument is left untouched.
    for view_ in [*views, view]:
        selections = view_.sel()
        if len(selections) == 0:
            continue  # nothing selected in this view, so there is nothing to reference

        selection = selections[0]
        if not selection or view_.substr(selection).isspace():
            continue

        file_path = view_.file_name()
        # Every value below is derived from `view_` itself: points and regions
        # are only meaningful for the view they come from.
        references.append({
            "type": "file",
            "status": "included",  # included, blocked, notfound, empty
            "uri": filename_to_uri(file_path) if file_path else f"buffer:{view_.buffer().id()}",
            "position": st_point_to_lsp_position(selection.begin(), view_),
            "range": st_region_to_lsp_range(selection, view_),
            "visibleRange": st_region_to_lsp_range(view_.visible_region(), view_),
            "selection": st_region_to_lsp_range(selection, view_),
            "openedAt": None,
            "activeAt": None,
        })

    return {
        "conversationId": conversation_id,
        "message": message,
        "workDoneToken": f"copilot_chat://{window_id}",
        "doc": doc,
        "computeSuggestions": True,
        "references": references,
        "source": source,
    }
def preprocess_message_for_html(message: str) -> str:
    """Escape tag-like ``<...>`` sequences in ``message`` outside of code.

    Lines inside fenced code blocks (delimited by lines starting with three
    backticks), the fence lines themselves, and inline code spans
    (`` `...` ``) are copied unchanged. Everywhere else each ``<...>`` pair
    has its angle brackets replaced by ``&lt;`` / ``&gt;``.

    :param message: The raw (Markdown-like) message text.
    :returns: The message with tag-like sequences escaped.
    """
    tag_pattern = re.compile(r"<(.*?)>")
    inline_code_pattern = re.compile(r"`([^`]*)`")

    def _escape_html(text: str) -> str:
        # The brackets must become entities; replacing them with themselves would escape nothing.
        return tag_pattern.sub(r"&lt;\1&gt;", text)

    new_lines: list[str] = []
    inside_code_block = False

    for line in message.split("\n"):
        if line.lstrip().startswith("```"):
            # A fence line toggles the code-block state and is kept verbatim.
            inside_code_block = not inside_code_block
            new_lines.append(line)
            continue

        if inside_code_block:
            new_lines.append(line)
            continue

        # Escape only the text between inline code spans.
        parts: list[str] = []
        start = 0
        for match in inline_code_pattern.finditer(line):
            parts.append(_escape_html(line[start : match.start()]))
            parts.append(match.group(0))
            start = match.end()
        parts.append(_escape_html(line[start:]))
        new_lines.append("".join(parts))

    return "\n".join(new_lines)
def preprocess_chat_message(
    view: sublime.View,
    message: str,
    templates: Sequence[CopilotUserDefinedPromptTemplates] | None = None,
) -> tuple[bool, str]:
    """Render ``message`` as a template against the first selection of ``view``.

    When ``message`` equals ``/<id>`` of a user-defined template, or is a value
    of ``CopilotConversationTemplates``, placeholders for the user prompt and
    the selected code are appended before rendering.

    :returns: ``(is_template, rendered_message)``.
    """
    from .template import load_string_template

    candidates = templates or []
    user_template = next((t for t in candidates if f"/{t['id']}" == message), None)

    is_template = bool(user_template or CopilotConversationTemplates.has_value(message))
    if is_template:
        message += "\n\n{{ user_prompt }}\n\n{{ code }}"

    region = view.sel()[0]
    lang = get_view_language_id(view, region.begin())

    template = load_string_template(message)
    code_block = f"\n```{lang}\n{view.substr(region)}\n```\n"
    user_prompt = "\n".join(user_template["prompt"]) if user_template else ""
    rendered = template.render(code=code_block, user_prompt=user_prompt)

    return is_template, rendered
def preprocess_completions(view: sublime.View, completions: list[CopilotPayloadCompletion]) -> None:
    """Preprocess the `completions` from "getCompletions" request.

    The list is modified in place: later entries whose ``displayText`` was
    already seen are removed, then each remaining entry gets a ``point`` and a
    ``region`` (as a tuple) computed for ``view``.
    """
    # Collect the positions of the repeated entries first ...
    repeated_positions = [
        position
        for position, _ in duplicates_everseen(enumerate(completions), key=lambda pair: pair[1]["displayText"])
    ]
    # ... then drop them back to front so the remaining positions stay valid.
    for position in repeated_positions[::-1]:
        del completions[position]

    # inject extra information for convenience
    for item in completions:
        item["point"] = lsp_position_to_st_point(item["position"], view)
        st_region = lsp_range_to_st_region(item["range"], view)
        item["region"] = st_region.to_tuple()
def preprocess_panel_completions(view: sublime.View, completions: Sequence[CopilotPayloadPanelSolution]) -> None:
    """Preprocess the `completions` from "getCompletionsCycling" request.

    Each entry gets a ``region`` (as a tuple) computed from its ``range`` for ``view``.
    """
    for solution in completions:
        st_region = lsp_range_to_st_region(solution["range"], view)
        solution["region"] = st_region.to_tuple()
def is_debug_mode() -> bool:
    """Return the truthiness of the ``settings.debug`` plugin setting (``False`` by default)."""
    debug_setting = get_plugin_setting_dotted("settings.debug", False)
    return bool(debug_setting)