Skip to content

Commit

Permalink
Make RemoteKernelManager more independent
Browse files Browse the repository at this point in the history
Make RemoteKernelManager (RKM) able to be run without a (grand)parent
EnterpriseGatewayApp (EGA) instance so it can be used by nbclient.

The config which RKM with EGA was being referred to using traitlet
lineage like `self.parent.parent.property_name`. Change both to inherit
from a Configurable mixin which contains all of EGA's previous config.
Link the attributes of the RKM instance with EGA if available to keep
old behaviour.

Modify other properties RKM uses that are not traits to become
`@property`s which fall back to sane defaults if running independently.

Change RKM to be able to generate a kernel id if necessary. Change
ProcessProxy to use provided kernel ids. Move kernel id generation
logic out of RemoteMappingKernelManager.

Resolves #803
  • Loading branch information
golf-player committed May 20, 2020
1 parent c594cd9 commit e523fcb
Show file tree
Hide file tree
Showing 11 changed files with 574 additions and 464 deletions.
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Jupyter Enterprise Gateway leverages local resource managers to distribute kerne
kernel-kubernetes
kernel-docker
kernel-conductor
kernel-library

.. toctree::
:maxdepth: 2
Expand Down
22 changes: 22 additions & 0 deletions docs/source/kernel-library.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
## Standalone Remote Kernel Execution
#####a.k.a "Library Mode"

Remote kernels can be executed by using the `RemoteKernelManager` class directly. This enables running kernels using `ProcessProxy`s without the Enterprise Gateway Webapp.

This can be useful in niche situations, for example, using [nbconvert](https://nbconvert.readthedocs.io/) or [nbclient](https://nbclient.readthedocs.io/) to execute a kernel on a remote cluster.

Sample code using nbclient 0.2.0:

```python
import nbformat
from nbclient import NotebookClient
from enterprise_gateway.services.kernels.remotemanager import RemoteKernelManager

with open("my_notebook.ipynb") as fp:
test_notebook = nbformat.read(fp, as_version=4)

client = NotebookClient(nb=test_notebook, kernel_manager_class=RemoteKernelManager)
client.execute()
```

The above code will execute the notebook on a kernel using the configured `ProcessProxy` (defaults to Kubernetes).
3 changes: 3 additions & 0 deletions docs/source/use-cases.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ issues and manage network configurations that adhere to my corporate policy.

- **As an administrator**, I want to constrain the number of active kernels that each of my users can have at any
given time.

- **As a solution architect**, I want to easily integrate the ability to launch remote kernels with existing platforms,
so I can leverage my compute cluster in a customizable way.
392 changes: 5 additions & 387 deletions enterprise_gateway/enterprisegatewayapp.py

Large diffs are not rendered by default.

398 changes: 393 additions & 5 deletions enterprise_gateway/mixins.py

Large diffs are not rendered by default.

160 changes: 116 additions & 44 deletions enterprise_gateway/services/kernels/remotemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
from ipython_genutils.importstring import import_item
from notebook.services.kernels.kernelmanager import MappingKernelManager
from jupyter_client.ioloop.manager import IOLoopKernelManager
from traitlets import directional_link, log as traitlets_log

from ..processproxies.processproxy import LocalProcessProxy, RemoteProcessProxy
from ..sessions.kernelsessionmanager import KernelSessionManager
from enterprise_gateway.mixins import EnterpriseGatewayConfigMixin


def get_process_proxy_config(kernelspec):
Expand Down Expand Up @@ -44,6 +46,46 @@ def get_process_proxy_config(kernelspec):
return {"class_name": "enterprise_gateway.services.processproxies.processproxy.LocalProcessProxy", "config": {}}


def new_kernel_id(**kwargs):
"""
This method provides a mechanism by which clients can specify a kernel's id. In this case
that mechanism is via the per-kernel environment variable: KERNEL_ID. If specified, its value
will be validated and returned, otherwise the result from the provided method is returned.
NOTE: This method exists in jupyter_client.multikernelmanager.py for releases > 5.2.3. If you
find that this method is not getting invoked, then you likely need to update the version of
jupyter_client. The Enterprise Gateway dependency will be updated once new releases of
jupyter_client are more prevalent.
Returns
-------
kernel_id : str
The uuid string to associate with the new kernel
"""
log = kwargs.pop("log", None) or traitlets_log.get_logger()
kernel_id_fn = kwargs.pop("kernel_id_fn", None) or (lambda: unicode_type(uuid.uuid4()))

env = kwargs.get('env')
if env and env.get('KERNEL_ID'): # If there's a KERNEL_ID in the env, check it out
# convert string back to UUID - validating string in the process.
str_kernel_id = env.get('KERNEL_ID')
try:
str_v4_kernel_id = str(uuid.UUID(str_kernel_id, version=4))
if str_kernel_id != str_v4_kernel_id: # Given string is not uuid v4 compliant
raise ValueError("value is not uuid v4 compliant")
except ValueError as ve:
log.error("Invalid v4 UUID value detected in ['env']['KERNEL_ID']: '{}'! Error: {}".
format(str_kernel_id, ve))
raise ve
# user-provided id is valid, use it
kernel_id = unicode_type(str_kernel_id)
log.debug("Using user-provided kernel_id: {}".format(kernel_id))
else:
kernel_id = kernel_id_fn(**kwargs)

return kernel_id


class RemoteMappingKernelManager(MappingKernelManager):
"""Extends the MappingKernelManager with support for managing remote kernels via the process-proxy. """

Expand Down Expand Up @@ -161,44 +203,12 @@ def start_kernel_from_session(self, kernel_id, kernel_name, connection_info, pro
return True

def new_kernel_id(self, **kwargs):
"""Determines the kernel_id to use for a new kernel.
This method provides a mechanism by which clients can specify a kernel's id. In this case
that mechanism is via the per-kernel environment variable: KERNEL_ID. If specified, its value
will be validated and returned, otherwise the result from the superclass method is returned.
NOTE: This method exists in jupyter_client.multikernelmanager.py for releases > 5.2.3. If you
find that this method is not getting invoked, then you likely need to update the version of
jupyter_client. The Enterprise Gateway dependency will be updated once new releases of
jupyter_client are more prevalent.
Returns
-------
kernel_id : str
The uuid string to associate with the new kernel
"""
env = kwargs.get('env')
if env and env.get('KERNEL_ID'): # If there's a KERNEL_ID in the env, check it out
# convert string back to UUID - validating string in the process.
str_kernel_id = env.get('KERNEL_ID')
try:
str_v4_kernel_id = str(uuid.UUID(str_kernel_id, version=4))
if str_kernel_id != str_v4_kernel_id: # Given string is not uuid v4 compliant
raise ValueError("value is not uuid v4 compliant")
except ValueError as ve:
self.log.error("Invalid v4 UUID value detected in ['env']['KERNEL_ID']: '{}'! Error: {}".
format(str_kernel_id, ve))
raise ve
# user-provided id is valid, use it
kernel_id = unicode_type(str_kernel_id)
self.log.debug("Using user-provided kernel_id: {}".format(kernel_id))
else:
kernel_id = super(RemoteMappingKernelManager, self).new_kernel_id(**kwargs)
"""Determines the kernel_id to use for a new kernel."""

return kernel_id
return new_kernel_id(kernel_id_fn=super(RemoteMappingKernelManager, self).new_kernel_id, log=self.log)


class RemoteKernelManager(IOLoopKernelManager):
class RemoteKernelManager(EnterpriseGatewayConfigMixin, IOLoopKernelManager):
"""Extends the IOLoopKernelManager used by the MappingKernelManager.
This class is responsible for detecting that a remote kernel is desired, then launching the
Expand All @@ -211,7 +221,6 @@ def __init__(self, **kwargs):
self.process_proxy = None
self.response_address = None
self.sigint_value = None
self.port_range = None
self.kernel_id = None
self.user_overrides = {}
self.restarting = False # need to track whether we're in a restart situation or not
Expand All @@ -224,6 +233,43 @@ def __init__(self, **kwargs):
if hasattr(self, "cache_ports"):
self.cache_ports = False

if not self.connection_file:
self.kernel_id = new_kernel_id(log=self.log)

self._link_dependent_props()

if self.kernel_spec_manager is None:
self.kernel_spec_manager = self.kernel_spec_manager_class(
parent=self,
)

def _link_dependent_props(self):
"""
Ensure that RemoteKernelManager, when used as part of an EnterpriseGatewayApp,
has certain necessary configuration stay in sync with the app's configuration.
When RemoteKernelManager is used independently, this function is a no-op, and
default values or configuration set on this class is used.
"""
try:
eg_instance = self.parent.parent
except AttributeError:
return
dependent_props = ["authorized_users",
"unauthorized_users",
"port_range",
"impersonation_enabled",
"max_kernels_per_user",
"env_whitelist",
"env_process_whitelist",
"yarn_endpoint",
"alt_yarn_endpoint",
"yarn_endpoint_security_enabled",
"conductor_endpoint",
"remote_hosts"
]
self._links = [directional_link((eg_instance, prop), (self, prop)) for prop in dependent_props]

def start_kernel(self, **kwargs):
"""Starts a kernel in a separate process.
Expand All @@ -248,8 +294,8 @@ def _capture_user_overrides(self, **kwargs):
env = kwargs.get('env', {})
self.user_overrides.update({key: value for key, value in env.items()
if key.startswith('KERNEL_') or
key in self.parent.parent.env_process_whitelist or
key in self.parent.parent.env_whitelist})
key in self.env_process_whitelist or
key in self.env_whitelist})

def format_kernel_cmd(self, extra_arguments=None):
""" Replace templated args (e.g. {response_address}, {port_range}, or {kernel_id}). """
Expand Down Expand Up @@ -325,26 +371,28 @@ def restart_kernel(self, now=False, **kwargs):
kernel.
"""
self.restarting = True
kernel_id = os.path.basename(self.connection_file).replace('kernel-', '').replace('.json', '')
kernel_id = self.kernel_id or os.path.basename(self.connection_file).replace('kernel-', '').replace('.json', '')
# Check if this is a remote process proxy and if now = True. If so, check its connection count. If no
# connections, shutdown else perform the restart. Note: auto-restart sets now=True, but handlers use
# the default value (False).
if isinstance(self.process_proxy, RemoteProcessProxy) and now:
if self.parent._kernel_connections.get(kernel_id, 0) == 0:
if isinstance(self.process_proxy, RemoteProcessProxy) and now and self.mapping_kernel_manager:
if self.mapping_kernel_manager._kernel_connections.get(kernel_id, 0) == 0:
self.log.warning("Remote kernel ({}) will not be automatically restarted since there are no "
"clients connected at this time.".format(kernel_id))
# Use the parent mapping kernel manager so activity monitoring and culling is also shutdown
self.parent.shutdown_kernel(kernel_id, now=now)
self.mapping_kernel_manager.shutdown_kernel(kernel_id, now=now)
return
super(RemoteKernelManager, self).restart_kernel(now, **kwargs)
if isinstance(self.process_proxy, RemoteProcessProxy): # for remote kernels...
# Re-establish activity watching...
if self._activity_stream:
self._activity_stream.close()
self._activity_stream = None
self.parent.start_watching_activity(kernel_id)
if self.mapping_kernel_manager:
self.mapping_kernel_manager.start_watching_activity(kernel_id)
# Refresh persisted state.
self.parent.parent.kernel_session_manager.refresh_session(kernel_id)
if self.kernel_session_manager:
self.kernel_session_manager.refresh_session(kernel_id)
self.restarting = False

def signal_kernel(self, signum):
Expand Down Expand Up @@ -423,3 +471,27 @@ def _get_process_proxy(self):
format(self.kernel_spec.display_name, process_proxy_class_name))
process_proxy_class = import_item(process_proxy_class_name)
self.process_proxy = process_proxy_class(kernel_manager=self, proxy_config=process_proxy_cfg.get('config'))

# When this class is used by an EnterpriseGatewayApp instance, it will be able to
# access the app's configuration using the traitlet parent chain.
# When it's used independently, it should fall back to safe defaults.
@property
def kernel_session_manager(self):
try:
return self.parent.parent.kernel_session_manager
except AttributeError:
return None

@property
def cull_idle_timeout(self):
try:
return self.parent.cull_idle_timeout
except AttributeError:
return 0

@property
def mapping_kernel_manager(self):
try:
return self.parent
except AttributeError:
return None
2 changes: 1 addition & 1 deletion enterprise_gateway/services/processproxies/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, kernel_manager, proxy_config):
self.env = None
self.rest_credential = None
self.conductor_endpoint = proxy_config.get('conductor_endpoint',
kernel_manager.parent.parent.conductor_endpoint)
kernel_manager.conductor_endpoint)

def launch_process(self, kernel_cmd, **kwargs):
"""Launches the specified process within a Conductor cluster environment."""
Expand Down
2 changes: 1 addition & 1 deletion enterprise_gateway/services/processproxies/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self, kernel_manager, proxy_config):
if proxy_config.get('remote_hosts'):
self.hosts = proxy_config.get('remote_hosts').split(',')
else:
self.hosts = kernel_manager.parent.parent.remote_hosts # from command line or env
self.hosts = kernel_manager.remote_hosts # from command line or env

def launch_process(self, kernel_cmd, **kwargs):
"""Launches a kernel process on a selected host."""
Expand Down
34 changes: 20 additions & 14 deletions enterprise_gateway/services/processproxies/processproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,12 @@ def __init__(self, kernel_manager, proxy_config):
# relaunch (see jupyter_client.manager.start_kernel().
self.kernel_manager.ip = '0.0.0.0'
self.log = kernel_manager.log

# extract the kernel_id string from the connection file and set the KERNEL_ID environment variable
self.kernel_manager.kernel_id = os.path.basename(self.kernel_manager.connection_file). \
replace('kernel-', '').replace('.json', '')
if self.kernel_manager.kernel_id is None:
self.kernel_manager.kernel_id = os.path.basename(self.kernel_manager.connection_file). \
replace('kernel-', '').replace('.json', '')

self.kernel_id = self.kernel_manager.kernel_id
self.kernel_launch_timeout = default_kernel_launch_timeout
self.lower_port = 0
Expand All @@ -127,15 +130,15 @@ def __init__(self, kernel_manager, proxy_config):

# Handle authorization sets...
# Take union of unauthorized users...
self.unauthorized_users = self.kernel_manager.parent.parent.unauthorized_users
self.unauthorized_users = self.kernel_manager.unauthorized_users
if proxy_config.get('unauthorized_users'):
self.unauthorized_users = self.unauthorized_users.union(proxy_config.get('unauthorized_users').split(','))

# Let authorized users override global value - if set on kernelspec...
if proxy_config.get('authorized_users'):
self.authorized_users = set(proxy_config.get('authorized_users').split(','))
else:
self.authorized_users = self.kernel_manager.parent.parent.authorized_users
self.authorized_users = self.kernel_manager.authorized_users

# Represents the local process (from popen) if applicable. Note that we could have local_proc = None even when
# the subclass is a LocalProcessProxy (or YarnProcessProxy). This will happen if EG is restarted and the
Expand Down Expand Up @@ -432,7 +435,7 @@ def _enforce_authorization(self, **kwargs):

# Although it may already be set in the env, just override in case it was only set via command line or config
# Convert to string since execve() (called by Popen in base classes) wants string values.
env_dict['EG_IMPERSONATION_ENABLED'] = str(self.kernel_manager.parent.parent.impersonation_enabled)
env_dict['EG_IMPERSONATION_ENABLED'] = str(self.kernel_manager.impersonation_enabled)

# Ensure KERNEL_USERNAME is set
kernel_username = KernelSessionManager.get_kernel_username(**kwargs)
Expand Down Expand Up @@ -460,16 +463,19 @@ def _enforce_limits(self, **kwargs):

# if kernels-per-user is configured, ensure that this next kernel is still within the limit. If this
# is due to a restart, skip enforcement since we're re-using that id.
max_kernels_per_user = self.kernel_manager.parent.parent.max_kernels_per_user
max_kernels_per_user = self.kernel_manager.max_kernels_per_user
if max_kernels_per_user >= 0 and not self.kernel_manager.restarting:
env_dict = kwargs.get('env')
username = env_dict['KERNEL_USERNAME']
current_kernel_count = self.kernel_manager.parent.parent.kernel_session_manager.active_sessions(username)
if current_kernel_count >= max_kernels_per_user:
error_message = "A max kernels per user limit has been set to {} and user '{}' currently has {} " \
"active {}.".format(max_kernels_per_user, username, current_kernel_count,
"kernel" if max_kernels_per_user == 1 else "kernels")
self.log_and_raise(http_status_code=403, reason=error_message)

# Per user limits are only meaningful if a session manager exists.
if self.kernel_manager.kernel_session_manager:
current_kernel_count = self.kernel_manager.kernel_session_manager.active_sessions(username)
if current_kernel_count >= max_kernels_per_user:
error_message = "A max kernels per user limit has been set to {} and user '{}' currently has {} " \
"active {}.".format(max_kernels_per_user, username, current_kernel_count,
"kernel" if max_kernels_per_user == 1 else "kernels")
self.log_and_raise(http_status_code=403, reason=error_message)

def get_process_info(self):
"""Captures the base information necessary for kernel persistence relative to process proxies.
Expand All @@ -494,7 +500,7 @@ def load_process_info(self, process_info):
def _validate_port_range(self, proxy_config):
"""Validates the port range configuration option to ensure appropriate values."""
# Let port_range override global value - if set on kernelspec...
port_range = self.kernel_manager.parent.parent.port_range
port_range = self.kernel_manager.port_range
if proxy_config.get('port_range'):
port_range = proxy_config.get('port_range')

Expand Down Expand Up @@ -797,7 +803,7 @@ def _spawn_ssh_tunnel(self, kernel_channel, local_port, remote_port, remote_ip,
return pexpect.spawn(cmd, env=os.environ.copy().pop('SSH_ASKPASS', None))

def _get_keep_alive_interval(self, kernel_channel):
cull_idle_timeout = self.kernel_manager.parent.cull_idle_timeout
cull_idle_timeout = self.kernel_manager.cull_idle_timeout

if (kernel_channel == KernelChannel.COMMUNICATION or
kernel_channel == KernelChannel.CONTROL or
Expand Down
Loading

0 comments on commit e523fcb

Please sign in to comment.