Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1/N][Advanced timeline] Include events/profiling events to the ray list tasks #31776

Merged
merged 11 commits into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions dashboard/client/src/components/TaskTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import {
import Autocomplete from "@material-ui/lab/Autocomplete";
import Pagination from "@material-ui/lab/Pagination";
import React, { useState } from "react";
import { DurationText } from "../common/DurationText";
import rowStyles from "../common/RowStyles";
import { Task } from "../type/task";
import { useFilter } from "../util/hook";
import StateCounter from "./StatesCounter";
import { StatusChip } from "./StatusChip";

const TaskTable = ({
Expand All @@ -36,7 +38,8 @@ const TaskTable = ({
{ label: "ID" },
{ label: "Name" },
{ label: "Job Id" },
{ label: "Scheduling State" },
{ label: "State" },
{ label: "Duration" },
{ label: "Function or Class Name" },
{ label: "Node Id" },
{ label: "Actor_id" },
Expand All @@ -59,12 +62,12 @@ const TaskTable = ({
/>
<Autocomplete
style={{ margin: 8, width: 120 }}
options={Array.from(new Set(tasks.map((e) => e.scheduling_state)))}
options={Array.from(new Set(tasks.map((e) => e.state)))}
onInputChange={(_: any, value: string) => {
changeFilter("scheduling_state", value.trim());
changeFilter("state", value.trim());
}}
renderInput={(params: TextFieldProps) => (
<TextField {...params} label="Scheduling State" />
<TextField {...params} label="State" />
)}
/>
<Autocomplete
Expand Down Expand Up @@ -121,6 +124,9 @@ const TaskTable = ({
count={Math.ceil(taskList.length / pageSize)}
/>
</div>
<div>
<StateCounter type="task" list={taskList} />
</div>
</div>
<Table>
<TableHead>
Expand All @@ -140,12 +146,15 @@ const TaskTable = ({
task_id,
name,
job_id,
scheduling_state,
state,
func_or_class_name,
node_id,
actor_id,
type,
required_resources,
events,
start_time_ms,
end_time_ms,
}) => (
<TableRow>
<TableCell align="center">
Expand All @@ -161,7 +170,17 @@ const TaskTable = ({
<TableCell align="center">{name ? name : "-"}</TableCell>
<TableCell align="center">{job_id}</TableCell>
<TableCell align="center">
<StatusChip type="actor" status={scheduling_state} />
<StatusChip type="actor" status={state} />
</TableCell>
<TableCell align="center">
{start_time_ms && start_time_ms > 0 ? (
<DurationText
startTime={start_time_ms}
endTime={end_time_ms}
/>
) : (
"-"
)}
</TableCell>
<TableCell align="center">{func_or_class_name}</TableCell>
<TableCell align="center">{node_id ? node_id : "-"}</TableCell>
Expand Down
2 changes: 1 addition & 1 deletion dashboard/client/src/pages/state/task.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { Task } from "../../type/task";
import { useStateApiList } from "./hook/useStateApi";

/**
* Represent the embedable actors page.
* Represents the embeddable tasks page.
*/
const TaskList = ({ jobId = null }: { jobId?: string | null }) => {
const [timeStamp] = useState(dayjs());
Expand Down
2 changes: 1 addition & 1 deletion dashboard/client/src/service/placementGroup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ import { get } from "./requestHandlers";

export const getPlacementGroup = () => {
return get<StateApiResponse<PlacementGroup>>(
"api/v0/placement_groups?detail=1",
"api/v0/placement_groups?detail=1&limit=10000",
);
};
2 changes: 1 addition & 1 deletion dashboard/client/src/service/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ import { Task } from "../type/task";
import { get } from "./requestHandlers";

export const getTasks = () => {
return get<StateApiResponse<Task>>("api/v0/tasks?detail=1");
return get<StateApiResponse<Task>>("api/v0/tasks?detail=1&limit=10000");
};
5 changes: 4 additions & 1 deletion dashboard/client/src/type/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export enum TypeTaskType {
export type Task = {
task_id: string;
name: string;
scheduling_state: TypeTaskStatus;
state: TypeTaskStatus;
job_id: string;
node_id: string;
actor_id: string;
Expand All @@ -33,4 +33,7 @@ export type Task = {
language: string;
required_resources: { [key: string]: number };
runtime_env_info: string;
events: { [key: string]: string }[];
start_time_ms: number | null;
end_time_ms: number | null;
};
57 changes: 45 additions & 12 deletions dashboard/state_aggregator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import logging
import json

from dataclasses import asdict, fields
from itertools import islice
Expand Down Expand Up @@ -359,9 +360,16 @@ async def list_tasks(self, *, option: ListApiOptions) -> ListApiResponse:
{task_id -> task_data_in_dict}
task_data_in_dict's schema is in TaskState
"""
job_id = None
for filter in option.filters:
if filter[0] == "job_id":
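# Each filter is a (key, predicate, value) tuple; index 2 is the value.
# NOTE(review): the predicate (filter[1]) is not checked here, so a
# non-equality job_id filter is forwarded the same way - confirm intended.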
job_id = filter[2]

try:
reply = await self._client.get_all_task_info(timeout=option.timeout)
reply = await self._client.get_all_task_info(
timeout=option.timeout, job_id=job_id
)
except DataSourceUnavailable:
raise DataSourceUnavailable(GCS_QUERY_FAILURE_WARNING)

Expand All @@ -371,7 +379,15 @@ def _to_task_state(task_attempt: dict) -> dict:
"""
task_state = {}
task_info = task_attempt.get("task_info", {})
state_updates = task_attempt.get("state_updates", None)
state_updates = task_attempt.get("state_updates", [])
profiling_data = task_attempt.get("profiling_data", {})
if profiling_data:
for event in profiling_data["events"]:
# End/start times are recorded in ns. We convert them to ms.
event["end_time"] = int(event["end_time"]) / 1e6
event["start_time"] = int(event["start_time"]) / 1e6
event["extra_data"] = json.loads(event["extra_data"])
task_state["profiling_data"] = profiling_data

# Convert those settable fields
mappings = [
Expand All @@ -396,16 +412,32 @@ def _to_task_state(task_attempt: dict) -> dict:
for key in keys:
task_state[key] = src.get(key)

# Get the most updated scheduling_state by state transition ordering.
def _get_most_recent_status(task_state: dict) -> str:
# Reverse the order as defined in protobuf for the most recent state.
for status_name in reversed(common_pb2.TaskStatus.keys()):
key = f"{status_name.lower()}_ts"
if state_updates.get(key):
return status_name
return common_pb2.TaskStatus.Name(common_pb2.NIL)

task_state["scheduling_state"] = _get_most_recent_status(state_updates)
task_state["start_time_ms"] = None
task_state["end_time_ms"] = None
events = []

for state in common_pb2.TaskStatus.keys():
key = f"{state.lower()}_ts"
if key in state_updates:
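# Timestamp is recorded in ns; it is floor-divided by 1e6 below to get ms
# (the result is a float because the divisor 1e6 is a float).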
ts_ms = int(state_updates[key]) // 1e6
events.append(
{
"state": state,
"created_ms": ts_ms,
}
)
if state == "RUNNING":
task_state["start_time_ms"] = ts_ms
if state == "FINISHED" or state == "FAILED":
task_state["end_time_ms"] = ts_ms

task_state["events"] = events
if len(events) > 0:
latest_state = events[-1]["state"]
else:
latest_state = common_pb2.TaskStatus.Name(common_pb2.NIL)
task_state["state"] = latest_state

return task_state

Expand All @@ -419,6 +451,7 @@ def _get_most_recent_status(task_state: dict) -> str:
"node_id",
"actor_id",
"parent_task_id",
"component_id",
],
)
)
Expand Down
12 changes: 6 additions & 6 deletions doc/source/ray-observability/state/state-api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -320,14 +320,14 @@ E.g., List running tasks

.. code-block:: bash

ray list tasks -f scheduling_state=RUNNING
ray list tasks -f state=RUNNING

.. tabbed:: Python SDK

.. code-block:: python

from ray.experimental.state.api import list_tasks
list_tasks(filters=[("scheduling_state", "=", "RUNNING")])
list_tasks(filters=[("state", "=", "RUNNING")])

E.g., List non-running tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand All @@ -336,14 +336,14 @@ E.g., List non-running tasks

.. code-block:: bash

ray list tasks -f scheduling_state!=RUNNING
ray list tasks -f state!=RUNNING

.. tabbed:: Python SDK

.. code-block:: python

from ray.experimental.state.api import list_tasks
list_tasks(filters=[("scheduling_state", "!=", "RUNNING")])
list_tasks(filters=[("state", "!=", "RUNNING")])

E.g., List running tasks that have a name func
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand All @@ -352,14 +352,14 @@ E.g., List running tasks that have a name func

.. code-block:: bash

ray list tasks -f scheduling_state=RUNNING -f name="task_running_300_seconds()"
ray list tasks -f state=RUNNING -f name="task_running_300_seconds()"

.. tabbed:: Python SDK

.. code-block:: python

from ray.experimental.state.api import list_tasks
list_tasks(filters=[("scheduling_state", "=", "RUNNING"), ("name", "=", "task_running_300_seconds()")])
list_tasks(filters=[("state", "=", "RUNNING"), ("name", "=", "task_running_300_seconds()")])

E.g., List tasks with more details
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
4 changes: 2 additions & 2 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -745,8 +745,8 @@ cdef void execute_task(
function_descriptor = CFunctionDescriptorToPython(
ray_function.GetFunctionDescriptor())
function_name = execution_info.function_name
extra_data = (b'{"name": ' + function_name.encode("ascii") +
b' "task_id": ' + task_id.hex().encode("ascii") + b'}')
extra_data = (b'{"name": "' + function_name.encode("ascii") +
b'", "task_id": "' + task_id.hex().encode("ascii") + b'"}')

name_of_concurrency_group_to_execute = \
c_name_of_concurrency_group_to_execute.decode("ascii")
Expand Down
16 changes: 13 additions & 3 deletions python/ray/experimental/state/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ class TaskState(StateSchema):
#: Refer to src/ray/protobuf/common.proto for a detailed explanation of the state
#: breakdowns and typical state transition flow.
#:
scheduling_state: TypeTaskStatus = state_column(filterable=True)
state: TypeTaskStatus = state_column(filterable=True)
#: The job id of this task.
job_id: str = state_column(filterable=True)
#: Id of the node that runs the task. If the task is retried, it could
Expand Down Expand Up @@ -522,7 +522,17 @@ class TaskState(StateSchema):
#: The runtime environment information for the task.
runtime_env_info: str = state_column(detail=True, filterable=False)
#: The parent task id.
parent_task_id: str = state_column(filterable=True)
parent_task_id: str = state_column(detail=True, filterable=True)
#: The list of events of the given task.
#: Refer to src/ray/protobuf/common.proto for a detailed explanation of the state
#: breakdowns and typical state transition flow.
events: List[dict] = state_column(detail=True, filterable=False)
#: The list of profile events of the given task.
profiling_data: List[dict] = state_column(detail=True, filterable=False)
#: The time when the task starts to run. A Unix timestamp in ms.
start_time_ms: Optional[int] = state_column(detail=True, filterable=False)
#: The time when the task finished or failed. A Unix timestamp in ms.
end_time_ms: Optional[int] = state_column(detail=True, filterable=False)


@dataclass(init=True)
Expand Down Expand Up @@ -740,7 +750,7 @@ def to_summary(cls, *, tasks: List[Dict]):
)
task_summary = summary[key]

state = task["scheduling_state"]
state = task["state"]
if state not in task_summary.state_counts:
task_summary.state_counts[state] = 0
task_summary.state_counts[state] += 1
Expand Down
10 changes: 8 additions & 2 deletions python/ray/experimental/state/state_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import ray.dashboard.modules.log.log_consts as log_consts
from ray._private import ray_constants
from ray._private.gcs_utils import GcsAioClient
from ray._private.utils import hex_to_binary
from ray._raylet import JobID
from ray.core.generated import gcs_service_pb2_grpc
from ray.core.generated.gcs_service_pb2 import (
GetAllActorInfoReply,
Expand Down Expand Up @@ -232,11 +234,15 @@ async def get_all_actor_info(

@handle_grpc_network_errors
async def get_all_task_info(
self, timeout: int = None, limit: int = None
self, timeout: int = None, limit: int = None, job_id: Optional[str] = None
) -> Optional[GetTaskEventsReply]:
if not limit:
limit = RAY_MAX_LIMIT_FROM_DATA_SOURCE
request = GetTaskEventsRequest(limit=limit, exclude_driver_task=True)
if job_id:
job_id = JobID(hex_to_binary(job_id)).binary()
request = GetTaskEventsRequest(
limit=limit, exclude_driver_task=True, job_id=job_id
)
reply = await self._gcs_task_info_stub.GetTaskEvents(request, timeout=timeout)
return reply

Expand Down
Loading