Skip to content

Commit

Permalink
pageserver: wait with timeout for InProgress slots in page service
Browse files Browse the repository at this point in the history
  • Loading branch information
jcsp committed Nov 1, 2023
1 parent 60126f6 commit 15237c7
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 81 deletions.
120 changes: 39 additions & 81 deletions pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
use bytes::Bytes;
use futures::Stream;
use pageserver_api::models::TenantState;
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
Expand Down Expand Up @@ -55,16 +54,20 @@ use crate::metrics;
use crate::metrics::LIVE_CONNECTIONS_COUNT;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::mgr;
use crate::tenant::mgr::GetTenantError;
use crate::tenant::{Tenant, Timeline};
use crate::tenant::mgr::get_active_tenant_with_timeout;
use crate::tenant::mgr::GetActiveTenantError;
use crate::tenant::Timeline;
use crate::trace::Tracer;

use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;

// How long we may block waiting for a [`TenantSlot::InProgress`]` and/or a [`Tenant`] which
// is not yet in state [`TenantState::Active`].
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);

/// Read the end of a tar archive.
///
/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
Expand Down Expand Up @@ -389,7 +392,9 @@ impl PageServerHandler {
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));

// Make request tracer if needed
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
let tenant =
mgr::get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT, &self.cancel)
.await?;
let mut tracer = if tenant.get_trace_read_requests() {
let connection_id = ConnectionId::generate();
let path = tenant
Expand Down Expand Up @@ -525,7 +530,8 @@ impl PageServerHandler {
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Create empty timeline
info!("creating new timeline");
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
let tenant =
get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT, &self.cancel).await?;
let timeline = tenant
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
.await?;
Expand Down Expand Up @@ -584,7 +590,9 @@ impl PageServerHandler {
debug_assert_current_span_has_tenant_and_timeline_id();
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));

let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
let timeline = self
.get_active_tenant_timeline(tenant_id, timeline_id)
.await?;
let last_record_lsn = timeline.get_last_record_lsn();
if last_record_lsn != start_lsn {
return Err(QueryError::Other(
Expand Down Expand Up @@ -792,7 +800,9 @@ impl PageServerHandler {
let started = std::time::Instant::now();

// check that the timeline exists
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
let timeline = self
.get_active_tenant_timeline(tenant_id, timeline_id)
.await?;
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
if let Some(lsn) = lsn {
// Backup was requested at a particular LSN. Wait for it to arrive.
Expand Down Expand Up @@ -891,6 +901,21 @@ impl PageServerHandler {
.expect("claims presence already checked");
check_permission(claims, tenant_id)
}

/// Shorthand for getting a reference to a Timeline of an Active tenant.
async fn get_active_tenant_timeline(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<Arc<Timeline>, GetActiveTimelineError> {
let tenant = get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT, &self.cancel)
.await
.map_err(GetActiveTimelineError::Tenant)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(|e| GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?;
Ok(timeline)
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -1048,7 +1073,9 @@ where
.record("timeline_id", field::display(timeline_id));

self.check_permission(Some(tenant_id))?;
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
let timeline = self
.get_active_tenant_timeline(tenant_id, timeline_id)
.await?;

let end_of_timeline = timeline.get_last_record_rlsn();

Expand Down Expand Up @@ -1232,7 +1259,9 @@ where

self.check_permission(Some(tenant_id))?;

let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
let tenant =
get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT, &self.cancel)
.await?;
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"checkpoint_distance"),
RowDescriptor::int8_col(b"checkpoint_timeout"),
Expand Down Expand Up @@ -1278,21 +1307,6 @@ where
}
}

#[derive(thiserror::Error, Debug)]
enum GetActiveTenantError {
#[error(
"Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
)]
WaitForActiveTimeout {
latest_state: TenantState,
wait_time: Duration,
},
#[error(transparent)]
NotFound(GetTenantError),
#[error(transparent)]
WaitTenantActive(tenant::WaitToBecomeActiveError),
}

impl From<GetActiveTenantError> for QueryError {
fn from(e: GetActiveTenantError) -> Self {
match e {
Expand All @@ -1305,47 +1319,6 @@ impl From<GetActiveTenantError> for QueryError {
}
}

/// Get active tenant.
///
/// If the tenant is Loading, waits for it to become Active, for up to 30 s. That
/// ensures that queries don't fail immediately after pageserver startup, because
/// all tenants are still loading.
async fn get_active_tenant_with_timeout(
tenant_id: TenantId,
_ctx: &RequestContext, /* require get a context to support cancellation in the future */
) -> Result<Arc<Tenant>, GetActiveTenantError> {
let tenant = match mgr::get_tenant(tenant_id, false) {
Ok(tenant) => tenant,
Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
Err(GetTenantError::NotActive(_)) => {
unreachable!("we're calling get_tenant with active_only=false")
}
Err(GetTenantError::Broken(_)) => {
unreachable!("we're calling get_tenant with active_only=false")
}
Err(GetTenantError::MapState(_)) => {
unreachable!("TenantManager is initialized before page service starts")
}
};
let wait_time = Duration::from_secs(30);
match tokio::time::timeout(wait_time, tenant.wait_to_become_active()).await {
Ok(Ok(())) => Ok(tenant),
// no .context(), the error message is good enough and some tests depend on it
Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)),
Err(_) => {
let latest_state = tenant.current_state();
if latest_state == TenantState::Active {
Ok(tenant)
} else {
Err(GetActiveTenantError::WaitForActiveTimeout {
latest_state,
wait_time,
})
}
}
}
}

#[derive(Debug, thiserror::Error)]
enum GetActiveTimelineError {
#[error(transparent)]
Expand All @@ -1362,18 +1335,3 @@ impl From<GetActiveTimelineError> for QueryError {
}
}
}

/// Shorthand for getting a reference to a Timeline of an Active tenant.
async fn get_active_tenant_timeline(
tenant_id: TenantId,
timeline_id: TimelineId,
ctx: &RequestContext,
) -> Result<Arc<Timeline>, GetActiveTimelineError> {
let tenant = get_active_tenant_with_timeout(tenant_id, ctx)
.await
.map_err(GetActiveTimelineError::Tenant)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(|e| GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?;
Ok(timeline)
}
144 changes: 144 additions & 0 deletions pageserver/src/tenant/mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::borrow::Cow;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::fs;

use anyhow::Context;
Expand Down Expand Up @@ -912,6 +913,149 @@ pub(crate) fn get_tenant(
}
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum GetActiveTenantError {
#[error(
"Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
)]
WaitForActiveTimeout {
latest_state: TenantState,
wait_time: Duration,
},
#[error(transparent)]
NotFound(#[from] GetTenantError),
#[error(transparent)]
WaitTenantActive(crate::tenant::WaitToBecomeActiveError),
}

enum TimeoutCancellableError {
Timeout,
Cancelled,
}

/// Wrap [`tokio::time::timeout`] with a CancellationToken.
async fn timeout_cancellable<F>(
duration: Duration,
future: F,
cancel: &CancellationToken,
) -> Result<F::Output, TimeoutCancellableError>
where
F: std::future::Future,
{
tokio::select!(
r = tokio::time::timeout(duration, future) => {
r.map_err(|_| TimeoutCancellableError::Timeout)

},
_ = cancel.cancelled() => {
Err(TimeoutCancellableError::Cancelled)

}
)
}

/// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`]
/// state, then wait for up to `timeout`. If the [`Tenant`] is not currently in [`TenantState::Active`],
/// then wait for up to `timeout` (minus however long we waited for the slot).
pub(crate) async fn get_active_tenant_with_timeout(
tenant_id: TenantId,
mut timeout: Duration,
cancel: &CancellationToken,
) -> Result<Arc<Tenant>, GetActiveTenantError> {
enum WaitFor {
Barrier(utils::completion::Barrier),
Tenant(Arc<Tenant>),
}

let wait_for = {
let locked = TENANTS.read().unwrap();
let peek_slot =
tenant_map_peek_slot(&locked, &tenant_id, true).map_err(GetTenantError::MapState)?;
match peek_slot {
Some(TenantSlot::Attached(tenant)) => {
match tenant.current_state() {
TenantState::Active => {
// Fast path: we don't need to do any async waiting.
return Ok(tenant.clone());
}
_ => WaitFor::Tenant(tenant.clone()),
}
}
Some(TenantSlot::Secondary) => {
return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
tenant_id,
)))
}
Some(TenantSlot::InProgress(barrier)) => WaitFor::Barrier(barrier.clone()),
None => {
return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
tenant_id,
)))
}
}
};

let tenant = match wait_for {
WaitFor::Barrier(barrier) => {
tracing::debug!("Waiting for tenant InProgress state to pass...");
let wait_start = Instant::now();
timeout_cancellable(timeout, barrier.wait(), cancel)
.await
.map_err(|e| match e {
TimeoutCancellableError::Timeout => {
GetActiveTenantError::WaitForActiveTimeout {
latest_state: TenantState::Loading,
wait_time: wait_start.elapsed(),
}
}
TimeoutCancellableError::Cancelled => {
GetActiveTenantError::NotFound(GetTenantError::NotFound(tenant_id))
}
})?;
let wait_duration = Instant::now().duration_since(wait_start);
timeout -= wait_duration;
{
let locked = TENANTS.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, true)
.map_err(GetTenantError::MapState)?;
match peek_slot {
Some(TenantSlot::Attached(tenant)) => tenant.clone(),
_ => {
return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
tenant_id,
)))
}
}
}
}
WaitFor::Tenant(tenant) => tenant,
};

tracing::debug!("Waiting for tenant to enter active state...");
match timeout_cancellable(timeout, tenant.wait_to_become_active(), cancel).await {
Ok(Ok(())) => Ok(tenant),
// no .context(), the error message is good enough and some tests depend on it
Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)),
Err(TimeoutCancellableError::Timeout) => {
let latest_state = tenant.current_state();
if latest_state == TenantState::Active {
Ok(tenant)
} else {
Err(GetActiveTenantError::WaitForActiveTimeout {
latest_state,
wait_time: timeout,
})
}
}
Err(TimeoutCancellableError::Cancelled) => {
Err(GetActiveTenantError::WaitForActiveTimeout {
latest_state: TenantState::Loading,
wait_time: timeout,
})
}
}
}

pub(crate) async fn delete_tenant(
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
Expand Down

0 comments on commit 15237c7

Please sign in to comment.