Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change core worker C++ namespace to ray::core #17610

Merged
merged 27 commits into from
Aug 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/src/ray/config_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
namespace ray {
namespace api {

using ray::core::WorkerType;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we avoid using in header files?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This header is not exposed to users. So I think it's fine. we should avoid using in api headers.


enum class RunMode { SINGLE_PROCESS, CLUSTER };

class ConfigInternal {
Expand Down
9 changes: 6 additions & 3 deletions cpp/src/ray/runtime/abstract_ray_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ msgpack::sbuffer PackError(std::string error_msg) {
}
} // namespace internal
namespace api {

using ray::core::CoreWorkerProcess;
using ray::core::WorkerType;

std::shared_ptr<AbstractRayRuntime> AbstractRayRuntime::abstract_ray_runtime_ = nullptr;

std::shared_ptr<AbstractRayRuntime> AbstractRayRuntime::DoInit() {
Expand Down Expand Up @@ -200,8 +204,7 @@ std::string GetFullName(bool global, const std::string &name) {
return "";
}
return global ? name
: ::ray::CoreWorkerProcess::GetCoreWorker().GetCurrentJobId().Hex() +
"-" + name;
: CoreWorkerProcess::GetCoreWorker().GetCurrentJobId().Hex() + "-" + name;
}

/// TODO(qicosmos): Now only support global name, will support the name of a current job.
Expand Down Expand Up @@ -231,7 +234,7 @@ void AbstractRayRuntime::KillActor(const std::string &str_actor_id, bool no_rest

void AbstractRayRuntime::ExitActor() {
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
if (ConfigInternal::Instance().worker_type != ray::WorkerType::WORKER ||
if (ConfigInternal::Instance().worker_type != WorkerType::WORKER ||
core_worker.GetActorId().IsNil()) {
throw std::logic_error("This shouldn't be called on a non-actor worker.");
}
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/ray/runtime/abstract_ray_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
namespace ray {
namespace api {

using ray::core::WorkerContext;

class RayIntentionalSystemExitException : public RayException {
public:
RayIntentionalSystemExitException(const std::string &msg) : RayException(msg){};
Expand Down Expand Up @@ -99,4 +101,4 @@ class AbstractRayRuntime : public RayRuntime {
friend class Ray;
};
} // namespace api
} // namespace ray
} // namespace ray
4 changes: 2 additions & 2 deletions cpp/src/ray/runtime/local_mode_ray_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace api {

LocalModeRayRuntime::LocalModeRayRuntime() {
worker_ = std::make_unique<WorkerContext>(
ray::WorkerType::DRIVER, ComputeDriverIdFromJob(JobID::Nil()), JobID::Nil());
ray::core::WorkerType::DRIVER, ComputeDriverIdFromJob(JobID::Nil()), JobID::Nil());
object_store_ = std::unique_ptr<ObjectStore>(new LocalModeObjectStore(*this));
task_submitter_ = std::unique_ptr<TaskSubmitter>(new LocalModeTaskSubmitter(*this));
}
Expand All @@ -38,4 +38,4 @@ ActorID LocalModeRayRuntime::GetNextActorID() {
}

} // namespace api
} // namespace ray
} // namespace ray
5 changes: 3 additions & 2 deletions cpp/src/ray/runtime/object/local_mode_object_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@

namespace ray {
namespace api {

LocalModeObjectStore::LocalModeObjectStore(LocalModeRayRuntime &local_mode_ray_tuntime)
: local_mode_ray_tuntime_(local_mode_ray_tuntime) {
memory_store_ = std::make_unique<::ray::CoreWorkerMemoryStore>();
memory_store_ = std::make_unique<CoreWorkerMemoryStore>();
}

void LocalModeObjectStore::PutRaw(std::shared_ptr<msgpack::sbuffer> data,
Expand Down Expand Up @@ -106,4 +107,4 @@ void LocalModeObjectStore::AddLocalReference(const std::string &id) { return; }

void LocalModeObjectStore::RemoveLocalReference(const std::string &id) { return; }
} // namespace api
} // namespace ray
} // namespace ray
6 changes: 4 additions & 2 deletions cpp/src/ray/runtime/object/local_mode_object_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
namespace ray {
namespace api {

using ray::core::CoreWorkerMemoryStore;

class LocalModeObjectStore : public ObjectStore {
public:
LocalModeObjectStore(LocalModeRayRuntime &local_mode_ray_tuntime);
Expand All @@ -44,10 +46,10 @@ class LocalModeObjectStore : public ObjectStore {
std::vector<std::shared_ptr<msgpack::sbuffer>> GetRaw(const std::vector<ObjectID> &ids,
int timeout_ms);

std::unique_ptr<::ray::CoreWorkerMemoryStore> memory_store_;
std::unique_ptr<CoreWorkerMemoryStore> memory_store_;

LocalModeRayRuntime &local_mode_ray_tuntime_;
};

} // namespace api
} // namespace ray
} // namespace ray
4 changes: 3 additions & 1 deletion cpp/src/ray/runtime/object/native_object_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
namespace ray {
namespace api {

using ray::core::CoreWorkerProcess;

void NativeObjectStore::PutRaw(std::shared_ptr<msgpack::sbuffer> data,
ObjectID *object_id) {
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
Expand Down Expand Up @@ -128,4 +130,4 @@ void NativeObjectStore::RemoveLocalReference(const std::string &id) {
}
}
} // namespace api
} // namespace ray
} // namespace ray
2 changes: 1 addition & 1 deletion cpp/src/ray/runtime/object/native_object_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ class NativeObjectStore : public ObjectStore {
};

} // namespace api
} // namespace ray
} // namespace ray
23 changes: 13 additions & 10 deletions cpp/src/ray/runtime/task/native_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
namespace ray {
namespace api {

using ray::core::CoreWorkerProcess;
using ray::core::TaskOptions;

RayFunction BuildRayFunction(InvocationSpec &invocation) {
auto function_descriptor = FunctionDescriptorBuilder::BuildCpp(
invocation.remote_function_holder.function_name);
Expand Down Expand Up @@ -56,16 +59,16 @@ ActorID NativeTaskSubmitter::CreateActor(InvocationSpec &invocation,
std::unordered_map<std::string, double> resources;
std::string name = create_options.name;
std::string ray_namespace = "";
ray::ActorCreationOptions actor_options{create_options.max_restarts,
/*max_task_retries=*/0,
create_options.max_concurrency,
create_options.resources,
resources,
/*dynamic_worker_options=*/{},
/*is_detached=*/false,
name,
ray_namespace,
/*is_asyncio=*/false};
ray::core::ActorCreationOptions actor_options{create_options.max_restarts,
/*max_task_retries=*/0,
create_options.max_concurrency,
create_options.resources,
resources,
/*dynamic_worker_options=*/{},
/*is_detached=*/false,
name,
ray_namespace,
/*is_asyncio=*/false};
ActorID actor_id;
auto status = core_worker.CreateActor(BuildRayFunction(invocation), invocation.args,
actor_options, "", &actor_id);
Expand Down
7 changes: 4 additions & 3 deletions cpp/src/ray/runtime/task/task_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ GetRemoteFunctions() {

namespace api {

using ray::core::CoreWorkerProcess;

std::shared_ptr<msgpack::sbuffer> TaskExecutor::current_actor_ = nullptr;

TaskExecutor::TaskExecutor(AbstractRayRuntime &abstract_ray_tuntime_)
Expand Down Expand Up @@ -174,7 +176,7 @@ Status TaskExecutor::ExecuteTask(
auto &result_id = return_ids[0];
auto result_ptr = &(*results)[0];
int64_t task_output_inlined_bytes = 0;
RAY_CHECK_OK(ray::CoreWorkerProcess::GetCoreWorker().AllocateReturnObject(
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().AllocateReturnObject(
result_id, data_size, meta_buffer, std::vector<ray::ObjectID>(),
task_output_inlined_bytes, result_ptr));

Expand All @@ -185,8 +187,7 @@ Status TaskExecutor::ExecuteTask(
}
}

RAY_CHECK_OK(
ray::CoreWorkerProcess::GetCoreWorker().SealReturnObject(result_id, result));
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SealReturnObject(result_id, result));
}
return ray::Status::OK();
}
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/ray/runtime/task/task_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
namespace ray {

namespace internal {

/// Execute remote functions by networking stream.
msgpack::sbuffer TaskExecutionHandler(const std::string &func_name,
const std::vector<msgpack::sbuffer> &args_buffer,
Expand All @@ -44,6 +45,8 @@ BOOST_DLL_ALIAS(internal::GetRemoteFunctions, GetRemoteFunctions);

namespace api {

using ray::core::RayFunction;

class AbstractRayRuntime;

class ActorContext {
Expand Down Expand Up @@ -85,4 +88,4 @@ class TaskExecutor {
static std::shared_ptr<msgpack::sbuffer> current_actor_;
};
} // namespace api
} // namespace ray
} // namespace ray
8 changes: 5 additions & 3 deletions cpp/src/ray/util/process_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
namespace ray {
namespace api {

using ray::core::CoreWorkerProcess;
using ray::core::WorkerType;

/// IP address by which the local node can be reached *from* the `address`.
///
/// The behavior should be the same as `node_ip_address_from_perspective` from Ray Python
Expand Down Expand Up @@ -77,8 +80,7 @@ void ProcessHelper::StopRayNode() {

void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback) {
std::string redis_ip = ConfigInternal::Instance().redis_ip;
if (ConfigInternal::Instance().worker_type == ray::WorkerType::DRIVER &&
redis_ip.empty()) {
if (ConfigInternal::Instance().worker_type == WorkerType::DRIVER && redis_ip.empty()) {
redis_ip = "127.0.0.1";
StartRayNode(ConfigInternal::Instance().redis_port,
ConfigInternal::Instance().redis_password);
Expand All @@ -99,7 +101,7 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback)
}

std::unique_ptr<ray::gcs::GlobalStateAccessor> global_state_accessor = nullptr;
if (ConfigInternal::Instance().worker_type == ray::WorkerType::DRIVER) {
if (ConfigInternal::Instance().worker_type == WorkerType::DRIVER) {
global_state_accessor.reset(new ray::gcs::GlobalStateAccessor(
redis_address, ConfigInternal::Instance().redis_password));
RAY_CHECK(global_state_accessor->Connect()) << "Failed to connect to GCS.";
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/ray/util/process_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
namespace ray {
namespace api {

using ray::core::CoreWorkerOptions;

class ProcessHelper {
public:
void RayStart(CoreWorkerOptions::TaskExecutionCallback callback);
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/ray/worker/default_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

int main(int argc, char **argv) {
RAY_LOG(INFO) << "CPP default worker started.";
ray::api::ConfigInternal::Instance().worker_type = ray::WorkerType::WORKER;
ray::api::ConfigInternal::Instance().worker_type = ray::core::WorkerType::WORKER;
ray::api::RayConfig config;
ray::api::Ray::Init(config, &argc, &argv);
::ray::CoreWorkerProcess::RunTaskExecutionLoop();
::ray::core::CoreWorkerProcess::RunTaskExecutionLoop();
return 0;
}
32 changes: 16 additions & 16 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef cppclass CLanguage "Language":
pass
cdef cppclass CWorkerType "ray::WorkerType":
cdef cppclass CWorkerType "ray::core::WorkerType":
pass
cdef cppclass CTaskType "ray::TaskType":
pass
cdef cppclass CPlacementStrategy "ray::PlacementStrategy":
cdef cppclass CPlacementStrategy "ray::core::PlacementStrategy":
pass
cdef cppclass CAddress "ray::rpc::Address":
CAddress()
Expand All @@ -166,11 +166,11 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef CLanguage LANGUAGE_JAVA "Language::JAVA"

cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef CWorkerType WORKER_TYPE_WORKER "ray::WorkerType::WORKER"
cdef CWorkerType WORKER_TYPE_DRIVER "ray::WorkerType::DRIVER"
cdef CWorkerType WORKER_TYPE_SPILL_WORKER "ray::WorkerType::SPILL_WORKER"
cdef CWorkerType WORKER_TYPE_RESTORE_WORKER "ray::WorkerType::RESTORE_WORKER" # noqa: E501
cdef CWorkerType WORKER_TYPE_UTIL_WORKER "ray::WorkerType::UTIL_WORKER" # noqa: E501
cdef CWorkerType WORKER_TYPE_WORKER "ray::core::WorkerType::WORKER"
cdef CWorkerType WORKER_TYPE_DRIVER "ray::core::WorkerType::DRIVER"
cdef CWorkerType WORKER_TYPE_SPILL_WORKER "ray::core::WorkerType::SPILL_WORKER" # noqa: E501
cdef CWorkerType WORKER_TYPE_RESTORE_WORKER "ray::core::WorkerType::RESTORE_WORKER" # noqa: E501
cdef CWorkerType WORKER_TYPE_UTIL_WORKER "ray::core::WorkerType::UTIL_WORKER" # noqa: E501

cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef CTaskType TASK_TYPE_NORMAL_TASK "ray::TaskType::NORMAL_TASK"
Expand All @@ -179,13 +179,13 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil:

cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef CPlacementStrategy PLACEMENT_STRATEGY_PACK \
"ray::PlacementStrategy::PACK"
"ray::core::PlacementStrategy::PACK"
cdef CPlacementStrategy PLACEMENT_STRATEGY_SPREAD \
"ray::PlacementStrategy::SPREAD"
"ray::core::PlacementStrategy::SPREAD"
cdef CPlacementStrategy PLACEMENT_STRATEGY_STRICT_PACK \
"ray::PlacementStrategy::STRICT_PACK"
"ray::core::PlacementStrategy::STRICT_PACK"
cdef CPlacementStrategy PLACEMENT_STRATEGY_STRICT_SPREAD \
"ray::PlacementStrategy::STRICT_SPREAD"
"ray::core::PlacementStrategy::STRICT_SPREAD"

cdef extern from "ray/common/task/scheduling_resources.h" nogil:
cdef cppclass ResourceSet "ray::ResourceSet":
Expand Down Expand Up @@ -230,7 +230,7 @@ cdef extern from "ray/common/ray_object.h" nogil:
c_bool IsInPlasmaError() const

cdef extern from "ray/core_worker/common.h" nogil:
cdef cppclass CRayFunction "ray::RayFunction":
cdef cppclass CRayFunction "ray::core::RayFunction":
CRayFunction()
CRayFunction(CLanguage language,
const CFunctionDescriptor &function_descriptor)
Expand All @@ -247,7 +247,7 @@ cdef extern from "ray/core_worker/common.h" nogil:
cdef cppclass CTaskArgByValue "ray::TaskArgByValue":
CTaskArgByValue(const shared_ptr[CRayObject] &data)

cdef cppclass CTaskOptions "ray::TaskOptions":
cdef cppclass CTaskOptions "ray::core::TaskOptions":
CTaskOptions()
CTaskOptions(c_string name, int num_returns,
unordered_map[c_string, double] &resources)
Expand All @@ -258,7 +258,7 @@ cdef extern from "ray/core_worker/common.h" nogil:
const unordered_map[c_string, c_string]
&override_environment_variables)

cdef cppclass CActorCreationOptions "ray::ActorCreationOptions":
cdef cppclass CActorCreationOptions "ray::core::ActorCreationOptions":
CActorCreationOptions()
CActorCreationOptions(
int64_t max_restarts,
Expand All @@ -276,7 +276,7 @@ cdef extern from "ray/core_worker/common.h" nogil:
&override_environment_variables)

cdef cppclass CPlacementGroupCreationOptions \
"ray::PlacementGroupCreationOptions":
"ray::core::PlacementGroupCreationOptions":
CPlacementGroupCreationOptions()
CPlacementGroupCreationOptions(
const c_string &name,
Expand All @@ -285,7 +285,7 @@ cdef extern from "ray/core_worker/common.h" nogil:
c_bool is_detached
)

cdef cppclass CObjectLocation "ray::ObjectLocation":
cdef cppclass CObjectLocation "ray::core::ObjectLocation":
const CNodeID &GetPrimaryNodeID() const
const uint64_t GetObjectSize() const
const c_vector[CNodeID] &GetNodeIDs() const
Expand Down
Loading