Skip to content

Commit

Permalink
Backport PR jupyter#845: Fix pending kernels again
Browse files Browse the repository at this point in the history
  • Loading branch information
blink1073 committed Oct 11, 2022
1 parent 5c3afcf commit 079763b
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 54 deletions.
112 changes: 75 additions & 37 deletions jupyter_client/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,34 +55,37 @@ class _ShutdownStatus(Enum):
F = t.TypeVar('F', bound=t.Callable[..., t.Any])


def in_pending_state(method: F) -> F:
"""Sets the kernel to a pending state by
creating a fresh Future for the KernelManager's `ready`
attribute. Once the method is finished, set the Future's results.
"""
def in_pending_state(prefix: str = '') -> t.Callable[[F], F]:
def decorator(method: F) -> F:
"""Sets the kernel to a pending state by
creating a fresh Future for the KernelManager's `ready`
attribute. Once the method is finished, set the Future's results.
"""

@t.no_type_check
@functools.wraps(method)
async def wrapper(self, *args, **kwargs):
# Create a future for the decorated method
try:
self._ready = Future()
except RuntimeError:
# No event loop running, use concurrent future
self._ready = CFuture()
try:
# call wrapped method, await, and set the result or exception.
out = await method(self, *args, **kwargs)
# Add a small sleep to ensure tests can capture the state before done
await asyncio.sleep(0.01)
self._ready.set_result(None)
return out
except Exception as e:
self._ready.set_exception(e)
self.log.exception(self._ready.exception())
raise e
@t.no_type_check
@functools.wraps(method)
async def wrapper(self, *args, **kwargs):
# Create a future for the decorated method
name = f"{prefix}_ready"
future = getattr(self, name)
if not future or future.done():
future = self._future_factory()
setattr(self, name, future)
try:
# call wrapped method, await, and set the result or exception.
out = await method(self, *args, **kwargs)
# Add a small sleep to ensure tests can capture the state before done
await asyncio.sleep(0.01)
future.set_result(None)
return out
except Exception as e:
future.set_exception(e)
self.log.exception(future.exception())
raise e

return t.cast(F, wrapper)

return t.cast(F, wrapper)
return decorator


class KernelManager(ConnectionFileMixin):
Expand All @@ -93,16 +96,32 @@ class KernelManager(ConnectionFileMixin):

_ready: t.Union[Future, CFuture]

@default("event_logger")
def _default_event_logger(self):
if self.parent and hasattr(self.parent, "event_logger"):
return self.parent.event_logger
else:
# If parent does not have an event logger, create one.
logger = EventLogger()
schema_path = DEFAULT_EVENTS_SCHEMA_PATH / "kernel_manager" / "v1.yaml"
logger.register_event_schema(schema_path)
return logger

def _emit(self, *, action: str) -> None:
"""Emit event using the core event schema from Jupyter Server's Contents Manager."""
self.event_logger.emit(
schema_id=self.event_schema_id,
data={"action": action, "kernel_id": self.kernel_id, "caller": "kernel_manager"},
)

_ready: t.Optional[CFuture]
_shutdown_ready: t.Optional[CFuture]

def __init__(self, *args, **kwargs):
super().__init__(**kwargs)
self._shutdown_status = _ShutdownStatus.Unset
# Create a place holder future.
try:
asyncio.get_running_loop()
self._ready = Future()
except RuntimeError:
# No event loop running, use concurrent future
self._ready = CFuture()
self._ready = None
self._shutdown_ready = None

_created_context: Bool = Bool(False)

Expand All @@ -120,6 +139,8 @@ def _context_default(self) -> zmq.Context:
)
client_factory: Type = Type(klass="jupyter_client.KernelClient")

_future_factory: t.Type[CFuture] = CFuture

@default("client_factory") # type:ignore[misc]
def _client_factory_default(self) -> Type:
return import_item(self.client_class)
Expand Down Expand Up @@ -185,10 +206,21 @@ def _default_cache_ports(self) -> bool:
return self.transport == "tcp"

@property
def ready(self) -> t.Union[CFuture, Future]:
"""A future that resolves when the kernel process has started for the first time"""
def ready(self) -> CFuture:
"""A future that resolves when the kernel process has started."""
if not self._ready:
self._ready = self._future_factory()
assert self._ready is not None
return self._ready

@property
def shutdown_ready(self) -> CFuture:
"""A future that resolves when the kernel process has shut down."""
if not self._shutdown_ready:
self._shutdown_ready = self._future_factory()
assert self._shutdown_ready is not None
return self._shutdown_ready

@property
def ipykernel(self) -> bool:
return self.kernel_name in {"python", "python2", "python3"}
Expand Down Expand Up @@ -369,7 +401,7 @@ async def _async_post_start_kernel(self, **kw: t.Any) -> None:

post_start_kernel = run_sync(_async_post_start_kernel)

@in_pending_state
@in_pending_state()
async def _async_start_kernel(self, **kw: t.Any) -> None:
"""Starts a kernel on this host in a separate process.
Expand Down Expand Up @@ -462,7 +494,7 @@ async def _async_cleanup_resources(self, restart: bool = False) -> None:

cleanup_resources = run_sync(_async_cleanup_resources)

@in_pending_state
@in_pending_state('_shutdown')
async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) -> None:
"""Attempts to stop the kernel process cleanly.
Expand All @@ -481,6 +513,8 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)
Will this kernel be restarted after it is shutdown. When this
is True, connection files will not be cleaned up.
"""
# Reset the start ready future.
self._ready = self._future_factory()
self.shutting_down = True # Used by restarter to prevent race condition
# Stop monitoring for restarting while we shutdown.
self.stop_restarter()
Expand Down Expand Up @@ -643,6 +677,10 @@ class AsyncKernelManager(KernelManager):
)
client_factory: Type = Type(klass="jupyter_client.asynchronous.AsyncKernelClient")

# The PyZMQ Context to use for communication with the kernel.
context: Instance = Instance(zmq.asyncio.Context)

_future_factory: t.Type[Future] = Future # type:ignore[assignment]
_launch_kernel = KernelManager._async_launch_kernel
start_kernel = KernelManager._async_start_kernel
pre_start_kernel = KernelManager._async_pre_start_kernel
Expand Down
16 changes: 7 additions & 9 deletions jupyter_client/multikernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,7 @@ async def _async_start_kernel(
self._kernels[kernel_id] = km
else:
await task
# raise an exception if one occurred during kernel startup.
if km.ready.exception():
raise km.ready.exception() # type: ignore
await asyncio.wrap_future(km.ready)

return kernel_id

Expand Down Expand Up @@ -253,15 +251,17 @@ async def _async_shutdown_kernel(
try:
await task
km = self.get_kernel(kernel_id)
await t.cast(asyncio.Future, km.ready)
await asyncio.wrap_future(km.ready)
except asyncio.CancelledError:
pass
except Exception:
self.remove_kernel(kernel_id)
return
km = self.get_kernel(kernel_id)
# If a pending kernel raised an exception, remove it.
if not km.ready.cancelled() and km.ready.exception():
try:
await asyncio.wrap_future(km.ready)
except Exception:
self.remove_kernel(kernel_id)
return
stopper = ensure_async(km.shutdown_kernel(now, restart))
Expand All @@ -270,9 +270,7 @@ async def _async_shutdown_kernel(
# Await the kernel if not using pending kernels.
if not self._using_pending_kernels():
await fut
# raise an exception if one occurred during kernel shutdown.
if km.ready.exception():
raise km.ready.exception() # type: ignore
await asyncio.wrap_future(km.shutdown_ready)

shutdown_kernel = run_sync(_async_shutdown_kernel)

Expand Down Expand Up @@ -315,7 +313,7 @@ async def _async_shutdown_all(self, now: bool = False) -> None:
if self._using_pending_kernels():
for km in kms:
try:
await km.ready
await km.shutdown_ready
except asyncio.CancelledError:
self._pending_kernels[km.kernel_id].cancel()
except Exception:
Expand Down
8 changes: 4 additions & 4 deletions jupyter_client/tests/test_kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,13 @@ def zmq_context():


@pytest.fixture(params=[AsyncKernelManager, AsyncKMSubclass])
def async_km(request, config):
async def async_km(request, config):
km = request.param(config=config)
return km


@pytest.fixture
def async_km_subclass(config):
async def async_km_subclass(config):
km = AsyncKMSubclass(config=config)
return km

Expand Down Expand Up @@ -451,11 +451,11 @@ async def test_lifecycle(self, async_km):
await async_km.start_kernel(stdout=PIPE, stderr=PIPE)
is_alive = await async_km.is_alive()
assert is_alive
is_ready = async_km.ready.done()
assert is_ready
await async_km.ready
await async_km.restart_kernel(now=True)
is_alive = await async_km.is_alive()
assert is_alive
await async_km.ready
await async_km.interrupt_kernel()
assert isinstance(async_km, AsyncKernelManager)
await async_km.shutdown_kernel(now=True)
Expand Down
8 changes: 4 additions & 4 deletions jupyter_client/tests/test_multikernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ async def test_use_pending_kernels(self):
assert isinstance(k, AsyncKernelManager)
await ensure_future(km.shutdown_kernel(kid, now=True))
# Wait for the kernel to shutdown
await kernel.ready
await kernel.shutdown_ready
assert kid not in km, f"{kid} not in {km}"

@gen_test
Expand All @@ -409,7 +409,7 @@ async def test_use_pending_kernels_early_restart(self):
await kernel.ready
await ensure_future(km.shutdown_kernel(kid, now=True))
# Wait for the kernel to shutdown
await kernel.ready
await kernel.shutdown_ready
assert kid not in km, f"{kid} not in {km}"

@gen_test
Expand All @@ -421,7 +421,7 @@ async def test_use_pending_kernels_early_shutdown(self):
# Try shutting down while the kernel is pending
await ensure_future(km.shutdown_kernel(kid, now=True))
# Wait for the kernel to shutdown
await kernel.ready
await kernel.shutdown_ready
assert kid not in km, f"{kid} not in {km}"

@gen_test
Expand All @@ -436,7 +436,7 @@ async def test_use_pending_kernels_early_interrupt(self):
await kernel.ready
await ensure_future(km.shutdown_kernel(kid, now=True))
# Wait for the kernel to shutdown
await kernel.ready
await kernel.shutdown_ready
assert kid not in km, f"{kid} not in {km}"

@gen_test
Expand Down

0 comments on commit 079763b

Please sign in to comment.