From e9689ed68b478971c1f92cf7854f0f4e86c58d03 Mon Sep 17 00:00:00 2001
From: Chen Shen
Date: Sun, 22 Jan 2023 10:41:20 -0800
Subject: [PATCH 01/49] [Core][run_function_on_all_workers] deflake
 run_function_on_all_workers and reenable test (#31838)

The import step of run_function_on_all_workers requires a job_id to run
properly. After #30883, a worker might not have a job_id at startup, which
caused run_function_on_all_workers to fail to execute functions on startup.
To fix this, we defer starting the import thread until job_config is
initialized.
---
 python/ray/_private/import_thread.py | 30 +++++++++----
 python/ray/_private/worker.py        | 19 +++++++--
 python/ray/_raylet.pyx               | 64 +++++++++++++++-------------
 python/ray/tests/test_advanced.py    |  2 +-
 python/ray/tests/test_basic_5.py     |  4 +-
 python/ray/tests/test_failure.py     | 15 ++++++-
 6 files changed, 89 insertions(+), 45 deletions(-)

diff --git a/python/ray/_private/import_thread.py b/python/ray/_private/import_thread.py
index 271aace307983..1da268c39e24e 100644
--- a/python/ray/_private/import_thread.py
+++ b/python/ray/_private/import_thread.py
@@ -7,6 +7,7 @@
 import ray
 import ray._private.profiling as profiling
+from ray import JobID
 from ray import cloudpickle as pickle
 from ray._private import ray_constants

@@ -42,26 +43,33 @@ def __init__(self, worker, mode, threads_stopped):
         self.num_imported = 0
         # Protect writes to self.num_imported.
         self._lock = threading.Lock()
+        # Protect start and join of import thread.
+        self._thread_spawn_lock = threading.Lock()
         # Try to load all FunctionsToRun so that these functions will be
         # run before accepting tasks.
         self._do_importing()

     def start(self):
         """Start the import thread."""
-        self.t = threading.Thread(target=self._run, name="ray_import_thread")
-        # Making the thread a daemon causes it to exit
-        # when the main thread exits.
-        self.t.daemon = True
-        self.t.start()
+        with self._thread_spawn_lock:
+            if self.t is not None:
+                return
+            self.t = threading.Thread(target=self._run, name="ray_import_thread")
+            # Making the thread a daemon causes it to exit
+            # when the main thread exits.
+            self.t.daemon = True
+            self.t.start()

     def join_import_thread(self):
         """Wait for the thread to exit."""
-        if self.t:
-            self.t.join()
+        with self._thread_spawn_lock:
+            if self.t:
+                self.t.join()

     def _run(self):
         try:
-            self._do_importing()
+            if not self.threads_stopped.is_set():
+                self._do_importing()
             while True:
                 # Exit if we received a signal that we should stop.
                 if self.threads_stopped.is_set():
@@ -79,10 +87,14 @@ def _run(self):
             self.subscriber.close()

     def _do_importing(self):
+        job_id = self.worker.current_job_id
+        if job_id == JobID.nil():
+            return
+
         while True:
             with self._lock:
                 export_key = ray._private.function_manager.make_export_key(
-                    self.num_imported + 1, self.worker.current_job_id
+                    self.num_imported + 1, job_id
                 )
                 key = self.gcs_client.internal_kv_get(
                     export_key, ray_constants.KV_NAMESPACE_FUNCTION_TABLE
diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py
index be924ca9241d6..5148d8eda15c5 100644
--- a/python/ray/_private/worker.py
+++ b/python/ray/_private/worker.py
@@ -2097,13 +2097,13 @@ def connect(
             " and will be removed in the future."
         )

-    # Start the import thread
+    # Set up the import thread, but defer starting it until
+    # job_config is initialized.
+ # (python/ray/_raylet.pyx maybe_initialize_job_config) if mode not in (RESTORE_WORKER_MODE, SPILL_WORKER_MODE): worker.import_thread = import_thread.ImportThread( worker, mode, worker.threads_stopped ) - if ray._raylet.Config.start_python_importer_thread(): - worker.import_thread.start() # If this is a driver running in SCRIPT_MODE, start a thread to print error # messages asynchronously in the background. Ideally the scheduler would @@ -2194,6 +2194,19 @@ def disconnect(exiting_interpreter=False): ray_actor._ActorClassMethodMetadata.reset_cache() +def start_import_thread(): + """Start the import thread if the worker is connected.""" + worker = global_worker + worker.check_connected() + + assert _mode() not in ( + RESTORE_WORKER_MODE, + SPILL_WORKER_MODE, + ), "import thread can not be used in IO workers." + if worker.import_thread and ray._raylet.Config.start_python_importer_thread(): + worker.import_thread.start() + + @contextmanager def _changeproctitle(title, next_title): if _mode() is not LOCAL_MODE: diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 1e2dccb927bb2..9b14da45d3d18 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -166,6 +166,7 @@ current_task_id = None current_task_id_lock = threading.Lock() job_config_initialized = False +job_config_initialization_lock = threading.Lock() class ObjectRefGenerator: @@ -1389,35 +1390,40 @@ cdef void unhandled_exception_handler(const CRayObject& error) nogil: def maybe_initialize_job_config(): - global job_config_initialized - if job_config_initialized: - return - # Add code search path to sys.path, set load_code_from_local. - core_worker = ray._private.worker.global_worker.core_worker - code_search_path = core_worker.get_job_config().code_search_path - load_code_from_local = False - if code_search_path: - load_code_from_local = True - for p in code_search_path: - if os.path.isfile(p): - p = os.path.dirname(p) - sys.path.insert(0, p) - ray._private.worker.global_worker.set_load_code_from_local(load_code_from_local) - - # Add driver's system path to sys.path - py_driver_sys_path = core_worker.get_job_config().py_driver_sys_path - if py_driver_sys_path: - for p in py_driver_sys_path: - sys.path.insert(0, p) - job_config_initialized = True - - # Record the task name via :task_name: magic token in the log file. - # This is used for the prefix in driver logs `(task_name pid=123) ...` - job_id_magic_token = "{}{}\n".format( - ray_constants.LOG_PREFIX_JOB_ID, core_worker.get_current_job_id().hex()) - # Print on both .out and .err - print(job_id_magic_token, end="") - print(job_id_magic_token, file=sys.stderr, end="") + with job_config_initialization_lock: + global job_config_initialized + if job_config_initialized: + return + # Add code search path to sys.path, set load_code_from_local. + core_worker = ray._private.worker.global_worker.core_worker + code_search_path = core_worker.get_job_config().code_search_path + load_code_from_local = False + if code_search_path: + load_code_from_local = True + for p in code_search_path: + if os.path.isfile(p): + p = os.path.dirname(p) + sys.path.insert(0, p) + ray._private.worker.global_worker.set_load_code_from_local(load_code_from_local) + + # Add driver's system path to sys.path + py_driver_sys_path = core_worker.get_job_config().py_driver_sys_path + if py_driver_sys_path: + for p in py_driver_sys_path: + sys.path.insert(0, p) + + # Record the task name via :task_name: magic token in the log file. 
+        # This is used for the prefix in driver logs `(task_name pid=123) ...`
+        job_id_magic_token = "{}{}\n".format(
+            ray_constants.LOG_PREFIX_JOB_ID, core_worker.get_current_job_id().hex())
+        # Print on both .out and .err
+        print(job_id_magic_token, end="")
+        print(job_id_magic_token, file=sys.stderr, end="")
+
+        # Only start import thread after job_config is initialized
+        ray._private.worker.start_import_thread()
+
+        job_config_initialized = True


 # This function introduces ~2-7us of overhead per call (i.e., it can be called
diff --git a/python/ray/tests/test_advanced.py b/python/ray/tests/test_advanced.py
index 104714334865c..447d8cbabbddd 100644
--- a/python/ray/tests/test_advanced.py
+++ b/python/ray/tests/test_advanced.py
@@ -193,7 +193,7 @@ def f(worker_info):
     ray._private.worker.global_worker.run_function_on_all_workers(f)


-@pytest.mark.skip(reason="Flaky tests")
+@pytest.mark.skipif(client_test_enabled(), reason="internal api")
 def test_running_function_on_all_workers(ray_start_regular):
     def f(worker_info):
         sys.path.append("fake_directory")
diff --git a/python/ray/tests/test_basic_5.py b/python/ray/tests/test_basic_5.py
index 083df25b53c91..5571bc5277448 100644
--- a/python/ray/tests/test_basic_5.py
+++ b/python/ray/tests/test_basic_5.py
@@ -142,7 +142,7 @@ def pid(self):
     assert "Traceback" not in log


-@pytest.mark.skipif(True, reason="run_function_on_all_workers doesn't work")
+@pytest.mark.skipif(sys.platform == "win32", reason="Flaky on windows")
 def test_run_on_all_workers(call_ray_start, tmp_path):
     # This test is to ensure run_function_on_all_workers are executed
     # on all workers.
@@ -247,7 +247,7 @@ def get_kv_metrics():
     ???? # unknown
     """
     # !!!If you want to increase this number, please let ray-core knows this!!!
-    assert freqs["internal_kv_get"] == 5
+    assert freqs["internal_kv_get"] == 4


 if __name__ == "__main__":
diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py
index 6abc9d9592f61..71bb7a98dd9ad 100644
--- a/python/ray/tests/test_failure.py
+++ b/python/ray/tests/test_failure.py
@@ -125,7 +125,6 @@ def expect_exception(objects, exception):
     ray.get(signal2.send.remote())


-@pytest.mark.skipif(True, reason="run_function_on_all_workers doesn't work")
 def test_failed_function_to_run(ray_start_2_cpus, error_pubsub):
     p = error_pubsub

@@ -134,6 +133,20 @@ def f(worker):
         raise Exception("Function to run failed.")

     ray._private.worker.global_worker.run_function_on_all_workers(f)
+
+    @ray.remote
+    class Actor:
+        def foo(self):
+            pass
+
+    # Functions scheduled through run_function_on_all_workers only
+    # execute on workers bound to the current driver's job_id.
+    # Since the 2 prestarted workers don't bind to a job_id until the first
+    # task/actor is executed, we need to schedule two actors to trigger
+    # the prestart functions.
+    actors = [Actor.remote() for _ in range(2)]
+    ray.get([actor.foo.remote() for actor in actors])
+
     # Check that the error message is in the task info.
     errors = get_error_message(p, 2, ray_constants.FUNCTION_TO_RUN_PUSH_ERROR)
     assert len(errors) == 2

From c8c1da7cbf2d31e46fd7cbfced7c255a469e4f74 Mon Sep 17 00:00:00 2001
From: Dmitri Gekhtman <62982571+DmitriGekhtman@users.noreply.github.com>
Date: Sun, 22 Jan 2023 10:57:13 -0800
Subject: [PATCH 02/49] [docs] Update reference K8s configuration in large
 cluster deployment guide. (#31307)

Fixes an outdated K8s configuration reference in the large cluster
deployment guide.
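For illustration, the head-group portion of a KubeRay RayCluster manifest with
the recommended setting might look like the sketch below. Only the
rayStartParams "num-cpus" entry reflects the guidance referenced by this
change; the apiVersion, names, and image tag are placeholder assumptions.

```yaml
# Illustrative sketch only: advertise 0 logical CPUs on the Ray head pod so
# the Ray scheduler keeps tasks and actors off it. Fields other than
# rayStartParams.num-cpus are placeholders, not part of this change.
apiVersion: ray.io/v1alpha1
kind: RayCluster
metadata:
  name: example-cluster          # placeholder name
spec:
  headGroupSpec:
    rayStartParams:
      num-cpus: "0"              # keep workloads off the head node
    template:
      spec:
        containers:
          - name: ray-head
            image: rayproject/ray:2.2.0   # placeholder image tag
```

With this, application tasks and actors schedule only on worker groups,
leaving the head node's resources for the GCS and dashboard processes.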
Signed-off-by: Dmitri Gekhtman --- doc/source/cluster/kubernetes/user-guides/config.md | 1 + .../vms/user-guides/large-cluster-best-practices.rst | 11 +++++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/doc/source/cluster/kubernetes/user-guides/config.md b/doc/source/cluster/kubernetes/user-guides/config.md index 3dd7b14f0d56e..49603e20753b6 100644 --- a/doc/source/cluster/kubernetes/user-guides/config.md +++ b/doc/source/cluster/kubernetes/user-guides/config.md @@ -212,6 +212,7 @@ For most use-cases, this field should be set to "0.0.0.0" for the Ray head pod. This is required to expose the Ray dashboard outside the Ray cluster. (Future versions might set this parameter by default.) +(kuberay-num-cpus)= ### num-cpus This optional field tells the Ray scheduler and autoscaler how many CPUs are available to the Ray pod. The CPU count can be autodetected from the diff --git a/doc/source/cluster/vms/user-guides/large-cluster-best-practices.rst b/doc/source/cluster/vms/user-guides/large-cluster-best-practices.rst index 312e785886044..49a031bef4dfd 100644 --- a/doc/source/cluster/vms/user-guides/large-cluster-best-practices.rst +++ b/doc/source/cluster/vms/user-guides/large-cluster-best-practices.rst @@ -56,10 +56,13 @@ architecture means that the head node will have extra stress due to GCS. resource on the head node is outbound bandwidth. For large clusters (see the scalability envelope), we recommend using machines networking characteristics at least as good as an r5dn.16xlarge on AWS EC2. -* Set ``resources: {"CPU": 0}`` on the head node. (For Ray clusters deployed using Helm, - set ``rayResources: {"CPU": 0}``.) Due to the heavy networking - load (and the GCS and dashboard processes), we recommend setting the number of - CPUs to 0 on the head node to avoid scheduling additional tasks on it. +* Set ``resources: {"CPU": 0}`` on the head node. + (For Ray clusters deployed using KubeRay, + set ``rayStartParams: {"num-cpus": "0"}``. + See the :ref:`configuration guide for KubeRay clusters `.) + Due to the heavy networking load (and the GCS and dashboard processes), we + recommend setting the number of CPUs to 0 on the head node to avoid + scheduling additional tasks on it. Configuring the autoscaler ^^^^^^^^^^^^^^^^^^^^^^^^^^ From cdb37800cc639f7c0291bca54882ca9d95f80d8e Mon Sep 17 00:00:00 2001 From: Alan Guo Date: Sun, 22 Jan 2023 16:23:39 -0800 Subject: [PATCH 03/49] Polish the new IA for dashboard (#31770) Signed-off-by: Alan Guo Update to more closely match the design spec. 
--- dashboard/client/src/App.tsx | 11 ++++- .../client/src/common/CollapsibleSection.tsx | 48 +++++++++++++++---- dashboard/client/src/pages/job/JobDetail.tsx | 8 +++- .../src/pages/job/JobDetailActorPage.tsx | 37 ++++++++++++++ .../client/src/pages/job/JobDetailLayout.tsx | 12 ++++- .../client/src/pages/layout/MainNavLayout.tsx | 19 +++++--- dashboard/client/src/pages/log/Logs.tsx | 2 +- .../client/src/pages/metrics/Metrics.tsx | 32 +++++++++---- .../src/pages/overview/OverviewPage.tsx | 15 ++++-- .../pages/overview/cards/NodeCountCard.tsx | 1 + .../src/pages/overview/cards/OverviewCard.tsx | 5 +- .../cards/RecentJobsCard.component.test.tsx | 2 +- .../pages/overview/cards/RecentJobsCard.tsx | 12 +++-- dashboard/client/src/theme.ts | 16 +++++++ .../metrics/grafana_dashboard_base.json | 2 +- 15 files changed, 180 insertions(+), 42 deletions(-) create mode 100644 dashboard/client/src/pages/job/JobDetailActorPage.tsx diff --git a/dashboard/client/src/App.tsx b/dashboard/client/src/App.tsx index c48be1456e9f8..302a9afe3985c 100644 --- a/dashboard/client/src/App.tsx +++ b/dashboard/client/src/App.tsx @@ -8,6 +8,7 @@ import Events from "./pages/event/Events"; import Loading from "./pages/exception/Loading"; import JobList, { NewIAJobsPage } from "./pages/job"; import { JobDetailChartsPage } from "./pages/job/JobDetail"; +import { JobDetailActorsPage } from "./pages/job/JobDetailActorPage"; import { JobDetailInfoPage } from "./pages/job/JobDetailInfoPage"; import { JobDetailLayout } from "./pages/job/JobDetailLayout"; import { DEFAULT_VALUE, MainNavContext } from "./pages/layout/mainNavContext"; @@ -205,11 +206,19 @@ const App = () => { - + } path="" /> + + + + } + path="actors" + /> } path="logs"> diff --git a/dashboard/client/src/common/CollapsibleSection.tsx b/dashboard/client/src/common/CollapsibleSection.tsx index 879ce6bf692c0..2f42d82ff7d6e 100644 --- a/dashboard/client/src/common/CollapsibleSection.tsx +++ b/dashboard/client/src/common/CollapsibleSection.tsx @@ -1,6 +1,8 @@ import { createStyles, makeStyles, Typography } from "@material-ui/core"; -import React, { PropsWithChildren, useState } from "react"; -import { RiArrowDownSLine, RiArrowUpSLine } from "react-icons/ri"; +import classNames from "classnames"; +import React, { PropsWithChildren, useEffect, useState } from "react"; +import { RiArrowDownSLine, RiArrowRightSLine } from "react-icons/ri"; +import { ClassNameProps } from "./props"; const useStyles = makeStyles((theme) => createStyles({ @@ -10,6 +12,7 @@ const useStyles = makeStyles((theme) => flexWrap: "nowrap", alignItems: "center", fontWeight: 500, + cursor: "pointer", }, icon: { marginRight: theme.spacing(1), @@ -17,25 +20,42 @@ const useStyles = makeStyles((theme) => height: 24, }, body: { - marginTop: theme.spacing(3), + marginTop: theme.spacing(1), + }, + bodyHidden: { + display: "none", }, }), ); -type CollapsibleSectionProps = PropsWithChildren<{ - title: string; - startExpanded?: boolean; - className?: string; -}>; +type CollapsibleSectionProps = PropsWithChildren< + { + title: string; + startExpanded?: boolean; + /** + * An optimization to not avoid re-rendering the contents of the collapsible section. + * When enabled, we will keep the content around when collapsing but hide it via css. 
+ */ + keepRendered?: boolean; + } & ClassNameProps +>; export const CollapsibleSection = ({ title, startExpanded = false, className, children, + keepRendered, }: CollapsibleSectionProps) => { const classes = useStyles(); const [expanded, setExpanded] = useState(startExpanded); + const [rendered, setRendered] = useState(expanded); + + useEffect(() => { + if (expanded) { + setRendered(true); + } + }, [expanded]); const handleExpandClick = () => { setExpanded(!expanded); @@ -51,11 +71,19 @@ export const CollapsibleSection = ({ {expanded ? ( ) : ( - + )} {title} - {expanded &&
{children}
} + {(expanded || (keepRendered && rendered)) && ( +
+ {children} +
+ )} ); }; diff --git a/dashboard/client/src/pages/job/JobDetail.tsx b/dashboard/client/src/pages/job/JobDetail.tsx index cddca7c6761c9..8f0b6cdf1c21d 100644 --- a/dashboard/client/src/pages/job/JobDetail.tsx +++ b/dashboard/client/src/pages/job/JobDetail.tsx @@ -25,7 +25,13 @@ const useStyle = makeStyles((theme) => ({ }, })); -export const JobDetailChartsPage = () => { +type JobDetailChartsPageProps = { + newIA?: boolean; +}; + +export const JobDetailChartsPage = ({ + newIA = false, +}: JobDetailChartsPageProps) => { const classes = useStyle(); const { job, msg, params } = useJobDetail(); const jobId = params.id; diff --git a/dashboard/client/src/pages/job/JobDetailActorPage.tsx b/dashboard/client/src/pages/job/JobDetailActorPage.tsx new file mode 100644 index 0000000000000..c0fa8735850cb --- /dev/null +++ b/dashboard/client/src/pages/job/JobDetailActorPage.tsx @@ -0,0 +1,37 @@ +import { makeStyles } from "@material-ui/core"; +import React from "react"; + +import TitleCard from "../../components/TitleCard"; +import ActorList from "../actor/ActorList"; +import { MainNavPageInfo } from "../layout/mainNavContext"; +import { useJobDetail } from "./hook/useJobDetail"; + +const useStyle = makeStyles((theme) => ({ + root: { + padding: theme.spacing(2), + }, +})); + +export const JobDetailActorsPage = () => { + const classes = useStyle(); + const { job, params } = useJobDetail(); + + const pageInfo = job + ? { + title: "Actors", + id: "actors", + path: job.job_id ? `/new/jobs/${job.job_id}/actors` : undefined, + } + : { + title: "Actors", + id: "actors", + path: undefined, + }; + + return ( +
+ + {} +
+ ); +}; diff --git a/dashboard/client/src/pages/job/JobDetailLayout.tsx b/dashboard/client/src/pages/job/JobDetailLayout.tsx index 3c65ccc6b42a6..f87915e451308 100644 --- a/dashboard/client/src/pages/job/JobDetailLayout.tsx +++ b/dashboard/client/src/pages/job/JobDetailLayout.tsx @@ -1,5 +1,9 @@ import React from "react"; -import { RiInformationLine, RiLineChartLine } from "react-icons/ri"; +import { + RiGradienterLine, + RiInformationLine, + RiLineChartLine, +} from "react-icons/ri"; import { MainNavPageInfo } from "../layout/mainNavContext"; import { SideTabLayout, SideTabRouteLink } from "../layout/SideTabLayout"; import { useJobDetail } from "./hook/useJobDetail"; @@ -29,6 +33,12 @@ export const JobDetailLayout = () => { title="Charts" Icon={RiLineChartLine} /> + ); }; diff --git a/dashboard/client/src/pages/layout/MainNavLayout.tsx b/dashboard/client/src/pages/layout/MainNavLayout.tsx index 6a001a29eb3b7..85dabc6d97a35 100644 --- a/dashboard/client/src/pages/layout/MainNavLayout.tsx +++ b/dashboard/client/src/pages/layout/MainNavLayout.tsx @@ -88,13 +88,14 @@ const useMainNavBarStyles = makeStyles((theme) => boxShadow: "0px 1px 0px #D2DCE6", }, logo: { - width: 60, display: "flex", justifyContent: "center", + marginLeft: theme.spacing(2), + marginRight: theme.spacing(3), }, navItem: { - marginRight: theme.spacing(2), - fontSize: "1em", + marginRight: theme.spacing(6), + fontSize: "1rem", fontWeight: 500, color: "black", textDecoration: "none", @@ -211,15 +212,21 @@ const MainNavBreadcrumbs = () => { ); if (index === 0) { return ( - + {linkOrText} ); } else { return ( - {"/"} - + + {"/"} + + {linkOrText} diff --git a/dashboard/client/src/pages/log/Logs.tsx b/dashboard/client/src/pages/log/Logs.tsx index 6501a0c8b7786..7a36a1e23c64c 100644 --- a/dashboard/client/src/pages/log/Logs.tsx +++ b/dashboard/client/src/pages/log/Logs.tsx @@ -126,7 +126,7 @@ const Logs = (props: LogsProps) => { setEnd, } = useLogs(props); const { newIA } = props; - let href = newIA ? "#/new/log/" : "#/log/"; + let href = newIA ? "#/new/logs/" : "#/log/"; if (origin) { if (path) { diff --git a/dashboard/client/src/pages/metrics/Metrics.tsx b/dashboard/client/src/pages/metrics/Metrics.tsx index 554990b3250ef..8610f6961622e 100644 --- a/dashboard/client/src/pages/metrics/Metrics.tsx +++ b/dashboard/client/src/pages/metrics/Metrics.tsx @@ -20,9 +20,21 @@ const useStyles = makeStyles((theme) => display: "flex", flexDirection: "row", flexWrap: "wrap", + gap: theme.spacing(3), + }, + chart: { + flex: "1 0 448px", + maxWidth: "100%", + height: 300, + overflow: "hidden", + [theme.breakpoints.up("md")]: { + // Calculate max width based on 1/3 of the total width minus padding between cards + maxWidth: `calc((100% - ${theme.spacing(3)}px * 2) / 3)`, + }, }, grafanaEmbed: { - margin: theme.spacing(1), + width: "100%", + height: "100%", }, topBar: { position: "sticky", @@ -213,15 +225,15 @@ export const Metrics = () => {
{METRICS_CONFIG.map(({ title, path }) => ( -