Tcp mode #636
base: main
Changes from 41 commits
@@ -1,4 +1,4 @@
-""" Language Server stdio-mode readers
+""" Language Server readers and writers

 Parts of this code are derived from:

@@ -7,32 +7,32 @@
 > > MIT License https://github.com/palantir/python-jsonrpc-server/blob/0.2.0/LICENSE
 > > Copyright 2018 Palantir Technologies, Inc.
 """
-# pylint: disable=broad-except
-import asyncio
-import io
 import os
-from concurrent.futures import ThreadPoolExecutor
+from abc import ABC, ABCMeta, abstractmethod
 from typing import List, Optional, Text

-from tornado.concurrent import run_on_executor
+# pylint: disable=broad-except
+import anyio
+from anyio.streams.buffered import BufferedByteReceiveStream
+from anyio.streams.text import TextSendStream
 from tornado.gen import convert_yielded
 from tornado.httputil import HTTPHeaders
 from tornado.ioloop import IOLoop
 from tornado.queues import Queue
-from traitlets import Float, Instance, default
+from traitlets import Float, Instance, Int, default
 from traitlets.config import LoggingConfigurable
+from traitlets.traitlets import MetaHasTraits

-from .non_blocking import make_non_blocking

+class LspStreamMeta(MetaHasTraits, ABCMeta):
+    pass

-class LspStdIoBase(LoggingConfigurable):
-    """Non-blocking, queued base for communicating with stdio Language Servers"""
-
-    executor = None
+class LspStreamBase(LoggingConfigurable, ABC, metaclass=LspStreamMeta):
+    """Non-blocking, queued base for communicating with Language Servers through anyio
+    streams
+    """

-    stream = Instance(
-        io.RawIOBase, help="the stream to read/write"
-    )  # type: io.RawIOBase
     queue = Instance(Queue, help="queue to get/put")

     def __repr__(self):  # pragma: no cover
@@ -41,15 +41,14 @@ def __repr__(self):  # pragma: no cover
     def __init__(self, **kwargs):
         super().__init__(**kwargs)
         self.log.debug("%s initialized", self)
-        self.executor = ThreadPoolExecutor(max_workers=1)

-    def close(self):
-        self.stream.close()
-        self.log.debug("%s closed", self)
+    @abstractmethod
+    async def close(self):
+        pass  # pragma: no cover


-class LspStdIoReader(LspStdIoBase):
-    """Language Server stdio Reader
+class LspStreamReader(LspStreamBase):
+    """Language Server Reader

     Because non-blocking (but still synchronous) IO is used, rudimentary
     exponential backoff is used.

@@ -58,18 +57,34 @@ class LspStdIoReader(LspStdIoBase):
     max_wait = Float(help="maximum time to wait on idle stream").tag(config=True)
     min_wait = Float(0.05, help="minimum time to wait on idle stream").tag(config=True)
     next_wait = Float(0.05, help="next time to wait on idle stream").tag(config=True)
+    receive_max_bytes = Int(
+        65536,
+        help="the maximum size a header line send by the language server may have",
+    ).tag(config=True)

+    stream = Instance(
+        BufferedByteReceiveStream, help="the stream to read from"
+    )  # type: BufferedByteReceiveStream

+    def __init__(self, stream: anyio.abc.ByteReceiveStream, **kwargs):
+        super().__init__(**kwargs)
+        self.stream = BufferedByteReceiveStream(stream)

+    async def close(self):
+        await self.stream.aclose()
+        self.log.debug("%s closed", self)

     @default("max_wait")
     def _default_max_wait(self):
         return 0.1 if os.name == "nt" else self.min_wait * 2

     async def sleep(self):
         """Simple exponential backoff for sleeping"""
-        if self.stream.closed:  # pragma: no cover
+        if self.stream._closed:  # pragma: no cover
             return
         self.next_wait = min(self.next_wait * 2, self.max_wait)
         try:
Review comment: Do we really need that?

Reply: Agreed, it does not seem useful here.
-            await asyncio.sleep(self.next_wait)
+            await anyio.sleep(self.next_wait)
         except Exception:  # pragma: no cover
             pass

@@ -79,9 +94,7 @@ def wake(self):

     async def read(self) -> None:
         """Read from a Language Server until it is closed"""
-        make_non_blocking(self.stream)
-
-        while not self.stream.closed:
+        while True:
             message = None
             try:
                 message = await self.read_one()

@@ -93,6 +106,10 @@ async def read(self) -> None:
                     self.wake()

                 IOLoop.current().add_callback(self.queue.put_nowait, message)
+            except anyio.ClosedResourceError:
+                # stream was closed -> terminate
+                self.log.debug("Stream closed while a read was still in progress")
+                break
             except Exception as e:  # pragma: no cover
                 self.log.exception(
                     "%s couldn't enqueue message: %s (%s)", self, message, e

@@ -124,10 +141,10 @@ async def _read_content(
         while received_size < length and len(raw_parts) < max_parts and max_empties > 0:
             part = None
             try:
-                part = self.stream.read(length - received_size)
-            except OSError:  # pragma: no cover
+                part = await self.stream.receive_exactly(length - received_size)
+            except anyio.IncompleteRead:  # pragma: no cover
                 pass
-            if part is None:
+            if part is None:  # pragma: no cover
                 max_empties -= 1
                 await self.sleep()
                 continue

@@ -171,32 +188,60 @@ async def read_one(self) -> Text:

         return message

-    @run_on_executor
-    def _readline(self) -> Text:
+    async def _readline(self) -> Text:
         """Read a line (or immediately return None)"""
         try:
-            return self.stream.readline().decode("utf-8").strip()
-        except OSError:  # pragma: no cover
+            # use same max_bytes as is default for receive for now. It seems there is no
+            # way of getting the bytes read until max_bytes is reached, so we cannot
+            # iterate the receive_until call with smaller max_bytes values
+            async with anyio.move_on_after(0.2):
+                line = await self.stream.receive_until(b"\r\n", self.receive_max_bytes)
+                return line.decode("utf-8").strip()
+        except anyio.IncompleteRead:
+            # resource has been closed before the requested bytes could be retrieved
+            # -> signal recource closed
+            raise anyio.ClosedResourceError
+        except anyio.DelimiterNotFound:  # pragma: no cover
Review comment: Regarding the next review round: I added […]

Reply: That's absolutely fine!

(A standalone sketch of this receive_until / move_on_after behaviour follows the diff below.)
             self.log.error(
                 "Readline hit max_bytes before newline character was encountered"
             )
         return ""


-class LspStdIoWriter(LspStdIoBase):
-    """Language Server stdio Writer"""
+class LspStreamWriter(LspStreamBase):
+    """Language Server Writer"""

+    stream = Instance(
+        TextSendStream, help="the stream to write to"
+    )  # type: TextSendStream

+    def __init__(self, stream: anyio.abc.ByteSendStream, **kwargs):
+        super().__init__(**kwargs)
+        self.stream = TextSendStream(stream, encoding="utf-8")

+    async def close(self):
+        await self.stream.aclose()
+        self.log.debug("%s closed", self)

     async def write(self) -> None:
         """Write to a Language Server until it closes"""
-        while not self.stream.closed:
+        while True:
             message = await self.queue.get()
             try:
-                body = message.encode("utf-8")
-                response = "Content-Length: {}\r\n\r\n{}".format(len(body), message)
-                await convert_yielded(self._write_one(response.encode("utf-8")))
+                n_bytes = len(message.encode("utf-8"))
+                response = "Content-Length: {}\r\n\r\n{}".format(n_bytes, message)
+                await convert_yielded(self._write_one(response))
+            except (
+                anyio.ClosedResourceError,
+                anyio.BrokenResourceError,
+            ):  # pragma: no cover
+                # stream was closed -> terminate
+                self.log.debug("Stream closed while a write was still in progress")
+                break
             except Exception:  # pragma: no cover
                 self.log.exception("%s couldn't write message: %s", self, response)
             finally:
                 self.queue.task_done()

-    @run_on_executor
-    def _write_one(self, message) -> None:
-        self.stream.write(message)
-        self.stream.flush()
+    async def _write_one(self, message) -> None:
+        await self.stream.send(message)
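The _readline() change above, and the review exchange about anyio.move_on_after(0.2), hinge on two failure modes of BufferedByteReceiveStream.receive_until(): the delimiter never arriving (the timeout cancels the read) and max_bytes filling up before the delimiter is seen (DelimiterNotFound). The following is a hedged, standalone sketch of those two paths, not code from this PR: the in-memory streams, the main() wrapper, and the small max_bytes value are illustrative stand-ins, and it assumes a recent anyio release where move_on_after is a plain context manager (the diff uses async with, as older anyio releases required).

import anyio
from anyio.streams.buffered import BufferedByteReceiveStream


async def main() -> None:
    # In-memory stand-in for the language server's byte stream.
    send, receive = anyio.create_memory_object_stream(4)
    buffered = BufferedByteReceiveStream(receive)

    # Path 1: nothing arrives at all. move_on_after() cancels the pending
    # receive_until() after 0.2 s and execution simply continues, so the
    # caller gets no line for this round.
    line = None
    with anyio.move_on_after(0.2):
        line = await buffered.receive_until(b"\r\n", 65536)
    print("timed out, line is still", line)

    # Path 2: bytes arrive, but max_bytes is exhausted before the b"\r\n"
    # delimiter shows up; receive_until() raises DelimiterNotFound.
    await send.send(b"Content-Length: 52")  # header fragment, no delimiter
    try:
        await buffered.receive_until(b"\r\n", 8)
    except anyio.DelimiterNotFound:
        print("max_bytes reached before the delimiter was found")


anyio.run(main)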
This file was deleted.
Review comment: Is this side effect really necessary? I think we only ever call this method more or less immediately after a read operation was performed, at which point the stream was still alive. That is, hardly any time will have passed between then and the moment we enter this function, whose first action is to recheck that the stream is alive, and whether the stream is alive is not even a concern in the remainder of the function. I am only asking because it seems rather complicated to produce future-proof code that checks whether the stream is alive: the _closed property used here is undocumented and might go away in future versions of anyio. Another option would be a receive_exactly(0) inside a try ... except IncompleteRead, but the border case of reading 0 bytes also seems risky, as receive_exactly might or might not decide in a future version whether 0 bytes can safely be read from a closed stream (returning an empty bytes object) or not (raising IncompleteRead).

Reply: I am not sure why it is here. I imagine it could be useful if it were down where the sleep is called. It would be fine to remove it and wrap the entire sleep() call inside _read_content() in a try-except instead, just to be safe.

Reply: But which exception would that catch? sleep() then only consists of computing the minimum of two floats and a call to anyio.sleep(), which as far as I can tell should not throw, and certainly not as a consequence of the stream being closed. In _read_content(), after the call to sleep() is done, it tries to receive another part of the message, but that part is already guarded by a try/except block covering the case that the stream was closed during the sleep.
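A hedged sketch of the exception-based alternative raised in this discussion, not code from this PR (the in-memory streams and the main() wrapper are illustrative stand-ins): instead of probing the undocumented _closed attribute before sleeping, let the next receive call fail and catch anyio.ClosedResourceError where it is raised.

import anyio


async def main() -> None:
    send, receive = anyio.create_memory_object_stream(4)
    # Simulate the language-server connection going away.
    await receive.aclose()
    await send.aclose()

    try:
        receive.receive_nowait()
    except anyio.ClosedResourceError:
        # No state probing needed: the closed stream announces itself.
        print("stream is closed; stop reading instead of backing off again")


anyio.run(main)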