Merge pull request #3 from yangzhong/issue-96
Use another channel to update the status of a task set for the executor
kyotoYaho authored and GitHub Enterprise committed Jul 26, 2022
2 parents 913daca + 784b04f commit 35a33a9
Showing 3 changed files with 69 additions and 17 deletions.
83 changes: 67 additions & 16 deletions ballista/rust/executor/src/executor_server.rs
@@ -34,13 +34,15 @@ use ballista_core::serde::protobuf::executor_registration::OptionalHost;
 use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
 use ballista_core::serde::protobuf::{
     HeartBeatParams, LaunchTaskParams, LaunchTaskResult, RegisterExecutorParams,
-    StopExecutorParams, StopExecutorResult, TaskDefinition, UpdateTaskStatusParams,
+    StopExecutorParams, StopExecutorResult, TaskDefinition, TaskStatus,
+    UpdateTaskStatusParams,
 };
 use ballista_core::serde::scheduler::ExecutorState;
 use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_proto::logical_plan::AsLogicalPlan;
+use tokio::sync::mpsc::error::TryRecvError;

 use crate::as_task_status;
 use crate::cpu_bound_executor::DedicatedExecutor;
@@ -53,11 +55,15 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
 ) {
     // TODO make the buffer size configurable
     let (tx_task, rx_task) = mpsc::channel::<TaskDefinition>(1000);
+    let (tx_task_status, rx_task_status) = mpsc::channel::<TaskStatus>(1000);

     let executor_server = ExecutorServer::new(
         scheduler.clone(),
         executor.clone(),
-        ExecutorEnv { tx_task },
+        ExecutorEnv {
+            tx_task,
+            tx_task_status,
+        },
         codec,
     );

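For context, the setup above boils down to two independent bounded channels whose senders travel together inside the cloneable ExecutorEnv. A minimal standalone sketch of that wiring, with unit structs standing in for the real protobuf TaskDefinition and TaskStatus types:

```rust
use tokio::sync::mpsc;

// Stand-ins for the real protobuf types (illustration only).
struct TaskDefinition;
struct TaskStatus;

#[derive(Clone)]
struct ExecutorEnv {
    tx_task: mpsc::Sender<TaskDefinition>,
    tx_task_status: mpsc::Sender<TaskStatus>,
}

#[tokio::main]
async fn main() {
    // One bounded channel feeds task definitions to the runner pool; the
    // other carries finished-task statuses to the status reporter.
    let (tx_task, _rx_task) = mpsc::channel::<TaskDefinition>(1000);
    let (tx_task_status, _rx_task_status) = mpsc::channel::<TaskStatus>(1000);

    let env = ExecutorEnv {
        tx_task,
        tx_task_status,
    };
    // Senders are cheap to clone, so the whole env can be handed to each task.
    let _env_for_worker = env.clone();
}
```

The bounded capacity (1000 here, with a TODO to make it configurable) provides backpressure: if the reporter falls behind, send().await suspends the producer instead of growing an unbounded queue.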
@@ -103,7 +109,7 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
     // 4. Start TaskRunnerPool
     {
         let task_runner_pool = TaskRunnerPool::new(executor_server.clone());
-        task_runner_pool.start(rx_task).await;
+        task_runner_pool.start(rx_task, rx_task_status).await;
     }
 }

@@ -138,6 +144,7 @@ pub struct ExecutorServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
 #[derive(Clone)]
 struct ExecutorEnv {
     tx_task: mpsc::Sender<TaskDefinition>,
+    tx_task_status: mpsc::Sender<TaskStatus>,
 }

 unsafe impl Sync for ExecutorEnv {}
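An aside on the context line above: tokio's mpsc::Sender<T> is already Send + Sync whenever T is Send, so a struct composed solely of senders should not need an unsafe impl Sync. A quick compile-time check one can run to confirm (an observation, not part of the commit):

```rust
use tokio::sync::mpsc;

// Compiles only if the type argument is both Send and Sync.
fn assert_send_sync<T: Send + Sync>() {}

fn main() {
    // mpsc::Sender<T> is Send + Sync for T: Send, so a senders-only struct
    // is Sync without any unsafe impl.
    assert_send_sync::<mpsc::Sender<String>>();
}
```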
@@ -248,18 +255,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T, U>
         debug!("Statistics: {:?}", execution_result);

         let executor_id = &self.executor.metadata.id;
-        // TODO use another channel to update the status of a task set
-        self.scheduler
-            .clone()
-            .update_task_status(UpdateTaskStatusParams {
-                executor_id: executor_id.clone(),
-                task_status: vec![as_task_status(
-                    execution_result,
-                    executor_id.clone(),
-                    task_id,
-                )],
-            })
-            .await?;
+        let task_status = as_task_status(execution_result, executor_id.clone(), task_id);
+
+        let task_status_sender = self.executor_env.tx_task_status.clone();
+        task_status_sender.send(task_status).await.unwrap();

         Ok(())
     }
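With this change the per-task gRPC call is gone: the runner converts the execution result into a TaskStatus and pushes it onto the status channel. Note that send(...).await.unwrap() panics if the reporter task has shut down; a sketch of a more defensive send side (an alternative, not what the commit does), using a simplified status type:

```rust
use tokio::sync::mpsc;

#[derive(Debug)]
struct TaskStatus {
    task_id: String,
}

async fn report_status(tx: &mpsc::Sender<TaskStatus>, status: TaskStatus) {
    // send().await suspends only while the bounded channel is full; an Err
    // means the receiver was dropped, so log and discard instead of panicking.
    if let Err(e) = tx.send(status).await {
        eprintln!("status channel closed, dropping update: {:?}", e.0);
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<TaskStatus>(8);
    report_status(&tx, TaskStatus { task_id: "task-1".into() }).await;
    println!("received: {:?}", rx.recv().await);
}
```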
@@ -302,7 +301,59 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T, U>
         Self { executor_server }
     }

-    async fn start(&self, mut rx_task: mpsc::Receiver<TaskDefinition>) {
+    async fn start(
+        &self,
+        mut rx_task: mpsc::Receiver<TaskDefinition>,
+        mut rx_task_status: mpsc::Receiver<TaskStatus>,
+    ) {
+        // Loop for task status reporting
+        let executor_server = self.executor_server.clone();
+        tokio::spawn(async move {
+            info!("Starting the task status reporter");
+            loop {
+                let mut tasks_status = vec![];
+                // First fetch one task status from the channel in blocking mode
+                if let Some(task_status) = rx_task_status.recv().await {
+                    tasks_status.push(task_status);
+                } else {
+                    info!("Channel is closed; exiting the reporting loop");
+                    return;
+                }
+                // Then drain in non-blocking mode to batch as many finished tasks as possible
+                loop {
+                    match rx_task_status.try_recv() {
+                        Ok(task_status) => {
+                            tasks_status.push(task_status);
+                        }
+                        Err(TryRecvError::Empty) => {
+                            info!(
+                                "Fetched {} task statuses to report",
+                                tasks_status.len()
+                            );
+                            break;
+                        }
+                        Err(TryRecvError::Disconnected) => {
+                            info!("Channel is closed; exiting the reporting loop");
+                            return;
+                        }
+                    }
+                }
+
+                if let Err(e) = executor_server
+                    .scheduler
+                    .clone()
+                    .update_task_status(UpdateTaskStatusParams {
+                        executor_id: executor_server.executor.metadata.id.clone(),
+                        task_status: tasks_status.clone(),
+                    })
+                    .await
+                {
+                    error!("Failed to update tasks {:?} due to {:?}", tasks_status, e);
+                }
+            }
+        });
+
+        // Loop for task fetching and running
         let executor_server = self.executor_server.clone();
         tokio::spawn(async move {
             info!("Starting the task runner pool");
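The reporter spawned above implements a simple batching pattern: block on recv().await for the first status, drain whatever else is already queued with try_recv(), then ship the whole batch in a single update_task_status RPC. A self-contained sketch of that pattern, with a println! standing in for the scheduler call:

```rust
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;

#[derive(Debug)]
struct TaskStatus {
    task_id: u64,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<TaskStatus>(1000);

    let reporter = tokio::spawn(async move {
        loop {
            let mut batch = Vec::new();
            // Block until at least one status arrives; exit once the channel
            // closes (all senders dropped).
            match rx.recv().await {
                Some(status) => batch.push(status),
                None => return,
            }
            // Drain anything else already queued without blocking, so a burst
            // of finished tasks becomes one report instead of many.
            loop {
                match rx.try_recv() {
                    Ok(status) => batch.push(status),
                    Err(TryRecvError::Empty) => break,
                    Err(TryRecvError::Disconnected) => return,
                }
            }
            // Stand-in for the single update_task_status RPC per batch.
            println!("reporting {} task statuses: {:?}", batch.len(), batch);
        }
    });

    for task_id in 0..10u64 {
        tx.send(TaskStatus { task_id }).await.unwrap();
    }
    drop(tx); // close the channel so the reporter exits
    reporter.await.unwrap();
}
```

Compared with one RPC per finished task, batching trades at most one extra wakeup of latency for far fewer scheduler round trips under load.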
1 change: 1 addition & 0 deletions ballista/rust/scheduler/src/planner.rs
@@ -74,6 +74,7 @@ impl DistributedPlanner {
             new_plan,
             None,
         )?);
+        info!("finished planning");
         Ok(stages)
     }

2 changes: 1 addition & 1 deletion ballista/rust/scheduler/src/scheduler_server/mod.rs
@@ -191,7 +191,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T, U>
                 .unwrap_or_else(|| addr),
             port: executor_metadata.port as u16,
             grpc_port: executor_metadata.grpc_port as u16,
-            specification: executor_metadata.specification.unwrap().clone().into(),
+            specification: executor_metadata.specification.unwrap().into(),
         };
         // save the metadata to cache
         self.state.save_executor_metadata(metadata.clone()).await?;
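The scheduler-side change is a small cleanup: Option::unwrap() consumes the Option and returns the owned inner value, so the .clone() before .into() was redundant. A tiny illustration with hypothetical ProtoSpec/Spec types:

```rust
struct ProtoSpec {
    task_slots: u32,
}

struct Spec {
    task_slots: u32,
}

impl From<ProtoSpec> for Spec {
    fn from(p: ProtoSpec) -> Self {
        Spec { task_slots: p.task_slots }
    }
}

fn main() {
    let spec: Option<ProtoSpec> = Some(ProtoSpec { task_slots: 4 });
    // unwrap() moves the inner value out, so `.unwrap().into()` suffices;
    // `.unwrap().clone().into()` only added a redundant copy.
    let converted: Spec = spec.unwrap().into();
    assert_eq!(converted.task_slots, 4);
}
```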
