From 85cd97af61ffc92f0e4c7ad7ee4992af0e98dce2 Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 6 Nov 2023 14:03:22 +0000 Subject: [PATCH] pageserver: add `InProgress` tenant map state, use a sync lock for the map (#5367) ## Problem Follows on from #5299 - We didn't have a generic way to protect a tenant undergoing changes: `Tenant` had states, but for our arbitrary transitions between secondary/attached, we need a general way to say "reserve this tenant ID, and don't allow any other ops on it, but don't try and report it as being in any particular state". - The TenantsMap structure was behind an async RwLock, but it was never correct to hold it across await points: that would block any other changes for all tenants. ## Summary of changes - Add the `TenantSlot::InProgress` value. This means: - Incoming administrative operations on the tenant should retry later - Anything trying to read the live state of the tenant (e.g. a page service reader) should retry later or block. - Store TenantsMap in `std::sync::RwLock` - Provide an extended `get_active_tenant_with_timeout` for page_service to use, which will wait on InProgress slots as well as non-active tenants. Closes: https://github.com/neondatabase/neon/issues/5378 --------- Co-authored-by: Christian Schwarz --- libs/utils/src/lib.rs | 3 + libs/utils/src/timeout.rs | 37 + pageserver/src/consumption_metrics.rs | 2 +- pageserver/src/consumption_metrics/metrics.rs | 1 - pageserver/src/disk_usage_eviction_task.rs | 2 +- pageserver/src/http/routes.rs | 73 +- pageserver/src/metrics.rs | 29 + pageserver/src/page_service.rs | 134 +- pageserver/src/tenant.rs | 63 +- pageserver/src/tenant/delete.rs | 52 +- pageserver/src/tenant/mgr.rs | 1215 ++++++++++++----- .../src/tenant/timeline/eviction_task.rs | 15 +- test_runner/fixtures/neon_fixtures.py | 17 + test_runner/regress/test_neon_cli.py | 7 + .../regress/test_pageserver_restart.py | 5 + test_runner/regress/test_tenant_delete.py | 8 + test_runner/regress/test_tenant_detach.py | 76 +- 17 files changed, 1146 insertions(+), 593 deletions(-) create mode 100644 libs/utils/src/timeout.rs diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 1e3403402318..ff5d774285db 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -77,6 +77,9 @@ pub mod completion; /// Reporting utilities pub mod error; +/// async timeout helper +pub mod timeout; + pub mod sync; /// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages diff --git a/libs/utils/src/timeout.rs b/libs/utils/src/timeout.rs new file mode 100644 index 000000000000..11fa417242da --- /dev/null +++ b/libs/utils/src/timeout.rs @@ -0,0 +1,37 @@ +use std::time::Duration; + +use tokio_util::sync::CancellationToken; + +pub enum TimeoutCancellableError { + Timeout, + Cancelled, +} + +/// Wrap [`tokio::time::timeout`] with a CancellationToken. +/// +/// This wrapper is appropriate for any long running operation in a task +/// that ought to respect a CancellationToken (which means most tasks). +/// +/// The only time you should use a bare tokio::timeout is when the future `F` +/// itself respects a CancellationToken: otherwise, always use this wrapper +/// with your CancellationToken to ensure that your task does not hold up +/// graceful shutdown. +pub async fn timeout_cancellable( + duration: Duration, + cancel: &CancellationToken, + future: F, +) -> Result +where + F: std::future::Future, +{ + tokio::select!( + r = tokio::time::timeout(duration, future) => { + r.map_err(|_| TimeoutCancellableError::Timeout) + + }, + _ = cancel.cancelled() => { + Err(TimeoutCancellableError::Cancelled) + + } + ) +} diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index 061045eb76b8..9e8377c1f1b0 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -266,7 +266,7 @@ async fn calculate_synthetic_size_worker( continue; } - if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await { + if let Ok(tenant) = mgr::get_tenant(tenant_id, true) { // TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks? // We can put in some prioritization for consumption metrics. // Same for the loop that fetches computed metrics. diff --git a/pageserver/src/consumption_metrics/metrics.rs b/pageserver/src/consumption_metrics/metrics.rs index c22f218976fe..2989e15e8eaa 100644 --- a/pageserver/src/consumption_metrics/metrics.rs +++ b/pageserver/src/consumption_metrics/metrics.rs @@ -202,7 +202,6 @@ pub(super) async fn collect_all_metrics( None } else { crate::tenant::mgr::get_tenant(id, true) - .await .ok() .map(|tenant| (id, tenant)) } diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index bc4bf518623a..642cafad285b 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -545,7 +545,7 @@ async fn collect_eviction_candidates( if cancel.is_cancelled() { return Ok(EvictionCandidates::Cancelled); } - let tenant = match tenant::mgr::get_tenant(*tenant_id, true).await { + let tenant = match tenant::mgr::get_tenant(*tenant_id, true) { Ok(tenant) => tenant, Err(e) => { // this can happen if tenant has lifecycle transition after we fetched it diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index db728f243ebf..820b8eae4670 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -35,7 +35,8 @@ use crate::pgdatadir_mapping::LsnForTimestamp; use crate::task_mgr::TaskKind; use crate::tenant::config::{LocationConf, TenantConfOpt}; use crate::tenant::mgr::{ - GetTenantError, SetNewTenantConfigError, TenantMapInsertError, TenantStateError, + GetTenantError, SetNewTenantConfigError, TenantMapError, TenantMapInsertError, TenantSlotError, + TenantSlotUpsertError, TenantStateError, }; use crate::tenant::size::ModelInputs; use crate::tenant::storage_layer::LayerAccessStatsReset; @@ -146,16 +147,46 @@ impl From for ApiError { impl From for ApiError { fn from(tmie: TenantMapInsertError) -> ApiError { match tmie { - TenantMapInsertError::StillInitializing | TenantMapInsertError::ShuttingDown => { - ApiError::ResourceUnavailable(format!("{tmie}").into()) + TenantMapInsertError::SlotError(e) => e.into(), + TenantMapInsertError::SlotUpsertError(e) => e.into(), + TenantMapInsertError::Other(e) => ApiError::InternalServerError(e), + } + } +} + +impl From for ApiError { + fn from(e: TenantSlotError) -> ApiError { + use TenantSlotError::*; + match e { + NotFound(tenant_id) => { + ApiError::NotFound(anyhow::anyhow!("NotFound: tenant {tenant_id}").into()) } - TenantMapInsertError::TenantAlreadyExists(id, state) => { - ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}")) + e @ (AlreadyExists(_, _) | Conflict(_)) => ApiError::Conflict(format!("{e}")), + InProgress => { + ApiError::ResourceUnavailable("Tenant is being modified concurrently".into()) } - TenantMapInsertError::TenantExistsSecondary(id) => { - ApiError::Conflict(format!("tenant {id} already exists as secondary")) + MapState(e) => e.into(), + } + } +} + +impl From for ApiError { + fn from(e: TenantSlotUpsertError) -> ApiError { + use TenantSlotUpsertError::*; + match e { + InternalError(e) => ApiError::InternalServerError(anyhow::anyhow!("{e}")), + MapState(e) => e.into(), + } + } +} + +impl From for ApiError { + fn from(e: TenantMapError) -> ApiError { + use TenantMapError::*; + match e { + StillInitializing | ShuttingDown => { + ApiError::ResourceUnavailable(format!("{e}").into()) } - TenantMapInsertError::Other(e) => ApiError::InternalServerError(e), } } } @@ -163,11 +194,12 @@ impl From for ApiError { impl From for ApiError { fn from(tse: TenantStateError) -> ApiError { match tse { - TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()), TenantStateError::IsStopping(_) => { ApiError::ResourceUnavailable("Tenant is stopping".into()) } - _ => ApiError::InternalServerError(anyhow::Error::new(tse)), + TenantStateError::SlotError(e) => e.into(), + TenantStateError::SlotUpsertError(e) => e.into(), + TenantStateError::Other(e) => ApiError::InternalServerError(anyhow!(e)), } } } @@ -188,6 +220,7 @@ impl From for ApiError { // (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls). ApiError::ResourceUnavailable("Tenant not yet active".into()) } + GetTenantError::MapState(e) => ApiError::ResourceUnavailable(format!("{e}").into()), } } } @@ -242,6 +275,9 @@ impl From for ApiError { Get(g) => ApiError::from(g), e @ AlreadyInProgress => ApiError::Conflict(e.to_string()), Timeline(t) => ApiError::from(t), + NotAttached => ApiError::NotFound(anyhow::anyhow!("Tenant is not attached").into()), + SlotError(e) => e.into(), + SlotUpsertError(e) => e.into(), Other(o) => ApiError::InternalServerError(o), e @ InvalidState(_) => ApiError::PreconditionFailed(e.to_string().into_boxed_str()), } @@ -368,7 +404,7 @@ async fn timeline_create_handler( let state = get_state(&request); async { - let tenant = mgr::get_tenant(tenant_id, true).await?; + let tenant = mgr::get_tenant(tenant_id, true)?; match tenant.create_timeline( new_timeline_id, request_data.ancestor_timeline_id.map(TimelineId::from), @@ -418,7 +454,7 @@ async fn timeline_list_handler( let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let response_data = async { - let tenant = mgr::get_tenant(tenant_id, true).await?; + let tenant = mgr::get_tenant(tenant_id, true)?; let timelines = tenant.list_timelines(); let mut response_data = Vec::with_capacity(timelines.len()); @@ -457,7 +493,7 @@ async fn timeline_detail_handler( let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline_info = async { - let tenant = mgr::get_tenant(tenant_id, true).await?; + let tenant = mgr::get_tenant(tenant_id, true)?; let timeline = tenant .get_timeline(timeline_id, false) @@ -713,7 +749,7 @@ async fn tenant_status( check_permission(&request, Some(tenant_id))?; let tenant_info = async { - let tenant = mgr::get_tenant(tenant_id, false).await?; + let tenant = mgr::get_tenant(tenant_id, false)?; // Calculate total physical size of all timelines let mut current_physical_size = 0; @@ -776,7 +812,7 @@ async fn tenant_size_handler( let headers = request.headers(); let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); - let tenant = mgr::get_tenant(tenant_id, true).await?; + let tenant = mgr::get_tenant(tenant_id, true)?; // this can be long operation let inputs = tenant @@ -1033,7 +1069,7 @@ async fn get_tenant_config_handler( let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; check_permission(&request, Some(tenant_id))?; - let tenant = mgr::get_tenant(tenant_id, false).await?; + let tenant = mgr::get_tenant(tenant_id, false)?; let response = HashMap::from([ ( @@ -1092,7 +1128,7 @@ async fn put_tenant_location_config_handler( .await { match e { - TenantStateError::NotFound(_) => { + TenantStateError::SlotError(TenantSlotError::NotFound(_)) => { // This API is idempotent: a NotFound on a detach is fine. } _ => return Err(e.into()), @@ -1130,7 +1166,6 @@ async fn handle_tenant_break( let tenant_id: TenantId = parse_request_param(&r, "tenant_id")?; let tenant = crate::tenant::mgr::get_tenant(tenant_id, true) - .await .map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?; tenant.set_broken("broken from test".to_owned()).await; @@ -1435,7 +1470,7 @@ async fn active_timeline_of_active_tenant( tenant_id: TenantId, timeline_id: TimelineId, ) -> Result, ApiError> { - let tenant = mgr::get_tenant(tenant_id, true).await?; + let tenant = mgr::get_tenant(tenant_id, true)?; tenant .get_timeline(timeline_id, true) .map_err(|e| ApiError::NotFound(e.into())) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 3e15be67dc66..429ab801d938 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -962,6 +962,32 @@ static REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER: Lazy = Lazy .expect("failed to define a metric") }); +pub(crate) struct TenantManagerMetrics { + pub(crate) tenant_slots: UIntGauge, + pub(crate) tenant_slot_writes: IntCounter, + pub(crate) unexpected_errors: IntCounter, +} + +pub(crate) static TENANT_MANAGER: Lazy = Lazy::new(|| { + TenantManagerMetrics { + tenant_slots: register_uint_gauge!( + "pageserver_tenant_manager_slots", + "How many slots currently exist, including all attached, secondary and in-progress operations", + ) + .expect("failed to define a metric"), + tenant_slot_writes: register_int_counter!( + "pageserver_tenant_manager_slot_writes", + "Writes to a tenant slot, including all of create/attach/detach/delete" + ) + .expect("failed to define a metric"), + unexpected_errors: register_int_counter!( + "pageserver_tenant_manager_unexpected_errors_total", + "Number of unexpected conditions encountered: nonzero value indicates a non-fatal bug." + ) + .expect("failed to define a metric"), +} +}); + pub(crate) struct DeletionQueueMetrics { pub(crate) keys_submitted: IntCounter, pub(crate) keys_dropped: IntCounter, @@ -1884,6 +1910,9 @@ pub fn preinitialize_metrics() { // Deletion queue stats Lazy::force(&DELETION_QUEUE); + // Tenant manager stats + Lazy::force(&TENANT_MANAGER); + // countervecs [&BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT] .into_iter() diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 334aee3dd1c1..6086d0b063cc 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -14,7 +14,6 @@ use async_compression::tokio::write::GzipEncoder; use bytes::Buf; use bytes::Bytes; use futures::Stream; -use pageserver_api::models::TenantState; use pageserver_api::models::{ PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse, @@ -55,16 +54,20 @@ use crate::metrics; use crate::metrics::LIVE_CONNECTIONS_COUNT; use crate::task_mgr; use crate::task_mgr::TaskKind; -use crate::tenant; use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::mgr; -use crate::tenant::mgr::GetTenantError; -use crate::tenant::{Tenant, Timeline}; +use crate::tenant::mgr::get_active_tenant_with_timeout; +use crate::tenant::mgr::GetActiveTenantError; +use crate::tenant::Timeline; use crate::trace::Tracer; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; use postgres_ffi::BLCKSZ; +// How long we may block waiting for a [`TenantSlot::InProgress`]` and/or a [`Tenant`] which +// is not yet in state [`TenantState::Active`]. +const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000); + /// Read the end of a tar archive. /// /// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each. @@ -378,7 +381,12 @@ impl PageServerHandler { debug_assert_current_span_has_tenant_and_timeline_id(); // Make request tracer if needed - let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?; + let tenant = mgr::get_active_tenant_with_timeout( + tenant_id, + ACTIVE_TENANT_TIMEOUT, + &task_mgr::shutdown_token(), + ) + .await?; let mut tracer = if tenant.get_trace_read_requests() { let connection_id = ConnectionId::generate(); let path = tenant @@ -529,7 +537,12 @@ impl PageServerHandler { // Create empty timeline info!("creating new timeline"); - let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?; + let tenant = get_active_tenant_with_timeout( + tenant_id, + ACTIVE_TENANT_TIMEOUT, + &task_mgr::shutdown_token(), + ) + .await?; let timeline = tenant .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx) .await?; @@ -587,7 +600,9 @@ impl PageServerHandler { { debug_assert_current_span_has_tenant_and_timeline_id(); - let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?; + let timeline = self + .get_active_tenant_timeline(tenant_id, timeline_id) + .await?; let last_record_lsn = timeline.get_last_record_lsn(); if last_record_lsn != start_lsn { return Err(QueryError::Other( @@ -795,7 +810,9 @@ impl PageServerHandler { let started = std::time::Instant::now(); // check that the timeline exists - let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?; + let timeline = self + .get_active_tenant_timeline(tenant_id, timeline_id) + .await?; let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); if let Some(lsn) = lsn { // Backup was requested at a particular LSN. Wait for it to arrive. @@ -894,6 +911,25 @@ impl PageServerHandler { .expect("claims presence already checked"); check_permission(claims, tenant_id) } + + /// Shorthand for getting a reference to a Timeline of an Active tenant. + async fn get_active_tenant_timeline( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> Result, GetActiveTimelineError> { + let tenant = get_active_tenant_with_timeout( + tenant_id, + ACTIVE_TENANT_TIMEOUT, + &task_mgr::shutdown_token(), + ) + .await + .map_err(GetActiveTimelineError::Tenant)?; + let timeline = tenant + .get_timeline(timeline_id, true) + .map_err(|e| GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?; + Ok(timeline) + } } #[async_trait::async_trait] @@ -1051,7 +1087,9 @@ where .record("timeline_id", field::display(timeline_id)); self.check_permission(Some(tenant_id))?; - let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?; + let timeline = self + .get_active_tenant_timeline(tenant_id, timeline_id) + .await?; let end_of_timeline = timeline.get_last_record_rlsn(); @@ -1235,7 +1273,12 @@ where self.check_permission(Some(tenant_id))?; - let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?; + let tenant = get_active_tenant_with_timeout( + tenant_id, + ACTIVE_TENANT_TIMEOUT, + &task_mgr::shutdown_token(), + ) + .await?; pgb.write_message_noflush(&BeMessage::RowDescription(&[ RowDescriptor::int8_col(b"checkpoint_distance"), RowDescriptor::int8_col(b"checkpoint_timeout"), @@ -1281,67 +1324,13 @@ where } } -#[derive(thiserror::Error, Debug)] -enum GetActiveTenantError { - #[error( - "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}" - )] - WaitForActiveTimeout { - latest_state: TenantState, - wait_time: Duration, - }, - #[error(transparent)] - NotFound(GetTenantError), - #[error(transparent)] - WaitTenantActive(tenant::WaitToBecomeActiveError), -} - impl From for QueryError { fn from(e: GetActiveTenantError) -> Self { match e { GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected( ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())), ), - GetActiveTenantError::WaitTenantActive(e) => QueryError::Other(anyhow::Error::new(e)), - GetActiveTenantError::NotFound(e) => QueryError::Other(anyhow::Error::new(e)), - } - } -} - -/// Get active tenant. -/// -/// If the tenant is Loading, waits for it to become Active, for up to 30 s. That -/// ensures that queries don't fail immediately after pageserver startup, because -/// all tenants are still loading. -async fn get_active_tenant_with_timeout( - tenant_id: TenantId, - _ctx: &RequestContext, /* require get a context to support cancellation in the future */ -) -> Result, GetActiveTenantError> { - let tenant = match mgr::get_tenant(tenant_id, false).await { - Ok(tenant) => tenant, - Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)), - Err(GetTenantError::NotActive(_)) => { - unreachable!("we're calling get_tenant with active_only=false") - } - Err(GetTenantError::Broken(_)) => { - unreachable!("we're calling get_tenant with active_only=false") - } - }; - let wait_time = Duration::from_secs(30); - match tokio::time::timeout(wait_time, tenant.wait_to_become_active()).await { - Ok(Ok(())) => Ok(tenant), - // no .context(), the error message is good enough and some tests depend on it - Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)), - Err(_) => { - let latest_state = tenant.current_state(); - if latest_state == TenantState::Active { - Ok(tenant) - } else { - Err(GetActiveTenantError::WaitForActiveTimeout { - latest_state, - wait_time, - }) - } + e => QueryError::Other(anyhow::anyhow!(e)), } } } @@ -1362,18 +1351,3 @@ impl From for QueryError { } } } - -/// Shorthand for getting a reference to a Timeline of an Active tenant. -async fn get_active_tenant_timeline( - tenant_id: TenantId, - timeline_id: TimelineId, - ctx: &RequestContext, -) -> Result, GetActiveTimelineError> { - let tenant = get_active_tenant_with_timeout(tenant_id, ctx) - .await - .map_err(GetActiveTimelineError::Tenant)?; - let timeline = tenant - .get_timeline(timeline_id, true) - .map_err(|e| GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?; - Ok(timeline) -} diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index ad538a82fddb..a738633d5ea3 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -55,6 +55,8 @@ use self::config::TenantConf; use self::delete::DeleteTenantFlow; use self::metadata::LoadMetadataError; use self::metadata::TimelineMetadata; +use self::mgr::GetActiveTenantError; +use self::mgr::GetTenantError; use self::mgr::TenantsMap; use self::remote_timeline_client::RemoteTimelineClient; use self::timeline::uninit::TimelineUninitMark; @@ -263,6 +265,12 @@ pub struct Tenant { pub(crate) gate: Gate, } +impl std::fmt::Debug for Tenant { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} ({})", self.tenant_id, self.current_state()) + } +} + pub(crate) enum WalRedoManager { Prod(PostgresRedoManager), #[cfg(test)] @@ -368,34 +376,6 @@ impl Debug for SetStoppingError { } } -#[derive(Debug, thiserror::Error)] -pub(crate) enum WaitToBecomeActiveError { - WillNotBecomeActive { - tenant_id: TenantId, - state: TenantState, - }, - TenantDropped { - tenant_id: TenantId, - }, -} - -impl std::fmt::Display for WaitToBecomeActiveError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - WaitToBecomeActiveError::WillNotBecomeActive { tenant_id, state } => { - write!( - f, - "Tenant {} will not become active. Current state: {:?}", - tenant_id, state - ) - } - WaitToBecomeActiveError::TenantDropped { tenant_id } => { - write!(f, "Tenant {tenant_id} will not become active (dropped)") - } - } - } -} - #[derive(thiserror::Error, Debug)] pub enum CreateTimelineError { #[error("a timeline with the given ID already exists")] @@ -537,7 +517,7 @@ impl Tenant { resources: TenantSharedResources, attached_conf: AttachedTenantConf, init_order: Option, - tenants: &'static tokio::sync::RwLock, + tenants: &'static std::sync::RwLock, mode: SpawnMode, ctx: &RequestContext, ) -> anyhow::Result> { @@ -1850,6 +1830,7 @@ impl Tenant { } Err(SetStoppingError::AlreadyStopping(other)) => { // give caller the option to wait for this this shutdown + info!("Tenant::shutdown: AlreadyStopping"); return Err(other); } }; @@ -2048,7 +2029,7 @@ impl Tenant { self.state.subscribe() } - pub(crate) async fn wait_to_become_active(&self) -> Result<(), WaitToBecomeActiveError> { + pub(crate) async fn wait_to_become_active(&self) -> Result<(), GetActiveTenantError> { let mut receiver = self.state.subscribe(); loop { let current_state = receiver.borrow_and_update().clone(); @@ -2056,11 +2037,9 @@ impl Tenant { TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => { // in these states, there's a chance that we can reach ::Active receiver.changed().await.map_err( - |_e: tokio::sync::watch::error::RecvError| { - WaitToBecomeActiveError::TenantDropped { - tenant_id: self.tenant_id, - } - }, + |_e: tokio::sync::watch::error::RecvError| + // Tenant existed but was dropped: report it as non-existent + GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_id)) )?; } TenantState::Active { .. } => { @@ -2068,10 +2047,7 @@ impl Tenant { } TenantState::Broken { .. } | TenantState::Stopping { .. } => { // There's no chance the tenant can transition back into ::Active - return Err(WaitToBecomeActiveError::WillNotBecomeActive { - tenant_id: self.tenant_id, - state: current_state, - }); + return Err(GetActiveTenantError::WillNotBecomeActive(current_state)); } } } @@ -2137,6 +2113,9 @@ where } impl Tenant { + pub fn get_tenant_id(&self) -> TenantId { + self.tenant_id + } pub fn tenant_specific_overrides(&self) -> TenantConfOpt { self.tenant_conf.read().unwrap().tenant_conf } @@ -4266,11 +4245,7 @@ mod tests { metadata_bytes[8] ^= 1; std::fs::write(metadata_path, metadata_bytes)?; - let err = harness - .try_load_local(&ctx) - .await - .err() - .expect("should fail"); + let err = harness.try_load_local(&ctx).await.expect_err("should fail"); // get all the stack with all .context, not only the last one let message = format!("{err:#}"); let expected = "failed to load metadata"; diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index 87b48e4beebf..066f239ff0b2 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -21,7 +21,7 @@ use crate::{ }; use super::{ - mgr::{GetTenantError, TenantsMap}, + mgr::{GetTenantError, TenantSlotError, TenantSlotUpsertError, TenantsMap}, remote_timeline_client::{FAILED_REMOTE_OP_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD}, span, timeline::delete::DeleteTimelineFlow, @@ -33,12 +33,21 @@ pub(crate) enum DeleteTenantError { #[error("GetTenant {0}")] Get(#[from] GetTenantError), + #[error("Tenant not attached")] + NotAttached, + #[error("Invalid state {0}. Expected Active or Broken")] InvalidState(TenantState), #[error("Tenant deletion is already in progress")] AlreadyInProgress, + #[error("Tenant map slot error {0}")] + SlotError(#[from] TenantSlotError), + + #[error("Tenant map slot upsert error {0}")] + SlotUpsertError(#[from] TenantSlotUpsertError), + #[error("Timeline {0}")] Timeline(#[from] DeleteTimelineError), @@ -273,12 +282,12 @@ impl DeleteTenantFlow { pub(crate) async fn run( conf: &'static PageServerConf, remote_storage: Option, - tenants: &'static tokio::sync::RwLock, - tenant_id: TenantId, + tenants: &'static std::sync::RwLock, + tenant: Arc, ) -> Result<(), DeleteTenantError> { span::debug_assert_current_span_has_tenant_id(); - let (tenant, mut guard) = Self::prepare(tenants, tenant_id).await?; + let mut guard = Self::prepare(&tenant).await?; if let Err(e) = Self::run_inner(&mut guard, conf, remote_storage.as_ref(), &tenant).await { tenant.set_broken(format!("{e:#}")).await; @@ -378,7 +387,7 @@ impl DeleteTenantFlow { guard: DeletionGuard, tenant: &Arc, preload: Option, - tenants: &'static tokio::sync::RwLock, + tenants: &'static std::sync::RwLock, init_order: Option, ctx: &RequestContext, ) -> Result<(), DeleteTenantError> { @@ -405,15 +414,8 @@ impl DeleteTenantFlow { } async fn prepare( - tenants: &tokio::sync::RwLock, - tenant_id: TenantId, - ) -> Result<(Arc, tokio::sync::OwnedMutexGuard), DeleteTenantError> { - let m = tenants.read().await; - - let tenant = m - .get(&tenant_id) - .ok_or(GetTenantError::NotFound(tenant_id))?; - + tenant: &Arc, + ) -> Result, DeleteTenantError> { // FIXME: unsure about active only. Our init jobs may not be cancellable properly, // so at least for now allow deletions only for active tenants. TODO recheck // Broken and Stopping is needed for retries. @@ -447,14 +449,14 @@ impl DeleteTenantFlow { ))); } - Ok((Arc::clone(tenant), guard)) + Ok(guard) } fn schedule_background( guard: OwnedMutexGuard, conf: &'static PageServerConf, remote_storage: Option, - tenants: &'static tokio::sync::RwLock, + tenants: &'static std::sync::RwLock, tenant: Arc, ) { let tenant_id = tenant.tenant_id; @@ -487,7 +489,7 @@ impl DeleteTenantFlow { mut guard: OwnedMutexGuard, conf: &PageServerConf, remote_storage: Option, - tenants: &'static tokio::sync::RwLock, + tenants: &'static std::sync::RwLock, tenant: &Arc, ) -> Result<(), DeleteTenantError> { // Tree sort timelines, schedule delete for them. Mention retries from the console side. @@ -535,10 +537,18 @@ impl DeleteTenantFlow { .await .context("cleanup_remaining_fs_traces")?; - let mut locked = tenants.write().await; - if locked.remove(&tenant.tenant_id).is_none() { - warn!("Tenant got removed from tenants map during deletion"); - }; + { + let mut locked = tenants.write().unwrap(); + if locked.remove(&tenant.tenant_id).is_none() { + warn!("Tenant got removed from tenants map during deletion"); + }; + + // FIXME: we should not be modifying this from outside of mgr.rs. + // This will go away when we simplify deletion (https://github.com/neondatabase/neon/issues/5080) + crate::metrics::TENANT_MANAGER + .tenant_slots + .set(locked.len() as u64); + } *guard = Self::Finished; diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 33fdc76f8d65..576bcea2cee2 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -3,13 +3,16 @@ use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf}; use rand::{distributions::Alphanumeric, Rng}; -use std::collections::{hash_map, HashMap}; +use std::borrow::Cow; +use std::collections::HashMap; +use std::ops::Deref; use std::sync::Arc; +use std::time::{Duration, Instant}; use tokio::fs; +use utils::timeout::{timeout_cancellable, TimeoutCancellableError}; use anyhow::Context; use once_cell::sync::Lazy; -use tokio::sync::RwLock; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::*; @@ -23,6 +26,7 @@ use crate::control_plane_client::{ ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError, }; use crate::deletion_queue::DeletionQueueClient; +use crate::metrics::TENANT_MANAGER as METRICS; use crate::task_mgr::{self, TaskKind}; use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt}; use crate::tenant::delete::DeleteTenantFlow; @@ -47,26 +51,31 @@ use super::TenantSharedResources; /// that way we avoid having to carefully switch a tenant's ingestion etc on and off during /// its lifetime, and we can preserve some important safety invariants like `Tenant` always /// having a properly acquired generation (Secondary doesn't need a generation) -#[derive(Clone)] pub(crate) enum TenantSlot { Attached(Arc), Secondary, + /// In this state, other administrative operations acting on the TenantId should + /// block, or return a retry indicator equivalent to HTTP 503. + InProgress(utils::completion::Barrier), } -impl TenantSlot { - /// Return the `Tenant` in this slot if attached, else None - fn get_attached(&self) -> Option<&Arc> { +impl std::fmt::Debug for TenantSlot { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Attached(t) => Some(t), - Self::Secondary => None, + Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()), + Self::Secondary => write!(f, "Secondary"), + Self::InProgress(_) => write!(f, "InProgress"), } } +} - /// Consume self and return the `Tenant` that was in this slot if attached, else None - fn into_attached(self) -> Option> { +impl TenantSlot { + /// Return the `Tenant` in this slot if attached, else None + fn get_attached(&self) -> Option<&Arc> { match self { Self::Attached(t) => Some(t), Self::Secondary => None, + Self::InProgress(_) => None, } } } @@ -77,7 +86,7 @@ pub(crate) enum TenantsMap { /// [`init_tenant_mgr`] is not done yet. Initializing, /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded. - /// New tenants can be added using [`tenant_map_insert`]. + /// New tenants can be added using [`tenant_map_acquire_slot`]. Open(HashMap), /// The pageserver has entered shutdown mode via [`shutdown_all_tenants`]. /// Existing tenants are still accessible, but no new tenants can be created. @@ -97,19 +106,17 @@ impl TenantsMap { } } - /// Get the contents of the map at this tenant ID, even if it is in secondary state. - pub(crate) fn get_slot(&self, tenant_id: &TenantId) -> Option<&TenantSlot> { + pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option { match self { TenantsMap::Initializing => None, - TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id), + TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id), } } - pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option> { + + pub(crate) fn len(&self) -> usize { match self { - TenantsMap::Initializing => None, - TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => { - m.remove(tenant_id).and_then(TenantSlot::into_attached) - } + TenantsMap::Initializing => 0, + TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.len(), } } } @@ -147,7 +154,8 @@ async fn safe_rename_tenant_dir(path: impl AsRef) -> std::io::Result> = Lazy::new(|| RwLock::new(TenantsMap::Initializing)); +static TENANTS: Lazy> = + Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing)); /// Create a directory, including parents. This does no fsyncs and makes /// no guarantees about the persistence of the resulting metadata: for @@ -456,8 +464,9 @@ pub async fn init_tenant_mgr( info!("Processed {} local tenants at startup", tenants.len()); - let mut tenants_map = TENANTS.write().await; + let mut tenants_map = TENANTS.write().unwrap(); assert!(matches!(&*tenants_map, &TenantsMap::Initializing)); + METRICS.tenant_slots.set(tenants.len() as u64); *tenants_map = TenantsMap::Open(tenants); Ok(()) } @@ -472,7 +481,7 @@ pub(crate) fn tenant_spawn( resources: TenantSharedResources, location_conf: AttachedTenantConf, init_order: Option, - tenants: &'static tokio::sync::RwLock, + tenants: &'static std::sync::RwLock, mode: SpawnMode, ctx: &RequestContext, ) -> anyhow::Result> { @@ -533,12 +542,12 @@ pub(crate) async fn shutdown_all_tenants() { shutdown_all_tenants0(&TENANTS).await } -async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock) { +async fn shutdown_all_tenants0(tenants: &std::sync::RwLock) { use utils::completion; - // Prevent new tenants from being created. - let tenants_to_shut_down = { - let mut m = tenants.write().await; + // Atomically, 1. extract the list of tenants to shut down and 2. prevent creation of new tenants. + let (in_progress_ops, tenants_to_shut_down) = { + let mut m = tenants.write().unwrap(); match &mut *m { TenantsMap::Initializing => { *m = TenantsMap::ShuttingDown(HashMap::default()); @@ -546,9 +555,28 @@ async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock) { return; } TenantsMap::Open(tenants) => { - let tenants_clone = tenants.clone(); - *m = TenantsMap::ShuttingDown(std::mem::take(tenants)); - tenants_clone + let mut shutdown_state = HashMap::new(); + let mut in_progress_ops = Vec::new(); + let mut tenants_to_shut_down = Vec::new(); + + for (k, v) in tenants.drain() { + match v { + TenantSlot::Attached(t) => { + tenants_to_shut_down.push(t.clone()); + shutdown_state.insert(k, TenantSlot::Attached(t)); + } + TenantSlot::Secondary => { + shutdown_state.insert(k, TenantSlot::Secondary); + } + TenantSlot::InProgress(notify) => { + // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will + // wait for their notifications to fire in this function. + in_progress_ops.push(notify); + } + } + } + *m = TenantsMap::ShuttingDown(shutdown_state); + (in_progress_ops, tenants_to_shut_down) } TenantsMap::ShuttingDown(_) => { // TODO: it is possible that detach and shutdown happen at the same time. as a @@ -559,25 +587,31 @@ async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock) { } }; + info!( + "Waiting for {} InProgress tenants and {} Attached tenants to shut down", + in_progress_ops.len(), + tenants_to_shut_down.len() + ); + + for barrier in in_progress_ops { + barrier.wait().await; + } + + info!( + "InProgress tenants shut down, waiting for {} Attached tenants to shut down", + tenants_to_shut_down.len() + ); let started_at = std::time::Instant::now(); let mut join_set = JoinSet::new(); - for (tenant_id, tenant) in tenants_to_shut_down { + for tenant in tenants_to_shut_down { + let tenant_id = tenant.get_tenant_id(); join_set.spawn( async move { let freeze_and_flush = true; let res = { let (_guard, shutdown_progress) = completion::channel(); - match tenant { - TenantSlot::Attached(t) => { - t.shutdown(shutdown_progress, freeze_and_flush).await - } - TenantSlot::Secondary => { - // TODO: once secondary mode downloads are implemented, - // ensure they have all stopped before we reach this point. - Ok(()) - } - } + tenant.shutdown(shutdown_progress, freeze_and_flush).await }; if let Err(other_progress) = res { @@ -649,42 +683,35 @@ pub(crate) async fn create_tenant( resources: TenantSharedResources, ctx: &RequestContext, ) -> Result, TenantMapInsertError> { - tenant_map_insert(tenant_id, || async { - let location_conf = LocationConf::attached_single(tenant_conf, generation); + let location_conf = LocationConf::attached_single(tenant_conf, generation); - // We're holding the tenants lock in write mode while doing local IO. - // If this section ever becomes contentious, introduce a new `TenantState::Creating` - // and do the work in that state. - super::create_tenant_files(conf, &location_conf, &tenant_id).await?; + let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?; + let tenant_path = super::create_tenant_files(conf, &location_conf, &tenant_id).await?; - // TODO: tenant directory remains on disk if we bail out from here on. - // See https://github.com/neondatabase/neon/issues/4233 + let created_tenant = tenant_spawn( + conf, + tenant_id, + &tenant_path, + resources, + AttachedTenantConf::try_from(location_conf)?, + None, + &TENANTS, + SpawnMode::Create, + ctx, + )?; + // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. + // See https://github.com/neondatabase/neon/issues/4233 + + let created_tenant_id = created_tenant.tenant_id(); + if tenant_id != created_tenant_id { + return Err(TenantMapInsertError::Other(anyhow::anyhow!( + "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {created_tenant_id})", + ))); + } - let tenant_path = conf.tenant_path(&tenant_id); + slot_guard.upsert(TenantSlot::Attached(created_tenant.clone()))?; - let created_tenant = tenant_spawn( - conf, - tenant_id, - &tenant_path, - resources, - AttachedTenantConf::try_from(location_conf)?, - None, - &TENANTS, - SpawnMode::Create, - ctx, - )?; - // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. - // See https://github.com/neondatabase/neon/issues/4233 - - let crated_tenant_id = created_tenant.tenant_id(); - anyhow::ensure!( - tenant_id == crated_tenant_id, - "loaded created tenant has unexpected tenant id \ - (expect {tenant_id} != actual {crated_tenant_id})", - ); - Ok(created_tenant) - }) - .await + Ok(created_tenant) } #[derive(Debug, thiserror::Error)] @@ -701,7 +728,7 @@ pub(crate) async fn set_new_tenant_config( tenant_id: TenantId, ) -> Result<(), SetNewTenantConfigError> { info!("configuring tenant {tenant_id}"); - let tenant = get_tenant(tenant_id, true).await?; + let tenant = get_tenant(tenant_id, true)?; // This is a legacy API that only operates on attached tenants: the preferred // API to use is the location_config/ endpoint, which lets the caller provide @@ -727,32 +754,49 @@ pub(crate) async fn upsert_location( ) -> Result<(), anyhow::Error> { info!("configuring tenant location {tenant_id} to state {new_location_config:?}"); - let mut existing_tenant = match get_tenant(tenant_id, false).await { - Ok(t) => Some(t), - Err(GetTenantError::NotFound(_)) => None, - Err(e) => anyhow::bail!(e), - }; - - // If we need to shut down a Tenant, do that first - let shutdown_tenant = match (&new_location_config.mode, &existing_tenant) { - (LocationMode::Secondary(_), Some(t)) => Some(t), - (LocationMode::Attached(attach_conf), Some(t)) => { - if attach_conf.generation != t.generation { - Some(t) - } else { - None + // Special case fast-path for updates to Tenant: if our upsert is only updating configuration, + // then we do not need to set the slot to InProgress, we can just call into the + // existng tenant. + { + let locked = TENANTS.read().unwrap(); + let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Write)?; + match (&new_location_config.mode, peek_slot) { + (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => { + if attach_conf.generation == tenant.generation { + // A transition from Attached to Attached in the same generation, we may + // take our fast path and just provide the updated configuration + // to the tenant. + tenant.set_new_location_config(AttachedTenantConf::try_from( + new_location_config, + )?); + + // Persist the new config in the background, to avoid holding up any + // locks while we do so. + // TODO + + return Ok(()); + } else { + // Different generations, fall through to general case + } + } + _ => { + // Not an Attached->Attached transition, fall through to general case } } - _ => None, - }; - - // TODO: currently we risk concurrent operations interfering with the tenant - // while we await shutdown, but we also should not hold the TenantsMap lock - // across the whole operation. Before we start using this function in production, - // a follow-on change will revise how concurrency is handled in TenantsMap. - // (https://github.com/neondatabase/neon/issues/5378) + } - if let Some(tenant) = shutdown_tenant { + // General case for upserts to TenantsMap, excluding the case above: we will substitute an + // InProgress value to the slot while we make whatever changes are required. The state for + // the tenant is inaccessible to the outside world while we are doing this, but that is sensible: + // the state is ill-defined while we're in transition. Transitions are async, but fast: we do + // not do significant I/O, and shutdowns should be prompt via cancellation tokens. + let mut slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::Any)?; + + if let Some(TenantSlot::Attached(tenant)) = slot_guard.get_old_value() { + // The case where we keep a Tenant alive was covered above in the special case + // for Attached->Attached transitions in the same generation. By this point, + // if we see an attached tenant we know it will be discarded and should be + // shut down. let (_guard, progress) = utils::completion::channel(); match tenant.get_attach_mode() { @@ -774,82 +818,62 @@ pub(crate) async fn upsert_location( barrier.wait().await; } } - existing_tenant = None; + slot_guard.drop_old_value().expect("We just shut it down"); } - if let Some(tenant) = existing_tenant { - // Update the existing tenant - Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config) - .await - .map_err(SetNewTenantConfigError::Persist)?; - tenant.set_new_location_config(AttachedTenantConf::try_from(new_location_config)?); - } else { - // Upsert a fresh TenantSlot into TenantsMap. Do it within the map write lock, - // and re-check that the state of anything we are replacing is as expected. - tenant_map_upsert_slot(tenant_id, |old_value| async move { - if let Some(TenantSlot::Attached(t)) = old_value { - if !matches!(t.current_state(), TenantState::Stopping { .. }) { - anyhow::bail!("Tenant state changed during location configuration update"); - } - } + let tenant_path = conf.tenant_path(&tenant_id); + let new_slot = match &new_location_config.mode { + LocationMode::Secondary(_) => { let tenant_path = conf.tenant_path(&tenant_id); + // Directory doesn't need to be fsync'd because if we crash it can + // safely be recreated next time this tenant location is configured. + unsafe_create_dir_all(&tenant_path) + .await + .with_context(|| format!("Creating {tenant_path}"))?; - let new_slot = match &new_location_config.mode { - LocationMode::Secondary(_) => { - // Directory doesn't need to be fsync'd because if we crash it can - // safely be recreated next time this tenant location is configured. - unsafe_create_dir_all(&tenant_path) - .await - .with_context(|| format!("Creating {tenant_path}"))?; + Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config) + .await + .map_err(SetNewTenantConfigError::Persist)?; - Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config) - .await - .map_err(SetNewTenantConfigError::Persist)?; + TenantSlot::Secondary + } + LocationMode::Attached(_attach_config) => { + let timelines_path = conf.timelines_path(&tenant_id); - TenantSlot::Secondary - } - LocationMode::Attached(_attach_config) => { - // FIXME: should avoid doing this disk I/O inside the TenantsMap lock, - // we have the same problem in load_tenant/attach_tenant. Probably - // need a lock in TenantSlot to fix this. - let timelines_path = conf.timelines_path(&tenant_id); - - // Directory doesn't need to be fsync'd because we do not depend on - // it to exist after crashes: it may be recreated when tenant is - // re-attached, see https://github.com/neondatabase/neon/issues/5550 - unsafe_create_dir_all(&timelines_path) - .await - .with_context(|| format!("Creating {timelines_path}"))?; - - Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config) - .await - .map_err(SetNewTenantConfigError::Persist)?; - - let tenant = tenant_spawn( - conf, - tenant_id, - &tenant_path, - TenantSharedResources { - broker_client, - remote_storage, - deletion_queue_client, - }, - AttachedTenantConf::try_from(new_location_config)?, - None, - &TENANTS, - SpawnMode::Normal, - ctx, - )?; - - TenantSlot::Attached(tenant) - } - }; + // Directory doesn't need to be fsync'd because we do not depend on + // it to exist after crashes: it may be recreated when tenant is + // re-attached, see https://github.com/neondatabase/neon/issues/5550 + unsafe_create_dir_all(&timelines_path) + .await + .with_context(|| format!("Creating {timelines_path}"))?; + + Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config) + .await + .map_err(SetNewTenantConfigError::Persist)?; + + let tenant = tenant_spawn( + conf, + tenant_id, + &tenant_path, + TenantSharedResources { + broker_client, + remote_storage, + deletion_queue_client, + }, + AttachedTenantConf::try_from(new_location_config)?, + None, + &TENANTS, + SpawnMode::Normal, + ctx, + )?; + + TenantSlot::Attached(tenant) + } + }; + + slot_guard.upsert(new_slot)?; - Ok(new_slot) - }) - .await?; - } Ok(()) } @@ -864,34 +888,167 @@ pub(crate) enum GetTenantError { /// is a stuck error state #[error("Tenant is broken: {0}")] Broken(String), + + // Initializing or shutting down: cannot authoritatively say whether we have this tenant + #[error("Tenant map is not available: {0}")] + MapState(#[from] TenantMapError), } /// Gets the tenant from the in-memory data, erroring if it's absent or is not fitting to the query. /// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants. /// /// This method is cancel-safe. -pub(crate) async fn get_tenant( +pub(crate) fn get_tenant( tenant_id: TenantId, active_only: bool, ) -> Result, GetTenantError> { - let m = TENANTS.read().await; - let tenant = m - .get(&tenant_id) - .ok_or(GetTenantError::NotFound(tenant_id))?; - - match tenant.current_state() { - TenantState::Broken { - reason, - backtrace: _, - } if active_only => Err(GetTenantError::Broken(reason)), - TenantState::Active => Ok(Arc::clone(tenant)), - _ => { - if active_only { - Err(GetTenantError::NotActive(tenant_id)) + let locked = TENANTS.read().unwrap(); + let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read)?; + + match peek_slot { + Some(TenantSlot::Attached(tenant)) => match tenant.current_state() { + TenantState::Broken { + reason, + backtrace: _, + } if active_only => Err(GetTenantError::Broken(reason)), + TenantState::Active => Ok(Arc::clone(tenant)), + _ => { + if active_only { + Err(GetTenantError::NotActive(tenant_id)) + } else { + Ok(Arc::clone(tenant)) + } + } + }, + Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_id)), + None | Some(TenantSlot::Secondary) => Err(GetTenantError::NotFound(tenant_id)), + } +} + +#[derive(thiserror::Error, Debug)] +pub(crate) enum GetActiveTenantError { + /// We may time out either while TenantSlot is InProgress, or while the Tenant + /// is in a non-Active state + #[error( + "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}" + )] + WaitForActiveTimeout { + latest_state: Option, + wait_time: Duration, + }, + + /// The TenantSlot is absent, or in secondary mode + #[error(transparent)] + NotFound(#[from] GetTenantError), + + /// Cancellation token fired while we were waiting + #[error("cancelled")] + Cancelled, + + /// Tenant exists, but is in a state that cannot become active (e.g. Stopping, Broken) + #[error("will not become active. Current state: {0}")] + WillNotBecomeActive(TenantState), +} + +/// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`] +/// state, then wait for up to `timeout`. If the [`Tenant`] is not currently in [`TenantState::Active`], +/// then wait for up to `timeout` (minus however long we waited for the slot). +pub(crate) async fn get_active_tenant_with_timeout( + tenant_id: TenantId, + timeout: Duration, + cancel: &CancellationToken, +) -> Result, GetActiveTenantError> { + enum WaitFor { + Barrier(utils::completion::Barrier), + Tenant(Arc), + } + + let wait_start = Instant::now(); + let deadline = wait_start + timeout; + + let wait_for = { + let locked = TENANTS.read().unwrap(); + let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read) + .map_err(GetTenantError::MapState)?; + match peek_slot { + Some(TenantSlot::Attached(tenant)) => { + match tenant.current_state() { + TenantState::Active => { + // Fast path: we don't need to do any async waiting. + return Ok(tenant.clone()); + } + _ => WaitFor::Tenant(tenant.clone()), + } + } + Some(TenantSlot::Secondary) => { + return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive( + tenant_id, + ))) + } + Some(TenantSlot::InProgress(barrier)) => WaitFor::Barrier(barrier.clone()), + None => { + return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound( + tenant_id, + ))) + } + } + }; + + let tenant = match wait_for { + WaitFor::Barrier(barrier) => { + tracing::debug!("Waiting for tenant InProgress state to pass..."); + timeout_cancellable( + deadline.duration_since(Instant::now()), + cancel, + barrier.wait(), + ) + .await + .map_err(|e| match e { + TimeoutCancellableError::Timeout => GetActiveTenantError::WaitForActiveTimeout { + latest_state: None, + wait_time: wait_start.elapsed(), + }, + TimeoutCancellableError::Cancelled => GetActiveTenantError::Cancelled, + })?; + { + let locked = TENANTS.read().unwrap(); + let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read) + .map_err(GetTenantError::MapState)?; + match peek_slot { + Some(TenantSlot::Attached(tenant)) => tenant.clone(), + _ => { + return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive( + tenant_id, + ))) + } + } + } + } + WaitFor::Tenant(tenant) => tenant, + }; + + tracing::debug!("Waiting for tenant to enter active state..."); + match timeout_cancellable( + deadline.duration_since(Instant::now()), + cancel, + tenant.wait_to_become_active(), + ) + .await + { + Ok(Ok(())) => Ok(tenant), + Ok(Err(e)) => Err(e), + Err(TimeoutCancellableError::Timeout) => { + let latest_state = tenant.current_state(); + if latest_state == TenantState::Active { + Ok(tenant) } else { - Ok(Arc::clone(tenant)) + Err(GetActiveTenantError::WaitForActiveTimeout { + latest_state: Some(latest_state), + wait_time: timeout, + }) } } + Err(TimeoutCancellableError::Cancelled) => Err(GetActiveTenantError::Cancelled), } } @@ -900,7 +1057,33 @@ pub(crate) async fn delete_tenant( remote_storage: Option, tenant_id: TenantId, ) -> Result<(), DeleteTenantError> { - DeleteTenantFlow::run(conf, remote_storage, &TENANTS, tenant_id).await + // We acquire a SlotGuard during this function to protect against concurrent + // changes while the ::prepare phase of DeleteTenantFlow executes, but then + // have to return the Tenant to the map while the background deletion runs. + // + // TODO: refactor deletion to happen outside the lifetime of a Tenant. + // Currently, deletion requires a reference to the tenants map in order to + // keep the Tenant in the map until deletion is complete, and then remove + // it at the end. + // + // See https://github.com/neondatabase/neon/issues/5080 + + let mut slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustExist)?; + + // unwrap is safe because we used MustExist mode when acquiring + let tenant = match slot_guard.get_old_value().as_ref().unwrap() { + TenantSlot::Attached(tenant) => tenant.clone(), + _ => { + // Express "not attached" as equivalent to "not found" + return Err(DeleteTenantError::NotAttached); + } + }; + + let result = DeleteTenantFlow::run(conf, remote_storage, &TENANTS, tenant).await; + + // The Tenant goes back into the map in Stopping state, it will eventually be removed by DeleteTenantFLow + slot_guard.revert(); + result } #[derive(Debug, thiserror::Error)] @@ -917,18 +1100,20 @@ pub(crate) async fn delete_timeline( timeline_id: TimelineId, _ctx: &RequestContext, ) -> Result<(), DeleteTimelineError> { - let tenant = get_tenant(tenant_id, true).await?; + let tenant = get_tenant(tenant_id, true)?; DeleteTimelineFlow::run(&tenant, timeline_id, false).await?; Ok(()) } #[derive(Debug, thiserror::Error)] pub(crate) enum TenantStateError { - #[error("Tenant {0} not found")] - NotFound(TenantId), #[error("Tenant {0} is stopping")] IsStopping(TenantId), #[error(transparent)] + SlotError(#[from] TenantSlotError), + #[error(transparent)] + SlotUpsertError(#[from] TenantSlotUpsertError), + #[error(transparent)] Other(#[from] anyhow::Error), } @@ -967,7 +1152,7 @@ pub(crate) async fn detach_tenant( async fn detach_tenant0( conf: &'static PageServerConf, - tenants: &tokio::sync::RwLock, + tenants: &std::sync::RwLock, tenant_id: TenantId, detach_ignored: bool, deletion_queue_client: &DeletionQueueClient, @@ -988,7 +1173,12 @@ async fn detach_tenant0( // Ignored tenants are not present in memory and will bail the removal from memory operation. // Before returning the error, check for ignored tenant removal case — we only need to clean its local files then. - if detach_ignored && matches!(removal_result, Err(TenantStateError::NotFound(_))) { + if detach_ignored + && matches!( + removal_result, + Err(TenantStateError::SlotError(TenantSlotError::NotFound(_))) + ) + { let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id); if tenant_ignore_mark.exists() { info!("Detaching an ignored tenant"); @@ -1011,31 +1201,44 @@ pub(crate) async fn load_tenant( deletion_queue_client: DeletionQueueClient, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { - tenant_map_insert(tenant_id, || async { - let tenant_path = conf.tenant_path(&tenant_id); - let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id); - if tenant_ignore_mark.exists() { - std::fs::remove_file(&tenant_ignore_mark) - .with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?; - } + let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?; + let tenant_path = conf.tenant_path(&tenant_id); - let resources = TenantSharedResources { - broker_client, - remote_storage, - deletion_queue_client - }; + let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id); + if tenant_ignore_mark.exists() { + std::fs::remove_file(&tenant_ignore_mark).with_context(|| { + format!( + "Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading" + ) + })?; + } - let mut location_conf = Tenant::load_tenant_config(conf, &tenant_id).map_err( TenantMapInsertError::Other)?; - location_conf.attach_in_generation(generation); - Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?; + let resources = TenantSharedResources { + broker_client, + remote_storage, + deletion_queue_client, + }; - let new_tenant = tenant_spawn(conf, tenant_id, &tenant_path, resources, AttachedTenantConf::try_from(location_conf)?, None, &TENANTS, SpawnMode::Normal, ctx) - .with_context(|| { - format!("Failed to schedule tenant processing in path {tenant_path:?}") - })?; + let mut location_conf = + Tenant::load_tenant_config(conf, &tenant_id).map_err(TenantMapInsertError::Other)?; + location_conf.attach_in_generation(generation); + + Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?; + + let new_tenant = tenant_spawn( + conf, + tenant_id, + &tenant_path, + resources, + AttachedTenantConf::try_from(location_conf)?, + None, + &TENANTS, + SpawnMode::Normal, + ctx, + ) + .with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))?; - Ok(new_tenant) - }).await?; + slot_guard.upsert(TenantSlot::Attached(new_tenant))?; Ok(()) } @@ -1048,7 +1251,7 @@ pub(crate) async fn ignore_tenant( async fn ignore_tenant0( conf: &'static PageServerConf, - tenants: &tokio::sync::RwLock, + tenants: &std::sync::RwLock, tenant_id: TenantId, ) -> Result<(), TenantStateError> { remove_tenant_from_memory(tenants, tenant_id, async { @@ -1076,7 +1279,7 @@ pub(crate) enum TenantMapListError { /// Get list of tenants, for the mgmt API /// pub(crate) async fn list_tenants() -> Result, TenantMapListError> { - let tenants = TENANTS.read().await; + let tenants = TENANTS.read().unwrap(); let m = match &*tenants { TenantsMap::Initializing => return Err(TenantMapListError::Initializing), TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m, @@ -1085,6 +1288,7 @@ pub(crate) async fn list_tenants() -> Result, Tenan .filter_map(|(id, tenant)| match tenant { TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())), TenantSlot::Secondary => None, + TenantSlot::InProgress(_) => None, }) .collect()) } @@ -1101,101 +1305,436 @@ pub(crate) async fn attach_tenant( resources: TenantSharedResources, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { - tenant_map_insert(tenant_id, || async { - let location_conf = LocationConf::attached_single(tenant_conf, generation); - let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id).await?; - // TODO: tenant directory remains on disk if we bail out from here on. - // See https://github.com/neondatabase/neon/issues/4233 - - let attached_tenant = tenant_spawn(conf, tenant_id, &tenant_dir, - resources, AttachedTenantConf::try_from(location_conf)?, None, &TENANTS, SpawnMode::Normal, ctx)?; - // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. - // See https://github.com/neondatabase/neon/issues/4233 - - let attached_tenant_id = attached_tenant.tenant_id(); - anyhow::ensure!( - tenant_id == attached_tenant_id, + let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?; + let location_conf = LocationConf::attached_single(tenant_conf, generation); + let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id).await?; + // TODO: tenant directory remains on disk if we bail out from here on. + // See https://github.com/neondatabase/neon/issues/4233 + + let attached_tenant = tenant_spawn( + conf, + tenant_id, + &tenant_dir, + resources, + AttachedTenantConf::try_from(location_conf)?, + None, + &TENANTS, + SpawnMode::Normal, + ctx, + )?; + // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. + // See https://github.com/neondatabase/neon/issues/4233 + + let attached_tenant_id = attached_tenant.tenant_id(); + if tenant_id != attached_tenant_id { + return Err(TenantMapInsertError::Other(anyhow::anyhow!( "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {attached_tenant_id})", - ); - Ok(attached_tenant) - }) - .await?; + ))); + } + + slot_guard.upsert(TenantSlot::Attached(attached_tenant))?; Ok(()) } #[derive(Debug, thiserror::Error)] pub(crate) enum TenantMapInsertError { + #[error(transparent)] + SlotError(#[from] TenantSlotError), + #[error(transparent)] + SlotUpsertError(#[from] TenantSlotUpsertError), + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +/// Superset of TenantMapError: issues that can occur when acquiring a slot +/// for a particular tenant ID. +#[derive(Debug, thiserror::Error)] +pub enum TenantSlotError { + /// When acquiring a slot with the expectation that the tenant already exists. + #[error("Tenant {0} not found")] + NotFound(TenantId), + + /// When acquiring a slot with the expectation that the tenant does not already exist. + #[error("tenant {0} already exists, state: {1:?}")] + AlreadyExists(TenantId, TenantState), + + #[error("tenant {0} already exists in but is not attached")] + Conflict(TenantId), + + // Tried to read a slot that is currently being mutated by another administrative + // operation. + #[error("tenant has a state change in progress, try again later")] + InProgress, + + #[error(transparent)] + MapState(#[from] TenantMapError), +} + +/// Superset of TenantMapError: issues that can occur when using a SlotGuard +/// to insert a new value. +#[derive(Debug, thiserror::Error)] +pub enum TenantSlotUpsertError { + /// An error where the slot is in an unexpected state, indicating a code bug + #[error("Internal error updating Tenant")] + InternalError(Cow<'static, str>), + + #[error(transparent)] + MapState(#[from] TenantMapError), +} + +#[derive(Debug)] +enum TenantSlotDropError { + /// It is only legal to drop a TenantSlot if its contents are fully shut down + NotShutdown, +} + +/// Errors that can happen any time we are walking the tenant map to try and acquire +/// the TenantSlot for a particular tenant. +#[derive(Debug, thiserror::Error)] +pub enum TenantMapError { + // Tried to read while initializing #[error("tenant map is still initializing")] StillInitializing, + + // Tried to read while shutting down #[error("tenant map is shutting down")] ShuttingDown, - #[error("tenant {0} already exists, state: {1:?}")] - TenantAlreadyExists(TenantId, TenantState), - #[error("tenant {0} already exists in secondary state")] - TenantExistsSecondary(TenantId), - #[error(transparent)] - Other(#[from] anyhow::Error), } -/// Give the given closure access to the tenants map entry for the given `tenant_id`, iff that -/// entry is vacant. The closure is responsible for creating the tenant object and inserting -/// it into the tenants map through the vacnt entry that it receives as argument. +/// Guards a particular tenant_id's content in the TenantsMap. While this +/// structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`] +/// for this tenant, which acts as a marker for any operations targeting +/// this tenant to retry later, or wait for the InProgress state to end. +/// +/// This structure enforces the important invariant that we do not have overlapping +/// tasks that will try use local storage for a the same tenant ID: we enforce that +/// the previous contents of a slot have been shut down before the slot can be +/// left empty or used for something else /// -/// NB: the closure should return quickly because the current implementation of tenants map -/// serializes access through an `RwLock`. -async fn tenant_map_insert( +/// Holders of a SlotGuard should explicitly dispose of it, using either `upsert` +/// to provide a new value, or `revert` to put the slot back into its initial +/// state. If the SlotGuard is dropped without calling either of these, then +/// we will leave the slot empty if our `old_value` is already shut down, else +/// we will replace the slot with `old_value` (equivalent to doing a revert). +/// +/// The `old_value` may be dropped before the SlotGuard is dropped, by calling +/// `drop_old_value`. It is an error to call this without shutting down +/// the conents of `old_value`. +pub struct SlotGuard { tenant_id: TenantId, - insert_fn: F, -) -> Result, TenantMapInsertError> -where - F: FnOnce() -> R, - R: std::future::Future>>, -{ - let mut guard = TENANTS.write().await; - let m = match &mut *guard { - TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing), - TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown), - TenantsMap::Open(m) => m, - }; - match m.entry(tenant_id) { - hash_map::Entry::Occupied(e) => match e.get() { - TenantSlot::Attached(t) => Err(TenantMapInsertError::TenantAlreadyExists( - tenant_id, - t.current_state(), - )), - TenantSlot::Secondary => Err(TenantMapInsertError::TenantExistsSecondary(tenant_id)), - }, - hash_map::Entry::Vacant(v) => match insert_fn().await { - Ok(tenant) => { - v.insert(TenantSlot::Attached(tenant.clone())); - Ok(tenant) + old_value: Option, + upserted: bool, + + /// [`TenantSlot::InProgress`] carries the corresponding Barrier: it will + /// release any waiters as soon as this SlotGuard is dropped. + _completion: utils::completion::Completion, +} + +unsafe impl Send for SlotGuard {} +unsafe impl Sync for SlotGuard {} + +impl SlotGuard { + fn new( + tenant_id: TenantId, + old_value: Option, + completion: utils::completion::Completion, + ) -> Self { + Self { + tenant_id, + old_value, + upserted: false, + _completion: completion, + } + } + + /// Take any value that was present in the slot before we acquired ownership + /// of it: in state transitions, this will be the old state. + fn get_old_value(&mut self) -> &Option { + &self.old_value + } + + /// Emplace a new value in the slot. This consumes the guard, and after + /// returning, the slot is no longer protected from concurrent changes. + fn upsert(mut self, new_value: TenantSlot) -> Result<(), TenantSlotUpsertError> { + if !self.old_value_is_shutdown() { + // This is a bug: callers should never try to drop an old value without + // shutting it down + return Err(TenantSlotUpsertError::InternalError( + "Old TenantSlot value not shut down".into(), + )); + } + + let replaced = { + let mut locked = TENANTS.write().unwrap(); + + if let TenantSlot::InProgress(_) = new_value { + // It is never expected to try and upsert InProgress via this path: it should + // only be written via the tenant_map_acquire_slot path. If we hit this it's a bug. + return Err(TenantSlotUpsertError::InternalError( + "Attempt to upsert an InProgress state".into(), + )); } - Err(e) => Err(TenantMapInsertError::Other(e)), - }, + + let m = match &mut *locked { + TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()), + TenantsMap::ShuttingDown(_) => { + return Err(TenantMapError::ShuttingDown.into()); + } + TenantsMap::Open(m) => m, + }; + + let replaced = m.insert(self.tenant_id, new_value); + self.upserted = true; + + METRICS.tenant_slots.set(m.len() as u64); + + replaced + }; + + // Sanity check: on an upsert we should always be replacing an InProgress marker + match replaced { + Some(TenantSlot::InProgress(_)) => { + // Expected case: we find our InProgress in the map: nothing should have + // replaced it because the code that acquires slots will not grant another + // one for the same TenantId. + Ok(()) + } + None => { + METRICS.unexpected_errors.inc(); + error!( + tenant_id = %self.tenant_id, + "Missing InProgress marker during tenant upsert, this is a bug." + ); + Err(TenantSlotUpsertError::InternalError( + "Missing InProgress marker during tenant upsert".into(), + )) + } + Some(slot) => { + METRICS.unexpected_errors.inc(); + error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot); + Err(TenantSlotUpsertError::InternalError( + "Unexpected contents of TenantSlot".into(), + )) + } + } + } + + /// Replace the InProgress slot with whatever was in the guard when we started + fn revert(mut self) { + if let Some(value) = self.old_value.take() { + match self.upsert(value) { + Err(TenantSlotUpsertError::InternalError(_)) => { + // We already logged the error, nothing else we can do. + } + Err(TenantSlotUpsertError::MapState(_)) => { + // If the map is shutting down, we need not replace anything + } + Ok(()) => {} + } + } + } + + /// We may never drop our old value until it is cleanly shut down: otherwise we might leave + /// rogue background tasks that would write to the local tenant directory that this guard + /// is responsible for protecting + fn old_value_is_shutdown(&self) -> bool { + match self.old_value.as_ref() { + Some(TenantSlot::Attached(tenant)) => { + // TODO: PR #5711 will add a gate that enables properly checking that + // shutdown completed. + matches!( + tenant.current_state(), + TenantState::Stopping { .. } | TenantState::Broken { .. } + ) + } + Some(TenantSlot::Secondary) => { + // TODO: when adding secondary mode tenants, this will check for shutdown + // in the same way that we do for `Tenant` above + true + } + Some(TenantSlot::InProgress(_)) => { + // A SlotGuard cannot be constructed for a slot that was already InProgress + unreachable!() + } + None => true, + } + } + + /// The guard holder is done with the old value of the slot: they are obliged to already + /// shut it down before we reach this point. + fn drop_old_value(&mut self) -> Result<(), TenantSlotDropError> { + if !self.old_value_is_shutdown() { + Err(TenantSlotDropError::NotShutdown) + } else { + self.old_value.take(); + Ok(()) + } } } -async fn tenant_map_upsert_slot<'a, F, R>( - tenant_id: TenantId, - upsert_fn: F, -) -> Result<(), TenantMapInsertError> -where - F: FnOnce(Option) -> R, - R: std::future::Future>, -{ - let mut guard = TENANTS.write().await; - let m = match &mut *guard { - TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing), - TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown), +impl Drop for SlotGuard { + fn drop(&mut self) { + if self.upserted { + return; + } + // Our old value is already shutdown, or it never existed: it is safe + // for us to fully release the TenantSlot back into an empty state + + let mut locked = TENANTS.write().unwrap(); + + let m = match &mut *locked { + TenantsMap::Initializing => { + // There is no map, this should never happen. + return; + } + TenantsMap::ShuttingDown(_) => { + // When we transition to shutdown, InProgress elements are removed + // from the map, so we do not need to clean up our Inprogress marker. + // See [`shutdown_all_tenants0`] + return; + } + TenantsMap::Open(m) => m, + }; + + use std::collections::hash_map::Entry; + match m.entry(self.tenant_id) { + Entry::Occupied(mut entry) => { + if !matches!(entry.get(), TenantSlot::InProgress(_)) { + METRICS.unexpected_errors.inc(); + error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get()); + } + + if self.old_value_is_shutdown() { + entry.remove(); + } else { + entry.insert(self.old_value.take().unwrap()); + } + } + Entry::Vacant(_) => { + METRICS.unexpected_errors.inc(); + error!( + tenant_id = %self.tenant_id, + "Missing InProgress marker during SlotGuard drop, this is a bug." + ); + } + } + + METRICS.tenant_slots.set(m.len() as u64); + } +} + +enum TenantSlotPeekMode { + /// In Read mode, peek will be permitted to see the slots even if the pageserver is shutting down + Read, + /// In Write mode, trying to peek at a slot while the pageserver is shutting down is an error + Write, +} + +fn tenant_map_peek_slot<'a>( + tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>, + tenant_id: &TenantId, + mode: TenantSlotPeekMode, +) -> Result, TenantMapError> { + let m = match tenants.deref() { + TenantsMap::Initializing => return Err(TenantMapError::StillInitializing), + TenantsMap::ShuttingDown(m) => match mode { + TenantSlotPeekMode::Read => m, + TenantSlotPeekMode::Write => { + return Err(TenantMapError::ShuttingDown); + } + }, TenantsMap::Open(m) => m, }; - match upsert_fn(m.remove(&tenant_id)).await { - Ok(upsert_val) => { - m.insert(tenant_id, upsert_val); - Ok(()) + Ok(m.get(tenant_id)) +} + +enum TenantSlotAcquireMode { + /// Acquire the slot irrespective of current state, or whether it already exists + Any, + /// Return an error if trying to acquire a slot and it doesn't already exist + MustExist, + /// Return an error if trying to acquire a slot and it already exists + MustNotExist, +} + +fn tenant_map_acquire_slot( + tenant_id: &TenantId, + mode: TenantSlotAcquireMode, +) -> Result { + tenant_map_acquire_slot_impl(tenant_id, &TENANTS, mode) +} + +fn tenant_map_acquire_slot_impl( + tenant_id: &TenantId, + tenants: &std::sync::RwLock, + mode: TenantSlotAcquireMode, +) -> Result { + use TenantSlotAcquireMode::*; + METRICS.tenant_slot_writes.inc(); + + let mut locked = tenants.write().unwrap(); + let span = tracing::info_span!("acquire_slot", %tenant_id); + let _guard = span.enter(); + + let m = match &mut *locked { + TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()), + TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()), + TenantsMap::Open(m) => m, + }; + + use std::collections::hash_map::Entry; + let entry = m.entry(*tenant_id); + match entry { + Entry::Vacant(v) => match mode { + MustExist => { + tracing::debug!("Vacant && MustExist: return NotFound"); + Err(TenantSlotError::NotFound(*tenant_id)) + } + _ => { + let (completion, barrier) = utils::completion::channel(); + v.insert(TenantSlot::InProgress(barrier)); + tracing::debug!("Vacant, inserted InProgress"); + Ok(SlotGuard::new(*tenant_id, None, completion)) + } + }, + Entry::Occupied(mut o) => { + // Apply mode-driven checks + match (o.get(), mode) { + (TenantSlot::InProgress(_), _) => { + tracing::debug!("Occupied, failing for InProgress"); + Err(TenantSlotError::InProgress) + } + (slot, MustNotExist) => match slot { + TenantSlot::Attached(tenant) => { + tracing::debug!("Attached && MustNotExist, return AlreadyExists"); + Err(TenantSlotError::AlreadyExists( + *tenant_id, + tenant.current_state(), + )) + } + _ => { + // FIXME: the AlreadyExists error assumes that we have a Tenant + // to get the state from + tracing::debug!("Occupied & MustNotExist, return AlreadyExists"); + Err(TenantSlotError::AlreadyExists( + *tenant_id, + TenantState::Broken { + reason: "Present but not attached".to_string(), + backtrace: "".to_string(), + }, + )) + } + }, + _ => { + // Happy case: the slot was not in any state that violated our mode + let (completion, barrier) = utils::completion::channel(); + let old_value = o.insert(TenantSlot::InProgress(barrier)); + tracing::debug!("Occupied, replaced with InProgress"); + Ok(SlotGuard::new(*tenant_id, Some(old_value), completion)) + } + } } - Err(e) => Err(TenantMapInsertError::Other(e)), } } @@ -1204,7 +1743,7 @@ where /// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal /// operation would be needed to remove it. async fn remove_tenant_from_memory( - tenants: &tokio::sync::RwLock, + tenants: &std::sync::RwLock, tenant_id: TenantId, tenant_cleanup: F, ) -> Result @@ -1213,20 +1752,14 @@ where { use utils::completion; - // It's important to keep the tenant in memory after the final cleanup, to avoid cleanup races. - // The exclusive lock here ensures we don't miss the tenant state updates before trying another removal. - // tenant-wde cleanup operations may take some time (removing the entire tenant directory), we want to - // avoid holding the lock for the entire process. - let tenant = { - match tenants - .write() - .await - .get_slot(&tenant_id) - .ok_or(TenantStateError::NotFound(tenant_id))? - { - TenantSlot::Attached(t) => Some(t.clone()), - TenantSlot::Secondary => None, - } + let mut slot_guard = + tenant_map_acquire_slot_impl(&tenant_id, tenants, TenantSlotAcquireMode::MustExist)?; + + // The SlotGuard allows us to manipulate the Tenant object without fear of some + // concurrent API request doing something else for the same tenant ID. + let attached_tenant = match slot_guard.get_old_value() { + Some(TenantSlot::Attached(t)) => Some(t), + _ => None, }; // allow pageserver shutdown to await for our completion @@ -1234,7 +1767,7 @@ where // If the tenant was attached, shut it down gracefully. For secondary // locations this part is not necessary - match tenant { + match &attached_tenant { Some(attached_tenant) => { // whenever we remove a tenant from memory, we don't want to flush and wait for upload let freeze_and_flush = false; @@ -1246,6 +1779,7 @@ where Err(_other) => { // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to // wait for it but return an error right away because these are distinct requests. + slot_guard.revert(); return Err(TenantStateError::IsStopping(tenant_id)); } } @@ -1260,23 +1794,21 @@ where .with_context(|| format!("Failed to run cleanup for tenant {tenant_id}")) { Ok(hook_value) => { - let mut tenants_accessor = tenants.write().await; - if tenants_accessor.remove(&tenant_id).is_none() { - warn!("Tenant {tenant_id} got removed from memory before operation finished"); - } + // Success: drop the old TenantSlot::Attached. + slot_guard + .drop_old_value() + .expect("We just called shutdown"); + Ok(hook_value) } Err(e) => { - let tenants_accessor = tenants.read().await; - match tenants_accessor.get(&tenant_id) { - Some(tenant) => { - tenant.set_broken(e.to_string()).await; - } - None => { - warn!("Tenant {tenant_id} got removed from memory"); - return Err(TenantStateError::NotFound(tenant_id)); - } + // If we had a Tenant, set it to Broken and put it back in the TenantsMap + if let Some(attached_tenant) = attached_tenant { + attached_tenant.set_broken(e.to_string()).await; } + // Leave the broken tenant in the map + slot_guard.revert(); + Err(TenantStateError::Other(e)) } } @@ -1293,7 +1825,7 @@ pub(crate) async fn immediate_gc( gc_req: TimelineGcRequest, ctx: &RequestContext, ) -> Result>, ApiError> { - let guard = TENANTS.read().await; + let guard = TENANTS.read().unwrap(); let tenant = guard .get(&tenant_id) .map(Arc::clone) @@ -1346,14 +1878,12 @@ mod tests { use super::{super::harness::TenantHarness, TenantsMap}; - #[tokio::test(start_paused = true)] - async fn shutdown_joins_remove_tenant_from_memory() { - // the test is a bit ugly with the lockstep together with spawned tasks. the aim is to make - // sure `shutdown_all_tenants0` per-tenant processing joins in any active - // remove_tenant_from_memory calls, which is enforced by making the operation last until - // we've ran `shutdown_all_tenants0` for a long time. + #[tokio::test] + async fn shutdown_awaits_in_progress_tenant() { + // Test that if an InProgress tenant is in the map during shutdown, the shutdown will gracefully + // wait for it to complete before proceeding. - let (t, _ctx) = TenantHarness::create("shutdown_joins_detach") + let (t, _ctx) = TenantHarness::create("shutdown_awaits_in_progress_tenant") .unwrap() .load() .await; @@ -1366,13 +1896,14 @@ mod tests { let _e = info_span!("testing", tenant_id = %id).entered(); let tenants = HashMap::from([(id, TenantSlot::Attached(t.clone()))]); - let tenants = Arc::new(tokio::sync::RwLock::new(TenantsMap::Open(tenants))); + let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants))); + + // Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually + // permit it to proceed: that will stick the tenant in InProgress let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel(); let (until_cleanup_started, cleanup_started) = utils::completion::channel(); - - // start a "detaching operation", which will take a while, until can_complete_cleanup - let cleanup_task = { + let mut remove_tenant_from_memory_task = { let jh = tokio::spawn({ let tenants = tenants.clone(); async move { @@ -1391,12 +1922,6 @@ mod tests { jh }; - let mut cleanup_progress = std::pin::pin!(t - .shutdown(utils::completion::Barrier::default(), false) - .await - .unwrap_err() - .wait()); - let mut shutdown_task = { let (until_shutdown_started, shutdown_started) = utils::completion::channel(); @@ -1409,37 +1934,17 @@ mod tests { shutdown_task }; - // if the joining in is removed from shutdown_all_tenants0, the shutdown_task should always - // get to complete within timeout and fail the test. it is expected to continue awaiting - // until completion or SIGKILL during normal shutdown. - // - // the timeout is long to cover anything that shutdown_task could be doing, but it is - // handled instantly because we use tokio's time pausing in this test. 100s is much more than - // what we get from systemd on shutdown (10s). - let long_time = std::time::Duration::from_secs(100); + let long_time = std::time::Duration::from_secs(15); tokio::select! { - _ = &mut shutdown_task => unreachable!("shutdown must continue, until_cleanup_completed is not dropped"), - _ = &mut cleanup_progress => unreachable!("cleanup progress must continue, until_cleanup_completed is not dropped"), + _ = &mut shutdown_task => unreachable!("shutdown should block on remove_tenant_from_memory completing"), + _ = &mut remove_tenant_from_memory_task => unreachable!("remove_tenant_from_memory_task should not complete until explicitly unblocked"), _ = tokio::time::sleep(long_time) => {}, } - // allow the remove_tenant_from_memory and thus eventually the shutdown to continue drop(until_cleanup_completed); - let (je, ()) = tokio::join!(shutdown_task, cleanup_progress); - je.expect("Tenant::shutdown shutdown not have panicked"); - cleanup_task - .await - .expect("no panicking") - .expect("remove_tenant_from_memory failed"); - - futures::future::poll_immediate( - t.shutdown(utils::completion::Barrier::default(), false) - .await - .unwrap_err() - .wait(), - ) - .await - .expect("the stopping progress must still be complete"); + // Now that we allow it to proceed, shutdown should complete immediately + remove_tenant_from_memory_task.await.unwrap().unwrap(); + shutdown_task.await.unwrap(); } } diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 52c53a5c3b02..e3aad22e407a 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -341,20 +341,7 @@ impl Timeline { // Make one of the tenant's timelines draw the short straw and run the calculation. // The others wait until the calculation is done so that they take into account the // imitated accesses that the winner made. - // - // It is critical we are responsive to cancellation here. Otherwise, we deadlock with - // tenant deletion (holds TENANTS in read mode) any other task that attempts to - // acquire TENANTS in write mode before we here call get_tenant. - // See https://github.com/neondatabase/neon/issues/5284. - let res = tokio::select! { - _ = cancel.cancelled() => { - return ControlFlow::Break(()); - } - res = crate::tenant::mgr::get_tenant(self.tenant_id, true) => { - res - } - }; - let tenant = match res { + let tenant = match crate::tenant::mgr::get_tenant(self.tenant_id, true) { Ok(t) => t, Err(_) => { return ControlFlow::Break(()); diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 740c05dcea48..d00bf8c4d84b 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -626,6 +626,8 @@ def __exit__( sk.stop(immediate=True) for pageserver in self.env.pageservers: + pageserver.assert_no_metric_errors() + pageserver.stop(immediate=True) if self.env.attachment_service is not None: @@ -1784,6 +1786,21 @@ def assert_no_errors(self): assert not errors + def assert_no_metric_errors(self): + """ + Certain metrics should _always_ be zero: they track conditions that indicate a bug. + """ + if not self.running: + log.info(f"Skipping metrics check on pageserver {self.id}, it is not running") + return + + for metric in [ + "pageserver_tenant_manager_unexpected_errors_total", + "pageserver_deletion_queue_unexpected_errors_total", + ]: + value = self.http_client().get_metric_value(metric) + assert value == 0, f"Nonzero {metric} == {value}" + def log_contains(self, pattern: str) -> Optional[str]: """Check that the pageserver log contains a line that matches the given regex""" logfile = open(os.path.join(self.workdir, "pageserver.log"), "r") diff --git a/test_runner/regress/test_neon_cli.py b/test_runner/regress/test_neon_cli.py index 1b3984583a36..de18ea0e6b58 100644 --- a/test_runner/regress/test_neon_cli.py +++ b/test_runner/regress/test_neon_cli.py @@ -134,6 +134,9 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder): env.neon_cli.pageserver_stop(env.pageserver.id) env.neon_cli.safekeeper_stop() + # Keep NeonEnv state up to date, it usually owns starting/stopping services + env.pageserver.running = False + # Default start res = env.neon_cli.raw_cli(["start"]) res.check_returncode() @@ -155,6 +158,10 @@ def test_cli_start_stop_multi(neon_env_builder: NeonEnvBuilder): env.neon_cli.pageserver_stop(env.BASE_PAGESERVER_ID) env.neon_cli.pageserver_stop(env.BASE_PAGESERVER_ID + 1) + # Keep NeonEnv state up to date, it usually owns starting/stopping services + env.pageservers[0].running = False + env.pageservers[1].running = False + # Addressing a nonexistent ID throws with pytest.raises(RuntimeError): env.neon_cli.pageserver_stop(env.BASE_PAGESERVER_ID + 100) diff --git a/test_runner/regress/test_pageserver_restart.py b/test_runner/regress/test_pageserver_restart.py index 4c51155236d1..fa1b13153770 100644 --- a/test_runner/regress/test_pageserver_restart.py +++ b/test_runner/regress/test_pageserver_restart.py @@ -20,6 +20,8 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool) endpoint = env.endpoints.create_start("main") pageserver_http = env.pageserver.http_client() + assert pageserver_http.get_metric_value("pageserver_tenant_manager_slots") == 1 + pg_conn = endpoint.connect() cur = pg_conn.cursor() @@ -52,6 +54,9 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool) env.pageserver.stop() env.pageserver.start() + # We reloaded our tenant + assert pageserver_http.get_metric_value("pageserver_tenant_manager_slots") == 1 + cur.execute("SELECT count(*) FROM foo") assert cur.fetchone() == (100000,) diff --git a/test_runner/regress/test_tenant_delete.py b/test_runner/regress/test_tenant_delete.py index ae771970884a..6e35890922fa 100644 --- a/test_runner/regress/test_tenant_delete.py +++ b/test_runner/regress/test_tenant_delete.py @@ -63,6 +63,9 @@ def test_tenant_delete_smoke( conf=MANY_SMALL_LAYERS_TENANT_CONFIG, ) + # Default tenant and the one we created + assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 2 + # create two timelines one being the parent of another parent = None for timeline in ["first", "second"]: @@ -88,7 +91,9 @@ def test_tenant_delete_smoke( iterations = poll_for_remote_storage_iterations(remote_storage_kind) + assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 2 tenant_delete_wait_completed(ps_http, tenant_id, iterations) + assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1 tenant_path = env.pageserver.tenant_dir(tenant_id) assert not tenant_path.exists() @@ -104,6 +109,9 @@ def test_tenant_delete_smoke( ), ) + # Deletion updates the tenant count: the one default tenant remains + assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1 + class Check(enum.Enum): RETRY_WITHOUT_RESTART = enum.auto() diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py index 11e8a80e1ddc..03e78dda2b27 100644 --- a/test_runner/regress/test_tenant_detach.py +++ b/test_runner/regress/test_tenant_detach.py @@ -26,6 +26,16 @@ from fixtures.utils import query_scalar, wait_until from prometheus_client.samples import Sample +# In tests that overlap endpoint activity with tenant attach/detach, there are +# a variety of warnings that the page service may emit when it cannot acquire +# an active tenant to serve a request +PERMIT_PAGE_SERVICE_ERRORS = [ + ".*page_service.*Tenant .* not found", + ".*page_service.*Tenant .* is not active", + ".*page_service.*cancelled", + ".*page_service.*will not become active.*", +] + def do_gc_target( pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId @@ -60,12 +70,7 @@ def test_tenant_reattach( # create new nenant tenant_id, timeline_id = env.neon_cli.create_tenant() - # Attempts to connect from compute to pageserver while the tenant is - # temporarily detached produces these errors in the pageserver log. - env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") - env.pageserver.allowed_errors.append( - f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" - ) + env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint: with endpoint.cursor() as cur: @@ -235,10 +240,7 @@ async def reattach_while_busy( # Attempts to connect from compute to pageserver while the tenant is # temporarily detached produces these errors in the pageserver log. - env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") - env.pageserver.allowed_errors.append( - f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" - ) + env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) @@ -259,7 +261,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder): env = neon_env_builder.init_start() pageserver_http = env.pageserver.http_client() - env.pageserver.allowed_errors.append(".*NotFound: Tenant .*") + env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) # first check for non existing tenant tenant_id = TenantId.generate() @@ -271,19 +273,9 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder): assert excinfo.value.status_code == 404 - # the error will be printed to the log too - env.pageserver.allowed_errors.append(".*NotFound: tenant *") - # create new nenant tenant_id, timeline_id = env.neon_cli.create_tenant() - # Attempts to connect from compute to pageserver while the tenant is - # temporarily detached produces these errors in the pageserver log. - env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") - env.pageserver.allowed_errors.append( - f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" - ) - # assert tenant exists on disk assert env.pageserver.tenant_dir(tenant_id).exists() @@ -345,12 +337,7 @@ def test_tenant_detach_ignored_tenant(neon_simple_env: NeonEnv): # create a new tenant tenant_id, _ = env.neon_cli.create_tenant() - # Attempts to connect from compute to pageserver while the tenant is - # temporarily detached produces these errors in the pageserver log. - env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") - env.pageserver.allowed_errors.append( - f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" - ) + env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) # assert tenant exists on disk assert env.pageserver.tenant_dir(tenant_id).exists() @@ -401,12 +388,7 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv): # create a new tenant tenant_id, _ = env.neon_cli.create_tenant() - # Attempts to connect from compute to pageserver while the tenant is - # temporarily detached produces these errors in the pageserver log. - env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") - env.pageserver.allowed_errors.append( - f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" - ) + env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) # assert tenant exists on disk assert env.pageserver.tenant_dir(tenant_id).exists() @@ -453,12 +435,7 @@ def test_detach_while_attaching( tenant_id = env.initial_tenant timeline_id = env.initial_timeline - # Attempts to connect from compute to pageserver while the tenant is - # temporarily detached produces these errors in the pageserver log. - env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") - env.pageserver.allowed_errors.append( - f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" - ) + env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) # Create table, and insert some rows. Make it big enough that it doesn't fit in # shared_buffers, otherwise the SELECT after restart will just return answer @@ -593,12 +570,7 @@ def test_ignored_tenant_download_missing_layers(neon_env_builder: NeonEnvBuilder tenant_id = env.initial_tenant timeline_id = env.initial_timeline - # Attempts to connect from compute to pageserver while the tenant is - # temporarily detached produces these errors in the pageserver log. - env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") - env.pageserver.allowed_errors.append( - f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" - ) + env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) data_id = 1 data_secret = "very secret secret" @@ -649,12 +621,7 @@ def test_load_attach_negatives(neon_env_builder: NeonEnvBuilder): tenant_id = env.initial_tenant - # Attempts to connect from compute to pageserver while the tenant is - # temporarily detached produces these errors in the pageserver log. - env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") - env.pageserver.allowed_errors.append( - f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" - ) + env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) env.pageserver.allowed_errors.append(".*tenant .*? already exists, state:.*") with pytest.raises( @@ -693,12 +660,7 @@ def test_ignore_while_attaching( tenant_id = env.initial_tenant timeline_id = env.initial_timeline - # Attempts to connect from compute to pageserver while the tenant is - # temporarily detached produces these errors in the pageserver log. - env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") - env.pageserver.allowed_errors.append( - f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" - ) + env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) data_id = 1 data_secret = "very secret secret"