Added StartupToken to identify a process at startup #19014
Conversation
This branch builds successfully and behaves as desired with
Oops, sorry, I misclicked. Please change this back to a draft PR if that was not intended.
src/ray/core_worker/core_worker.h
Outdated
@@ -186,6 +187,7 @@ struct CoreWorkerOptions {
int runtime_env_hash;
/// The PID of the process for setup worker runtime env.
pid_t worker_shim_pid;
StartupToken startup_token;
Please add a comment above describing what this does :)
I will add documentation comments at all the necessary places once I complete testing. There are some internal API changes so tests are required to be updated. Thanks.
src/ray/raylet/worker.h
Outdated
@@ -27,6 +27,12 @@
#include "ray/rpc/worker/core_worker_client.h"
#include "ray/util/process.h"

#ifdef _WIN32
we should get rid of this typedef. Why not make it int64_t everywhere?
src/ray/raylet/worker_pool.cc
Outdated
@@ -370,7 +371,11 @@ Process WorkerPool::StartWorkerProcess(
<< " worker(s) with pid " << proc.GetId();
MonitorStartingWorkerProcess(proc, language, worker_type);
state.starting_worker_processes.emplace(
proc, StartingWorkerProcessInfo{workers_to_start, workers_to_start, worker_type});
startup_token_,
std::make_pair(
Can you put startup_token into the StartingWorkerProcessInfo? Then we don't need an additional level of indirection :)
The core of the fix is using startup_token as the key. I can additionally add it to StartingWorkerProcessInfo and make the necessary changes elsewhere.
Ah sorry, I meant put proc into StartingWorkerProcessInfo and get rid of the std::make_pair (startup_token should be the key, yes). It mainly seems odd to use both std::make_pair here and also have the struct. This will simplify the code elsewhere because then you don't need the additional first and second calls.
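The shape suggested above can be sketched as follows. This is a minimal illustration, not the code from the PR: `Process`, `WorkerType`, and `RecordStartingWorker` are simplified, hypothetical stand-ins, and `StartupToken` is assumed to be an `int64_t` alias as proposed earlier in the review.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>

// Hypothetical stand-ins for the real Ray types, which carry more state.
using StartupToken = int64_t;
struct Process { int pid; };
enum class WorkerType { WORKER };

// The launched process lives inside the info struct, so the map value
// does not need a std::pair wrapper and callers avoid .first/.second.
struct StartingWorkerProcessInfo {
  int num_workers;
  int num_starting_workers;
  WorkerType worker_type;
  Process proc;
};

using StartingWorkers =
    std::unordered_map<StartupToken, StartingWorkerProcessInfo>;

// Records a newly launched process under its startup token (the map key).
inline void RecordStartingWorker(StartingWorkers &starting, StartupToken token,
                                 int workers_to_start, WorkerType worker_type,
                                 Process proc) {
  const bool inserted =
      starting
          .emplace(token, StartingWorkerProcessInfo{workers_to_start,
                                                    workers_to_start,
                                                    worker_type, proc})
          .second;
  assert(inserted);  // each token is expected to be used only once
  (void)inserted;    // silence unused-variable warnings under NDEBUG
}
```

With this layout, a lookup such as `starting.at(token).proc` reads the process directly from the struct.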
src/ray/raylet/worker_pool.h
Outdated
@@ -38,6 +39,12 @@ namespace ray {

namespace raylet {

#ifdef _WIN32
This shouldn't be needed since it is already in src/ray/raylet/worker.h. Just make sure it is in the right namespace there.
@@ -29,6 +29,12 @@
#include "src/ray/protobuf/common.pb.h"
#include "src/ray/protobuf/gcs.pb.h"

#ifdef _WIN32
Actually, maybe put it in ray/util/process.h; that should be included by everybody who needs it :)
Thanks a lot for fixing this, @czgdp1807 :) @wuisawesome, can you shepherd this through the process (another round of reviews, making sure tests are fixed, etc.)?
5b1f8a5 to da4bf7e
da4bf7e to 8045363
The following tests fail on my branch:
FAIL: //:gcs_server_rpc_test
FAIL: //:core_worker_test
FAIL: //:event_test
FAIL: //:gcs_server_test
The above tests fail on
I have added documentation at the necessary places in the diff. This PR is ready from my side. Please let me know if anything else needs to be done. cc: @pcmoritz @wuisawesome @dharhas
@czgdp1807 it looks like the build is failing on all non-Windows platforms right now. Can you fix the initialization order?
@@ -186,6 +187,9 @@ struct CoreWorkerOptions {
int runtime_env_hash;
/// The PID of the process for setup worker runtime env.
pid_t worker_shim_pid;
/// The startup token of the process assigned to it
/// during startup via command line arguments.
/// during startup via command line arguments.
/// during startup via command line arguments. This is needed because the actual core worker process may not have the same pid as the process the worker pool starts (due to shim processes).
Should I change this everywhere, or would doing it only once here suffice?
I have made the change, @wuisawesome. Thanks for pointing it out.
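The reasoning in the suggested doc comment (the registering worker may not have the pid the pool launched) can be illustrated with a small sketch. The types and function names here are hypothetical and greatly simplified; they only contrast a pid-keyed lookup with a token-keyed one.

```cpp
#include <cstdint>
#include <map>

using StartupToken = int64_t;  // assumed alias, as discussed in the review

// Hypothetical pool-side record of what was launched.
struct LaunchedInfo {
  int launched_pid;  // pid of the process the pool started (may be a shim)
};

// Lookup keyed by pid: misses when a shim sits between the pool and the
// worker, because the registering worker reports its own, different pid.
inline bool FoundByPid(const std::map<int, LaunchedInfo> &by_pid,
                       int registering_pid) {
  return by_pid.count(registering_pid) > 0;
}

// Lookup keyed by startup token: the token is passed down on the command
// line, so the worker reports the same value the pool recorded.
inline bool FoundByToken(const std::map<StartupToken, LaunchedInfo> &by_token,
                         StartupToken registering_token) {
  return by_token.count(registering_token) > 0;
}
```

For example, if the pool launched pid 100 (a shim) with token 0 and the real worker registers as pid 101 with token 0, only the token-keyed lookup finds the entry.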
src/ray/raylet/worker_pool.h
Outdated
@@ -132,6 +133,12 @@ class Worker;
/// The WorkerPool is responsible for managing a pool of Workers. Each Worker
/// is a container for a unit of work.
class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
protected:
Any reason this should not be private? Also, as a general guideline, this should be declared after the function declarations.
We need this value in WorkerPoolMock (which inherits WorkerPool) in worker_pool_test.cc. The other way to expose this member is via a protected function in WorkerPool which can be called by WorkerPoolMock. I do not want to expose startup_token via a public method. Making a protected data member appeared simpler to me than making a protected function and then using it everywhere else. Let me know if you have other ideas.
sounds reasonable.
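The trade-off discussed in this thread can be sketched roughly as below. The class names carry a `Sketch` suffix to make clear they are hypothetical, and the counter semantics (incremented once per started process) are an assumption based on the comments in this PR.

```cpp
#include <cstdint>

using StartupToken = int64_t;  // assumed alias

// Hypothetical, stripped-down pool: the counter is protected so a test
// subclass can read it without exposing it through a public method.
class WorkerPoolSketch {
 public:
  virtual ~WorkerPoolSketch() = default;

  // Pretend to start a process: the current counter value is the token
  // assigned to it, then the counter advances.
  void StartWorkerProcess() { worker_startup_token_counter_ += 1; }

 protected:
  StartupToken worker_startup_token_counter_ = 0;
};

// Hypothetical test double, in the spirit of WorkerPoolMock.
class WorkerPoolMockSketch : public WorkerPoolSketch {
 public:
  // Token handed to the most recently started process.
  StartupToken LastStartedToken() const {
    return worker_startup_token_counter_ - 1;
  }
};
```

Production code cannot reach the counter from outside the class hierarchy, while the test subclass reads it directly.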
src/ray/raylet/worker_pool.h
Outdated
/// Global startup token variable. Incremented once assigned
/// to a worker process and is added to
/// state.starting_worker_processes.
StartupToken startup_token_;
nit: worker_startup_token_counter_;
startup_token_ is a bit confusing.
src/ray/raylet/worker_pool.cc
Outdated
@@ -559,9 +575,10 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr<WorkerInterface> &worker
void WorkerPool::OnWorkerStarted(const std::shared_ptr<WorkerInterface> &worker) {
auto &state = GetStateForLanguage(worker->GetLanguage());
const auto &shim_process = worker->GetShimProcess();
const StartupToken &worker_startup_token = worker->GetStartupToken();
Doesn't need to be a reference.
src/ray/raylet/worker_pool.h
Outdated
@@ -473,7 +484,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// A map from the pids of this shim processes to the extra information of
This comment needs to be updated.
src/ray/raylet/worker_pool.cc
Outdated
@@ -516,6 +525,7 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr<WorkerInterface> &worker
}
auto process = Process::FromPid(pid);
worker->SetProcess(process);
worker->SetStartupToken(worker_startup_token);
If we can move
worker->SetProcess(process);
worker->SetStartupToken(worker_startup_token);
to the worker's constructor and make them const member variables, we could get rid of a whole category of errors.
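The idea in this comment can be sketched as follows, assuming both values are known when the worker object is constructed (which may not hold in the real registration flow). `WorkerSketch` and `Process` are hypothetical simplifications, not the Ray classes.

```cpp
#include <cstdint>

using StartupToken = int64_t;  // assumed alias
struct Process { int pid; };   // hypothetical stand-in

// Both values are fixed at construction time. With const members there is
// no setter to call twice, too late, or not at all.
class WorkerSketch {
 public:
  WorkerSketch(Process proc, StartupToken startup_token)
      : proc_(proc), startup_token_(startup_token) {}

  const Process &GetProcess() const { return proc_; }
  StartupToken GetStartupToken() const { return startup_token_; }

 private:
  const Process proc_;
  const StartupToken startup_token_;
};
```

One trade-off of this design is that const members make the class non-assignable, which matters if instances need to be reassigned in place.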
Thanks a lot for helping out on the Windows issues!
src/ray/core_worker/core_worker.h
Outdated
/// This is needed because the actual core worker process
/// may not have the same pid as the process the worker pool
/// starts (due to shim processes).
StartupToken startup_token{-1};
is there any special meaning for -1? maybe worth documenting if that's the case.
Startup tokens usually start from 0, so this is set to -1 so that the first process created gets 0 as its startup token.
src/ray/raylet/worker_pool.cc
Outdated
proc, StartingWorkerProcessInfo{workers_to_start, workers_to_start, worker_type});
startup_token_,
StartingWorkerProcessInfo{workers_to_start, workers_to_start, worker_type, proc});
startup_token_ += 1;
imo this is a bit fragile... maybe have a helper function called get_next_worker_startup_token
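A helper along the lines suggested above might look like this sketch. `StartupTokenCounter` and `Next` are hypothetical names (the comment proposes `get_next_worker_startup_token`), and the sketch is single-threaded; it does not address concurrent callers.

```cpp
#include <cstdint>

using StartupToken = int64_t;  // assumed alias

// Wraps the counter so that reading the current token and advancing it
// happen in one place, instead of a separate `+= 1` at each call site.
class StartupTokenCounter {
 public:
  // Returns the next unused token and advances the counter.
  StartupToken Next() { return next_++; }

 private:
  StartupToken next_ = 0;  // first token handed out is 0
};
```

Call sites then use the returned value for both the command-line argument and the map key, which avoids the two drifting apart.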
src/ray/raylet/worker_pool.cc
Outdated
@@ -360,6 +366,12 @@ Process WorkerPool::StartWorkerProcess(
env.insert({"SPT_NOENV", "1"});
}

if (language == Language::PYTHON) {
worker_command_args.push_back("--startup-token=" + std::to_string(startup_token_));
A bit sad that we have these slight differences between languages.
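For the Python case in the hunk above, the token travels as a `--startup-token=<n>` argument. The sketch below shows one way to build and read back such an argument. `MakeStartupTokenArg` and `ParseStartupToken` are hypothetical helpers for illustration; the real worker parses its arguments on the Python side, and other languages may pass the token differently.

```cpp
#include <cstdint>
#include <string>
#include <vector>

using StartupToken = int64_t;  // assumed alias

// Builds the argument in the same form as the diff above.
inline std::string MakeStartupTokenArg(StartupToken token) {
  return "--startup-token=" + std::to_string(token);
}

// Returns the token found in `args`, or -1 if the flag is absent.
// Assumes a well-formed numeric value; std::stoll throws otherwise.
inline StartupToken ParseStartupToken(const std::vector<std::string> &args) {
  const std::string prefix = "--startup-token=";
  for (const auto &arg : args) {
    if (arg.compare(0, prefix.size(), prefix) == 0) {
      return std::stoll(arg.substr(prefix.size()));
    }
  }
  return -1;  // mirrors the -1 default used for an unset token
}
```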
Overall looks good to me. @wuisawesome, can you take another look before we merge this? :)
void Worker::SetProcess(Process proc) {
RAY_CHECK(proc_.IsNull()); // this procedure should not be called multiple times
proc_ = std::move(proc);
}

void Worker::SetStartupToken(StartupToken startup_token) {
Is this only used for tests? We can make it private and use FRIEND_TEST/friend to access it.
Ah! right. This can be protected as it isn't required to be exposed publicly. What do you say?
sgtm :)
@pcmoritz |
Hi @davidberenstein1957. I have never seen this error before. Can I try reproducing it on my system and investigate whether it is somehow related to the changes we have made here? AFAIK, the changes here only deal with worker processes and not agent processes. There are two things that I will do to reproduce this error: i) check on
@czgdp1807
SCRIPT:
RUN APP |
I get a comparable message when enabling GPU without a full 'ray[default]' installation, even though I disabled dashboard usage. Same code as above, but initialized with GPU.
I reformatted your error message to make the error clearer. You should install |
@davidberenstein1957 Thanks for reporting! I just went through your script. There were a few typos, but after fixing them (see attached), on my machine the problem was #19948, and after running
In the case that only
Glad to know the issue is unrelated to this PR though (which makes sense, since the PID of the agent is actually handled correctly and doesn't suffer from the issue that was fixed in this PR, as it calls os.getpid() directly in the relevant process instead of trying to get the PID of the child).
For anybody curious how the aiohttp issue could cause this: Basically
Please let me know if the problem persists for you even after pinning aiohttp!
PR #19014 introduced the idea of a StartupToken to uniquely identify a worker via a counter. This PR:
- Returns the Process and the StartupToken from StartWorkerProcess (previously only the Process was returned).
- Changes the starting_workers_to_tasks map to index via the StartupToken, which seems to fix the Windows failures.
- Unskips the Windows tests in test_basic_2.py.
It seems that once a fix to PR #18167 goes in, the starting_workers_to_tasks map will be removed, which should remove the need for the changes to StartWorkerProcess made in this PR.
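Returning both values from the start function, as this follow-up describes, could look roughly like the sketch below. `PoolSketch`, its fake pid source, and the `std::pair` return type are assumptions for illustration; the actual signature in Ray may differ.

```cpp
#include <cstdint>
#include <utility>

using StartupToken = int64_t;  // assumed alias
struct Process { int pid; };   // hypothetical stand-in

// Hypothetical pool that hands back the launched process together with
// the token assigned to it, so callers can index their own maps by token.
struct PoolSketch {
  StartupToken next_token = 0;
  int next_fake_pid = 1000;  // stands in for a real process launch

  std::pair<Process, StartupToken> StartWorkerProcess() {
    Process proc{next_fake_pid++};
    const StartupToken token = next_token++;
    return {proc, token};
  }
};
```

A caller can then key a structure such as starting_workers_to_tasks by the returned token rather than by the process.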
Why are these changes needed?
The changes in this PR introduce StartupToken to uniquely identify a process at startup. This helps avoid conflicts caused by differing PIDs on Windows due to wrapper processes. For more details, please refer to #18951 (comment).
After this PR, ray.init() completes without hanging in virtual and conda environments.
Related issue number
Closes #18951
Closes #18952
#15970
Checks
scripts/format.sh to lint the changes in this PR.
cc: @dharhas @mattip