Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use GCSClient instead of PythonGCSClient #211

Merged
merged 27 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
2bfafa3
Use GCSClient instead of PythonGCSClient
glennmoy Oct 18, 2023
f342fa0
delete .vscode settings
glennmoy Oct 20, 2023
5e4812c
Delete tests that no longer apply
glennmoy Oct 20, 2023
feeec48
Delete timedwait_for_function
glennmoy Oct 20, 2023
dbc760f
add back test
glennmoy Oct 20, 2023
7ec1c81
add back function manager tests
glennmoy Oct 20, 2023
daf3da2
formatting
glennmoy Oct 20, 2023
6ad9dc0
add comments with links to src code
glennmoy Oct 23, 2023
6b98717
add finalizer to FunctionManager
glennmoy Oct 23, 2023
46e1d64
add Connect-do syntax
glennmoy Oct 23, 2023
b92b437
connect to GCSClient in FunctionManager constructor
glennmoy Oct 23, 2023
0726fef
Revert "connect to GCSClient in FunctionManager constructor"
glennmoy Oct 24, 2023
b63369f
return body()
glennmoy Oct 24, 2023
2cf2529
call check_status
glennmoy Oct 24, 2023
9c1edd6
add finalizer call in _init_global_function_manager
glennmoy Oct 24, 2023
123a7dc
Disconnect from unconnected client should no-op
glennmoy Oct 24, 2023
9e15271
Apply suggestions from code review
glennmoy Oct 23, 2023
dff8b4f
call finalizer before check_status
glennmoy Oct 25, 2023
9862a20
delete finalizer from the FunctionManager constructor
glennmoy Oct 25, 2023
c49e9e3
function manager needs to be mutable
glennmoy Oct 25, 2023
a35ccc0
Merge branch 'main' into gm/gcsclient
omus Oct 25, 2023
cc8f43a
Disconnect GCS client in function manager tests
omus Oct 26, 2023
49de1b5
Merge branch 'main' into gm/gcsclient
omus Oct 26, 2023
7f6d315
Revert "Disconnect GCS client in function manager tests"
omus Oct 26, 2023
0a8046b
Add destructor to JuliaGcsClient
omus Oct 26, 2023
492845b
fixup! Add destructor to JuliaGcsClient
omus Oct 26, 2023
9bb6921
Disconnect GCS client in function manager tests
omus Oct 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 30 additions & 19 deletions build/wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -257,62 +257,72 @@ JuliaGcsClient::JuliaGcsClient(const std::string &gcs_address) {
}

Status JuliaGcsClient::Connect() {
gcs_client_ = std::make_unique<gcs::PythonGcsClient>(options_);
return gcs_client_->Connect();
// https://github.com/beacon-biosignals/ray/blob/448a83caf44108fc1bc44fa7c6c358cffcfcb0d7/src/ray/gcs/gcs_client/test/usage_stats_client_test.cc#L34-L41
io_service_ = std::make_unique<instrumented_io_context>();
io_service_thread_ = std::make_unique<std::thread>([this] {
std::unique_ptr<boost::asio::io_service::work> work(
new boost::asio::io_service::work(*io_service_));
io_service_->run();
});
gcs_client_ = std::make_unique<gcs::GcsClient>(options_);
return gcs_client_->Connect(*io_service_);
}

// Tears down what Connect() set up: the IO service, its worker thread and the GCS client.
// Calling this on a client that was never connected (or was already disconnected) is a no-op.
void JuliaGcsClient::Disconnect() {
// gcs_client_ is only created in Connect() and is reset below, so a null pointer means
// there is nothing to tear down.
if (!gcs_client_){
return;
}
// Stop the IO service so the run() call on the worker thread started in Connect() returns,
// then wait for that thread to finish before disconnecting the client.
io_service_->stop();
io_service_thread_->join();
gcs_client_->Disconnect();
// Drop the client so later calls see "not connected" and this method becomes a no-op.
gcs_client_.reset();
}

std::string JuliaGcsClient::Get(const std::string &ns,
const std::string &key,
int64_t timeout_ms) {
std::string JuliaGcsClient::Get(const std::string &ns, const std::string &key) {
if (!gcs_client_) {
throw std::runtime_error("GCS client not initialized; did you forget to Connect?");
}
std::string value;
Status status = gcs_client_->InternalKVGet(ns, key, timeout_ms, value);
Status status = gcs_client_->InternalKV().Get(ns, key, value);
if (!status.ok()) {
throw std::runtime_error(status.ToString());
}
return value;
}

int JuliaGcsClient::Put(const std::string &ns,
glennmoy marked this conversation as resolved.
Show resolved Hide resolved
bool JuliaGcsClient::Put(const std::string &ns,
const std::string &key,
const std::string &value,
bool overwrite,
int64_t timeout_ms) {
bool overwrite) {
if (!gcs_client_) {
throw std::runtime_error("GCS client not initialized; did you forget to Connect?");
}
int added_num;
Status status = gcs_client_->InternalKVPut(ns, key, value, overwrite, timeout_ms, added_num);
bool added_num;
Status status = gcs_client_->InternalKV().Put(ns, key, value, overwrite, added_num);
if (!status.ok()) {
throw std::runtime_error(status.ToString());
}
return added_num;
}

std::vector<std::string> JuliaGcsClient::Keys(const std::string &ns,
const std::string &prefix,
int64_t timeout_ms) {
std::vector<std::string> JuliaGcsClient::Keys(const std::string &ns, const std::string &prefix) {
if (!gcs_client_) {
throw std::runtime_error("GCS client not initialized; did you forget to Connect?");
}
std::vector<std::string> results;
Status status = gcs_client_->InternalKVKeys(ns, prefix, timeout_ms, results);
Status status = gcs_client_->InternalKV().Keys(ns, prefix, results);
glennmoy marked this conversation as resolved.
Show resolved Hide resolved
if (!status.ok()) {
throw std::runtime_error(status.ToString());
}
return results;
}

bool JuliaGcsClient::Exists(const std::string &ns,
const std::string &key,
int64_t timeout_ms) {
bool JuliaGcsClient::Exists(const std::string &ns, const std::string &key) {
if (!gcs_client_) {
throw std::runtime_error("GCS client not initialized; did you forget to Connect?");
}
bool exists;
Status status = gcs_client_->InternalKVExists(ns, key, timeout_ms, exists);
Status status = gcs_client_->InternalKV().Exists(ns, key, exists);
if (!status.ok()) {
throw std::runtime_error(status.ToString());
}
Expand Down Expand Up @@ -715,6 +725,7 @@ JLCXX_MODULE define_julia_module(jlcxx::Module& mod)
mod.add_type<JuliaGcsClient>("JuliaGcsClient")
.constructor<const std::string&>()
.method("Connect", &JuliaGcsClient::Connect)
.method("Disconnect", &JuliaGcsClient::Disconnect)
.method("Put", &JuliaGcsClient::Put)
.method("Get", &JuliaGcsClient::Get)
.method("Keys", &JuliaGcsClient::Keys)
Expand Down
46 changes: 27 additions & 19 deletions build/wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,42 @@ using ray::core::TaskOptions;
using ray::core::WorkerType;
std::string ToString(ray::FunctionDescriptor function_descriptor);

// a wrapper class to manage the IO service + thread that the GcsClient needs.
// we may want to use the PythonGcsClient however, which does not do async
// operations in separate threads as far as I can tell...in which case we would
// not need this wrapper at all.
// JuliaGcsClient is a wrapper class to manage the IO service + thread that the GcsClient needs.
// https://github.com/beacon-biosignals/ray/blob/448a83caf44108fc1bc44fa7c6c358cffcfcb0d7/src/ray/gcs/gcs_client/gcs_client.h#L61-L87
// Note that connection timeout information, etc. is parsed from the RayConfig parameters:
// https://github.com/ray-project/ray/blob/e060dc2de2251cf883de9b43dda9c73e448e6cbd/src/ray/common/ray_config_def.h
// these can be overwritten via environment variables before initializing the RayConfig / GCSClient, e.g.
// ENV["RAY_gcs_server_request_timeout_seconds"] = 10
// Timeouts that may be useful to configure:
// - gcs_rpc_server_connect_timeout_s (default 5): timeout for connecting to GCSClient
// - gcs_rpc_server_reconnect_timeout_s (default 60): timeout for reconnecting to GCSClient
// - gcs_server_request_timeout_seconds (default 60): timeout for fetching from GCSClient
class JuliaGcsClient {
public:
JuliaGcsClient(const ray::gcs::GcsClientOptions &options);
JuliaGcsClient(const std::string &gcs_address);

ray::Status Connect();

std::string Get(const std::string &ns,
const std::string &key,
int64_t timeout_ms);
int Put(const std::string &ns,
void Disconnect();

// Get, Put, Exists, Keys use methods belonging to an InternalKV field of the GCSClient
// https://github.com/beacon-biosignals/ray/blob/448a83caf44108fc1bc44fa7c6c358cffcfcb0d7/src/ray/gcs/gcs_client/accessor.h#L687
std::string Get(const std::string &ns, const std::string &key);

bool Put(const std::string &ns,
const std::string &key,
const std::string &val,
bool overwrite,
int64_t timeout_ms);
std::vector<std::string> Keys(const std::string &ns,
const std::string &prefix,
int64_t timeout_ms);
bool Exists(const std::string &ns,
const std::string &key,
int64_t timeout_ms);

std::unique_ptr<ray::gcs::PythonGcsClient> gcs_client_;
const std::string &value,
bool overwrite);

std::vector<std::string> Keys(const std::string &ns, const std::string &prefix);

bool Exists(const std::string &ns, const std::string &key);

std::unique_ptr<ray::gcs::GcsClient> gcs_client_;
ray::gcs::GcsClientOptions options_;
std::unique_ptr<instrumented_io_context> io_service_;
std::unique_ptr<std::thread> io_service_thread_;
};

JLCXX_MODULE define_julia_module(jlcxx::Module& mod);
35 changes: 13 additions & 22 deletions src/function_manager.jl
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,26 @@ end
# so "jlfun" seems reasonable
const FUNCTION_MANAGER_NAMESPACE = "jlfun"

Base.@kwdef struct FunctionManager
Base.@kwdef mutable struct FunctionManager
gcs_client::ray_jll.JuliaGcsClient
functions::Dict{String,Any}

function FunctionManager(gcs_client, functions)
fm = new(gcs_client, functions)
f(x) = ray_jll.Disconnect(x.gcs_client)
return finalizer(f, fm)
end
end
glennmoy marked this conversation as resolved.
Show resolved Hide resolved

const FUNCTION_MANAGER = Ref{FunctionManager}()

function _init_global_function_manager(gcs_address)
@info "Connecting function manager to GCS at $gcs_address..."
gcs_client = ray_jll.JuliaGcsClient(gcs_address)
ray_jll.Connect(gcs_client)
status = ray_jll.Connect(gcs_client)
ray_jll.check_status(status)
finalizer(ray_jll.Disconnect, gcs_client)
glennmoy marked this conversation as resolved.
Show resolved Hide resolved
FUNCTION_MANAGER[] = FunctionManager(; gcs_client, functions=Dict{String,Any}())
glennmoy marked this conversation as resolved.
Show resolved Hide resolved

return nothing
end

Expand All @@ -82,30 +89,14 @@ function export_function!(fm::FunctionManager, f, job_id=get_job_id())
@debug "Exporting function to function store:" fd key function_locations
# DFK: I _think_ the string memory may be mangled if we don't `deepcopy`. Not sure but
# it can't hurt
if ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, deepcopy(key), -1)
if ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, deepcopy(key))
@debug "Function already present in GCS store:" fd key
else
@debug "Exporting function to GCS store:" fd key
val = base64encode(serialize, f)
check_oversized_function(val, fd)
ray_jll.Put(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, key, val, true, -1)
end
end

function timedwait_for_function(fm::FunctionManager, fd::ray_jll.JuliaFunctionDescriptor,
job_id=get_job_id(); timeout_s=10)
key = function_key(fd, job_id)
status = try
exists = ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, key, timeout_s)
exists ? :ok : :timed_out
catch e
if e isa ErrorException && contains(e.msg, "Deadline Exceeded")
return :timed_out
else
rethrow()
end
ray_jll.Put(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, key, val, true)
end
return status
end

omus marked this conversation as resolved.
Show resolved Hide resolved
# XXX: this will error if the function is not found in the store.
Expand All @@ -118,7 +109,7 @@ function import_function!(fm::FunctionManager, fd::ray_jll.JuliaFunctionDescript
return get!(fm.functions, fd.function_hash) do
key = function_key(fd, job_id)
@debug "Function not found locally, retrieving from function store" fd key
val = ray_jll.Get(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, key, -1)
val = ray_jll.Get(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, key)
try
io = IOBuffer()
iob64 = Base64DecodePipe(io)
Expand Down
10 changes: 10 additions & 0 deletions src/ray_julia_jll/common.jl
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ function check_status(status::Status)
return nothing
end

# Do-block form of `Connect`: connects `client`, calls `body(client)` and returns its result,
# disconnecting the client afterwards whether `body` returns normally or throws.
function Connect(body, client::JuliaGcsClient)
status = Connect(client)
# NOTE(review): `check_status` runs outside the `try`, so if it throws, `Disconnect` is
# not called for this client.
check_status(status)
try
return body(client)
finally
# Always release the connection, even when `body` raised.
Disconnect(client)
end
end

#####
##### Function descriptor wrangling
#####
Expand Down
11 changes: 7 additions & 4 deletions test/function_manager.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
@testset "function manager" begin
using Ray: FunctionManager, export_function!, import_function!, timedwait_for_function
using .ray_julia_jll: JuliaGcsClient, Connect, function_descriptor,
using Ray: FUNCTION_MANAGER_NAMESPACE, FunctionManager, function_key, export_function!,
import_function!
using .ray_julia_jll: JuliaGcsClient, Connect, Disconnect, function_descriptor,
JuliaFunctionDescriptor, Exists

client = JuliaGcsClient("127.0.0.1:6379")
Expand All @@ -14,14 +15,16 @@
export_function!(fm, f, jobid)

fd = function_descriptor(f)
@test timedwait_for_function(fm, fd, jobid; timeout_s=10) == :ok
key = function_key(fd, jobid)
@test ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, key)
f2 = import_function!(fm, fd, jobid)

@test f2.(1:10) == f.(1:10)

mfd = function_descriptor(MyMod.f)
@test_throws ErrorException import_function!(fm, mfd, jobid)
@test timedwait_for_function(fm, mfd, jobid; timeout_s=1) == :timed_out
mkey = function_key(mfd, jobid)
@test !(ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, mkey))
export_function!(fm, MyMod.f, jobid)

# can import the function even when it's aliased in another module:
Expand Down
60 changes: 26 additions & 34 deletions test/ray_julia_jll/gcs_client.jl
Original file line number Diff line number Diff line change
@@ -1,44 +1,36 @@
@testset "GCS client" begin
using UUIDs
using .ray_julia_jll: JuliaGcsClient, Connect, Put, Get, Keys, Exists, Status, ok,
ToString
ToString, Disconnect

client = JuliaGcsClient("127.0.0.1:6379")
omus marked this conversation as resolved.
Show resolved Hide resolved
@test isnothing(Disconnect(client))

ns = string("TESTING-", uuid4())

# throws if not connected
@test_throws ErrorException Put(client, ns, "computer", "mistaek", false, -1)
@test_throws ErrorException Get(client, ns, "computer", -1)
@test_throws ErrorException Keys(client, ns, "", -1)
@test_throws ErrorException Exists(client, ns, "computer", -1)

status = Connect(client)
@test ok(status)
@test ToString(status) == "OK"

@test Put(client, ns, "computer", "mistaek", false, -1) == 1
@test Get(client, ns, "computer", -1) == "mistaek"
@test Keys(client, ns, "", -1) == ["computer"]
@test Keys(client, ns, "comp", -1) == ["computer"]
@test Keys(client, ns, "comppp", -1) == []
@test Exists(client, ns, "computer", -1)

# no overwrite
@test Put(client, ns, "computer", "blah", false, -1) == 0
@test Get(client, ns, "computer", -1) == "mistaek"

# overwrite ("added" only increments on new key I think)
@test Put(client, ns, "computer", "blah", true, -1) == 0
@test Get(client, ns, "computer", -1) == "blah"

# throw on missing key
@test_throws ErrorException Get(client, ns, "none", -1)

# ideally we'd throw on connect but it returns OK......
badclient = JuliaGcsClient("127.0.0.1:6378")
status = Connect(badclient)

# ...but then throws when we try to do anything so at least there's that
@test_throws ErrorException Put(badclient, ns, "computer", "mistaek", false, -1)
Comment on lines -38 to -43
Copy link
Contributor Author

@glennmoy glennmoy Oct 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue here is that the Connect call returns Status::OK irrespective of whether the GCS Server exists.

It first reports after 5 seconds that it can't connect, then after a minute kills the session with an EXIT_FAILURE.
Again these are set by RayConfig params.

If the client does not exist then the thread executing the server (I think) throws the error, which only gets reported but not caught in the Julia REPL.

https://github.com/ray-project/ray/blob/cde6e887cbb21a9cae2632e3e4b883d913d38a05/src/ray/rpc/gcs_server/gcs_rpc_client.h#L212-L216

Unfortunately the gcs_is_down_ field is private, however there is a way to check if the server is alive that uses a callback

However, I don't think it's worth directly implementing this. The timeout should take care of things; it's just that the error won't be nicely caught/reported in Julia, but we can add that as a follow-up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MWE

julia> using Ray.ray_julia_jll

julia> badclient = ray_julia_jll.JuliaGcsClient("127.0.0.1:6378")
Ray.ray_julia_jll.JuliaGcsClientAllocated(Ptr{Nothing} @0x00000001376270e0)

julia> ray_julia_jll.Connect(badclient)
[2023-10-20 18:04:32,822 E 28309 8652849] gcs_rpc_client.h:207: Failed to connect to GCS at address 127.0.0.1:6378 within 5 seconds.
OK

julia> [2023-10-20 18:05:27,877 E 28309 8653025] gcs_rpc_client.h:537: Failed to connect to GCS within 60 seconds. GCS may have been killed. It's either GCS is terminated by `ray stop` or is killed unexpectedly. If it is killed unexpectedly, see the log file gcs_server.out. https://docs.ray.io/en/master/ray-observability/ray-logging.html#logging-directory-structure. The program will terminate.
(venv) MacBook-Air~/.j/d/Ray (gm/gcsclient|✔)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scenario in which this would occur is if you run Ray.init() without a local raylet present? If so, I'm fine with making this into an issue to tackle later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah and even then it should error earlier if it tried to get the GCS address during Ray.init

@test_throws ErrorException Put(client, ns, "computer", "mistaek", false)
@test_throws ErrorException Get(client, ns, "computer")
@test_throws ErrorException Keys(client, ns, "")
@test_throws ErrorException Exists(client, ns, "computer")

Connect(client) do client
@test Put(client, ns, "computer", "mistaek", false) == 1
@test Get(client, ns, "computer") == "mistaek"
@test Keys(client, ns, "") == ["computer"]
@test Keys(client, ns, "comp") == ["computer"]
@test Keys(client, ns, "comppp") == []
@test Exists(client, ns, "computer")

# no overwrite
@test Put(client, ns, "computer", "blah", false) == 0
@test Get(client, ns, "computer") == "mistaek"

# overwrite ("added" only increments on new key I think)
@test Put(client, ns, "computer", "blah", true) == 0
@test Get(client, ns, "computer") == "blah"

# throw on missing key
@test_throws ErrorException Get(client, ns, "none")
end
end