Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support client config with tcp_port but without command #2300

Merged
merged 13 commits into from
Aug 15, 2023
17 changes: 10 additions & 7 deletions plugin/core/transports.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,23 +95,25 @@ def _decode(message: bytes) -> Dict[str, Any]:
class ProcessTransport(Transport[T]):

def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket.socket], reader: IO[bytes],
writer: IO[bytes], stderr: Optional[IO[bytes]], processor: AbstractProcessor[T],
writer: IO[bytes], processor: AbstractProcessor[T],
rchl marked this conversation as resolved.
Show resolved Hide resolved
rchl marked this conversation as resolved.
Show resolved Hide resolved
callback_object: TransportCallbacks[T]) -> None:
self._closed = False
self._process = process
self._socket = socket
self._reader = reader
self._writer = writer
self._stderr = stderr
self._processor = processor
self._reader_thread = threading.Thread(target=self._read_loop, name='{}-reader'.format(name))
self._writer_thread = threading.Thread(target=self._write_loop, name='{}-writer'.format(name))
self._stderr_thread = threading.Thread(target=self._stderr_loop, name='{}-stderr'.format(name))
self._callback_object = weakref.ref(callback_object)
self._send_queue = Queue(0) # type: Queue[Union[T, None]]
self._reader_thread.start()
self._writer_thread.start()
self._stderr_thread.start()

if process is not None:
self._stderr = process.stderr
self._stderr_thread = threading.Thread(target=self._stderr_loop, name='{}-stderr'.format(name))
self._stderr_thread.start()
rchl marked this conversation as resolved.
Show resolved Hide resolved

def send(self, payload: T) -> None:
self._send_queue.put_nowait(payload)
Expand Down Expand Up @@ -170,7 +172,7 @@ def _end(self, exception: Optional[Exception]) -> None:
exit_code = self._process.wait(1)
except (AttributeError, ProcessLookupError, subprocess.TimeoutExpired):
pass
if self._process.poll() is None:
if self._process is not None and self._process.poll() is None:
try:
# The process didn't stop itself. Terminate!
self._process.kill()
Expand Down Expand Up @@ -258,7 +260,8 @@ def start_subprocess() -> subprocess.Popen:
start_subprocess
)
else:
process = start_subprocess()
if config.command:
process = start_subprocess()
rchl marked this conversation as resolved.
Show resolved Hide resolved
if config.tcp_port:
sock = _connect_tcp(config.tcp_port)
if sock is None:
Expand All @@ -271,7 +274,7 @@ def start_subprocess() -> subprocess.Popen:
if not reader or not writer:
raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer))
return ProcessTransport(
config.name, process, sock, reader, writer, process.stderr, json_rpc_processor, callback_object) # type: ignore
config.name, process, sock, reader, writer, json_rpc_processor, callback_object) # type: ignore


_subprocesses = weakref.WeakSet() # type: weakref.WeakSet[subprocess.Popen]
Expand Down