diff --git a/ballista/rust/executor/src/executor_server.rs b/ballista/rust/executor/src/executor_server.rs
index ad2c88478..f9ee9e0ba 100644
--- a/ballista/rust/executor/src/executor_server.rs
+++ b/ballista/rust/executor/src/executor_server.rs
@@ -34,13 +34,15 @@ use ballista_core::serde::protobuf::executor_registration::OptionalHost;
 use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
 use ballista_core::serde::protobuf::{
     HeartBeatParams, LaunchTaskParams, LaunchTaskResult, RegisterExecutorParams,
-    StopExecutorParams, StopExecutorResult, TaskDefinition, UpdateTaskStatusParams,
+    StopExecutorParams, StopExecutorResult, TaskDefinition, TaskStatus,
+    UpdateTaskStatusParams,
 };
 use ballista_core::serde::scheduler::ExecutorState;
 use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_proto::logical_plan::AsLogicalPlan;
+use tokio::sync::mpsc::error::TryRecvError;
 
 use crate::as_task_status;
 use crate::cpu_bound_executor::DedicatedExecutor;
@@ -53,11 +55,15 @@ pub async fn startup(
 ) {
     // TODO make the buffer size configurable
     let (tx_task, rx_task) = mpsc::channel::<TaskDefinition>(1000);
+    let (tx_task_status, rx_task_status) = mpsc::channel::<TaskStatus>(1000);
 
     let executor_server = ExecutorServer::new(
         scheduler.clone(),
         executor.clone(),
-        ExecutorEnv { tx_task },
+        ExecutorEnv {
+            tx_task,
+            tx_task_status,
+        },
         codec,
     );
 
@@ -103,7 +109,7 @@ pub async fn startup(
 
     // 4. Start TaskRunnerPool
     {
         let task_runner_pool = TaskRunnerPool::new(executor_server.clone());
-        task_runner_pool.start(rx_task).await;
+        task_runner_pool.start(rx_task, rx_task_status).await;
     }
 }
@@ -138,6 +144,7 @@ pub struct ExecutorServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
 #[derive(Clone)]
 struct ExecutorEnv {
     tx_task: mpsc::Sender<TaskDefinition>,
+    tx_task_status: mpsc::Sender<TaskStatus>,
 }
 
 unsafe impl Sync for ExecutorEnv {}
@@ -248,18 +255,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T
@@ ... @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T
-    async fn start(&self, mut rx_task: mpsc::Receiver<TaskDefinition>) {
+    async fn start(
+        &self,
+        mut rx_task: mpsc::Receiver<TaskDefinition>,
+        mut rx_task_status: mpsc::Receiver<TaskStatus>,
+    ) {
+        // loop for task status reporting
+        let executor_server = self.executor_server.clone();
+        tokio::spawn(async move {
+            info!("Starting the task status reporter");
+            loop {
+                let mut tasks_status = vec![];
+                // First fetch a task status from the channel in blocking mode
+                if let Some(task_status) = rx_task_status.recv().await {
+                    tasks_status.push(task_status);
+                } else {
+                    info!("Channel is closed and will exit the loop");
+                    return;
+                }
+                // Then fetch in non-blocking mode to batch as many finished tasks as possible
+                loop {
+                    match rx_task_status.try_recv() {
+                        Ok(task_status) => {
+                            tasks_status.push(task_status);
+                        }
+                        Err(TryRecvError::Empty) => {
+                            info!(
+                                "Fetched {} task statuses to report",
+                                tasks_status.len()
+                            );
+                            break;
+                        }
+                        Err(TryRecvError::Disconnected) => {
+                            info!("Channel is closed and will exit the loop");
+                            return;
+                        }
+                    }
+                }
+
+                if let Err(e) = executor_server
+                    .scheduler
+                    .clone()
+                    .update_task_status(UpdateTaskStatusParams {
+                        executor_id: executor_server.executor.metadata.id.clone(),
+                        task_status: tasks_status.clone(),
+                    })
+                    .await
+                {
+                    error!("Failed to update tasks {:?} due to {:?}", tasks_status, e);
+                }
+            }
+        });
+
+        // loop for task fetching and running
         let executor_server = self.executor_server.clone();
         tokio::spawn(async move {
             info!("Starting the task runner pool");
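The key pattern in this patch is the batch-drain loop in `TaskRunnerPool::start`: block on `recv().await` until at least one task status arrives, then drain whatever else is already queued with non-blocking `try_recv()`, so a single `update_task_status` RPC carries the whole batch. Below is a minimal, self-contained sketch of that drain pattern, assuming tokio 1.9+ (where `mpsc::Receiver::try_recv` is available); the `recv_batch` helper, the `u32` payload, and the `println!` standing in for the scheduler RPC are illustrative assumptions, not code from the patch.

```rust
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;

/// Block until at least one item arrives, then drain whatever else is
/// already buffered without waiting, so one report can cover many items.
/// Returns None once the channel is closed and fully drained.
async fn recv_batch<T>(rx: &mut mpsc::Receiver<T>) -> Option<Vec<T>> {
    // Blocking fetch of the first item; None means the channel is closed.
    let mut batch = vec![rx.recv().await?];
    // Non-blocking drain of everything else currently queued.
    loop {
        match rx.try_recv() {
            Ok(item) => batch.push(item),
            // Nothing buffered right now: report what we have so far.
            Err(TryRecvError::Empty) => break,
            // All senders dropped: still report the final partial batch.
            Err(TryRecvError::Disconnected) => break,
        }
    }
    Some(batch)
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(1000);
    for i in 0..5 {
        tx.send(i).await.unwrap();
    }
    drop(tx); // close the channel so the loop below terminates
    while let Some(batch) = recv_batch(&mut rx).await {
        // In the patch, this is where one UpdateTaskStatusParams RPC
        // would report the whole batch to the scheduler.
        println!("reporting {} statuses in one call: {:?}", batch.len(), batch);
    }
}
```

The blocking first `recv` keeps reporting latency low when the executor is idle, while the non-blocking drain amortizes scheduler round-trips when many tasks finish at once. One difference worth noting: this sketch flushes the final partial batch on `Disconnected`, whereas the patch returns immediately and drops it.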
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index 33dc3ac03..c4e3eef64 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -74,6 +74,7 @@ impl DistributedPlanner {
             new_plan,
             None,
         )?);
+        info!("finished planning");
 
         Ok(stages)
     }
diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs
index ae88efa14..7c8f5e53e 100644
--- a/ballista/rust/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs
@@ -191,7 +191,7 @@ impl SchedulerServer