Skip to content

Commit

Permalink
Revert "[core] Assign tasks to the first available worker (ray-projec…
Browse files Browse the repository at this point in the history
…t#18167)"

This reverts commit 545db13.
  • Loading branch information
rkooo567 committed Oct 7, 2021
1 parent fe413c3 commit ca15dc5
Show file tree
Hide file tree
Showing 25 changed files with 408 additions and 663 deletions.
2 changes: 1 addition & 1 deletion .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
- label: ":java: Java"
conditions: ["RAY_CI_JAVA_AFFECTED"]
commands:
- RAY_BACKEND_LOG_LEVEL=debug ./java/test.sh
- ./java/test.sh

- label: ":java: Streaming"
conditions:
Expand Down
3 changes: 2 additions & 1 deletion python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,7 @@ cdef class CoreWorker:
JobID job_id, GcsClientOptions gcs_options, log_dir,
node_ip_address, node_manager_port, raylet_ip_address,
local_mode, driver_name, stdout_file, stderr_file,
serialized_job_config, metrics_agent_port,
serialized_job_config, metrics_agent_port, runtime_env_hash,
worker_shim_pid):
self.is_local_mode = local_mode

Expand Down Expand Up @@ -1009,6 +1009,7 @@ cdef class CoreWorker:
options.serialized_job_config = serialized_job_config
options.metrics_agent_port = metrics_agent_port
options.connect_on_start = False
options.runtime_env_hash = runtime_env_hash
options.worker_shim_pid = worker_shim_pid
CCoreWorkerProcess.Initialize(options)

Expand Down
1 change: 1 addition & 0 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
c_string serialized_job_config
int metrics_agent_port
c_bool connect_on_start
int runtime_env_hash
int worker_shim_pid

cdef cppclass CCoreWorkerProcess "ray::core::CoreWorkerProcess":
Expand Down
1 change: 0 additions & 1 deletion python/ray/tests/test_basic_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ def get_num_workers():
num_workers_prefix = "- num PYTHON workers: "
if num_workers_prefix in line:
num_workers = int(line[len(num_workers_prefix):])
print(num_workers)
return num_workers
return None

Expand Down
5 changes: 4 additions & 1 deletion python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,7 @@ def connect(node,
job_id=None,
namespace=None,
job_config=None,
runtime_env_hash=0,
worker_shim_pid=0,
ray_debugger_external=False):
"""Connect this worker to the raylet, to Plasma, and to Redis.
Expand All @@ -1269,6 +1270,7 @@ def connect(node,
driver_object_store_memory: Deprecated.
job_id: The ID of job. If it's None, then we will generate one.
job_config (ray.job_config.JobConfig): The job configuration.
runtime_env_hash (int): The hash of the runtime env for this worker.
worker_shim_pid (int): The PID of the process for setup worker
runtime env.
ray_debugger_host (bool): The host to bind a Ray debugger to on
Expand Down Expand Up @@ -1388,7 +1390,8 @@ def connect(node,
gcs_options, node.get_logs_dir_path(), node.node_ip_address,
node.node_manager_port, node.raylet_ip_address, (mode == LOCAL_MODE),
driver_name, log_stdout_file_path, log_stderr_file_path,
serialized_job_config, node.metrics_agent_port, worker_shim_pid)
serialized_job_config, node.metrics_agent_port, runtime_env_hash,
worker_shim_pid)
worker.gcs_client = worker.core_worker.get_gcs_client()

# If it's a driver and it's not coming from ray client, we'll prepare the
Expand Down
7 changes: 7 additions & 0 deletions python/ray/workers/default_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@
default=ray_constants.LOGGING_ROTATE_BACKUP_COUNT,
help="Specify the backup count of rotated log file, default is "
f"{ray_constants.LOGGING_ROTATE_BACKUP_COUNT}.")
parser.add_argument(
"--runtime-env-hash",
required=False,
type=int,
default=0,
help="The computed hash of the runtime env for this worker.")
parser.add_argument(
"--worker-shim-pid",
required=False,
Expand Down Expand Up @@ -181,6 +187,7 @@
ray.worker.connect(
node,
mode=mode,
runtime_env_hash=args.runtime_env_hash,
worker_shim_pid=args.worker_shim_pid,
ray_debugger_external=args.ray_debugger_external)

Expand Down
31 changes: 13 additions & 18 deletions src/ray/common/task/task_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,18 +142,13 @@ bool TaskSpecification::HasRuntimeEnv() const {
return !(SerializedRuntimeEnv() == "{}" || SerializedRuntimeEnv() == "");
}

RuntimeEnvHash TaskSpecification::GetRuntimeEnvHash() const {
const auto &job_id = JobId();
if (!HasRuntimeEnv() && job_id.IsNil()) {
return 0;
}

int TaskSpecification::GetRuntimeEnvHash() const {
std::unordered_map<std::string, double> required_resource{};
if (RayConfig::instance().worker_resource_limits_enabled()) {
required_resource = GetRequiredResources().GetResourceMap();
}
WorkerCacheKey env = {job_id, SerializedRuntimeEnv(), required_resource};
return env.Hash();
WorkerCacheKey env = {SerializedRuntimeEnv(), required_resource};
return env.IntHash();
}

const SchedulingClass TaskSpecification::GetSchedulingClass() const {
Expand Down Expand Up @@ -398,13 +393,10 @@ std::string TaskSpecification::CallSiteString() const {
return stream.str();
}

WorkerCacheKey::WorkerCacheKey(const JobID &job_id) : WorkerCacheKey(job_id, "", {}) {}

WorkerCacheKey::WorkerCacheKey(
const JobID &job_id, const std::string &serialized_runtime_env,
const std::string serialized_runtime_env,
const std::unordered_map<std::string, double> required_resources)
: job_id_(job_id),
serialized_runtime_env(serialized_runtime_env),
: serialized_runtime_env(serialized_runtime_env),
required_resources(std::move(required_resources)) {}

bool WorkerCacheKey::operator==(const WorkerCacheKey &k) const {
Expand All @@ -417,13 +409,14 @@ bool WorkerCacheKey::EnvIsEmpty() const {
required_resources.empty();
}

RuntimeEnvHash WorkerCacheKey::Hash() const {
std::size_t WorkerCacheKey::Hash() const {
// Cache the hash value.
if (!hash_) {
if (!job_id_.IsNil()) {
hash_ = job_id_.Hash();
}
if (!EnvIsEmpty()) {
if (EnvIsEmpty()) {
// It's useful to have the same predetermined value for both unspecified and empty
// runtime envs.
hash_ = 0;
} else {
boost::hash_combine(hash_, serialized_runtime_env);

std::vector<std::pair<std::string, double>> resource_vars(
Expand All @@ -439,6 +432,8 @@ RuntimeEnvHash WorkerCacheKey::Hash() const {
return hash_;
}

int WorkerCacheKey::IntHash() const { return (int)Hash(); }

std::vector<ConcurrencyGroup> TaskSpecification::ConcurrencyGroups() const {
RAY_CHECK(IsActorCreationTask());
std::vector<ConcurrencyGroup> concurrency_groups;
Expand Down
22 changes: 10 additions & 12 deletions src/ray/common/task/task_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ namespace ray {
typedef ResourceSet SchedulingClassDescriptor;
typedef int SchedulingClass;

using RuntimeEnvHash = int64_t;

/// ConcurrencyGroup is a group of actor methods that shares
/// a executing thread pool.
struct ConcurrencyGroup {
Expand Down Expand Up @@ -108,7 +106,7 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {

bool HasRuntimeEnv() const;

[[nodiscard]] RuntimeEnvHash GetRuntimeEnvHash() const;
int GetRuntimeEnvHash() const;

size_t NumArgs() const;

Expand Down Expand Up @@ -274,16 +272,12 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
/// Class used to cache workers, keyed by runtime_env.
class WorkerCacheKey {
public:
/// Constructor for a default runtime env.
///
explicit WorkerCacheKey(const JobID &job_id);

/// Create a cache key with the given environment variable overrides and serialized
/// runtime_env.
///
/// worker. \param serialized_runtime_env The JSON-serialized runtime env for this
/// worker. \param required_resources The required resouce.
WorkerCacheKey(const JobID &job_id, const std::string &serialized_runtime_env,
WorkerCacheKey(const std::string serialized_runtime_env,
const std::unordered_map<std::string, double> required_resources);

bool operator==(const WorkerCacheKey &k) const;
Expand All @@ -296,12 +290,16 @@ class WorkerCacheKey {

/// Get the hash for this worker's environment.
///
/// \return The hash of the override_environment_variables and the serialized
/// runtime_env.
RuntimeEnvHash Hash() const;
/// \return The hash of the serialized runtime_env.
std::size_t Hash() const;

/// Get the int-valued hash for this worker's environment, useful for portability in
/// flatbuffers.
///
/// \return The hash truncated to an int.
int IntHash() const;

private:
const JobID job_id_;
/// The JSON-serialized runtime env for this worker.
const std::string serialized_runtime_env;
/// The required resources for this worker.
Expand Down
6 changes: 3 additions & 3 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -432,9 +432,9 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
std::string serialized_job_config = options_.serialized_job_config;
local_raylet_client_ = std::make_shared<raylet::RayletClient>(
io_service_, std::move(grpc_client), options_.raylet_socket, GetWorkerID(),
options_.worker_type, worker_context_.GetCurrentJobID(), options_.language,
options_.node_ip_address, &raylet_client_status, &local_raylet_id, &assigned_port,
&serialized_job_config, options_.worker_shim_pid);
options_.worker_type, worker_context_.GetCurrentJobID(), options_.runtime_env_hash,
options_.language, options_.node_ip_address, &raylet_client_status,
&local_raylet_id, &assigned_port, &serialized_job_config, options_.worker_shim_pid);

if (!raylet_client_status.ok()) {
// Avoid using FATAL log or RAY_CHECK here because they may create a core dump file.
Expand Down
3 changes: 3 additions & 0 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ struct CoreWorkerOptions {
serialized_job_config(""),
metrics_agent_port(-1),
connect_on_start(true),
runtime_env_hash(0),
worker_shim_pid(0) {}

/// Type of this worker (i.e., DRIVER or WORKER).
Expand Down Expand Up @@ -181,6 +182,8 @@ struct CoreWorkerOptions {
/// ready. It should be explicitly startd by a caller using CoreWorker::Start.
/// TODO(sang): Use this method for Java and cpp frontend too.
bool connect_on_start;
/// The hash of the runtime env for this worker.
int runtime_env_hash;
/// The PID of the process for setup worker runtime env.
pid_t worker_shim_pid;
};
Expand Down
1 change: 1 addition & 0 deletions src/ray/core_worker/transport/direct_task_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ typedef std::function<std::shared_ptr<WorkerLeaseInterface>(const std::string &i
// be aware of the actor and is not able to manage it. It is also keyed on
// RuntimeEnvHash, because a worker can only run a task if the worker's RuntimeEnvHash
// matches the RuntimeEnvHash required by the task spec.
typedef int RuntimeEnvHash;
using SchedulingKey =
std::tuple<SchedulingClass, std::vector<ObjectID>, ActorID, RuntimeEnvHash>;

Expand Down
2 changes: 2 additions & 0 deletions src/ray/raylet/format/node_manager.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ table RegisterClientRequest {
worker_shim_pid: long;
// The job ID if the client is a driver, otherwise it should be NIL.
job_id: string;
// The hash of the runtime env for this worker.
runtime_env_hash: int;
// Language of this worker.
// TODO(hchen): Use `Language` in `common.proto`.
language: int;
Expand Down
7 changes: 2 additions & 5 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -492,10 +492,6 @@ ray::Status NodeManager::RegisterGcs() {
"NodeManager.deadline_timer.print_event_loop_stats");
}

periodical_runner_.RunFnPeriodically(
[this] { cluster_task_manager_->ScheduleAndDispatchTasks(); }, 100,
"NodeManager.schedule_and_dispatch_tasks");

return ray::Status::OK();
}

Expand Down Expand Up @@ -1045,6 +1041,7 @@ void NodeManager::ProcessRegisterClientRequestMessage(
auto message = flatbuffers::GetRoot<protocol::RegisterClientRequest>(message_data);
Language language = static_cast<Language>(message->language());
const JobID job_id = from_flatbuf<JobID>(*message->job_id());
const int runtime_env_hash = static_cast<int>(message->runtime_env_hash());
WorkerID worker_id = from_flatbuf<WorkerID>(*message->worker_id());
pid_t pid = message->worker_pid();
pid_t worker_shim_pid = message->worker_shim_pid();
Expand All @@ -1059,7 +1056,7 @@ void NodeManager::ProcessRegisterClientRequestMessage(
RAY_CHECK(job_id.IsNil());
}
auto worker = std::dynamic_pointer_cast<WorkerInterface>(
std::make_shared<Worker>(job_id, worker_id, language, worker_type,
std::make_shared<Worker>(job_id, runtime_env_hash, worker_id, language, worker_type,
worker_ip_address, client, client_call_manager_));

auto send_reply_callback = [this, client, job_id](Status status, int assigned_port) {
Expand Down
23 changes: 0 additions & 23 deletions src/ray/raylet/scheduling/cluster_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ bool ClusterTaskManager::WaitForTaskArgsRequests(std::shared_ptr<Work> work) {
task_dependency_manager_.RequestTaskDependencies(task_id, task.GetDependencies());
if (args_ready) {
RAY_LOG(DEBUG) << "Args already ready, task can be dispatched " << task_id;
num_tasks_waiting_for_dispatch_++;
tasks_to_dispatch_[scheduling_key].push_back(work);
} else {
RAY_LOG(DEBUG) << "Waiting for args for task: "
Expand All @@ -150,7 +149,6 @@ bool ClusterTaskManager::WaitForTaskArgsRequests(std::shared_ptr<Work> work) {
} else {
RAY_LOG(DEBUG) << "No args, task can be dispatched "
<< task.GetTaskSpecification().TaskId();
num_tasks_waiting_for_dispatch_++;
tasks_to_dispatch_[scheduling_key].push_back(work);
}
return can_dispatch;
Expand Down Expand Up @@ -234,7 +232,6 @@ bool ClusterTaskManager::PoppedWorkerHandler(
// In other cases, set the work status `WAITING` to make this task
// could be re-dispatched.
work->status = WorkStatus::WAITING;
num_tasks_waiting_for_dispatch_++;
// Return here because we shouldn't remove task dependencies.
return dispatched;
}
Expand Down Expand Up @@ -267,11 +264,6 @@ bool ClusterTaskManager::PoppedWorkerHandler(
void ClusterTaskManager::DispatchScheduledTasksToWorkers(
WorkerPoolInterface &worker_pool,
std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers) {
if (num_tasks_waiting_for_dispatch_ == 0) {
RAY_LOG(DEBUG) << "No new tasks since last call to dispatch, skipping";
return;
}

// Check every task in task_to_dispatch queue to see
// whether it can be dispatched and ran. This avoids head-of-line
// blocking where a task which cannot be dispatched because
Expand All @@ -291,7 +283,6 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
work_it++;
continue;
}
RAY_CHECK(work->status == WorkStatus::WAITING);

bool args_missing = false;
bool success = PinTaskArgsIfMemoryAvailable(spec, &args_missing);
Expand All @@ -305,8 +296,6 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
auto it = waiting_task_queue_.insert(waiting_task_queue_.begin(),
std::move(*work_it));
RAY_CHECK(waiting_tasks_index_.emplace(task_id, it).second);
RAY_CHECK(num_tasks_waiting_for_dispatch_ > 0);
num_tasks_waiting_for_dispatch_--;
work_it = dispatch_queue.erase(work_it);
} else {
// The task's args cannot be pinned due to lack of memory. We should
Expand Down Expand Up @@ -338,8 +327,6 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
task_dependency_manager_.RemoveTaskDependencies(task_id);
}
ReleaseTaskArgs(task_id);
RAY_CHECK(num_tasks_waiting_for_dispatch_ > 0);
num_tasks_waiting_for_dispatch_--;
work_it = dispatch_queue.erase(work_it);
continue;
}
Expand All @@ -365,8 +352,6 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
task_dependency_manager_.RemoveTaskDependencies(
task.GetTaskSpecification().TaskId());
}
RAY_CHECK(num_tasks_waiting_for_dispatch_ > 0);
num_tasks_waiting_for_dispatch_--;
work_it = dispatch_queue.erase(work_it);
} else {
// The local node has the available resources to run the task, so we should run
Expand All @@ -390,8 +375,6 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
is_detached_actor, owner_address);
},
allocated_instances_serialized_json);
RAY_CHECK(num_tasks_waiting_for_dispatch_ > 0);
num_tasks_waiting_for_dispatch_--;
work_it++;
}
}
Expand Down Expand Up @@ -460,8 +443,6 @@ void ClusterTaskManager::TasksUnblocked(const std::vector<TaskID> &ready_ids) {
const auto &scheduling_key = task.GetTaskSpecification().GetSchedulingClass();
RAY_LOG(DEBUG) << "Args ready, task can be dispatched "
<< task.GetTaskSpecification().TaskId();
RAY_CHECK(work->status == WorkStatus::WAITING);
num_tasks_waiting_for_dispatch_++;
tasks_to_dispatch_[scheduling_key].push_back(work);
waiting_task_queue_.erase(it->second);
waiting_tasks_index_.erase(it);
Expand Down Expand Up @@ -637,10 +618,6 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id,
task_dependency_manager_.RemoveTaskDependencies(
task.GetTaskSpecification().TaskId());
}
if ((*work_it)->status == WorkStatus::WAITING) {
RAY_CHECK(num_tasks_waiting_for_dispatch_ > 0);
num_tasks_waiting_for_dispatch_--;
}
(*work_it)->status = WorkStatus::CANCELLED;
work_queue.erase(work_it);
if (work_queue.empty()) {
Expand Down
Loading

0 comments on commit ca15dc5

Please sign in to comment.