From 2bfafa323fb6cef92cf61a0df540472e8a527390 Mon Sep 17 00:00:00 2001 From: Glenn Moynihan Date: Wed, 18 Oct 2023 17:18:03 +0100 Subject: [PATCH 01/25] Use GCSClient instead of PythonGCSClient --- .vscode/settings.json | 101 +++++++++++++++++++++++++++++++ build/wrapper.cc | 45 ++++++++------ build/wrapper.h | 30 ++++----- src/function_manager.jl | 12 ++-- src/runtime.jl | 3 +- test/function_manager.jl | 8 ++- test/ray_julia_jll/gcs_client.jl | 42 +++++++------ 7 files changed, 178 insertions(+), 63 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..74b5013f --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,101 @@ +{ + "files.associations": { + "stdexcept": "cpp", + "__bit_reference": "cpp", + "__bits": "cpp", + "__config": "cpp", + "__debug": "cpp", + "__errc": "cpp", + "__hash_table": "cpp", + "__locale": "cpp", + "__mutex_base": "cpp", + "__node_handle": "cpp", + "__split_buffer": "cpp", + "__threading_support": "cpp", + "__tree": "cpp", + "__tuple": "cpp", + "__verbose_abort": "cpp", + "any": "cpp", + "array": "cpp", + "atomic": "cpp", + "bit": "cpp", + "bitset": "cpp", + "cctype": "cpp", + "cfenv": "cpp", + "charconv": "cpp", + "cinttypes": "cpp", + "clocale": "cpp", + "cmath": "cpp", + "codecvt": "cpp", + "complex": "cpp", + "condition_variable": "cpp", + "csetjmp": "cpp", + "csignal": "cpp", + "cstdarg": "cpp", + "cstddef": "cpp", + "cstdint": "cpp", + "cstdio": "cpp", + "cstdlib": "cpp", + "cstring": "cpp", + "ctime": "cpp", + "cwchar": "cpp", + "cwctype": "cpp", + "deque": "cpp", + "exception": "cpp", + "coroutine": "cpp", + "format": "cpp", + "forward_list": "cpp", + "fstream": "cpp", + "future": "cpp", + "initializer_list": "cpp", + "iomanip": "cpp", + "ios": "cpp", + "iosfwd": "cpp", + "iostream": "cpp", + "istream": "cpp", + "latch": "cpp", + "limits": "cpp", + "list": "cpp", + "locale": "cpp", + "map": "cpp", + "memory": "cpp", + "mutex": "cpp", + "new": "cpp", + "optional": "cpp", + "ostream": "cpp", + "queue": "cpp", + "ratio": "cpp", + "regex": "cpp", + "scoped_allocator": "cpp", + "semaphore": "cpp", + "set": "cpp", + "shared_mutex": "cpp", + "span": "cpp", + "sstream": "cpp", + "stack": "cpp", + "streambuf": "cpp", + "string": "cpp", + "string_view": "cpp", + "strstream": "cpp", + "system_error": "cpp", + "thread": "cpp", + "tuple": "cpp", + "type_traits": "cpp", + "typeindex": "cpp", + "typeinfo": "cpp", + "unordered_map": "cpp", + "unordered_set": "cpp", + "valarray": "cpp", + "variant": "cpp", + "vector": "cpp", + "__nullptr": "cpp", + "__string": "cpp", + "chrono": "cpp", + "compare": "cpp", + "concepts": "cpp", + "numeric": "cpp", + "random": "cpp", + "ranges": "cpp", + "algorithm": "cpp" + } +} diff --git a/build/wrapper.cc b/build/wrapper.cc index e581f973..bde900e3 100644 --- a/build/wrapper.cc +++ b/build/wrapper.cc @@ -257,62 +257,68 @@ JuliaGcsClient::JuliaGcsClient(const std::string &gcs_address) { } Status JuliaGcsClient::Connect() { - gcs_client_ = std::make_unique(options_); - return gcs_client_->Connect(); + io_service_ = std::make_unique(); + io_service_thread_ = std::make_unique([this] { + std::unique_ptr work( + new boost::asio::io_service::work(*io_service_)); + io_service_->run(); + }); + gcs_client_ = std::make_unique(options_); + return gcs_client_->Connect(*io_service_); +} + +void JuliaGcsClient::Disconnect() { + io_service_->stop(); + io_service_thread_->join(); + gcs_client_->Disconnect(); + gcs_client_.reset(); } -std::string JuliaGcsClient::Get(const std::string &ns, - const std::string &key, - int64_t timeout_ms) { +std::string JuliaGcsClient::Get(const std::string &ns, const std::string &key) { if (!gcs_client_) { throw std::runtime_error("GCS client not initialized; did you forget to Connect?"); } std::string value; - Status status = gcs_client_->InternalKVGet(ns, key, timeout_ms, value); + Status status = gcs_client_->InternalKV().Get(ns, key, value); if (!status.ok()) { throw std::runtime_error(status.ToString()); } return value; } -int JuliaGcsClient::Put(const std::string &ns, +bool JuliaGcsClient::Put(const std::string &ns, const std::string &key, const std::string &value, - bool overwrite, - int64_t timeout_ms) { + bool overwrite) { if (!gcs_client_) { throw std::runtime_error("GCS client not initialized; did you forget to Connect?"); } - int added_num; - Status status = gcs_client_->InternalKVPut(ns, key, value, overwrite, timeout_ms, added_num); + bool added_num; + Status status = gcs_client_->InternalKV().Put(ns, key, value, overwrite, added_num); if (!status.ok()) { throw std::runtime_error(status.ToString()); } return added_num; } -std::vector JuliaGcsClient::Keys(const std::string &ns, - const std::string &prefix, - int64_t timeout_ms) { +std::vector JuliaGcsClient::Keys(const std::string &ns, const std::string &prefix) { if (!gcs_client_) { throw std::runtime_error("GCS client not initialized; did you forget to Connect?"); } std::vector results; - Status status = gcs_client_->InternalKVKeys(ns, prefix, timeout_ms, results); + Status status = gcs_client_->InternalKV().Keys(ns, prefix, results); if (!status.ok()) { throw std::runtime_error(status.ToString()); } return results; } -bool JuliaGcsClient::Exists(const std::string &ns, - const std::string &key, - int64_t timeout_ms) { +bool JuliaGcsClient::Exists(const std::string &ns, const std::string &key) { if (!gcs_client_) { throw std::runtime_error("GCS client not initialized; did you forget to Connect?"); } bool exists; - Status status = gcs_client_->InternalKVExists(ns, key, timeout_ms, exists); + Status status = gcs_client_->InternalKV().Exists(ns, key, exists); if (!status.ok()) { throw std::runtime_error(status.ToString()); } @@ -715,6 +721,7 @@ JLCXX_MODULE define_julia_module(jlcxx::Module& mod) mod.add_type("JuliaGcsClient") .constructor() .method("Connect", &JuliaGcsClient::Connect) + .method("Disconnect", &JuliaGcsClient::Disconnect) .method("Put", &JuliaGcsClient::Put) .method("Get", &JuliaGcsClient::Get) .method("Keys", &JuliaGcsClient::Keys) diff --git a/build/wrapper.h b/build/wrapper.h index 43873e8a..924d0cfe 100644 --- a/build/wrapper.h +++ b/build/wrapper.h @@ -34,23 +34,23 @@ class JuliaGcsClient { ray::Status Connect(); - std::string Get(const std::string &ns, - const std::string &key, - int64_t timeout_ms); - int Put(const std::string &ns, + void Disconnect(); + + std::string Get(const std::string &ns, const std::string &key); + + bool Put(const std::string &ns, const std::string &key, - const std::string &val, - bool overwrite, - int64_t timeout_ms); - std::vector Keys(const std::string &ns, - const std::string &prefix, - int64_t timeout_ms); - bool Exists(const std::string &ns, - const std::string &key, - int64_t timeout_ms); - - std::unique_ptr gcs_client_; + const std::string &value, + bool overwrite); + + std::vector Keys(const std::string &ns, const std::string &prefix); + + bool Exists(const std::string &ns, const std::string &key); + + std::unique_ptr gcs_client_; ray::gcs::GcsClientOptions options_; + std::unique_ptr io_service_; + std::unique_ptr io_service_thread_; }; JLCXX_MODULE define_julia_module(jlcxx::Module& mod); diff --git a/src/function_manager.jl b/src/function_manager.jl index 42a49a3b..f72e93ae 100644 --- a/src/function_manager.jl +++ b/src/function_manager.jl @@ -65,8 +65,10 @@ const FUNCTION_MANAGER = Ref{FunctionManager}() function _init_global_function_manager(gcs_address) @info "Connecting function manager to GCS at $gcs_address..." gcs_client = ray_jll.JuliaGcsClient(gcs_address) - ray_jll.Connect(gcs_client) + status = ray_jll.Connect(gcs_client) + ray_jll.ok(status) || error("Could not connect to GCS") FUNCTION_MANAGER[] = FunctionManager(; gcs_client, functions=Dict{String,Any}()) + atexit(() -> ray_jll.Disconnect(gcs_client)) return nothing end @@ -82,13 +84,13 @@ function export_function!(fm::FunctionManager, f, job_id=get_job_id()) @debug "Exporting function to function store:" fd key function_locations # DFK: I _think_ the string memory may be mangled if we don't `deepcopy`. Not sure but # it can't hurt - if ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, deepcopy(key), -1) + if ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, deepcopy(key)) @debug "Function already present in GCS store:" fd key else @debug "Exporting function to GCS store:" fd key val = base64encode(serialize, f) check_oversized_function(val, fd) - ray_jll.Put(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, key, val, true, -1) + ray_jll.Put(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, key, val, true) end end @@ -96,7 +98,7 @@ function timedwait_for_function(fm::FunctionManager, fd::ray_jll.JuliaFunctionDe job_id=get_job_id(); timeout_s=10) key = function_key(fd, job_id) status = try - exists = ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, key, timeout_s) + exists = ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, key) exists ? :ok : :timed_out catch e if e isa ErrorException && contains(e.msg, "Deadline Exceeded") @@ -118,7 +120,7 @@ function import_function!(fm::FunctionManager, fd::ray_jll.JuliaFunctionDescript return get!(fm.functions, fd.function_hash) do key = function_key(fd, job_id) @debug "Function not found locally, retrieving from function store" fd key - val = ray_jll.Get(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, key, -1) + val = ray_jll.Get(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, key) try io = IOBuffer() iob64 = Base64DecodePipe(io) diff --git a/src/runtime.jl b/src/runtime.jl index a4688e94..8b288eef 100644 --- a/src/runtime.jl +++ b/src/runtime.jl @@ -80,8 +80,7 @@ function init(runtime_env::Union{RuntimeEnv,Nothing}=nothing; opts = ray_jll.GcsClientOptions(gcs_address) GLOBAL_STATE_ACCESSOR[] = ray_jll.GlobalStateAccessor(opts) - ray_jll.Connect(GLOBAL_STATE_ACCESSOR[]) || - error("Failed to connect to Ray GCS at $(gcs_address)") + ray_jll.Connect(GLOBAL_STATE_ACCESSOR[]) || error("Failed to connect to Ray GCS at $(gcs_address)") atexit(() -> ray_jll.Disconnect(GLOBAL_STATE_ACCESSOR[])) job_id = ray_jll.GetNextJobID(GLOBAL_STATE_ACCESSOR[]) diff --git a/test/function_manager.jl b/test/function_manager.jl index 85d1673c..11e6cf04 100644 --- a/test/function_manager.jl +++ b/test/function_manager.jl @@ -1,6 +1,6 @@ @testset "function manager" begin using Ray: FunctionManager, export_function!, import_function!, timedwait_for_function - using .ray_julia_jll: JuliaGcsClient, Connect, function_descriptor, + using .ray_julia_jll: JuliaGcsClient, Connect, Disconnect, function_descriptor, JuliaFunctionDescriptor, Exists client = JuliaGcsClient("127.0.0.1:6379") @@ -20,8 +20,9 @@ @test f2.(1:10) == f.(1:10) mfd = function_descriptor(MyMod.f) - @test_throws ErrorException import_function!(fm, mfd, jobid) - @test timedwait_for_function(fm, mfd, jobid; timeout_s=1) == :timed_out + # TODO: COME BACK TO THESE + # @test_throws ErrorException import_function!(fm, mfd, jobid) + # @test timedwait_for_function(fm, mfd, jobid; timeout_s=1) == :timed_out export_function!(fm, MyMod.f, jobid) # can import the function even when it's aliased in another module: @@ -101,4 +102,5 @@ # finally # rmprocs(workers()) # end + Disconnect(client) end diff --git a/test/ray_julia_jll/gcs_client.jl b/test/ray_julia_jll/gcs_client.jl index da43754b..06d58bdb 100644 --- a/test/ray_julia_jll/gcs_client.jl +++ b/test/ray_julia_jll/gcs_client.jl @@ -1,44 +1,48 @@ @testset "GCS client" begin using UUIDs using .ray_julia_jll: JuliaGcsClient, Connect, Put, Get, Keys, Exists, Status, ok, - ToString + ToString, Disconnect client = JuliaGcsClient("127.0.0.1:6379") ns = string("TESTING-", uuid4()) # throws if not connected - @test_throws ErrorException Put(client, ns, "computer", "mistaek", false, -1) - @test_throws ErrorException Get(client, ns, "computer", -1) - @test_throws ErrorException Keys(client, ns, "", -1) - @test_throws ErrorException Exists(client, ns, "computer", -1) + @test_throws ErrorException Put(client, ns, "computer", "mistaek", false) + @test_throws ErrorException Get(client, ns, "computer") + @test_throws ErrorException Keys(client, ns, "") + @test_throws ErrorException Exists(client, ns, "computer") status = Connect(client) @test ok(status) @test ToString(status) == "OK" - @test Put(client, ns, "computer", "mistaek", false, -1) == 1 - @test Get(client, ns, "computer", -1) == "mistaek" - @test Keys(client, ns, "", -1) == ["computer"] - @test Keys(client, ns, "comp", -1) == ["computer"] - @test Keys(client, ns, "comppp", -1) == [] - @test Exists(client, ns, "computer", -1) + @test Put(client, ns, "computer", "mistaek", false) == 1 + @test Get(client, ns, "computer") == "mistaek" + @test Keys(client, ns, "") == ["computer"] + @test Keys(client, ns, "comp") == ["computer"] + @test Keys(client, ns, "comppp") == [] + @test Exists(client, ns, "computer") # no overwrite - @test Put(client, ns, "computer", "blah", false, -1) == 0 - @test Get(client, ns, "computer", -1) == "mistaek" + @test Put(client, ns, "computer", "blah", false) == 0 + @test Get(client, ns, "computer") == "mistaek" # overwrite ("added" only increments on new key I think) - @test Put(client, ns, "computer", "blah", true, -1) == 0 - @test Get(client, ns, "computer", -1) == "blah" + @test Put(client, ns, "computer", "blah", true) == 0 + @test Get(client, ns, "computer") == "blah" # throw on missing key - @test_throws ErrorException Get(client, ns, "none", -1) + @test_throws ErrorException Get(client, ns, "none") + # TODO: COME BACK TO THESE # ideally we'd throw on connect but it returns OK...... - badclient = JuliaGcsClient("127.0.0.1:6378") - status = Connect(badclient) + # badclient = JuliaGcsClient("127.0.0.1:6378") + # status = Connect(badclient) # ...but then throws when we try to do anything so at least there's that - @test_throws ErrorException Put(badclient, ns, "computer", "mistaek", false, -1) + # @test_throws ErrorException Put(badclient, ns, "computer", "mistaek", false) + + Disconnect(client) + # Disconnect(badclient) end From f342fa09fb1308161f7fa80cd59b044a00f64897 Mon Sep 17 00:00:00 2001 From: Glenn Date: Fri, 20 Oct 2023 15:26:30 +0100 Subject: [PATCH 02/25] delete .vscode settings --- .vscode/settings.json | 101 ------------------------------------------ 1 file changed, 101 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 74b5013f..00000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,101 +0,0 @@ -{ - "files.associations": { - "stdexcept": "cpp", - "__bit_reference": "cpp", - "__bits": "cpp", - "__config": "cpp", - "__debug": "cpp", - "__errc": "cpp", - "__hash_table": "cpp", - "__locale": "cpp", - "__mutex_base": "cpp", - "__node_handle": "cpp", - "__split_buffer": "cpp", - "__threading_support": "cpp", - "__tree": "cpp", - "__tuple": "cpp", - "__verbose_abort": "cpp", - "any": "cpp", - "array": "cpp", - "atomic": "cpp", - "bit": "cpp", - "bitset": "cpp", - "cctype": "cpp", - "cfenv": "cpp", - "charconv": "cpp", - "cinttypes": "cpp", - "clocale": "cpp", - "cmath": "cpp", - "codecvt": "cpp", - "complex": "cpp", - "condition_variable": "cpp", - "csetjmp": "cpp", - "csignal": "cpp", - "cstdarg": "cpp", - "cstddef": "cpp", - "cstdint": "cpp", - "cstdio": "cpp", - "cstdlib": "cpp", - "cstring": "cpp", - "ctime": "cpp", - "cwchar": "cpp", - "cwctype": "cpp", - "deque": "cpp", - "exception": "cpp", - "coroutine": "cpp", - "format": "cpp", - "forward_list": "cpp", - "fstream": "cpp", - "future": "cpp", - "initializer_list": "cpp", - "iomanip": "cpp", - "ios": "cpp", - "iosfwd": "cpp", - "iostream": "cpp", - "istream": "cpp", - "latch": "cpp", - "limits": "cpp", - "list": "cpp", - "locale": "cpp", - "map": "cpp", - "memory": "cpp", - "mutex": "cpp", - "new": "cpp", - "optional": "cpp", - "ostream": "cpp", - "queue": "cpp", - "ratio": "cpp", - "regex": "cpp", - "scoped_allocator": "cpp", - "semaphore": "cpp", - "set": "cpp", - "shared_mutex": "cpp", - "span": "cpp", - "sstream": "cpp", - "stack": "cpp", - "streambuf": "cpp", - "string": "cpp", - "string_view": "cpp", - "strstream": "cpp", - "system_error": "cpp", - "thread": "cpp", - "tuple": "cpp", - "type_traits": "cpp", - "typeindex": "cpp", - "typeinfo": "cpp", - "unordered_map": "cpp", - "unordered_set": "cpp", - "valarray": "cpp", - "variant": "cpp", - "vector": "cpp", - "__nullptr": "cpp", - "__string": "cpp", - "chrono": "cpp", - "compare": "cpp", - "concepts": "cpp", - "numeric": "cpp", - "random": "cpp", - "ranges": "cpp", - "algorithm": "cpp" - } -} From 5e4812cf8159c8eb811e8792e7740136d5a20cf7 Mon Sep 17 00:00:00 2001 From: Glenn Date: Fri, 20 Oct 2023 17:02:02 +0100 Subject: [PATCH 03/25] Delete tests that no longer apply --- test/function_manager.jl | 3 --- test/ray_julia_jll/gcs_client.jl | 9 --------- 2 files changed, 12 deletions(-) diff --git a/test/function_manager.jl b/test/function_manager.jl index 11e6cf04..4f426972 100644 --- a/test/function_manager.jl +++ b/test/function_manager.jl @@ -20,9 +20,6 @@ @test f2.(1:10) == f.(1:10) mfd = function_descriptor(MyMod.f) - # TODO: COME BACK TO THESE - # @test_throws ErrorException import_function!(fm, mfd, jobid) - # @test timedwait_for_function(fm, mfd, jobid; timeout_s=1) == :timed_out export_function!(fm, MyMod.f, jobid) # can import the function even when it's aliased in another module: diff --git a/test/ray_julia_jll/gcs_client.jl b/test/ray_julia_jll/gcs_client.jl index 06d58bdb..e539a525 100644 --- a/test/ray_julia_jll/gcs_client.jl +++ b/test/ray_julia_jll/gcs_client.jl @@ -35,14 +35,5 @@ # throw on missing key @test_throws ErrorException Get(client, ns, "none") - # TODO: COME BACK TO THESE - # ideally we'd throw on connect but it returns OK...... - # badclient = JuliaGcsClient("127.0.0.1:6378") - # status = Connect(badclient) - - # ...but then throws when we try to do anything so at least there's that - # @test_throws ErrorException Put(badclient, ns, "computer", "mistaek", false) - Disconnect(client) - # Disconnect(badclient) end From feeec487f71f3e192239ffcd2a83b10107a14ef2 Mon Sep 17 00:00:00 2001 From: Glenn Date: Fri, 20 Oct 2023 17:03:27 +0100 Subject: [PATCH 04/25] Delete timedwait_for_function --- src/function_manager.jl | 16 ---------------- test/function_manager.jl | 3 +-- 2 files changed, 1 insertion(+), 18 deletions(-) diff --git a/src/function_manager.jl b/src/function_manager.jl index f72e93ae..bd4a7871 100644 --- a/src/function_manager.jl +++ b/src/function_manager.jl @@ -94,22 +94,6 @@ function export_function!(fm::FunctionManager, f, job_id=get_job_id()) end end -function timedwait_for_function(fm::FunctionManager, fd::ray_jll.JuliaFunctionDescriptor, - job_id=get_job_id(); timeout_s=10) - key = function_key(fd, job_id) - status = try - exists = ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, key) - exists ? :ok : :timed_out - catch e - if e isa ErrorException && contains(e.msg, "Deadline Exceeded") - return :timed_out - else - rethrow() - end - end - return status -end - # XXX: this will error if the function is not found in the store. # TODO: consider _trying_ to resolve the function descriptor locally (i.e., # somthing like `eval(Meta.parse(CallString(fd)))`), falling back to the function diff --git a/test/function_manager.jl b/test/function_manager.jl index 4f426972..7a614d8a 100644 --- a/test/function_manager.jl +++ b/test/function_manager.jl @@ -1,5 +1,5 @@ @testset "function manager" begin - using Ray: FunctionManager, export_function!, import_function!, timedwait_for_function + using Ray: FunctionManager, export_function!, import_function! using .ray_julia_jll: JuliaGcsClient, Connect, Disconnect, function_descriptor, JuliaFunctionDescriptor, Exists @@ -14,7 +14,6 @@ export_function!(fm, f, jobid) fd = function_descriptor(f) - @test timedwait_for_function(fm, fd, jobid; timeout_s=10) == :ok f2 = import_function!(fm, fd, jobid) @test f2.(1:10) == f.(1:10) From dbc760f0eca89f9a1e83b74935f5da3e569c15e4 Mon Sep 17 00:00:00 2001 From: Glenn Date: Fri, 20 Oct 2023 17:48:44 +0100 Subject: [PATCH 05/25] add back test --- test/function_manager.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/test/function_manager.jl b/test/function_manager.jl index 7a614d8a..557bade6 100644 --- a/test/function_manager.jl +++ b/test/function_manager.jl @@ -19,6 +19,7 @@ @test f2.(1:10) == f.(1:10) mfd = function_descriptor(MyMod.f) + @test_throws ErrorException import_function!(fm, mfd, jobid) export_function!(fm, MyMod.f, jobid) # can import the function even when it's aliased in another module: From 7ec1c81bbb3b5cd1b2b56a68c5fd95ac7e9296ba Mon Sep 17 00:00:00 2001 From: Glenn Date: Fri, 20 Oct 2023 18:03:07 +0100 Subject: [PATCH 06/25] add back function manager tests --- test/function_manager.jl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/test/function_manager.jl b/test/function_manager.jl index 557bade6..95fe6d1f 100644 --- a/test/function_manager.jl +++ b/test/function_manager.jl @@ -1,5 +1,6 @@ @testset "function manager" begin - using Ray: FunctionManager, export_function!, import_function! + using Ray: FUNCTION_MANAGER_NAMESPACE, FunctionManager, function_key, export_function!, + import_function! using .ray_julia_jll: JuliaGcsClient, Connect, Disconnect, function_descriptor, JuliaFunctionDescriptor, Exists @@ -14,12 +15,16 @@ export_function!(fm, f, jobid) fd = function_descriptor(f) + key = function_key(fd, jobid) + @test ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, key) f2 = import_function!(fm, fd, jobid) @test f2.(1:10) == f.(1:10) mfd = function_descriptor(MyMod.f) @test_throws ErrorException import_function!(fm, mfd, jobid) + mkey = function_key(mfd, jobid) + @test !(ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, mkey)) export_function!(fm, MyMod.f, jobid) # can import the function even when it's aliased in another module: From daf3da23decaf82de470290db35e2a994676465e Mon Sep 17 00:00:00 2001 From: Glenn Moynihan Date: Fri, 20 Oct 2023 19:30:37 +0100 Subject: [PATCH 07/25] formatting Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- src/runtime.jl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/runtime.jl b/src/runtime.jl index 8b288eef..a4688e94 100644 --- a/src/runtime.jl +++ b/src/runtime.jl @@ -80,7 +80,8 @@ function init(runtime_env::Union{RuntimeEnv,Nothing}=nothing; opts = ray_jll.GcsClientOptions(gcs_address) GLOBAL_STATE_ACCESSOR[] = ray_jll.GlobalStateAccessor(opts) - ray_jll.Connect(GLOBAL_STATE_ACCESSOR[]) || error("Failed to connect to Ray GCS at $(gcs_address)") + ray_jll.Connect(GLOBAL_STATE_ACCESSOR[]) || + error("Failed to connect to Ray GCS at $(gcs_address)") atexit(() -> ray_jll.Disconnect(GLOBAL_STATE_ACCESSOR[])) job_id = ray_jll.GetNextJobID(GLOBAL_STATE_ACCESSOR[]) From 6ad9dc0a833753f25a901dd74f8d34287371719b Mon Sep 17 00:00:00 2001 From: Glenn Date: Mon, 23 Oct 2023 19:02:13 +0100 Subject: [PATCH 08/25] add comments with links to src code --- build/wrapper.cc | 1 + build/wrapper.h | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/build/wrapper.cc b/build/wrapper.cc index bde900e3..d6fcfe1c 100644 --- a/build/wrapper.cc +++ b/build/wrapper.cc @@ -257,6 +257,7 @@ JuliaGcsClient::JuliaGcsClient(const std::string &gcs_address) { } Status JuliaGcsClient::Connect() { + // https://github.com/beacon-biosignals/ray/blob/448a83caf44108fc1bc44fa7c6c358cffcfcb0d7/src/ray/gcs/gcs_client/test/usage_stats_client_test.cc#L34-L41 io_service_ = std::make_unique(); io_service_thread_ = std::make_unique([this] { std::unique_ptr work( diff --git a/build/wrapper.h b/build/wrapper.h index 924d0cfe..e818c768 100644 --- a/build/wrapper.h +++ b/build/wrapper.h @@ -23,10 +23,16 @@ using ray::core::TaskOptions; using ray::core::WorkerType; std::string ToString(ray::FunctionDescriptor function_descriptor); -// a wrapper class to manage the IO service + thread that the GcsClient needs. -// we may want to use the PythonGcsClient however, which does not do async -// operations in separate threads as far as I can tell...in which case we would -// not need this wrapper at all. +// JuliaGCSClient is a wrapper class to manage the IO service + thread that the GcsClient needs. +// https://github.com/beacon-biosignals/ray/blob/448a83caf44108fc1bc44fa7c6c358cffcfcb0d7/src/ray/gcs/gcs_client/gcs_client.h#L61-L87 +// Note that connection timeout information, etc. is parsed from the RayConfig parameters: +// https://github.com/ray-project/ray/blob/e060dc2de2251cf883de9b43dda9c73e448e6cbd/src/ray/common/ray_config_def.h +// these can be overwritten via environment variables before initializing the RayConfig / GCSClient, e.g. +// ENV["RAY_gcs_server_request_timeout_seconds"] = 10 +// Timeouts that may be useful to configure: +// - gcs_rpc_server_reconnect_timeout_s (default 5): timeout for connecting to GCSClient +// - gcs_rpc_server_reconnect_timeout_s (default 60): timeout for reconnecting to GCSClient +// - gcs_server_request_timeout_seconds (default 60): timeout for fetching from GCSClient class JuliaGcsClient { public: JuliaGcsClient(const ray::gcs::GcsClientOptions &options); @@ -36,6 +42,8 @@ class JuliaGcsClient { void Disconnect(); + // Get, Put, Exists, Keys use methods belonging to an InternalKV field of the GCSClient + // https://github.com/beacon-biosignals/ray/blob/448a83caf44108fc1bc44fa7c6c358cffcfcb0d7/src/ray/gcs/gcs_client/accessor.h#L687 std::string Get(const std::string &ns, const std::string &key); bool Put(const std::string &ns, From 6b98717caa778604f27918461dc30f6cc7b3d9bf Mon Sep 17 00:00:00 2001 From: Glenn Date: Mon, 23 Oct 2023 20:26:01 +0100 Subject: [PATCH 09/25] add finalizer to FunctionManager --- src/function_manager.jl | 11 ++++++++--- test/function_manager.jl | 1 - 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/function_manager.jl b/src/function_manager.jl index bd4a7871..051bca57 100644 --- a/src/function_manager.jl +++ b/src/function_manager.jl @@ -55,11 +55,18 @@ end # so "jlfun" seems reasonable const FUNCTION_MANAGER_NAMESPACE = "jlfun" -Base.@kwdef struct FunctionManager +Base.@kwdef mutable struct FunctionManager gcs_client::ray_jll.JuliaGcsClient functions::Dict{String,Any} + + function FunctionManager(gcs_client, functions) + fm = new(gcs_client, functions) + f(x) = ray_jll.Disconnect(x.gcs_client) + return finalizer(f, fm) + end end + const FUNCTION_MANAGER = Ref{FunctionManager}() function _init_global_function_manager(gcs_address) @@ -68,8 +75,6 @@ function _init_global_function_manager(gcs_address) status = ray_jll.Connect(gcs_client) ray_jll.ok(status) || error("Could not connect to GCS") FUNCTION_MANAGER[] = FunctionManager(; gcs_client, functions=Dict{String,Any}()) - atexit(() -> ray_jll.Disconnect(gcs_client)) - return nothing end diff --git a/test/function_manager.jl b/test/function_manager.jl index 95fe6d1f..ab6979ac 100644 --- a/test/function_manager.jl +++ b/test/function_manager.jl @@ -104,5 +104,4 @@ # finally # rmprocs(workers()) # end - Disconnect(client) end From 46e1d6483b09c299644d80ab74d4f879b15113c1 Mon Sep 17 00:00:00 2001 From: Glenn Date: Mon, 23 Oct 2023 20:54:06 +0100 Subject: [PATCH 10/25] add Connect-do syntax --- src/ray_julia_jll/common.jl | 11 ++++++++++ test/ray_julia_jll/gcs_client.jl | 35 ++++++++++++++++---------------- 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/src/ray_julia_jll/common.jl b/src/ray_julia_jll/common.jl index beda30fb..851bc01e 100644 --- a/src/ray_julia_jll/common.jl +++ b/src/ray_julia_jll/common.jl @@ -79,6 +79,17 @@ function check_status(status::Status) return nothing end +function Connect(body, client::JuliaGcsClient) + status = Connect(client) + ok(status) || error("Could not connect to GCS") + try + body(client) + finally + Disconnect(client) + end + return nothing +end + ##### ##### Function descriptor wrangling ##### diff --git a/test/ray_julia_jll/gcs_client.jl b/test/ray_julia_jll/gcs_client.jl index e539a525..83d9a817 100644 --- a/test/ray_julia_jll/gcs_client.jl +++ b/test/ray_julia_jll/gcs_client.jl @@ -13,27 +13,26 @@ @test_throws ErrorException Keys(client, ns, "") @test_throws ErrorException Exists(client, ns, "computer") - status = Connect(client) - @test ok(status) - @test ToString(status) == "OK" + Connect(client) do client - @test Put(client, ns, "computer", "mistaek", false) == 1 - @test Get(client, ns, "computer") == "mistaek" - @test Keys(client, ns, "") == ["computer"] - @test Keys(client, ns, "comp") == ["computer"] - @test Keys(client, ns, "comppp") == [] - @test Exists(client, ns, "computer") + @test Put(client, ns, "computer", "mistaek", false) == 1 + @test Get(client, ns, "computer") == "mistaek" + @test Keys(client, ns, "") == ["computer"] + @test Keys(client, ns, "comp") == ["computer"] + @test Keys(client, ns, "comppp") == [] + @test Exists(client, ns, "computer") - # no overwrite - @test Put(client, ns, "computer", "blah", false) == 0 - @test Get(client, ns, "computer") == "mistaek" + # no overwrite + @test Put(client, ns, "computer", "blah", false) == 0 + @test Get(client, ns, "computer") == "mistaek" - # overwrite ("added" only increments on new key I think) - @test Put(client, ns, "computer", "blah", true) == 0 - @test Get(client, ns, "computer") == "blah" + # overwrite ("added" only increments on new key I think) + @test Put(client, ns, "computer", "blah", true) == 0 + @test Get(client, ns, "computer") == "blah" - # throw on missing key - @test_throws ErrorException Get(client, ns, "none") + # throw on missing key + @test_throws ErrorException Get(client, ns, "none") - Disconnect(client) + end + # Disconnect(client) end From b92b437c6a06d605239b122f47c78a5007c5674b Mon Sep 17 00:00:00 2001 From: Glenn Date: Mon, 23 Oct 2023 20:58:12 +0100 Subject: [PATCH 11/25] connect to GCSClient in FunctionManager constructor --- src/function_manager.jl | 4 ++-- test/function_manager.jl | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/function_manager.jl b/src/function_manager.jl index 051bca57..45bd3bee 100644 --- a/src/function_manager.jl +++ b/src/function_manager.jl @@ -60,6 +60,8 @@ Base.@kwdef mutable struct FunctionManager functions::Dict{String,Any} function FunctionManager(gcs_client, functions) + status = ray_jll.Connect(gcs_client) + ray_jll.ok(status) || error("Could not connect to GCS") fm = new(gcs_client, functions) f(x) = ray_jll.Disconnect(x.gcs_client) return finalizer(f, fm) @@ -72,8 +74,6 @@ const FUNCTION_MANAGER = Ref{FunctionManager}() function _init_global_function_manager(gcs_address) @info "Connecting function manager to GCS at $gcs_address..." gcs_client = ray_jll.JuliaGcsClient(gcs_address) - status = ray_jll.Connect(gcs_client) - ray_jll.ok(status) || error("Could not connect to GCS") FUNCTION_MANAGER[] = FunctionManager(; gcs_client, functions=Dict{String,Any}()) return nothing end diff --git a/test/function_manager.jl b/test/function_manager.jl index ab6979ac..c2fce8de 100644 --- a/test/function_manager.jl +++ b/test/function_manager.jl @@ -5,8 +5,6 @@ JuliaFunctionDescriptor, Exists client = JuliaGcsClient("127.0.0.1:6379") - Connect(client) - fm = FunctionManager(client, Dict{String,Any}()) jobid = 1337 From 0726fefe4336100a92796e996778eb7cd3d05bcc Mon Sep 17 00:00:00 2001 From: Glenn Date: Tue, 24 Oct 2023 16:45:41 +0100 Subject: [PATCH 12/25] Revert "connect to GCSClient in FunctionManager constructor" This reverts commit 7769d88669a622dafa8c3bbab4e057a79d35b403. --- src/function_manager.jl | 4 ++-- test/function_manager.jl | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/function_manager.jl b/src/function_manager.jl index 45bd3bee..051bca57 100644 --- a/src/function_manager.jl +++ b/src/function_manager.jl @@ -60,8 +60,6 @@ Base.@kwdef mutable struct FunctionManager functions::Dict{String,Any} function FunctionManager(gcs_client, functions) - status = ray_jll.Connect(gcs_client) - ray_jll.ok(status) || error("Could not connect to GCS") fm = new(gcs_client, functions) f(x) = ray_jll.Disconnect(x.gcs_client) return finalizer(f, fm) @@ -74,6 +72,8 @@ const FUNCTION_MANAGER = Ref{FunctionManager}() function _init_global_function_manager(gcs_address) @info "Connecting function manager to GCS at $gcs_address..." gcs_client = ray_jll.JuliaGcsClient(gcs_address) + status = ray_jll.Connect(gcs_client) + ray_jll.ok(status) || error("Could not connect to GCS") FUNCTION_MANAGER[] = FunctionManager(; gcs_client, functions=Dict{String,Any}()) return nothing end diff --git a/test/function_manager.jl b/test/function_manager.jl index c2fce8de..ab6979ac 100644 --- a/test/function_manager.jl +++ b/test/function_manager.jl @@ -5,6 +5,8 @@ JuliaFunctionDescriptor, Exists client = JuliaGcsClient("127.0.0.1:6379") + Connect(client) + fm = FunctionManager(client, Dict{String,Any}()) jobid = 1337 From b63369ff214b5b2d62211efa8aa9bfa3f0eb8573 Mon Sep 17 00:00:00 2001 From: Glenn Date: Tue, 24 Oct 2023 16:46:35 +0100 Subject: [PATCH 13/25] return body() --- src/ray_julia_jll/common.jl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/ray_julia_jll/common.jl b/src/ray_julia_jll/common.jl index 851bc01e..e59c2e96 100644 --- a/src/ray_julia_jll/common.jl +++ b/src/ray_julia_jll/common.jl @@ -83,11 +83,10 @@ function Connect(body, client::JuliaGcsClient) status = Connect(client) ok(status) || error("Could not connect to GCS") try - body(client) + return body(client) finally Disconnect(client) end - return nothing end ##### From 2cf25295015f9fc1b7b0e09f6b9598b0a189414b Mon Sep 17 00:00:00 2001 From: Glenn Date: Tue, 24 Oct 2023 16:53:43 +0100 Subject: [PATCH 14/25] call check_status --- src/function_manager.jl | 2 +- src/ray_julia_jll/common.jl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/function_manager.jl b/src/function_manager.jl index 051bca57..1b6cca35 100644 --- a/src/function_manager.jl +++ b/src/function_manager.jl @@ -73,7 +73,7 @@ function _init_global_function_manager(gcs_address) @info "Connecting function manager to GCS at $gcs_address..." gcs_client = ray_jll.JuliaGcsClient(gcs_address) status = ray_jll.Connect(gcs_client) - ray_jll.ok(status) || error("Could not connect to GCS") + ray_jll.check_status(status) FUNCTION_MANAGER[] = FunctionManager(; gcs_client, functions=Dict{String,Any}()) return nothing end diff --git a/src/ray_julia_jll/common.jl b/src/ray_julia_jll/common.jl index e59c2e96..996dccd4 100644 --- a/src/ray_julia_jll/common.jl +++ b/src/ray_julia_jll/common.jl @@ -81,7 +81,7 @@ end function Connect(body, client::JuliaGcsClient) status = Connect(client) - ok(status) || error("Could not connect to GCS") + check_status(status) try return body(client) finally From 9c1edd6366c071ec6f30ad900d5fee87278b93b9 Mon Sep 17 00:00:00 2001 From: Glenn Date: Tue, 24 Oct 2023 17:08:34 +0100 Subject: [PATCH 15/25] add finalizer call in _init_global_function_manager --- src/function_manager.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/function_manager.jl b/src/function_manager.jl index 1b6cca35..4d11b2d8 100644 --- a/src/function_manager.jl +++ b/src/function_manager.jl @@ -74,6 +74,7 @@ function _init_global_function_manager(gcs_address) gcs_client = ray_jll.JuliaGcsClient(gcs_address) status = ray_jll.Connect(gcs_client) ray_jll.check_status(status) + finalizer(ray_jll.Disconnect, gcs_client) FUNCTION_MANAGER[] = FunctionManager(; gcs_client, functions=Dict{String,Any}()) return nothing end From 123a7dc31ea55adc115d59d68f06c292cbb76e3f Mon Sep 17 00:00:00 2001 From: Glenn Date: Tue, 24 Oct 2023 17:40:45 +0100 Subject: [PATCH 16/25] Disconnect from unconnected client should no-op --- build/wrapper.cc | 3 +++ test/ray_julia_jll/gcs_client.jl | 1 + 2 files changed, 4 insertions(+) diff --git a/build/wrapper.cc b/build/wrapper.cc index d6fcfe1c..20afc8fb 100644 --- a/build/wrapper.cc +++ b/build/wrapper.cc @@ -269,6 +269,9 @@ Status JuliaGcsClient::Connect() { } void JuliaGcsClient::Disconnect() { + if (!gcs_client_){ + return; + } io_service_->stop(); io_service_thread_->join(); gcs_client_->Disconnect(); diff --git a/test/ray_julia_jll/gcs_client.jl b/test/ray_julia_jll/gcs_client.jl index 83d9a817..4e550b2f 100644 --- a/test/ray_julia_jll/gcs_client.jl +++ b/test/ray_julia_jll/gcs_client.jl @@ -4,6 +4,7 @@ ToString, Disconnect client = JuliaGcsClient("127.0.0.1:6379") + @test isnothing(Disconnect(client)) ns = string("TESTING-", uuid4()) From 9e15271d3d2e10fb76a8f244dc407305fbcc939f Mon Sep 17 00:00:00 2001 From: Glenn Moynihan Date: Mon, 23 Oct 2023 21:09:13 +0100 Subject: [PATCH 17/25] Apply suggestions from code review Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- src/function_manager.jl | 1 - test/ray_julia_jll/gcs_client.jl | 3 --- 2 files changed, 4 deletions(-) diff --git a/src/function_manager.jl b/src/function_manager.jl index 4d11b2d8..acbc6870 100644 --- a/src/function_manager.jl +++ b/src/function_manager.jl @@ -66,7 +66,6 @@ Base.@kwdef mutable struct FunctionManager end end - const FUNCTION_MANAGER = Ref{FunctionManager}() function _init_global_function_manager(gcs_address) diff --git a/test/ray_julia_jll/gcs_client.jl b/test/ray_julia_jll/gcs_client.jl index 4e550b2f..ccf9fa04 100644 --- a/test/ray_julia_jll/gcs_client.jl +++ b/test/ray_julia_jll/gcs_client.jl @@ -15,7 +15,6 @@ @test_throws ErrorException Exists(client, ns, "computer") Connect(client) do client - @test Put(client, ns, "computer", "mistaek", false) == 1 @test Get(client, ns, "computer") == "mistaek" @test Keys(client, ns, "") == ["computer"] @@ -33,7 +32,5 @@ # throw on missing key @test_throws ErrorException Get(client, ns, "none") - end - # Disconnect(client) end From dff8b4f6d6a0b086cf20451a084f73db949fa545 Mon Sep 17 00:00:00 2001 From: Glenn Moynihan Date: Wed, 25 Oct 2023 15:36:43 +0100 Subject: [PATCH 18/25] call finalizer before check_status Co-authored-by: Curtis Vogt --- src/function_manager.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/function_manager.jl b/src/function_manager.jl index acbc6870..c8220116 100644 --- a/src/function_manager.jl +++ b/src/function_manager.jl @@ -71,9 +71,9 @@ const FUNCTION_MANAGER = Ref{FunctionManager}() function _init_global_function_manager(gcs_address) @info "Connecting function manager to GCS at $gcs_address..." gcs_client = ray_jll.JuliaGcsClient(gcs_address) + finalizer(ray_jll.Disconnect, gcs_client) status = ray_jll.Connect(gcs_client) ray_jll.check_status(status) - finalizer(ray_jll.Disconnect, gcs_client) FUNCTION_MANAGER[] = FunctionManager(; gcs_client, functions=Dict{String,Any}()) return nothing end From 9862a205410da384db717be94c121a78ce287930 Mon Sep 17 00:00:00 2001 From: Glenn Moynihan Date: Wed, 25 Oct 2023 15:37:02 +0100 Subject: [PATCH 19/25] delete finalizer from the FunctionManager constructor Co-authored-by: Curtis Vogt --- src/function_manager.jl | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/function_manager.jl b/src/function_manager.jl index c8220116..3297dc58 100644 --- a/src/function_manager.jl +++ b/src/function_manager.jl @@ -55,15 +55,9 @@ end # so "jlfun" seems reasonable const FUNCTION_MANAGER_NAMESPACE = "jlfun" -Base.@kwdef mutable struct FunctionManager +Base.@kwdef struct FunctionManager gcs_client::ray_jll.JuliaGcsClient functions::Dict{String,Any} - - function FunctionManager(gcs_client, functions) - fm = new(gcs_client, functions) - f(x) = ray_jll.Disconnect(x.gcs_client) - return finalizer(f, fm) - end end const FUNCTION_MANAGER = Ref{FunctionManager}() From c49e9e3374e00a786ef77b89b15dd391598a839d Mon Sep 17 00:00:00 2001 From: Glenn Date: Wed, 25 Oct 2023 16:06:02 +0100 Subject: [PATCH 20/25] function manager needs to be mutable --- src/function_manager.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/function_manager.jl b/src/function_manager.jl index 3297dc58..73badf94 100644 --- a/src/function_manager.jl +++ b/src/function_manager.jl @@ -55,7 +55,7 @@ end # so "jlfun" seems reasonable const FUNCTION_MANAGER_NAMESPACE = "jlfun" -Base.@kwdef struct FunctionManager +Base.@kwdef mutable struct FunctionManager gcs_client::ray_jll.JuliaGcsClient functions::Dict{String,Any} end From cc8f43a0286b3c4a64002d442574393b5313d487 Mon Sep 17 00:00:00 2001 From: Curtis Vogt Date: Wed, 25 Oct 2023 21:01:23 -0500 Subject: [PATCH 21/25] Disconnect GCS client in function manager tests --- test/function_manager.jl | 107 +++++++++++++++++++-------------------- 1 file changed, 53 insertions(+), 54 deletions(-) diff --git a/test/function_manager.jl b/test/function_manager.jl index ab6979ac..84573ac6 100644 --- a/test/function_manager.jl +++ b/test/function_manager.jl @@ -4,70 +4,69 @@ using .ray_julia_jll: JuliaGcsClient, Connect, Disconnect, function_descriptor, JuliaFunctionDescriptor, Exists - client = JuliaGcsClient("127.0.0.1:6379") - Connect(client) + Connect(JuliaGcsClient("127.0.0.1:6379")) do gcs_client + fm = FunctionManager(client, Dict{String,Any}()) - fm = FunctionManager(client, Dict{String,Any}()) + jobid = 1337 - jobid = 1337 + f = x -> isless(x, 5) + export_function!(fm, f, jobid) - f = x -> isless(x, 5) - export_function!(fm, f, jobid) + fd = function_descriptor(f) + key = function_key(fd, jobid) + @test ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, key) + f2 = import_function!(fm, fd, jobid) - fd = function_descriptor(f) - key = function_key(fd, jobid) - @test ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, key) - f2 = import_function!(fm, fd, jobid) + @test f2.(1:10) == f.(1:10) - @test f2.(1:10) == f.(1:10) + mfd = function_descriptor(MyMod.f) + @test_throws ErrorException import_function!(fm, mfd, jobid) + mkey = function_key(mfd, jobid) + @test !(ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, mkey)) + export_function!(fm, MyMod.f, jobid) - mfd = function_descriptor(MyMod.f) - @test_throws ErrorException import_function!(fm, mfd, jobid) - mkey = function_key(mfd, jobid) - @test !(ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, mkey)) - export_function!(fm, MyMod.f, jobid) + # can import the function even when it's aliased in another module: + nfd = function_descriptor(NotMyMod.f) + @test mfd.function_hash == nfd.function_hash + mf2 = import_function!(fm, nfd, jobid) + @test MyMod.f.(1:10) == mf2.(1:10) - # can import the function even when it's aliased in another module: - nfd = function_descriptor(NotMyMod.f) - @test mfd.function_hash == nfd.function_hash - mf2 = import_function!(fm, nfd, jobid) - @test MyMod.f.(1:10) == mf2.(1:10) + mmfd = function_descriptor(MyMod.MySubMod.f) + @test mmfd.module_name == "Main.MyMod.MySubMod" + @test mmfd.function_name == "f" + @test mmfd.function_hash != mfd.function_hash - mmfd = function_descriptor(MyMod.MySubMod.f) - @test mmfd.module_name == "Main.MyMod.MySubMod" - @test mmfd.function_name == "f" - @test mmfd.function_hash != mfd.function_hash + export_function!(fm, MyMod.MySubMod.f, jobid) + mmf2 = import_function!(fm, mmfd, jobid) + @test mmf2.(1:10) == MyMod.MySubMod.f.(1:10) != MyMod.f.(1:10) - export_function!(fm, MyMod.MySubMod.f, jobid) - mmf2 = import_function!(fm, mmfd, jobid) - @test mmf2.(1:10) == MyMod.MySubMod.f.(1:10) != MyMod.f.(1:10) - - fc = let - xthresh = 3 - x -> isless(x, xthresh) - end - export_function!(fm, fc, jobid) - - fcd = function_descriptor(fc) - fc2 = import_function!(fm, fcd, jobid) - @test fc.(1:10) == fc2.(1:10) != f.(1:10) - - @testset "warn/error for large functions" begin - # for some reason, using `let` to introduce local scope does not work - # during test execution to generate a closure, even though it works - # locally, so we use a factory instead: - function bigfactory(size) - x = rand(UInt8, size) - return f(y) = y * sum(x) + fc = let + xthresh = 3 + x -> isless(x, xthresh) + end + export_function!(fm, fc, jobid) + + fcd = function_descriptor(fc) + fc2 = import_function!(fm, fcd, jobid) + @test fc.(1:10) == fc2.(1:10) != f.(1:10) + + @testset "warn/error for large functions" begin + # for some reason, using `let` to introduce local scope does not work + # during test execution to generate a closure, even though it works + # locally, so we use a factory instead: + function bigfactory(size) + x = rand(UInt8, size) + return f(y) = y * sum(x) + end + bigf = bigfactory(Ray.FUNCTION_SIZE_WARN_THRESHOLD) + @test_logs (:warn, r"very large") export_function!(fm, bigf, jobid) + bigfd = function_descriptor(bigf) + bigf2 = import_function!(fm, bigfd, jobid) + @test bigf(100) == bigf2(100) + + biggerf = bigfactory(Ray.FUNCTION_SIZE_ERROR_THRESHOLD) + @test_throws ArgumentError export_function!(fm, biggerf, jobid) end - bigf = bigfactory(Ray.FUNCTION_SIZE_WARN_THRESHOLD) - @test_logs (:warn, r"very large") export_function!(fm, bigf, jobid) - bigfd = function_descriptor(bigf) - bigf2 = import_function!(fm, bigfd, jobid) - @test bigf(100) == bigf2(100) - - biggerf = bigfactory(Ray.FUNCTION_SIZE_ERROR_THRESHOLD) - @test_throws ArgumentError export_function!(fm, biggerf, jobid) end # XXX: this works when run in global scope but unfortunately something about From 7f6d315dea79ab43e2d2e60fb640281505239313 Mon Sep 17 00:00:00 2001 From: Curtis Vogt Date: Wed, 25 Oct 2023 22:24:58 -0500 Subject: [PATCH 22/25] Revert "Disconnect GCS client in function manager tests" This reverts commit cc8f43a0286b3c4a64002d442574393b5313d487. --- test/function_manager.jl | 107 ++++++++++++++++++++------------------- 1 file changed, 54 insertions(+), 53 deletions(-) diff --git a/test/function_manager.jl b/test/function_manager.jl index 84573ac6..ab6979ac 100644 --- a/test/function_manager.jl +++ b/test/function_manager.jl @@ -4,69 +4,70 @@ using .ray_julia_jll: JuliaGcsClient, Connect, Disconnect, function_descriptor, JuliaFunctionDescriptor, Exists - Connect(JuliaGcsClient("127.0.0.1:6379")) do gcs_client - fm = FunctionManager(client, Dict{String,Any}()) + client = JuliaGcsClient("127.0.0.1:6379") + Connect(client) - jobid = 1337 + fm = FunctionManager(client, Dict{String,Any}()) - f = x -> isless(x, 5) - export_function!(fm, f, jobid) + jobid = 1337 - fd = function_descriptor(f) - key = function_key(fd, jobid) - @test ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, key) - f2 = import_function!(fm, fd, jobid) + f = x -> isless(x, 5) + export_function!(fm, f, jobid) - @test f2.(1:10) == f.(1:10) + fd = function_descriptor(f) + key = function_key(fd, jobid) + @test ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, key) + f2 = import_function!(fm, fd, jobid) - mfd = function_descriptor(MyMod.f) - @test_throws ErrorException import_function!(fm, mfd, jobid) - mkey = function_key(mfd, jobid) - @test !(ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, mkey)) - export_function!(fm, MyMod.f, jobid) + @test f2.(1:10) == f.(1:10) - # can import the function even when it's aliased in another module: - nfd = function_descriptor(NotMyMod.f) - @test mfd.function_hash == nfd.function_hash - mf2 = import_function!(fm, nfd, jobid) - @test MyMod.f.(1:10) == mf2.(1:10) + mfd = function_descriptor(MyMod.f) + @test_throws ErrorException import_function!(fm, mfd, jobid) + mkey = function_key(mfd, jobid) + @test !(ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, mkey)) + export_function!(fm, MyMod.f, jobid) - mmfd = function_descriptor(MyMod.MySubMod.f) - @test mmfd.module_name == "Main.MyMod.MySubMod" - @test mmfd.function_name == "f" - @test mmfd.function_hash != mfd.function_hash + # can import the function even when it's aliased in another module: + nfd = function_descriptor(NotMyMod.f) + @test mfd.function_hash == nfd.function_hash + mf2 = import_function!(fm, nfd, jobid) + @test MyMod.f.(1:10) == mf2.(1:10) - export_function!(fm, MyMod.MySubMod.f, jobid) - mmf2 = import_function!(fm, mmfd, jobid) - @test mmf2.(1:10) == MyMod.MySubMod.f.(1:10) != MyMod.f.(1:10) + mmfd = function_descriptor(MyMod.MySubMod.f) + @test mmfd.module_name == "Main.MyMod.MySubMod" + @test mmfd.function_name == "f" + @test mmfd.function_hash != mfd.function_hash - fc = let - xthresh = 3 - x -> isless(x, xthresh) - end - export_function!(fm, fc, jobid) - - fcd = function_descriptor(fc) - fc2 = import_function!(fm, fcd, jobid) - @test fc.(1:10) == fc2.(1:10) != f.(1:10) - - @testset "warn/error for large functions" begin - # for some reason, using `let` to introduce local scope does not work - # during test execution to generate a closure, even though it works - # locally, so we use a factory instead: - function bigfactory(size) - x = rand(UInt8, size) - return f(y) = y * sum(x) - end - bigf = bigfactory(Ray.FUNCTION_SIZE_WARN_THRESHOLD) - @test_logs (:warn, r"very large") export_function!(fm, bigf, jobid) - bigfd = function_descriptor(bigf) - bigf2 = import_function!(fm, bigfd, jobid) - @test bigf(100) == bigf2(100) - - biggerf = bigfactory(Ray.FUNCTION_SIZE_ERROR_THRESHOLD) - @test_throws ArgumentError export_function!(fm, biggerf, jobid) + export_function!(fm, MyMod.MySubMod.f, jobid) + mmf2 = import_function!(fm, mmfd, jobid) + @test mmf2.(1:10) == MyMod.MySubMod.f.(1:10) != MyMod.f.(1:10) + + fc = let + xthresh = 3 + x -> isless(x, xthresh) + end + export_function!(fm, fc, jobid) + + fcd = function_descriptor(fc) + fc2 = import_function!(fm, fcd, jobid) + @test fc.(1:10) == fc2.(1:10) != f.(1:10) + + @testset "warn/error for large functions" begin + # for some reason, using `let` to introduce local scope does not work + # during test execution to generate a closure, even though it works + # locally, so we use a factory instead: + function bigfactory(size) + x = rand(UInt8, size) + return f(y) = y * sum(x) end + bigf = bigfactory(Ray.FUNCTION_SIZE_WARN_THRESHOLD) + @test_logs (:warn, r"very large") export_function!(fm, bigf, jobid) + bigfd = function_descriptor(bigf) + bigf2 = import_function!(fm, bigfd, jobid) + @test bigf(100) == bigf2(100) + + biggerf = bigfactory(Ray.FUNCTION_SIZE_ERROR_THRESHOLD) + @test_throws ArgumentError export_function!(fm, biggerf, jobid) end # XXX: this works when run in global scope but unfortunately something about From 0a8046b24469a678db6f318177dff353c48e2450 Mon Sep 17 00:00:00 2001 From: Curtis Vogt Date: Wed, 25 Oct 2023 22:21:34 -0500 Subject: [PATCH 23/25] Add destructor to JuliaGcsClient Avoids this failure when `JuliaGcsClient` is garbage collected: ``` libc++abi: terminating due to uncaught exception of type std::runtime_error: GCS client not initialized; did you forget to Connect? [13612] signal (6): Abort trap: 6 in expression starting at /Users/cvogt/.julia/dev/Ray/test/runtests.jl:19 __pthread_kill at /usr/lib/system/libsystem_kernel.dylib (unknown line) Allocations: 30955062 (Pool: 30929299; Big: 25763); GC: 43 ERROR: Package Ray errored during testing (received signal: 6) ``` --- build/wrapper.h | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/build/wrapper.h b/build/wrapper.h index eea42a0f..4dad677c 100644 --- a/build/wrapper.h +++ b/build/wrapper.h @@ -39,8 +39,13 @@ class JuliaGcsClient { JuliaGcsClient(const ray::gcs::GcsClientOptions &options); JuliaGcsClient(const std::string &gcs_address); - ray::Status Connect(); + ~JuliaGcsClient() { + if (gcs_client_) { + this->Disconnect(); + } + } + ray::Status Connect(); void Disconnect(); // Get, Put, Exists, Keys use methods belonging to an InternalKV field of the GCSClient From 492845b046ed484518a41ad7bbf95f52b7a43c74 Mon Sep 17 00:00:00 2001 From: Curtis Vogt Date: Wed, 25 Oct 2023 22:51:37 -0500 Subject: [PATCH 24/25] fixup! Add destructor to JuliaGcsClient --- build/wrapper.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/build/wrapper.h b/build/wrapper.h index 4dad677c..a88874a0 100644 --- a/build/wrapper.h +++ b/build/wrapper.h @@ -40,7 +40,10 @@ class JuliaGcsClient { JuliaGcsClient(const std::string &gcs_address); ~JuliaGcsClient() { + // Automatically disconnect a client to avoid a SIGABRT (6) + // https://github.com/beacon-biosignals/Ray.jl/pull/211#issuecomment-1780070784 if (gcs_client_) { + std::cerr << "\x1B[31mWarning: Forgot to disconnect JuliaGcsClient\033[0m" << std::endl; this->Disconnect(); } } From 9bb69215d15cf92b67a8be89f7bef5ed285b61cc Mon Sep 17 00:00:00 2001 From: Curtis Vogt Date: Wed, 25 Oct 2023 21:01:23 -0500 Subject: [PATCH 25/25] Disconnect GCS client in function manager tests --- test/function_manager.jl | 107 +++++++++++++++++++-------------------- 1 file changed, 53 insertions(+), 54 deletions(-) diff --git a/test/function_manager.jl b/test/function_manager.jl index ab6979ac..1537a7f6 100644 --- a/test/function_manager.jl +++ b/test/function_manager.jl @@ -4,70 +4,69 @@ using .ray_julia_jll: JuliaGcsClient, Connect, Disconnect, function_descriptor, JuliaFunctionDescriptor, Exists - client = JuliaGcsClient("127.0.0.1:6379") - Connect(client) + Connect(JuliaGcsClient("127.0.0.1:6379")) do gcs_client + fm = FunctionManager(gcs_client, Dict{String,Any}()) - fm = FunctionManager(client, Dict{String,Any}()) + jobid = 1337 - jobid = 1337 + f = x -> isless(x, 5) + export_function!(fm, f, jobid) - f = x -> isless(x, 5) - export_function!(fm, f, jobid) + fd = function_descriptor(f) + key = function_key(fd, jobid) + @test ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, key) + f2 = import_function!(fm, fd, jobid) - fd = function_descriptor(f) - key = function_key(fd, jobid) - @test ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, key) - f2 = import_function!(fm, fd, jobid) + @test f2.(1:10) == f.(1:10) - @test f2.(1:10) == f.(1:10) + mfd = function_descriptor(MyMod.f) + @test_throws ErrorException import_function!(fm, mfd, jobid) + mkey = function_key(mfd, jobid) + @test !(ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, mkey)) + export_function!(fm, MyMod.f, jobid) - mfd = function_descriptor(MyMod.f) - @test_throws ErrorException import_function!(fm, mfd, jobid) - mkey = function_key(mfd, jobid) - @test !(ray_jll.Exists(fm.gcs_client, FUNCTION_MANAGER_NAMESPACE, mkey)) - export_function!(fm, MyMod.f, jobid) + # can import the function even when it's aliased in another module: + nfd = function_descriptor(NotMyMod.f) + @test mfd.function_hash == nfd.function_hash + mf2 = import_function!(fm, nfd, jobid) + @test MyMod.f.(1:10) == mf2.(1:10) - # can import the function even when it's aliased in another module: - nfd = function_descriptor(NotMyMod.f) - @test mfd.function_hash == nfd.function_hash - mf2 = import_function!(fm, nfd, jobid) - @test MyMod.f.(1:10) == mf2.(1:10) + mmfd = function_descriptor(MyMod.MySubMod.f) + @test mmfd.module_name == "Main.MyMod.MySubMod" + @test mmfd.function_name == "f" + @test mmfd.function_hash != mfd.function_hash - mmfd = function_descriptor(MyMod.MySubMod.f) - @test mmfd.module_name == "Main.MyMod.MySubMod" - @test mmfd.function_name == "f" - @test mmfd.function_hash != mfd.function_hash + export_function!(fm, MyMod.MySubMod.f, jobid) + mmf2 = import_function!(fm, mmfd, jobid) + @test mmf2.(1:10) == MyMod.MySubMod.f.(1:10) != MyMod.f.(1:10) - export_function!(fm, MyMod.MySubMod.f, jobid) - mmf2 = import_function!(fm, mmfd, jobid) - @test mmf2.(1:10) == MyMod.MySubMod.f.(1:10) != MyMod.f.(1:10) - - fc = let - xthresh = 3 - x -> isless(x, xthresh) - end - export_function!(fm, fc, jobid) - - fcd = function_descriptor(fc) - fc2 = import_function!(fm, fcd, jobid) - @test fc.(1:10) == fc2.(1:10) != f.(1:10) - - @testset "warn/error for large functions" begin - # for some reason, using `let` to introduce local scope does not work - # during test execution to generate a closure, even though it works - # locally, so we use a factory instead: - function bigfactory(size) - x = rand(UInt8, size) - return f(y) = y * sum(x) + fc = let + xthresh = 3 + x -> isless(x, xthresh) + end + export_function!(fm, fc, jobid) + + fcd = function_descriptor(fc) + fc2 = import_function!(fm, fcd, jobid) + @test fc.(1:10) == fc2.(1:10) != f.(1:10) + + @testset "warn/error for large functions" begin + # for some reason, using `let` to introduce local scope does not work + # during test execution to generate a closure, even though it works + # locally, so we use a factory instead: + function bigfactory(size) + x = rand(UInt8, size) + return f(y) = y * sum(x) + end + bigf = bigfactory(Ray.FUNCTION_SIZE_WARN_THRESHOLD) + @test_logs (:warn, r"very large") export_function!(fm, bigf, jobid) + bigfd = function_descriptor(bigf) + bigf2 = import_function!(fm, bigfd, jobid) + @test bigf(100) == bigf2(100) + + biggerf = bigfactory(Ray.FUNCTION_SIZE_ERROR_THRESHOLD) + @test_throws ArgumentError export_function!(fm, biggerf, jobid) end - bigf = bigfactory(Ray.FUNCTION_SIZE_WARN_THRESHOLD) - @test_logs (:warn, r"very large") export_function!(fm, bigf, jobid) - bigfd = function_descriptor(bigf) - bigf2 = import_function!(fm, bigfd, jobid) - @test bigf(100) == bigf2(100) - - biggerf = bigfactory(Ray.FUNCTION_SIZE_ERROR_THRESHOLD) - @test_throws ArgumentError export_function!(fm, biggerf, jobid) end # XXX: this works when run in global scope but unfortunately something about