Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/abstract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,14 @@ pub trait AbstractTree {

/// Drops segments that are fully contained in a given range.
///
/// Accepts any `RangeBounds`, including unbounded or exclusive endpoints.
/// If the normalized lower bound is greater than the upper bound, the
/// method returns without performing any work.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
fn drop_range(&self, key_range: crate::KeyRange) -> crate::Result<()>;
/// Will return `Err` only if an IO error occurs during compaction.
fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()>;

/// Performs major compaction, blocking the caller until it's done.
///
Expand Down
4 changes: 2 additions & 2 deletions src/blob_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,8 @@ impl AbstractTree for BlobTree {
self.index.tombstone_count()
}

fn drop_range(&self, key_range: crate::KeyRange) -> crate::Result<()> {
self.index.drop_range(key_range)
fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
self.index.drop_range(range)
}

fn ingest(
Expand Down
68 changes: 59 additions & 9 deletions src/compaction/drop_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,68 @@
// (found in the LICENSE-* files in the repository)

use super::{Choice, CompactionStrategy};
use crate::{config::Config, level_manifest::LevelManifest, KeyRange};
use crate::{
config::Config, level_manifest::LevelManifest, slice::Slice, version::run::Ranged, KeyRange,
};
use crate::{HashSet, Segment};
use std::ops::{Bound, RangeBounds};

#[derive(Clone, Debug)]
pub struct OwnedBounds {
pub start: Bound<Slice>,
pub end: Bound<Slice>,
}

impl RangeBounds<Slice> for OwnedBounds {
fn start_bound(&self) -> Bound<&Slice> {
match &self.start {
Bound::Unbounded => Bound::Unbounded,
Bound::Included(key) => Bound::Included(key),
Bound::Excluded(key) => Bound::Excluded(key),
}
}

fn end_bound(&self) -> Bound<&Slice> {
match &self.end {
Bound::Unbounded => Bound::Unbounded,
Bound::Included(key) => Bound::Included(key),
Bound::Excluded(key) => Bound::Excluded(key),
}
}
}

impl OwnedBounds {
#[must_use]
pub fn contains(&self, range: &KeyRange) -> bool {
let lower_ok = match &self.start {
Bound::Unbounded => true,
Bound::Included(key) => key.as_ref() <= range.min().as_ref(),
Bound::Excluded(key) => key.as_ref() < range.min().as_ref(),
};

if !lower_ok {
return false;
}

match &self.end {
Bound::Unbounded => true,
Bound::Included(key) => key.as_ref() >= range.max().as_ref(),
Bound::Excluded(key) => key.as_ref() > range.max().as_ref(),
}
}
}

/// Drops all segments that are **contained** in a key range
pub struct Strategy {
key_range: KeyRange,
bounds: OwnedBounds,
}

impl Strategy {
/// Configures a new `DropRange` compaction strategy.
///
/// # Panics
///
/// Panics, if `target_size` is below 1024 bytes.
#[must_use]
#[allow(dead_code)]
pub fn new(key_range: KeyRange) -> Self {
Self { key_range }
pub fn new(bounds: OwnedBounds) -> Self {
Self { bounds }
}
}

Expand All @@ -34,7 +78,13 @@ impl CompactionStrategy for Strategy {
.current_version()
.iter_levels()
.flat_map(|lvl| lvl.iter())
.flat_map(|run| run.get_contained(&self.key_range))
.flat_map(|run| {
run.range_overlap_indexes(&self.bounds)
.and_then(|(lo, hi)| run.get(lo..=hi))
.unwrap_or_default()
.iter()
.filter(|x| self.bounds.contains(x.key_range()))
})
.map(Segment::id)
.collect();

Expand Down
56 changes: 51 additions & 5 deletions src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub mod inner;

use crate::{
coding::{Decode, Encode},
compaction::CompactionStrategy,
compaction::{drop_range::OwnedBounds, CompactionStrategy},
config::Config,
file::BLOBS_FOLDER,
format_version::FormatVersion,
Expand All @@ -16,6 +16,7 @@ use crate::{
manifest::Manifest,
memtable::Memtable,
segment::Segment,
slice::Slice,
value::InternalValue,
vlog::BlobFile,
AbstractTree, Cache, DescriptorTable, KvPair, SegmentId, SeqNo, SequenceNumberCounter, UserKey,
Expand All @@ -24,7 +25,7 @@ use crate::{
use inner::{MemtableId, SealedMemtables, TreeId, TreeInner};
use std::{
io::Cursor,
ops::RangeBounds,
ops::{Bound, RangeBounds},
path::Path,
sync::{atomic::AtomicU64, Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
};
Expand Down Expand Up @@ -168,9 +169,14 @@ impl AbstractTree for Tree {
Ok(())
}

// TODO: change API to RangeBounds<K>
fn drop_range(&self, key_range: crate::KeyRange) -> crate::Result<()> {
let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(key_range));
fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range)?;

if is_empty {
return Ok(());
}

let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

// IMPORTANT: Write lock so we can be the only compaction going on
let _lock = self
Expand Down Expand Up @@ -549,6 +555,46 @@ impl AbstractTree for Tree {
}

impl Tree {
/// Normalizes a user-provided range into owned `Bound<Slice>` values.
///
/// Returns a tuple containing:
/// - the `OwnedBounds` that mirror the original bounds semantics (including
/// inclusive/exclusive markers and unbounded endpoints), and
/// - a `bool` flag indicating whether the normalized range is logically
/// empty (e.g., when the lower bound is greater than the upper bound).
///
/// Callers can use the flag to detect empty ranges and skip further work
/// while still having access to the normalized bounds for non-empty cases.
fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
range: &R,
) -> crate::Result<(OwnedBounds, bool)> {
use Bound::{Excluded, Included, Unbounded};

let start = match range.start_bound() {
Included(key) => Included(Slice::from(key.as_ref())),
Excluded(key) => Excluded(Slice::from(key.as_ref())),
Unbounded => Unbounded,
};

let end = match range.end_bound() {
Included(key) => Included(Slice::from(key.as_ref())),
Excluded(key) => Excluded(Slice::from(key.as_ref())),
Unbounded => Unbounded,
};

let is_empty = if let (Included(lo), Included(hi))
| (Included(lo), Excluded(hi))
| (Excluded(lo), Included(hi))
| (Excluded(lo), Excluded(hi)) = (&start, &end)
{
lo.as_ref() > hi.as_ref()
} else {
false
};

Ok((OwnedBounds { start, end }, is_empty))
}

/// Opens an LSM-tree in the given directory.
///
/// Will recover previous state if the folder was previously
Expand Down
Loading
Loading