pageserver: implement cutover behaviors in RemoteTimelineClient
jcsp committed Dec 11, 2023
1 parent 0ba4cae commit ecc0368
Showing 5 changed files with 132 additions and 19 deletions.
15 changes: 14 additions & 1 deletion pageserver/src/tenant.rs
@@ -1214,6 +1214,7 @@ impl Tenant {
self.tenant_shard_id,
timeline_id,
self.generation,
&self.tenant_conf.read().unwrap().location,
);
let cancel_clone = cancel.clone();
part_downloads.spawn(
@@ -1686,6 +1687,10 @@ impl Tenant {
{
let conf = self.tenant_conf.read().unwrap();

// If we may not delete layers, then simply skip GC. Even though a tenant
// in AttachedMulti state could do GC and just enqueue the blocked deletions,
// the only advantage to doing it is to perhaps shrink the LayerMap metadata
// a bit sooner than we would achieve by waiting for AttachedSingle status.
if !conf.location.may_delete_layers_hint() {
info!("Skipping GC in location state {:?}", conf.location);
return Ok(GcResult::default());
@@ -1712,7 +1717,14 @@

{
let conf = self.tenant_conf.read().unwrap();
if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {

// Note that compaction usually requires deletions, but we don't respect
// may_delete_layers_hint here: that is because tenants in AttachedMulti
// should proceed with compaction even if they can't do deletion, to avoid
// accumulating dangerously deep stacks of L0 layers. Deletions will be
// enqueued inside RemoteTimelineClient, and executed later if/when we transition
// to AttachedSingle state.
if !conf.location.may_upload_layers_hint() {
info!("Skipping compaction in location state {:?}", conf.location);
return Ok(());
}
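
As a rough, illustrative summary (not part of this commit), the combined effect of the two hints on these background loops could be modelled as follows; `BackgroundPolicy` and `policy` are hypothetical names, not pageserver APIs:

```rust
// Hypothetical summary of the gating above: GC is skipped entirely unless layer
// deletions are allowed, while compaction only requires upload permission (its
// deletions are buffered by RemoteTimelineClient when deletion is disallowed).
struct BackgroundPolicy {
    run_gc: bool,
    run_compaction: bool,
    buffer_compaction_deletions: bool,
}

fn policy(may_delete_layers: bool, may_upload_layers: bool) -> BackgroundPolicy {
    BackgroundPolicy {
        run_gc: may_delete_layers,
        run_compaction: may_upload_layers,
        buffer_compaction_deletions: may_upload_layers && !may_delete_layers,
    }
}

fn main() {
    // An AttachedMulti-like state: uploads allowed, deletions not.
    let p = policy(false, true);
    assert!(!p.run_gc && p.run_compaction && p.buffer_compaction_deletions);
}
```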
@@ -3125,6 +3137,7 @@ impl Tenant {
self.tenant_shard_id,
timeline_id,
self.generation,
&self.tenant_conf.read().unwrap().location,
);
Some(remote_client)
} else {
113 changes: 100 additions & 13 deletions pageserver/src/tenant/remote_timeline_client.rs
@@ -233,6 +233,7 @@ use utils::id::{TenantId, TimelineId};

use self::index::IndexPart;

use super::config::AttachedLocationConfig;
use super::storage_layer::{Layer, LayerFileName, ResidentLayer};
use super::upload_queue::SetDeletedFlagProgress;
use super::Generation;
@@ -281,6 +282,37 @@ pub enum PersistIndexPartWithDeletedFlagError {
Other(#[from] anyhow::Error),
}

/// Behavioral modes that enable seamless live migration.
///
/// See docs/rfcs/028-pageserver-migration.md to understand how these fit in.
struct RemoteTimelineClientConfig {
/// If this is false, then updates to remote_consistent_lsn are dropped rather
/// than being submitted to DeletionQueue for validation. This behavior is
/// used when a tenant attachment is known to have a stale generation number,
/// such that validation attempts will always fail. This is not necessary
/// for correctness, but avoids spamming error statistics with failed validations
/// when doing migrations of tenants.
process_remote_consistent_lsn_updates: bool,

/// If this is true, then object deletions are held in a buffer in RemoteTimelineClient
/// rather than being submitted to the DeletionQueue. This behavior is used when a tenant
/// is known to be multi-attached, in order to avoid disrupting the tenant's other attached
/// locations, whose generations' index metadata may still refer to the deleted objects.
block_deletions: bool,
}

/// RemoteTimelineClientConfig's state is entirely driven by LocationConf, but we do
/// not carry the entire LocationConf structure: it's much more than we need. The From
/// impl extracts the subset of the LocationConf that is interesting to RemoteTimelineClient.
impl From<&AttachedLocationConfig> for RemoteTimelineClientConfig {
fn from(lc: &AttachedLocationConfig) -> Self {
Self {
block_deletions: !lc.may_delete_layers_hint(),
process_remote_consistent_lsn_updates: lc.may_upload_layers_hint(),
}
}
}
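
For intuition, here is a self-contained sketch (not part of this diff) of how the attach modes are expected to map onto these two flags. It assumes `may_delete_layers_hint()` is true only for single-attached locations and `may_upload_layers_hint()` is false only for stale-generation locations, and it models the real types with simplified stand-ins:

```rust
// Standalone illustration only; the real types live in pageserver::tenant::config.
#[derive(Debug, Clone, Copy)]
enum AttachmentMode {
    Single, // sole attachment: free to upload and delete
    Multi,  // co-attached with another pageserver: deletions must be buffered
    Stale,  // generation known to be superseded: skip lsn validation traffic
}

#[derive(Debug)]
struct RemoteTimelineClientConfig {
    process_remote_consistent_lsn_updates: bool,
    block_deletions: bool,
}

impl From<AttachmentMode> for RemoteTimelineClientConfig {
    fn from(mode: AttachmentMode) -> Self {
        Self {
            block_deletions: !matches!(mode, AttachmentMode::Single),
            process_remote_consistent_lsn_updates: !matches!(mode, AttachmentMode::Stale),
        }
    }
}

fn main() {
    for mode in [AttachmentMode::Single, AttachmentMode::Multi, AttachmentMode::Stale] {
        println!("{:?} -> {:?}", mode, RemoteTimelineClientConfig::from(mode));
    }
}
```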

/// A client for accessing a timeline's data in remote storage.
///
/// This takes care of managing the number of connections, and balancing them
Expand All @@ -300,7 +332,7 @@ pub enum PersistIndexPartWithDeletedFlagError {
/// in the index part file, whenever timeline metadata is uploaded.
///
/// Downloads are not queued, they are performed immediately.
pub struct RemoteTimelineClient {
pub(crate) struct RemoteTimelineClient {
conf: &'static PageServerConf,

runtime: tokio::runtime::Handle,
@@ -316,6 +348,8 @@ pub struct RemoteTimelineClient {
storage_impl: GenericRemoteStorage,

deletion_queue_client: DeletionQueueClient,

config: std::sync::RwLock<RemoteTimelineClientConfig>,
}

impl RemoteTimelineClient {
@@ -325,13 +359,14 @@ impl RemoteTimelineClient {
/// Note: the caller must initialize the upload queue before any uploads can be scheduled,
/// by calling init_upload_queue.
///
pub fn new(
pub(crate) fn new(
remote_storage: GenericRemoteStorage,
deletion_queue_client: DeletionQueueClient,
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
generation: Generation,
location_conf: &AttachedLocationConfig,
) -> RemoteTimelineClient {
RemoteTimelineClient {
conf,
@@ -351,6 +386,7 @@ impl RemoteTimelineClient {
&tenant_shard_id,
&timeline_id,
)),
config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(location_conf)),
}
}

@@ -410,6 +446,34 @@ impl RemoteTimelineClient {
Ok(())
}

pub(super) fn update_config(&self, location_conf: &AttachedLocationConfig) {
let new_conf = RemoteTimelineClientConfig::from(location_conf);
if !new_conf.block_deletions {
// If we may now delete layers, drain any that were blocked in our old
// configuration state
let mut queue_locked = self.upload_queue.lock().unwrap();
if let Ok(queue) = queue_locked.initialized_mut() {
let blocked_deletions = std::mem::take(&mut queue.blocked_deletions);
for d in blocked_deletions {
if let Err(e) = self.deletion_queue_client.push_layers_sync(
self.tenant_shard_id,
self.timeline_id,
self.generation,
d.layers,
) {
// This could happen if the pageserver is shut down while a tenant
// is transitioning from a deletion-blocked state: we will leak some
// S3 objects in this case.
warn!("Failed to drain blocked deletions: {}", e);
break;
}
}
}
}

*self.config.write().unwrap() = new_conf;
}
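
The buffer-and-drain behaviour added here (deletions accumulate in `blocked_deletions` while `block_deletions` is set, and `update_config` flushes them once deletion becomes allowed again, mirroring the `UploadOp::Delete` handling further down) can be summarised with this minimal, self-contained sketch; the types and names below are illustrative stand-ins, not the pageserver's real API:

```rust
// Minimal model of the blocked-deletion buffer: while deletions are blocked they
// accumulate locally; unblocking drains them to the (stubbed) deletion queue.
#[derive(Default)]
struct DeletionBuffer {
    block_deletions: bool,
    blocked: Vec<String>, // stands in for Vec<(LayerFileName, LayerFileMetadata)>
}

impl DeletionBuffer {
    fn delete(&mut self, layer: &str, queue: &mut Vec<String>) {
        if self.block_deletions {
            // AttachedMulti-like state: hold the deletion locally instead of submitting it.
            self.blocked.push(layer.to_string());
        } else {
            queue.push(layer.to_string());
        }
    }

    fn update_config(&mut self, block_deletions: bool, queue: &mut Vec<String>) {
        if !block_deletions {
            // Transitioning to a state that may delete: drain anything we buffered.
            queue.append(&mut self.blocked);
        }
        self.block_deletions = block_deletions;
    }
}

fn main() {
    let mut queue = Vec::new();
    let mut buf = DeletionBuffer { block_deletions: true, ..Default::default() };
    buf.delete("000000-L0", &mut queue); // buffered, not submitted
    assert!(queue.is_empty());
    buf.update_config(false, &mut queue); // e.g. Multi -> Single cutover
    assert_eq!(queue, vec!["000000-L0".to_string()]);
    println!("drained {} blocked deletion(s)", queue.len());
}
```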

pub fn remote_consistent_lsn_projected(&self) -> Option<Lsn> {
match &mut *self.upload_queue.lock().unwrap() {
UploadQueue::Uninitialized => None,
@@ -1326,16 +1390,24 @@ impl RemoteTimelineClient {
res
}
UploadOp::Delete(delete) => {
pausable_failpoint!("before-delete-layer-pausable");
self.deletion_queue_client
.push_layers(
self.tenant_shard_id,
self.timeline_id,
self.generation,
delete.layers.clone(),
)
.await
.map_err(|e| anyhow::anyhow!(e))
if self.config.read().unwrap().block_deletions {
let mut queue_locked = self.upload_queue.lock().unwrap();
if let Ok(queue) = queue_locked.initialized_mut() {
queue.blocked_deletions.push(delete.clone());
}
Ok(())
} else {
pausable_failpoint!("before-delete-layer-pausable");
self.deletion_queue_client
.push_layers(
self.tenant_shard_id,
self.timeline_id,
self.generation,
delete.layers.clone(),
)
.await
.map_err(|e| anyhow::anyhow!(e))
}
}
unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
// unreachable. Barrier operations are handled synchronously in
@@ -1426,8 +1498,16 @@ impl RemoteTimelineClient {
// Legacy mode: skip validating generation
upload_queue.visible_remote_consistent_lsn.store(lsn);
None
} else {
} else if self
.config
.read()
.unwrap()
.process_remote_consistent_lsn_updates
{
Some((lsn, upload_queue.visible_remote_consistent_lsn.clone()))
} else {
// Our config disables remote_consistent_lsn updates: drop it.
None
}
}
UploadOp::Delete(_) => {
@@ -1559,6 +1639,7 @@ impl RemoteTimelineClient {
queued_operations: VecDeque::default(),
#[cfg(feature = "testing")]
dangling_files: HashMap::default(),
blocked_deletions: Vec::new(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
};
@@ -1705,6 +1786,7 @@ mod tests {
use crate::{
context::RequestContext,
tenant::{
config::AttachmentMode,
harness::{TenantHarness, TIMELINE_ID},
storage_layer::Layer,
Generation, Tenant, Timeline,
@@ -1791,6 +1873,10 @@ mod tests {

/// Construct a RemoteTimelineClient in an arbitrary generation
fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
let location_conf = AttachedLocationConfig {
generation,
attach_mode: AttachmentMode::Single,
};
Arc::new(RemoteTimelineClient {
conf: self.harness.conf,
runtime: tokio::runtime::Handle::current(),
Expand All @@ -1804,6 +1890,7 @@ mod tests {
&self.harness.tenant_shard_id,
&TIMELINE_ID,
)),
config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(&location_conf)),
})
}

14 changes: 11 additions & 3 deletions pageserver/src/tenant/timeline.rs
@@ -147,7 +147,7 @@ fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {

/// The outward-facing resources required to build a Timeline
pub struct TimelineResources {
pub remote_client: Option<RemoteTimelineClient>,
pub(crate) remote_client: Option<RemoteTimelineClient>,
pub deletion_queue_client: DeletionQueueClient,
}

@@ -212,7 +212,7 @@ pub struct Timeline {

/// Remote storage client.
/// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
pub remote_client: Option<Arc<RemoteTimelineClient>>,
pub(crate) remote_client: Option<Arc<RemoteTimelineClient>>,

// What page versions do we hold in the repository? If we get a
// request > last_record_lsn, we need to wait until we receive all
@@ -1309,8 +1309,9 @@ impl Timeline {

// The threshold is embedded in the metric. So, we need to update it.
{
let config_read = self.tenant_conf.read().unwrap();
let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
&self.tenant_conf.read().unwrap().tenant_conf,
&config_read.tenant_conf,
&self.conf.default_tenant_conf,
);

@@ -1319,6 +1320,13 @@ impl Timeline {
let tenant_id_str = self.tenant_shard_id.tenant_id.to_string();

let timeline_id_str = self.timeline_id.to_string();

if let Some(remote_client) = &self.remote_client {
remote_client.update_config(&config_read.location);
}

drop(config_read);

self.metrics
.evictions_with_low_residence_duration
.write()
2 changes: 1 addition & 1 deletion pageserver/src/tenant/timeline/delete.rs
@@ -400,7 +400,7 @@ impl DeleteTimelineFlow {
/// Shortcut to create Timeline in stopping state and spawn deletion task.
/// See corresponding parts of [`crate::tenant::delete::DeleteTenantFlow`]
#[instrument(skip_all, fields(%timeline_id))]
pub async fn resume_deletion(
pub(crate) async fn resume_deletion(
tenant: Arc<Tenant>,
timeline_id: TimelineId,
local_metadata: &TimelineMetadata,
7 changes: 6 additions & 1 deletion pageserver/src/tenant/upload_queue.rs
@@ -91,6 +91,9 @@ pub(crate) struct UploadQueueInitialized {
#[cfg(feature = "testing")]
pub(crate) dangling_files: HashMap<LayerFileName, Generation>,

/// Deletions that are blocked by the tenant configuration
pub(crate) blocked_deletions: Vec<Delete>,

/// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
pub(crate) shutting_down: bool,

@@ -156,6 +159,7 @@ impl UploadQueue {
queued_operations: VecDeque::new(),
#[cfg(feature = "testing")]
dangling_files: HashMap::new(),
blocked_deletions: Vec::new(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
};
@@ -205,6 +209,7 @@ impl UploadQueue {
queued_operations: VecDeque::new(),
#[cfg(feature = "testing")]
dangling_files: HashMap::new(),
blocked_deletions: Vec::new(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
};
@@ -250,7 +255,7 @@ pub(crate) struct UploadTask {

/// A deletion of some layers within the lifetime of a timeline. This is not used
/// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) struct Delete {
pub(crate) layers: Vec<(LayerFileName, LayerFileMetadata)>,
}
