Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(client): job scheduler #1042

Merged
merged 18 commits into from
Sep 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 21 additions & 71 deletions client/starwhale/api/_impl/job.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import copy
import typing as t
from pathlib import Path
from collections import defaultdict

import yaml
from loguru import logger

from starwhale.utils import load_yaml
from starwhale.consts import DEFAULT_EVALUATION_JOB_NAME, DEFAULT_EVALUATION_RESOURCE
from starwhale.core.job import dag
from starwhale.utils.fs import ensure_file
from starwhale.utils.load import load_module

Expand All @@ -26,9 +24,11 @@ def step(

def decorator(func: t.Any) -> t.Any:
if Parser.is_parse_stage():
cls, delim, func_name = func.__qualname__.rpartition(".")
_step = dict(
job_name=job_name,
step_name=func.__qualname__,
step_name=func_name,
cls_name=cls,
resources=_resources,
concurrency=concurrency,
task_num=task_num,
Expand All @@ -52,7 +52,6 @@ def __init__(
dataset_uris: t.List[str] = [],
version: str = "",
project: str = "",
kw: t.Dict[str, t.Any] = {},
):
self.project = project
self.version = version
Expand All @@ -61,55 +60,11 @@ def __init__(
self.index = index
self.dataset_uris = dataset_uris
self.workdir = workdir
self.kw = copy.deepcopy(kw)

def get_param(self, name: str) -> t.Any:
return self.kw.get(name)

def put_param(self, name: str, value: t.Any) -> None:
if not self.kw:
self.kw = {}
self.kw.setdefault(name, value)

def __repr__(self) -> str:
    """Concise context summary for logs: step name plus task position."""
    return f"step:{self.step}, total:{self.total}, index:{self.index}"


class Step:
def __init__(
self,
job_name: str,
step_name: str,
resources: t.List[str],
needs: t.List[str],
concurrency: int = 1,
task_num: int = 1,
status: str = "",
):
self.job_name = job_name
self.step_name = step_name
self.resources = resources
self.concurrency = concurrency
self.task_num = task_num
self.needs = needs
self.status = status

def __repr__(self) -> str:
return (
"%s(job_name=%r, step_name=%r, resources=%r, needs=%r, concurrency=%r, task_num=%r, status=%r)"
% (
self.__class__.__name__,
self.job_name,
self.step_name,
self.resources,
self.needs,
self.concurrency,
self.task_num,
self.status,
)
)


class ParseConfig:
def __init__(self, is_parse_stage: bool, jobs: t.Dict[str, t.List[t.Dict]]):
self.parse_stage = is_parse_stage
Expand Down Expand Up @@ -184,31 +139,26 @@ def generate_job_yaml(module: str, path: Path, target_file: Path) -> None:
logger.debug("generator DAG success!")
else:
logger.error("generator DAG error! reason: check is failed.")
raise RuntimeError("generator DAG error!")

@staticmethod
def check(jobs: t.Dict[str, t.List[t.Dict]]) -> bool:
    """Validate that every job's steps form a proper DAG.

    Each entry in a step's "needs" list must reference an existing step
    name, and the resulting dependency graph must be acyclic.

    :param jobs: mapping of job name -> list of step dicts (each with
        "step_name" and "needs" keys).
    :return: True only when every job passes validation.
    """
    checks = []
    logger.debug(f"check jobs:{jobs}")

    for name, steps in jobs.items():
        _vertices: t.List[str] = []
        # Collect edges as (pre, step) pairs instead of a dict keyed by the
        # predecessor: a dict silently drops edges whenever one step is
        # needed by several successors, so cycles through the lost edges
        # would go undetected.
        _edges: t.List[t.Tuple[str, str]] = []
        for _step in steps:
            _vertices.append(_step["step_name"])
            for _pre in _step["needs"]:
                if _pre:
                    _edges.append((_pre, _step["step_name"]))
        try:
            _dag = dag.DAG()
            _dag.add_vertex(*_vertices)
            for _from, _to in _edges:
                # add_edge raises RuntimeError for unknown vertices or
                # edges that would introduce a cycle
                _dag.add_edge(_from, _to)
            checks.append(True)
        except RuntimeError as e:
            logger.error(f"check job:{name} failed, error:{e}")
            checks.append(False)

    return all(checks)

@staticmethod
def parse_job_from_yaml(file_path: str) -> t.Dict[str, t.List[Step]]:
    """Load a job yaml file and build Step objects grouped by job name.

    :param file_path: path of the generated job yaml file.
    :return: mapping of job name -> list of Step definitions.
    """
    _jobs = load_yaml(file_path)
    # A plain dict comprehension suffices: every key is assigned directly,
    # so the original defaultdict(list) machinery was unnecessary (and the
    # declared return type is a plain Dict anyway).
    return {k: [Step(**_v) for _v in v] for k, v in _jobs.items()}
78 changes: 41 additions & 37 deletions client/starwhale/api/_impl/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,42 +256,46 @@ def _starwhale_internal_run_cmp(self) -> None:
self._timeline_writer.write({"time": now, "status": True, "exception": ""})
self._sw_logger.debug(f"cmp result:{output}")

if "summary" in output:
self.evaluation.log_metrics(do_flatten_dict(output["summary"]))
self.evaluation.log_metrics({"kind": output["kind"]})

if "labels" in output:
for i, label in output["labels"].items():
self.evaluation.log("labels", id=i, **label)

if (
"confusion_matrix" in output
and "binarylabel" in output["confusion_matrix"]
):
_binary_label = output["confusion_matrix"]["binarylabel"]
for _label, _probability in enumerate(_binary_label):
self.evaluation.log(
"confusion_matrix/binarylabel",
id=str(_label),
**{str(k): v for k, v in enumerate(_probability)},
)
if "roc_auc" in output:
for _label, _roc_auc in output["roc_auc"].items():
_id = 0
for _fpr, _tpr, _threshold in zip(
_roc_auc["fpr"], _roc_auc["tpr"], _roc_auc["thresholds"]
):
if not output:
self._sw_logger.warning("cmp results is None!")
return
if isinstance(output, dict):
if "summary" in output:
self.evaluation.log_metrics(do_flatten_dict(output["summary"]))
self.evaluation.log_metrics({"kind": output["kind"]})

if "labels" in output:
for i, label in output["labels"].items():
self.evaluation.log("labels", id=i, **label)

if (
"confusion_matrix" in output
and "binarylabel" in output["confusion_matrix"]
):
_binary_label = output["confusion_matrix"]["binarylabel"]
for _label, _probability in enumerate(_binary_label):
self.evaluation.log(
f"roc_auc/{_label}",
id=str(_id),
fpr=_fpr,
tpr=_tpr,
threshold=_threshold,
)
_id += 1
self.evaluation.log(
"roc_auc/summary", id=_label, auc=_roc_auc["auc"]
"confusion_matrix/binarylabel",
id=str(_label),
**{str(k): v for k, v in enumerate(_probability)},
)
if "roc_auc" in output:
for _label, _roc_auc in output["roc_auc"].items():
_id = 0
for _fpr, _tpr, _threshold in zip(
_roc_auc["fpr"], _roc_auc["tpr"], _roc_auc["thresholds"]
):
self.evaluation.log(
f"roc_auc/{_label}",
id=str(_id),
fpr=_fpr,
tpr=_tpr,
threshold=_threshold,
)
_id += 1
self.evaluation.log(
"roc_auc/summary", id=_label, auc=_roc_auc["auc"]
)

@_record_status # type: ignore
def _starwhale_internal_run_ppl(self) -> None:
Expand All @@ -305,6 +309,9 @@ def _starwhale_internal_run_ppl(self) -> None:
dataset_row_start, dataset_row_end = calculate_index(
_dataset.summary().rows, self.context.total, self.context.index
)
self._sw_logger.debug(
f"step:{self.context.step}, ds start from:{dataset_row_start} to:{dataset_row_end}"
)

_data_loader = get_data_loader(
dataset_uri=_dataset_uri,
Expand Down Expand Up @@ -346,9 +353,6 @@ def _starwhale_internal_run_ppl(self) -> None:
exception = None

self._do_record(data, label, exception, *pred)
self._sw_logger.debug(
f"ppl result:{len([item for item in self.evaluation.get_results()])}"
)

def _do_record(
self,
Expand Down
3 changes: 2 additions & 1 deletion client/starwhale/core/eval/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ def _do_run_cmd_in_host(self) -> None:
dataset_uris=[u.full_uri for u in self.dataset_uris],
step_name=self.step,
task_index=self.task_index,
kw=dict(
# other runtime info
base_info=dict(
name=self.name,
desc=self.desc,
model=self.model_uri,
Expand Down
108 changes: 108 additions & 0 deletions client/starwhale/core/job/dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
from typing import Any, Set, Dict, List


class _DAGData:
    """Adjacency storage for a directed graph: forward and reverse edge sets."""

    def __init__(self) -> None:
        # vertex -> direct successors; vertex -> direct predecessors
        self.__graph: Dict[Any, Set[Any]] = {}
        self.__graph_reverse: Dict[Any, Set[Any]] = {}

    def vertices(self) -> Set[Any]:
        """Return a snapshot set of all registered vertices."""
        return set(self.__graph.keys())

    def add_vertex(self, vertex: Any) -> None:
        """Register *vertex*; re-adding an existing vertex is a no-op."""
        if vertex not in self.__graph:
            self.__graph[vertex] = set()
            self.__graph_reverse[vertex] = set()

    def add_edge(self, v_from: Any, v_to: Any) -> None:
        """Record the directed edge v_from -> v_to (both vertices must exist)."""
        self.__graph[v_from].add(v_to)
        self.__graph_reverse[v_to].add(v_from)

    def remove_edge(self, v_from: Any, v_to: Any) -> None:
        """Drop the edge v_from -> v_to; KeyError if it was never added."""
        self.__graph[v_from].remove(v_to)
        self.__graph_reverse[v_to].remove(v_from)

    def successors(self, vertex: Any) -> Set[Any]:
        """Vertices with an edge coming from *vertex*."""
        return self.__graph[vertex]

    def predecessors(self, vertex: Any) -> Set[Any]:
        """Vertices with an edge pointing at *vertex*."""
        return self.__graph_reverse[vertex]


class DAG:
    """Directed graph that rejects any edge which would create a cycle."""

    def __init__(self) -> None:
        self.__data = _DAGData()

    def _validate_vertex(self, *vertices: Any) -> None:
        """Raise RuntimeError if any given vertex is not in the graph."""
        known = self.__data.vertices()  # hoisted: one snapshot for all checks
        for vtx in vertices:
            if vtx not in known:
                raise RuntimeError(f"Vertex '{vtx}' does not belong to DAG")

    def _has_path_to(self, v_from: Any, v_to: Any) -> bool:
        """Return True when v_to is reachable from v_from (v_from == v_to counts).

        Iterative DFS with a visited set. The previous recursive version had
        no visited set, so it revisited shared sub-paths (exponential blow-up
        on dense DAGs) and could exceed the interpreter recursion limit on
        long dependency chains.
        """
        stack = [v_from]
        seen: Set[Any] = set()
        while stack:
            vtx = stack.pop()
            if vtx == v_to:
                return True
            if vtx in seen:
                continue
            seen.add(vtx)
            stack.extend(self.__data.successors(vtx))
        return False

    def vertices(self) -> Set[Any]:
        """All vertices currently in the graph."""
        return self.__data.vertices()

    def add_vertex(self, *vertices: Any) -> None:
        """Add one or more vertices; duplicates are ignored."""
        for vtx in vertices:
            self.__data.add_vertex(vtx)

    def add_edge(self, v_from: Any, *v_tos: Any) -> None:
        """Add an edge from v_from to each vertex in v_tos.

        :raises RuntimeError: if any vertex is unknown, or if an edge would
            introduce a cycle (checked before each edge is committed).
        """
        self._validate_vertex(v_from, *v_tos)

        for v_to in v_tos:
            if self._has_path_to(v_to, v_from):
                raise RuntimeError(
                    f"If this edge from '{v_from}' to '{v_to}' is added, it will cause the graph to cycle"
                )
            self.__data.add_edge(v_from, v_to)

    def remove_edge(self, v_from: Any, v_to: Any) -> None:
        """Remove the edge v_from -> v_to; RuntimeError if it does not exist."""
        self._validate_vertex(v_from, v_to)
        if v_to not in self.__data.successors(v_from):
            raise RuntimeError(f"Edge not found from '{v_from}' to '{v_to}'")

        self.__data.remove_edge(v_from, v_to)

    def vertex_size(self) -> int:
        """Number of vertices in the graph."""
        return len(self.__data.vertices())

    def edge_size(self) -> int:
        """Total number of edges (sum of all out-degrees)."""
        return sum(self.out_degree(vtx) for vtx in self.__data.vertices())

    def successors(self, vertex: Any) -> Set[Any]:
        """Direct successors of *vertex*; RuntimeError when unknown."""
        self._validate_vertex(vertex)
        return self.__data.successors(vertex)

    def predecessors(self, vertex: Any) -> Set[Any]:
        """Direct predecessors of *vertex*; RuntimeError when unknown."""
        self._validate_vertex(vertex)
        return self.__data.predecessors(vertex)

    def in_degree(self, vertex: Any) -> int:
        """Number of incoming edges of *vertex*."""
        return len(self.predecessors(vertex))

    def out_degree(self, vertex: Any) -> int:
        """Number of outgoing edges of *vertex*."""
        return len(self.successors(vertex))

    def all_starts(self) -> Set[Any]:
        """Vertices with no incoming edge (entry points of the DAG)."""
        return {vtx for vtx in self.__data.vertices() if self.in_degree(vtx) == 0}

    def all_terminals(self) -> Set[Any]:
        """Vertices with no outgoing edge (end points of the DAG)."""
        return {vtx for vtx in self.__data.vertices() if self.out_degree(vtx) == 0}


def generate_dag(_vertices: List[str], _edges: Dict[str, str]) -> DAG:
    """Build a DAG from vertex names and a predecessor -> successor edge map.

    :param _vertices: all vertex names.
    :param _edges: mapping of edge source -> edge target.
    :return: the constructed DAG.
    :raises RuntimeError: when an edge references an unknown vertex or would
        make the graph cyclic.
    """
    _dag = DAG()
    _dag.add_vertex(*_vertices)
    for _from, _to in _edges.items():
        _dag.add_edge(_from, _to)
    return _dag
Loading