Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attempt to fix FD leak issue #1051 #1054

Merged
merged 7 commits into from
May 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/source/operators/config-env-debug.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ The following environment variables may be useful for troubleshooting:
EG_POLL_INTERVAL=0.5
The interval (in seconds) to wait before checking poll results again.

EG_RESTART_STATUS_POLL_INTERVAL=1.0
The interval (in seconds) to wait before polling for the restart status again when duplicate restart request
for the same kernel is received or when a shutdown request is received while kernel is still restarting.

EG_REMOVE_CONTAINER=True
Used by launch_docker.py, indicates whether the kernel's docker container should be
removed following its shutdown. Set this value to 'False' if you want the container
Expand Down
58 changes: 55 additions & 3 deletions enterprise_gateway/services/kernels/remotemanager.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
"""Kernel managers that operate against a remote process."""

import asyncio
import os
import re
import signal
import time
import uuid

from jupyter_client.ioloop.manager import AsyncIOLoopKernelManager
Expand All @@ -18,6 +19,9 @@
from ..processproxies.processproxy import LocalProcessProxy, RemoteProcessProxy
from ..sessions.kernelsessionmanager import KernelSessionManager

default_kernel_launch_timeout = float(os.getenv("EG_KERNEL_LAUNCH_TIMEOUT", "30"))
kernel_restart_status_poll_interval = float(os.getenv("EG_RESTART_STATUS_POLL_INTERVAL", 1.0))


def import_item(name):
"""Import and return ``bar`` given the string ``foo.bar``.
Expand Down Expand Up @@ -189,6 +193,47 @@ async def start_kernel(self, *args, **kwargs):
self.parent.kernel_session_manager.create_session(kernel_id, **kwargs)
return kernel_id

async def restart_kernel(self, kernel_id):
kernel = self.get_kernel(kernel_id)
if kernel.restarting: # assuming duplicate request.
await self.wait_for_restart_finish(kernel_id, "restart")
self.log.info("Skipping kernel restart as this was duplicate request.")
return
try:
kernel.restarting = True # Moved in out of RemoteKernelManager
await super().restart_kernel(kernel_id)
finally:
kernel.restarting = False

async def shutdown_kernel(self, kernel_id, now=False, restart=False):
kernel = self.get_kernel(kernel_id)
if kernel.restarting:
await self.wait_for_restart_finish(kernel_id, "shutdown")
try:
await super().shutdown_kernel(kernel_id, now, restart)
except KeyError as ke: # this is hit for multiple shutdown request.
self.log.exception(f"Exception while shutting down kernel: '{kernel_id}': {ke}")
raise web.HTTPError(404, "Kernel does not exist: %s" % kernel_id)

async def wait_for_restart_finish(self, kernel_id, action="shutdown"):
kernel = self.get_kernel(kernel_id)
start_time = float(time.time()) # epoc time
timeout = kernel.kernel_launch_timeout
poll_time = kernel_restart_status_poll_interval
self.log.info(
f"Kernel '{kernel_id}' was restarting when {action} request received. Polling every {poll_time} "
f"seconds for next {timeout} seconds for kernel to complete its restart."
)
while kernel.restarting:
now = float(time.time())
if (now - start_time) > timeout:
self.log.info(
f"Timeout: Exiting restart wait loop in order to {action} kernel '{kernel_id}'."
)
break
await asyncio.sleep(poll_time)
return

def _enforce_kernel_limits(self, username: str) -> None:
"""
If MaxKernels or MaxKernelsPerUser are configured, enforce the respective values.
Expand Down Expand Up @@ -341,6 +386,7 @@ def __init__(self, **kwargs):
self.sigint_value = None
self.kernel_id = None
self.user_overrides = {}
self.kernel_launch_timeout = default_kernel_launch_timeout
self.restarting = False # need to track whether we're in a restart situation or not

# If this instance supports port caching, then disable cache_ports since we don't need this
Expand Down Expand Up @@ -412,6 +458,10 @@ def _capture_user_overrides(self, **kwargs):
of the kernelspec env stanza that would have otherwise overridden the user-provided values.
"""
env = kwargs.get("env", {})
# If KERNEL_LAUNCH_TIMEOUT is passed in the payload, override it.
self.kernel_launch_timeout = float(
env.get("KERNEL_LAUNCH_TIMEOUT", default_kernel_launch_timeout)
)
self.user_overrides.update(
{
key: value
Expand Down Expand Up @@ -504,7 +554,8 @@ async def restart_kernel(self, now=False, **kwargs):
Any options specified here will overwrite those used to launch the
kernel.
"""
self.restarting = True
kevin-bates marked this conversation as resolved.
Show resolved Hide resolved
if now: # if auto-restarting (when now is True), indicate we're restarting.
self.restarting = True
kernel_id = self.kernel_id or os.path.basename(self.connection_file).replace(
"kernel-", ""
).replace(".json", "")
Expand Down Expand Up @@ -535,7 +586,8 @@ async def restart_kernel(self, now=False, **kwargs):
# Refresh persisted state.
if self.kernel_session_manager:
self.kernel_session_manager.refresh_session(kernel_id)
self.restarting = False
if now:
self.restarting = False

async def signal_kernel(self, signum):
"""
Expand Down