Tcp mode #636

Open
wants to merge 79 commits into
base: main
Changes from 41 commits
Commits
09c8c39
Move sessions to anyio
FlyingSamson Feb 21, 2021
4b3ab1b
Add support for connection to lsp providers through tcp
FlyingSamson Mar 17, 2021
0ca4164
Merge pull request #1 from krassowski/master
FlyingSamson Apr 9, 2021
5998969
Merge branch 'master' into tcp_mode
FlyingSamson Apr 9, 2021
e480fc8
Fix error where jupyter would not start after page reload
FlyingSamson May 23, 2021
22f5dbd
Merge pull request #2 from krassowski/master
FlyingSamson May 24, 2021
6dbdbac
Merge branch 'master' into tcp_mode
FlyingSamson May 24, 2021
ac7be83
Adapt to changes of start_blocking_portal() in anyio3.0
FlyingSamson Jun 5, 2021
9360802
Kill lsp-servers which are not terminating willingly when asked to
FlyingSamson Jun 20, 2021
b76b05e
Merge branch 'krassowski:master' into master
FlyingSamson Jun 28, 2021
030d50f
Merge branch 'master' into tcp_mode
FlyingSamson Jun 28, 2021
84afabd
Code style fixes
FlyingSamson Jul 1, 2021
5174669
Make it work for LSPs running in own process on localhost
FlyingSamson Jul 3, 2021
880f0f6
Split Session in separate classes for TCP and Stdio
FlyingSamson Jul 4, 2021
e36f3ac
Move stream from LspStreamBase to LspStreamReader and LspStreamWriter
FlyingSamson Jul 5, 2021
7085747
Fix unit tests (switched to anyio)
FlyingSamson Jul 8, 2021
22ae93f
Remove code related to externally running servers for now
FlyingSamson Jul 8, 2021
62dc992
Extend docs for extending language servers with different modes
FlyingSamson Jul 8, 2021
ce18086
Merge branch 'krassowski:master' into master
FlyingSamson Jul 8, 2021
08ebe54
Merge branch 'master' into tcp_mode
FlyingSamson Jul 8, 2021
e0ef174
Fix codestyle
FlyingSamson Jul 9, 2021
a5be8fb
Make maximum bytes for receive configurable
FlyingSamson Jul 9, 2021
aacc847
Fix spelling in doc
FlyingSamson Jul 9, 2021
792ac73
Enforce interfaces by making base classes for Session and Stream abst…
FlyingSamson Jul 11, 2021
f41d5fe
Add unit test for reading over tcp
FlyingSamson Jul 16, 2021
aa48b4f
Issue debug message if stream was closed prematurely
FlyingSamson Jul 16, 2021
0f97a20
Codestyle fixes
FlyingSamson Jul 16, 2021
f3a02b3
Fix type of streams in Reader's and Writer's c-tors
FlyingSamson Jul 16, 2021
bb3287e
Add instructions for specifying port in language servers argv
FlyingSamson Jul 16, 2021
0b431a0
Remove no longer required ThreadPoolExecutor from Stream classes
FlyingSamson Jul 17, 2021
129ad3d
Increase sleep before connecting in test to ensure that the tcp serve…
FlyingSamson Jul 17, 2021
86cbad8
Use newly introduced `env` parameter in `anyio.open_process`
FlyingSamson Jul 19, 2021
8f18893
Mark abstract methods with 'no cover'
FlyingSamson Jul 22, 2021
83d28d1
Add specs for pyls over tcp and include it into unit testing
FlyingSamson Jul 22, 2021
78c2f5c
Add unit test checking that the LS process is brought down no matter …
FlyingSamson Jul 24, 2021
c2b951d
Test that unknown modes in spec are detected
FlyingSamson Jul 24, 2021
e2cc7c5
Mark code parts `no cover` that cannot be tested easily
FlyingSamson Jul 24, 2021
d29e2ca
Remove no longer required code to make file non-blocking
FlyingSamson Jul 24, 2021
7743679
Move from `localhost` to `127.0.0.1`
FlyingSamson Jul 24, 2021
c9125eb
Rewrite session handling with anyio without need for blocking portal
FlyingSamson Jul 31, 2021
d7f4f3f
Code style fixes
FlyingSamson Jul 31, 2021
6b3c955
Merge branch 'krassowski:master' into master
FlyingSamson Aug 1, 2021
c5c2156
Merge branch 'master' into tcp_mode
FlyingSamson Aug 1, 2021
376264e
Merge branch 'krassowski:master' into master
FlyingSamson Aug 3, 2021
710bd2b
Merge branch 'master' into tcp_mode
FlyingSamson Aug 3, 2021
a4a40c0
Add changelog entry
FlyingSamson Aug 3, 2021
6a1cc56
Merge branch 'jupyter-lsp:master' into master
FlyingSamson Oct 23, 2021
b948108
Merge branch 'master' into tcp_mode
FlyingSamson Oct 23, 2021
cf8e92b
Remove unnecessary try catch
FlyingSamson Oct 23, 2021
b90fe56
Try increasing timeout for stop test to make it pass on the windows r…
FlyingSamson Oct 23, 2021
9367ee9
Merge branch 'jupyter-lsp:master' into master
FlyingSamson Oct 26, 2021
f2f2f80
Merge branch 'master' into tcp_mode
FlyingSamson Oct 26, 2021
2163484
Handle language server process termination differently on Windows
FlyingSamson Nov 3, 2021
fc1125b
Code style fixes
FlyingSamson Nov 6, 2021
e1660c7
Fix coverage of test file itself
FlyingSamson Nov 7, 2021
e5b7b73
Merge branch 'jupyter-lsp:master' into master
FlyingSamson Nov 14, 2021
84e0c91
Merge branch 'master' into tcp_mode2
FlyingSamson Nov 14, 2021
98d1109
Fix problem when using 0 seconds for stop timeout
FlyingSamson Nov 14, 2021
e9d5f5f
Fix missing coverage if tcp connection is established on first try
FlyingSamson Nov 17, 2021
d41ce9a
Removed probably unnecessary test for closed stream in sleep
FlyingSamson Nov 17, 2021
9e61c2f
Merge branch 'jupyter-lsp:master' into master
FlyingSamson Dec 1, 2021
562148a
Merge branch 'master' into tcp_mode
FlyingSamson Dec 1, 2021
148f868
Merge branch 'jupyter-lsp:master' into master
FlyingSamson Apr 24, 2022
4f9b95b
Merge remote-tracking branch 'origin/master' into tcp_mode
FlyingSamson Apr 24, 2022
b1205fb
Merge branch 'jupyter-lsp:master' into master
FlyingSamson Jun 5, 2022
032ea35
Merge branch 'master' into tcp_mode
FlyingSamson Jun 5, 2022
e83b716
Fix occasionally occurring race condition causing an exception
FlyingSamson Jun 27, 2022
3417de4
Merge branch 'jupyter-lsp:master' into master
FlyingSamson Jun 27, 2022
6a9ff59
Merge branch 'master' into tcp_mode
FlyingSamson Jun 27, 2022
f492327
Reapply mypy fixes
FlyingSamson Jun 27, 2022
8444ec6
Remove old synchronous code from Reader and Writer
FlyingSamson Jul 3, 2022
341f810
Remove extraneous cancel scope in Session
FlyingSamson Jul 3, 2022
1994870
Switch from Tornado Queues to anyio MemoryObjectStreams
FlyingSamson Jul 4, 2022
3e02246
Fix mypy error caused by Optional return value
FlyingSamson Jul 4, 2022
efe22b4
Add units (seconds) to stop_timeout
FlyingSamson Jul 8, 2022
bdbc4ac
Encode unbounded queue with size -1
FlyingSamson Jul 8, 2022
c019eb2
Merge branch 'master' into tcp_mode
krassowski Dec 28, 2022
3451099
Merge branch 'master' into HEAD
krassowski Dec 28, 2022
a0f6937
Add missing `await` in `test_stop`
krassowski Dec 31, 2022
7 changes: 5 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -386,8 +386,11 @@ otherwise an empty dictionary (`{}`) should be returned.
##### Common Concerns

- some language servers need to have their connection mode specified
- the `stdio` interface is the only one supported by `jupyter_lsp`
- PRs welcome to support other modes!
- `jupyter_lsp` currently supports the `stdio` and `tcp` interfaces
- the mode used by `jupyter_lsp` to connect to the language server can be specified by including `mode="stdio"` or `mode="tcp"` in the language server `spec`-dictionary
- currently it is not possible to connect to externally running language servers via tcp, but only to servers spawned by `jupyter_lsp` as given by the `argv` specs entry
- PRs welcome to support externally running language servers!
- use the placeholder `{port}` within the `argv` entry to allow `jupyter_lsp` to specify the port on which to launch the language server
- because of its VSCode heritage, many language servers use `nodejs`
- `LanguageServerManager.nodejs` will provide the location of our best
guess at where a user's `nodejs` might be found
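As an illustration of the `mode` and `{port}` entries described in the list above, here is a minimal sketch of a TCP spec and of one way the placeholder could be filled in. The server name `example-language-server`, its `--port` flag, and the helper `fill_port` are hypothetical; the diff shown here does not include the substitution code that `jupyter_lsp` itself uses.

```python
from typing import Any, Dict, List

# Hypothetical spec: the executable and its flags are placeholders, not a real CLI.
spec: Dict[str, Any] = {
    "argv": ["example-language-server", "--port", "{port}"],
    "mode": "tcp",  # "stdio" is the default when the key is omitted
}


def fill_port(argv: List[str], port: int) -> List[str]:
    """Return a copy of argv with every {port} placeholder replaced (hypothetical helper)."""
    return [arg.replace("{port}", str(port)) for arg in argv]


if __name__ == "__main__":
    print(fill_port(spec["argv"], 8765))
```

Using `str.replace` rather than `str.format` in this sketch keeps other braces in an argument untouched.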
2 changes: 1 addition & 1 deletion atest/ports.py
@@ -9,7 +9,7 @@ def get_unused_port():
Probably could introduce race conditions if inside a tight loop.
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("localhost", 0))
sock.bind(("127.0.0.1", 0))
sock.listen(1)
port = sock.getsockname()[1]
sock.close()
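The hunk above is cut off before the function returns. A self-contained sketch of the same idea, assuming the helper simply returns the port the OS assigned, might look like this; the `try`/`finally` is an addition for the sketch and not part of the diff.

```python
import socket


def get_unused_port() -> int:
    """Ask the OS for a currently free TCP port on the IPv4 loopback address."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Port 0 lets the OS pick; the literal address skips name resolution.
        sock.bind(("127.0.0.1", 0))
        sock.listen(1)
        return sock.getsockname()[1]
    finally:
        # The port is only reserved while the socket is open, hence the
        # race condition mentioned in the docstring of the original helper.
        sock.close()


if __name__ == "__main__":
    print(get_unused_port())
```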
@@ -1,4 +1,4 @@
""" Language Server stdio-mode readers
""" Language Server readers and writers

Parts of this code are derived from:

@@ -7,32 +7,32 @@
> > MIT License https://github.com/palantir/python-jsonrpc-server/blob/0.2.0/LICENSE
> > Copyright 2018 Palantir Technologies, Inc.
"""
# pylint: disable=broad-except
import asyncio
import io
import os
from concurrent.futures import ThreadPoolExecutor
from abc import ABC, ABCMeta, abstractmethod
from typing import List, Optional, Text

from tornado.concurrent import run_on_executor
# pylint: disable=broad-except
import anyio
from anyio.streams.buffered import BufferedByteReceiveStream
from anyio.streams.text import TextSendStream
from tornado.gen import convert_yielded
from tornado.httputil import HTTPHeaders
from tornado.ioloop import IOLoop
from tornado.queues import Queue
from traitlets import Float, Instance, default
from traitlets import Float, Instance, Int, default
from traitlets.config import LoggingConfigurable
from traitlets.traitlets import MetaHasTraits

from .non_blocking import make_non_blocking

class LspStreamMeta(MetaHasTraits, ABCMeta):
pass

class LspStdIoBase(LoggingConfigurable):
"""Non-blocking, queued base for communicating with stdio Language Servers"""

executor = None
class LspStreamBase(LoggingConfigurable, ABC, metaclass=LspStreamMeta):
"""Non-blocking, queued base for communicating with Language Servers through anyio
streams
"""

stream = Instance(
io.RawIOBase, help="the stream to read/write"
) # type: io.RawIOBase
queue = Instance(Queue, help="queue to get/put")

def __repr__(self): # pragma: no cover
@@ -41,15 +41,14 @@ def __repr__(self): # pragma: no cover
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.log.debug("%s initialized", self)
self.executor = ThreadPoolExecutor(max_workers=1)

def close(self):
self.stream.close()
self.log.debug("%s closed", self)
@abstractmethod
async def close(self):
pass # pragma: no cover
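The `LspStreamMeta` class in this hunk exists because a class cannot inherit from bases whose metaclasses are unrelated. The following standalone sketch shows the pattern with standard-library pieces only; `TraitMeta` and `Configurable` are hypothetical stand-ins for `MetaHasTraits` and `LoggingConfigurable`, not the traitlets classes themselves.

```python
from abc import ABC, ABCMeta, abstractmethod


class TraitMeta(type):
    """Hypothetical stand-in for a framework metaclass such as MetaHasTraits."""


class Configurable(metaclass=TraitMeta):
    """Hypothetical stand-in for LoggingConfigurable."""


class StreamMeta(TraitMeta, ABCMeta):
    """Subclass of both metaclasses, so Python finds one most-derived metaclass."""


class StreamBase(Configurable, ABC, metaclass=StreamMeta):
    @abstractmethod
    async def close(self) -> None:
        """Concrete readers and writers must implement this."""


class Reader(StreamBase):
    async def close(self) -> None:
        pass


def has_metaclass_conflict() -> bool:
    """Show that combining the bases without StreamMeta fails."""
    try:
        type("Broken", (Configurable, ABC), {})
    except TypeError:
        return True
    return False
```

With the combined metaclass in place, `StreamBase()` raises `TypeError` because `close` is abstract, while `Reader()` can be instantiated.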


class LspStdIoReader(LspStdIoBase):
"""Language Server stdio Reader
class LspStreamReader(LspStreamBase):
"""Language Server Reader

Because non-blocking (but still synchronous) IO is used, rudimentary
exponential backoff is used.
@@ -58,18 +57,34 @@ class LspStdIoReader(LspStdIoBase):
max_wait = Float(help="maximum time to wait on idle stream").tag(config=True)
min_wait = Float(0.05, help="minimum time to wait on idle stream").tag(config=True)
next_wait = Float(0.05, help="next time to wait on idle stream").tag(config=True)
receive_max_bytes = Int(
65536,
help="the maximum size a header line sent by the language server may have",
).tag(config=True)

stream = Instance(
BufferedByteReceiveStream, help="the stream to read from"
) # type: BufferedByteReceiveStream

def __init__(self, stream: anyio.abc.ByteReceiveStream, **kwargs):
super().__init__(**kwargs)
self.stream = BufferedByteReceiveStream(stream)

async def close(self):
await self.stream.aclose()
self.log.debug("%s closed", self)

@default("max_wait")
def _default_max_wait(self):
return 0.1 if os.name == "nt" else self.min_wait * 2

async def sleep(self):
"""Simple exponential backoff for sleeping"""
if self.stream.closed: # pragma: no cover
if self.stream._closed: # pragma: no cover
Author

Is this side effect really necessary? I think we only ever call this method more or less immediately after a read operation, at which point the stream was still alive. That is, hardly any time will have passed between then and the moment we enter this function, whose first action is to recheck that the stream is alive, which isn't even a concern in the remainder of the function.

Just asking because it seems rather complicated to write future-proof code that checks whether the stream is alive. The _closed property used here is undocumented and might go away in future versions of anyio. Another option would be a receive_exactly(0) within a try ... except IncompleteRead; however, the border case of 0 bytes to read also seems risky, as a future version of receive_exactly might or might not decide that 0 bytes can safely be read from a closed stream (returning an empty bytes object) rather than raising IncompleteRead.

Member

I am not sure why it is here. I imagine it could be useful if it was down where the sleep is called. It would be fine to remove it and wrap the entire sleep() call which is inside _read_content() in a try-except instead, just to be safe.

Author

But which exception would this catch? I mean sleep then only consists of computing the minimum of two floats and a call to anyio.sleep() which as far as I can tell should not throw, especially not as a consequence of the stream being closed.

In _read_content, after the call to sleep is done it will try to receive another part of the message, but that part already is guarded by a try ... catch block capturing the case that during the sleep the stream was closed.

return
self.next_wait = min(self.next_wait * 2, self.max_wait)
try:
Author

Do we really need that try ... except ... part here? As far as I can see, neither the previous asyncio.sleep nor the new anyio.sleep throw.

Member

Agreed, it does not seem useful here.

await asyncio.sleep(self.next_wait)
await anyio.sleep(self.next_wait)
except Exception: # pragma: no cover
pass
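The two review threads above suggest that both the closed-stream check and the `try ... except` could be dropped, leaving only the capped doubling and the sleep. A minimal sketch of that simplified backoff follows. It uses `asyncio.sleep` in place of `anyio.sleep` so that it runs on the standard library alone; the `Backoff` class and the body of `wake` are assumptions, since the diff only shows the `wake` signature.

```python
import asyncio


class Backoff:
    """Hypothetical helper mirroring the min_wait/max_wait/next_wait traits."""

    def __init__(self, min_wait: float = 0.05, max_wait: float = 0.1) -> None:
        self.min_wait = min_wait
        self.max_wait = max_wait
        self.next_wait = min_wait

    async def sleep(self) -> None:
        # Double the delay, capped at max_wait, then sleep for that long.
        self.next_wait = min(self.next_wait * 2, self.max_wait)
        await asyncio.sleep(self.next_wait)

    def wake(self) -> None:
        # Assumed behaviour: reset the delay once data has arrived.
        self.next_wait = self.min_wait


async def demo() -> list:
    backoff = Backoff(min_wait=0.01, max_wait=0.04)
    waits = []
    for _ in range(3):
        await backoff.sleep()
        waits.append(backoff.next_wait)
    backoff.wake()
    waits.append(backoff.next_wait)
    return waits
```

A stream closed during the sleep is then detected by the next receive call, which is the point made in the thread.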

@@ -79,9 +94,7 @@ def wake(self):

async def read(self) -> None:
"""Read from a Language Server until it is closed"""
make_non_blocking(self.stream)

while not self.stream.closed:
while True:
message = None
try:
message = await self.read_one()
@@ -93,6 +106,10 @@ async def read(self) -> None:
self.wake()

IOLoop.current().add_callback(self.queue.put_nowait, message)
except anyio.ClosedResourceError:
# stream was closed -> terminate
self.log.debug("Stream closed while a read was still in progress")
break
except Exception as e: # pragma: no cover
self.log.exception(
"%s couldn't enqueue message: %s (%s)", self, message, e
@@ -124,10 +141,10 @@ async def _read_content(
while received_size < length and len(raw_parts) < max_parts and max_empties > 0:
part = None
try:
part = self.stream.read(length - received_size)
except OSError: # pragma: no cover
part = await self.stream.receive_exactly(length - received_size)
except anyio.IncompleteRead: # pragma: no cover
pass
if part is None:
if part is None: # pragma: no cover
max_empties -= 1
await self.sleep()
continue
@@ -171,32 +188,60 @@ async def read_one(self) -> Text:

return message

@run_on_executor
def _readline(self) -> Text:
async def _readline(self) -> Text:
"""Read a line (or immediately return None)"""
try:
return self.stream.readline().decode("utf-8").strip()
except OSError: # pragma: no cover
# use same max_bytes as is default for receive for now. It seems there is no
# way of getting the bytes read until max_bytes is reached, so we cannot
# iterate the receive_until call with smaller max_bytes values
async with anyio.move_on_after(0.2):
line = await self.stream.receive_until(b"\r\n", self.receive_max_bytes)
return line.decode("utf-8").strip()
except anyio.IncompleteRead:
# resource has been closed before the requested bytes could be retrieved
# -> signal resource closed
raise anyio.ClosedResourceError
except anyio.DelimiterNotFound: # pragma: no cover
Author

Regarding the next review round: I added # pragma: no cover in some places that either should never be reached given how the functions are used in the rest of the code, or that seemed rather hard to test. I tried to follow the places where this was already used, but was not sure whether there is a policy on when I am allowed to use it. So if you think any of these are inappropriate and that I should add a test instead, just say so (ideally with an idea of how to test that part).

Member

That's absolutely fine!

self.log.error(
"Readline hit max_bytes before newline character was encountered"
)
return ""
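The new `_readline` distinguishes three outcomes: a timeout, a stream closed in the middle of a line, and a line longer than the configured maximum. The sketch below reproduces that control flow with the standard library's `asyncio.StreamReader` as an analogue, not with the anyio API used in the PR: `readuntil` plays the role of `receive_until`, `IncompleteReadError` that of `IncompleteRead`, and `LimitOverrunError` that of `DelimiterNotFound`. The function name and the use of `ConnectionError` are assumptions for illustration.

```python
import asyncio
from typing import Optional


async def read_header_line(
    reader: asyncio.StreamReader, timeout: float = 0.2
) -> Optional[str]:
    """Read one CRLF-terminated line; None on timeout, "" if the line is too long."""
    try:
        raw = await asyncio.wait_for(reader.readuntil(b"\r\n"), timeout)
    except asyncio.TimeoutError:
        return None  # nothing arrived in time
    except asyncio.LimitOverrunError:
        return ""  # reader's limit reached before a line end was found
    except asyncio.IncompleteReadError as err:
        # Stream ended before the delimiter: report it as closed.
        raise ConnectionError("stream closed while reading a header line") from err
    return raw.decode("utf-8").strip()


async def demo() -> list:
    reader = asyncio.StreamReader(limit=64)
    reader.feed_data(b"Content-Length: 5\r\n\r\n")
    reader.feed_eof()
    # First call yields the header, second the empty line that ends the headers.
    return [await read_header_line(reader), await read_header_line(reader)]
```

Calling `asyncio.run(demo())` feeds one header block into the reader and returns the two lines read from it.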


class LspStdIoWriter(LspStdIoBase):
"""Language Server stdio Writer"""
class LspStreamWriter(LspStreamBase):
"""Language Server Writer"""

stream = Instance(
TextSendStream, help="the stream to write to"
) # type: TextSendStream

def __init__(self, stream: anyio.abc.ByteSendStream, **kwargs):
super().__init__(**kwargs)
self.stream = TextSendStream(stream, encoding="utf-8")

async def close(self):
await self.stream.aclose()
self.log.debug("%s closed", self)

async def write(self) -> None:
"""Write to a Language Server until it closes"""
while not self.stream.closed:
while True:
message = await self.queue.get()
try:
body = message.encode("utf-8")
response = "Content-Length: {}\r\n\r\n{}".format(len(body), message)
await convert_yielded(self._write_one(response.encode("utf-8")))
n_bytes = len(message.encode("utf-8"))
response = "Content-Length: {}\r\n\r\n{}".format(n_bytes, message)
await convert_yielded(self._write_one(response))
except (
anyio.ClosedResourceError,
anyio.BrokenResourceError,
): # pragma: no cover
# stream was closed -> terminate
self.log.debug("Stream closed while a write was still in progress")
break
except Exception: # pragma: no cover
self.log.exception("%s couldn't write message: %s", self, response)
finally:
self.queue.task_done()

@run_on_executor
def _write_one(self, message) -> None:
self.stream.write(message)
self.stream.flush()
async def _write_one(self, message) -> None:
await self.stream.send(message)
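Because the writer now sends text rather than bytes, the `Content-Length` value is computed from the UTF-8 encoding of the message while the message itself stays a string. A minimal sketch of that framing step, with `frame_message` as a hypothetical helper name:

```python
def frame_message(message: str) -> str:
    """Prefix a JSON-RPC payload with its Content-Length header."""
    # The header counts bytes of the UTF-8 body, not characters.
    n_bytes = len(message.encode("utf-8"))
    return "Content-Length: {}\r\n\r\n{}".format(n_bytes, message)


if __name__ == "__main__":
    print(repr(frame_message('{"jsonrpc": "2.0", "id": 1, "method": "shutdown"}')))
```

For a non-ASCII body such as `"é"` the header reports 2, although the string has length 1.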
30 changes: 23 additions & 7 deletions python_packages/jupyter_lsp/jupyter_lsp/manager.py
@@ -1,4 +1,4 @@
""" A configurable frontend for stdio-based Language Servers
""" A configurable frontend for stream-based Language Servers
"""
import os
import traceback
@@ -22,7 +22,11 @@
EP_SPEC_V1,
)
from .schema import LANGUAGE_SERVER_SPEC_MAP
from .session import LanguageServerSession
from .session import (
LanguageServerSessionBase,
LanguageServerSessionStdio,
LanguageServerSessionTCP,
)
from .trait_types import LoadableCallable, Schema
from .types import (
KeyedLanguageServerSpecs,
@@ -55,10 +59,10 @@ class LanguageServerManager(LanguageServerManagerAPI):
) # type: bool

sessions = Dict_(
trait=Instance(LanguageServerSession),
trait=Instance(LanguageServerSessionBase),
default_value={},
help="sessions keyed by language server name",
) # type: Dict[Tuple[Text], LanguageServerSession]
) # type: Dict[Tuple[Text], LanguageServerSessionBase]

virtual_documents_dir = Unicode(
help="""Path to virtual documents relative to the content manager root
@@ -137,9 +141,21 @@ def init_sessions(self):
"""create, but do not initialize all sessions"""
sessions = {}
for language_server, spec in self.language_servers.items():
sessions[language_server] = LanguageServerSession(
language_server=language_server, spec=spec, parent=self
)
mode = spec.get("mode", "stdio")
if mode == "stdio":
sessions[language_server] = LanguageServerSessionStdio(
language_server=language_server, spec=spec, parent=self
)
elif mode == "tcp":
sessions[language_server] = LanguageServerSessionTCP(
language_server=language_server, spec=spec, parent=self
)
else: # pragma: no cover
raise ValueError(
"Unknown session mode {} for language server '{}'".format(
mode, language_server
)
)
self.sessions = sessions
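The `if`/`elif` chain in `init_sessions` selects a session class from the spec's `mode`, defaulting to `stdio` and rejecting anything else. One possible alternative shape for the same logic is a lookup table, sketched here with hypothetical stand-in classes; this is not what the PR implements.

```python
from typing import Any, Dict, Type


class SessionBase:
    """Hypothetical stand-in for LanguageServerSessionBase."""

    def __init__(self, language_server: str, spec: Dict[str, Any]) -> None:
        self.language_server = language_server
        self.spec = spec


class StdioSession(SessionBase):
    """Hypothetical stand-in for LanguageServerSessionStdio."""


class TcpSession(SessionBase):
    """Hypothetical stand-in for LanguageServerSessionTCP."""


SESSION_CLASSES: Dict[str, Type[SessionBase]] = {
    "stdio": StdioSession,
    "tcp": TcpSession,
}


def make_session(language_server: str, spec: Dict[str, Any]) -> SessionBase:
    """Create the session matching spec["mode"], defaulting to stdio."""
    mode = spec.get("mode", "stdio")
    try:
        session_class = SESSION_CLASSES[mode]
    except KeyError:
        raise ValueError(
            "Unknown session mode {} for language server '{}'".format(
                mode, language_server
            )
        ) from None
    return session_class(language_server, spec)
```

With a table, supporting a further mode would mean adding one entry, at the cost of a less explicit control flow than the chain in the diff.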

def init_listeners(self):
45 changes: 0 additions & 45 deletions python_packages/jupyter_lsp/jupyter_lsp/non_blocking.py

This file was deleted.

7 changes: 7 additions & 0 deletions python_packages/jupyter_lsp/jupyter_lsp/schema/schema.json
@@ -139,6 +139,13 @@
"description": "list of MIME types supported by the language server",
"title": "MIME Types"
},
"mode": {
"description": "connection mode used, e.g. stdio (default), tcp",
"title": "Mode",
"type": "string",
"enum": ["stdio", "tcp"],
"default": "stdio"
},
"troubleshoot": {
"type": "string",
"description": "information on troubleshooting the installation or auto-detection of the language server",