Skip to content

Commit

Permalink
Merge pull request #38 from influxdata/hiltontj/oss-sync-2024-08-21
Browse files Browse the repository at this point in the history
chore: sync OSS
  • Loading branch information
hiltontj authored Aug 21, 2024
2 parents 9be4b40 + cf4b0b2 commit 653367a
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 60 deletions.
4 changes: 4 additions & 0 deletions influxdb3_catalog/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ impl SequenceNumber {
pub fn next(&self) -> Self {
Self(self.0 + 1)
}

pub fn as_u32(&self) -> u32 {
self.0
}
}

#[derive(Debug)]
Expand Down
36 changes: 33 additions & 3 deletions influxdb3_wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ pub trait Wal: Debug + Send + Sync + 'static {
);

/// Returns the last persisted wal file sequence number
async fn last_sequence_number(&self) -> WalFileSequenceNumber;
async fn last_wal_sequence_number(&self) -> WalFileSequenceNumber;

/// Returns the last persisted wal file sequence number
async fn last_snapshot_sequence_number(&self) -> SnapshotSequenceNumber;

/// Stop all writes to the WAL and flush the buffer to a WAL file.
async fn shutdown(&self);
Expand Down Expand Up @@ -600,7 +603,7 @@ impl WalFileSequenceNumber {
Self(self.0 + 1)
}

pub fn get(&self) -> u64 {
pub fn as_u64(&self) -> u64 {
self.0
}
}
Expand All @@ -611,13 +614,40 @@ impl std::fmt::Display for WalFileSequenceNumber {
}
}

#[derive(
Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
)]
pub struct SnapshotSequenceNumber(u64);

impl SnapshotSequenceNumber {
pub fn new(number: u64) -> Self {
Self(number)
}

pub fn next(&self) -> Self {
Self(self.0 + 1)
}

pub fn as_u64(&self) -> u64 {
self.0
}
}

impl std::fmt::Display for SnapshotSequenceNumber {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

/// Details about a snapshot of the WAL
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub struct SnapshotDetails {
/// The sequence number for this snapshot
pub snapshot_sequence_number: SnapshotSequenceNumber,
/// All chunks with data before this time can be snapshot and persisted
pub end_time_marker: i64,
/// All wal files with a sequence number <= to this can be deleted once snapshotting is complete
pub last_sequence_number: WalFileSequenceNumber,
pub last_wal_sequence_number: WalFileSequenceNumber,
}

pub fn background_wal_flush<W: Wal>(
Expand Down
49 changes: 36 additions & 13 deletions influxdb3_wal/src/object_store.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::serialize::verify_file_type_and_deserialize;
use crate::snapshot_tracker::{SnapshotInfo, SnapshotTracker, WalPeriod};
use crate::{
background_wal_flush, CatalogBatch, SnapshotDetails, Wal, WalConfig, WalContents,
WalFileNotifier, WalFileSequenceNumber, WalOp, WriteBatch,
background_wal_flush, CatalogBatch, SnapshotDetails, SnapshotSequenceNumber, Wal, WalConfig,
WalContents, WalFileNotifier, WalFileSequenceNumber, WalOp, WriteBatch,
};
use bytes::Bytes;
use data_types::Timestamp;
Expand Down Expand Up @@ -33,15 +33,17 @@ impl WalObjectStore {
host_identifier_prefix: impl Into<String> + Send,
file_notifier: Arc<dyn WalFileNotifier>,
config: WalConfig,
last_snapshot_wal_sequence: Option<WalFileSequenceNumber>,
last_wal_sequence_number: Option<WalFileSequenceNumber>,
last_snapshot_sequence_number: Option<SnapshotSequenceNumber>,
) -> Result<Arc<Self>, crate::Error> {
let flush_interval = config.flush_interval;
let wal = Self::new_without_replay(
object_store,
host_identifier_prefix,
file_notifier,
config,
last_snapshot_wal_sequence,
last_wal_sequence_number,
last_snapshot_sequence_number,
);

wal.replay().await?;
Expand All @@ -56,9 +58,10 @@ impl WalObjectStore {
host_identifier_prefix: impl Into<String>,
file_notifier: Arc<dyn WalFileNotifier>,
config: WalConfig,
last_snapshot_wal_sequence: Option<WalFileSequenceNumber>,
last_wal_sequence_number: Option<WalFileSequenceNumber>,
last_snapshot_sequence_number: Option<SnapshotSequenceNumber>,
) -> Self {
let wal_file_sequence_number = last_snapshot_wal_sequence.unwrap_or_default().next();
let wal_file_sequence_number = last_wal_sequence_number.unwrap_or_default().next();
Self {
object_store,
host_identifier_prefix: host_identifier_prefix.into(),
Expand All @@ -73,7 +76,11 @@ impl WalObjectStore {
catalog_batches: vec![],
write_op_responses: vec![],
},
SnapshotTracker::new(config.snapshot_size, config.level_0_duration),
SnapshotTracker::new(
config.snapshot_size,
config.level_0_duration,
last_snapshot_sequence_number,
),
)),
}
}
Expand Down Expand Up @@ -256,7 +263,7 @@ impl WalObjectStore {
None => {
debug!(
"notify sent to buffer for wal file {}",
wal_contents.wal_file_number.get()
wal_contents.wal_file_number.as_u64()
);
self.file_notifier.notify(wal_contents);
None
Expand Down Expand Up @@ -359,12 +366,20 @@ impl Wal for WalObjectStore {
.await
}

async fn last_sequence_number(&self) -> WalFileSequenceNumber {
async fn last_wal_sequence_number(&self) -> WalFileSequenceNumber {
self.flush_buffer
.lock()
.await
.snapshot_tracker
.last_sequence_number()
.last_wal_sequence_number()
}

async fn last_snapshot_sequence_number(&self) -> SnapshotSequenceNumber {
self.flush_buffer
.lock()
.await
.snapshot_tracker
.last_snapshot_sequence_number()
}

async fn shutdown(&self) {
Expand Down Expand Up @@ -577,7 +592,9 @@ fn wal_path(host_identifier_prefix: &str, wal_file_number: WalFileSequenceNumber
#[cfg(test)]
mod tests {
use super::*;
use crate::{Field, FieldData, Level0Duration, Row, TableChunk, TableChunks};
use crate::{
Field, FieldData, Level0Duration, Row, SnapshotSequenceNumber, TableChunk, TableChunks,
};
use async_trait::async_trait;
use object_store::memory::InMemory;
use std::any::Any;
Expand All @@ -599,6 +616,7 @@ mod tests {
Arc::clone(&notifier),
wal_config,
None,
None,
);

let db_name: Arc<str> = "db1".into();
Expand Down Expand Up @@ -805,6 +823,7 @@ mod tests {
snapshot_size: 2,
},
None,
None,
);
assert_eq!(
replay_wal.load_existing_wal_file_paths().await.unwrap(),
Expand Down Expand Up @@ -864,8 +883,9 @@ mod tests {
let (snapshot_done, snapshot_info, snapshot_permit) = wal.flush_buffer().await.unwrap();
let expected_info = SnapshotInfo {
snapshot_details: SnapshotDetails {
snapshot_sequence_number: SnapshotSequenceNumber::new(1),
end_time_marker: 120000000000,
last_sequence_number: WalFileSequenceNumber(2),
last_wal_sequence_number: WalFileSequenceNumber(2),
},
wal_periods: vec![
WalPeriod {
Expand Down Expand Up @@ -918,8 +938,9 @@ mod tests {
max_time_ns: 128_000000000,
})],
snapshot: Some(SnapshotDetails {
snapshot_sequence_number: SnapshotSequenceNumber::new(1),
end_time_marker: 120_000000000,
last_sequence_number: WalFileSequenceNumber(2),
last_wal_sequence_number: WalFileSequenceNumber(2),
}),
};

Expand All @@ -944,6 +965,7 @@ mod tests {
Arc::clone(&replay_notifier),
wal_config,
None,
None,
);
assert_eq!(
replay_wal.load_existing_wal_file_paths().await.unwrap(),
Expand Down Expand Up @@ -976,6 +998,7 @@ mod tests {
Arc::clone(&notifier),
wal_config,
None,
None,
);

assert!(wal.flush_buffer().await.is_none());
Expand Down
56 changes: 39 additions & 17 deletions influxdb3_wal/src/snapshot_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
//! configured as it can be used to ensure that data in the write buffer is persisted in blocks
//! that are not too large and unlikely to overlap.
use crate::{Level0Duration, SnapshotDetails, WalFileSequenceNumber};
use crate::{Level0Duration, SnapshotDetails, SnapshotSequenceNumber, WalFileSequenceNumber};
use data_types::Timestamp;

/// A struct that tracks the WAL periods (files if using object store) and decides when to snapshot the WAL.
#[derive(Debug)]
pub(crate) struct SnapshotTracker {
last_sequence_number: WalFileSequenceNumber,
last_snapshot_sequence_number: SnapshotSequenceNumber,
last_wal_sequence_number: WalFileSequenceNumber,
wal_periods: Vec<WalPeriod>,
snapshot_size: usize,
level_0_duration: Level0Duration,
Expand All @@ -20,9 +21,14 @@ impl SnapshotTracker {
/// Create a new `SnapshotTracker` with the given snapshot size and level 0 duration. The
/// level 0 duration is the size of chunks in the write buffer that will be persisted as
/// parquet files.
pub(crate) fn new(snapshot_size: usize, level_0_duration: Level0Duration) -> Self {
pub(crate) fn new(
snapshot_size: usize,
level_0_duration: Level0Duration,
last_snapshot_sequence_number: Option<SnapshotSequenceNumber>,
) -> Self {
Self {
last_sequence_number: WalFileSequenceNumber::default(),
last_snapshot_sequence_number: last_snapshot_sequence_number.unwrap_or_default(),
last_wal_sequence_number: WalFileSequenceNumber::default(),
wal_periods: Vec::new(),
snapshot_size,
level_0_duration,
Expand All @@ -38,7 +44,7 @@ impl SnapshotTracker {
assert!(last_period.wal_file_number < wal_period.wal_file_number);
}

self.last_sequence_number = wal_period.wal_file_number;
self.last_wal_sequence_number = wal_period.wal_file_number;
self.wal_periods.push(wal_period);
}

Expand All @@ -59,7 +65,7 @@ impl SnapshotTracker {

// if the number of wal periods is > 3x the snapshot size, snapshot everything up to the last period
if self.wal_periods.len() >= 3 * self.snapshot_size {
let last_sequence_number = self.wal_periods.last().unwrap().wal_file_number;
let last_wal_sequence_number = self.wal_periods.last().unwrap().wal_file_number;
let max_time = self
.wal_periods
.iter()
Expand All @@ -72,8 +78,9 @@ impl SnapshotTracker {
// remove the wal periods and return the snapshot details
let wal_periods = std::mem::take(&mut self.wal_periods);
let snapshot_details = SnapshotDetails {
snapshot_sequence_number: self.increment_snapshot_sequence_number(),
end_time_marker: t.get(),
last_sequence_number,
last_wal_sequence_number,
};

return Some(SnapshotInfo {
Expand Down Expand Up @@ -101,8 +108,9 @@ impl SnapshotTracker {

SnapshotInfo {
snapshot_details: SnapshotDetails {
snapshot_sequence_number: self.increment_snapshot_sequence_number(),
end_time_marker: t.get(),
last_sequence_number: period.wal_file_number,
last_wal_sequence_number: period.wal_file_number,
},
wal_periods: periods_to_snapshot,
}
Expand All @@ -115,9 +123,19 @@ impl SnapshotTracker {
self.snapshot_size + self.snapshot_size / 2
}

/// Returns the last `WalFileSequenceNumber` that was added to the tracker.
pub(crate) fn last_sequence_number(&self) -> WalFileSequenceNumber {
self.last_sequence_number
/// Returns the last [`WalFileSequenceNumber`] that was added to the tracker.
pub(crate) fn last_wal_sequence_number(&self) -> WalFileSequenceNumber {
self.last_wal_sequence_number
}

/// Returns the last [`SnapshotSequenceNumber`] that was added to the tracker.
pub(crate) fn last_snapshot_sequence_number(&self) -> SnapshotSequenceNumber {
self.last_snapshot_sequence_number
}

fn increment_snapshot_sequence_number(&mut self) -> SnapshotSequenceNumber {
self.last_snapshot_sequence_number = self.last_snapshot_sequence_number.next();
self.last_snapshot_sequence_number
}
}

Expand Down Expand Up @@ -158,7 +176,7 @@ mod tests {

#[test]
fn snapshot() {
let mut tracker = SnapshotTracker::new(2, Level0Duration::new_1m());
let mut tracker = SnapshotTracker::new(2, Level0Duration::new_1m(), None);
let p1 = WalPeriod::new(
WalFileSequenceNumber::new(1),
Timestamp::new(0),
Expand Down Expand Up @@ -200,8 +218,9 @@ mod tests {
tracker.snapshot(),
Some(SnapshotInfo {
snapshot_details: SnapshotDetails {
snapshot_sequence_number: SnapshotSequenceNumber::new(1),
end_time_marker: 120_000000000,
last_sequence_number: WalFileSequenceNumber::new(2)
last_wal_sequence_number: WalFileSequenceNumber::new(2)
},
wal_periods: vec![p1, p2],
})
Expand All @@ -213,8 +232,9 @@ mod tests {
tracker.snapshot(),
Some(SnapshotInfo {
snapshot_details: SnapshotDetails {
snapshot_sequence_number: SnapshotSequenceNumber::new(2),
end_time_marker: 240_000000000,
last_sequence_number: WalFileSequenceNumber::new(3)
last_wal_sequence_number: WalFileSequenceNumber::new(3)
},
wal_periods: vec![p3]
})
Expand All @@ -227,8 +247,9 @@ mod tests {
tracker.snapshot(),
Some(SnapshotInfo {
snapshot_details: SnapshotDetails {
snapshot_sequence_number: SnapshotSequenceNumber::new(3),
end_time_marker: 360_000000000,
last_sequence_number: WalFileSequenceNumber::new(5)
last_wal_sequence_number: WalFileSequenceNumber::new(5)
},
wal_periods: vec![p4, p5]
})
Expand All @@ -239,7 +260,7 @@ mod tests {

#[test]
fn snapshot_future_data_forces_snapshot() {
let mut tracker = SnapshotTracker::new(2, Level0Duration::new_1m());
let mut tracker = SnapshotTracker::new(2, Level0Duration::new_1m(), None);
let p1 = WalPeriod::new(
WalFileSequenceNumber::new(1),
Timestamp::new(0),
Expand Down Expand Up @@ -285,8 +306,9 @@ mod tests {
tracker.snapshot(),
Some(SnapshotInfo {
snapshot_details: SnapshotDetails {
snapshot_sequence_number: SnapshotSequenceNumber::new(1),
end_time_marker: 360000000000,
last_sequence_number: WalFileSequenceNumber::new(6)
last_wal_sequence_number: WalFileSequenceNumber::new(6)
},
wal_periods: vec![p1, p2, p3, p4, p5, p6]
})
Expand Down
Loading

0 comments on commit 653367a

Please sign in to comment.