From 33d9b0e4a4e5b328eddd6fa8203a715c42bf8da5 Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Wed, 6 Dec 2023 18:04:25 +0800
Subject: [PATCH] feat(meta): show recovery reason when request is rejected

Signed-off-by: Bugen Zhao
---
 src/meta/service/src/scale_service.rs | 14 +++---------
 src/meta/src/barrier/mod.rs           | 33 ++++++++++++++++++++++-----
 src/meta/src/rpc/ddl_controller.rs    | 13 +----------
 3 files changed, 31 insertions(+), 29 deletions(-)

diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs
index 39fc0bd90acca..3308ae43287f6 100644
--- a/src/meta/service/src/scale_service.rs
+++ b/src/meta/service/src/scale_service.rs
@@ -126,11 +126,7 @@ impl ScaleService for ScaleServiceImpl {
         &self,
         request: Request,
     ) -> Result, Status> {
-        if !self.barrier_manager.is_running().await {
-            return Err(Status::unavailable(
-                "Rescheduling is unavailable for now. Likely the cluster is starting or recovering.",
-            ));
-        }
+        self.barrier_manager.check_status_running().await?;
 
         let RescheduleRequest {
             reschedules,
@@ -190,13 +186,9 @@ impl ScaleService for ScaleServiceImpl {
         &self,
         request: Request,
     ) -> Result, Status> {
-        let req = request.into_inner();
+        self.barrier_manager.check_status_running().await?;
 
-        if !self.barrier_manager.is_running().await {
-            return Err(Status::unavailable(
-                "Rescheduling is unavailable for now. Likely the cluster is starting or recovering.",
-            ));
-        }
+        let req = request.into_inner();
 
         let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index a929f50abfdec..9e3f74dd382c4 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -106,12 +106,20 @@ pub(crate) type TableDefinitionMap = TableMap;
 pub(crate) type TableNotifierMap = TableMap;
 pub(crate) type TableFragmentMap = TableMap;
 
+/// The reason why the cluster is recovering.
+enum RecoveryReason {
+    /// After bootstrap.
+    Bootstrap,
+    /// After failure.
+    Failover(MetaError),
+}
+
 /// Status of barrier manager.
 enum BarrierManagerStatus {
     /// Barrier manager is starting.
     Starting,
     /// Barrier manager is under recovery.
-    Recovering,
+    Recovering(RecoveryReason),
     /// Barrier manager is running.
     Running,
 }
@@ -566,10 +574,19 @@ impl GlobalBarrierManager {
         (join_handle, shutdown_tx)
     }
 
-    /// Return whether the barrier manager is running.
-    pub async fn is_running(&self) -> bool {
+    /// Check the status of barrier manager, return error if it is not `Running`.
+    pub async fn check_status_running(&self) -> MetaResult<()> {
         let status = self.status.lock().await;
-        matches!(*status, BarrierManagerStatus::Running)
+        match &*status {
+            BarrierManagerStatus::Starting
+            | BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
+                bail!("The cluster is bootstrapping")
+            }
+            BarrierManagerStatus::Recovering(RecoveryReason::Failover(e)) => {
+                Err(anyhow::anyhow!(e.clone()).context("The cluster is recovering"))?
+            }
+            BarrierManagerStatus::Running => Ok(()),
+        }
     }
 
     /// Set barrier manager status.
@@ -631,7 +648,8 @@ impl GlobalBarrierManager {
         // consistency.
         // Even if there's no actor to recover, we still go through the recovery process to
         // inject the first `Initial` barrier.
-        self.set_status(BarrierManagerStatus::Recovering).await;
+        self.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap))
+            .await;
         let span = tracing::info_span!("bootstrap_recovery", prev_epoch = prev_epoch.value().0);
 
         let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
@@ -1010,7 +1028,10 @@ impl GlobalBarrierManager {
         }
 
         if self.enable_recovery {
-            self.set_status(BarrierManagerStatus::Recovering).await;
+            self.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Failover(
+                err.clone(),
+            )))
+            .await;
             let latest_snapshot = self.hummock_manager.latest_snapshot();
             let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recovery from the committed epoch
             let span = tracing::info_span!(
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 98360eee83cb2..5e66821e441cc 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -235,24 +235,13 @@ impl DdlController {
         }
     }
 
-    /// `check_barrier_manager_status` checks the status of the barrier manager, return unavailable
-    /// when it's not running.
-    async fn check_barrier_manager_status(&self) -> MetaResult<()> {
-        if !self.barrier_manager.is_running().await {
-            return Err(MetaError::unavailable(
-                "The cluster is starting or recovering",
-            ));
-        }
-        Ok(())
-    }
-
     /// `run_command` spawns a tokio coroutine to execute the target ddl command. When the client
     /// has been interrupted during executing, the request will be cancelled by tonic. Since we have
     /// a lot of logic for revert, status management, notification and so on, ensuring consistency
     /// would be a huge hassle and pain if we don't spawn here.
     pub async fn run_command(&self, command: DdlCommand) -> MetaResult {
         if !command.allow_in_recovery() {
-            self.check_barrier_manager_status().await?;
+            self.barrier_manager.check_status_running().await?;
         }
         let ctrl = self.clone();
         let fut = async move {
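
The pattern above is small but worth spelling out: the `Recovering` status now carries the error that triggered the failover, and the request-path check attaches that error as context instead of returning a generic "starting or recovering" message. Below is a minimal, self-contained sketch of the same idea, assuming plain `anyhow` and a synchronous mutex in place of the internal `MetaError`/`MetaResult` types and the async lock; the type and function names are illustrative, not the actual meta-service API.

```rust
// Minimal sketch (assumed names, not the actual RisingWave API): a status enum
// that carries the failover cause, and a check that surfaces it as error context.
use std::sync::Mutex;

use anyhow::{anyhow, bail, Result};

/// Why the cluster is currently recovering.
#[allow(dead_code)] // not all variants are exercised in this sketch
enum RecoveryReason {
    /// Recovery entered on bootstrap; there is no causal error to report.
    Bootstrap,
    /// Recovery triggered by a failure; the original error is kept.
    Failover(anyhow::Error),
}

#[allow(dead_code)]
enum Status {
    Starting,
    Recovering(RecoveryReason),
    Running,
}

struct BarrierManager {
    // The real implementation guards this with an async lock; a std `Mutex`
    // keeps the sketch dependency-free.
    status: Mutex<Status>,
}

impl BarrierManager {
    /// Reject the caller unless the manager is `Running`, reporting the recovery
    /// reason (if any) instead of a generic "unavailable" message.
    fn check_status_running(&self) -> Result<()> {
        let status = self.status.lock().unwrap();
        match &*status {
            Status::Starting | Status::Recovering(RecoveryReason::Bootstrap) => {
                bail!("The cluster is bootstrapping")
            }
            Status::Recovering(RecoveryReason::Failover(e)) => {
                // Re-wrap the stored error so the caller sees the root cause.
                Err(anyhow!("{:#}", e).context("The cluster is recovering"))
            }
            Status::Running => Ok(()),
        }
    }
}

fn main() {
    let mgr = BarrierManager {
        status: Mutex::new(Status::Recovering(RecoveryReason::Failover(anyhow!(
            "compute node 1 is unreachable"
        )))),
    };
    if let Err(e) = mgr.check_status_running() {
        // Prints "The cluster is recovering: compute node 1 is unreachable".
        println!("{:#}", e);
    }
}
```

Keeping the causal error inside the status is what lets a rejected DDL or reschedule request report the root cause (e.g. "The cluster is recovering: compute node 1 is unreachable") rather than only the fact that the cluster is temporarily unavailable.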