feat(meta): show recovery cause when request is rejected #13836

Merged · 1 commit · Dec 6, 2023
src/meta/service/src/scale_service.rs (3 additions, 11 deletions)

@@ -126,11 +126,7 @@ impl ScaleService for ScaleServiceImpl {
         &self,
         request: Request<RescheduleRequest>,
     ) -> Result<Response<RescheduleResponse>, Status> {
-        if !self.barrier_manager.is_running().await {
-            return Err(Status::unavailable(
-                "Rescheduling is unavailable for now. Likely the cluster is starting or recovering.",
-            ));
-        }
+        self.barrier_manager.check_status_running().await?;

         let RescheduleRequest {
             reschedules,
@@ -190,13 +186,9 @@ impl ScaleService for ScaleServiceImpl {
         &self,
         request: Request<GetReschedulePlanRequest>,
     ) -> Result<Response<GetReschedulePlanResponse>, Status> {
-        let req = request.into_inner();
+        self.barrier_manager.check_status_running().await?;

-        if !self.barrier_manager.is_running().await {
-            return Err(Status::unavailable(
-                "Rescheduling is unavailable for now. Likely the cluster is starting or recovering.",
-            ));
-        }
+        let req = request.into_inner();

         let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
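
The handlers above now propagate whatever error `check_status_running` returns instead of a fixed "starting or recovering" message. For the `?` to compile in a handler returning `Result<Response<_>, Status>`, the meta error type has to convert into a tonic `Status`. The sketch below is only an illustration of that conversion pattern, using a hypothetical stand-in error type; RisingWave's real `MetaError` and its `Status` conversion live elsewhere in the meta crate and are richer than this.

```rust
use std::fmt;

use tonic::Status;

// Hypothetical stand-in for the meta service's error type; the real
// `MetaError` carries structured error kinds and more context.
#[derive(Debug, Clone)]
struct MetaError(String);

impl fmt::Display for MetaError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

// With a conversion like this in scope, `check_status_running().await?`
// inside a tonic handler maps the recovery cause to an UNAVAILABLE response,
// so the client sees why the request was rejected.
impl From<MetaError> for Status {
    fn from(e: MetaError) -> Self {
        Status::unavailable(e.to_string())
    }
}
```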
src/meta/src/barrier/mod.rs (27 additions, 6 deletions)

@@ -106,12 +106,20 @@ pub(crate) type TableDefinitionMap = TableMap<String>;
 pub(crate) type TableNotifierMap = TableMap<Notifier>;
 pub(crate) type TableFragmentMap = TableMap<TableFragments>;

+/// The reason why the cluster is recovering.
+enum RecoveryReason {
+    /// After bootstrap.
+    Bootstrap,
+    /// After failure.
+    Failover(MetaError),
+}
+
 /// Status of barrier manager.
 enum BarrierManagerStatus {
     /// Barrier manager is starting.
     Starting,
     /// Barrier manager is under recovery.
-    Recovering,
+    Recovering(RecoveryReason),
     /// Barrier manager is running.
     Running,
 }
@@ -566,10 +574,19 @@ impl GlobalBarrierManager {
         (join_handle, shutdown_tx)
     }

-    /// Return whether the barrier manager is running.
-    pub async fn is_running(&self) -> bool {
+    /// Check the status of barrier manager, return error if it is not `Running`.
+    pub async fn check_status_running(&self) -> MetaResult<()> {
         let status = self.status.lock().await;
-        matches!(*status, BarrierManagerStatus::Running)
+        match &*status {
+            BarrierManagerStatus::Starting
+            | BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
+                bail!("The cluster is bootstrapping")
+            }
+            BarrierManagerStatus::Recovering(RecoveryReason::Failover(e)) => {
+                Err(anyhow::anyhow!(e.clone()).context("The cluster is recovering"))?
+            }
+            BarrierManagerStatus::Running => Ok(()),
+        }
     }

     /// Set barrier manager status.
@@ -631,7 +648,8 @@ impl GlobalBarrierManager {
         // consistency.
         // Even if there's no actor to recover, we still go through the recovery process to
         // inject the first `Initial` barrier.
-        self.set_status(BarrierManagerStatus::Recovering).await;
+        self.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap))
+            .await;
         let span = tracing::info_span!("bootstrap_recovery", prev_epoch = prev_epoch.value().0);

         let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
@@ -1010,7 +1028,10 @@ impl GlobalBarrierManager {
         }

         if self.enable_recovery {
-            self.set_status(BarrierManagerStatus::Recovering).await;
+            self.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Failover(
+                err.clone(),
+            )))
+            .await;
             let latest_snapshot = self.hummock_manager.latest_snapshot();
             let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recovery from the committed epoch
             let span = tracing::info_span!(
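
This is the heart of the change: `Recovering` now records a `RecoveryReason`, and `check_status_running` turns that state into an error whose message carries the failure that triggered recovery. Below is a minimal, self-contained sketch of the same pattern; the error type is a simplified stand-in, `bail!` comes from `anyhow` here rather than the meta crate's macro, and the function is shown as synchronous without the status mutex for brevity.

```rust
use anyhow::{anyhow, bail, Result};

/// Simplified stand-in for the meta crate's error type.
#[derive(Debug, Clone)]
struct MetaError(String);

impl std::fmt::Display for MetaError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}
impl std::error::Error for MetaError {}

/// Why the cluster is recovering.
enum RecoveryReason {
    /// Recovery that runs once at bootstrap.
    Bootstrap,
    /// Recovery triggered by a failure; keeps the original error around.
    Failover(MetaError),
}

enum BarrierManagerStatus {
    Starting,
    Recovering(RecoveryReason),
    Running,
}

/// Succeed only when the manager is `Running`; otherwise report why it is not,
/// attaching the original failure as context during failover recovery.
fn check_status_running(status: &BarrierManagerStatus) -> Result<()> {
    match status {
        BarrierManagerStatus::Starting
        | BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
            bail!("The cluster is bootstrapping")
        }
        BarrierManagerStatus::Recovering(RecoveryReason::Failover(e)) => {
            Err(anyhow!(e.clone()).context("The cluster is recovering"))
        }
        BarrierManagerStatus::Running => Ok(()),
    }
}

fn main() {
    let status = BarrierManagerStatus::Recovering(RecoveryReason::Failover(MetaError(
        "worker node lost".into(),
    )));
    if let Err(e) = check_status_running(&status) {
        // Alternate formatting prints the whole chain:
        // "The cluster is recovering: worker node lost"
        println!("{:#}", e);
    }
}
```

The services that call this only need the `?` operator; the reason string reaches the client inside the returned error instead of a generic "starting or recovering" hint.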
src/meta/src/rpc/ddl_controller.rs (1 addition, 12 deletions)

@@ -235,24 +235,13 @@ impl DdlController {
         }
     }

-    /// `check_barrier_manager_status` checks the status of the barrier manager, return unavailable
-    /// when it's not running.
-    async fn check_barrier_manager_status(&self) -> MetaResult<()> {
-        if !self.barrier_manager.is_running().await {
-            return Err(MetaError::unavailable(
-                "The cluster is starting or recovering",
-            ));
-        }
-        Ok(())
-    }
-
     /// `run_command` spawns a tokio coroutine to execute the target ddl command. When the client
     /// has been interrupted during executing, the request will be cancelled by tonic. Since we have
     /// a lot of logic for revert, status management, notification and so on, ensuring consistency
     /// would be a huge hassle and pain if we don't spawn here.
     pub async fn run_command(&self, command: DdlCommand) -> MetaResult<NotificationVersion> {
         if !command.allow_in_recovery() {
-            self.check_barrier_manager_status().await?;
+            self.barrier_manager.check_status_running().await?;
         }
         let ctrl = self.clone();
         let fut = async move {
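
With the bespoke `check_barrier_manager_status` helper gone, DDL requests that are not allowed during recovery fail fast through the same `check_status_running` path and therefore report the recovery cause as well. Separately, the doc comment above explains why `run_command` spawns a task: when a client disconnects, tonic drops the request future, and spawning keeps the DDL work from being cancelled midway. The following is a small hedged sketch of that spawn-to-survive-cancellation pattern, not RisingWave's actual code.

```rust
use std::future::Future;

use tokio::task::{JoinError, JoinHandle};

// Run `work` on its own task. If the caller's future is dropped (for example,
// tonic cancelling a disconnected client's request), the spawned task keeps
// running to completion; only the await on the handle is abandoned.
async fn run_detached<T, F>(work: F) -> Result<T, JoinError>
where
    T: Send + 'static,
    F: Future<Output = T> + Send + 'static,
{
    let handle: JoinHandle<T> = tokio::spawn(work);
    handle.await
}
```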