diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index 0155c512265..bf2cb15c257 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -86,6 +86,10 @@ impl SequenceNumber { pub fn next(&self) -> Self { Self(self.0 + 1) } + + pub fn as_u32(&self) -> u32 { + self.0 + } } #[derive(Debug)] diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs index 2d20474c3d6..b66cf70fcae 100644 --- a/influxdb3_wal/src/lib.rs +++ b/influxdb3_wal/src/lib.rs @@ -81,7 +81,10 @@ pub trait Wal: Debug + Send + Sync + 'static { ); /// Returns the last persisted wal file sequence number - async fn last_sequence_number(&self) -> WalFileSequenceNumber; + async fn last_wal_sequence_number(&self) -> WalFileSequenceNumber; + + /// Returns the last snapshot sequence number + async fn last_snapshot_sequence_number(&self) -> SnapshotSequenceNumber; /// Stop all writes to the WAL and flush the buffer to a WAL file. async fn shutdown(&self); @@ -600,7 +603,7 @@ impl WalFileSequenceNumber { Self(self.0 + 1) } - pub fn get(&self) -> u64 { + pub fn as_u64(&self) -> u64 { self.0 } } @@ -611,13 +614,40 @@ impl std::fmt::Display for WalFileSequenceNumber { } } +#[derive( + Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, +)] +pub struct SnapshotSequenceNumber(u64); + +impl SnapshotSequenceNumber { + pub fn new(number: u64) -> Self { + Self(number) + } + + pub fn next(&self) -> Self { + Self(self.0 + 1) + } + + pub fn as_u64(&self) -> u64 { + self.0 + } +} + +impl std::fmt::Display for SnapshotSequenceNumber { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + /// Details about a snapshot of the WAL #[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)] pub struct SnapshotDetails { + /// The sequence number for this snapshot + pub snapshot_sequence_number: SnapshotSequenceNumber, /// All chunks with data before this 
time can be snapshot and persisted pub end_time_marker: i64, /// All wal files with a sequence number <= to this can be deleted once snapshotting is complete - pub last_sequence_number: WalFileSequenceNumber, + pub last_wal_sequence_number: WalFileSequenceNumber, } pub fn background_wal_flush( diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs index b30c5b010d4..5ebfa054f7e 100644 --- a/influxdb3_wal/src/object_store.rs +++ b/influxdb3_wal/src/object_store.rs @@ -1,8 +1,8 @@ use crate::serialize::verify_file_type_and_deserialize; use crate::snapshot_tracker::{SnapshotInfo, SnapshotTracker, WalPeriod}; use crate::{ - background_wal_flush, CatalogBatch, SnapshotDetails, Wal, WalConfig, WalContents, - WalFileNotifier, WalFileSequenceNumber, WalOp, WriteBatch, + background_wal_flush, CatalogBatch, SnapshotDetails, SnapshotSequenceNumber, Wal, WalConfig, + WalContents, WalFileNotifier, WalFileSequenceNumber, WalOp, WriteBatch, }; use bytes::Bytes; use data_types::Timestamp; @@ -33,7 +33,8 @@ impl WalObjectStore { host_identifier_prefix: impl Into + Send, file_notifier: Arc, config: WalConfig, - last_snapshot_wal_sequence: Option, + last_wal_sequence_number: Option, + last_snapshot_sequence_number: Option, ) -> Result, crate::Error> { let flush_interval = config.flush_interval; let wal = Self::new_without_replay( @@ -41,7 +42,8 @@ impl WalObjectStore { host_identifier_prefix, file_notifier, config, - last_snapshot_wal_sequence, + last_wal_sequence_number, + last_snapshot_sequence_number, ); wal.replay().await?; @@ -56,9 +58,10 @@ impl WalObjectStore { host_identifier_prefix: impl Into, file_notifier: Arc, config: WalConfig, - last_snapshot_wal_sequence: Option, + last_wal_sequence_number: Option, + last_snapshot_sequence_number: Option, ) -> Self { - let wal_file_sequence_number = last_snapshot_wal_sequence.unwrap_or_default().next(); + let wal_file_sequence_number = last_wal_sequence_number.unwrap_or_default().next(); Self { object_store, 
host_identifier_prefix: host_identifier_prefix.into(), @@ -73,7 +76,11 @@ impl WalObjectStore { catalog_batches: vec![], write_op_responses: vec![], }, - SnapshotTracker::new(config.snapshot_size, config.level_0_duration), + SnapshotTracker::new( + config.snapshot_size, + config.level_0_duration, + last_snapshot_sequence_number, + ), )), } } @@ -256,7 +263,7 @@ impl WalObjectStore { None => { debug!( "notify sent to buffer for wal file {}", - wal_contents.wal_file_number.get() + wal_contents.wal_file_number.as_u64() ); self.file_notifier.notify(wal_contents); None @@ -359,12 +366,20 @@ impl Wal for WalObjectStore { .await } - async fn last_sequence_number(&self) -> WalFileSequenceNumber { + async fn last_wal_sequence_number(&self) -> WalFileSequenceNumber { self.flush_buffer .lock() .await .snapshot_tracker - .last_sequence_number() + .last_wal_sequence_number() + } + + async fn last_snapshot_sequence_number(&self) -> SnapshotSequenceNumber { + self.flush_buffer + .lock() + .await + .snapshot_tracker + .last_snapshot_sequence_number() } async fn shutdown(&self) { @@ -577,7 +592,9 @@ fn wal_path(host_identifier_prefix: &str, wal_file_number: WalFileSequenceNumber #[cfg(test)] mod tests { use super::*; - use crate::{Field, FieldData, Level0Duration, Row, TableChunk, TableChunks}; + use crate::{ + Field, FieldData, Level0Duration, Row, SnapshotSequenceNumber, TableChunk, TableChunks, + }; use async_trait::async_trait; use object_store::memory::InMemory; use std::any::Any; @@ -599,6 +616,7 @@ mod tests { Arc::clone(¬ifier), wal_config, None, + None, ); let db_name: Arc = "db1".into(); @@ -805,6 +823,7 @@ mod tests { snapshot_size: 2, }, None, + None, ); assert_eq!( replay_wal.load_existing_wal_file_paths().await.unwrap(), @@ -864,8 +883,9 @@ mod tests { let (snapshot_done, snapshot_info, snapshot_permit) = wal.flush_buffer().await.unwrap(); let expected_info = SnapshotInfo { snapshot_details: SnapshotDetails { + snapshot_sequence_number: SnapshotSequenceNumber::new(1), 
end_time_marker: 120000000000, - last_sequence_number: WalFileSequenceNumber(2), + last_wal_sequence_number: WalFileSequenceNumber(2), }, wal_periods: vec![ WalPeriod { @@ -918,8 +938,9 @@ mod tests { max_time_ns: 128_000000000, })], snapshot: Some(SnapshotDetails { + snapshot_sequence_number: SnapshotSequenceNumber::new(1), end_time_marker: 120_000000000, - last_sequence_number: WalFileSequenceNumber(2), + last_wal_sequence_number: WalFileSequenceNumber(2), }), }; @@ -944,6 +965,7 @@ mod tests { Arc::clone(&replay_notifier), wal_config, None, + None, ); assert_eq!( replay_wal.load_existing_wal_file_paths().await.unwrap(), @@ -976,6 +998,7 @@ mod tests { Arc::clone(¬ifier), wal_config, None, + None, ); assert!(wal.flush_buffer().await.is_none()); diff --git a/influxdb3_wal/src/snapshot_tracker.rs b/influxdb3_wal/src/snapshot_tracker.rs index 2c0ff74bdee..18123e6ccc0 100644 --- a/influxdb3_wal/src/snapshot_tracker.rs +++ b/influxdb3_wal/src/snapshot_tracker.rs @@ -4,13 +4,14 @@ //! configured as it can be used to ensure that data in the write buffer is persisted in blocks //! that are not too large and unlikely to overlap. -use crate::{Level0Duration, SnapshotDetails, WalFileSequenceNumber}; +use crate::{Level0Duration, SnapshotDetails, SnapshotSequenceNumber, WalFileSequenceNumber}; use data_types::Timestamp; /// A struct that tracks the WAL periods (files if using object store) and decides when to snapshot the WAL. #[derive(Debug)] pub(crate) struct SnapshotTracker { - last_sequence_number: WalFileSequenceNumber, + last_snapshot_sequence_number: SnapshotSequenceNumber, + last_wal_sequence_number: WalFileSequenceNumber, wal_periods: Vec, snapshot_size: usize, level_0_duration: Level0Duration, @@ -20,9 +21,14 @@ impl SnapshotTracker { /// Create a new `SnapshotTracker` with the given snapshot size and level 0 duration. The /// level 0 duration is the size of chunks in the write buffer that will be persisted as /// parquet files. 
- pub(crate) fn new(snapshot_size: usize, level_0_duration: Level0Duration) -> Self { + pub(crate) fn new( + snapshot_size: usize, + level_0_duration: Level0Duration, + last_snapshot_sequence_number: Option, + ) -> Self { Self { - last_sequence_number: WalFileSequenceNumber::default(), + last_snapshot_sequence_number: last_snapshot_sequence_number.unwrap_or_default(), + last_wal_sequence_number: WalFileSequenceNumber::default(), wal_periods: Vec::new(), snapshot_size, level_0_duration, @@ -38,7 +44,7 @@ impl SnapshotTracker { assert!(last_period.wal_file_number < wal_period.wal_file_number); } - self.last_sequence_number = wal_period.wal_file_number; + self.last_wal_sequence_number = wal_period.wal_file_number; self.wal_periods.push(wal_period); } @@ -59,7 +65,7 @@ impl SnapshotTracker { // if the number of wal periods is > 3x the snapshot size, snapshot everything up to the last period if self.wal_periods.len() >= 3 * self.snapshot_size { - let last_sequence_number = self.wal_periods.last().unwrap().wal_file_number; + let last_wal_sequence_number = self.wal_periods.last().unwrap().wal_file_number; let max_time = self .wal_periods .iter() @@ -72,8 +78,9 @@ impl SnapshotTracker { // remove the wal periods and return the snapshot details let wal_periods = std::mem::take(&mut self.wal_periods); let snapshot_details = SnapshotDetails { + snapshot_sequence_number: self.increment_snapshot_sequence_number(), end_time_marker: t.get(), - last_sequence_number, + last_wal_sequence_number, }; return Some(SnapshotInfo { @@ -101,8 +108,9 @@ impl SnapshotTracker { SnapshotInfo { snapshot_details: SnapshotDetails { + snapshot_sequence_number: self.increment_snapshot_sequence_number(), end_time_marker: t.get(), - last_sequence_number: period.wal_file_number, + last_wal_sequence_number: period.wal_file_number, }, wal_periods: periods_to_snapshot, } @@ -115,9 +123,19 @@ impl SnapshotTracker { self.snapshot_size + self.snapshot_size / 2 } - /// Returns the last `WalFileSequenceNumber` 
that was added to the tracker. - pub(crate) fn last_sequence_number(&self) -> WalFileSequenceNumber { - self.last_sequence_number + /// Returns the last [`WalFileSequenceNumber`] that was added to the tracker. + pub(crate) fn last_wal_sequence_number(&self) -> WalFileSequenceNumber { + self.last_wal_sequence_number + } + + /// Returns the last [`SnapshotSequenceNumber`] assigned by the tracker, or its initial value. + pub(crate) fn last_snapshot_sequence_number(&self) -> SnapshotSequenceNumber { + self.last_snapshot_sequence_number + } + + fn increment_snapshot_sequence_number(&mut self) -> SnapshotSequenceNumber { + self.last_snapshot_sequence_number = self.last_snapshot_sequence_number.next(); + self.last_snapshot_sequence_number } } @@ -158,7 +176,7 @@ mod tests { #[test] fn snapshot() { - let mut tracker = SnapshotTracker::new(2, Level0Duration::new_1m()); + let mut tracker = SnapshotTracker::new(2, Level0Duration::new_1m(), None); let p1 = WalPeriod::new( WalFileSequenceNumber::new(1), Timestamp::new(0), @@ -200,8 +218,9 @@ mod tests { tracker.snapshot(), Some(SnapshotInfo { snapshot_details: SnapshotDetails { + snapshot_sequence_number: SnapshotSequenceNumber::new(1), end_time_marker: 120_000000000, - last_sequence_number: WalFileSequenceNumber::new(2) + last_wal_sequence_number: WalFileSequenceNumber::new(2) }, wal_periods: vec![p1, p2], }) @@ -213,8 +232,9 @@ mod tests { tracker.snapshot(), Some(SnapshotInfo { snapshot_details: SnapshotDetails { + snapshot_sequence_number: SnapshotSequenceNumber::new(2), end_time_marker: 240_000000000, - last_sequence_number: WalFileSequenceNumber::new(3) + last_wal_sequence_number: WalFileSequenceNumber::new(3) }, wal_periods: vec![p3] }) @@ -227,8 +247,9 @@ mod tests { tracker.snapshot(), Some(SnapshotInfo { snapshot_details: SnapshotDetails { + snapshot_sequence_number: SnapshotSequenceNumber::new(3), end_time_marker: 360_000000000, - last_sequence_number: WalFileSequenceNumber::new(5) + last_wal_sequence_number: 
WalFileSequenceNumber::new(5) }, wal_periods: vec![p4, p5] }) @@ -239,7 +260,7 @@ mod tests { #[test] fn snapshot_future_data_forces_snapshot() { - let mut tracker = SnapshotTracker::new(2, Level0Duration::new_1m()); + let mut tracker = SnapshotTracker::new(2, Level0Duration::new_1m(), None); let p1 = WalPeriod::new( WalFileSequenceNumber::new(1), Timestamp::new(0), @@ -285,8 +306,9 @@ mod tests { tracker.snapshot(), Some(SnapshotInfo { snapshot_details: SnapshotDetails { + snapshot_sequence_number: SnapshotSequenceNumber::new(1), end_time_marker: 360000000000, - last_sequence_number: WalFileSequenceNumber::new(6) + last_wal_sequence_number: WalFileSequenceNumber::new(6) }, wal_periods: vec![p1, p2, p3, p4, p5, p6] }) diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index af2a51f4a18..3a295800369 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -20,8 +20,8 @@ use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::prelude::Expr; -use influxdb3_catalog::catalog; -use influxdb3_wal::{LastCacheDefinition, WalFileSequenceNumber}; +use influxdb3_catalog::catalog::{self, SequenceNumber}; +use influxdb3_wal::{LastCacheDefinition, SnapshotSequenceNumber, WalFileSequenceNumber}; use iox_query::QueryChunk; use iox_time::Time; use last_cache::LastCacheProvider; @@ -214,8 +214,12 @@ pub struct PersistedCatalog { /// The collection of Parquet files that were persisted in a snapshot #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] pub struct PersistedSnapshot { + /// The snapshot sequence number associated with this snapshot + pub snapshot_sequence_number: SnapshotSequenceNumber, /// The wal file sequence number that triggered this snapshot pub wal_file_sequence_number: WalFileSequenceNumber, + /// The catalog sequence number associated with this snapshot + pub catalog_sequence_number: SequenceNumber, /// The 
size of the snapshot parquet files in bytes. pub parquet_size_bytes: u64, /// The number of rows across all parquet files in the snapshot. @@ -230,9 +234,15 @@ pub struct PersistedSnapshot { } impl PersistedSnapshot { - pub fn new(wal_file_sequence_number: WalFileSequenceNumber) -> Self { + pub fn new( + snapshot_sequence_number: SnapshotSequenceNumber, + wal_file_sequence_number: WalFileSequenceNumber, + catalog_sequence_number: SequenceNumber, + ) -> Self { Self { + snapshot_sequence_number, wal_file_sequence_number, + catalog_sequence_number, parquet_size_bytes: 0, row_count: 0, min_time: i64::MAX, diff --git a/influxdb3_write/src/paths.rs b/influxdb3_write/src/paths.rs index 53a48f23e04..52ab9ad9d96 100644 --- a/influxdb3_write/src/paths.rs +++ b/influxdb3_write/src/paths.rs @@ -1,5 +1,5 @@ use chrono::prelude::*; -use influxdb3_wal::WalFileSequenceNumber; +use influxdb3_wal::{SnapshotSequenceNumber, WalFileSequenceNumber}; use object_store::path::Path as ObjPath; use std::ops::Deref; @@ -23,7 +23,7 @@ impl CatalogFilePath { pub fn new(host_prefix: &str, wal_file_sequence_number: WalFileSequenceNumber) -> Self { let path = ObjPath::from(format!( "{host_prefix}/catalogs/{:020}.{}", - object_store_file_stem(wal_file_sequence_number.get()), + object_store_file_stem(wal_file_sequence_number.as_u64()), CATALOG_FILE_EXTENSION )); Self(path) @@ -62,7 +62,7 @@ impl ParquetFilePath { let path = ObjPath::from(format!( "{host_prefix}/dbs/{db_name}/{table_name}/{}/{}.{}", date.format("%Y-%m-%d/%H-%M"), - wal_file_sequence_number.get(), + wal_file_sequence_number.as_u64(), PARQUET_FILE_EXTENSION )); Self(path) @@ -79,7 +79,7 @@ impl ParquetFilePath { let path = ObjPath::from(format!( "dbs/{db_name}/{table_name}/{}/{:010}.{}", date_time.format("%Y-%m-%d/%H-%M"), - wal_file_sequence_number.get(), + wal_file_sequence_number.as_u64(), PARQUET_FILE_EXTENSION )); Self(path) @@ -104,10 +104,10 @@ impl AsRef for ParquetFilePath { pub struct SnapshotInfoFilePath(ObjPath); impl 
SnapshotInfoFilePath { - pub fn new(host_prefix: &str, wal_file_sequence_number: WalFileSequenceNumber) -> Self { + pub fn new(host_prefix: &str, snapshot_sequence_number: SnapshotSequenceNumber) -> Self { let path = ObjPath::from(format!( "{host_prefix}/snapshots/{:020}.{}", - object_store_file_stem(wal_file_sequence_number.get()), + object_store_file_stem(snapshot_sequence_number.as_u64()), SNAPSHOT_INFO_FILE_EXTENSION )); Self(path) @@ -173,7 +173,7 @@ fn parquet_file_percent_encoded() { #[test] fn snapshot_info_file_path_new() { assert_eq!( - *SnapshotInfoFilePath::new("my_host", WalFileSequenceNumber::new(0)), + *SnapshotInfoFilePath::new("my_host", SnapshotSequenceNumber::new(0)), ObjPath::from("my_host/snapshots/18446744073709551615.info.json") ); } diff --git a/influxdb3_write/src/persister.rs b/influxdb3_write/src/persister.rs index fa0c0fda4a4..c0807a4ec04 100644 --- a/influxdb3_write/src/persister.rs +++ b/influxdb3_write/src/persister.rs @@ -270,7 +270,7 @@ impl Persister for PersisterImpl { async fn persist_snapshot(&self, persisted_snapshot: &PersistedSnapshot) -> Result<()> { let snapshot_file_path = SnapshotInfoFilePath::new( self.host_identifier_prefix.as_str(), - persisted_snapshot.wal_file_sequence_number, + persisted_snapshot.snapshot_sequence_number, ); let json = serde_json::to_vec_pretty(persisted_snapshot)?; self.object_store @@ -362,6 +362,8 @@ impl TrackedMemoryArrowWriter { #[cfg(test)] mod tests { use super::*; + use influxdb3_catalog::catalog::SequenceNumber; + use influxdb3_wal::SnapshotSequenceNumber; use object_store::memory::InMemory; use { arrow::array::Int32Array, arrow::datatypes::DataType, arrow::datatypes::Field, @@ -425,7 +427,9 @@ mod tests { LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap(); let persister = PersisterImpl::new(Arc::new(local_disk), "test_host"); let info_file = PersistedSnapshot { + snapshot_sequence_number: SnapshotSequenceNumber::new(0), wal_file_sequence_number: 
WalFileSequenceNumber::new(0), + catalog_sequence_number: SequenceNumber::new(0), databases: HashMap::new(), min_time: 0, max_time: 1, @@ -442,7 +446,9 @@ mod tests { LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap(); let persister = PersisterImpl::new(Arc::new(local_disk), "test_host"); let info_file = PersistedSnapshot { + snapshot_sequence_number: SnapshotSequenceNumber::new(0), wal_file_sequence_number: WalFileSequenceNumber::new(0), + catalog_sequence_number: SequenceNumber::default(), databases: HashMap::new(), min_time: 0, max_time: 1, @@ -450,7 +456,9 @@ mod tests { parquet_size_bytes: 0, }; let info_file_2 = PersistedSnapshot { + snapshot_sequence_number: SnapshotSequenceNumber::new(1), wal_file_sequence_number: WalFileSequenceNumber::new(1), + catalog_sequence_number: SequenceNumber::default(), databases: HashMap::new(), max_time: 1, min_time: 0, @@ -458,7 +466,9 @@ mod tests { parquet_size_bytes: 0, }; let info_file_3 = PersistedSnapshot { + snapshot_sequence_number: SnapshotSequenceNumber::new(2), wal_file_sequence_number: WalFileSequenceNumber::new(2), + catalog_sequence_number: SequenceNumber::default(), databases: HashMap::new(), min_time: 0, max_time: 1, @@ -472,9 +482,11 @@ mod tests { let snapshots = persister.load_snapshots(2).await.unwrap(); assert_eq!(snapshots.len(), 2); - // The most recent one is first - assert_eq!(snapshots[0].wal_file_sequence_number.get(), 2); - assert_eq!(snapshots[1].wal_file_sequence_number.get(), 1); + // The most recent files are first + assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 2); + assert_eq!(snapshots[0].snapshot_sequence_number.as_u64(), 2); + assert_eq!(snapshots[1].wal_file_sequence_number.as_u64(), 1); + assert_eq!(snapshots[1].snapshot_sequence_number.as_u64(), 1); } #[tokio::test] @@ -483,7 +495,9 @@ mod tests { LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap(); let persister = PersisterImpl::new(Arc::new(local_disk), "test_host"); let 
info_file = PersistedSnapshot { + snapshot_sequence_number: SnapshotSequenceNumber::new(0), wal_file_sequence_number: WalFileSequenceNumber::new(0), + catalog_sequence_number: SequenceNumber::default(), databases: HashMap::new(), min_time: 0, max_time: 1, @@ -494,7 +508,7 @@ mod tests { let snapshots = persister.load_snapshots(2).await.unwrap(); // We asked for the most recent 2 but there should only be 1 assert_eq!(snapshots.len(), 1); - assert_eq!(snapshots[0].wal_file_sequence_number.get(), 0); + assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 0); } #[tokio::test] @@ -505,7 +519,9 @@ mod tests { let persister = PersisterImpl::new(Arc::new(local_disk), "test_host"); for id in 0..9001 { let info_file = PersistedSnapshot { + snapshot_sequence_number: SnapshotSequenceNumber::new(id), wal_file_sequence_number: WalFileSequenceNumber::new(id), + catalog_sequence_number: SequenceNumber::new(id as u32), databases: HashMap::new(), min_time: 0, max_time: 1, @@ -517,7 +533,9 @@ mod tests { let snapshots = persister.load_snapshots(9500).await.unwrap(); // We asked for the most recent 9500 so there should be 9001 of them assert_eq!(snapshots.len(), 9001); - assert_eq!(snapshots[0].wal_file_sequence_number.get(), 9000); + assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 9000); + assert_eq!(snapshots[0].snapshot_sequence_number.as_u64(), 9000); + assert_eq!(snapshots[0].catalog_sequence_number.as_u32(), 9000); } #[tokio::test] diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index b9bcceef037..2926bd61a2f 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -111,6 +111,8 @@ pub struct WriteBufferImpl { last_cache: Arc, } +const N_SNAPSHOTS_TO_LOAD_ON_START: usize = 1_000; + impl WriteBufferImpl { pub async fn new( persister: Arc, @@ -128,10 +130,15 @@ impl WriteBufferImpl { let last_cache = Arc::new(LastCacheProvider::new_from_catalog(&catalog.clone_inner())?); - 
let persisted_snapshots = persister.load_snapshots(1000).await?; - let last_snapshot_wal_sequence = persisted_snapshots + let persisted_snapshots = persister + .load_snapshots(N_SNAPSHOTS_TO_LOAD_ON_START) + .await?; + let last_wal_sequence_number = persisted_snapshots .first() .map(|s| s.wal_file_sequence_number); + let last_snapshot_sequence_number = persisted_snapshots + .first() + .map(|s| s.snapshot_sequence_number); let persisted_files = Arc::new(PersistedFiles::new_from_persisted_snapshots( persisted_snapshots, )); @@ -150,7 +157,8 @@ impl WriteBufferImpl { persister.host_identifier_prefix(), Arc::clone(&queryable_buffer) as Arc, wal_config, - last_snapshot_wal_sequence, + last_wal_sequence_number, + last_snapshot_sequence_number, ) .await?; @@ -581,18 +589,22 @@ impl WriteBuffer for WriteBufferImpl {} #[cfg(test)] mod tests { use super::*; - use crate::paths::CatalogFilePath; + use crate::paths::{CatalogFilePath, SnapshotInfoFilePath}; use crate::persister::PersisterImpl; + use crate::PersistedSnapshot; use arrow::record_batch::RecordBatch; use arrow_util::assert_batches_eq; + use bytes::Bytes; use datafusion::assert_batches_sorted_eq; use datafusion_util::config::register_iox_object_store; use futures_util::StreamExt; - use influxdb3_wal::Level0Duration; + use influxdb3_catalog::catalog::SequenceNumber; + use influxdb3_wal::{Level0Duration, SnapshotSequenceNumber, WalFileSequenceNumber}; use iox_query::exec::IOxSessionContext; use iox_time::{MockProvider, Time}; + use object_store::local::LocalFileSystem; use object_store::memory::InMemory; - use object_store::ObjectStore; + use object_store::{ObjectStore, PutPayload}; #[test] fn parse_lp_into_buffer() { @@ -712,6 +724,7 @@ mod tests { async fn last_cache_create_and_delete_is_durable() { let (wbuf, _ctx) = setup( Time::from_timestamp_nanos(0), + Arc::new(InMemory::new()), WalConfig { level_0_duration: Level0Duration::new_1m(), max_write_buffer_size: 100, @@ -841,6 +854,7 @@ mod tests { async fn 
returns_chunks_across_parquet_and_buffered_data() { let (write_buffer, session_context) = setup( Time::from_timestamp_nanos(0), + Arc::new(InMemory::new()), WalConfig { level_0_duration: Level0Duration::new_1m(), max_write_buffer_size: 100, @@ -1031,6 +1045,7 @@ mod tests { async fn catalog_snapshots_only_if_updated() { let (write_buffer, _ctx) = setup( Time::from_timestamp_nanos(0), + Arc::new(InMemory::new()), WalConfig { level_0_duration: Level0Duration::new_1m(), max_write_buffer_size: 100, @@ -1147,6 +1162,112 @@ mod tests { verify_snapshot_count(3, &write_buffer.persister).await; } + /// Check that when a WriteBuffer is initialized with existing snapshot files, newly + /// generated snapshot files use the next sequence number. + #[tokio::test] + async fn new_snapshots_use_correct_sequence() { + // set up a local file system object store: + let object_store: Arc = + Arc::new(LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap()); + + // create a snapshot file that will be loaded on initialization of the write buffer: + let prev_snapshot_seq = SnapshotSequenceNumber::new(42); + let prev_snapshot = PersistedSnapshot::new( + prev_snapshot_seq, + WalFileSequenceNumber::new(0), + SequenceNumber::new(0), + ); + let snapshot_json = serde_json::to_vec(&prev_snapshot).unwrap(); + + // put the snapshot file in object store: + object_store + .put( + &SnapshotInfoFilePath::new("test_host", prev_snapshot_seq), + PutPayload::from_bytes(Bytes::from(snapshot_json)), + ) + .await + .unwrap(); + + // set up the write buffer: + let (wbuf, _ctx) = setup( + Time::from_timestamp_nanos(0), + Arc::clone(&object_store), + WalConfig { + level_0_duration: Level0Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(5), + snapshot_size: 1, + }, + ) + .await; + + // there should be one snapshot already, i.e., the one we created above: + verify_snapshot_count(1, &wbuf.persister).await; + // there aren't any catalogs yet: + 
verify_catalog_count(0, object_store.clone()).await; + + // do three writes to force a new snapshot + wbuf.write_lp( + NamespaceName::new("foo").unwrap(), + "cpu bar=1", + Time::from_timestamp(10, 0).unwrap(), + false, + Precision::Nanosecond, + ) + .await + .unwrap(); + wbuf.write_lp( + NamespaceName::new("foo").unwrap(), + "cpu bar=2", + Time::from_timestamp(20, 0).unwrap(), + false, + Precision::Nanosecond, + ) + .await + .unwrap(); + wbuf.write_lp( + NamespaceName::new("foo").unwrap(), + "cpu bar=3", + Time::from_timestamp(30, 0).unwrap(), + false, + Precision::Nanosecond, + ) + .await + .unwrap(); + + // Check that there are now 2 snapshots: + verify_snapshot_count(2, &wbuf.persister).await; + // Check that the next sequence number is used for the new snapshot: + assert_eq!( + prev_snapshot_seq.next(), + wbuf.wal.last_snapshot_sequence_number().await + ); + // There should be a catalog now, since the above writes updated the catalog + verify_catalog_count(1, object_store.clone()).await; + // Check the catalog sequence number in the latest snapshot is correct: + let persisted_snapshot_bytes = object_store + .get(&SnapshotInfoFilePath::new( + "test_host", + prev_snapshot_seq.next(), + )) + .await + .unwrap() + .bytes() + .await + .unwrap(); + let persisted_snapshot = + serde_json::from_slice::(&persisted_snapshot_bytes).unwrap(); + // NOTE: it appears that writes which create a new db increment the catalog sequence twice. + // This is likely due to the catalog sequence being incremented first for the db creation and + // then again for the updates to the table written to. Hence the sequence number is 2 here. 
+ // If we manage to make it so that scenario only increments the catalog sequence once, then + // this assertion may fail: + assert_eq!( + SequenceNumber::new(2), + persisted_snapshot.catalog_sequence_number + ); + } + async fn verify_catalog_count(n: usize, object_store: Arc) { let mut checks = 0; loop { @@ -1159,7 +1280,7 @@ mod tests { if catalogs.len() > n { panic!("checking for {} catalogs but found {}", n, catalogs.len()); } else if catalogs.len() == n && checks > 5 { - // let enough checks happen to ensure extra catalog persists aren't running ion the background + // let enough checks happen to ensure extra catalog persists aren't running in the background break; } else { checks += 1; @@ -1201,9 +1322,9 @@ mod tests { async fn setup( start: Time, + object_store: Arc, wal_config: WalConfig, ) -> (WriteBufferImpl, IOxSessionContext) { - let object_store: Arc = Arc::new(InMemory::new()); let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store), "test_host")); let time_provider = Arc::new(MockProvider::new(start)); let wbuf = WriteBufferImpl::new( diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index cb083ce295d..68d084beb4f 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -197,7 +197,10 @@ impl QueryableBuffer { if !catalog.is_updated() { break; } - info!("persisting catalog for wal file {}", wal_file_number.get()); + info!( + "persisting catalog for wal file {}", + wal_file_number.as_u64() + ); let inner_catalog = catalog.clone_inner(); let sequence_number = inner_catalog.sequence_number(); @@ -219,10 +222,14 @@ impl QueryableBuffer { info!( "persisting {} chunks for wal number {}", persist_jobs.len(), - wal_file_number.get(), + wal_file_number.as_u64(), ); // persist the individual files, building the snapshot as we go - let mut persisted_snapshot = PersistedSnapshot::new(wal_file_number); + let 
mut persisted_snapshot = PersistedSnapshot::new( + snapshot_details.snapshot_sequence_number, + wal_file_number, + catalog.sequence_number(), + ); for persist_job in persist_jobs { let path = persist_job.path.to_string(); let database_name = Arc::clone(&persist_job.database_name);