From 74d0e292b8d637f168c51c6f655813af023df758 Mon Sep 17 00:00:00 2001 From: "Patrick J. McNerthney" Date: Sun, 23 Aug 2020 13:34:41 -1000 Subject: [PATCH 1/7] Implement port forwarding. --- stream/__init__.py | 2 +- stream/stream.py | 8 ++- stream/ws_client.py | 172 +++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 178 insertions(+), 4 deletions(-) diff --git a/stream/__init__.py b/stream/__init__.py index e72d0583..cd346528 100644 --- a/stream/__init__.py +++ b/stream/__init__.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .stream import stream +from .stream import stream, portforward diff --git a/stream/stream.py b/stream/stream.py index 9bb59017..57bac758 100644 --- a/stream/stream.py +++ b/stream/stream.py @@ -17,9 +17,12 @@ from . import ws_client -def _websocket_reqeust(websocket_request, api_method, *args, **kwargs): +def _websocket_reqeust(websocket_request, force_kwargs, api_method, *args, **kwargs): """Override the ApiClient.request method with an alternative websocket based method and call the supplied Kubernetes API method with that in place.""" + if force_kwargs: + for kwarg, value in force_kwargs.items(): + kwargs[kwarg] = value api_client = api_method.__self__.api_client # old generated code's api client has config. new ones has configuration try: @@ -34,4 +37,5 @@ def _websocket_reqeust(websocket_request, api_method, *args, **kwargs): api_client.request = prev_request -stream = functools.partial(_websocket_reqeust, ws_client.websocket_call) +stream = functools.partial(_websocket_reqeust, ws_client.websocket_call, None) +portforward = functools.partial(_websocket_reqeust, ws_client.portforward_call, {'_preload_content':False}) diff --git a/stream/ws_client.py b/stream/ws_client.py index fa7f393e..69274d55 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from kubernetes.client.rest import ApiException +from kubernetes.client.rest import ApiException, ApiValueError import certifi import collections import select +import socket import ssl +import threading import time import six @@ -225,6 +227,143 @@ def close(self, **kwargs): WSResponse = collections.namedtuple('WSResponse', ['data']) +class PortForward: + def __init__(self, websocket, ports): + """A websocket client with support for port forwarding. + + Port Forward command sends on 2 channels per port, a read/write + data channel and a read only error channel. Both channels are sent an + initial frame contaning the port number that channel is associated with. + """ + + self.websocket = websocket + self.ports = {} + for ix, port_number in enumerate(ports): + self.ports[port_number] = self._Port(ix, port_number) + threading.Thread( + name="Kubernetes port forward proxy", target=self._proxy, daemon=True + ).start() + + def socket(self, port_number): + if port_number not in self.ports: + raise ValueError("Invalid port number") + return self.ports[port_number].socket + + def error(self, port_number): + if port_number not in self.ports: + raise ValueError("Invalid port number") + return self.ports[port_number].error + + def close(self): + for port in self.ports.values(): + port.socket.close() + + class _Port: + def __init__(self, ix, number): + self.number = number + self.channel = bytes([ix * 2]) + s, self.python = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) + self.socket = self._Socket(s) + self.data = b'' + self.error = None + + class _Socket: + def __init__(self, socket): + self._socket = socket + + def __getattr__(self, name): + return getattr(self._socket, name) + + def setsockopt(self, level, optname, value): + # The following socket option is not valid with a socket created from socketpair, + # and is set when creating an SSLSocket from this socket. + if level == socket.IPPROTO_TCP and optname == socket.TCP_NODELAY: + return + self._socket.setsockopt(level, optname, value) + + # Proxy all socket data between the python code and the kubernetes websocket. + def _proxy(self): + channel_ports = [] + channel_initialized = [] + python_ports = {} + rlist = [] + for port in self.ports.values(): + channel_ports.append(port) + channel_initialized.append(False) + channel_ports.append(port) + channel_initialized.append(False) + python_ports[port.python] = port + rlist.append(port.python) + rlist.append(self.websocket.sock) + kubernetes_data = b'' + while True: + wlist = [] + for port in self.ports.values(): + if port.data: + wlist.append(port.python) + if kubernetes_data: + wlist.append(self.websocket.sock) + r, w, _ = select.select(rlist, wlist, []) + for s in w: + if s == self.websocket.sock: + sent = self.websocket.sock.send(kubernetes_data) + kubernetes_data = kubernetes_data[sent:] + else: + port = python_ports[s] + sent = port.python.send(port.data) + port.data = port.data[sent:] + for s in r: + if s == self.websocket.sock: + opcode, frame = self.websocket.recv_data_frame(True) + if opcode == ABNF.OPCODE_CLOSE: + for port in self.ports.values(): + port.python.close() + return + if opcode == ABNF.OPCODE_BINARY: + if not frame.data: + raise RuntimeError("Unexpected frame data size") + channel = frame.data[0] + if channel >= len(channel_ports): + raise RuntimeError("Unexpected channel number: " + str(channel)) + port = channel_ports[channel] + if channel_initialized[channel]: + if channel % 2: + port.error = frame.data[1:].decode() + if port.python in rlist: + port.python.close() + rlist.remove(port.python) + port.data = b'' + else: + port.data += frame.data[1:] + else: + if len(frame.data) != 3: + raise RuntimeError( + "Unexpected initial channel frame data size" + ) + port_number = frame.data[1] + (frame.data[2] * 256) + if port_number != port.number: + raise RuntimeError( + "Unexpected port number in initial channel frame: " + str(port_number) + ) + channel_initialized[channel] = True + elif opcode not in (ABNF.OPCODE_PING, ABNF.OPCODE_PONG): + raise RuntimeError("Unexpected websocket opcode: " + str(opcode)) + else: + port = python_ports[s] + data = port.python.recv(1024 * 1024) + if data: + kubernetes_data += ABNF.create_frame( + port.channel + data, + ABNF.OPCODE_BINARY, + ).format() + else: + port.python.close() + rlist.remove(s) + if len(rlist) == 1: + self.websocket.close() + return + + def get_websocket_url(url, query_params=None): parsed_url = urlparse(url) parts = list(parsed_url) @@ -302,3 +441,34 @@ def websocket_call(configuration, _method, url, **kwargs): return WSResponse('%s' % ''.join(client.read_all())) except (Exception, KeyboardInterrupt, SystemExit) as e: raise ApiException(status=0, reason=str(e)) + + +def portforward_call(configuration, _method, url, **kwargs): + """An internal function to be called in api-client when a websocket + connection is required for port forwarding. args and kwargs are the + parameters of apiClient.request method.""" + + query_params = kwargs.get("query_params") + + ports = [] + for key, value in query_params: + if key == 'ports': + for port in value.split(','): + try: + port = int(port) + if not (0 < port < 65536): + raise ValueError + ports.append(port) + except ValueError: + raise ApiValueError("Invalid port number `" + str(port) + "`") + if not ports: + raise ApiValueError("Missing required parameter `ports`") + + url = get_websocket_url(url, query_params) + headers = kwargs.get("headers") + + try: + websocket = create_websocket(configuration, url, headers) + return PortForward(websocket, ports) + except (Exception, KeyboardInterrupt, SystemExit) as e: + raise ApiException(status=0, reason=str(e)) From cc9ae10549db26dd1391de55f0da2f4946de4ad7 Mon Sep 17 00:00:00 2001 From: "Patrick J. McNerthney" Date: Mon, 31 Aug 2020 15:53:59 -1000 Subject: [PATCH 2/7] Address the following PR issues: * Rename `_Port.error` to be `_Port.error_channel`. * Correct comment about where setsockopt is being called. * Add comments clarifying why the double call to the same methods to setup channel information. * Allow for ports specified with both local and remote port numbers. --- stream/ws_client.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/stream/ws_client.py b/stream/ws_client.py index 69274d55..5decad80 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -249,7 +249,7 @@ def socket(self, port_number): raise ValueError("Invalid port number") return self.ports[port_number].socket - def error(self, port_number): + def error_channel(self, port_number): if port_number not in self.ports: raise ValueError("Invalid port number") return self.ports[port_number].error @@ -276,7 +276,7 @@ def __getattr__(self, name): def setsockopt(self, level, optname, value): # The following socket option is not valid with a socket created from socketpair, - # and is set when creating an SSLSocket from this socket. + # and is set by the http.client.HTTPConnection.connect method. if level == socket.IPPROTO_TCP and optname == socket.TCP_NODELAY: return self._socket.setsockopt(level, optname, value) @@ -288,8 +288,10 @@ def _proxy(self): python_ports = {} rlist = [] for port in self.ports.values(): + # Setup the data channel for this port number channel_ports.append(port) channel_initialized.append(False) + # Setup the error channel for this port number channel_ports.append(port) channel_initialized.append(False) python_ports[port.python] = port @@ -455,7 +457,8 @@ def portforward_call(configuration, _method, url, **kwargs): if key == 'ports': for port in value.split(','): try: - port = int(port) + # The last specified port is the remote port + port = int(port.split(':')[-1]) if not (0 < port < 65536): raise ValueError ports.append(port) From 72e372599d68c4e268512c4085ac9e2e13368ae2 Mon Sep 17 00:00:00 2001 From: "Patrick J. McNerthney" Date: Tue, 1 Sep 2020 18:33:33 -1000 Subject: [PATCH 3/7] Rework the parsing of the requested ports to support both a local port and a remote port. --- stream/ws_client.py | 77 ++++++++++++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 29 deletions(-) diff --git a/stream/ws_client.py b/stream/ws_client.py index 5decad80..971ab6b4 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -237,30 +237,30 @@ def __init__(self, websocket, ports): """ self.websocket = websocket - self.ports = {} - for ix, port_number in enumerate(ports): - self.ports[port_number] = self._Port(ix, port_number) + self.local_ports = {} + for ix, local_remote in enumerate(ports): + self.local_ports[local_remote[0]] = self._Port(ix, local_remote[1]) threading.Thread( name="Kubernetes port forward proxy", target=self._proxy, daemon=True ).start() - def socket(self, port_number): - if port_number not in self.ports: + def socket(self, local_number): + if local_number not in self.local_ports: raise ValueError("Invalid port number") - return self.ports[port_number].socket + return self.local_ports[local_number].socket - def error_channel(self, port_number): - if port_number not in self.ports: + def error(self, local_number): + if local_number not in self.local_ports: raise ValueError("Invalid port number") - return self.ports[port_number].error + return self.local_ports[local_number].error def close(self): - for port in self.ports.values(): + for port in self.local_ports.values(): port.socket.close() class _Port: - def __init__(self, ix, number): - self.number = number + def __init__(self, ix, remote_number): + self.remote_number = remote_number self.channel = bytes([ix * 2]) s, self.python = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) self.socket = self._Socket(s) @@ -287,7 +287,7 @@ def _proxy(self): channel_initialized = [] python_ports = {} rlist = [] - for port in self.ports.values(): + for port in self.local_ports.values(): # Setup the data channel for this port number channel_ports.append(port) channel_initialized.append(False) @@ -300,7 +300,7 @@ def _proxy(self): kubernetes_data = b'' while True: wlist = [] - for port in self.ports.values(): + for port in self.local_ports.values(): if port.data: wlist.append(port.python) if kubernetes_data: @@ -318,7 +318,7 @@ def _proxy(self): if s == self.websocket.sock: opcode, frame = self.websocket.recv_data_frame(True) if opcode == ABNF.OPCODE_CLOSE: - for port in self.ports.values(): + for port in self.local_ports.values(): port.python.close() return if opcode == ABNF.OPCODE_BINARY: @@ -330,11 +330,9 @@ def _proxy(self): port = channel_ports[channel] if channel_initialized[channel]: if channel % 2: - port.error = frame.data[1:].decode() - if port.python in rlist: - port.python.close() - rlist.remove(port.python) - port.data = b'' + if port.error is None: + port.error = '' + port.error += frame.data[1:].decode() else: port.data += frame.data[1:] else: @@ -343,7 +341,7 @@ def _proxy(self): "Unexpected initial channel frame data size" ) port_number = frame.data[1] + (frame.data[2] * 256) - if port_number != port.number: + if port_number != port.remote_number: raise RuntimeError( "Unexpected port number in initial channel frame: " + str(port_number) ) @@ -453,17 +451,38 @@ def portforward_call(configuration, _method, url, **kwargs): query_params = kwargs.get("query_params") ports = [] - for key, value in query_params: - if key == 'ports': - for port in value.split(','): + for ix in range(len(query_params)): + if query_params[ix][0] == 'ports': + remote_ports = [] + for port in query_params[ix][1].split(','): try: - # The last specified port is the remote port - port = int(port.split(':')[-1]) - if not (0 < port < 65536): + local_remote = port.split(':') + if len(local_remote) > 2: raise ValueError - ports.append(port) + if len(local_remote) == 1: + local_remote[0] = int(local_remote[0]) + if not (0 < local_remote[0] < 65536): + raise ValueError + local_remote.append(local_remote[0]) + elif len(local_remote) == 2: + if local_remote[0]: + local_remote[0] = int(local_remote[0]) + if not (0 <= local_remote[0] < 65536): + raise ValueError + else: + local_remote[0] = 0 + local_remote[1] = int(local_remote[1]) + if not (0 < local_remote[1] < 65536): + raise ValueError + if not local_remote[0]: + local_remote[0] = len(ports) + 1 + else: + raise ValueError + ports.append(local_remote) + remote_ports.append(str(local_remote[1])) except ValueError: - raise ApiValueError("Invalid port number `" + str(port) + "`") + raise ApiValueError("Invalid port number `" + port + "`") + query_params[ix] = ('ports', ','.join(remote_ports)) if not ports: raise ApiValueError("Missing required parameter `ports`") From 7bf04b384b8cfcdba6387cf61e1cd9d6052669ee Mon Sep 17 00:00:00 2001 From: "Patrick J. McNerthney" Date: Sun, 6 Sep 2020 09:25:58 -1000 Subject: [PATCH 4/7] Rework how the PortForward._proxy thread determines when and how to terminate. --- stream/ws_client.py | 151 +++++++++++++++++++++++--------------------- 1 file changed, 78 insertions(+), 73 deletions(-) diff --git a/stream/ws_client.py b/stream/ws_client.py index 971ab6b4..fafba79a 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -238,33 +238,51 @@ def __init__(self, websocket, ports): self.websocket = websocket self.local_ports = {} - for ix, local_remote in enumerate(ports): - self.local_ports[local_remote[0]] = self._Port(ix, local_remote[1]) + for ix, port_number in enumerate(ports): + self.local_ports[port_number] = self._Port(ix, port_number) + # There is a thread run per PortForward instance which performs the translation between the + # raw socket data sent by the python application and the websocket protocol. This thread + # terminates after either side has closed all ports, and after flushing all pending data. threading.Thread( - name="Kubernetes port forward proxy", target=self._proxy, daemon=True + name="Kubernetes port forward proxy: %s" % ', '.join([str(port) for port in ports]), + target=self._proxy, + daemon=True ).start() - def socket(self, local_number): - if local_number not in self.local_ports: + def socket(self, port_number): + if port_number not in self.local_ports: raise ValueError("Invalid port number") - return self.local_ports[local_number].socket + return self.local_ports[port_number].socket - def error(self, local_number): - if local_number not in self.local_ports: + def error(self, port_number): + if port_number not in self.local_ports: raise ValueError("Invalid port number") - return self.local_ports[local_number].error + return self.local_ports[port_number].error def close(self): for port in self.local_ports.values(): port.socket.close() class _Port: - def __init__(self, ix, remote_number): - self.remote_number = remote_number + def __init__(self, ix, port_number): + # The remote port number + self.port_number = port_number + # The websocket channel byte number for this port self.channel = bytes([ix * 2]) + # A socket pair is created to provide a means of translating the data flow + # between the python application and the kubernetes websocket. The self.python + # half of the socket pair is used by the _proxy method to receive and send data + # to the running python application. s, self.python = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) + # The self.socket half of the pair is used by the python application to send + # and receive data to the eventual pod port. It is wrapped in the _Socket class + # because a socket pair is an AF_UNIX socket, not a AF_NET socket. This allows + # intercepting setting AF_INET socket options that would error against an AD_UNIX + # socket. self.socket = self._Socket(s) + # Data accumulated from the websocket to be sent to the python application. self.data = b'' + # All data sent from kubernetes on the port error channel. self.error = None class _Socket: @@ -285,8 +303,7 @@ def setsockopt(self, level, optname, value): def _proxy(self): channel_ports = [] channel_initialized = [] - python_ports = {} - rlist = [] + local_ports = {} for port in self.local_ports.values(): # Setup the data channel for this port number channel_ports.append(port) @@ -294,33 +311,36 @@ def _proxy(self): # Setup the error channel for this port number channel_ports.append(port) channel_initialized.append(False) - python_ports[port.python] = port - rlist.append(port.python) - rlist.append(self.websocket.sock) + port.python.setblocking(True) + local_ports[port.python] = port + # The data to send on the websocket socket kubernetes_data = b'' while True: - wlist = [] + rlist = [] # List of sockets to read from + wlist = [] # List of sockets to write to + if self.websocket.connected: + rlist.append(self.websocket) + if kubernetes_data: + wlist.append(self.websocket) + all_closed = True for port in self.local_ports.values(): - if port.data: - wlist.append(port.python) - if kubernetes_data: - wlist.append(self.websocket.sock) + if port.python.fileno() != -1: + if port.data: + wlist.append(port.python) + all_closed = False + else: + if self.websocket.connected: + rlist.append(port.python) + all_closed = False + else: + port.python.close() + if all_closed and (not self.websocket.connected or not kubernetes_data): + self.websocket.close() + return r, w, _ = select.select(rlist, wlist, []) - for s in w: - if s == self.websocket.sock: - sent = self.websocket.sock.send(kubernetes_data) - kubernetes_data = kubernetes_data[sent:] - else: - port = python_ports[s] - sent = port.python.send(port.data) - port.data = port.data[sent:] - for s in r: - if s == self.websocket.sock: + for sock in r: + if sock == self.websocket: opcode, frame = self.websocket.recv_data_frame(True) - if opcode == ABNF.OPCODE_CLOSE: - for port in self.local_ports.values(): - port.python.close() - return if opcode == ABNF.OPCODE_BINARY: if not frame.data: raise RuntimeError("Unexpected frame data size") @@ -341,15 +361,15 @@ def _proxy(self): "Unexpected initial channel frame data size" ) port_number = frame.data[1] + (frame.data[2] * 256) - if port_number != port.remote_number: + if port_number != port.port_number: raise RuntimeError( "Unexpected port number in initial channel frame: " + str(port_number) ) channel_initialized[channel] = True - elif opcode not in (ABNF.OPCODE_PING, ABNF.OPCODE_PONG): + elif opcode not in (ABNF.OPCODE_PING, ABNF.OPCODE_PONG, ABNF.OPCODE_CLOSE): raise RuntimeError("Unexpected websocket opcode: " + str(opcode)) else: - port = python_ports[s] + port = local_ports[sock] data = port.python.recv(1024 * 1024) if data: kubernetes_data += ABNF.create_frame( @@ -357,11 +377,16 @@ def _proxy(self): ABNF.OPCODE_BINARY, ).format() else: - port.python.close() - rlist.remove(s) - if len(rlist) == 1: - self.websocket.close() - return + if not port.data: + port.python.close() + for sock in w: + if sock == self.websocket: + sent = self.websocket.sock.send(kubernetes_data) + kubernetes_data = kubernetes_data[sent:] + else: + port = local_ports[sock] + sent = port.python.send(port.data) + port.data = port.data[sent:] def get_websocket_url(url, query_params=None): @@ -451,38 +476,18 @@ def portforward_call(configuration, _method, url, **kwargs): query_params = kwargs.get("query_params") ports = [] - for ix in range(len(query_params)): - if query_params[ix][0] == 'ports': - remote_ports = [] - for port in query_params[ix][1].split(','): + for param, value in query_params: + if param == 'ports': + for port in value.split(','): try: - local_remote = port.split(':') - if len(local_remote) > 2: - raise ValueError - if len(local_remote) == 1: - local_remote[0] = int(local_remote[0]) - if not (0 < local_remote[0] < 65536): - raise ValueError - local_remote.append(local_remote[0]) - elif len(local_remote) == 2: - if local_remote[0]: - local_remote[0] = int(local_remote[0]) - if not (0 <= local_remote[0] < 65536): - raise ValueError - else: - local_remote[0] = 0 - local_remote[1] = int(local_remote[1]) - if not (0 < local_remote[1] < 65536): - raise ValueError - if not local_remote[0]: - local_remote[0] = len(ports) + 1 - else: - raise ValueError - ports.append(local_remote) - remote_ports.append(str(local_remote[1])) + port_number = int(port) except ValueError: - raise ApiValueError("Invalid port number `" + port + "`") - query_params[ix] = ('ports', ','.join(remote_ports)) + raise ApiValueError("Invalid port number: %s" % port) + if not (0 < port_number < 65536): + raise ApiValueError("Port number must be between 0 and 65536: %s" % port) + if port_number in ports: + raise ApiValueError("Duplicate port numbers: %s" % port) + ports.append(port_number) if not ports: raise ApiValueError("Missing required parameter `ports`") From ce3a1a298a1c4d38dfd1e0d228b2eafff2e647a4 Mon Sep 17 00:00:00 2001 From: "Patrick J. McNerthney" Date: Mon, 7 Sep 2020 11:56:01 -1000 Subject: [PATCH 5/7] Rework loop which collects the local python sockets for read and writing. --- stream/ws_client.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/stream/ws_client.py b/stream/ws_client.py index fafba79a..b8204599 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -322,19 +322,21 @@ def _proxy(self): rlist.append(self.websocket) if kubernetes_data: wlist.append(self.websocket) - all_closed = True + local_all_closed = True for port in self.local_ports.values(): if port.python.fileno() != -1: - if port.data: - wlist.append(port.python) - all_closed = False + if self.websocket.connected: + rlist.append(port.python) + if port.data: + wlist.append(port.python) + local_all_closed = False else: - if self.websocket.connected: - rlist.append(port.python) - all_closed = False + if port.data: + wlist.append(port.python) + local_all_closed = False else: port.python.close() - if all_closed and (not self.websocket.connected or not kubernetes_data): + if local_all_closed and not (self.websocket.connected and kubernetes_data): self.websocket.close() return r, w, _ = select.select(rlist, wlist, []) From 2e86b713341faaf3309d22f7494b3c68a6a6e04e Mon Sep 17 00:00:00 2001 From: "Patrick J. McNerthney" Date: Mon, 7 Sep 2020 13:06:44 -1000 Subject: [PATCH 6/7] Better handling of error channel reponse, and comment typo. --- stream/ws_client.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/stream/ws_client.py b/stream/ws_client.py index b8204599..0f8dc327 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -249,6 +249,10 @@ def __init__(self, websocket, ports): daemon=True ).start() + @property + def connected(self): + return self.websocket.connected + def socket(self, port_number): if port_number not in self.local_ports: raise ValueError("Invalid port number") @@ -276,8 +280,8 @@ def __init__(self, ix, port_number): s, self.python = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) # The self.socket half of the pair is used by the python application to send # and receive data to the eventual pod port. It is wrapped in the _Socket class - # because a socket pair is an AF_UNIX socket, not a AF_NET socket. This allows - # intercepting setting AF_INET socket options that would error against an AD_UNIX + # because a socket pair is an AF_UNIX socket, not a AF_INET socket. This allows + # intercepting setting AF_INET socket options that would error against an AF_UNIX # socket. self.socket = self._Socket(s) # Data accumulated from the websocket to be sent to the python application. @@ -325,17 +329,17 @@ def _proxy(self): local_all_closed = True for port in self.local_ports.values(): if port.python.fileno() != -1: - if self.websocket.connected: - rlist.append(port.python) - if port.data: - wlist.append(port.python) - local_all_closed = False - else: + if port.error or not self.websocket.connected: if port.data: wlist.append(port.python) local_all_closed = False else: port.python.close() + else: + rlist.append(port.python) + if port.data: + wlist.append(port.python) + local_all_closed = False if local_all_closed and not (self.websocket.connected and kubernetes_data): self.websocket.close() return From 5d39d0d5f0e077ea9d19a0f7d94383bed36f7a27 Mon Sep 17 00:00:00 2001 From: "Patrick J. McNerthney" Date: Mon, 7 Sep 2020 19:38:54 -1000 Subject: [PATCH 7/7] Support both python 2.7 and 3.x. --- stream/ws_client.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/stream/ws_client.py b/stream/ws_client.py index 0f8dc327..356440c8 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -243,11 +243,12 @@ def __init__(self, websocket, ports): # There is a thread run per PortForward instance which performs the translation between the # raw socket data sent by the python application and the websocket protocol. This thread # terminates after either side has closed all ports, and after flushing all pending data. - threading.Thread( + proxy = threading.Thread( name="Kubernetes port forward proxy: %s" % ', '.join([str(port) for port in ports]), - target=self._proxy, - daemon=True - ).start() + target=self._proxy + ) + proxy.daemon = True + proxy.start() @property def connected(self): @@ -272,7 +273,7 @@ def __init__(self, ix, port_number): # The remote port number self.port_number = port_number # The websocket channel byte number for this port - self.channel = bytes([ix * 2]) + self.channel = six.int2byte(ix * 2) # A socket pair is created to provide a means of translating the data flow # between the python application and the kubernetes websocket. The self.python # half of the socket pair is used by the _proxy method to receive and send data @@ -350,9 +351,9 @@ def _proxy(self): if opcode == ABNF.OPCODE_BINARY: if not frame.data: raise RuntimeError("Unexpected frame data size") - channel = frame.data[0] + channel = six.byte2int(frame.data) if channel >= len(channel_ports): - raise RuntimeError("Unexpected channel number: " + str(channel)) + raise RuntimeError("Unexpected channel number: %s" % channel) port = channel_ports[channel] if channel_initialized[channel]: if channel % 2: @@ -366,14 +367,14 @@ def _proxy(self): raise RuntimeError( "Unexpected initial channel frame data size" ) - port_number = frame.data[1] + (frame.data[2] * 256) + port_number = six.byte2int(frame.data[1:2]) + (six.byte2int(frame.data[2:3]) * 256) if port_number != port.port_number: raise RuntimeError( - "Unexpected port number in initial channel frame: " + str(port_number) + "Unexpected port number in initial channel frame: %s" % port_number ) channel_initialized[channel] = True elif opcode not in (ABNF.OPCODE_PING, ABNF.OPCODE_PONG, ABNF.OPCODE_CLOSE): - raise RuntimeError("Unexpected websocket opcode: " + str(opcode)) + raise RuntimeError("Unexpected websocket opcode: %s" % opcode) else: port = local_ports[sock] data = port.python.recv(1024 * 1024) @@ -383,8 +384,7 @@ def _proxy(self): ABNF.OPCODE_BINARY, ).format() else: - if not port.data: - port.python.close() + port.python.close() for sock in w: if sock == self.websocket: sent = self.websocket.sock.send(kubernetes_data)