Skip to content

Commit

Permalink
pageserver: add InProgress tenant map state, use a sync lock for th…
Browse files Browse the repository at this point in the history
…e map (#5367)

## Problem

Follows on from #5299 
- We didn't have a generic way to protect a tenant undergoing changes:
`Tenant` had states, but for our arbitrary transitions between
secondary/attached, we need a general way to say "reserve this tenant
ID, and don't allow any other ops on it, but don't try and report it as
being in any particular state".
- The TenantsMap structure was behind an async RwLock, but it was never
correct to hold it across await points: that would block any other
changes for all tenants.


## Summary of changes

- Add the `TenantSlot::InProgress` value.  This means:
  - Incoming administrative operations on the tenant should retry later
- Anything trying to read the live state of the tenant (e.g. a page
service reader) should retry later or block.
- Store TenantsMap in `std::sync::RwLock`
- Provide an extended `get_active_tenant_with_timeout` for page_service
to use, which will wait on InProgress slots as well as non-active
tenants.

Closes: #5378

---------

Co-authored-by: Christian Schwarz <christian@neon.tech>
  • Loading branch information
jcsp and problame authored Nov 6, 2023
1 parent e6470ee commit 85cd97a
Show file tree
Hide file tree
Showing 17 changed files with 1,146 additions and 593 deletions.
3 changes: 3 additions & 0 deletions libs/utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ pub mod completion;
/// Reporting utilities
pub mod error;

/// async timeout helper
pub mod timeout;

pub mod sync;

/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
Expand Down
37 changes: 37 additions & 0 deletions libs/utils/src/timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::time::Duration;

use tokio_util::sync::CancellationToken;

pub enum TimeoutCancellableError {
Timeout,
Cancelled,
}

/// Wrap [`tokio::time::timeout`] with a CancellationToken.
///
/// This wrapper is appropriate for any long running operation in a task
/// that ought to respect a CancellationToken (which means most tasks).
///
/// The only time you should use a bare tokio::timeout is when the future `F`
/// itself respects a CancellationToken: otherwise, always use this wrapper
/// with your CancellationToken to ensure that your task does not hold up
/// graceful shutdown.
pub async fn timeout_cancellable<F>(
duration: Duration,
cancel: &CancellationToken,
future: F,
) -> Result<F::Output, TimeoutCancellableError>
where
F: std::future::Future,
{
tokio::select!(
r = tokio::time::timeout(duration, future) => {
r.map_err(|_| TimeoutCancellableError::Timeout)

},
_ = cancel.cancelled() => {
Err(TimeoutCancellableError::Cancelled)

}
)
}
2 changes: 1 addition & 1 deletion pageserver/src/consumption_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ async fn calculate_synthetic_size_worker(
continue;
}

if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await {
if let Ok(tenant) = mgr::get_tenant(tenant_id, true) {
// TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks?
// We can put in some prioritization for consumption metrics.
// Same for the loop that fetches computed metrics.
Expand Down
1 change: 0 additions & 1 deletion pageserver/src/consumption_metrics/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ pub(super) async fn collect_all_metrics(
None
} else {
crate::tenant::mgr::get_tenant(id, true)
.await
.ok()
.map(|tenant| (id, tenant))
}
Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/disk_usage_eviction_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ async fn collect_eviction_candidates(
if cancel.is_cancelled() {
return Ok(EvictionCandidates::Cancelled);
}
let tenant = match tenant::mgr::get_tenant(*tenant_id, true).await {
let tenant = match tenant::mgr::get_tenant(*tenant_id, true) {
Ok(tenant) => tenant,
Err(e) => {
// this can happen if tenant has lifecycle transition after we fetched it
Expand Down
73 changes: 54 additions & 19 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::{LocationConf, TenantConfOpt};
use crate::tenant::mgr::{
GetTenantError, SetNewTenantConfigError, TenantMapInsertError, TenantStateError,
GetTenantError, SetNewTenantConfigError, TenantMapError, TenantMapInsertError, TenantSlotError,
TenantSlotUpsertError, TenantStateError,
};
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
Expand Down Expand Up @@ -146,28 +147,59 @@ impl From<PageReconstructError> for ApiError {
impl From<TenantMapInsertError> for ApiError {
fn from(tmie: TenantMapInsertError) -> ApiError {
match tmie {
TenantMapInsertError::StillInitializing | TenantMapInsertError::ShuttingDown => {
ApiError::ResourceUnavailable(format!("{tmie}").into())
TenantMapInsertError::SlotError(e) => e.into(),
TenantMapInsertError::SlotUpsertError(e) => e.into(),
TenantMapInsertError::Other(e) => ApiError::InternalServerError(e),
}
}
}

impl From<TenantSlotError> for ApiError {
fn from(e: TenantSlotError) -> ApiError {
use TenantSlotError::*;
match e {
NotFound(tenant_id) => {
ApiError::NotFound(anyhow::anyhow!("NotFound: tenant {tenant_id}").into())
}
TenantMapInsertError::TenantAlreadyExists(id, state) => {
ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}"))
e @ (AlreadyExists(_, _) | Conflict(_)) => ApiError::Conflict(format!("{e}")),
InProgress => {
ApiError::ResourceUnavailable("Tenant is being modified concurrently".into())
}
TenantMapInsertError::TenantExistsSecondary(id) => {
ApiError::Conflict(format!("tenant {id} already exists as secondary"))
MapState(e) => e.into(),
}
}
}

impl From<TenantSlotUpsertError> for ApiError {
fn from(e: TenantSlotUpsertError) -> ApiError {
use TenantSlotUpsertError::*;
match e {
InternalError(e) => ApiError::InternalServerError(anyhow::anyhow!("{e}")),
MapState(e) => e.into(),
}
}
}

impl From<TenantMapError> for ApiError {
fn from(e: TenantMapError) -> ApiError {
use TenantMapError::*;
match e {
StillInitializing | ShuttingDown => {
ApiError::ResourceUnavailable(format!("{e}").into())
}
TenantMapInsertError::Other(e) => ApiError::InternalServerError(e),
}
}
}

impl From<TenantStateError> for ApiError {
fn from(tse: TenantStateError) -> ApiError {
match tse {
TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
TenantStateError::IsStopping(_) => {
ApiError::ResourceUnavailable("Tenant is stopping".into())
}
_ => ApiError::InternalServerError(anyhow::Error::new(tse)),
TenantStateError::SlotError(e) => e.into(),
TenantStateError::SlotUpsertError(e) => e.into(),
TenantStateError::Other(e) => ApiError::InternalServerError(anyhow!(e)),
}
}
}
Expand All @@ -188,6 +220,7 @@ impl From<GetTenantError> for ApiError {
// (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
ApiError::ResourceUnavailable("Tenant not yet active".into())
}
GetTenantError::MapState(e) => ApiError::ResourceUnavailable(format!("{e}").into()),
}
}
}
Expand Down Expand Up @@ -242,6 +275,9 @@ impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
Get(g) => ApiError::from(g),
e @ AlreadyInProgress => ApiError::Conflict(e.to_string()),
Timeline(t) => ApiError::from(t),
NotAttached => ApiError::NotFound(anyhow::anyhow!("Tenant is not attached").into()),
SlotError(e) => e.into(),
SlotUpsertError(e) => e.into(),
Other(o) => ApiError::InternalServerError(o),
e @ InvalidState(_) => ApiError::PreconditionFailed(e.to_string().into_boxed_str()),
}
Expand Down Expand Up @@ -368,7 +404,7 @@ async fn timeline_create_handler(
let state = get_state(&request);

async {
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id, true)?;
match tenant.create_timeline(
new_timeline_id,
request_data.ancestor_timeline_id.map(TimelineId::from),
Expand Down Expand Up @@ -418,7 +454,7 @@ async fn timeline_list_handler(
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);

let response_data = async {
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id, true)?;
let timelines = tenant.list_timelines();

let mut response_data = Vec::with_capacity(timelines.len());
Expand Down Expand Up @@ -457,7 +493,7 @@ async fn timeline_detail_handler(
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);

let timeline_info = async {
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id, true)?;

let timeline = tenant
.get_timeline(timeline_id, false)
Expand Down Expand Up @@ -713,7 +749,7 @@ async fn tenant_status(
check_permission(&request, Some(tenant_id))?;

let tenant_info = async {
let tenant = mgr::get_tenant(tenant_id, false).await?;
let tenant = mgr::get_tenant(tenant_id, false)?;

// Calculate total physical size of all timelines
let mut current_physical_size = 0;
Expand Down Expand Up @@ -776,7 +812,7 @@ async fn tenant_size_handler(
let headers = request.headers();

let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id, true)?;

// this can be long operation
let inputs = tenant
Expand Down Expand Up @@ -1033,7 +1069,7 @@ async fn get_tenant_config_handler(
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;

let tenant = mgr::get_tenant(tenant_id, false).await?;
let tenant = mgr::get_tenant(tenant_id, false)?;

let response = HashMap::from([
(
Expand Down Expand Up @@ -1092,7 +1128,7 @@ async fn put_tenant_location_config_handler(
.await
{
match e {
TenantStateError::NotFound(_) => {
TenantStateError::SlotError(TenantSlotError::NotFound(_)) => {
// This API is idempotent: a NotFound on a detach is fine.
}
_ => return Err(e.into()),
Expand Down Expand Up @@ -1130,7 +1166,6 @@ async fn handle_tenant_break(
let tenant_id: TenantId = parse_request_param(&r, "tenant_id")?;

let tenant = crate::tenant::mgr::get_tenant(tenant_id, true)
.await
.map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;

tenant.set_broken("broken from test".to_owned()).await;
Expand Down Expand Up @@ -1435,7 +1470,7 @@ async fn active_timeline_of_active_tenant(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<Arc<Timeline>, ApiError> {
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id, true)?;
tenant
.get_timeline(timeline_id, true)
.map_err(|e| ApiError::NotFound(e.into()))
Expand Down
29 changes: 29 additions & 0 deletions pageserver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,32 @@ static REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER: Lazy<IntCounterVec> = Lazy
.expect("failed to define a metric")
});

pub(crate) struct TenantManagerMetrics {
pub(crate) tenant_slots: UIntGauge,
pub(crate) tenant_slot_writes: IntCounter,
pub(crate) unexpected_errors: IntCounter,
}

pub(crate) static TENANT_MANAGER: Lazy<TenantManagerMetrics> = Lazy::new(|| {
TenantManagerMetrics {
tenant_slots: register_uint_gauge!(
"pageserver_tenant_manager_slots",
"How many slots currently exist, including all attached, secondary and in-progress operations",
)
.expect("failed to define a metric"),
tenant_slot_writes: register_int_counter!(
"pageserver_tenant_manager_slot_writes",
"Writes to a tenant slot, including all of create/attach/detach/delete"
)
.expect("failed to define a metric"),
unexpected_errors: register_int_counter!(
"pageserver_tenant_manager_unexpected_errors_total",
"Number of unexpected conditions encountered: nonzero value indicates a non-fatal bug."
)
.expect("failed to define a metric"),
}
});

pub(crate) struct DeletionQueueMetrics {
pub(crate) keys_submitted: IntCounter,
pub(crate) keys_dropped: IntCounter,
Expand Down Expand Up @@ -1884,6 +1910,9 @@ pub fn preinitialize_metrics() {
// Deletion queue stats
Lazy::force(&DELETION_QUEUE);

// Tenant manager stats
Lazy::force(&TENANT_MANAGER);

// countervecs
[&BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT]
.into_iter()
Expand Down
Loading

1 comment on commit 85cd97a

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2426 tests run: 2304 passed, 0 failed, 122 skipped (full report)


Code coverage (full report)

  • functions: 54.6% (8885 of 16273 functions)
  • lines: 81.7% (51197 of 62655 lines)

The comment gets automatically updated with the latest test results
85cd97a at 2023-11-06T14:56:35.566Z :recycle:

Please sign in to comment.