From 7d75d1d1a6d3698759ccdcee1471d82755049bf1 Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 10 Oct 2023 12:34:48 +0100 Subject: [PATCH 01/22] pageserver: add InProgress top level state & make TenantsMap lock synchronous --- pageserver/src/consumption_metrics.rs | 2 +- pageserver/src/consumption_metrics/metrics.rs | 1 - pageserver/src/deletion_queue/check.log | 2 + pageserver/src/disk_usage_eviction_task.rs | 2 +- pageserver/src/http/routes.rs | 73 +- pageserver/src/page_service.rs | 2 +- pageserver/src/tenant.rs | 18 +- pageserver/src/tenant/delete.rs | 38 +- pageserver/src/tenant/mgr.rs | 914 +++++++++++------- .../src/tenant/timeline/eviction_task.rs | 15 +- 10 files changed, 658 insertions(+), 409 deletions(-) create mode 100644 pageserver/src/deletion_queue/check.log diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index 061045eb76b8..9e8377c1f1b0 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -266,7 +266,7 @@ async fn calculate_synthetic_size_worker( continue; } - if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await { + if let Ok(tenant) = mgr::get_tenant(tenant_id, true) { // TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks? // We can put in some prioritization for consumption metrics. // Same for the loop that fetches computed metrics. diff --git a/pageserver/src/consumption_metrics/metrics.rs b/pageserver/src/consumption_metrics/metrics.rs index 652dd98683d6..4986d38c1a1d 100644 --- a/pageserver/src/consumption_metrics/metrics.rs +++ b/pageserver/src/consumption_metrics/metrics.rs @@ -206,7 +206,6 @@ pub(super) async fn collect_all_metrics( None } else { crate::tenant::mgr::get_tenant(id, true) - .await .ok() .map(|tenant| (id, tenant)) } diff --git a/pageserver/src/deletion_queue/check.log b/pageserver/src/deletion_queue/check.log new file mode 100644 index 000000000000..c75c3092b2bd --- /dev/null +++ b/pageserver/src/deletion_queue/check.log @@ -0,0 +1,2 @@ + Checking pageserver v0.1.0 (/home/neon/neon/pageserver) + Finished dev [optimized + debuginfo] target(s) in 7.62s diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index 413c941bc4f6..36476275e9da 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -545,7 +545,7 @@ async fn collect_eviction_candidates( if cancel.is_cancelled() { return Ok(EvictionCandidates::Cancelled); } - let tenant = match tenant::mgr::get_tenant(*tenant_id, true).await { + let tenant = match tenant::mgr::get_tenant(*tenant_id, true) { Ok(tenant) => tenant, Err(e) => { // this can happen if tenant has lifecycle transition after we fetched it diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 2c46d733d678..591e20ae9a99 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -36,7 +36,8 @@ use crate::pgdatadir_mapping::LsnForTimestamp; use crate::task_mgr::TaskKind; use crate::tenant::config::{LocationConf, TenantConfOpt}; use crate::tenant::mgr::{ - GetTenantError, SetNewTenantConfigError, TenantMapInsertError, TenantStateError, + GetTenantError, SetNewTenantConfigError, TenantMapError, TenantMapInsertError, TenantSlotError, + TenantSlotUpsertError, TenantStateError, }; use crate::tenant::size::ModelInputs; use crate::tenant::storage_layer::LayerAccessStatsReset; @@ -147,16 +148,47 @@ impl From for ApiError { impl From for ApiError { fn 
from(tmie: TenantMapInsertError) -> ApiError { match tmie { - TenantMapInsertError::StillInitializing | TenantMapInsertError::ShuttingDown => { - ApiError::ResourceUnavailable(format!("{tmie}").into()) + TenantMapInsertError::SlotError(e) => e.into(), + TenantMapInsertError::SlotUpsertError(e) => e.into(), + TenantMapInsertError::Other(e) => ApiError::InternalServerError(e), + } + } +} + +impl From for ApiError { + fn from(e: TenantSlotError) -> ApiError { + use TenantSlotError::*; + match e { + NotFound(tenant_id) => { + ApiError::NotFound(anyhow::anyhow!("NotFound: tenant {tenant_id}").into()) } - TenantMapInsertError::TenantAlreadyExists(id, state) => { - ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}")) + e @ AlreadyExists(_, _) => ApiError::Conflict(format!("{e}")), + e @ Conflict(_) => ApiError::Conflict(format!("{e}")), + InProgress => { + ApiError::ResourceUnavailable("Tenant is being modified concurrently".into()) } - TenantMapInsertError::TenantExistsSecondary(id) => { - ApiError::Conflict(format!("tenant {id} already exists as secondary")) + MapState(e) => e.into(), + } + } +} + +impl From for ApiError { + fn from(e: TenantSlotUpsertError) -> ApiError { + use TenantSlotUpsertError::*; + match e { + InternalError(e) => ApiError::InternalServerError(anyhow::anyhow!("{e}")), + MapState(e) => e.into(), + } + } +} + +impl From for ApiError { + fn from(e: TenantMapError) -> ApiError { + use TenantMapError::*; + match e { + StillInitializing | ShuttingDown => { + ApiError::ResourceUnavailable(format!("{e}").into()) } - TenantMapInsertError::Other(e) => ApiError::InternalServerError(e), } } } @@ -164,11 +196,12 @@ impl From for ApiError { impl From for ApiError { fn from(tse: TenantStateError) -> ApiError { match tse { - TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()), TenantStateError::IsStopping(_) => { ApiError::ResourceUnavailable("Tenant is stopping".into()) } - _ => ApiError::InternalServerError(anyhow::Error::new(tse)), + TenantStateError::SlotError(e) => e.into(), + TenantStateError::SlotUpsertError(e) => e.into(), + TenantStateError::Other(e) => ApiError::InternalServerError(anyhow!(e)), } } } @@ -243,6 +276,9 @@ impl From for ApiError { Get(g) => ApiError::from(g), e @ AlreadyInProgress => ApiError::Conflict(e.to_string()), Timeline(t) => ApiError::from(t), + NotAttached => ApiError::NotFound(anyhow::anyhow!("Tenant is not attached").into()), + SlotError(e) => e.into(), + SlotUpsertError(e) => e.into(), Other(o) => ApiError::InternalServerError(o), e @ InvalidState(_) => ApiError::PreconditionFailed(e.to_string().into_boxed_str()), } @@ -369,7 +405,7 @@ async fn timeline_create_handler( let state = get_state(&request); async { - let tenant = mgr::get_tenant(tenant_id, true).await?; + let tenant = mgr::get_tenant(tenant_id, true)?; match tenant.create_timeline( new_timeline_id, request_data.ancestor_timeline_id.map(TimelineId::from), @@ -416,7 +452,7 @@ async fn timeline_list_handler( let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let response_data = async { - let tenant = mgr::get_tenant(tenant_id, true).await?; + let tenant = mgr::get_tenant(tenant_id, true)?; let timelines = tenant.list_timelines(); let mut response_data = Vec::with_capacity(timelines.len()); @@ -455,7 +491,7 @@ async fn timeline_detail_handler( let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline_info = async { - let tenant = mgr::get_tenant(tenant_id, true).await?; 
+ let tenant = mgr::get_tenant(tenant_id, true)?; let timeline = tenant .get_timeline(timeline_id, false) @@ -713,7 +749,7 @@ async fn tenant_status( check_permission(&request, Some(tenant_id))?; let tenant_info = async { - let tenant = mgr::get_tenant(tenant_id, false).await?; + let tenant = mgr::get_tenant(tenant_id, false)?; // Calculate total physical size of all timelines let mut current_physical_size = 0; @@ -776,7 +812,7 @@ async fn tenant_size_handler( let headers = request.headers(); let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); - let tenant = mgr::get_tenant(tenant_id, true).await?; + let tenant = mgr::get_tenant(tenant_id, true)?; // this can be long operation let inputs = tenant @@ -1035,7 +1071,7 @@ async fn get_tenant_config_handler( let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; check_permission(&request, Some(tenant_id))?; - let tenant = mgr::get_tenant(tenant_id, false).await?; + let tenant = mgr::get_tenant(tenant_id, false)?; let response = HashMap::from([ ( @@ -1094,7 +1130,7 @@ async fn put_tenant_location_config_handler( .await { match e { - TenantStateError::NotFound(_) => { + TenantStateError::SlotError(TenantSlotError::NotFound(_)) => { // This API is idempotent: a NotFound on a detach is fine. } _ => return Err(e.into()), @@ -1132,7 +1168,6 @@ async fn handle_tenant_break( let tenant_id: TenantId = parse_request_param(&r, "tenant_id")?; let tenant = crate::tenant::mgr::get_tenant(tenant_id, true) - .await .map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?; tenant.set_broken("broken from test".to_owned()).await; @@ -1437,7 +1472,7 @@ async fn active_timeline_of_active_tenant( tenant_id: TenantId, timeline_id: TimelineId, ) -> Result, ApiError> { - let tenant = mgr::get_tenant(tenant_id, true).await?; + let tenant = mgr::get_tenant(tenant_id, true)?; tenant .get_timeline(timeline_id, true) .map_err(|e| ApiError::NotFound(e.into())) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 536334d051ce..4d265a8c6f29 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1314,7 +1314,7 @@ async fn get_active_tenant_with_timeout( tenant_id: TenantId, _ctx: &RequestContext, /* require get a context to support cancellation in the future */ ) -> Result, GetActiveTenantError> { - let tenant = match mgr::get_tenant(tenant_id, false).await { + let tenant = match mgr::get_tenant(tenant_id, false) { Ok(tenant) => tenant, Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)), Err(GetTenantError::NotActive(_)) => { diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 3a426ac87beb..62da6b618ef1 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -254,6 +254,12 @@ pub struct Tenant { pub(crate) delete_progress: Arc>, } +impl std::fmt::Debug for Tenant { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} ({})", self.tenant_id, self.current_state()) + } +} + pub(crate) enum WalRedoManager { Prod(PostgresRedoManager), #[cfg(test)] @@ -526,7 +532,7 @@ impl Tenant { resources: TenantSharedResources, attached_conf: AttachedTenantConf, init_order: Option, - tenants: &'static tokio::sync::RwLock, + tenants: &'static std::sync::RwLock, mode: SpawnMode, ctx: &RequestContext, ) -> anyhow::Result> { @@ -1833,6 +1839,7 @@ impl Tenant { } Err(SetStoppingError::AlreadyStopping(other)) => { // give caller the option to wait for this this shutdown + 
info!("Tenant::shutdown: AlreadyStopping"); return Err(other); } }; @@ -2110,6 +2117,9 @@ where } impl Tenant { + pub fn get_tenant_id(&self) -> TenantId { + self.tenant_id + } pub fn tenant_specific_overrides(&self) -> TenantConfOpt { self.tenant_conf.read().unwrap().tenant_conf } @@ -4236,11 +4246,7 @@ mod tests { metadata_bytes[8] ^= 1; std::fs::write(metadata_path, metadata_bytes)?; - let err = harness - .try_load_local(&ctx) - .await - .err() - .expect("should fail"); + let err = harness.try_load_local(&ctx).await.expect_err("should fail"); // get all the stack with all .context, not only the last one let message = format!("{err:#}"); let expected = "failed to load metadata"; diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index 87b48e4beebf..7344dd1d92dd 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -21,7 +21,7 @@ use crate::{ }; use super::{ - mgr::{GetTenantError, TenantsMap}, + mgr::{GetTenantError, TenantSlotError, TenantSlotUpsertError, TenantsMap}, remote_timeline_client::{FAILED_REMOTE_OP_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD}, span, timeline::delete::DeleteTimelineFlow, @@ -33,12 +33,21 @@ pub(crate) enum DeleteTenantError { #[error("GetTenant {0}")] Get(#[from] GetTenantError), + #[error("Tenant not attached")] + NotAttached, + #[error("Invalid state {0}. Expected Active or Broken")] InvalidState(TenantState), #[error("Tenant deletion is already in progress")] AlreadyInProgress, + #[error("Tenant map slot error {0}")] + SlotError(#[from] TenantSlotError), + + #[error("Tenant map slot upsert error {0}")] + SlotUpsertError(#[from] TenantSlotUpsertError), + #[error("Timeline {0}")] Timeline(#[from] DeleteTimelineError), @@ -273,12 +282,12 @@ impl DeleteTenantFlow { pub(crate) async fn run( conf: &'static PageServerConf, remote_storage: Option, - tenants: &'static tokio::sync::RwLock, - tenant_id: TenantId, + tenants: &'static std::sync::RwLock, + tenant: Arc, ) -> Result<(), DeleteTenantError> { span::debug_assert_current_span_has_tenant_id(); - let (tenant, mut guard) = Self::prepare(tenants, tenant_id).await?; + let mut guard = Self::prepare(&tenant).await?; if let Err(e) = Self::run_inner(&mut guard, conf, remote_storage.as_ref(), &tenant).await { tenant.set_broken(format!("{e:#}")).await; @@ -378,7 +387,7 @@ impl DeleteTenantFlow { guard: DeletionGuard, tenant: &Arc, preload: Option, - tenants: &'static tokio::sync::RwLock, + tenants: &'static std::sync::RwLock, init_order: Option, ctx: &RequestContext, ) -> Result<(), DeleteTenantError> { @@ -405,15 +414,8 @@ impl DeleteTenantFlow { } async fn prepare( - tenants: &tokio::sync::RwLock, - tenant_id: TenantId, - ) -> Result<(Arc, tokio::sync::OwnedMutexGuard), DeleteTenantError> { - let m = tenants.read().await; - - let tenant = m - .get(&tenant_id) - .ok_or(GetTenantError::NotFound(tenant_id))?; - + tenant: &Arc, + ) -> Result, DeleteTenantError> { // FIXME: unsure about active only. Our init jobs may not be cancellable properly, // so at least for now allow deletions only for active tenants. TODO recheck // Broken and Stopping is needed for retries. 
@@ -447,14 +449,14 @@ impl DeleteTenantFlow { ))); } - Ok((Arc::clone(tenant), guard)) + Ok(guard) } fn schedule_background( guard: OwnedMutexGuard, conf: &'static PageServerConf, remote_storage: Option, - tenants: &'static tokio::sync::RwLock, + tenants: &'static std::sync::RwLock, tenant: Arc, ) { let tenant_id = tenant.tenant_id; @@ -487,7 +489,7 @@ impl DeleteTenantFlow { mut guard: OwnedMutexGuard, conf: &PageServerConf, remote_storage: Option, - tenants: &'static tokio::sync::RwLock, + tenants: &'static std::sync::RwLock, tenant: &Arc, ) -> Result<(), DeleteTenantError> { // Tree sort timelines, schedule delete for them. Mention retries from the console side. @@ -535,7 +537,7 @@ impl DeleteTenantFlow { .await .context("cleanup_remaining_fs_traces")?; - let mut locked = tenants.write().await; + let mut locked = tenants.write().unwrap(); if locked.remove(&tenant.tenant_id).is_none() { warn!("Tenant got removed from tenants map during deletion"); }; diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 33fdc76f8d65..d6e851344449 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -3,13 +3,14 @@ use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf}; use rand::{distributions::Alphanumeric, Rng}; -use std::collections::{hash_map, HashMap}; +use std::borrow::Cow; +use std::collections::HashMap; +use std::ops::Deref; use std::sync::Arc; use tokio::fs; use anyhow::Context; use once_cell::sync::Lazy; -use tokio::sync::RwLock; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::*; @@ -47,26 +48,31 @@ use super::TenantSharedResources; /// that way we avoid having to carefully switch a tenant's ingestion etc on and off during /// its lifetime, and we can preserve some important safety invariants like `Tenant` always /// having a properly acquired generation (Secondary doesn't need a generation) -#[derive(Clone)] pub(crate) enum TenantSlot { Attached(Arc), Secondary, + /// In this state, other administrative operations acting on the TenantId should + /// block, or return a retry indicator equivalent to HTTP 503. + InProgress(utils::completion::Barrier), } -impl TenantSlot { - /// Return the `Tenant` in this slot if attached, else None - fn get_attached(&self) -> Option<&Arc> { +impl std::fmt::Debug for TenantSlot { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Attached(t) => Some(t), - Self::Secondary => None, + Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()), + Self::Secondary => write!(f, "Secondary"), + Self::InProgress(_) => write!(f, "InProgress"), } } +} - /// Consume self and return the `Tenant` that was in this slot if attached, else None - fn into_attached(self) -> Option> { +impl TenantSlot { + /// Return the `Tenant` in this slot if attached, else None + fn get_attached(&self) -> Option<&Arc> { match self { Self::Attached(t) => Some(t), Self::Secondary => None, + Self::InProgress(_) => None, } } } @@ -77,7 +83,7 @@ pub(crate) enum TenantsMap { /// [`init_tenant_mgr`] is not done yet. Initializing, /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded. - /// New tenants can be added using [`tenant_map_insert`]. + /// New tenants can be added using [`tenant_map_acquire_slot`]. Open(HashMap), /// The pageserver has entered shutdown mode via [`shutdown_all_tenants`]. /// Existing tenants are still accessible, but no new tenants can be created. 
@@ -97,19 +103,10 @@ impl TenantsMap { } } - /// Get the contents of the map at this tenant ID, even if it is in secondary state. - pub(crate) fn get_slot(&self, tenant_id: &TenantId) -> Option<&TenantSlot> { + pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option { match self { TenantsMap::Initializing => None, - TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id), - } - } - pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option> { - match self { - TenantsMap::Initializing => None, - TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => { - m.remove(tenant_id).and_then(TenantSlot::into_attached) - } + TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id), } } } @@ -147,7 +144,8 @@ async fn safe_rename_tenant_dir(path: impl AsRef) -> std::io::Result> = Lazy::new(|| RwLock::new(TenantsMap::Initializing)); +static TENANTS: Lazy> = + Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing)); /// Create a directory, including parents. This does no fsyncs and makes /// no guarantees about the persistence of the resulting metadata: for @@ -456,7 +454,7 @@ pub async fn init_tenant_mgr( info!("Processed {} local tenants at startup", tenants.len()); - let mut tenants_map = TENANTS.write().await; + let mut tenants_map = TENANTS.write().unwrap(); assert!(matches!(&*tenants_map, &TenantsMap::Initializing)); *tenants_map = TenantsMap::Open(tenants); Ok(()) @@ -472,7 +470,7 @@ pub(crate) fn tenant_spawn( resources: TenantSharedResources, location_conf: AttachedTenantConf, init_order: Option, - tenants: &'static tokio::sync::RwLock, + tenants: &'static std::sync::RwLock, mode: SpawnMode, ctx: &RequestContext, ) -> anyhow::Result> { @@ -533,12 +531,13 @@ pub(crate) async fn shutdown_all_tenants() { shutdown_all_tenants0(&TENANTS).await } -async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock) { +async fn shutdown_all_tenants0(tenants: &std::sync::RwLock) { use utils::completion; - // Prevent new tenants from being created. - let tenants_to_shut_down = { - let mut m = tenants.write().await; + // Under write lock (prevent any new tenants being created), extract the list + // of tenants to shut down. + let (in_progress_ops, tenants_to_shut_down) = { + let mut m = tenants.write().unwrap(); match &mut *m { TenantsMap::Initializing => { *m = TenantsMap::ShuttingDown(HashMap::default()); @@ -546,9 +545,28 @@ async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock) { return; } TenantsMap::Open(tenants) => { - let tenants_clone = tenants.clone(); - *m = TenantsMap::ShuttingDown(std::mem::take(tenants)); - tenants_clone + let mut shutdown_state = HashMap::new(); + let mut in_progress_ops = Vec::new(); + let mut tenants_to_shut_down = Vec::new(); + + for (k, v) in tenants.drain() { + match v { + TenantSlot::Attached(t) => { + tenants_to_shut_down.push(t.clone()); + shutdown_state.insert(k, TenantSlot::Attached(t)); + } + TenantSlot::Secondary => { + shutdown_state.insert(k, TenantSlot::Secondary); + } + TenantSlot::InProgress(notify) => { + // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will + // wait for their notifications to fire in this function. + in_progress_ops.push(notify); + } + } + } + *m = TenantsMap::ShuttingDown(shutdown_state); + (in_progress_ops, tenants_to_shut_down) } TenantsMap::ShuttingDown(_) => { // TODO: it is possible that detach and shutdown happen at the same time. 
as a @@ -559,25 +577,29 @@ async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock) { } }; + info!( + "Waiting for {} InProgress tenants to complete...", + in_progress_ops.len() + ); + for barrier in in_progress_ops { + barrier.wait().await; + } + + info!( + "Waiting for {} attached tenants to shut down...", + tenants_to_shut_down.len() + ); let started_at = std::time::Instant::now(); let mut join_set = JoinSet::new(); - for (tenant_id, tenant) in tenants_to_shut_down { + for tenant in tenants_to_shut_down { + let tenant_id = tenant.get_tenant_id(); join_set.spawn( async move { let freeze_and_flush = true; let res = { let (_guard, shutdown_progress) = completion::channel(); - match tenant { - TenantSlot::Attached(t) => { - t.shutdown(shutdown_progress, freeze_and_flush).await - } - TenantSlot::Secondary => { - // TODO: once secondary mode downloads are implemented, - // ensure they have all stopped before we reach this point. - Ok(()) - } - } + tenant.shutdown(shutdown_progress, freeze_and_flush).await }; if let Err(other_progress) = res { @@ -649,42 +671,35 @@ pub(crate) async fn create_tenant( resources: TenantSharedResources, ctx: &RequestContext, ) -> Result, TenantMapInsertError> { - tenant_map_insert(tenant_id, || async { - let location_conf = LocationConf::attached_single(tenant_conf, generation); + let location_conf = LocationConf::attached_single(tenant_conf, generation); - // We're holding the tenants lock in write mode while doing local IO. - // If this section ever becomes contentious, introduce a new `TenantState::Creating` - // and do the work in that state. - super::create_tenant_files(conf, &location_conf, &tenant_id).await?; + let tenant_guard = tenant_map_acquire_slot(&tenant_id, Some(false))?; + let tenant_path = super::create_tenant_files(conf, &location_conf, &tenant_id).await?; - // TODO: tenant directory remains on disk if we bail out from here on. - // See https://github.com/neondatabase/neon/issues/4233 + let created_tenant = tenant_spawn( + conf, + tenant_id, + &tenant_path, + resources, + AttachedTenantConf::try_from(location_conf)?, + None, + &TENANTS, + SpawnMode::Create, + ctx, + )?; + // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. + // See https://github.com/neondatabase/neon/issues/4233 + + let created_tenant_id = created_tenant.tenant_id(); + if tenant_id != created_tenant_id { + return Err(TenantMapInsertError::Other(anyhow::anyhow!( + "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {created_tenant_id})", + ))); + } - let tenant_path = conf.tenant_path(&tenant_id); + tenant_guard.upsert(TenantSlot::Attached(created_tenant.clone()))?; - let created_tenant = tenant_spawn( - conf, - tenant_id, - &tenant_path, - resources, - AttachedTenantConf::try_from(location_conf)?, - None, - &TENANTS, - SpawnMode::Create, - ctx, - )?; - // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. 
- // See https://github.com/neondatabase/neon/issues/4233 - - let crated_tenant_id = created_tenant.tenant_id(); - anyhow::ensure!( - tenant_id == crated_tenant_id, - "loaded created tenant has unexpected tenant id \ - (expect {tenant_id} != actual {crated_tenant_id})", - ); - Ok(created_tenant) - }) - .await + Ok(created_tenant) } #[derive(Debug, thiserror::Error)] @@ -701,7 +716,7 @@ pub(crate) async fn set_new_tenant_config( tenant_id: TenantId, ) -> Result<(), SetNewTenantConfigError> { info!("configuring tenant {tenant_id}"); - let tenant = get_tenant(tenant_id, true).await?; + let tenant = get_tenant(tenant_id, true)?; // This is a legacy API that only operates on attached tenants: the preferred // API to use is the location_config/ endpoint, which lets the caller provide @@ -727,32 +742,49 @@ pub(crate) async fn upsert_location( ) -> Result<(), anyhow::Error> { info!("configuring tenant location {tenant_id} to state {new_location_config:?}"); - let mut existing_tenant = match get_tenant(tenant_id, false).await { - Ok(t) => Some(t), - Err(GetTenantError::NotFound(_)) => None, - Err(e) => anyhow::bail!(e), - }; - - // If we need to shut down a Tenant, do that first - let shutdown_tenant = match (&new_location_config.mode, &existing_tenant) { - (LocationMode::Secondary(_), Some(t)) => Some(t), - (LocationMode::Attached(attach_conf), Some(t)) => { - if attach_conf.generation != t.generation { - Some(t) - } else { - None + // Special case fast-path for updates to Tenant: if our upsert is only updating configuration, + // then we do not need to set the slot to InProgress, we can just call into the + // existng tenant. + { + let locked = TENANTS.read().unwrap(); + let peek_slot = tenant_map_peek_slot(&locked, &tenant_id)?; + match (&new_location_config.mode, peek_slot) { + (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => { + if attach_conf.generation == tenant.generation { + // A transition from Attached to Attached in the same generation, we may + // take our fast path and just provide the updated configuration + // to the tenant. + tenant.set_new_location_config(AttachedTenantConf::try_from( + new_location_config, + )?); + + // Persist the new config in the background, to avoid holding up any + // locks while we do so. + // TODO + + return Ok(()); + } else { + // Different generations, fall through to general case + } + } + _ => { + // Not an Attached->Attached transition, fall through to general case } } - _ => None, - }; - - // TODO: currently we risk concurrent operations interfering with the tenant - // while we await shutdown, but we also should not hold the TenantsMap lock - // across the whole operation. Before we start using this function in production, - // a follow-on change will revise how concurrency is handled in TenantsMap. - // (https://github.com/neondatabase/neon/issues/5378) + } - if let Some(tenant) = shutdown_tenant { + // General case for upserts to TenantsMap, excluding the case above: we will substitute an + // InProgress value to the slot while we make whatever changes are required. The state for + // the tenant is inaccessible to the outside world while we are doing this, but that is sensible: + // the state is ill-defined while we're in transition. Transitions are async, but fast: we do + // not do significant I/O, and shutdowns should be prompt via cancellation tokens. 
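Making the map lock a std::sync::RwLock only works if no guard is ever held across an .await, which is why the code in this hunk peeks under a short-lived read guard and then swaps in InProgress before doing anything slow. A hedged sketch of that discipline, with TENANT_MAP, Tenant and use_tenant as invented stand-ins for the real TENANTS static and its contents:

    use std::collections::HashMap;
    use std::sync::{Arc, RwLock};

    // Invented stand-ins for the real TENANTS static and pageserver::tenant::Tenant.
    struct Tenant;

    impl Tenant {
        async fn do_async_work(&self) {
            // e.g. shutdown with freeze_and_flush, remote storage I/O, ...
        }
    }

    static TENANT_MAP: RwLock<Option<HashMap<String, Arc<Tenant>>>> = RwLock::new(None);

    async fn use_tenant(id: &str) -> Result<(), &'static str> {
        // Scope the synchronous read guard so it is dropped before any await point.
        let tenant = {
            let guard = TENANT_MAP.read().unwrap();
            let map = guard.as_ref().ok_or("map not initialized")?;
            map.get(id).cloned().ok_or("not found")?
        };
        // Safe: no lock is held while this future is pending.
        tenant.do_async_work().await;
        Ok(())
    }

    #[tokio::main]
    async fn main() {
        *TENANT_MAP.write().unwrap() =
            Some(HashMap::from([("t1".to_string(), Arc::new(Tenant))]));
        use_tenant("t1").await.unwrap();
    }
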
+ let mut tenant_guard = tenant_map_acquire_slot(&tenant_id, None)?; + + if let Some(TenantSlot::Attached(tenant)) = tenant_guard.take_value() { + // The case where we keep a Tenant alive was covered above in the special case + // for Attached->Attached transitions in the same generation. By this point, + // if we see an attached tenant we know it will be discarded and should be + // shut down. let (_guard, progress) = utils::completion::channel(); match tenant.get_attach_mode() { @@ -774,82 +806,61 @@ pub(crate) async fn upsert_location( barrier.wait().await; } } - existing_tenant = None; } - if let Some(tenant) = existing_tenant { - // Update the existing tenant - Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config) - .await - .map_err(SetNewTenantConfigError::Persist)?; - tenant.set_new_location_config(AttachedTenantConf::try_from(new_location_config)?); - } else { - // Upsert a fresh TenantSlot into TenantsMap. Do it within the map write lock, - // and re-check that the state of anything we are replacing is as expected. - tenant_map_upsert_slot(tenant_id, |old_value| async move { - if let Some(TenantSlot::Attached(t)) = old_value { - if !matches!(t.current_state(), TenantState::Stopping { .. }) { - anyhow::bail!("Tenant state changed during location configuration update"); - } - } + let tenant_path = conf.tenant_path(&tenant_id); + let new_slot = match &new_location_config.mode { + LocationMode::Secondary(_) => { let tenant_path = conf.tenant_path(&tenant_id); + // Directory doesn't need to be fsync'd because if we crash it can + // safely be recreated next time this tenant location is configured. + unsafe_create_dir_all(&tenant_path) + .await + .with_context(|| format!("Creating {tenant_path}"))?; - let new_slot = match &new_location_config.mode { - LocationMode::Secondary(_) => { - // Directory doesn't need to be fsync'd because if we crash it can - // safely be recreated next time this tenant location is configured. - unsafe_create_dir_all(&tenant_path) - .await - .with_context(|| format!("Creating {tenant_path}"))?; + Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config) + .await + .map_err(SetNewTenantConfigError::Persist)?; - Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config) - .await - .map_err(SetNewTenantConfigError::Persist)?; + TenantSlot::Secondary + } + LocationMode::Attached(_attach_config) => { + let timelines_path = conf.timelines_path(&tenant_id); - TenantSlot::Secondary - } - LocationMode::Attached(_attach_config) => { - // FIXME: should avoid doing this disk I/O inside the TenantsMap lock, - // we have the same problem in load_tenant/attach_tenant. Probably - // need a lock in TenantSlot to fix this. 
- let timelines_path = conf.timelines_path(&tenant_id); - - // Directory doesn't need to be fsync'd because we do not depend on - // it to exist after crashes: it may be recreated when tenant is - // re-attached, see https://github.com/neondatabase/neon/issues/5550 - unsafe_create_dir_all(&timelines_path) - .await - .with_context(|| format!("Creating {timelines_path}"))?; - - Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config) - .await - .map_err(SetNewTenantConfigError::Persist)?; - - let tenant = tenant_spawn( - conf, - tenant_id, - &tenant_path, - TenantSharedResources { - broker_client, - remote_storage, - deletion_queue_client, - }, - AttachedTenantConf::try_from(new_location_config)?, - None, - &TENANTS, - SpawnMode::Normal, - ctx, - )?; - - TenantSlot::Attached(tenant) - } - }; + // Directory doesn't need to be fsync'd because we do not depend on + // it to exist after crashes: it may be recreated when tenant is + // re-attached, see https://github.com/neondatabase/neon/issues/5550 + unsafe_create_dir_all(&timelines_path) + .await + .with_context(|| format!("Creating {timelines_path}"))?; + + Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config) + .await + .map_err(SetNewTenantConfigError::Persist)?; + + let tenant = tenant_spawn( + conf, + tenant_id, + &tenant_path, + TenantSharedResources { + broker_client, + remote_storage, + deletion_queue_client, + }, + AttachedTenantConf::try_from(new_location_config)?, + None, + &TENANTS, + SpawnMode::Normal, + ctx, + )?; + + TenantSlot::Attached(tenant) + } + }; + + tenant_guard.upsert(new_slot)?; - Ok(new_slot) - }) - .await?; - } Ok(()) } @@ -870,11 +881,11 @@ pub(crate) enum GetTenantError { /// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants. /// /// This method is cancel-safe. -pub(crate) async fn get_tenant( +pub(crate) fn get_tenant( tenant_id: TenantId, active_only: bool, ) -> Result, GetTenantError> { - let m = TENANTS.read().await; + let m = TENANTS.read().unwrap(); let tenant = m .get(&tenant_id) .ok_or(GetTenantError::NotFound(tenant_id))?; @@ -900,7 +911,35 @@ pub(crate) async fn delete_tenant( remote_storage: Option, tenant_id: TenantId, ) -> Result<(), DeleteTenantError> { - DeleteTenantFlow::run(conf, remote_storage, &TENANTS, tenant_id).await + // We acquire a SlotGuard during this function to protect against concurrent + // changes while the ::prepare phase of DeleteTenantFlow executes, but then + // have to return the Tenant to the map while the background deletion runs. + // + // TODO: refactor deletion to happen outside the lifetime of a Tenant. + // Currently, deletion requires a reference to the tenants map in order to + // keep the Tenant in the map until deletion is complete, and then remove + // it at the end. 
+ // + // See https://github.com/neondatabase/neon/issues/5080 + + let mut slot_guard = tenant_map_acquire_slot(&tenant_id, Some(true))?; + + // unwrap is safe because we used expect_exist=true when acquiring the slot + let slot = slot_guard.take_value().unwrap(); + + let tenant = match &slot { + TenantSlot::Attached(tenant) => tenant.clone(), + _ => { + // Express "not attached" as equivalent to "not found" + return Err(DeleteTenantError::NotAttached); + } + }; + + let result = DeleteTenantFlow::run(conf, remote_storage, &TENANTS, tenant).await; + + // Replace our InProgress marker with the Tenant in attached state, after the prepare phase of deletion is done + slot_guard.upsert(slot)?; + result } #[derive(Debug, thiserror::Error)] @@ -917,18 +956,20 @@ pub(crate) async fn delete_timeline( timeline_id: TimelineId, _ctx: &RequestContext, ) -> Result<(), DeleteTimelineError> { - let tenant = get_tenant(tenant_id, true).await?; + let tenant = get_tenant(tenant_id, true)?; DeleteTimelineFlow::run(&tenant, timeline_id, false).await?; Ok(()) } #[derive(Debug, thiserror::Error)] pub(crate) enum TenantStateError { - #[error("Tenant {0} not found")] - NotFound(TenantId), #[error("Tenant {0} is stopping")] IsStopping(TenantId), #[error(transparent)] + SlotError(#[from] TenantSlotError), + #[error(transparent)] + SlotUpsertError(#[from] TenantSlotUpsertError), + #[error(transparent)] Other(#[from] anyhow::Error), } @@ -967,7 +1008,7 @@ pub(crate) async fn detach_tenant( async fn detach_tenant0( conf: &'static PageServerConf, - tenants: &tokio::sync::RwLock, + tenants: &std::sync::RwLock, tenant_id: TenantId, detach_ignored: bool, deletion_queue_client: &DeletionQueueClient, @@ -988,7 +1029,12 @@ async fn detach_tenant0( // Ignored tenants are not present in memory and will bail the removal from memory operation. // Before returning the error, check for ignored tenant removal case — we only need to clean its local files then. 
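The matches! just below this comment works because the new error types nest: TenantStateError wraps TenantSlotError via thiserror's #[from], so callers can still reach the inner NotFound variant and keep detach idempotent. A small illustration of that layering, using invented names (SlotError, StateError, detach) rather than the pageserver's own enums and assuming the thiserror crate:

    use thiserror::Error;

    // Invented enums mirroring the shape of TenantSlotError / TenantStateError.
    #[derive(Debug, Error)]
    enum SlotError {
        #[error("tenant {0} not found")]
        NotFound(String),
        #[error("tenant has a state change in progress, try again later")]
        InProgress,
    }

    #[derive(Debug, Error)]
    enum StateError {
        #[error(transparent)]
        Slot(#[from] SlotError),
        #[error("internal error: {0}")]
        Other(String),
    }

    fn detach(tenant_id: &str, exists: bool) -> Result<(), StateError> {
        if !exists {
            // #[from] provides the From impl, so the inner error wraps automatically.
            return Err(SlotError::NotFound(tenant_id.to_string()).into());
        }
        Ok(())
    }

    fn main() {
        match detach("t1", false) {
            // Treat "not found" as success so that detach stays idempotent.
            Ok(()) | Err(StateError::Slot(SlotError::NotFound(_))) => {
                println!("detached (or already gone)")
            }
            Err(other) => eprintln!("detach failed: {other}"),
        }
    }
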
- if detach_ignored && matches!(removal_result, Err(TenantStateError::NotFound(_))) { + if detach_ignored + && matches!( + removal_result, + Err(TenantStateError::SlotError(TenantSlotError::NotFound(_))) + ) + { let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id); if tenant_ignore_mark.exists() { info!("Detaching an ignored tenant"); @@ -1011,31 +1057,44 @@ pub(crate) async fn load_tenant( deletion_queue_client: DeletionQueueClient, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { - tenant_map_insert(tenant_id, || async { - let tenant_path = conf.tenant_path(&tenant_id); - let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id); - if tenant_ignore_mark.exists() { - std::fs::remove_file(&tenant_ignore_mark) - .with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?; - } + let tenant_guard = tenant_map_acquire_slot(&tenant_id, Some(false))?; + let tenant_path = conf.tenant_path(&tenant_id); - let resources = TenantSharedResources { - broker_client, - remote_storage, - deletion_queue_client - }; + let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id); + if tenant_ignore_mark.exists() { + std::fs::remove_file(&tenant_ignore_mark).with_context(|| { + format!( + "Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading" + ) + })?; + } - let mut location_conf = Tenant::load_tenant_config(conf, &tenant_id).map_err( TenantMapInsertError::Other)?; - location_conf.attach_in_generation(generation); - Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?; + let resources = TenantSharedResources { + broker_client, + remote_storage, + deletion_queue_client, + }; - let new_tenant = tenant_spawn(conf, tenant_id, &tenant_path, resources, AttachedTenantConf::try_from(location_conf)?, None, &TENANTS, SpawnMode::Normal, ctx) - .with_context(|| { - format!("Failed to schedule tenant processing in path {tenant_path:?}") - })?; + let mut location_conf = + Tenant::load_tenant_config(conf, &tenant_id).map_err(TenantMapInsertError::Other)?; + location_conf.attach_in_generation(generation); - Ok(new_tenant) - }).await?; + Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?; + + let new_tenant = tenant_spawn( + conf, + tenant_id, + &tenant_path, + resources, + AttachedTenantConf::try_from(location_conf)?, + None, + &TENANTS, + SpawnMode::Normal, + ctx, + ) + .with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))?; + + tenant_guard.upsert(TenantSlot::Attached(new_tenant))?; Ok(()) } @@ -1048,7 +1107,7 @@ pub(crate) async fn ignore_tenant( async fn ignore_tenant0( conf: &'static PageServerConf, - tenants: &tokio::sync::RwLock, + tenants: &std::sync::RwLock, tenant_id: TenantId, ) -> Result<(), TenantStateError> { remove_tenant_from_memory(tenants, tenant_id, async { @@ -1076,7 +1135,7 @@ pub(crate) enum TenantMapListError { /// Get list of tenants, for the mgmt API /// pub(crate) async fn list_tenants() -> Result, TenantMapListError> { - let tenants = TENANTS.read().await; + let tenants = TENANTS.read().unwrap(); let m = match &*tenants { TenantsMap::Initializing => return Err(TenantMapListError::Initializing), TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m, @@ -1085,6 +1144,7 @@ pub(crate) async fn list_tenants() -> Result, Tenan .filter_map(|(id, tenant)| match tenant { TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())), TenantSlot::Secondary => None, + TenantSlot::InProgress(_) => 
None, }) .collect()) } @@ -1101,101 +1161,302 @@ pub(crate) async fn attach_tenant( resources: TenantSharedResources, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { - tenant_map_insert(tenant_id, || async { - let location_conf = LocationConf::attached_single(tenant_conf, generation); - let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id).await?; - // TODO: tenant directory remains on disk if we bail out from here on. - // See https://github.com/neondatabase/neon/issues/4233 - - let attached_tenant = tenant_spawn(conf, tenant_id, &tenant_dir, - resources, AttachedTenantConf::try_from(location_conf)?, None, &TENANTS, SpawnMode::Normal, ctx)?; - // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. - // See https://github.com/neondatabase/neon/issues/4233 - - let attached_tenant_id = attached_tenant.tenant_id(); - anyhow::ensure!( - tenant_id == attached_tenant_id, + let tenant_guard = tenant_map_acquire_slot(&tenant_id, Some(false))?; + let location_conf = LocationConf::attached_single(tenant_conf, generation); + let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id).await?; + // TODO: tenant directory remains on disk if we bail out from here on. + // See https://github.com/neondatabase/neon/issues/4233 + + let attached_tenant = tenant_spawn( + conf, + tenant_id, + &tenant_dir, + resources, + AttachedTenantConf::try_from(location_conf)?, + None, + &TENANTS, + SpawnMode::Normal, + ctx, + )?; + // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. + // See https://github.com/neondatabase/neon/issues/4233 + + let attached_tenant_id = attached_tenant.tenant_id(); + if tenant_id != attached_tenant_id { + return Err(TenantMapInsertError::Other(anyhow::anyhow!( "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {attached_tenant_id})", - ); - Ok(attached_tenant) - }) - .await?; + ))); + } + + tenant_guard.upsert(TenantSlot::Attached(attached_tenant))?; Ok(()) } #[derive(Debug, thiserror::Error)] pub(crate) enum TenantMapInsertError { + #[error(transparent)] + SlotError(#[from] TenantSlotError), + #[error(transparent)] + SlotUpsertError(#[from] TenantSlotUpsertError), + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +/// Superset of TenantMapError: issues that can occur when acquiring a slot +/// for a particular tenant ID. +#[derive(Debug, thiserror::Error)] +pub enum TenantSlotError { + /// When acquiring a slot with the expectation that the tenant already exists. + #[error("Tenant {0} not found")] + NotFound(TenantId), + + /// When acquiring a slot with the expectation that the tenant does not already exist. + #[error("tenant {0} already exists, state: {1:?}")] + AlreadyExists(TenantId, TenantState), + + #[error("tenant {0} already exists in but is not attached")] + Conflict(TenantId), + + // Tried to read a slot that is currently being mutated by another administrative + // operation. + #[error("tenant has a state change in progress, try again later")] + InProgress, + + #[error(transparent)] + MapState(#[from] TenantMapError), +} + +/// Superset of TenantMapError: issues that can occur when using a SlotGuard +/// to insert a new value. 
+#[derive(Debug, thiserror::Error)] +pub enum TenantSlotUpsertError { + /// An error where the slot is in an unexpected state, indicating a code bug + #[error("Internal error updating Tenant")] + InternalError(Cow<'static, str>), + + #[error(transparent)] + MapState(#[from] TenantMapError), +} + +/// Errors that can happen any time we are walking the tenant map to try and acquire +/// the TenantSlot for a particular tenant. +#[derive(Debug, thiserror::Error)] +pub enum TenantMapError { + // Tried to read while initializing #[error("tenant map is still initializing")] StillInitializing, + + // Tried to read while shutting down #[error("tenant map is shutting down")] ShuttingDown, - #[error("tenant {0} already exists, state: {1:?}")] - TenantAlreadyExists(TenantId, TenantState), - #[error("tenant {0} already exists in secondary state")] - TenantExistsSecondary(TenantId), - #[error(transparent)] - Other(#[from] anyhow::Error), } -/// Give the given closure access to the tenants map entry for the given `tenant_id`, iff that -/// entry is vacant. The closure is responsible for creating the tenant object and inserting -/// it into the tenants map through the vacnt entry that it receives as argument. -/// -/// NB: the closure should return quickly because the current implementation of tenants map -/// serializes access through an `RwLock`. -async fn tenant_map_insert( +/// Guards a particular tenant_id's content in the TenantsMap. While this +/// structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`] +/// for this tenant, which acts as a marker for any operations targeting +/// this tenant to retry later, or wait for the InProgress state to end. +pub struct SlotGuard { tenant_id: TenantId, - insert_fn: F, -) -> Result, TenantMapInsertError> -where - F: FnOnce() -> R, - R: std::future::Future>>, -{ - let mut guard = TENANTS.write().await; - let m = match &mut *guard { - TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing), - TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown), - TenantsMap::Open(m) => m, - }; - match m.entry(tenant_id) { - hash_map::Entry::Occupied(e) => match e.get() { - TenantSlot::Attached(t) => Err(TenantMapInsertError::TenantAlreadyExists( - tenant_id, - t.current_state(), - )), - TenantSlot::Secondary => Err(TenantMapInsertError::TenantExistsSecondary(tenant_id)), - }, - hash_map::Entry::Vacant(v) => match insert_fn().await { - Ok(tenant) => { - v.insert(TenantSlot::Attached(tenant.clone())); - Ok(tenant) + old_value: Option, + upserted: bool, + + /// [`TenantSlot::InProgress`] carries the corresponding Barrier: it will + /// release any waiters as soon as this SlotGuard is dropped. + _completion: utils::completion::Completion, +} + +unsafe impl Send for SlotGuard {} +unsafe impl Sync for SlotGuard {} + +impl SlotGuard { + fn new( + tenant_id: TenantId, + old_value: Option, + completion: utils::completion::Completion, + ) -> Self { + Self { + tenant_id, + old_value, + upserted: false, + _completion: completion, + } + } + + /// Take any value that was present in the slot before we acquired ownership + /// of it: in state transitions, this will be the old state. + fn take_value(&mut self) -> Option { + self.old_value.take() + } + + /// Emplace a new value in the slot. This consumes the guard, and after + /// returning, the slot is no longer protected from concurrent changes. 
+ fn upsert(mut self, new_value: TenantSlot) -> Result<(), TenantSlotUpsertError> { + let mut locked = TENANTS.write().unwrap(); + + if let TenantSlot::InProgress(_) = new_value { + // It is never expected to try and upsert InProgress via this path: it should + // only be written via the tenant_map_acquire_slot path. If we hit this it's a bug. + return Err(TenantSlotUpsertError::InternalError( + "Attempt to upsert an InProgress state".into(), + )); + } + + let m = match &mut *locked { + TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()), + TenantsMap::ShuttingDown(_) => { + return Err(TenantMapError::ShuttingDown.into()); } - Err(e) => Err(TenantMapInsertError::Other(e)), - }, + TenantsMap::Open(m) => m, + }; + + let replaced = m.insert(self.tenant_id, new_value); + self.upserted = true; + + // Sanity check: on an upsert we should always be replacing an InProgress marker + match replaced { + Some(TenantSlot::InProgress(_)) => { + // Expected case: we find our InProgress in the map: nothing should have + // replaced it because the code that acquires slots will not grant another + // one for the same TenantId. + Ok(()) + } + None => { + error!( + tenant_id = %self.tenant_id, + "Missing InProgress marker during tenant upsert, this is a bug." + ); + Err(TenantSlotUpsertError::InternalError( + "Missing InProgress marker during tenant upsert".into(), + )) + } + Some(slot) => { + error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot); + Err(TenantSlotUpsertError::InternalError( + "Unexpected contents of TenantSlot".into(), + )) + } + } } } -async fn tenant_map_upsert_slot<'a, F, R>( - tenant_id: TenantId, - upsert_fn: F, -) -> Result<(), TenantMapInsertError> -where - F: FnOnce(Option) -> R, - R: std::future::Future>, -{ - let mut guard = TENANTS.write().await; - let m = match &mut *guard { - TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing), - TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown), +impl Drop for SlotGuard { + fn drop(&mut self) { + if !self.upserted { + let mut locked = TENANTS.write().unwrap(); + + let m = match &mut *locked { + TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => { + return; + } + TenantsMap::Open(m) => m, + }; + + let slot = m.remove(&self.tenant_id); + match slot { + Some(slot) => match slot { + TenantSlot::InProgress(_) => { + // Normal case: nothing should have replaced the TenantSlot value + // that was set when we were constructed. + } + _ => { + error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot); + } + }, + None => { + error!( + tenant_id = %self.tenant_id, + "Missing InProgress marker during SlotGuard drop, this is a bug." 
+ ); + } + } + } + } +} + +fn tenant_map_peek_slot<'a>( + tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>, + tenant_id: &TenantId, +) -> Result, TenantMapError> { + let m = match tenants.deref() { + TenantsMap::Initializing => return Err(TenantMapError::StillInitializing), + TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown), TenantsMap::Open(m) => m, }; - match upsert_fn(m.remove(&tenant_id)).await { - Ok(upsert_val) => { - m.insert(tenant_id, upsert_val); - Ok(()) + Ok(m.get(tenant_id)) +} + +fn tenant_map_acquire_slot( + tenant_id: &TenantId, + expect_exist: Option, +) -> Result { + tenant_map_acquire_slot_impl(tenant_id, &TENANTS, expect_exist) +} + +fn tenant_map_acquire_slot_impl( + tenant_id: &TenantId, + tenants: &std::sync::RwLock, + expect_exist: Option, +) -> Result { + let mut locked = tenants.write().unwrap(); + let span = tracing::info_span!("acquire_slot", %tenant_id); + let _guard = span.enter(); + + let m = match &mut *locked { + TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()), + TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()), + TenantsMap::Open(m) => m, + }; + + use std::collections::hash_map::Entry; + let entry = m.entry(*tenant_id); + match entry { + Entry::Vacant(v) => { + if let Some(true) = expect_exist { + tracing::debug!("Vacant & expect_exist: return NotFound"); + return Err(TenantSlotError::NotFound(*tenant_id)); + } + + let (completion, barrier) = utils::completion::channel(); + v.insert(TenantSlot::InProgress(barrier)); + tracing::debug!("Vacant, inserted InProgress"); + Ok(SlotGuard::new(*tenant_id, None, completion)) + } + Entry::Occupied(mut o) => { + match (o.get(), expect_exist) { + (TenantSlot::InProgress(_), _) => { + tracing::debug!("Occupied, failing for InProgress"); + return Err(TenantSlotError::InProgress); + } + (slot, Some(false)) => match slot { + TenantSlot::Attached(tenant) => { + tracing::debug!("Attached & !expected_exist, return AlreadyExists"); + return Err(TenantSlotError::AlreadyExists( + *tenant_id, + tenant.current_state(), + )); + } + _ => { + // FIXME: the AlreadyExists error assumes that we have a Tenant + // to get the state from + tracing::debug!("Occupied & !expected_exist, return AlreadyExists"); + return Err(TenantSlotError::AlreadyExists( + *tenant_id, + TenantState::Broken { + reason: "Present but not attached".to_string(), + backtrace: "".to_string(), + }, + )); + } + }, + _ => {} + }; + + let (completion, barrier) = utils::completion::channel(); + let old_value = o.insert(TenantSlot::InProgress(barrier)); + tracing::debug!("Occupied, replaced with InProgress"); + Ok(SlotGuard::new(*tenant_id, Some(old_value), completion)) } - Err(e) => Err(TenantMapInsertError::Other(e)), } } @@ -1204,7 +1465,7 @@ where /// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal /// operation would be needed to remove it. async fn remove_tenant_from_memory( - tenants: &tokio::sync::RwLock, + tenants: &std::sync::RwLock, tenant_id: TenantId, tenant_cleanup: F, ) -> Result @@ -1213,20 +1474,14 @@ where { use utils::completion; - // It's important to keep the tenant in memory after the final cleanup, to avoid cleanup races. - // The exclusive lock here ensures we don't miss the tenant state updates before trying another removal. - // tenant-wde cleanup operations may take some time (removing the entire tenant directory), we want to - // avoid holding the lock for the entire process. 
- let tenant = { - match tenants - .write() - .await - .get_slot(&tenant_id) - .ok_or(TenantStateError::NotFound(tenant_id))? - { - TenantSlot::Attached(t) => Some(t.clone()), - TenantSlot::Secondary => None, - } + let mut tenant_guard = tenant_map_acquire_slot_impl(&tenant_id, tenants, Some(true))?; + let tenant_slot = tenant_guard.take_value(); + + // The SlotGuard allows us to manipulate the Tenant object without fear of some + // concurrent API request doing something else for the same tenant ID. + let attached_tenant = match tenant_slot { + Some(TenantSlot::Attached(t)) => Some(t), + _ => None, }; // allow pageserver shutdown to await for our completion @@ -1234,7 +1489,7 @@ where // If the tenant was attached, shut it down gracefully. For secondary // locations this part is not necessary - match tenant { + match &attached_tenant { Some(attached_tenant) => { // whenever we remove a tenant from memory, we don't want to flush and wait for upload let freeze_and_flush = false; @@ -1259,24 +1514,14 @@ where .await .with_context(|| format!("Failed to run cleanup for tenant {tenant_id}")) { - Ok(hook_value) => { - let mut tenants_accessor = tenants.write().await; - if tenants_accessor.remove(&tenant_id).is_none() { - warn!("Tenant {tenant_id} got removed from memory before operation finished"); - } - Ok(hook_value) - } + Ok(hook_value) => Ok(hook_value), Err(e) => { - let tenants_accessor = tenants.read().await; - match tenants_accessor.get(&tenant_id) { - Some(tenant) => { - tenant.set_broken(e.to_string()).await; - } - None => { - warn!("Tenant {tenant_id} got removed from memory"); - return Err(TenantStateError::NotFound(tenant_id)); - } + // If we had a Tenant, set it to Broken and put it back in the TenantsMap + if let Some(attached_tenant) = attached_tenant { + attached_tenant.set_broken(e.to_string()).await; + tenant_guard.upsert(TenantSlot::Attached(attached_tenant))?; } + Err(TenantStateError::Other(e)) } } @@ -1293,7 +1538,7 @@ pub(crate) async fn immediate_gc( gc_req: TimelineGcRequest, ctx: &RequestContext, ) -> Result>, ApiError> { - let guard = TENANTS.read().await; + let guard = TENANTS.read().unwrap(); let tenant = guard .get(&tenant_id) .map(Arc::clone) @@ -1346,14 +1591,12 @@ mod tests { use super::{super::harness::TenantHarness, TenantsMap}; - #[tokio::test(start_paused = true)] - async fn shutdown_joins_remove_tenant_from_memory() { - // the test is a bit ugly with the lockstep together with spawned tasks. the aim is to make - // sure `shutdown_all_tenants0` per-tenant processing joins in any active - // remove_tenant_from_memory calls, which is enforced by making the operation last until - // we've ran `shutdown_all_tenants0` for a long time. + #[tokio::test] + async fn shutdown_awaits_in_progress_tenant() { + // Test that if an InProgress tenant is in the map during shutdown, the shutdown will gracefully + // wait for it to complete before proceeding. 
- let (t, _ctx) = TenantHarness::create("shutdown_joins_detach") + let (t, _ctx) = TenantHarness::create("shutdown_awaits_in_progress_tenant") .unwrap() .load() .await; @@ -1366,13 +1609,14 @@ mod tests { let _e = info_span!("testing", tenant_id = %id).entered(); let tenants = HashMap::from([(id, TenantSlot::Attached(t.clone()))]); - let tenants = Arc::new(tokio::sync::RwLock::new(TenantsMap::Open(tenants))); + let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants))); + + // Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually + // permit it to proceed: that will stick the tenant in InProgress let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel(); let (until_cleanup_started, cleanup_started) = utils::completion::channel(); - - // start a "detaching operation", which will take a while, until can_complete_cleanup - let cleanup_task = { + let mut remove_tenant_from_memory_task = { let jh = tokio::spawn({ let tenants = tenants.clone(); async move { @@ -1391,12 +1635,6 @@ mod tests { jh }; - let mut cleanup_progress = std::pin::pin!(t - .shutdown(utils::completion::Barrier::default(), false) - .await - .unwrap_err() - .wait()); - let mut shutdown_task = { let (until_shutdown_started, shutdown_started) = utils::completion::channel(); @@ -1409,37 +1647,17 @@ mod tests { shutdown_task }; - // if the joining in is removed from shutdown_all_tenants0, the shutdown_task should always - // get to complete within timeout and fail the test. it is expected to continue awaiting - // until completion or SIGKILL during normal shutdown. - // - // the timeout is long to cover anything that shutdown_task could be doing, but it is - // handled instantly because we use tokio's time pausing in this test. 100s is much more than - // what we get from systemd on shutdown (10s). - let long_time = std::time::Duration::from_secs(100); + let long_time = std::time::Duration::from_secs(15); tokio::select! 
{ - _ = &mut shutdown_task => unreachable!("shutdown must continue, until_cleanup_completed is not dropped"), - _ = &mut cleanup_progress => unreachable!("cleanup progress must continue, until_cleanup_completed is not dropped"), + _ = &mut shutdown_task => unreachable!("shutdown should block on remove_tenant_from_memory completing"), + _ = &mut remove_tenant_from_memory_task => unreachable!("remove_tenant_from_memory_task should not complete until explicitly unblocked"), _ = tokio::time::sleep(long_time) => {}, } - // allow the remove_tenant_from_memory and thus eventually the shutdown to continue drop(until_cleanup_completed); - let (je, ()) = tokio::join!(shutdown_task, cleanup_progress); - je.expect("Tenant::shutdown shutdown not have panicked"); - cleanup_task - .await - .expect("no panicking") - .expect("remove_tenant_from_memory failed"); - - futures::future::poll_immediate( - t.shutdown(utils::completion::Barrier::default(), false) - .await - .unwrap_err() - .wait(), - ) - .await - .expect("the stopping progress must still be complete"); + // Now that we allow it to proceed, shutdown should complete immediately + remove_tenant_from_memory_task.await.unwrap().unwrap(); + shutdown_task.await.unwrap(); } } diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index dc5c71bbe1d2..659bef158078 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -344,20 +344,7 @@ impl Timeline { // Make one of the tenant's timelines draw the short straw and run the calculation. // The others wait until the calculation is done so that they take into account the // imitated accesses that the winner made. - // - // It is critical we are responsive to cancellation here. Otherwise, we deadlock with - // tenant deletion (holds TENANTS in read mode) any other task that attempts to - // acquire TENANTS in write mode before we here call get_tenant. - // See https://github.com/neondatabase/neon/issues/5284. - let res = tokio::select! { - _ = cancel.cancelled() => { - return ControlFlow::Break(()); - } - res = crate::tenant::mgr::get_tenant(self.tenant_id, true) => { - res - } - }; - let tenant = match res { + let tenant = match crate::tenant::mgr::get_tenant(self.tenant_id, true) { Ok(t) => t, Err(_) => { return ControlFlow::Break(()); From 6671b77f75b92143282b1db26174c17d8937af84 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 27 Oct 2023 16:54:53 +0100 Subject: [PATCH 02/22] pageserver: return 503 for GET on InProgress tenant --- pageserver/src/http/routes.rs | 1 + pageserver/src/page_service.rs | 3 ++ pageserver/src/tenant/mgr.rs | 55 ++++++++++++++++++++++------------ 3 files changed, 40 insertions(+), 19 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 591e20ae9a99..988b148a1f32 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -222,6 +222,7 @@ impl From for ApiError { // (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls). 
ApiError::ResourceUnavailable("Tenant not yet active".into()) } + GetTenantError::MapState(e) => ApiError::ResourceUnavailable(format!("{e}").into()), } } } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 4d265a8c6f29..ef564ce710c0 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1323,6 +1323,9 @@ async fn get_active_tenant_with_timeout( Err(GetTenantError::Broken(_)) => { unreachable!("we're calling get_tenant with active_only=false") } + Err(GetTenantError::MapState(_)) => { + unreachable!("TenantManager is initialized before page service starts") + } }; let wait_time = Duration::from_secs(30); match tokio::time::timeout(wait_time, tenant.wait_to_become_active()).await { diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index d6e851344449..b1bcce871ce1 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -747,7 +747,7 @@ pub(crate) async fn upsert_location( // existng tenant. { let locked = TENANTS.read().unwrap(); - let peek_slot = tenant_map_peek_slot(&locked, &tenant_id)?; + let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, false)?; match (&new_location_config.mode, peek_slot) { (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => { if attach_conf.generation == tenant.generation { @@ -875,6 +875,10 @@ pub(crate) enum GetTenantError { /// is a stuck error state #[error("Tenant is broken: {0}")] Broken(String), + + // Initializing or shutting down: cannot authoritatively say whether we have this tenant + #[error("Tenant map is not available: {0}")] + MapState(#[from] TenantMapError), } /// Gets the tenant from the in-memory data, erroring if it's absent or is not fitting to the query. @@ -885,24 +889,26 @@ pub(crate) fn get_tenant( tenant_id: TenantId, active_only: bool, ) -> Result, GetTenantError> { - let m = TENANTS.read().unwrap(); - let tenant = m - .get(&tenant_id) - .ok_or(GetTenantError::NotFound(tenant_id))?; - - match tenant.current_state() { - TenantState::Broken { - reason, - backtrace: _, - } if active_only => Err(GetTenantError::Broken(reason)), - TenantState::Active => Ok(Arc::clone(tenant)), - _ => { - if active_only { - Err(GetTenantError::NotActive(tenant_id)) - } else { - Ok(Arc::clone(tenant)) + let locked = TENANTS.read().unwrap(); + let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, true)?; + + match peek_slot { + Some(TenantSlot::Attached(tenant)) => match tenant.current_state() { + TenantState::Broken { + reason, + backtrace: _, + } if active_only => Err(GetTenantError::Broken(reason)), + TenantState::Active => Ok(Arc::clone(tenant)), + _ => { + if active_only { + Err(GetTenantError::NotActive(tenant_id)) + } else { + Ok(Arc::clone(tenant)) + } } - } + }, + Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_id)), + None | Some(TenantSlot::Secondary) => Err(GetTenantError::NotFound(tenant_id)), } } @@ -1373,13 +1379,24 @@ impl Drop for SlotGuard { } } +/// `allow_shutdown=true` is appropriate for read-only APIs that should stay working while +/// the pageserver is shutting down. Otherwise, set it to false to refuse attempts to +/// operate on a slot while we are shutting down. 
+/// fn tenant_map_peek_slot<'a>( tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>, tenant_id: &TenantId, + allow_shutdown: bool, ) -> Result, TenantMapError> { let m = match tenants.deref() { TenantsMap::Initializing => return Err(TenantMapError::StillInitializing), - TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown), + TenantsMap::ShuttingDown(m) => { + if allow_shutdown { + return Err(TenantMapError::ShuttingDown); + } else { + m + } + } TenantsMap::Open(m) => m, }; From cc2e0621f407bd80a6ba3c8770dd3b5a0f831d5f Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 31 Oct 2023 09:36:33 +0000 Subject: [PATCH 03/22] pageserver: wait with timeout for InProgress slots in page service --- pageserver/src/page_service.rs | 120 +++++++++------------------ pageserver/src/tenant/mgr.rs | 144 +++++++++++++++++++++++++++++++++ 2 files changed, 183 insertions(+), 81 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index ef564ce710c0..5361e7798f89 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -14,7 +14,6 @@ use async_compression::tokio::write::GzipEncoder; use bytes::Buf; use bytes::Bytes; use futures::Stream; -use pageserver_api::models::TenantState; use pageserver_api::models::{ PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse, @@ -55,16 +54,20 @@ use crate::metrics; use crate::metrics::LIVE_CONNECTIONS_COUNT; use crate::task_mgr; use crate::task_mgr::TaskKind; -use crate::tenant; use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::mgr; -use crate::tenant::mgr::GetTenantError; -use crate::tenant::{Tenant, Timeline}; +use crate::tenant::mgr::get_active_tenant_with_timeout; +use crate::tenant::mgr::GetActiveTenantError; +use crate::tenant::Timeline; use crate::trace::Tracer; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; use postgres_ffi::BLCKSZ; +// How long we may block waiting for a [`TenantSlot::InProgress`]` and/or a [`Tenant`] which +// is not yet in state [`TenantState::Active`]. +const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000); + /// Read the end of a tar archive. /// /// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each. 
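// Illustrative sketch, not part of this patch: the caller pattern ACTIVE_TENANT_TIMEOUT
// is sized for. `get_active_tenant_with_timeout` and the `From<GetActiveTenantError>`
// conversion into QueryError are added later in this patch series; the wrapper function
// itself is hypothetical.
async fn resolve_tenant(
    tenant_id: TenantId,
    cancel: &CancellationToken,
) -> Result<Arc<Tenant>, QueryError> {
    // Waits at most ACTIVE_TENANT_TIMEOUT in total: first for an InProgress slot to
    // resolve, then for the tenant to reach TenantState::Active. A timeout surfaces as
    // a disconnect-style error, so the compute reconnects instead of failing hard.
    let tenant =
        mgr::get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT, cancel).await?;
    Ok(tenant)
}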
@@ -389,7 +392,9 @@ impl PageServerHandler { task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); // Make request tracer if needed - let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?; + let tenant = + mgr::get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT, &self.cancel) + .await?; let mut tracer = if tenant.get_trace_read_requests() { let connection_id = ConnectionId::generate(); let path = tenant @@ -525,7 +530,8 @@ impl PageServerHandler { task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); // Create empty timeline info!("creating new timeline"); - let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?; + let tenant = + get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT, &self.cancel).await?; let timeline = tenant .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx) .await?; @@ -584,7 +590,9 @@ impl PageServerHandler { debug_assert_current_span_has_tenant_and_timeline_id(); task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); - let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?; + let timeline = self + .get_active_tenant_timeline(tenant_id, timeline_id) + .await?; let last_record_lsn = timeline.get_last_record_lsn(); if last_record_lsn != start_lsn { return Err(QueryError::Other( @@ -792,7 +800,9 @@ impl PageServerHandler { let started = std::time::Instant::now(); // check that the timeline exists - let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?; + let timeline = self + .get_active_tenant_timeline(tenant_id, timeline_id) + .await?; let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); if let Some(lsn) = lsn { // Backup was requested at a particular LSN. Wait for it to arrive. @@ -891,6 +901,21 @@ impl PageServerHandler { .expect("claims presence already checked"); check_permission(claims, tenant_id) } + + /// Shorthand for getting a reference to a Timeline of an Active tenant. + async fn get_active_tenant_timeline( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> Result, GetActiveTimelineError> { + let tenant = get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT, &self.cancel) + .await + .map_err(GetActiveTimelineError::Tenant)?; + let timeline = tenant + .get_timeline(timeline_id, true) + .map_err(|e| GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?; + Ok(timeline) + } } #[async_trait::async_trait] @@ -1048,7 +1073,9 @@ where .record("timeline_id", field::display(timeline_id)); self.check_permission(Some(tenant_id))?; - let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?; + let timeline = self + .get_active_tenant_timeline(tenant_id, timeline_id) + .await?; let end_of_timeline = timeline.get_last_record_rlsn(); @@ -1232,7 +1259,9 @@ where self.check_permission(Some(tenant_id))?; - let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?; + let tenant = + get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT, &self.cancel) + .await?; pgb.write_message_noflush(&BeMessage::RowDescription(&[ RowDescriptor::int8_col(b"checkpoint_distance"), RowDescriptor::int8_col(b"checkpoint_timeout"), @@ -1278,21 +1307,6 @@ where } } -#[derive(thiserror::Error, Debug)] -enum GetActiveTenantError { - #[error( - "Timed out waiting {wait_time:?} for tenant active state. 
Latest state: {latest_state:?}" - )] - WaitForActiveTimeout { - latest_state: TenantState, - wait_time: Duration, - }, - #[error(transparent)] - NotFound(GetTenantError), - #[error(transparent)] - WaitTenantActive(tenant::WaitToBecomeActiveError), -} - impl From for QueryError { fn from(e: GetActiveTenantError) -> Self { match e { @@ -1305,47 +1319,6 @@ impl From for QueryError { } } -/// Get active tenant. -/// -/// If the tenant is Loading, waits for it to become Active, for up to 30 s. That -/// ensures that queries don't fail immediately after pageserver startup, because -/// all tenants are still loading. -async fn get_active_tenant_with_timeout( - tenant_id: TenantId, - _ctx: &RequestContext, /* require get a context to support cancellation in the future */ -) -> Result, GetActiveTenantError> { - let tenant = match mgr::get_tenant(tenant_id, false) { - Ok(tenant) => tenant, - Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)), - Err(GetTenantError::NotActive(_)) => { - unreachable!("we're calling get_tenant with active_only=false") - } - Err(GetTenantError::Broken(_)) => { - unreachable!("we're calling get_tenant with active_only=false") - } - Err(GetTenantError::MapState(_)) => { - unreachable!("TenantManager is initialized before page service starts") - } - }; - let wait_time = Duration::from_secs(30); - match tokio::time::timeout(wait_time, tenant.wait_to_become_active()).await { - Ok(Ok(())) => Ok(tenant), - // no .context(), the error message is good enough and some tests depend on it - Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)), - Err(_) => { - let latest_state = tenant.current_state(); - if latest_state == TenantState::Active { - Ok(tenant) - } else { - Err(GetActiveTenantError::WaitForActiveTimeout { - latest_state, - wait_time, - }) - } - } - } -} - #[derive(Debug, thiserror::Error)] enum GetActiveTimelineError { #[error(transparent)] @@ -1362,18 +1335,3 @@ impl From for QueryError { } } } - -/// Shorthand for getting a reference to a Timeline of an Active tenant. -async fn get_active_tenant_timeline( - tenant_id: TenantId, - timeline_id: TimelineId, - ctx: &RequestContext, -) -> Result, GetActiveTimelineError> { - let tenant = get_active_tenant_with_timeout(tenant_id, ctx) - .await - .map_err(GetActiveTimelineError::Tenant)?; - let timeline = tenant - .get_timeline(timeline_id, true) - .map_err(|e| GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?; - Ok(timeline) -} diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index b1bcce871ce1..e7265276d1db 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -7,6 +7,7 @@ use std::borrow::Cow; use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; +use std::time::{Duration, Instant}; use tokio::fs; use anyhow::Context; @@ -912,6 +913,149 @@ pub(crate) fn get_tenant( } } +#[derive(thiserror::Error, Debug)] +pub(crate) enum GetActiveTenantError { + #[error( + "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}" + )] + WaitForActiveTimeout { + latest_state: TenantState, + wait_time: Duration, + }, + #[error(transparent)] + NotFound(#[from] GetTenantError), + #[error(transparent)] + WaitTenantActive(crate::tenant::WaitToBecomeActiveError), +} + +enum TimeoutCancellableError { + Timeout, + Cancelled, +} + +/// Wrap [`tokio::time::timeout`] with a CancellationToken. 
+async fn timeout_cancellable( + duration: Duration, + future: F, + cancel: &CancellationToken, +) -> Result +where + F: std::future::Future, +{ + tokio::select!( + r = tokio::time::timeout(duration, future) => { + r.map_err(|_| TimeoutCancellableError::Timeout) + + }, + _ = cancel.cancelled() => { + Err(TimeoutCancellableError::Cancelled) + + } + ) +} + +/// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`] +/// state, then wait for up to `timeout`. If the [`Tenant`] is not currently in [`TenantState::Active`], +/// then wait for up to `timeout` (minus however long we waited for the slot). +pub(crate) async fn get_active_tenant_with_timeout( + tenant_id: TenantId, + mut timeout: Duration, + cancel: &CancellationToken, +) -> Result, GetActiveTenantError> { + enum WaitFor { + Barrier(utils::completion::Barrier), + Tenant(Arc), + } + + let wait_for = { + let locked = TENANTS.read().unwrap(); + let peek_slot = + tenant_map_peek_slot(&locked, &tenant_id, true).map_err(GetTenantError::MapState)?; + match peek_slot { + Some(TenantSlot::Attached(tenant)) => { + match tenant.current_state() { + TenantState::Active => { + // Fast path: we don't need to do any async waiting. + return Ok(tenant.clone()); + } + _ => WaitFor::Tenant(tenant.clone()), + } + } + Some(TenantSlot::Secondary) => { + return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive( + tenant_id, + ))) + } + Some(TenantSlot::InProgress(barrier)) => WaitFor::Barrier(barrier.clone()), + None => { + return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound( + tenant_id, + ))) + } + } + }; + + let tenant = match wait_for { + WaitFor::Barrier(barrier) => { + tracing::debug!("Waiting for tenant InProgress state to pass..."); + let wait_start = Instant::now(); + timeout_cancellable(timeout, barrier.wait(), cancel) + .await + .map_err(|e| match e { + TimeoutCancellableError::Timeout => { + GetActiveTenantError::WaitForActiveTimeout { + latest_state: TenantState::Loading, + wait_time: wait_start.elapsed(), + } + } + TimeoutCancellableError::Cancelled => { + GetActiveTenantError::NotFound(GetTenantError::NotFound(tenant_id)) + } + })?; + let wait_duration = Instant::now().duration_since(wait_start); + timeout -= wait_duration; + { + let locked = TENANTS.read().unwrap(); + let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, true) + .map_err(GetTenantError::MapState)?; + match peek_slot { + Some(TenantSlot::Attached(tenant)) => tenant.clone(), + _ => { + return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive( + tenant_id, + ))) + } + } + } + } + WaitFor::Tenant(tenant) => tenant, + }; + + tracing::debug!("Waiting for tenant to enter active state..."); + match timeout_cancellable(timeout, tenant.wait_to_become_active(), cancel).await { + Ok(Ok(())) => Ok(tenant), + // no .context(), the error message is good enough and some tests depend on it + Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)), + Err(TimeoutCancellableError::Timeout) => { + let latest_state = tenant.current_state(); + if latest_state == TenantState::Active { + Ok(tenant) + } else { + Err(GetActiveTenantError::WaitForActiveTimeout { + latest_state, + wait_time: timeout, + }) + } + } + Err(TimeoutCancellableError::Cancelled) => { + Err(GetActiveTenantError::WaitForActiveTimeout { + latest_state: TenantState::Loading, + wait_time: timeout, + }) + } + } +} + pub(crate) async fn delete_tenant( conf: &'static PageServerConf, remote_storage: Option, From 
a972ba13d554c4a49ef1eecbfe302cdd0db46197 Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 1 Nov 2023 14:16:57 +0000 Subject: [PATCH 04/22] tests: tolerate more diverse non-active tenant errors --- test_runner/regress/test_tenant_detach.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py index 11e8a80e1ddc..df27489ab575 100644 --- a/test_runner/regress/test_tenant_detach.py +++ b/test_runner/regress/test_tenant_detach.py @@ -699,6 +699,8 @@ def test_ignore_while_attaching( env.pageserver.allowed_errors.append( f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" ) + env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found") + env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} is not active") data_id = 1 data_secret = "very secret secret" From 19467ad90c1ccd8b53bee31c9f2c28dd7dc82d6d Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 2 Nov 2023 11:28:43 +0000 Subject: [PATCH 05/22] pageserver: simplify GetActiveTenantError & add Cancelled case --- pageserver/src/page_service.rs | 3 +- pageserver/src/tenant.rs | 45 ++----------- pageserver/src/tenant/mgr.rs | 34 +++++----- test_runner/regress/test_tenant_detach.py | 78 ++++++----------------- 4 files changed, 45 insertions(+), 115 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 5361e7798f89..c2bf18d355ae 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1313,8 +1313,7 @@ impl From for QueryError { GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected( ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())), ), - GetActiveTenantError::WaitTenantActive(e) => QueryError::Other(anyhow::Error::new(e)), - GetActiveTenantError::NotFound(e) => QueryError::Other(anyhow::Error::new(e)), + e => QueryError::Other(anyhow::anyhow!(e)), } } } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 62da6b618ef1..e9274145dd36 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -54,6 +54,8 @@ use self::config::TenantConf; use self::delete::DeleteTenantFlow; use self::metadata::LoadMetadataError; use self::metadata::TimelineMetadata; +use self::mgr::GetActiveTenantError; +use self::mgr::GetTenantError; use self::mgr::TenantsMap; use self::remote_timeline_client::RemoteTimelineClient; use self::timeline::uninit::TimelineUninitMark; @@ -365,34 +367,6 @@ impl Debug for SetStoppingError { } } -#[derive(Debug, thiserror::Error)] -pub(crate) enum WaitToBecomeActiveError { - WillNotBecomeActive { - tenant_id: TenantId, - state: TenantState, - }, - TenantDropped { - tenant_id: TenantId, - }, -} - -impl std::fmt::Display for WaitToBecomeActiveError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - WaitToBecomeActiveError::WillNotBecomeActive { tenant_id, state } => { - write!( - f, - "Tenant {} will not become active. 
Current state: {:?}", - tenant_id, state - ) - } - WaitToBecomeActiveError::TenantDropped { tenant_id } => { - write!(f, "Tenant {tenant_id} will not become active (dropped)") - } - } - } -} - #[derive(thiserror::Error, Debug)] pub enum CreateTimelineError { #[error("a timeline with the given ID already exists")] @@ -2028,7 +2002,7 @@ impl Tenant { self.state.subscribe() } - pub(crate) async fn wait_to_become_active(&self) -> Result<(), WaitToBecomeActiveError> { + pub(crate) async fn wait_to_become_active(&self) -> Result<(), GetActiveTenantError> { let mut receiver = self.state.subscribe(); loop { let current_state = receiver.borrow_and_update().clone(); @@ -2036,11 +2010,9 @@ impl Tenant { TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => { // in these states, there's a chance that we can reach ::Active receiver.changed().await.map_err( - |_e: tokio::sync::watch::error::RecvError| { - WaitToBecomeActiveError::TenantDropped { - tenant_id: self.tenant_id, - } - }, + |_e: tokio::sync::watch::error::RecvError| + // Tenant existed but was dropped: report it as non-existent + GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_id)) )?; } TenantState::Active { .. } => { @@ -2048,10 +2020,7 @@ impl Tenant { } TenantState::Broken { .. } | TenantState::Stopping { .. } => { // There's no chance the tenant can transition back into ::Active - return Err(WaitToBecomeActiveError::WillNotBecomeActive { - tenant_id: self.tenant_id, - state: current_state, - }); + return Err(GetActiveTenantError::WillNotBecomeActive(current_state)); } } } diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index e7265276d1db..be805033debb 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -915,17 +915,27 @@ pub(crate) fn get_tenant( #[derive(thiserror::Error, Debug)] pub(crate) enum GetActiveTenantError { + /// We may time out either while TenantSlot is InProgress, or while the Tenant + /// is in a non-Active state #[error( "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}" )] WaitForActiveTimeout { - latest_state: TenantState, + latest_state: Option, wait_time: Duration, }, + + /// The TenantSlot is absent, or in secondary mode #[error(transparent)] NotFound(#[from] GetTenantError), - #[error(transparent)] - WaitTenantActive(crate::tenant::WaitToBecomeActiveError), + + /// Cancellation token fired while we were waiting + #[error("cancelled")] + Cancelled, + + /// Tenant exists, but is in a state that cannot become active (e.g. Stopping, Broken) + #[error("will not become active. 
Current state: {0}")] + WillNotBecomeActive(TenantState), } enum TimeoutCancellableError { @@ -1004,13 +1014,11 @@ pub(crate) async fn get_active_tenant_with_timeout( .map_err(|e| match e { TimeoutCancellableError::Timeout => { GetActiveTenantError::WaitForActiveTimeout { - latest_state: TenantState::Loading, + latest_state: None, wait_time: wait_start.elapsed(), } } - TimeoutCancellableError::Cancelled => { - GetActiveTenantError::NotFound(GetTenantError::NotFound(tenant_id)) - } + TimeoutCancellableError::Cancelled => GetActiveTenantError::Cancelled, })?; let wait_duration = Instant::now().duration_since(wait_start); timeout -= wait_duration; @@ -1034,25 +1042,19 @@ pub(crate) async fn get_active_tenant_with_timeout( tracing::debug!("Waiting for tenant to enter active state..."); match timeout_cancellable(timeout, tenant.wait_to_become_active(), cancel).await { Ok(Ok(())) => Ok(tenant), - // no .context(), the error message is good enough and some tests depend on it - Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)), + Ok(Err(e)) => Err(e), Err(TimeoutCancellableError::Timeout) => { let latest_state = tenant.current_state(); if latest_state == TenantState::Active { Ok(tenant) } else { Err(GetActiveTenantError::WaitForActiveTimeout { - latest_state, + latest_state: Some(latest_state), wait_time: timeout, }) } } - Err(TimeoutCancellableError::Cancelled) => { - Err(GetActiveTenantError::WaitForActiveTimeout { - latest_state: TenantState::Loading, - wait_time: timeout, - }) - } + Err(TimeoutCancellableError::Cancelled) => Err(GetActiveTenantError::Cancelled), } } diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py index df27489ab575..03e78dda2b27 100644 --- a/test_runner/regress/test_tenant_detach.py +++ b/test_runner/regress/test_tenant_detach.py @@ -26,6 +26,16 @@ from fixtures.utils import query_scalar, wait_until from prometheus_client.samples import Sample +# In tests that overlap endpoint activity with tenant attach/detach, there are +# a variety of warnings that the page service may emit when it cannot acquire +# an active tenant to serve a request +PERMIT_PAGE_SERVICE_ERRORS = [ + ".*page_service.*Tenant .* not found", + ".*page_service.*Tenant .* is not active", + ".*page_service.*cancelled", + ".*page_service.*will not become active.*", +] + def do_gc_target( pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId @@ -60,12 +70,7 @@ def test_tenant_reattach( # create new nenant tenant_id, timeline_id = env.neon_cli.create_tenant() - # Attempts to connect from compute to pageserver while the tenant is - # temporarily detached produces these errors in the pageserver log. - env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") - env.pageserver.allowed_errors.append( - f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" - ) + env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint: with endpoint.cursor() as cur: @@ -235,10 +240,7 @@ async def reattach_while_busy( # Attempts to connect from compute to pageserver while the tenant is # temporarily detached produces these errors in the pageserver log. - env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") - env.pageserver.allowed_errors.append( - f".*Tenant {tenant_id} will not become active\\. 
Current state: Stopping.*" - ) + env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) @@ -259,7 +261,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder): env = neon_env_builder.init_start() pageserver_http = env.pageserver.http_client() - env.pageserver.allowed_errors.append(".*NotFound: Tenant .*") + env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) # first check for non existing tenant tenant_id = TenantId.generate() @@ -271,19 +273,9 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder): assert excinfo.value.status_code == 404 - # the error will be printed to the log too - env.pageserver.allowed_errors.append(".*NotFound: tenant *") - # create new nenant tenant_id, timeline_id = env.neon_cli.create_tenant() - # Attempts to connect from compute to pageserver while the tenant is - # temporarily detached produces these errors in the pageserver log. - env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") - env.pageserver.allowed_errors.append( - f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" - ) - # assert tenant exists on disk assert env.pageserver.tenant_dir(tenant_id).exists() @@ -345,12 +337,7 @@ def test_tenant_detach_ignored_tenant(neon_simple_env: NeonEnv): # create a new tenant tenant_id, _ = env.neon_cli.create_tenant() - # Attempts to connect from compute to pageserver while the tenant is - # temporarily detached produces these errors in the pageserver log. - env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") - env.pageserver.allowed_errors.append( - f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" - ) + env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) # assert tenant exists on disk assert env.pageserver.tenant_dir(tenant_id).exists() @@ -401,12 +388,7 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv): # create a new tenant tenant_id, _ = env.neon_cli.create_tenant() - # Attempts to connect from compute to pageserver while the tenant is - # temporarily detached produces these errors in the pageserver log. - env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") - env.pageserver.allowed_errors.append( - f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" - ) + env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) # assert tenant exists on disk assert env.pageserver.tenant_dir(tenant_id).exists() @@ -453,12 +435,7 @@ def test_detach_while_attaching( tenant_id = env.initial_tenant timeline_id = env.initial_timeline - # Attempts to connect from compute to pageserver while the tenant is - # temporarily detached produces these errors in the pageserver log. - env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") - env.pageserver.allowed_errors.append( - f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" - ) + env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) # Create table, and insert some rows. Make it big enough that it doesn't fit in # shared_buffers, otherwise the SELECT after restart will just return answer @@ -593,12 +570,7 @@ def test_ignored_tenant_download_missing_layers(neon_env_builder: NeonEnvBuilder tenant_id = env.initial_tenant timeline_id = env.initial_timeline - # Attempts to connect from compute to pageserver while the tenant is - # temporarily detached produces these errors in the pageserver log. 
- env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") - env.pageserver.allowed_errors.append( - f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" - ) + env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) data_id = 1 data_secret = "very secret secret" @@ -649,12 +621,7 @@ def test_load_attach_negatives(neon_env_builder: NeonEnvBuilder): tenant_id = env.initial_tenant - # Attempts to connect from compute to pageserver while the tenant is - # temporarily detached produces these errors in the pageserver log. - env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") - env.pageserver.allowed_errors.append( - f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" - ) + env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) env.pageserver.allowed_errors.append(".*tenant .*? already exists, state:.*") with pytest.raises( @@ -693,14 +660,7 @@ def test_ignore_while_attaching( tenant_id = env.initial_tenant timeline_id = env.initial_timeline - # Attempts to connect from compute to pageserver while the tenant is - # temporarily detached produces these errors in the pageserver log. - env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") - env.pageserver.allowed_errors.append( - f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" - ) - env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found") - env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} is not active") + env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) data_id = 1 data_secret = "very secret secret" From 44b03523451cb7e4cdd08950be26ce323dd3b2cc Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 3 Nov 2023 09:57:01 +0000 Subject: [PATCH 06/22] Update pageserver/src/tenant/mgr.rs Co-authored-by: Christian Schwarz --- pageserver/src/tenant/mgr.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index be805033debb..4b0f48ce2bdb 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -535,8 +535,7 @@ pub(crate) async fn shutdown_all_tenants() { async fn shutdown_all_tenants0(tenants: &std::sync::RwLock) { use utils::completion; - // Under write lock (prevent any new tenants being created), extract the list - // of tenants to shut down. + // Atomically, 1. extract the list of tenants to shut down and 2. prevent creation of new tenants. 
let (in_progress_ops, tenants_to_shut_down) = { let mut m = tenants.write().unwrap(); match &mut *m { From 914f2f4b9747d0fa414650dc3b30160046617a58 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 3 Nov 2023 09:57:17 +0000 Subject: [PATCH 07/22] Update pageserver/src/tenant/mgr.rs Co-authored-by: Christian Schwarz --- pageserver/src/tenant/mgr.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 4b0f48ce2bdb..a2795935afb1 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -577,16 +577,15 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock) { } }; - info!( - "Waiting for {} InProgress tenants to complete...", - in_progress_ops.len() - ); + + info!("Waiting for {} InProgress tenants and {} Attached tenants to shut down", in_progress_ops.len(), tenants_to_shut_down.len()); + for barrier in in_progress_ops { barrier.wait().await; } info!( - "Waiting for {} attached tenants to shut down...", + "InProgress tenants shut down, waiting for Attached tenants to shut down", tenants_to_shut_down.len() ); let started_at = std::time::Instant::now(); From 7c739724c1785e154be2b2b4d658a290a95c7061 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 3 Nov 2023 10:00:55 +0000 Subject: [PATCH 08/22] Fix a PR suggestion --- pageserver/src/tenant/mgr.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index a2795935afb1..820b46554cab 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -577,15 +577,18 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock) { } }; - - info!("Waiting for {} InProgress tenants and {} Attached tenants to shut down", in_progress_ops.len(), tenants_to_shut_down.len()); + info!( + "Waiting for {} InProgress tenants and {} Attached tenants to shut down", + in_progress_ops.len(), + tenants_to_shut_down.len() + ); for barrier in in_progress_ops { barrier.wait().await; } info!( - "InProgress tenants shut down, waiting for Attached tenants to shut down", + "InProgress tenants shut down, waiting for {} Attached tenants to shut down", tenants_to_shut_down.len() ); let started_at = std::time::Instant::now(); From 8d6b95b44fdccc131cf4368d69fccf5e73e45121 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 3 Nov 2023 10:08:46 +0000 Subject: [PATCH 09/22] Refactor get_active_tenant_with_timeout to use a deadline --- pageserver/src/tenant/mgr.rs | 40 +++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 820b46554cab..f77e7897f2fb 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -970,7 +970,7 @@ where /// then wait for up to `timeout` (minus however long we waited for the slot). 
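// Illustrative sketch, not part of this patch: the deadline pattern this commit
// switches to. Capturing one deadline up front makes the slot wait and the
// wait-for-Active share a single time budget, instead of each getting a full `timeout`.
// The two futures are stand-ins; the real code below uses `duration_since`, while the
// saturating variant is used here to make the exhausted-budget case explicit.
use std::time::{Duration, Instant};

async fn two_phase_wait<A, B>(
    timeout: Duration,
    wait_for_slot: impl std::future::Future<Output = A>,
    wait_for_active: impl std::future::Future<Output = B>,
) -> Option<(A, B)> {
    let deadline = Instant::now() + timeout;
    let a = tokio::time::timeout(deadline.saturating_duration_since(Instant::now()), wait_for_slot)
        .await
        .ok()?;
    // Whatever the first wait consumed is no longer available to the second wait.
    let b = tokio::time::timeout(deadline.saturating_duration_since(Instant::now()), wait_for_active)
        .await
        .ok()?;
    Some((a, b))
}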
pub(crate) async fn get_active_tenant_with_timeout( tenant_id: TenantId, - mut timeout: Duration, + timeout: Duration, cancel: &CancellationToken, ) -> Result, GetActiveTenantError> { enum WaitFor { @@ -978,6 +978,9 @@ pub(crate) async fn get_active_tenant_with_timeout( Tenant(Arc), } + let wait_start = Instant::now(); + let deadline = wait_start + timeout; + let wait_for = { let locked = TENANTS.read().unwrap(); let peek_slot = @@ -1009,20 +1012,19 @@ pub(crate) async fn get_active_tenant_with_timeout( let tenant = match wait_for { WaitFor::Barrier(barrier) => { tracing::debug!("Waiting for tenant InProgress state to pass..."); - let wait_start = Instant::now(); - timeout_cancellable(timeout, barrier.wait(), cancel) - .await - .map_err(|e| match e { - TimeoutCancellableError::Timeout => { - GetActiveTenantError::WaitForActiveTimeout { - latest_state: None, - wait_time: wait_start.elapsed(), - } - } - TimeoutCancellableError::Cancelled => GetActiveTenantError::Cancelled, - })?; - let wait_duration = Instant::now().duration_since(wait_start); - timeout -= wait_duration; + timeout_cancellable( + deadline.duration_since(Instant::now()), + barrier.wait(), + cancel, + ) + .await + .map_err(|e| match e { + TimeoutCancellableError::Timeout => GetActiveTenantError::WaitForActiveTimeout { + latest_state: None, + wait_time: wait_start.elapsed(), + }, + TimeoutCancellableError::Cancelled => GetActiveTenantError::Cancelled, + })?; { let locked = TENANTS.read().unwrap(); let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, true) @@ -1041,7 +1043,13 @@ pub(crate) async fn get_active_tenant_with_timeout( }; tracing::debug!("Waiting for tenant to enter active state..."); - match timeout_cancellable(timeout, tenant.wait_to_become_active(), cancel).await { + match timeout_cancellable( + deadline.duration_since(Instant::now()), + tenant.wait_to_become_active(), + cancel, + ) + .await + { Ok(Ok(())) => Ok(tenant), Ok(Err(e)) => Err(e), Err(TimeoutCancellableError::Timeout) => { From 810404f968b6f1ba30edeb4dcbbf15455bf084c3 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 3 Nov 2023 10:19:09 +0000 Subject: [PATCH 10/22] Refactor mode args to tenant slot helpers --- pageserver/src/tenant/mgr.rs | 90 +++++++++++++++++++++--------------- 1 file changed, 53 insertions(+), 37 deletions(-) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index f77e7897f2fb..9724a78502cd 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -675,7 +675,7 @@ pub(crate) async fn create_tenant( ) -> Result, TenantMapInsertError> { let location_conf = LocationConf::attached_single(tenant_conf, generation); - let tenant_guard = tenant_map_acquire_slot(&tenant_id, Some(false))?; + let tenant_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?; let tenant_path = super::create_tenant_files(conf, &location_conf, &tenant_id).await?; let created_tenant = tenant_spawn( @@ -749,7 +749,7 @@ pub(crate) async fn upsert_location( // existng tenant. 
{ let locked = TENANTS.read().unwrap(); - let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, false)?; + let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Write)?; match (&new_location_config.mode, peek_slot) { (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => { if attach_conf.generation == tenant.generation { @@ -780,7 +780,7 @@ pub(crate) async fn upsert_location( // the tenant is inaccessible to the outside world while we are doing this, but that is sensible: // the state is ill-defined while we're in transition. Transitions are async, but fast: we do // not do significant I/O, and shutdowns should be prompt via cancellation tokens. - let mut tenant_guard = tenant_map_acquire_slot(&tenant_id, None)?; + let mut tenant_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::Any)?; if let Some(TenantSlot::Attached(tenant)) = tenant_guard.take_value() { // The case where we keep a Tenant alive was covered above in the special case @@ -892,7 +892,7 @@ pub(crate) fn get_tenant( active_only: bool, ) -> Result, GetTenantError> { let locked = TENANTS.read().unwrap(); - let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, true)?; + let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read)?; match peek_slot { Some(TenantSlot::Attached(tenant)) => match tenant.current_state() { @@ -983,8 +983,8 @@ pub(crate) async fn get_active_tenant_with_timeout( let wait_for = { let locked = TENANTS.read().unwrap(); - let peek_slot = - tenant_map_peek_slot(&locked, &tenant_id, true).map_err(GetTenantError::MapState)?; + let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read) + .map_err(GetTenantError::MapState)?; match peek_slot { Some(TenantSlot::Attached(tenant)) => { match tenant.current_state() { @@ -1027,7 +1027,7 @@ pub(crate) async fn get_active_tenant_with_timeout( })?; { let locked = TENANTS.read().unwrap(); - let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, true) + let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read) .map_err(GetTenantError::MapState)?; match peek_slot { Some(TenantSlot::Attached(tenant)) => tenant.clone(), @@ -1083,7 +1083,7 @@ pub(crate) async fn delete_tenant( // // See https://github.com/neondatabase/neon/issues/5080 - let mut slot_guard = tenant_map_acquire_slot(&tenant_id, Some(true))?; + let mut slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustExist)?; // unwrap is safe because we used expect_exist=true when acquiring the slot let slot = slot_guard.take_value().unwrap(); @@ -1218,7 +1218,7 @@ pub(crate) async fn load_tenant( deletion_queue_client: DeletionQueueClient, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { - let tenant_guard = tenant_map_acquire_slot(&tenant_id, Some(false))?; + let tenant_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?; let tenant_path = conf.tenant_path(&tenant_id); let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id); @@ -1322,7 +1322,7 @@ pub(crate) async fn attach_tenant( resources: TenantSharedResources, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { - let tenant_guard = tenant_map_acquire_slot(&tenant_id, Some(false))?; + let tenant_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?; let location_conf = LocationConf::attached_single(tenant_conf, generation); let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id).await?; // 
TODO: tenant directory remains on disk if we bail out from here on. @@ -1534,42 +1534,55 @@ impl Drop for SlotGuard { } } -/// `allow_shutdown=true` is appropriate for read-only APIs that should stay working while -/// the pageserver is shutting down. Otherwise, set it to false to refuse attempts to -/// operate on a slot while we are shutting down. -/// +enum TenantSlotPeekMode { + /// In Read mode, peek will be permitted to see the slots even if the pageserver is shutting down + Read, + /// In Write mode, trying to peek at a slot while the pageserver is shutting down is an error + Write, +} + fn tenant_map_peek_slot<'a>( tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>, tenant_id: &TenantId, - allow_shutdown: bool, + mode: TenantSlotPeekMode, ) -> Result, TenantMapError> { let m = match tenants.deref() { TenantsMap::Initializing => return Err(TenantMapError::StillInitializing), - TenantsMap::ShuttingDown(m) => { - if allow_shutdown { + TenantsMap::ShuttingDown(m) => match mode { + TenantSlotPeekMode::Read => m, + TenantSlotPeekMode::Write => { return Err(TenantMapError::ShuttingDown); - } else { - m } - } + }, TenantsMap::Open(m) => m, }; Ok(m.get(tenant_id)) } +enum TenantSlotAcquireMode { + /// Acquire the slot irrespective of current state, or whether it already exists + Any, + /// Return an error if trying to acquire a slot and it doesn't already exist + MustExist, + /// Return an error if trying to acquire a slot and it already exists + MustNotExist, +} + fn tenant_map_acquire_slot( tenant_id: &TenantId, - expect_exist: Option, + mode: TenantSlotAcquireMode, ) -> Result { - tenant_map_acquire_slot_impl(tenant_id, &TENANTS, expect_exist) + tenant_map_acquire_slot_impl(tenant_id, &TENANTS, mode) } fn tenant_map_acquire_slot_impl( tenant_id: &TenantId, tenants: &std::sync::RwLock, - expect_exist: Option, + mode: TenantSlotAcquireMode, ) -> Result { + use TenantSlotAcquireMode::*; + let mut locked = tenants.write().unwrap(); let span = tracing::info_span!("acquire_slot", %tenant_id); let _guard = span.enter(); @@ -1583,26 +1596,28 @@ fn tenant_map_acquire_slot_impl( use std::collections::hash_map::Entry; let entry = m.entry(*tenant_id); match entry { - Entry::Vacant(v) => { - if let Some(true) = expect_exist { - tracing::debug!("Vacant & expect_exist: return NotFound"); + Entry::Vacant(v) => match mode { + MustExist => { + tracing::debug!("Vacant && MustExist: return NotFound"); return Err(TenantSlotError::NotFound(*tenant_id)); } - - let (completion, barrier) = utils::completion::channel(); - v.insert(TenantSlot::InProgress(barrier)); - tracing::debug!("Vacant, inserted InProgress"); - Ok(SlotGuard::new(*tenant_id, None, completion)) - } + _ => { + let (completion, barrier) = utils::completion::channel(); + v.insert(TenantSlot::InProgress(barrier)); + tracing::debug!("Vacant, inserted InProgress"); + Ok(SlotGuard::new(*tenant_id, None, completion)) + } + }, Entry::Occupied(mut o) => { - match (o.get(), expect_exist) { + // Apply mode-driven checks + match (o.get(), mode) { (TenantSlot::InProgress(_), _) => { tracing::debug!("Occupied, failing for InProgress"); return Err(TenantSlotError::InProgress); } - (slot, Some(false)) => match slot { + (slot, MustNotExist) => match slot { TenantSlot::Attached(tenant) => { - tracing::debug!("Attached & !expected_exist, return AlreadyExists"); + tracing::debug!("Attached && MustNotExist, return AlreadyExists"); return Err(TenantSlotError::AlreadyExists( *tenant_id, tenant.current_state(), @@ -1611,7 +1626,7 @@ fn 
tenant_map_acquire_slot_impl( _ => { // FIXME: the AlreadyExists error assumes that we have a Tenant // to get the state from - tracing::debug!("Occupied & !expected_exist, return AlreadyExists"); + tracing::debug!("Occupied & MustNotExist, return AlreadyExists"); return Err(TenantSlotError::AlreadyExists( *tenant_id, TenantState::Broken { @@ -1646,7 +1661,8 @@ where { use utils::completion; - let mut tenant_guard = tenant_map_acquire_slot_impl(&tenant_id, tenants, Some(true))?; + let mut tenant_guard = + tenant_map_acquire_slot_impl(&tenant_id, tenants, TenantSlotAcquireMode::MustExist)?; let tenant_slot = tenant_guard.take_value(); // The SlotGuard allows us to manipulate the Tenant object without fear of some From ce82103a7c78fae464799180e96ebba2de629273 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 3 Nov 2023 10:35:37 +0000 Subject: [PATCH 11/22] Pull timeout_cancellable into utils --- libs/utils/src/lib.rs | 3 +++ libs/utils/src/timeout.rs | 37 ++++++++++++++++++++++++++++++++++++ pageserver/src/tenant/mgr.rs | 31 +++--------------------------- 3 files changed, 43 insertions(+), 28 deletions(-) create mode 100644 libs/utils/src/timeout.rs diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 88cefd516d75..3350bcc05b9b 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -73,6 +73,9 @@ pub mod completion; /// Reporting utilities pub mod error; +/// async timeout helper +pub mod timeout; + pub mod sync; /// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages diff --git a/libs/utils/src/timeout.rs b/libs/utils/src/timeout.rs new file mode 100644 index 000000000000..11fa417242da --- /dev/null +++ b/libs/utils/src/timeout.rs @@ -0,0 +1,37 @@ +use std::time::Duration; + +use tokio_util::sync::CancellationToken; + +pub enum TimeoutCancellableError { + Timeout, + Cancelled, +} + +/// Wrap [`tokio::time::timeout`] with a CancellationToken. +/// +/// This wrapper is appropriate for any long running operation in a task +/// that ought to respect a CancellationToken (which means most tasks). +/// +/// The only time you should use a bare tokio::timeout is when the future `F` +/// itself respects a CancellationToken: otherwise, always use this wrapper +/// with your CancellationToken to ensure that your task does not hold up +/// graceful shutdown. +pub async fn timeout_cancellable( + duration: Duration, + cancel: &CancellationToken, + future: F, +) -> Result +where + F: std::future::Future, +{ + tokio::select!( + r = tokio::time::timeout(duration, future) => { + r.map_err(|_| TimeoutCancellableError::Timeout) + + }, + _ = cancel.cancelled() => { + Err(TimeoutCancellableError::Cancelled) + + } + ) +} diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 9724a78502cd..e12d5a4c4c65 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -9,6 +9,7 @@ use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::fs; +use utils::timeout::{timeout_cancellable, TimeoutCancellableError}; use anyhow::Context; use once_cell::sync::Lazy; @@ -939,32 +940,6 @@ pub(crate) enum GetActiveTenantError { WillNotBecomeActive(TenantState), } -enum TimeoutCancellableError { - Timeout, - Cancelled, -} - -/// Wrap [`tokio::time::timeout`] with a CancellationToken. 
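// Illustrative sketch, not part of this patch: using the relocated helper from a task
// that holds a CancellationToken. The wrapper function and its 10-second budget are
// hypothetical; the helper gives up on whichever comes first, the timeout or
// pageserver shutdown.
use utils::timeout::{timeout_cancellable, TimeoutCancellableError};

async fn bounded_step<F>(
    cancel: &tokio_util::sync::CancellationToken,
    work: F,
) -> anyhow::Result<()>
where
    F: std::future::Future<Output = anyhow::Result<()>>,
{
    match timeout_cancellable(std::time::Duration::from_secs(10), cancel, work).await {
        Ok(res) => res,
        Err(TimeoutCancellableError::Timeout) => anyhow::bail!("step timed out"),
        Err(TimeoutCancellableError::Cancelled) => Ok(()), // shutting down: not an error
    }
}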
-async fn timeout_cancellable( - duration: Duration, - future: F, - cancel: &CancellationToken, -) -> Result -where - F: std::future::Future, -{ - tokio::select!( - r = tokio::time::timeout(duration, future) => { - r.map_err(|_| TimeoutCancellableError::Timeout) - - }, - _ = cancel.cancelled() => { - Err(TimeoutCancellableError::Cancelled) - - } - ) -} - /// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`] /// state, then wait for up to `timeout`. If the [`Tenant`] is not currently in [`TenantState::Active`], /// then wait for up to `timeout` (minus however long we waited for the slot). @@ -1014,8 +989,8 @@ pub(crate) async fn get_active_tenant_with_timeout( tracing::debug!("Waiting for tenant InProgress state to pass..."); timeout_cancellable( deadline.duration_since(Instant::now()), - barrier.wait(), cancel, + barrier.wait(), ) .await .map_err(|e| match e { @@ -1045,8 +1020,8 @@ pub(crate) async fn get_active_tenant_with_timeout( tracing::debug!("Waiting for tenant to enter active state..."); match timeout_cancellable( deadline.duration_since(Instant::now()), - tenant.wait_to_become_active(), cancel, + tenant.wait_to_become_active(), ) .await { From 2f405efb63783bbdcee6858e1822d62fad676fda Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 3 Nov 2023 12:13:10 +0000 Subject: [PATCH 12/22] pageserver: add `pageserver_tenant_manager_` metrics --- pageserver/src/metrics.rs | 29 +++++++ pageserver/src/tenant/delete.rs | 16 +++- pageserver/src/tenant/mgr.rs | 76 +++++++++++++------ .../regress/test_pageserver_restart.py | 5 ++ test_runner/regress/test_tenant_delete.py | 8 ++ 5 files changed, 105 insertions(+), 29 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 3e15be67dc66..429ab801d938 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -962,6 +962,32 @@ static REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER: Lazy = Lazy .expect("failed to define a metric") }); +pub(crate) struct TenantManagerMetrics { + pub(crate) tenant_slots: UIntGauge, + pub(crate) tenant_slot_writes: IntCounter, + pub(crate) unexpected_errors: IntCounter, +} + +pub(crate) static TENANT_MANAGER: Lazy = Lazy::new(|| { + TenantManagerMetrics { + tenant_slots: register_uint_gauge!( + "pageserver_tenant_manager_slots", + "How many slots currently exist, including all attached, secondary and in-progress operations", + ) + .expect("failed to define a metric"), + tenant_slot_writes: register_int_counter!( + "pageserver_tenant_manager_slot_writes", + "Writes to a tenant slot, including all of create/attach/detach/delete" + ) + .expect("failed to define a metric"), + unexpected_errors: register_int_counter!( + "pageserver_tenant_manager_unexpected_errors_total", + "Number of unexpected conditions encountered: nonzero value indicates a non-fatal bug." 
+ ) + .expect("failed to define a metric"), +} +}); + pub(crate) struct DeletionQueueMetrics { pub(crate) keys_submitted: IntCounter, pub(crate) keys_dropped: IntCounter, @@ -1884,6 +1910,9 @@ pub fn preinitialize_metrics() { // Deletion queue stats Lazy::force(&DELETION_QUEUE); + // Tenant manager stats + Lazy::force(&TENANT_MANAGER); + // countervecs [&BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT] .into_iter() diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index 7344dd1d92dd..066f239ff0b2 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -537,10 +537,18 @@ impl DeleteTenantFlow { .await .context("cleanup_remaining_fs_traces")?; - let mut locked = tenants.write().unwrap(); - if locked.remove(&tenant.tenant_id).is_none() { - warn!("Tenant got removed from tenants map during deletion"); - }; + { + let mut locked = tenants.write().unwrap(); + if locked.remove(&tenant.tenant_id).is_none() { + warn!("Tenant got removed from tenants map during deletion"); + }; + + // FIXME: we should not be modifying this from outside of mgr.rs. + // This will go away when we simplify deletion (https://github.com/neondatabase/neon/issues/5080) + crate::metrics::TENANT_MANAGER + .tenant_slots + .set(locked.len() as u64); + } *guard = Self::Finished; diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index e12d5a4c4c65..4b1fcccffdb8 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -26,6 +26,7 @@ use crate::control_plane_client::{ ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError, }; use crate::deletion_queue::DeletionQueueClient; +use crate::metrics::TENANT_MANAGER as METRICS; use crate::task_mgr::{self, TaskKind}; use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt}; use crate::tenant::delete::DeleteTenantFlow; @@ -111,6 +112,13 @@ impl TenantsMap { TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id), } } + + pub(crate) fn len(&self) -> usize { + match self { + TenantsMap::Initializing => 0, + TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.len(), + } + } } /// This is "safe" in that that it won't leave behind a partially deleted directory @@ -458,6 +466,7 @@ pub async fn init_tenant_mgr( let mut tenants_map = TENANTS.write().unwrap(); assert!(matches!(&*tenants_map, &TenantsMap::Initializing)); + METRICS.tenant_slots.set(tenants.len() as u64); *tenants_map = TenantsMap::Open(tenants); Ok(()) } @@ -1427,26 +1436,33 @@ impl SlotGuard { /// Emplace a new value in the slot. This consumes the guard, and after /// returning, the slot is no longer protected from concurrent changes. fn upsert(mut self, new_value: TenantSlot) -> Result<(), TenantSlotUpsertError> { - let mut locked = TENANTS.write().unwrap(); - - if let TenantSlot::InProgress(_) = new_value { - // It is never expected to try and upsert InProgress via this path: it should - // only be written via the tenant_map_acquire_slot path. If we hit this it's a bug. 
- return Err(TenantSlotUpsertError::InternalError( - "Attempt to upsert an InProgress state".into(), - )); - } + let replaced = { + let mut locked = TENANTS.write().unwrap(); - let m = match &mut *locked { - TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()), - TenantsMap::ShuttingDown(_) => { - return Err(TenantMapError::ShuttingDown.into()); + if let TenantSlot::InProgress(_) = new_value { + // It is never expected to try and upsert InProgress via this path: it should + // only be written via the tenant_map_acquire_slot path. If we hit this it's a bug. + return Err(TenantSlotUpsertError::InternalError( + "Attempt to upsert an InProgress state".into(), + )); } - TenantsMap::Open(m) => m, - }; - let replaced = m.insert(self.tenant_id, new_value); - self.upserted = true; + let m = match &mut *locked { + TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()), + TenantsMap::ShuttingDown(_) => { + return Err(TenantMapError::ShuttingDown.into()); + } + TenantsMap::Open(m) => m, + }; + + let replaced = m.insert(self.tenant_id, new_value); + self.upserted = true; + + tracing::info!("Upserting slot {}, size {}", self.tenant_id, m.len()); + METRICS.tenant_slots.set(m.len() as u64); + + replaced + }; // Sanity check: on an upsert we should always be replacing an InProgress marker match replaced { @@ -1457,6 +1473,7 @@ impl SlotGuard { Ok(()) } None => { + METRICS.unexpected_errors.inc(); error!( tenant_id = %self.tenant_id, "Missing InProgress marker during tenant upsert, this is a bug." @@ -1466,6 +1483,7 @@ impl SlotGuard { )) } Some(slot) => { + METRICS.unexpected_errors.inc(); error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot); Err(TenantSlotUpsertError::InternalError( "Unexpected contents of TenantSlot".into(), @@ -1478,16 +1496,21 @@ impl SlotGuard { impl Drop for SlotGuard { fn drop(&mut self) { if !self.upserted { - let mut locked = TENANTS.write().unwrap(); + let slot = { + let mut locked = TENANTS.write().unwrap(); - let m = match &mut *locked { - TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => { - return; - } - TenantsMap::Open(m) => m, - }; + let m = match &mut *locked { + TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => { + return; + } + TenantsMap::Open(m) => m, + }; - let slot = m.remove(&self.tenant_id); + let slot = m.remove(&self.tenant_id); + tracing::info!("Dropping slot {}, size {}", self.tenant_id, m.len()); + METRICS.tenant_slots.set(m.len() as u64); + slot + }; match slot { Some(slot) => match slot { TenantSlot::InProgress(_) => { @@ -1495,10 +1518,12 @@ impl Drop for SlotGuard { // that was set when we were constructed. } _ => { + METRICS.unexpected_errors.inc(); error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot); } }, None => { + METRICS.unexpected_errors.inc(); error!( tenant_id = %self.tenant_id, "Missing InProgress marker during SlotGuard drop, this is a bug." 
@@ -1557,6 +1582,7 @@ fn tenant_map_acquire_slot_impl( mode: TenantSlotAcquireMode, ) -> Result { use TenantSlotAcquireMode::*; + METRICS.tenant_slot_writes.inc(); let mut locked = tenants.write().unwrap(); let span = tracing::info_span!("acquire_slot", %tenant_id); diff --git a/test_runner/regress/test_pageserver_restart.py b/test_runner/regress/test_pageserver_restart.py index 4c51155236d1..fa1b13153770 100644 --- a/test_runner/regress/test_pageserver_restart.py +++ b/test_runner/regress/test_pageserver_restart.py @@ -20,6 +20,8 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool) endpoint = env.endpoints.create_start("main") pageserver_http = env.pageserver.http_client() + assert pageserver_http.get_metric_value("pageserver_tenant_manager_slots") == 1 + pg_conn = endpoint.connect() cur = pg_conn.cursor() @@ -52,6 +54,9 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool) env.pageserver.stop() env.pageserver.start() + # We reloaded our tenant + assert pageserver_http.get_metric_value("pageserver_tenant_manager_slots") == 1 + cur.execute("SELECT count(*) FROM foo") assert cur.fetchone() == (100000,) diff --git a/test_runner/regress/test_tenant_delete.py b/test_runner/regress/test_tenant_delete.py index ae771970884a..6e35890922fa 100644 --- a/test_runner/regress/test_tenant_delete.py +++ b/test_runner/regress/test_tenant_delete.py @@ -63,6 +63,9 @@ def test_tenant_delete_smoke( conf=MANY_SMALL_LAYERS_TENANT_CONFIG, ) + # Default tenant and the one we created + assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 2 + # create two timelines one being the parent of another parent = None for timeline in ["first", "second"]: @@ -88,7 +91,9 @@ def test_tenant_delete_smoke( iterations = poll_for_remote_storage_iterations(remote_storage_kind) + assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 2 tenant_delete_wait_completed(ps_http, tenant_id, iterations) + assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1 tenant_path = env.pageserver.tenant_dir(tenant_id) assert not tenant_path.exists() @@ -104,6 +109,9 @@ def test_tenant_delete_smoke( ), ) + # Deletion updates the tenant count: the one default tenant remains + assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1 + class Check(enum.Enum): RETRY_WITHOUT_RESTART = enum.auto() From cfaed2a04c4e3e09ff54052b3c1d0dc2a07d6dca Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 3 Nov 2023 12:13:30 +0000 Subject: [PATCH 13/22] tests: check unexpected error counters at end of tests --- test_runner/fixtures/neon_fixtures.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 81a7b0750d2c..77c523425954 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -626,6 +626,9 @@ def __exit__( sk.stop(immediate=True) for pageserver in self.env.pageservers: + if pageserver.running: + pageserver.assert_no_metric_errors() + pageserver.stop(immediate=True) if self.env.attachment_service is not None: @@ -1784,6 +1787,21 @@ def assert_no_errors(self): assert not errors + def assert_no_metric_errors(self): + """ + Certain metrics should _always_ be zero: they track conditions that indicate a bug. 
+ """ + if not self.running: + log.info(f"Skipping metrics check on pageserver {self.id}, it is not running") + return + + for metric in [ + "pageserver_tenant_manager_unexpected_errors_total", + "pageserver_deletion_queue_unexpected_errors_total", + ]: + value = self.http_client().get_metric_value(metric) + assert value == 0, f"Nonzero {metric} == {value}" + def log_contains(self, pattern: str) -> Optional[str]: """Check that the pageserver log contains a line that matches the given regex""" logfile = open(os.path.join(self.workdir, "pageserver.log"), "r") From bbfef5496eb350bc98a073f7eb582b1726fa69e3 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 3 Nov 2023 12:16:07 +0000 Subject: [PATCH 14/22] clippy --- pageserver/src/tenant/mgr.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 4b1fcccffdb8..71d5351892c2 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -1600,7 +1600,7 @@ fn tenant_map_acquire_slot_impl( Entry::Vacant(v) => match mode { MustExist => { tracing::debug!("Vacant && MustExist: return NotFound"); - return Err(TenantSlotError::NotFound(*tenant_id)); + Err(TenantSlotError::NotFound(*tenant_id)) } _ => { let (completion, barrier) = utils::completion::channel(); From 1b119d357254b23ebeb7ab1a1dcf1d94583e33ca Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 3 Nov 2023 14:25:37 +0000 Subject: [PATCH 15/22] refactor match in tenant_map_acquire_slot_impl --- pageserver/src/tenant/mgr.rs | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 71d5351892c2..7a0c6efbdd5f 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -1614,36 +1614,37 @@ fn tenant_map_acquire_slot_impl( match (o.get(), mode) { (TenantSlot::InProgress(_), _) => { tracing::debug!("Occupied, failing for InProgress"); - return Err(TenantSlotError::InProgress); + Err(TenantSlotError::InProgress) } (slot, MustNotExist) => match slot { TenantSlot::Attached(tenant) => { tracing::debug!("Attached && MustNotExist, return AlreadyExists"); - return Err(TenantSlotError::AlreadyExists( + Err(TenantSlotError::AlreadyExists( *tenant_id, tenant.current_state(), - )); + )) } _ => { // FIXME: the AlreadyExists error assumes that we have a Tenant // to get the state from tracing::debug!("Occupied & MustNotExist, return AlreadyExists"); - return Err(TenantSlotError::AlreadyExists( + Err(TenantSlotError::AlreadyExists( *tenant_id, TenantState::Broken { reason: "Present but not attached".to_string(), backtrace: "".to_string(), }, - )); + )) } }, - _ => {} - }; - - let (completion, barrier) = utils::completion::channel(); - let old_value = o.insert(TenantSlot::InProgress(barrier)); - tracing::debug!("Occupied, replaced with InProgress"); - Ok(SlotGuard::new(*tenant_id, Some(old_value), completion)) + _ => { + // Happy case: the slot was not in any state that violated our mode + let (completion, barrier) = utils::completion::channel(); + let old_value = o.insert(TenantSlot::InProgress(barrier)); + tracing::debug!("Occupied, replaced with InProgress"); + Ok(SlotGuard::new(*tenant_id, Some(old_value), completion)) + } + } } } } From 768da53e9bcd30ac8d7ab82d504e031b51ebd831 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 3 Nov 2023 14:33:51 +0000 Subject: [PATCH 16/22] Fix CLI tests for assert_no_log_errors --- test_runner/fixtures/neon_fixtures.py | 3 +-- 
test_runner/regress/test_neon_cli.py | 7 +++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 77c523425954..d226226617b9 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -626,8 +626,7 @@ def __exit__( sk.stop(immediate=True) for pageserver in self.env.pageservers: - if pageserver.running: - pageserver.assert_no_metric_errors() + pageserver.assert_no_metric_errors() pageserver.stop(immediate=True) diff --git a/test_runner/regress/test_neon_cli.py b/test_runner/regress/test_neon_cli.py index 1b3984583a36..de18ea0e6b58 100644 --- a/test_runner/regress/test_neon_cli.py +++ b/test_runner/regress/test_neon_cli.py @@ -134,6 +134,9 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder): env.neon_cli.pageserver_stop(env.pageserver.id) env.neon_cli.safekeeper_stop() + # Keep NeonEnv state up to date, it usually owns starting/stopping services + env.pageserver.running = False + # Default start res = env.neon_cli.raw_cli(["start"]) res.check_returncode() @@ -155,6 +158,10 @@ def test_cli_start_stop_multi(neon_env_builder: NeonEnvBuilder): env.neon_cli.pageserver_stop(env.BASE_PAGESERVER_ID) env.neon_cli.pageserver_stop(env.BASE_PAGESERVER_ID + 1) + # Keep NeonEnv state up to date, it usually owns starting/stopping services + env.pageservers[0].running = False + env.pageservers[1].running = False + # Addressing a nonexistent ID throws with pytest.raises(RuntimeError): env.neon_cli.pageserver_stop(env.BASE_PAGESERVER_ID + 100) From 130eadb18590425df2aa6033a03fb1471d4980a4 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 3 Nov 2023 14:41:01 +0000 Subject: [PATCH 17/22] Make logic more explicit in SlotGuard::drop --- pageserver/src/tenant/mgr.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 7a0c6efbdd5f..33ad333a43d5 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -1500,7 +1500,14 @@ impl Drop for SlotGuard { let mut locked = TENANTS.write().unwrap(); let m = match &mut *locked { - TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => { + TenantsMap::Initializing => { + // There is no map, this should never happen. + return; + } + TenantsMap::ShuttingDown(_) => { + // When we transition to shutdown, InProgress elements are removed + // from the map, so we do not need to clean up our Inprogress marker. + // See [`shutdown_all_tenants0`] return; } TenantsMap::Open(m) => m, From a7a87b643a711600211024dfb96e84edc799e29c Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 3 Nov 2023 15:30:32 +0000 Subject: [PATCH 18/22] Make SlotGuard stricter: refuse to clear a slot if old value isn't shut down --- pageserver/src/tenant/mgr.rs | 175 ++++++++++++++++++++++++++--------- 1 file changed, 132 insertions(+), 43 deletions(-) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 33ad333a43d5..7189c775aec5 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -792,7 +792,7 @@ pub(crate) async fn upsert_location( // not do significant I/O, and shutdowns should be prompt via cancellation tokens. 
let mut tenant_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::Any)?; - if let Some(TenantSlot::Attached(tenant)) = tenant_guard.take_value() { + if let Some(TenantSlot::Attached(tenant)) = tenant_guard.get_old_value() { // The case where we keep a Tenant alive was covered above in the special case // for Attached->Attached transitions in the same generation. By this point, // if we see an attached tenant we know it will be discarded and should be @@ -818,6 +818,7 @@ pub(crate) async fn upsert_location( barrier.wait().await; } } + tenant_guard.drop_old_value().expect("We just shut it down"); } let tenant_path = conf.tenant_path(&tenant_id); @@ -1069,10 +1070,8 @@ pub(crate) async fn delete_tenant( let mut slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustExist)?; - // unwrap is safe because we used expect_exist=true when acquiring the slot - let slot = slot_guard.take_value().unwrap(); - - let tenant = match &slot { + // unwrap is safe because we used MustExist mode when acquiring + let tenant = match slot_guard.get_old_value().as_ref().unwrap() { TenantSlot::Attached(tenant) => tenant.clone(), _ => { // Express "not attached" as equivalent to "not found" @@ -1082,8 +1081,8 @@ let result = DeleteTenantFlow::run(conf, remote_storage, &TENANTS, tenant).await; - // Replace our InProgress marker with the Tenant in attached state, after the prepare phase of deletion is done - slot_guard.upsert(slot)?; + // The Tenant goes back into the map in Stopping state; it will eventually be removed by DeleteTenantFlow + slot_guard.revert(); result } @@ -1383,6 +1382,12 @@ pub enum TenantSlotUpsertError { MapState(#[from] TenantMapError), } +#[derive(Debug)] +enum TenantSlotDropError { + /// It is only legal to drop a TenantSlot if its contents are fully shut down + NotShutdown, +} + /// Errors that can happen any time we are walking the tenant map to try and acquire /// the TenantSlot for a particular tenant. #[derive(Debug, thiserror::Error)] pub enum TenantMapError { @@ -1400,6 +1405,21 @@ /// structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`] /// for this tenant, which acts as a marker for any operations targeting /// this tenant to retry later, or wait for the InProgress state to end. +/// +/// This structure enforces the important invariant that we do not have overlapping +/// tasks that will try to use local storage for the same tenant ID: we enforce that +/// the previous contents of a slot have been shut down before the slot can be +/// left empty or used for something else. +/// +/// Holders of a SlotGuard should explicitly dispose of it, using either `upsert` +/// to provide a new value, or `revert` to put the slot back into its initial +/// state. If the SlotGuard is dropped without calling either of these, then +/// we will leave the slot empty if our `old_value` is already shut down, else +/// we will replace the slot with `old_value` (equivalent to doing a revert). +/// +/// The `old_value` may be dropped before the SlotGuard is dropped, by calling +/// `drop_old_value`. It is an error to call this without shutting down +/// the contents of `old_value`. pub struct SlotGuard { tenant_id: TenantId, old_value: Option<TenantSlot>, @@ -1429,13 +1449,21 @@ impl SlotGuard { /// Take any value that was present in the slot before we acquired ownership /// of it: in state transitions, this will be the old state.
- fn take_value(&mut self) -> Option { - self.old_value.take() + fn get_old_value(&mut self) -> &Option { + &self.old_value } /// Emplace a new value in the slot. This consumes the guard, and after /// returning, the slot is no longer protected from concurrent changes. fn upsert(mut self, new_value: TenantSlot) -> Result<(), TenantSlotUpsertError> { + if !self.old_value_is_shutdown() { + // This is a bug: callers should never try to drop an old value without + // shutting it down + return Err(TenantSlotUpsertError::InternalError( + "Old TenantSlot value not shut down".into(), + )); + } + let replaced = { let mut locked = TENANTS.write().unwrap(); @@ -1458,7 +1486,6 @@ impl SlotGuard { let replaced = m.insert(self.tenant_id, new_value); self.upserted = true; - tracing::info!("Upserting slot {}, size {}", self.tenant_id, m.len()); METRICS.tenant_slots.set(m.len() as u64); replaced @@ -1491,45 +1518,97 @@ impl SlotGuard { } } } + + /// Replace the InProgress slot with whatever was in the guard when we started + fn revert(mut self) { + if let Some(value) = self.old_value.take() { + match self.upsert(value) { + Err(TenantSlotUpsertError::InternalError(_)) => { + // We already logged the error, nothing else we can do. + } + Err(TenantSlotUpsertError::MapState(_)) => { + // If the map is shutting down, we need not replace anything + } + Ok(()) => {} + } + } + } + + /// We may never drop our old value until it is cleanly shut down: otherwise we might leave + /// rogue background tasks that would write to the local tenant directory that this guard + /// is responsible for protecting + fn old_value_is_shutdown(&self) -> bool { + match self.old_value.as_ref() { + Some(TenantSlot::Attached(tenant)) => { + // TODO: PR #5711 will add a gate that enables properly checking that + // shutdown completed. + matches!( + tenant.current_state(), + TenantState::Stopping { .. } | TenantState::Broken { .. } + ) + } + Some(TenantSlot::Secondary) => { + // TODO: when adding secondary mode tenants, this will check for shutdown + // in the same way that we do for `Tenant` above + true + } + Some(TenantSlot::InProgress(_)) => { + // A SlotGuard cannot be constructed for a slot that was already InProgress + unreachable!() + } + None => true, + } + } + + /// The guard holder is done with the old value of the slot: they are obliged to already + /// shut it down before we reach this point. + fn drop_old_value(&mut self) -> Result<(), TenantSlotDropError> { + if !self.old_value_is_shutdown() { + Err(TenantSlotDropError::NotShutdown) + } else { + self.old_value.take(); + Ok(()) + } + } } impl Drop for SlotGuard { fn drop(&mut self) { if !self.upserted { - let slot = { - let mut locked = TENANTS.write().unwrap(); + // Our old value is already shutdown, or it never existed: it is safe + // for us to fully release the TenantSlot back into an empty state - let m = match &mut *locked { - TenantsMap::Initializing => { - // There is no map, this should never happen. - return; - } - TenantsMap::ShuttingDown(_) => { - // When we transition to shutdown, InProgress elements are removed - // from the map, so we do not need to clean up our Inprogress marker. 
- // See [`shutdown_all_tenants0`] - return; - } - TenantsMap::Open(m) => m, - }; + let mut locked = TENANTS.write().unwrap(); - let slot = m.remove(&self.tenant_id); - tracing::info!("Dropping slot {}, size {}", self.tenant_id, m.len()); - METRICS.tenant_slots.set(m.len() as u64); - slot + let m = match &mut *locked { + TenantsMap::Initializing => { + // There is no map, this should never happen. + return; + } + TenantsMap::ShuttingDown(_) => { + // When we transition to shutdown, InProgress elements are removed + // from the map, so we do not need to clean up our Inprogress marker. + // See [`shutdown_all_tenants0`] + return; + } + TenantsMap::Open(m) => m, }; - match slot { - Some(slot) => match slot { - TenantSlot::InProgress(_) => { - // Normal case: nothing should have replaced the TenantSlot value - // that was set when we were constructed. - } - _ => { + + use std::collections::hash_map::Entry; + match m.entry(self.tenant_id) { + Entry::Occupied(mut entry) => { + if !matches!(entry.get(), TenantSlot::InProgress(_)) { METRICS.unexpected_errors.inc(); - error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot); + error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get()); } - }, - None => { + + if self.old_value_is_shutdown() { + entry.remove(); + } else { + entry.insert(self.old_value.take().unwrap()); + } + } + Entry::Vacant(_) => { METRICS.unexpected_errors.inc(); error!( tenant_id = %self.tenant_id, @@ -1537,6 +1616,8 @@ impl Drop for SlotGuard { ); } } + + METRICS.tenant_slots.set(m.len() as u64); } } } @@ -1672,11 +1753,10 @@ where let mut tenant_guard = tenant_map_acquire_slot_impl(&tenant_id, tenants, TenantSlotAcquireMode::MustExist)?; - let tenant_slot = tenant_guard.take_value(); // The SlotGuard allows us to manipulate the Tenant object without fear of some // concurrent API request doing something else for the same tenant ID. - let attached_tenant = match tenant_slot { + let attached_tenant = match tenant_guard.get_old_value() { Some(TenantSlot::Attached(t)) => Some(t), _ => None, }; @@ -1698,6 +1778,7 @@ where Err(_other) => { // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to // wait for it but return an error right away because these are distinct requests. + tenant_guard.revert(); return Err(TenantStateError::IsStopping(tenant_id)); } } @@ -1711,13 +1792,21 @@ where .await .with_context(|| format!("Failed to run cleanup for tenant {tenant_id}")) { - Ok(hook_value) => Ok(hook_value), + Ok(hook_value) => { + // Success: drop the old TenantSlot::Attached. 
+ tenant_guard + .drop_old_value() + .expect("We just called shutdown"); + + Ok(hook_value) + } Err(e) => { // If we had a Tenant, set it to Broken and put it back in the TenantsMap if let Some(attached_tenant) = attached_tenant { attached_tenant.set_broken(e.to_string()).await; - tenant_guard.upsert(TenantSlot::Attached(attached_tenant))?; } + // Leave the broken tenant in the map + tenant_guard.revert(); Err(TenantStateError::Other(e)) } From 0cb75d546b3e37ba454d8fd15d0716d8e61cc80c Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 3 Nov 2023 16:06:09 +0000 Subject: [PATCH 19/22] s/tenant_guard/slot_guard/ --- pageserver/src/tenant/mgr.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 7189c775aec5..f95b21adb8f7 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -685,7 +685,7 @@ pub(crate) async fn create_tenant( ) -> Result, TenantMapInsertError> { let location_conf = LocationConf::attached_single(tenant_conf, generation); - let tenant_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?; + let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?; let tenant_path = super::create_tenant_files(conf, &location_conf, &tenant_id).await?; let created_tenant = tenant_spawn( @@ -709,7 +709,7 @@ pub(crate) async fn create_tenant( ))); } - tenant_guard.upsert(TenantSlot::Attached(created_tenant.clone()))?; + slot_guard.upsert(TenantSlot::Attached(created_tenant.clone()))?; Ok(created_tenant) } @@ -790,9 +790,9 @@ pub(crate) async fn upsert_location( // the tenant is inaccessible to the outside world while we are doing this, but that is sensible: // the state is ill-defined while we're in transition. Transitions are async, but fast: we do // not do significant I/O, and shutdowns should be prompt via cancellation tokens. - let mut tenant_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::Any)?; + let mut slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::Any)?; - if let Some(TenantSlot::Attached(tenant)) = tenant_guard.get_old_value() { + if let Some(TenantSlot::Attached(tenant)) = slot_guard.get_old_value() { // The case where we keep a Tenant alive was covered above in the special case // for Attached->Attached transitions in the same generation. 
By this point, // if we see an attached tenant we know it will be discarded and should be @@ -818,7 +818,7 @@ pub(crate) async fn upsert_location( barrier.wait().await; } } - tenant_guard.drop_old_value().expect("We just shut it down"); + slot_guard.drop_old_value().expect("We just shut it down"); } let tenant_path = conf.tenant_path(&tenant_id); @@ -872,7 +872,7 @@ pub(crate) async fn upsert_location( } }; - tenant_guard.upsert(new_slot)?; + slot_guard.upsert(new_slot)?; Ok(()) } @@ -1201,7 +1201,7 @@ pub(crate) async fn load_tenant( deletion_queue_client: DeletionQueueClient, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { - let tenant_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?; + let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?; let tenant_path = conf.tenant_path(&tenant_id); let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id); @@ -1238,7 +1238,7 @@ pub(crate) async fn load_tenant( ) .with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))?; - tenant_guard.upsert(TenantSlot::Attached(new_tenant))?; + slot_guard.upsert(TenantSlot::Attached(new_tenant))?; Ok(()) } @@ -1305,7 +1305,7 @@ pub(crate) async fn attach_tenant( resources: TenantSharedResources, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { - let tenant_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?; + let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?; let location_conf = LocationConf::attached_single(tenant_conf, generation); let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id).await?; // TODO: tenant directory remains on disk if we bail out from here on. @@ -1332,7 +1332,7 @@ pub(crate) async fn attach_tenant( ))); } - tenant_guard.upsert(TenantSlot::Attached(attached_tenant))?; + slot_guard.upsert(TenantSlot::Attached(attached_tenant))?; Ok(()) } @@ -1751,12 +1751,12 @@ where { use utils::completion; - let mut tenant_guard = + let mut slot_guard = tenant_map_acquire_slot_impl(&tenant_id, tenants, TenantSlotAcquireMode::MustExist)?; // The SlotGuard allows us to manipulate the Tenant object without fear of some // concurrent API request doing something else for the same tenant ID. - let attached_tenant = match tenant_guard.get_old_value() { + let attached_tenant = match slot_guard.get_old_value() { Some(TenantSlot::Attached(t)) => Some(t), _ => None, }; @@ -1778,7 +1778,7 @@ where Err(_other) => { // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to // wait for it but return an error right away because these are distinct requests. - tenant_guard.revert(); + slot_guard.revert(); return Err(TenantStateError::IsStopping(tenant_id)); } } @@ -1794,7 +1794,7 @@ where { Ok(hook_value) => { // Success: drop the old TenantSlot::Attached. 
- tenant_guard + slot_guard .drop_old_value() .expect("We just called shutdown"); @@ -1806,7 +1806,7 @@ where attached_tenant.set_broken(e.to_string()).await; } // Leave the broken tenant in the map - tenant_guard.revert(); + slot_guard.revert(); Err(TenantStateError::Other(e)) } From cefc99f05c05ac50ae19ea19e8c5a847c4b27675 Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 6 Nov 2023 12:44:08 +0000 Subject: [PATCH 20/22] Remove a stray file --- pageserver/src/deletion_queue/check.log | 2 -- 1 file changed, 2 deletions(-) delete mode 100644 pageserver/src/deletion_queue/check.log diff --git a/pageserver/src/deletion_queue/check.log b/pageserver/src/deletion_queue/check.log deleted file mode 100644 index c75c3092b2bd..000000000000 --- a/pageserver/src/deletion_queue/check.log +++ /dev/null @@ -1,2 +0,0 @@ - Checking pageserver v0.1.0 (/home/neon/neon/pageserver) - Finished dev [optimized + debuginfo] target(s) in 7.62s From a758ed61799fe08c23ad17a0ee897e47ff628fd7 Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 6 Nov 2023 12:54:45 +0000 Subject: [PATCH 21/22] Refactor a match --- pageserver/src/http/routes.rs | 3 +-- pageserver/src/page_service.rs | 36 +++++++++++++++++++++++----------- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 72540f688b98..820b8eae4670 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -161,8 +161,7 @@ impl From for ApiError { NotFound(tenant_id) => { ApiError::NotFound(anyhow::anyhow!("NotFound: tenant {tenant_id}").into()) } - e @ AlreadyExists(_, _) => ApiError::Conflict(format!("{e}")), - e @ Conflict(_) => ApiError::Conflict(format!("{e}")), + e @ (AlreadyExists(_, _) | Conflict(_)) => ApiError::Conflict(format!("{e}")), InProgress => { ApiError::ResourceUnavailable("Tenant is being modified concurrently".into()) } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 4355479335c9..6086d0b063cc 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -381,9 +381,12 @@ impl PageServerHandler { debug_assert_current_span_has_tenant_and_timeline_id(); // Make request tracer if needed - let tenant = - mgr::get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT, &self.cancel) - .await?; + let tenant = mgr::get_active_tenant_with_timeout( + tenant_id, + ACTIVE_TENANT_TIMEOUT, + &task_mgr::shutdown_token(), + ) + .await?; let mut tracer = if tenant.get_trace_read_requests() { let connection_id = ConnectionId::generate(); let path = tenant @@ -534,8 +537,12 @@ impl PageServerHandler { // Create empty timeline info!("creating new timeline"); - let tenant = - get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT, &self.cancel).await?; + let tenant = get_active_tenant_with_timeout( + tenant_id, + ACTIVE_TENANT_TIMEOUT, + &task_mgr::shutdown_token(), + ) + .await?; let timeline = tenant .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx) .await?; @@ -911,9 +918,13 @@ impl PageServerHandler { tenant_id: TenantId, timeline_id: TimelineId, ) -> Result, GetActiveTimelineError> { - let tenant = get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT, &self.cancel) - .await - .map_err(GetActiveTimelineError::Tenant)?; + let tenant = get_active_tenant_with_timeout( + tenant_id, + ACTIVE_TENANT_TIMEOUT, + &task_mgr::shutdown_token(), + ) + .await + .map_err(GetActiveTimelineError::Tenant)?; let timeline = tenant .get_timeline(timeline_id, true) .map_err(|e| 
GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?; @@ -1262,9 +1273,12 @@ where self.check_permission(Some(tenant_id))?; - let tenant = - get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT, &self.cancel) - .await?; + let tenant = get_active_tenant_with_timeout( + tenant_id, + ACTIVE_TENANT_TIMEOUT, + &task_mgr::shutdown_token(), + ) + .await?; pgb.write_message_noflush(&BeMessage::RowDescription(&[ RowDescriptor::int8_col(b"checkpoint_distance"), RowDescriptor::int8_col(b"checkpoint_timeout"), From b09891e524072db395869a7293aad007483fe65a Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 6 Nov 2023 12:55:30 +0000 Subject: [PATCH 22/22] De-indent function in a conditional --- pageserver/src/tenant/mgr.rs | 77 ++++++++++++++++++------------------ 1 file changed, 39 insertions(+), 38 deletions(-) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index f95b21adb8f7..576bcea2cee2 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -1574,51 +1574,52 @@ impl SlotGuard { impl Drop for SlotGuard { fn drop(&mut self) { - if !self.upserted { - // Our old value is already shutdown, or it never existed: it is safe - // for us to fully release the TenantSlot back into an empty state + if self.upserted { + return; + } + // Our old value is already shutdown, or it never existed: it is safe + // for us to fully release the TenantSlot back into an empty state - let mut locked = TENANTS.write().unwrap(); + let mut locked = TENANTS.write().unwrap(); - let m = match &mut *locked { - TenantsMap::Initializing => { - // There is no map, this should never happen. - return; - } - TenantsMap::ShuttingDown(_) => { - // When we transition to shutdown, InProgress elements are removed - // from the map, so we do not need to clean up our Inprogress marker. - // See [`shutdown_all_tenants0`] - return; - } - TenantsMap::Open(m) => m, - }; - - use std::collections::hash_map::Entry; - match m.entry(self.tenant_id) { - Entry::Occupied(mut entry) => { - if !matches!(entry.get(), TenantSlot::InProgress(_)) { - METRICS.unexpected_errors.inc(); - error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get()); - } + let m = match &mut *locked { + TenantsMap::Initializing => { + // There is no map, this should never happen. + return; + } + TenantsMap::ShuttingDown(_) => { + // When we transition to shutdown, InProgress elements are removed + // from the map, so we do not need to clean up our Inprogress marker. + // See [`shutdown_all_tenants0`] + return; + } + TenantsMap::Open(m) => m, + }; - if self.old_value_is_shutdown() { - entry.remove(); - } else { - entry.insert(self.old_value.take().unwrap()); - } - } - Entry::Vacant(_) => { + use std::collections::hash_map::Entry; + match m.entry(self.tenant_id) { + Entry::Occupied(mut entry) => { + if !matches!(entry.get(), TenantSlot::InProgress(_)) { METRICS.unexpected_errors.inc(); - error!( - tenant_id = %self.tenant_id, - "Missing InProgress marker during SlotGuard drop, this is a bug." - ); + error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during drop, this is a bug. 
Contents: {:?}", entry.get()); } - } - METRICS.tenant_slots.set(m.len() as u64); + if self.old_value_is_shutdown() { + entry.remove(); + } else { + entry.insert(self.old_value.take().unwrap()); + } + } + Entry::Vacant(_) => { + METRICS.unexpected_errors.inc(); + error!( + tenant_id = %self.tenant_id, + "Missing InProgress marker during SlotGuard drop, this is a bug." + ); + } } + + METRICS.tenant_slots.set(m.len() as u64); } }