Replace MAX_KEYS_PER_DELETE constant with function #10061

Merged
2 commits merged on Dec 10, 2024

Changes from all commits
4 changes: 4 additions & 0 deletions libs/remote_storage/src/azure_blob.rs
@@ -624,6 +624,10 @@ impl RemoteStorage for AzureBlobStorage {
res
}

fn max_keys_per_delete(&self) -> usize {
super::MAX_KEYS_PER_DELETE_AZURE
}

async fn copy(
&self,
from: &RemotePath,
27 changes: 26 additions & 1 deletion libs/remote_storage/src/lib.rs
@@ -70,7 +70,14 @@ pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;

/// As defined in S3 docs
pub const MAX_KEYS_PER_DELETE: usize = 1000;
///
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
pub const MAX_KEYS_PER_DELETE_S3: usize = 1000;

/// As defined in Azure docs
///
/// <https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch>
pub const MAX_KEYS_PER_DELETE_AZURE: usize = 256;

const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';

@@ -340,6 +347,14 @@ pub trait RemoteStorage: Send + Sync + 'static {
cancel: &CancellationToken,
) -> anyhow::Result<()>;

    /// Returns the maximum number of keys that a call to [`Self::delete_objects`] can delete without chunking.
    ///
    /// The value returned is only an optimization hint; callers may pass a larger number of objects to
    /// `delete_objects` as well.
///
/// The value is guaranteed to be >= 1.
fn max_keys_per_delete(&self) -> usize;

/// Deletes all objects matching the given prefix.
///
/// NB: this uses NoDelimiter and will match partial prefixes. For example, the prefix /a/b will
@@ -533,6 +548,16 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
}

    /// See [`RemoteStorage::max_keys_per_delete`]
pub fn max_keys_per_delete(&self) -> usize {
match self {
Self::LocalFs(s) => s.max_keys_per_delete(),
Self::AwsS3(s) => s.max_keys_per_delete(),
Self::AzureBlob(s) => s.max_keys_per_delete(),
Self::Unreliable(s) => s.max_keys_per_delete(),
}
}

/// See [`RemoteStorage::delete_prefix`]
pub async fn delete_prefix(
&self,
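For callers outside this diff, the new accessor can be used to size delete batches up front. Below is a minimal sketch (not part of this PR), assuming `GenericRemoteStorage::delete_objects` accepts a slice of `RemotePath` plus a cancellation token, matching the trait method shown above; the helper name is hypothetical.

```rust
use remote_storage::{GenericRemoteStorage, RemotePath};
use tokio_util::sync::CancellationToken;

/// Hypothetical helper: delete `keys` in batches no larger than the backend's hint.
async fn delete_in_batches(
    storage: &GenericRemoteStorage,
    keys: &[RemotePath],
    cancel: &CancellationToken,
) -> anyhow::Result<()> {
    // Guaranteed to be >= 1, so `chunks` never receives a zero chunk size.
    let batch_size = storage.max_keys_per_delete();
    for chunk in keys.chunks(batch_size) {
        // Passing more keys than the hint still works (backends chunk internally);
        // respecting the hint just keeps each request bounded.
        storage.delete_objects(chunk, cancel).await?;
    }
    Ok(())
}
```

Respecting the hint means S3-backed storage sends up to 1000 keys per request and Azure up to 256, without the caller hard-coding either limit.
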
4 changes: 4 additions & 0 deletions libs/remote_storage/src/local_fs.rs
@@ -573,6 +573,10 @@ impl RemoteStorage for LocalFs {
Ok(())
}

fn max_keys_per_delete(&self) -> usize {
super::MAX_KEYS_PER_DELETE_S3
}

async fn copy(
&self,
from: &RemotePath,
8 changes: 6 additions & 2 deletions libs/remote_storage/src/s3_bucket.rs
@@ -48,7 +48,7 @@ use crate::{
metrics::{start_counting_cancelled_wait, start_measuring_requests},
support::PermitCarrying,
ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
RemotePath, RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE,
RemotePath, RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE_S3,
REMOTE_STORAGE_PREFIX_SEPARATOR,
};

@@ -355,7 +355,7 @@ impl S3Bucket {
let kind = RequestKind::Delete;
let mut cancel = std::pin::pin!(cancel.cancelled());

for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE_S3) {
let started_at = start_measuring_requests(kind);

let req = self
@@ -832,6 +832,10 @@ impl RemoteStorage for S3Bucket {
self.delete_oids(&permit, &delete_objects, cancel).await
}

fn max_keys_per_delete(&self) -> usize {
MAX_KEYS_PER_DELETE_S3
}

async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
let paths = std::array::from_ref(path);
self.delete_objects(paths, cancel).await
4 changes: 4 additions & 0 deletions libs/remote_storage/src/simulate_failures.rs
@@ -203,6 +203,10 @@ impl RemoteStorage for UnreliableWrapper {
Ok(())
}

fn max_keys_per_delete(&self) -> usize {
self.inner.max_keys_per_delete()
}

async fn copy(
&self,
from: &RemotePath,
10 changes: 5 additions & 5 deletions pageserver/src/deletion_queue/deleter.rs
@@ -9,7 +9,6 @@
use remote_storage::GenericRemoteStorage;
use remote_storage::RemotePath;
use remote_storage::TimeoutOrCancel;
use remote_storage::MAX_KEYS_PER_DELETE;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::info;
@@ -131,7 +130,8 @@ impl Deleter {
}

pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
self.accumulator.reserve(MAX_KEYS_PER_DELETE);
let max_keys_per_delete = self.remote_storage.max_keys_per_delete();
self.accumulator.reserve(max_keys_per_delete);

loop {
if self.cancel.is_cancelled() {
@@ -156,14 +156,14 @@

match msg {
DeleterMessage::Delete(mut list) => {
while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE {
if self.accumulator.len() == MAX_KEYS_PER_DELETE {
while !list.is_empty() || self.accumulator.len() == max_keys_per_delete {
if self.accumulator.len() == max_keys_per_delete {
self.flush().await?;
// If we have received this number of keys, proceed with attempting to execute
assert_eq!(self.accumulator.len(), 0);
}

let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
let available_slots = max_keys_per_delete - self.accumulator.len();
let take_count = std::cmp::min(available_slots, list.len());
for path in list.drain(list.len() - take_count..) {
self.accumulator.push(path);
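To make the accumulator handling above easier to follow, here is a simplified synchronous restatement (a sketch, not code from this PR): `flush` stands in for `Deleter::flush` and the element type is left generic.

```rust
/// Move keys from `list` into `accumulator`, flushing whenever the backend's
/// per-request limit is reached, mirroring the loop in Deleter::background.
fn fill_and_flush<T>(
    accumulator: &mut Vec<T>,
    list: &mut Vec<T>,
    max_keys_per_delete: usize,
    mut flush: impl FnMut(&mut Vec<T>),
) {
    while !list.is_empty() || accumulator.len() == max_keys_per_delete {
        if accumulator.len() == max_keys_per_delete {
            // Execute one full-sized DeleteObjects request; flush is expected
            // to empty the accumulator, as the real flush does.
            flush(accumulator);
            assert!(accumulator.is_empty());
        }
        let available_slots = max_keys_per_delete - accumulator.len();
        let take_count = std::cmp::min(available_slots, list.len());
        // Take keys from the tail of the incoming list, as the real code does.
        accumulator.extend(list.drain(list.len() - take_count..));
    }
}
```

The invariant is that the accumulator never grows past `max_keys_per_delete`, so every request the deleter issues stays within the backend's limit (256 for Azure, 1000 for S3).
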
11 changes: 5 additions & 6 deletions storage_scrubber/src/garbage.rs
@@ -459,12 +459,10 @@ pub async fn get_timeline_objects(
Ok(list.keys)
}

const MAX_KEYS_PER_DELETE: usize = 1000;

/// Drain a buffer of keys into DeleteObjects requests
///
/// If `drain` is true, drains keys completely; otherwise stops when <
/// MAX_KEYS_PER_DELETE keys are left.
/// `max_keys_per_delete` keys are left.
/// `num_deleted` returns number of deleted keys.
async fn do_delete(
remote_client: &GenericRemoteStorage,
@@ -474,9 +472,10 @@ async fn do_delete(
progress_tracker: &mut DeletionProgressTracker,
) -> anyhow::Result<()> {
let cancel = CancellationToken::new();
while (!keys.is_empty() && drain) || (keys.len() >= MAX_KEYS_PER_DELETE) {
let max_keys_per_delete = remote_client.max_keys_per_delete();
while (!keys.is_empty() && drain) || (keys.len() >= max_keys_per_delete) {
let request_keys =
keys.split_off(keys.len() - (std::cmp::min(MAX_KEYS_PER_DELETE, keys.len())));
keys.split_off(keys.len() - (std::cmp::min(max_keys_per_delete, keys.len())));

let request_keys: Vec<RemotePath> = request_keys.into_iter().map(|o| o.key).collect();

@@ -617,7 +616,7 @@
}

objects_to_delete.append(&mut object_list);
if objects_to_delete.len() >= MAX_KEYS_PER_DELETE {
if objects_to_delete.len() >= remote_client.max_keys_per_delete() {
do_delete(
&remote_client,
&mut objects_to_delete,
Expand Down