refactor(scrubber): add unified command suitable for cron job (#8635)
Part of #8128.

## Description

This PR creates a unified command that runs both physical GC and the
metadata health check, so a single cron job can drive them. It also gives
us a place to add further tasks to the cron job in the future.
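
In outline, the new subcommand simply awaits the two existing tasks in
sequence. A minimal sketch of that control flow, using hypothetical
stand-in task functions rather than the real signatures from this diff
(assumes the `anyhow` and `tokio` crates):

```rust
use anyhow::Result;

// Hypothetical stand-ins for the two tasks this PR wires together.
async fn physical_gc() -> Result<()> { Ok(()) }
async fn metadata_health_check() -> Result<()> { Ok(()) }

async fn cron_job() -> Result<()> {
    // `?` short-circuits: if physical GC fails, the health check is
    // skipped and the whole run reports the error.
    physical_gc().await?;
    metadata_health_check().await?;
    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    cron_job().await
}
```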

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
yliang412 authored and VladLazar committed Aug 20, 2024
1 parent 0765070 commit 6c855a5
Showing 3 changed files with 175 additions and 83 deletions.
240 changes: 167 additions & 73 deletions storage_scrubber/src/main.rs
@@ -3,9 +3,10 @@ use camino::Utf8PathBuf;
use pageserver_api::controller_api::{MetadataHealthUpdateRequest, MetadataHealthUpdateResponse};
use pageserver_api::shard::TenantShardId;
use reqwest::{Method, Url};
use storage_controller_client::control_api;
use storage_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
use storage_scrubber::pageserver_physical_gc::GcMode;
use storage_scrubber::scan_pageserver_metadata::scan_metadata;
use storage_scrubber::scan_pageserver_metadata::scan_pageserver_metadata;
use storage_scrubber::tenant_snapshot::SnapshotDownloader;
use storage_scrubber::{find_large_objects, ControllerClientConfig};
use storage_scrubber::{
@@ -68,7 +69,7 @@ enum Command {
#[arg(long = "tenant-id", num_args = 0..)]
tenant_ids: Vec<TenantShardId>,
#[arg(long = "post", default_value_t = false)]
post_to_storage_controller: bool,
post_to_storcon: bool,
#[arg(long, default_value = None)]
/// For safekeeper node_kind only, points to db with debug dump
dump_db_connstr: Option<String>,
@@ -100,6 +101,16 @@ enum Command {
#[arg(long = "concurrency", short = 'j', default_value_t = 64)]
concurrency: usize,
},
CronJob {
// PageserverPhysicalGc
#[arg(long = "min-age")]
gc_min_age: humantime::Duration,
#[arg(short, long, default_value_t = GcMode::IndicesOnly)]
gc_mode: GcMode,
// ScanMetadata
#[arg(long = "post", default_value_t = false)]
post_to_storcon: bool,
},
}
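
The new `CronJob` variant leans on clap's derive defaults: `gc_min_age`
is a `humantime::Duration`, so `--min-age` accepts values like `24h` or
`7days`, and `gc_mode` defaults to the index-only sweep. A small sketch
of the duration handling (assumes the `humantime` crate; not part of
this diff):

```rust
use std::time::Duration;

fn main() {
    // humantime::Duration implements FromStr, which clap uses to parse
    // `--min-age` values such as "24h" or "7days".
    let min_age: humantime::Duration = "24h".parse().unwrap();
    assert_eq!(*min_age, Duration::from_secs(24 * 3600));

    // `.into()` converts to std::time::Duration, as the GC call sites
    // below do with `min_age.into()`.
    let as_std: Duration = min_age.into();
    assert_eq!(as_std, Duration::from_secs(86_400));
}
```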

#[tokio::main]
@@ -117,6 +128,7 @@ async fn main() -> anyhow::Result<()> {
Command::TenantSnapshot { .. } => "tenant-snapshot",
Command::PageserverPhysicalGc { .. } => "pageserver-physical-gc",
Command::FindLargeObjects { .. } => "find-large-objects",
Command::CronJob { .. } => "cron-job",
};
let _guard = init_logging(&format!(
"{}_{}_{}_{}.log",
@@ -126,20 +138,21 @@
chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
));

let controller_client_conf = cli.controller_api.map(|controller_api| {
let controller_client = cli.controller_api.map(|controller_api| {
ControllerClientConfig {
controller_api,
// Default to no key: this is a convenience when working in a development environment
controller_jwt: cli.controller_jwt.unwrap_or("".to_owned()),
}
.build_client()
});

match cli.command {
Command::ScanMetadata {
json,
tenant_ids,
node_kind,
post_to_storage_controller,
post_to_storcon,
dump_db_connstr,
dump_db_table,
} => {
@@ -178,53 +191,14 @@ async fn main() -> anyhow::Result<()> {
}
Ok(())
} else {
if controller_client_conf.is_none() && post_to_storage_controller {
return Err(anyhow!("Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"));
}
match scan_metadata(bucket_config.clone(), tenant_ids).await {
Err(e) => {
tracing::error!("Failed: {e}");
Err(e)
}
Ok(summary) => {
if json {
println!("{}", serde_json::to_string(&summary).unwrap())
} else {
println!("{}", summary.summary_string());
}

if post_to_storage_controller {
if let Some(conf) = controller_client_conf {
let controller_client = conf.build_client();
let body = summary.build_health_update_request();
controller_client
.dispatch::<MetadataHealthUpdateRequest, MetadataHealthUpdateResponse>(
Method::POST,
"control/v1/metadata_health/update".to_string(),
Some(body),
)
.await?;
}
}

if summary.is_fatal() {
tracing::error!("Fatal scrub errors detected");
} else if summary.is_empty() {
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
// scrubber they were likely expecting to scan something, and if we see no timelines
// at all then it's likely due to some configuration issues like a bad prefix
tracing::error!(
"No timelines found in bucket {} prefix {}",
bucket_config.bucket,
bucket_config
.prefix_in_bucket
.unwrap_or("<none>".to_string())
);
}

Ok(())
}
}
scan_pageserver_metadata_cmd(
bucket_config,
controller_client.as_ref(),
tenant_ids,
json,
post_to_storcon,
)
.await
}
}
Command::FindGarbage {
@@ -254,31 +228,14 @@ async fn main() -> anyhow::Result<()> {
min_age,
mode,
} => {
match (&controller_client_conf, mode) {
(Some(_), _) => {
// Any mode may run when controller API is set
}
(None, GcMode::Full) => {
// The part of physical GC where we erase ancestor layers cannot be done safely without
// confirming the most recent complete shard split with the controller. Refuse to run, rather
// than doing it unsafely.
return Err(anyhow!("Full physical GC requires `--controller-api` and `--controller-jwt` to run"));
}
(None, GcMode::DryRun | GcMode::IndicesOnly) => {
// These GcModes do not require the controller to run.
}
}

let summary = pageserver_physical_gc(
bucket_config,
controller_client_conf,
pageserver_physical_gc_cmd(
&bucket_config,
controller_client.as_ref(),
tenant_ids,
min_age.into(),
min_age,
mode,
)
.await?;
println!("{}", serde_json::to_string(&summary).unwrap());
Ok(())
.await
}
Command::FindLargeObjects {
min_size,
@@ -295,5 +252,142 @@ async fn main() -> anyhow::Result<()> {
println!("{}", serde_json::to_string(&summary).unwrap());
Ok(())
}
Command::CronJob {
gc_min_age,
gc_mode,
post_to_storcon,
} => {
run_cron_job(
bucket_config,
controller_client.as_ref(),
gc_min_age,
gc_mode,
post_to_storcon,
)
.await
}
}
}

/// Runs the scrubber cron job.
/// 1. Do pageserver physical gc
/// 2. Scan pageserver metadata
pub async fn run_cron_job(
bucket_config: BucketConfig,
controller_client: Option<&control_api::Client>,
gc_min_age: humantime::Duration,
gc_mode: GcMode,
post_to_storcon: bool,
) -> anyhow::Result<()> {
tracing::info!(%gc_min_age, %gc_mode, "Running pageserver-physical-gc");
pageserver_physical_gc_cmd(
&bucket_config,
controller_client,
Vec::new(),
gc_min_age,
gc_mode,
)
.await?;
tracing::info!(%post_to_storcon, node_kind = %NodeKind::Pageserver, "Running scan-metadata");
scan_pageserver_metadata_cmd(
bucket_config,
controller_client,
Vec::new(),
true,
post_to_storcon,
)
.await?;

Ok(())
}
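
The `tracing::info!(%gc_min_age, %gc_mode, ...)` lines above use
tracing's `%` sigil, which records a field with its `Display`
implementation. A tiny sketch (assumes the `tracing` and
`tracing_subscriber` crates; the binary's own logging setup lives in
`init_logging`):

```rust
fn main() {
    // A minimal subscriber so the event is actually printed.
    tracing_subscriber::fmt().init();

    let min_age = "24h";
    // `%field` captures via Display; `?field` would capture via Debug.
    tracing::info!(%min_age, mode = "indices-only", "Running pageserver-physical-gc");
}
```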

pub async fn pageserver_physical_gc_cmd(
bucket_config: &BucketConfig,
controller_client: Option<&control_api::Client>,
tenant_shard_ids: Vec<TenantShardId>,
min_age: humantime::Duration,
mode: GcMode,
) -> anyhow::Result<()> {
match (controller_client, mode) {
(Some(_), _) => {
// Any mode may run when controller API is set
}
(None, GcMode::Full) => {
// The part of physical GC where we erase ancestor layers cannot be done safely without
// confirming the most recent complete shard split with the controller. Refuse to run, rather
// than doing it unsafely.
return Err(anyhow!(
"Full physical GC requires `--controller-api` and `--controller-jwt` to run"
));
}
(None, GcMode::DryRun | GcMode::IndicesOnly) => {
// These GcModes do not require the controller to run.
}
}

let summary = pageserver_physical_gc(
bucket_config,
controller_client,
tenant_shard_ids,
min_age.into(),
mode,
)
.await?;
println!("{}", serde_json::to_string(&summary).unwrap());
Ok(())
}
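
The tuple match above is a safety gate: with a controller client any
mode may run, but `GcMode::Full` is refused without one, since deleting
ancestor layers requires the controller's view of completed shard
splits. A simplified, self-contained echo of that gating (hypothetical
types; the real ones are defined in this diff):

```rust
use anyhow::{anyhow, Result};

#[derive(Clone, Copy)]
enum GcMode { DryRun, IndicesOnly, Full }

fn check_gc_gate(has_controller: bool, mode: GcMode) -> Result<()> {
    match (has_controller, mode) {
        // Any mode may run when the controller API is configured.
        (true, _) => Ok(()),
        // Refuse the unsafe combination outright.
        (false, GcMode::Full) => Err(anyhow!(
            "Full physical GC requires `--controller-api` and `--controller-jwt`"
        )),
        // Dry runs and index-only sweeps are safe without it.
        (false, GcMode::DryRun | GcMode::IndicesOnly) => Ok(()),
    }
}

fn main() {
    assert!(check_gc_gate(false, GcMode::Full).is_err());
    assert!(check_gc_gate(false, GcMode::IndicesOnly).is_ok());
    assert!(check_gc_gate(true, GcMode::Full).is_ok());
}
```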

pub async fn scan_pageserver_metadata_cmd(
bucket_config: BucketConfig,
controller_client: Option<&control_api::Client>,
tenant_shard_ids: Vec<TenantShardId>,
json: bool,
post_to_storcon: bool,
) -> anyhow::Result<()> {
if controller_client.is_none() && post_to_storcon {
return Err(anyhow!("Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"));
}
match scan_pageserver_metadata(bucket_config.clone(), tenant_shard_ids).await {
Err(e) => {
tracing::error!("Failed: {e}");
Err(e)
}
Ok(summary) => {
if json {
println!("{}", serde_json::to_string(&summary).unwrap())
} else {
println!("{}", summary.summary_string());
}

if post_to_storcon {
if let Some(client) = controller_client {
let body = summary.build_health_update_request();
client
.dispatch::<MetadataHealthUpdateRequest, MetadataHealthUpdateResponse>(
Method::POST,
"control/v1/metadata_health/update".to_string(),
Some(body),
)
.await?;
}
}

if summary.is_fatal() {
tracing::error!("Fatal scrub errors detected");
} else if summary.is_empty() {
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
// scrubber they were likely expecting to scan something, and if we see no timelines
// at all then it's likely due to some configuration issues like a bad prefix
tracing::error!(
"No timelines found in bucket {} prefix {}",
bucket_config.bucket,
bucket_config
.prefix_in_bucket
.unwrap_or("<none>".to_string())
);
}

Ok(())
}
}
}
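
A design note on the refactor above: `main` now builds the controller
client once (`.build_client()` inside the closure) and every
sub-command borrows it through `controller_client.as_ref()`, converting
`Option<control_api::Client>` into the `Option<&control_api::Client>`
the helpers take. A toy illustration of that conversion, using only
standard-library types:

```rust
fn main() {
    let owned: Option<String> = Some("controller client".to_owned());

    // as_ref() turns `&Option<T>` into `Option<&T>` without consuming
    // the Option, so one client can be lent to several sub-commands.
    let borrowed: Option<&String> = owned.as_ref();
    assert_eq!(borrowed.map(|s| s.len()), Some(17));

    // `owned` is still intact: nothing was moved out of it.
    assert!(owned.is_some());
}
```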
16 changes: 7 additions & 9 deletions storage_scrubber/src/pageserver_physical_gc.rs
@@ -4,9 +4,7 @@ use std::time::{Duration, SystemTime};

use crate::checks::{list_timeline_blobs, BlobDataParseResult};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{
init_remote, BucketConfig, ControllerClientConfig, NodeKind, RootTarget, TenantShardTimelineId,
};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
use aws_sdk_s3::Client;
use futures_util::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
@@ -473,8 +471,8 @@ async fn gc_ancestor(
/// This type of GC is not necessary for correctness: rather it serves to reduce wasted storage capacity, and
/// make sure that object listings don't get slowed down by large numbers of garbage objects.
pub async fn pageserver_physical_gc(
bucket_config: BucketConfig,
controller_client_conf: Option<ControllerClientConfig>,
bucket_config: &BucketConfig,
controller_client: Option<&control_api::Client>,
tenant_shard_ids: Vec<TenantShardId>,
min_age: Duration,
mode: GcMode,
@@ -558,7 +556,7 @@ pub async fn pageserver_physical_gc(
let timelines = timelines.map_ok(|ttid| {
gc_timeline(
&s3_client,
&bucket_config,
bucket_config,
&min_age,
&target,
mode,
@@ -574,7 +572,7 @@
}

// Execute cross-shard GC, using the accumulator's full view of all the shards built in the per-shard GC
let Some(controller_client) = controller_client_conf.map(|c| c.build_client()) else {
let Some(client) = controller_client else {
tracing::info!("Skipping ancestor layer GC, because no `--controller-api` was specified");
return Ok(summary);
};
@@ -583,13 +581,13 @@
.unwrap()
.into_inner()
.unwrap()
.into_gc_ancestors(&controller_client, &mut summary)
.into_gc_ancestors(client, &mut summary)
.await;

for ancestor_shard in ancestor_shards {
gc_ancestor(
&s3_client,
&bucket_config,
bucket_config,
&target,
&min_age,
ancestor_shard,
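
The `.unwrap().into_inner().unwrap()` chain above is the usual shape
for reclaiming shared state once concurrent per-shard work has
finished. A sketch of the pattern, under the assumption that the
accumulator is an `Arc<Mutex<_>>` (its concrete type sits outside the
visible hunk):

```rust
use std::sync::{Arc, Mutex};

fn main() {
    let acc = Arc::new(Mutex::new(Vec::<u32>::new()));

    // Concurrent tasks would clone `acc` and push their findings; we
    // simulate a single result here.
    acc.lock().unwrap().push(42);

    // With every clone dropped, take back exclusive ownership:
    let results: Vec<u32> = Arc::try_unwrap(acc)
        .unwrap() // Err if another Arc handle were still alive
        .into_inner()
        .unwrap(); // Err only if a task panicked while holding the lock
    assert_eq!(results, vec![42]);
}
```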
2 changes: 1 addition & 1 deletion storage_scrubber/src/scan_pageserver_metadata.rs
@@ -116,7 +116,7 @@ Index versions: {version_summary}
}

/// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
pub async fn scan_metadata(
pub async fn scan_pageserver_metadata(
bucket_config: BucketConfig,
tenant_ids: Vec<TenantShardId>,
) -> anyhow::Result<MetadataSummary> {
