Skip to content

Commit

Permalink
[1.6.0 Release] Pop worker and friends (#17787)
Browse files Browse the repository at this point in the history
* [Core] Support ConcurrentGroup part1 (#16795)

* Core change and Java change.

* Fix void call.

* Address comments and fix cases.

* Fix asyncio

* Change core worker C++ namespace to ray::core (#17610)

* [C++ Worker] Replace `Ray::xxx` with `ray::xxx` and update namespaces (#17388)

* [core] make 'PopWorker' to be an async function (#17202)

* make 'PopWorker' to be an async function

* pop worker async works

* fix

* address comments

* bugfix

* fix cluster_task_manager_test

* fix

* bugfix of detached actor

* address comments

* fix

* address comments

* fix aioredis

* Revert "fix aioredis"

This reverts commit 041b983.

* bug fix

* fix

* fix test_step_resources test

* format

* add unit test

* fix

* add test case PopWorkerStatus

* address commit

* fix lint

* address comments

* add python test

* address comments

* make an independent function

* Update test_basic_3.py

Co-authored-by: Hao Chen <chenh1024@gmail.com>

Co-authored-by: Qing Wang <kingchin1218@gmail.com>
Co-authored-by: Hao Chen <chenh1024@gmail.com>
Co-authored-by: qicosmos <383121719@qq.com>
Co-authored-by: SongGuyang <guyang.sgy@antfin.com>
  • Loading branch information
5 people authored Aug 12, 2021
1 parent 3594f67 commit d49cc68
Show file tree
Hide file tree
Showing 159 changed files with 3,237 additions and 1,779 deletions.
23 changes: 10 additions & 13 deletions cpp/example/example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
/// including the `<ray/api.h>` header
#include <ray/api.h>

/// using namespace
using namespace ::ray::api;

/// common function
int Plus(int x, int y) { return x + y; }
/// Declare remote function
Expand All @@ -46,31 +43,31 @@ RAY_REMOTE(Counter::FactoryCreate, &Counter::Add);

int main(int argc, char **argv) {
/// configuration and initialization
RayConfig config;
Ray::Init(config);
ray::RayConfig config;
ray::Init(config);

/// put and get object
auto object = Ray::Put(100);
auto put_get_result = *(Ray::Get(object));
auto object = ray::Put(100);
auto put_get_result = *(ray::Get(object));
std::cout << "put_get_result = " << put_get_result << std::endl;

/// common task
auto task_object = Ray::Task(Plus).Remote(1, 2);
int task_result = *(Ray::Get(task_object));
auto task_object = ray::Task(Plus).Remote(1, 2);
int task_result = *(ray::Get(task_object));
std::cout << "task_result = " << task_result << std::endl;

/// actor
ActorHandle<Counter> actor = Ray::Actor(Counter::FactoryCreate).Remote(0);
ray::ActorHandle<Counter> actor = ray::Actor(Counter::FactoryCreate).Remote(0);
/// actor task
auto actor_object = actor.Task(&Counter::Add).Remote(3);
int actor_task_result = *(Ray::Get(actor_object));
int actor_task_result = *(ray::Get(actor_object));
std::cout << "actor_task_result = " << actor_task_result << std::endl;
/// actor task with reference argument
auto actor_object2 = actor.Task(&Counter::Add).Remote(task_object);
int actor_task_result2 = *(Ray::Get(actor_object2));
int actor_task_result2 = *(ray::Get(actor_object2));
std::cout << "actor_task_result2 = " << actor_task_result2 << std::endl;

/// shutdown
Ray::Shutdown();
ray::Shutdown();
return 0;
}
263 changes: 127 additions & 136 deletions cpp/include/ray/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,117 +32,106 @@
#include <mutex>

namespace ray {
namespace api {
class Ray {
public:
/// Initialize Ray runtime with config.
static void Init(RayConfig &config);

/// Initialize Ray runtime with config and command-line arguments.
/// If a parameter is explicitly set in command-line arguments, the parameter value will
/// be overwritten.
static void Init(RayConfig &config, int *argc, char ***argv);

/// Initialize Ray runtime with default config.
static void Init();

/// Shutdown Ray runtime.
static void Shutdown();

/// Store an object in the object store.
///
/// \param[in] obj The object which should be stored.
/// \return ObjectRef A reference to the object in the object store.
template <typename T>
static ObjectRef<T> Put(const T &obj);

/// Get a single object from the object store.
/// This method will be blocked until the object is ready.
/// \param[in] object The object reference which should be returned.
/// \return shared pointer of the result.
/// \Throws RayException if task or worker failed, or object is unreconstructable.
template <typename T>
static std::shared_ptr<T> Get(const ObjectRef<T> &object);

/// Get a list of objects from the object store.
/// This method will be blocked until all the objects are ready.
///
/// \param[in] objects The object array which should be got.
/// \return shared pointer array of the result.
template <typename T>
static std::vector<std::shared_ptr<T>> Get(const std::vector<ObjectRef<T>> &objects);

/// Wait for a list of objects to be locally available,
/// until specified number of objects are ready, or specified timeout has passed.
///
/// \param[in] objects The object array which should be waited.
/// \param[in] num_objects The minimum number of objects to wait.
/// \param[in] timeout_ms The maximum wait time in milliseconds.
/// \return Two arrays, one containing locally available objects, one containing the
/// rest.
template <typename T>
static WaitResult<T> Wait(const std::vector<ObjectRef<T>> &objects, int num_objects,
int timeout_ms);

/// Create a `TaskCaller` for calling remote function.
/// It is used for normal task, such as Ray::Task(Plus1, 1), Ray::Task(Plus, 1, 2).
/// \param[in] func The function to be remote executed.
/// \param[in] args The function arguments passed by a value or ObjectRef.
/// \return TaskCaller.
template <typename F>
static TaskCaller<F> Task(F func);

/// Generic version of creating an actor
/// It is used for creating an actor, such as: ActorCreator<Counter> creator =
/// Ray::Actor(Counter::FactoryCreate<int>).Remote(1);
template <typename F>
static ActorCreator<F> Actor(F create_func);

/// Get a handle to a global named actor.
/// Gets a handle to a global named actor with the given name. The actor must have been
/// created with global name specified.
///
/// \param[in] name The global name of the named actor.
/// \return An ActorHandle to the actor if the actor of specified name exists or an
/// empty optional object.
template <typename T>
inline static boost::optional<ActorHandle<T>> GetGlobalActor(
const std::string &actor_name);

/// Intentionally exit the current actor.
/// It is used to disconnect an actor and exit the worker.
/// \Throws RayException if the current process is a driver or the current worker is not
/// an actor.
static void ExitActor() { ray::internal::RayRuntime()->ExitActor(); }

private:
static std::once_flag is_inited_;

template <typename T>
static std::vector<std::shared_ptr<T>> Get(const std::vector<std::string> &ids);

template <typename FuncType>
static TaskCaller<FuncType> TaskInternal(FuncType &func);

template <typename FuncType>
static ActorCreator<FuncType> CreateActorInternal(FuncType &func);

template <typename T>
inline static boost::optional<ActorHandle<T>> GetActorInternal(
bool global, const std::string &actor_name);
};

} // namespace api
} // namespace ray

// --------- inline implementation ------------
/// Initialize Ray runtime with config.
void Init(ray::RayConfig &config);

namespace ray {
namespace api {
/// Initialize Ray runtime with config and command-line arguments.
/// If a parameter is explicitly set in command-line arguments, the parameter value will
/// be overwritten.
void Init(ray::RayConfig &config, int *argc, char ***argv);

/// Initialize Ray runtime with default config.
void Init();

/// Shutdown Ray runtime.
void Shutdown();

/// Store an object in the object store.
///
/// \param[in] obj The object which should be stored.
/// \return ObjectRef A reference to the object in the object store.
template <typename T>
ray::ObjectRef<T> Put(const T &obj);

/// Get a single object from the object store.
/// This method will be blocked until the object is ready.
///
/// \param[in] object The object reference which should be returned.
/// \return shared pointer of the result.
template <typename T>
std::shared_ptr<T> Get(const ray::ObjectRef<T> &object);

/// Get a list of objects from the object store.
/// This method will be blocked until all the objects are ready.
///
/// \param[in] objects The object array which should be got.
/// \return shared pointer array of the result.
template <typename T>
std::vector<std::shared_ptr<T>> Get(const std::vector<ray::ObjectRef<T>> &objects);

/// Wait for a list of objects to be locally available,
/// until specified number of objects are ready, or specified timeout has passed.
///
/// \param[in] objects The object array which should be waited.
/// \param[in] num_objects The minimum number of objects to wait.
/// \param[in] timeout_ms The maximum wait time in milliseconds.
/// \return Two arrays, one containing locally available objects, one containing the
/// rest.
template <typename T>
WaitResult<T> Wait(const std::vector<ray::ObjectRef<T>> &objects, int num_objects,
int timeout_ms);

/// Create a `TaskCaller` for calling remote function.
/// It is used for normal task, such as ray::Task(Plus1, 1), ray::Task(Plus, 1, 2).
/// \param[in] func The function to be remote executed.
/// \param[in] args The function arguments passed by a value or ObjectRef.
/// \return TaskCaller.
template <typename F>
ray::internal::TaskCaller<F> Task(F func);

/// Generic version of creating an actor
/// It is used for creating an actor, such as: ActorCreator<Counter> creator =
/// ray::Actor(Counter::FactoryCreate<int>).Remote(1);
template <typename F>
ray::internal::ActorCreator<F> Actor(F create_func);

/// Get a handle to a global named actor.
/// Gets a handle to a global named actor with the given name. The actor must have been
/// created with global name specified.
///
/// \param[in] name The global name of the named actor.
/// \return An ActorHandle to the actor if the actor of specified name exists or an
/// empty optional object.
template <typename T>
boost::optional<ActorHandle<T>> GetGlobalActor(const std::string &actor_name);

/// Intentionally exit the current actor.
/// It is used to disconnect an actor and exit the worker.
/// \Throws RayException if the current process is a driver or the current worker is not
/// an actor.
inline void ExitActor() { ray::internal::GetRayRuntime()->ExitActor(); }

static std::once_flag is_inited_;

template <typename T>
std::vector<std::shared_ptr<T>> Get(const std::vector<std::string> &ids);

template <typename FuncType>
ray::internal::TaskCaller<FuncType> TaskInternal(FuncType &func);

template <typename FuncType>
ray::internal::ActorCreator<FuncType> CreateActorInternal(FuncType &func);

template <typename T>
inline static boost::optional<ActorHandle<T>> GetActorInternal(
bool global, const std::string &actor_name);

// --------- inline implementation ------------

template <typename T>
inline static std::vector<std::string> ObjectRefsToObjectIDs(
const std::vector<ObjectRef<T>> &object_refs) {
inline std::vector<std::string> ObjectRefsToObjectIDs(
const std::vector<ray::ObjectRef<T>> &object_refs) {
std::vector<std::string> object_ids;
for (auto it = object_refs.begin(); it != object_refs.end(); it++) {
object_ids.push_back(it->ID());
Expand All @@ -151,42 +140,45 @@ inline static std::vector<std::string> ObjectRefsToObjectIDs(
}

template <typename T>
inline ObjectRef<T> Ray::Put(const T &obj) {
auto buffer = std::make_shared<msgpack::sbuffer>(Serializer::Serialize(obj));
auto id = ray::internal::RayRuntime()->Put(buffer);
return ObjectRef<T>(id);
inline ray::ObjectRef<T> Put(const T &obj) {
auto buffer =
std::make_shared<msgpack::sbuffer>(ray::internal::Serializer::Serialize(obj));
auto id = ray::internal::GetRayRuntime()->Put(buffer);
return ray::ObjectRef<T>(id);
}

template <typename T>
inline std::shared_ptr<T> Ray::Get(const ObjectRef<T> &object) {
inline std::shared_ptr<T> Get(const ray::ObjectRef<T> &object) {
return GetFromRuntime(object);
}

template <typename T>
inline std::vector<std::shared_ptr<T>> Ray::Get(const std::vector<std::string> &ids) {
auto result = ray::internal::RayRuntime()->Get(ids);
inline std::vector<std::shared_ptr<T>> Get(const std::vector<std::string> &ids) {
auto result = ray::internal::GetRayRuntime()->Get(ids);
std::vector<std::shared_ptr<T>> return_objects;
return_objects.reserve(result.size());
for (auto it = result.begin(); it != result.end(); it++) {
auto obj = Serializer::Deserialize<std::shared_ptr<T>>((*it)->data(), (*it)->size());
auto obj = ray::internal::Serializer::Deserialize<std::shared_ptr<T>>((*it)->data(),
(*it)->size());
return_objects.push_back(std::move(obj));
}
return return_objects;
}

template <typename T>
inline std::vector<std::shared_ptr<T>> Ray::Get(const std::vector<ObjectRef<T>> &ids) {
inline std::vector<std::shared_ptr<T>> Get(const std::vector<ray::ObjectRef<T>> &ids) {
auto object_ids = ObjectRefsToObjectIDs<T>(ids);
return Get<T>(object_ids);
}

template <typename T>
inline WaitResult<T> Ray::Wait(const std::vector<ObjectRef<T>> &objects, int num_objects,
int timeout_ms) {
inline WaitResult<T> Wait(const std::vector<ray::ObjectRef<T>> &objects, int num_objects,
int timeout_ms) {
auto object_ids = ObjectRefsToObjectIDs<T>(objects);
auto results = ray::internal::RayRuntime()->Wait(object_ids, num_objects, timeout_ms);
std::list<ObjectRef<T>> readys;
std::list<ObjectRef<T>> unreadys;
auto results =
ray::internal::GetRayRuntime()->Wait(object_ids, num_objects, timeout_ms);
std::list<ray::ObjectRef<T>> readys;
std::list<ray::ObjectRef<T>> unreadys;
for (size_t i = 0; i < results.size(); i++) {
if (results[i] == true) {
readys.emplace_back(objects[i]);
Expand All @@ -198,39 +190,39 @@ inline WaitResult<T> Ray::Wait(const std::vector<ObjectRef<T>> &objects, int num
}

template <typename FuncType>
inline TaskCaller<FuncType> Ray::TaskInternal(FuncType &func) {
RemoteFunctionHolder remote_func_holder(func);
return TaskCaller<FuncType>(ray::internal::RayRuntime().get(),
std::move(remote_func_holder));
inline ray::internal::TaskCaller<FuncType> TaskInternal(FuncType &func) {
ray::internal::RemoteFunctionHolder remote_func_holder(func);
return ray::internal::TaskCaller<FuncType>(ray::internal::GetRayRuntime().get(),
std::move(remote_func_holder));
}

template <typename FuncType>
inline ActorCreator<FuncType> Ray::CreateActorInternal(FuncType &create_func) {
RemoteFunctionHolder remote_func_holder(create_func);
return ActorCreator<FuncType>(ray::internal::RayRuntime().get(),
std::move(remote_func_holder));
inline ray::internal::ActorCreator<FuncType> CreateActorInternal(FuncType &create_func) {
ray::internal::RemoteFunctionHolder remote_func_holder(create_func);
return ray::internal::ActorCreator<FuncType>(ray::internal::GetRayRuntime().get(),
std::move(remote_func_holder));
}

/// Normal task.
template <typename F>
TaskCaller<F> Ray::Task(F func) {
ray::internal::TaskCaller<F> Task(F func) {
return TaskInternal<F>(func);
}

/// Creating an actor.
template <typename F>
ActorCreator<F> Ray::Actor(F create_func) {
ray::internal::ActorCreator<F> Actor(F create_func) {
return CreateActorInternal<F>(create_func);
}

template <typename T>
boost::optional<ActorHandle<T>> Ray::GetActorInternal(bool global,
const std::string &actor_name) {
inline boost::optional<ActorHandle<T>> GetActorInternal(bool global,
const std::string &actor_name) {
if (actor_name.empty()) {
return {};
}

auto actor_id = ray::internal::RayRuntime()->GetActorId(global, actor_name);
auto actor_id = ray::internal::GetRayRuntime()->GetActorId(global, actor_name);
if (actor_id.empty()) {
return {};
}
Expand All @@ -239,9 +231,8 @@ boost::optional<ActorHandle<T>> Ray::GetActorInternal(bool global,
}

template <typename T>
boost::optional<ActorHandle<T>> Ray::GetGlobalActor(const std::string &actor_name) {
boost::optional<ActorHandle<T>> GetGlobalActor(const std::string &actor_name) {
return GetActorInternal<T>(true, actor_name);
}

} // namespace api
} // namespace ray
Loading

0 comments on commit d49cc68

Please sign in to comment.