pageserver: add InProgress tenant map state, use a sync lock for the map #5367

Merged Nov 6, 2023
Changes from 19 commits
Commits (23):
7d75d1d  pageserver: add InProgress top level state & make TenantsMap lock syn… (jcsp, Oct 10, 2023)
6671b77  pageserver: return 503 for GET on InProgress tenant (jcsp, Oct 27, 2023)
cc2e062  pageserver: wait with timeout for InProgress slots in page service (jcsp, Oct 31, 2023)
a972ba1  tests: tolerate more diverse non-active tenant errors (jcsp, Nov 1, 2023)
19467ad  pageserver: simplify GetActiveTenantError & add Cancelled case (jcsp, Nov 2, 2023)
44b0352  Update pageserver/src/tenant/mgr.rs (jcsp, Nov 3, 2023)
914f2f4  Update pageserver/src/tenant/mgr.rs (jcsp, Nov 3, 2023)
7c73972  Fix a PR suggestion (jcsp, Nov 3, 2023)
8d6b95b  Refactor get_active_tenant_with_timeout to use a deadline (jcsp, Nov 3, 2023)
810404f  Refactor mode args to tenant slot helpers (jcsp, Nov 3, 2023)
ce82103  Pull timeout_cancellable into utils (jcsp, Nov 3, 2023)
2f405ef  pageserver: add `pageserver_tenant_manager_` metrics (jcsp, Nov 3, 2023)
cfaed2a  tests: check unexpected error counters at end of tests (jcsp, Nov 3, 2023)
bbfef54  clippy (jcsp, Nov 3, 2023)
1b119d3  refactor match in tenant_map_acquire_slot_impl (jcsp, Nov 3, 2023)
768da53  Fix CLI tests for assert_no_log_errors (jcsp, Nov 3, 2023)
130eadb  Make logic more explicit in SlotGuard::drop (jcsp, Nov 3, 2023)
a7a87b6  Make SlotGuard stricter: refuse to clear a slot if old value isn't shut (jcsp, Nov 3, 2023)
0cb75d5  s/tenant_guard/slot_guard/ (jcsp, Nov 3, 2023)
cefc99f  Remove a stray file (jcsp, Nov 6, 2023)
f46cb2c  Merge remote-tracking branch 'upstream/main' into jcsp/secondary-loca… (jcsp, Nov 6, 2023)
a758ed6  Refactor a match (jcsp, Nov 6, 2023)
b09891e  De-indent function in a conditional (jcsp, Nov 6, 2023)
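For orientation, the sketch below shows the overall shape this PR gives the tenant map; it is a simplified illustration rather than the actual code in pageserver/src/tenant/mgr.rs. The idea: a TenantsMap guarded by a synchronous lock, whose per-tenant slots can be parked in an InProgress state while an attach/detach/delete is underway. All type and function names in the sketch are hypothetical stand-ins.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Hypothetical stand-ins for the real pageserver types.
type TenantId = u64;
struct Tenant;

// A slot either holds an attached tenant, marks a secondary location,
// or is InProgress while an attach/detach/delete is mutating it.
enum TenantSlot {
    Attached(Arc<Tenant>),
    Secondary,
    InProgress,
}

// The map is behind a synchronous lock: it is held only for short
// lookups and insertions, never across an .await point.
type TenantsMap = RwLock<HashMap<TenantId, TenantSlot>>;

// Readers that find an InProgress slot either wait with a timeout or
// report the tenant as unavailable (the page service returns 503).
fn get_attached(map: &TenantsMap, id: TenantId) -> Result<Arc<Tenant>, &'static str> {
    match map.read().unwrap().get(&id) {
        Some(TenantSlot::Attached(t)) => Ok(Arc::clone(t)),
        Some(TenantSlot::InProgress) => Err("tenant is being modified, retry later"),
        Some(TenantSlot::Secondary) => Err("tenant is a secondary location"),
        None => Err("tenant not found"),
    }
}

fn main() {
    let map: TenantsMap = RwLock::new(HashMap::new());
    map.write().unwrap().insert(1, TenantSlot::InProgress);
    // An InProgress slot is reported as temporarily unavailable.
    assert!(get_attached(&map, 1).is_err());
}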
3 changes: 3 additions & 0 deletions libs/utils/src/lib.rs
@@ -73,6 +73,9 @@ pub mod completion;
/// Reporting utilities
pub mod error;

/// async timeout helper
pub mod timeout;

pub mod sync;

/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
37 changes: 37 additions & 0 deletions libs/utils/src/timeout.rs
@@ -0,0 +1,37 @@
use std::time::Duration;

use tokio_util::sync::CancellationToken;

pub enum TimeoutCancellableError {
    Timeout,
    Cancelled,
}

/// Wrap [`tokio::time::timeout`] with a CancellationToken.
///
/// This wrapper is appropriate for any long running operation in a task
/// that ought to respect a CancellationToken (which means most tasks).
///
/// The only time you should use a bare tokio::timeout is when the future `F`
/// itself respects a CancellationToken: otherwise, always use this wrapper
/// with your CancellationToken to ensure that your task does not hold up
/// graceful shutdown.
pub async fn timeout_cancellable<F>(
    duration: Duration,
    cancel: &CancellationToken,
    future: F,
) -> Result<F::Output, TimeoutCancellableError>
where
    F: std::future::Future,
{
    tokio::select!(
        r = tokio::time::timeout(duration, future) => {
            r.map_err(|_| TimeoutCancellableError::Timeout)
        },
        _ = cancel.cancelled() => {
            Err(TimeoutCancellableError::Cancelled)
        }
    )
}
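A usage sketch for the helper above (not code from this PR): the call is awaited like tokio::time::timeout, but also resolves early if the cancellation token fires. It assumes the new module is importable as utils::timeout and that tokio/tokio_util are available; the surrounding function is hypothetical.

use std::time::Duration;
use tokio_util::sync::CancellationToken;
use utils::timeout::{timeout_cancellable, TimeoutCancellableError};

// Hypothetical caller: wait for some long-running future, but give up after
// 30 seconds and bail out immediately if the pageserver is shutting down.
async fn wait_with_deadline(cancel: &CancellationToken) -> Result<(), &'static str> {
    let work = async {
        // e.g. waiting for an InProgress tenant slot to settle
        tokio::time::sleep(Duration::from_secs(1)).await;
    };

    match timeout_cancellable(Duration::from_secs(30), cancel, work).await {
        Ok(()) => Ok(()),
        Err(TimeoutCancellableError::Timeout) => Err("timed out"),
        Err(TimeoutCancellableError::Cancelled) => Err("shutting down"),
    }
}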
2 changes: 1 addition & 1 deletion pageserver/src/consumption_metrics.rs
@@ -266,7 +266,7 @@ async fn calculate_synthetic_size_worker(
continue;
}

if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await {
if let Ok(tenant) = mgr::get_tenant(tenant_id, true) {
// TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks?
// We can put in some prioritization for consumption metrics.
// Same for the loop that fetches computed metrics.
1 change: 0 additions & 1 deletion pageserver/src/consumption_metrics/metrics.rs
@@ -206,7 +206,6 @@ pub(super) async fn collect_all_metrics(
None
} else {
crate::tenant::mgr::get_tenant(id, true)
.await
.ok()
.map(|tenant| (id, tenant))
}
2 changes: 2 additions & 0 deletions pageserver/src/deletion_queue/check.log
@@ -0,0 +1,2 @@
Checking pageserver v0.1.0 (/home/neon/neon/pageserver)
Finished dev [optimized + debuginfo] target(s) in 7.62s
2 changes: 1 addition & 1 deletion pageserver/src/disk_usage_eviction_task.rs
@@ -545,7 +545,7 @@ async fn collect_eviction_candidates(
if cancel.is_cancelled() {
return Ok(EvictionCandidates::Cancelled);
}
let tenant = match tenant::mgr::get_tenant(*tenant_id, true).await {
let tenant = match tenant::mgr::get_tenant(*tenant_id, true) {
Ok(tenant) => tenant,
Err(e) => {
// this can happen if tenant has lifecycle transition after we fetched it
74 changes: 55 additions & 19 deletions pageserver/src/http/routes.rs
@@ -36,7 +36,8 @@ use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::{LocationConf, TenantConfOpt};
use crate::tenant::mgr::{
GetTenantError, SetNewTenantConfigError, TenantMapInsertError, TenantStateError,
GetTenantError, SetNewTenantConfigError, TenantMapError, TenantMapInsertError, TenantSlotError,
TenantSlotUpsertError, TenantStateError,
};
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
@@ -147,28 +148,60 @@ impl From<PageReconstructError> for ApiError {
impl From<TenantMapInsertError> for ApiError {
fn from(tmie: TenantMapInsertError) -> ApiError {
match tmie {
TenantMapInsertError::StillInitializing | TenantMapInsertError::ShuttingDown => {
ApiError::ResourceUnavailable(format!("{tmie}").into())
TenantMapInsertError::SlotError(e) => e.into(),
TenantMapInsertError::SlotUpsertError(e) => e.into(),
TenantMapInsertError::Other(e) => ApiError::InternalServerError(e),
}
}
}

impl From<TenantSlotError> for ApiError {
fn from(e: TenantSlotError) -> ApiError {
use TenantSlotError::*;
match e {
NotFound(tenant_id) => {
ApiError::NotFound(anyhow::anyhow!("NotFound: tenant {tenant_id}").into())
}
TenantMapInsertError::TenantAlreadyExists(id, state) => {
ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}"))
e @ AlreadyExists(_, _) => ApiError::Conflict(format!("{e}")),
e @ Conflict(_) => ApiError::Conflict(format!("{e}")),
InProgress => {
ApiError::ResourceUnavailable("Tenant is being modified concurrently".into())
}
TenantMapInsertError::TenantExistsSecondary(id) => {
ApiError::Conflict(format!("tenant {id} already exists as secondary"))
MapState(e) => e.into(),
}
}
}

impl From<TenantSlotUpsertError> for ApiError {
fn from(e: TenantSlotUpsertError) -> ApiError {
use TenantSlotUpsertError::*;
match e {
InternalError(e) => ApiError::InternalServerError(anyhow::anyhow!("{e}")),
MapState(e) => e.into(),
}
}
}

impl From<TenantMapError> for ApiError {
fn from(e: TenantMapError) -> ApiError {
use TenantMapError::*;
match e {
StillInitializing | ShuttingDown => {
ApiError::ResourceUnavailable(format!("{e}").into())
}
TenantMapInsertError::Other(e) => ApiError::InternalServerError(e),
}
}
}

impl From<TenantStateError> for ApiError {
fn from(tse: TenantStateError) -> ApiError {
match tse {
TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
TenantStateError::IsStopping(_) => {
ApiError::ResourceUnavailable("Tenant is stopping".into())
}
_ => ApiError::InternalServerError(anyhow::Error::new(tse)),
TenantStateError::SlotError(e) => e.into(),
TenantStateError::SlotUpsertError(e) => e.into(),
TenantStateError::Other(e) => ApiError::InternalServerError(anyhow!(e)),
}
}
}
@@ -189,6 +222,7 @@ impl From<GetTenantError> for ApiError {
// (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
ApiError::ResourceUnavailable("Tenant not yet active".into())
}
GetTenantError::MapState(e) => ApiError::ResourceUnavailable(format!("{e}").into()),
}
}
}
@@ -243,6 +277,9 @@ impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
Get(g) => ApiError::from(g),
e @ AlreadyInProgress => ApiError::Conflict(e.to_string()),
Timeline(t) => ApiError::from(t),
NotAttached => ApiError::NotFound(anyhow::anyhow!("Tenant is not attached").into()),
SlotError(e) => e.into(),
SlotUpsertError(e) => e.into(),
Other(o) => ApiError::InternalServerError(o),
e @ InvalidState(_) => ApiError::PreconditionFailed(e.to_string().into_boxed_str()),
}
@@ -369,7 +406,7 @@ async fn timeline_create_handler(
let state = get_state(&request);

async {
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id, true)?;
match tenant.create_timeline(
new_timeline_id,
request_data.ancestor_timeline_id.map(TimelineId::from),
@@ -416,7 +453,7 @@ async fn timeline_list_handler(
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);

let response_data = async {
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id, true)?;
let timelines = tenant.list_timelines();

let mut response_data = Vec::with_capacity(timelines.len());
@@ -455,7 +492,7 @@ async fn timeline_detail_handler(
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);

let timeline_info = async {
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id, true)?;

let timeline = tenant
.get_timeline(timeline_id, false)
@@ -713,7 +750,7 @@ async fn tenant_status(
check_permission(&request, Some(tenant_id))?;

let tenant_info = async {
let tenant = mgr::get_tenant(tenant_id, false).await?;
let tenant = mgr::get_tenant(tenant_id, false)?;

// Calculate total physical size of all timelines
let mut current_physical_size = 0;
@@ -776,7 +813,7 @@ async fn tenant_size_handler(
let headers = request.headers();

let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id, true)?;

// this can be long operation
let inputs = tenant
@@ -1035,7 +1072,7 @@ async fn get_tenant_config_handler(
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;

let tenant = mgr::get_tenant(tenant_id, false).await?;
let tenant = mgr::get_tenant(tenant_id, false)?;

let response = HashMap::from([
(
@@ -1094,7 +1131,7 @@ async fn put_tenant_location_config_handler(
.await
{
match e {
TenantStateError::NotFound(_) => {
TenantStateError::SlotError(TenantSlotError::NotFound(_)) => {
// This API is idempotent: a NotFound on a detach is fine.
}
_ => return Err(e.into()),
@@ -1132,7 +1169,6 @@ async fn handle_tenant_break(
let tenant_id: TenantId = parse_request_param(&r, "tenant_id")?;

let tenant = crate::tenant::mgr::get_tenant(tenant_id, true)
.await
.map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;

tenant.set_broken("broken from test".to_owned()).await;
@@ -1437,7 +1473,7 @@ async fn active_timeline_of_active_tenant(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<Arc<Timeline>, ApiError> {
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id, true)?;
tenant
.get_timeline(timeline_id, true)
.map_err(|e| ApiError::NotFound(e.into()))
29 changes: 29 additions & 0 deletions pageserver/src/metrics.rs
@@ -962,6 +962,32 @@ static REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER: Lazy<IntCounterVec> = Lazy
.expect("failed to define a metric")
});

pub(crate) struct TenantManagerMetrics {
pub(crate) tenant_slots: UIntGauge,
pub(crate) tenant_slot_writes: IntCounter,
pub(crate) unexpected_errors: IntCounter,
}

pub(crate) static TENANT_MANAGER: Lazy<TenantManagerMetrics> = Lazy::new(|| {
TenantManagerMetrics {
tenant_slots: register_uint_gauge!(
"pageserver_tenant_manager_slots",
"How many slots currently exist, including all attached, secondary and in-progress operations",
)
.expect("failed to define a metric"),
tenant_slot_writes: register_int_counter!(
"pageserver_tenant_manager_slot_writes",
"Writes to a tenant slot, including all of create/attach/detach/delete"
)
.expect("failed to define a metric"),
unexpected_errors: register_int_counter!(
"pageserver_tenant_manager_unexpected_errors_total",
"Number of unexpected conditions encountered: nonzero value indicates a non-fatal bug."
)
.expect("failed to define a metric"),
}
});

pub(crate) struct DeletionQueueMetrics {
pub(crate) keys_submitted: IntCounter,
pub(crate) keys_dropped: IntCounter,
@@ -1884,6 +1910,9 @@ pub fn preinitialize_metrics() {
// Deletion queue stats
Lazy::force(&DELETION_QUEUE);

// Tenant manager stats
Lazy::force(&TENANT_MANAGER);

// countervecs
[&BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT]
.into_iter()
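As a rough illustration of how these counters are meant to be driven (the call sites below are hypothetical, not lifted from the tenant manager code), a slot mutation bumps pageserver_tenant_manager_slot_writes and adjusts the slot gauge, while surprising-but-survivable conditions increment the unexpected-errors counter so that tests can assert it stayed at zero (see commit cfaed2a above).

// Hypothetical call sites inside the pageserver crate; TENANT_MANAGER is the
// static defined in pageserver/src/metrics.rs above.
use crate::metrics::TENANT_MANAGER;

fn on_slot_inserted() {
    TENANT_MANAGER.tenant_slot_writes.inc();
    TENANT_MANAGER.tenant_slots.inc();
}

fn on_unexpected_condition(context: &str) {
    // Count rather than panic: a nonzero value indicates a non-fatal bug.
    tracing::error!("unexpected condition: {context}");
    TENANT_MANAGER.unexpected_errors.inc();
}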