Skip to content

Commit

Permalink
Consistent handling/emitting of kernel session state (jupyter-server#366
Browse files Browse the repository at this point in the history
)

* update state

* streamline events about sessions

* compute a kernel path server-side and emit that in event data

* rename some modules

* code review cleanup

* define kernel states as constants for easier future maintenance

* fix broken unit test
  • Loading branch information
Zsailer authored and GitHub Enterprise committed May 11, 2022
1 parent 8cea5b8 commit 67e713c
Show file tree
Hide file tree
Showing 22 changed files with 440 additions and 406 deletions.
2 changes: 1 addition & 1 deletion conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from data_studio_jupyter_extensions.configurables.notebook_service import (
NotebookServiceClient,
)
from data_studio_jupyter_extensions.extensions.events.logger import EventBus
from data_studio_jupyter_extensions.extensions.events.bus import EventBus
from data_studio_jupyter_extensions.tests.mock.client import MockNotebookServiceClient
from data_studio_jupyter_extensions.tests.mock.utils import load_openapi_spec

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ async def post_start_kernel(self, **kwargs):
async def start_heartbeat(self):
"""Start a heartbeat for the kernel."""
# Connect a heartbeat from the manager.
self._emit_status(
status="Connecting",
description="",
self._emit(
state=constants.KERNEL_STATE.CONNECTING,
msg="Waiting for a successful heartbeat from the kernel.",
)
client = self.client()
self.heartbeat = client.hb_channel
Expand Down
34 changes: 17 additions & 17 deletions data_studio_jupyter_extensions/configurables/kernel_restarter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from traitlets import Float
from traitlets import Int

from data_studio_jupyter_extensions import constants
from data_studio_jupyter_extensions.configurables.kernel_status import KernelStatusMixin


Expand Down Expand Up @@ -39,13 +40,15 @@ def _dead_state(self):
self.log.warning(
f"Kernel (kernel_id={self.kernel_manager.kernel_id}) is no longer running in Notebook Service."
)
self._emit_status(status="Dead", description="Kernel is no longer running.")
self._emit(
state=constants.KERNEL_STATE.DEAD, msg="Kernel is no longer running."
)

def _disconnected_state(self):
self.log.warning(f"No heartbeat detected for: {self.kernel_manager.kernel_id}")
self._emit_status(
status="Disconnected",
description="Kernel appears to be running, but a connection could not be established.",
self._emit(
state=constants.KERNEL_STATE.DISCONNECTED,
msg="Kernel appears to be running, but a connection could not be established.",
)
self.stop()

Expand All @@ -60,10 +63,10 @@ async def poll(self):

# If the kernel is communicating, we're good here.
if km.is_communicating():
if self._attempt_count > 0:
self._emit_status(
status="Connected",
description="",
if not self._connected_once or self._attempt_count > 0:
self._emit(
state=constants.KERNEL_STATE.CONNECTED,
msg="Kernel heartbeat established.",
)
self._connected_once = True
self._attempt_count = 0
Expand All @@ -73,9 +76,9 @@ async def poll(self):

# Check if the kernel ever successfully connected.
if not self._connected_once:
self._emit_status(
status="Connecting",
description="",
self._emit(
state=constants.KERNEL_STATE.CONNECTING,
msg="",
)
# Kernel is disconnected due to timeout.
if km.heartbeat_timeout < (now - self._start_time):
Expand All @@ -85,12 +88,9 @@ async def poll(self):
else:
self._attempt_count += 1
if self._attempt_count == 1:
self.log.warning(
f"Missed a heartbeat for kernel: {self.kernel_manager.kernel_id}"
)
self._emit_status(
status="Missed a kernel heartbeat. Trying to reconnect.",
description="",
self._emit(
state=constants.KERNEL_STATE.DISCONNECTED,
msg="Missed a kernel heartbeat. Trying to reconnect.",
)
elif self._attempt_count == self.restart_limit:
self._last_attempt = now
Expand Down
44 changes: 18 additions & 26 deletions data_studio_jupyter_extensions/configurables/kernel_status.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,4 @@
from data_studio_jupyter_extensions.extensions.events.logger import EventBus


KERNEL_STATES = [
"unknown",
"starting",
"started",
"terminating",
"dead",
"connecting",
"connected",
"disconnected",
]
from data_studio_jupyter_extensions.extensions.events.bus import EventBus


class KernelStatusMixin:
Expand All @@ -22,35 +10,39 @@ class KernelStatusMixin:
def event_bus(self):
return EventBus.instance()

def _emit_status(self, status, description=""):
def _emit(self, state, msg=""):
"""Emit a kernel's message to both the banner and the console."""
self._emit_banner(state, msg)
self._emit_console(state, msg)

def _emit_banner(self, state, msg=""):
"""Emit a kernel status event."""
self.log.debug(f"{self.kernel_id} status {status} ({description})")
self.event_bus.record_event(
schema_name="event.datastudio.jupyter.com/kernel-status",
schema_name="event.datastudio.jupyter.com/kernel-message",
version=1,
event={
"notebook_id": self.notebook_id or "Not set",
"process_id": self.process_id or "Not set yet",
"kernel_id": self.kernel_id,
"status": status,
"details": description,
"state": state,
"msg": msg,
"banner": True,
"console": False,
},
)

def _emit(self, message):
"""Am alias to emit a message"""
self._emit_kernel_message(message)

def _emit_kernel_message(self, message):
"""Emit a kernel message."""
self.log.debug(f"{self.kernel_id} message: {message}")
def _emit_console(self, state, msg):
"""Emit a kernel message to the log console in JupyterLab."""
self.event_bus.record_event(
schema_name="event.datastudio.jupyter.com/kernel-message",
version=1,
event={
"notebook_id": self.notebook_id or "Not set",
"process_id": self.process_id or "Not set yet",
"description": message,
"kernel_id": self.kernel_id,
"state": state,
"msg": msg,
"banner": False,
"console": True,
},
)
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from traitlets import DottedObjectName

from data_studio_jupyter_extensions.configurables.provisioner import KernelFailedError
from data_studio_jupyter_extensions.extensions.events.logger import EventBus
from data_studio_jupyter_extensions.extensions.events.bus import EventBus


def raise_error_if_pending(method):
Expand Down
43 changes: 22 additions & 21 deletions data_studio_jupyter_extensions/configurables/provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,8 @@ async def _wait_for_status(
s = "Pod Pending"

# Logging and eventbus
self._emit_kernel_message(message)
self._emit_status(s, d)
self._emit_console(s, message)
self._emit_banner(s, d)

# If the status is known, return
if resp["status"].lower() in status:
Expand Down Expand Up @@ -362,9 +362,13 @@ async def kill(self, restart: bool = False) -> None:
restart is True if this operation will precede a subsequent launch_kernel request.
"""
self._emit_status(status="Terminating the current kernel.")
self._emit(
constants.KERNEL_STATE.TERMINATING, msg="Shutting down the current kernel."
)
await self.nbservice_client.stop_kernel(self.process_id)
self._emit_status(status="Dead", description="Kernel is terminated.")
self._emit(
constants.KERNEL_STATE.DEAD, msg="Kernel has been successfully terminated."
)
self.process_id = None

async def terminate(self, restart: bool = False) -> None:
Expand All @@ -378,9 +382,11 @@ async def terminate(self, restart: bool = False) -> None:
restart is True if this operation precedes a start launch_kernel request.
"""
self._emit_status(status="Terminating the current kernel.")
self._emit(
constants.KERNEL_STATE.TERMINATING, msg="Terminating the current kernel."
)
await self.nbservice_client.stop_kernel(self.process_id)
self._emit_status(status="Dead", description="Kernel is terminated.")
self._emit(constants.KERNEL_STATE.DEAD, msg="Kernel is terminated.")
self.process_id = None

async def pre_launch(self, **kwargs: Any) -> Dict[str, Any]:
Expand Down Expand Up @@ -428,22 +434,14 @@ async def launch_kernel(
This method is called from `KernelManager.launch_kernel()` during the
kernel manager's start kernel sequence.
"""
# If the caller provided metadata for an already-running
# kernel, get that data here and check if its valid
# in the next step.
if kwargs.get("ds_metadata"):
metadata = kwargs["ds_metadata"]
if metadata.get("id") == self.notebook_id:
self.process_id = metadata.get("kernel")

if "process_id" in kwargs:
self.process_id = kwargs["process_id"]

# If a process ID already exists for this provisioner,
# check with the notebook-service to see if its still running.
if self.process_id:
self._emit_status("Kernel found", "checking its status")
try:
self._emit(constants.KERNEL_STATE.RECONNECTING)
r = await self.nbservice_client.get_kernel_status(
self.process_id, query_params_dict={"cause": "view"}
)
Expand All @@ -456,25 +454,28 @@ async def launch_kernel(
"process_id": self.process_id,
"kernel_id": self.kernel_id,
}
self._emit_status("Reconnecting")
# Fetch the kernel connection info if the kernel already exists.
await self._fetch_connection_info()
return self.connection_info
# If notebook service returned an error, start a new kernel.
self._emit("The listed kernel no longer exists. Starting a new kernel.")
raise HTTPError
except (HTTPError, HTTPClientError):
# If notebook service returned an error, start a new kernel.
self._emit("The listed kernel no longer exists. Starting a new kernel.")
self._emit(
constants.KERNEL_STATE.DEAD,
"The listed kernel no longer exists. Starting a new kernel.",
)

# If the block above didn't return, a new kernel is needed.
# Request notebook service to start a kernel.
self._emit_status("Starting", "This may take a minute...")
self._emit(
constants.KERNEL_STATE.STARTING,
msg=f"Launching a {self.kernel_spec.display_name} kernel.",
)
r = await self.nbservice_client.start_kernel(self.kernel_spec.name)

# Store the new process ID from notebook service
response = json_decode(r.body)
self.process_id = str(response["id"])

await self.wait_for_ready(timeout=self.launch_timeout)

# Set the connection_info for the given mode.
Expand Down
60 changes: 26 additions & 34 deletions data_studio_jupyter_extensions/configurables/session_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from data_studio_jupyter_extensions.configurables.notebook_service import (
NotebookServiceClient,
)
from data_studio_jupyter_extensions.extensions.events.logger import EventBus
from data_studio_jupyter_extensions.extensions.events.bus import EventBus


KERNEL_SESSION_DB_PATH = osp.join(jupyter_runtime_dir(), "jupyter-session.db")
Expand All @@ -29,6 +29,12 @@ class RemoteKernelRecord(KernelRecord):
process_id: Union[None, str] = None


@dataclass(eq=False)
class RemoteKernelSessionRecord(KernelSessionRecord):
path: Union[None, str] = None
name: Union[None, str] = None


class DataStudioSessionManager(SynchronizerSessionManager):

database_filepath = Unicode(KERNEL_SESSION_DB_PATH)
Expand Down Expand Up @@ -79,9 +85,10 @@ async def start_kernel_for_session(self, session_id, path, name, type, kernel_na
# Assign a kernel id so we can use it in the client before the kernel has started.
kernel_id = str(uuid.uuid4())

# Update the pending session,
self._pending_sessions.update(
KernelSessionRecord(session_id=session_id, kernel_id=kernel_id)
RemoteKernelSessionRecord(
session_id=session_id, kernel_id=kernel_id, path=path, name=name
)
)

# allow contents manager to specify kernels cwd
Expand All @@ -96,46 +103,31 @@ async def start_kernel_for_session(self, session_id, path, name, type, kernel_na
return model

async def list_sessions(self):
self.event_bus.record_event(
schema_name="event.datastudio.jupyter.com/syncing-state",
version=1,
event={
"syncing": True,
"msg": "Syncing running sessions...",
"last_sync": self._last_sync,
},
)
self._emit_sync_message(syncing=True, msg="Syncing running sessions...")
# Run the synchronizer loop
try:
await self.sync_managers()
# Update the last sync time
self._last_sync = datetime.datetime.now().strftime("%c")
self.event_bus.record_event(
schema_name="event.datastudio.jupyter.com/syncing-state",
version=1,
event={
"syncing": False,
"msg": "Successfully synced.",
"last_sync": self._last_sync,
},
)
self._emit_sync_message(syncing=False, msg="Successfully synced.")
except Exception as e:
last_sync = datetime.datetime.now().strftime("%c")
self.event_bus.record_event(
schema_name="event.datastudio.jupyter.com/syncing-state",
version=1,
event={
"syncing": False,
"msg": (
"Failed to sync:"
f"{last_sync}"
"This tab might be out of date."
),
"last_sync": self._last_sync,
},
self._emit_sync_message(
syncing=False, msg="Failed to sync. This tab might be out of date."
)
self.log.error(e)
pass

out = await SessionManager.list_sessions(self)
return out

def _emit_sync_message(self, syncing: bool, msg=""):
"""Emit an event message to the synchronizer client."""
self.event_bus.record_event(
schema_name="event.datastudio.jupyter.com/syncing-state",
version=1,
event={
"syncing": syncing,
"msg": msg,
"last_sync": self._last_sync,
},
)
Loading

0 comments on commit 67e713c

Please sign in to comment.