diff --git a/libs/wal_decoder/src/decoder.rs b/libs/wal_decoder/src/decoder.rs
index 780fce3d69032..7b428d3b42c1b 100644
--- a/libs/wal_decoder/src/decoder.rs
+++ b/libs/wal_decoder/src/decoder.rs
@@ -2,15 +2,13 @@
 //! raw bytes which represent a raw Postgres WAL record.
 
 use crate::models::*;
-use bytes::{Buf, Bytes, BytesMut};
-use pageserver_api::key::rel_block_to_key;
-use pageserver_api::record::NeonWalRecord;
+use crate::serialized_batch::SerializedValueBatch;
+use bytes::{Buf, Bytes};
 use pageserver_api::reltag::{RelTag, SlruKind};
 use pageserver_api::shard::ShardIdentity;
-use pageserver_api::value::Value;
+use postgres_ffi::pg_constants;
 use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
 use postgres_ffi::walrecord::*;
-use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ};
 use utils::lsn::Lsn;
 
 impl InterpretedWalRecord {
@@ -26,6 +24,7 @@ impl InterpretedWalRecord {
     ) -> anyhow::Result<InterpretedWalRecord> {
         let mut decoded = DecodedWALRecord::default();
         decode_wal_record(buf, &mut decoded, pg_version)?;
+        let xid = decoded.xl_xid;
 
         let flush_uncommitted = if decoded.is_dbase_create_copy(pg_version) {
             FlushUncommittedRecords::Yes
@@ -34,95 +33,14 @@ impl InterpretedWalRecord {
         };
 
         let metadata_record = MetadataRecord::from_decoded(&decoded, lsn, pg_version)?;
-
-        let mut blocks = Vec::default();
-        for blk in decoded.blocks.iter() {
-            let rel = RelTag {
-                spcnode: blk.rnode_spcnode,
-                dbnode: blk.rnode_dbnode,
-                relnode: blk.rnode_relnode,
-                forknum: blk.forknum,
-            };
-
-            let key = rel_block_to_key(rel, blk.blkno);
-
-            if !key.is_valid_key_on_write_path() {
-                anyhow::bail!("Unsupported key decoded at LSN {}: {}", lsn, key);
-            }
-
-            let key_is_local = shard.is_key_local(&key);
-
-            tracing::debug!(
-                lsn=%lsn,
-                key=%key,
-                "ingest: shard decision {}",
-                if !key_is_local { "drop" } else { "keep" },
-            );
-
-            if !key_is_local {
-                if shard.is_shard_zero() {
-                    // Shard 0 tracks relation sizes. Although we will not store this block, we will observe
-                    // its blkno in case it implicitly extends a relation.
-                    blocks.push((key.to_compact(), None));
-                }
-
-                continue;
-            }
-
-            // Instead of storing full-page-image WAL record,
-            // it is better to store extracted image: we can skip wal-redo
-            // in this case. Also some FPI records may contain multiple (up to 32) pages,
-            // so them have to be copied multiple times.
-            //
-            let value = if blk.apply_image
-                && blk.has_image
-                && decoded.xl_rmid == pg_constants::RM_XLOG_ID
-                && (decoded.xl_info == pg_constants::XLOG_FPI
-                    || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
-                // compression of WAL is not yet supported: fall back to storing the original WAL record
-                && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)
-                // do not materialize null pages because them most likely be soon replaced with real data
-                && blk.bimg_len != 0
-            {
-                // Extract page image from FPI record
-                let img_len = blk.bimg_len as usize;
-                let img_offs = blk.bimg_offset as usize;
-                let mut image = BytesMut::with_capacity(BLCKSZ as usize);
-                // TODO(vlad): skip the copy
-                image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
-
-                if blk.hole_length != 0 {
-                    let tail = image.split_off(blk.hole_offset as usize);
-                    image.resize(image.len() + blk.hole_length as usize, 0u8);
-                    image.unsplit(tail);
-                }
-                //
-                // Match the logic of XLogReadBufferForRedoExtended:
-                // The page may be uninitialized. If so, we can't set the LSN because
-                // that would corrupt the page.
-                //
-                if !page_is_new(&image) {
-                    page_set_lsn(&mut image, lsn)
-                }
-                assert_eq!(image.len(), BLCKSZ as usize);
-
-                Value::Image(image.freeze())
-            } else {
-                Value::WalRecord(NeonWalRecord::Postgres {
-                    will_init: blk.will_init || blk.apply_image,
-                    rec: decoded.record.clone(),
-                })
-            };
-
-            blocks.push((key.to_compact(), Some(value)));
-        }
+        let batch = SerializedValueBatch::from_decoded_filtered(decoded, shard, lsn, pg_version)?;
 
         Ok(InterpretedWalRecord {
             metadata_record,
-            blocks,
+            batch,
             lsn,
             flush_uncommitted,
-            xid: decoded.xl_xid,
+            xid,
         })
     }
 }
diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs
index 92b66fcefdf0d..3c7b477365922 100644
--- a/libs/wal_decoder/src/models.rs
+++ b/libs/wal_decoder/src/models.rs
@@ -25,9 +25,7 @@
 //!       |--> write to KV store within the pageserver
 
 use bytes::Bytes;
-use pageserver_api::key::CompactKey;
 use pageserver_api::reltag::{RelTag, SlruKind};
-use pageserver_api::value::Value;
 use postgres_ffi::walrecord::{
     XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet,
     XlSmgrTruncate, XlXactParsedRecord,
@@ -35,6 +33,8 @@ use postgres_ffi::walrecord::{
 use postgres_ffi::{Oid, TransactionId};
 use utils::lsn::Lsn;
 
+use crate::serialized_batch::SerializedValueBatch;
+
 pub enum FlushUncommittedRecords {
     Yes,
     No,
@@ -45,10 +45,9 @@ pub struct InterpretedWalRecord {
     /// Optional metadata record - may cause writes to metadata keys
     /// in the storage engine
     pub metadata_record: Option<MetadataRecord>,
-    /// Images or deltas for blocks modified in the original WAL record.
-    /// The [`Value`] is optional to avoid sending superfluous data to
-    /// shard 0 for relation size tracking.
-    pub blocks: Vec<(CompactKey, Option<Value>)>,
+    /// A pre-serialized batch along with the required metadata for ingestion
+    /// by the pageserver
+    pub batch: SerializedValueBatch,
     /// Byte offset within WAL for the end of the original PG WAL record
     pub lsn: Lsn,
     /// Whether to flush all uncommitted modifications to the storage engine
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index 16cb2b6dbe7ae..ca9083d16c686 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -24,6 +24,7 @@ use pageserver_api::key::{
 use pageserver_api::keyspace::SparseKeySpace;
 use pageserver_api::record::NeonWalRecord;
 use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
+use pageserver_api::shard::ShardIdentity;
 use pageserver_api::value::Value;
 use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
 use postgres_ffi::BLCKSZ;
@@ -171,12 +172,11 @@ impl Timeline {
             tline: self,
             pending_lsns: Vec::new(),
             pending_metadata_pages: HashMap::new(),
-            pending_data_pages: Vec::new(),
-            pending_zero_data_pages: Default::default(),
+            pending_data_batch: None,
             pending_deletions: Vec::new(),
             pending_nblocks: 0,
             pending_directory_entries: Vec::new(),
-            pending_bytes: 0,
+            pending_metadata_bytes: 0,
             lsn,
         }
     }
@@ -1026,21 +1026,14 @@ pub struct DatadirModification<'a> {
 
     /// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
     /// which keys are stored here.
-    pending_data_pages: Vec<(CompactKey, Lsn, usize, Value)>,
-
-    // Sometimes during ingest, for example when extending a relation, we would like to write a zero page. However,
-    // if we encounter a write from postgres in the same wal record, we will drop this entry.
-    //
-    // Unlike other 'pending' fields, this does not last until the next call to commit(): it is flushed
-    // at the end of each wal record, and all these writes implicitly are at lsn Self::lsn
-    pending_zero_data_pages: HashSet<CompactKey>,
+    pending_data_batch: Option<SerializedValueBatch>,
 
     /// For special "directory" keys that store key-value maps, track the size of the map
     /// if it was updated in this modification.
     pending_directory_entries: Vec<(DirectoryKind, usize)>,
 
-    /// An **approximation** of how large our EphemeralFile write will be when committed.
-    pending_bytes: usize,
+    /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
+    pending_metadata_bytes: usize,
 }
 
 impl<'a> DatadirModification<'a> {
@@ -1055,11 +1048,17 @@ impl<'a> DatadirModification<'a> {
     }
 
     pub(crate) fn approx_pending_bytes(&self) -> usize {
-        self.pending_bytes
+        self.pending_data_batch
+            .as_ref()
+            .map_or(0, |b| b.buffer_size())
+            + self.pending_metadata_bytes
     }
 
-    pub(crate) fn has_dirty_data_pages(&self) -> bool {
-        (!self.pending_data_pages.is_empty()) || (!self.pending_zero_data_pages.is_empty())
+    pub(crate) fn dirty(&self) -> bool {
+        !self
+            .pending_data_batch
+            .as_ref()
+            .map_or(true, |b| b.is_empty())
     }
 
     /// Set the current lsn
@@ -1071,9 +1070,6 @@ impl<'a> DatadirModification<'a> {
             self.lsn
         );
 
-        // If we are advancing LSN, then state from previous wal record should have been flushed.
-        assert!(self.pending_zero_data_pages.is_empty());
-
         if lsn > self.lsn {
             self.pending_lsns.push(self.lsn);
             self.lsn = lsn;
@@ -1180,6 +1176,61 @@ impl<'a> DatadirModification<'a> {
         }
     }
 
+    pub async fn ingest_batch(
+        &mut self,
+        mut batch: SerializedValueBatch,
+        // TODO(vlad): remove this argument and replace the shard check with is_key_local
+        shard: &ShardIdentity,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<()> {
+        let mut gaps_at_lsns = Vec::default();
+
+        for meta in batch.metadata.iter() {
+            let (rel, blkno) = Key::from_compact(*meta.key()).to_rel_block()?;
+            let new_nblocks = blkno + 1;
+
+            let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
+            if new_nblocks > old_nblocks {
+                self.put_rel_extend(rel, new_nblocks, ctx).await?;
+            }
+
+            let gaps = {
+                let mut key = rel_block_to_key(rel, blkno);
+                let mut gap_accum = KeySpaceAccum::new();
+
+                for gap_blkno in old_nblocks..blkno {
+                    key.field6 = gap_blkno;
+
+                    if shard.get_shard_number(&key) != shard.number {
+                        continue;
+                    }
+
+                    gap_accum.add_key(key);
+                }
+
+                gap_accum.to_keyspace()
+            };
+
+            gaps_at_lsns.push((gaps, *meta.lsn()));
+        }
+
+        batch.zero_gaps(gaps_at_lsns);
+
+        match self.pending_data_batch.as_mut() {
+            Some(pending_batch) => {
+                pending_batch.extend(batch);
+            }
+            None if !batch.is_empty() => {
+                self.pending_data_batch = Some(batch);
+            }
+            None => {
+                // Nothing to initialize the batch with
+            }
+        }
+
+        Ok(())
+    }
+
     /// Put a new page version that can be constructed from a WAL record
     ///
     /// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
@@ -1262,8 +1313,13 @@ impl<'a> DatadirModification<'a> {
                 self.lsn
             );
         }
-        self.pending_zero_data_pages.insert(key.to_compact());
-        self.pending_bytes += ZERO_PAGE.len();
+
+        let batch = self
+            .pending_data_batch
+            .get_or_insert(SerializedValueBatch::default());
+
+        batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
+
         Ok(())
     }
 
@@ -1281,17 +1337,14 @@ impl<'a> DatadirModification<'a> {
                 self.lsn
             );
         }
-        self.pending_zero_data_pages.insert(key.to_compact());
-        self.pending_bytes += ZERO_PAGE.len();
-        Ok(())
-    }
 
-    /// Call this at the end of each WAL record.
-    pub(crate) fn on_record_end(&mut self) {
-        let pending_zero_data_pages = std::mem::take(&mut self.pending_zero_data_pages);
-        for key in pending_zero_data_pages {
-            self.put_data(key, Value::Image(ZERO_PAGE.clone()));
-        }
+        let batch = self
+            .pending_data_batch
+            .get_or_insert(SerializedValueBatch::default());
+
+        batch.put(key.to_compact(), Value::Image(ZERO_PAGE.clone()), self.lsn);
+
+        Ok(())
     }
 
     /// Store a relmapper file (pg_filenode.map) in the repository
@@ -1783,14 +1836,17 @@ impl<'a> DatadirModification<'a> {
         let mut writer = self.tline.writer().await;
 
         // Flush relation and SLRU data blocks, keep metadata.
-        let pending_data_pages = std::mem::take(&mut self.pending_data_pages);
+        if let Some(batch) = self.pending_data_batch.take() {
+            tracing::debug!(
+                "Flushing batch with max_lsn={}. Last record LSN is {}",
+                batch.max_lsn,
+                self.tline.get_last_record_lsn()
+            );
 
-        // This bails out on first error without modifying pending_updates.
-        // That's Ok, cf this function's doc comment.
-        writer
-            .put_batch(SerializedValueBatch::from_values(pending_data_pages), ctx)
-            .await?;
-        self.pending_bytes = 0;
+            // This bails out on first error without modifying pending_updates.
+            // That's Ok, cf this function's doc comment.
+            writer.put_batch(batch, ctx).await?;
+        }
 
         if pending_nblocks != 0 {
             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
@@ -1810,9 +1866,6 @@ impl<'a> DatadirModification<'a> {
     /// All the modifications in this atomic update are stamped by the specified LSN.
     ///
     pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
-        // Commit should never be called mid-wal-record
-        assert!(self.pending_zero_data_pages.is_empty());
-
         let mut writer = self.tline.writer().await;
 
         let pending_nblocks = self.pending_nblocks;
@@ -1820,23 +1873,49 @@ impl<'a> DatadirModification<'a> {
 
         // Ordering: the items in this batch do not need to be in any global order, but values for
         // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
-        // this to do efficient updates to its index.
-        let mut write_batch = std::mem::take(&mut self.pending_data_pages);
+        // this to do efficient updates to its index. See [`wal_decoder::serialized_batch`] for
+        // more details.
 
-        write_batch.extend(
-            self.pending_metadata_pages
+        let metadata_batch = {
+            let pending_meta = self
+                .pending_metadata_pages
                 .drain()
                 .flat_map(|(key, values)| {
                     values
                         .into_iter()
                         .map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
-                }),
-        );
+                })
+                .collect::<Vec<_>>();
 
-        if !write_batch.is_empty() {
-            writer
-                .put_batch(SerializedValueBatch::from_values(write_batch), ctx)
-                .await?;
+            if pending_meta.is_empty() {
+                None
+            } else {
+                Some(SerializedValueBatch::from_values(pending_meta))
+            }
+        };
+
+        let data_batch = self.pending_data_batch.take();
+
+        let maybe_batch = match (data_batch, metadata_batch) {
+            (Some(mut data), Some(metadata)) => {
+                data.extend(metadata);
+                Some(data)
+            }
+            (Some(data), None) => Some(data),
+            (None, Some(metadata)) => Some(metadata),
+            (None, None) => None,
+        };
+
+        if let Some(batch) = maybe_batch {
+            tracing::debug!(
+                "Flushing batch with max_lsn={}. Last record LSN is {}",
+                batch.max_lsn,
+                self.tline.get_last_record_lsn()
+            );
+
+            // This bails out on first error without modifying pending_updates.
+            // That's Ok, cf this function's doc comment.
+            writer.put_batch(batch, ctx).await?;
         }
 
         if !self.pending_deletions.is_empty() {
@@ -1846,6 +1925,9 @@ impl<'a> DatadirModification<'a> {
 
         self.pending_lsns.push(self.lsn);
         for pending_lsn in self.pending_lsns.drain(..) {
+            // TODO(vlad): pretty sure the comment below is not valid anymore
+            // and we can call finish write with the latest LSN
+            //
             // Ideally, we should be able to call writer.finish_write() only once
             // with the highest LSN. However, the last_record_lsn variable in the
             // timeline keeps track of the latest LSN and the immediate previous LSN
@@ -1861,14 +1943,14 @@ impl<'a> DatadirModification<'a> {
             writer.update_directory_entries_count(kind, count as u64);
         }
 
-        self.pending_bytes = 0;
+        self.pending_metadata_bytes = 0;
 
         Ok(())
     }
 
     pub(crate) fn len(&self) -> usize {
         self.pending_metadata_pages.len()
-            + self.pending_data_pages.len()
+            + self.pending_data_batch.as_ref().map_or(0, |b| b.len())
            + self.pending_deletions.len()
     }
 
@@ -1910,11 +1992,10 @@ impl<'a> DatadirModification<'a> {
         // modifications before ingesting DB create operations, which are the only kind that reads
         // data pages during ingest.
         if cfg!(debug_assertions) {
-            for (dirty_key, _, _, _) in &self.pending_data_pages {
-                debug_assert!(&key.to_compact() != dirty_key);
-            }
-
-            debug_assert!(!self.pending_zero_data_pages.contains(&key.to_compact()))
+            assert!(!self
+                .pending_data_batch
+                .as_ref()
+                .map_or(false, |b| b.updates_key(&key)));
         }
     }
 
@@ -1932,18 +2013,10 @@ impl<'a> DatadirModification<'a> {
     }
 
     fn put_data(&mut self, key: CompactKey, val: Value) {
-        let val_serialized_size = val.serialized_size().unwrap() as usize;
-
-        // If this page was previously zero'd in the same WalRecord, then drop the previous zero page write. This
-        // is an optimization that avoids persisting both the zero page generated by us (e.g. during a relation extend),
-        // and the subsequent postgres-originating write
-        if self.pending_zero_data_pages.remove(&key) {
-            self.pending_bytes -= ZERO_PAGE.len();
-        }
-
-        self.pending_bytes += val_serialized_size;
-        self.pending_data_pages
-            .push((key, self.lsn, val_serialized_size, val))
+        let batch = self
+            .pending_data_batch
+            .get_or_insert(SerializedValueBatch::default());
+        batch.put(key, val, self.lsn);
     }
 
     fn put_metadata(&mut self, key: CompactKey, val: Value) {
         // Replace the previous value if it exists at the same lsn
         if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
             if *last_lsn == self.lsn {
-                // Update the pending_bytes contribution from this entry, and update the serialized size in place
-                self.pending_bytes -= *last_value_ser_size;
+                // Update the pending_metadata_bytes contribution from this entry, and update the serialized size in place
+                self.pending_metadata_bytes -= *last_value_ser_size;
                 *last_value_ser_size = val.serialized_size().unwrap() as usize;
-                self.pending_bytes += *last_value_ser_size;
+                self.pending_metadata_bytes += *last_value_ser_size;
 
                 // Use the latest value, this replaces any earlier write to the same (key,lsn), such as much
                 // have been generated by synthesized zero page writes prior to the first real write to a page.
@@ -1964,8 +2037,12 @@ impl<'a> DatadirModification<'a> {
         }
 
         let val_serialized_size = val.serialized_size().unwrap() as usize;
-        self.pending_bytes += val_serialized_size;
+        self.pending_metadata_bytes += val_serialized_size;
         values.push((self.lsn, val_serialized_size, val));
+
+        if key == CHECKPOINT_KEY.to_compact() {
+            tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
+        }
     }
 
     fn delete(&mut self, key_range: Range<Key>) {
diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index c35c3c365f944..74507499bb9f1 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -28,14 +28,13 @@ use std::time::Duration;
 use std::time::Instant;
 use std::time::SystemTime;
 
-use pageserver_api::key::Key;
 use pageserver_api::shard::ShardIdentity;
 use postgres_ffi::fsm_logical_to_physical;
 use postgres_ffi::walrecord::*;
 use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
 use wal_decoder::models::*;
 
-use anyhow::{bail, Context, Result};
+use anyhow::{bail, Result};
 use bytes::{Buf, Bytes};
 use tracing::*;
 use utils::failpoint_support;
@@ -51,7 +50,6 @@ use crate::ZERO_PAGE;
 use pageserver_api::key::rel_block_to_key;
 use pageserver_api::record::NeonWalRecord;
 use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
-use pageserver_api::value::Value;
 use postgres_ffi::pg_constants;
 use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
 use postgres_ffi::TransactionId;
@@ -161,7 +159,7 @@ impl WalIngest {
         if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) {
             // Records of this type should always be preceded by a commit(), as they
             // rely on reading data pages back from the Timeline.
-            assert!(!modification.has_dirty_data_pages());
+            assert!(!modification.dirty());
         }
 
         assert!(!self.checkpoint_modified);
@@ -275,28 +273,9 @@ impl WalIngest {
             }
         }
 
-        // Iterate through all the key value pairs provided in the interpreted block
-        // and update the modification currently in-flight to include them.
-        for (compact_key, maybe_value) in interpreted.blocks.into_iter() {
-            let (rel, blk) = Key::from_compact(compact_key).to_rel_block()?;
-            match maybe_value {
-                Some(Value::Image(img)) => {
-                    self.put_rel_page_image(modification, rel, blk, img, ctx)
-                        .await?;
-                }
-                Some(Value::WalRecord(rec)) => {
-                    self.put_rel_wal_record(modification, rel, blk, rec, ctx)
-                        .await?;
-                }
-                None => {
-                    // Shard 0 tracks relation sizes. We will observe
-                    // its blkno in case it implicitly extends a relation.
-                    assert!(self.shard.is_shard_zero());
-                    self.observe_decoded_block(modification, rel, blk, ctx)
-                        .await?;
-                }
-            }
-        }
+        modification
+            .ingest_batch(interpreted.batch, &self.shard, ctx)
+            .await?;
 
         // If checkpoint data was updated, store the new version in the repository
         if self.checkpoint_modified {
@@ -310,8 +289,6 @@ impl WalIngest {
         // until commit() is called to flush the data into the repository and update
         // the latest LSN.
 
-        modification.on_record_end();
-
         Ok(modification.len() > prev_len)
     }
 
@@ -334,17 +311,6 @@ impl WalIngest {
         Ok((epoch as u64) << 32 | xid as u64)
     }
 
-    /// Do not store this block, but observe it for the purposes of updating our relation size state.
-    async fn observe_decoded_block(
-        &mut self,
-        modification: &mut DatadirModification<'_>,
-        rel: RelTag,
-        blkno: BlockNumber,
-        ctx: &RequestContext,
-    ) -> Result<(), PageReconstructError> {
-        self.handle_rel_extend(modification, rel, blkno, ctx).await
-    }
-
     async fn ingest_clear_vm_bits(
         &mut self,
         clear_vm_bits: ClearVmBits,
@@ -1248,6 +1214,7 @@ impl WalIngest {
         Ok(())
     }
 
+    #[cfg(test)]
    async fn put_rel_page_image(
        &mut self,
        modification: &mut DatadirModification<'_>,
@@ -1524,25 +1491,21 @@ mod tests {
         walingest
             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
             .await?;
-        m.on_record_end();
         m.commit(&ctx).await?;
         let mut m = tline.begin_modification(Lsn(0x30));
         walingest
             .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
             .await?;
-        m.on_record_end();
         m.commit(&ctx).await?;
         let mut m = tline.begin_modification(Lsn(0x40));
         walingest
             .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
             .await?;
-        m.on_record_end();
         m.commit(&ctx).await?;
         let mut m = tline.begin_modification(Lsn(0x50));
         walingest
             .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
             .await?;
-        m.on_record_end();
         m.commit(&ctx).await?;
 
         assert_current_logical_size(&tline, Lsn(0x50));
@@ -1684,7 +1647,6 @@ mod tests {
         walingest
             .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
             .await?;
-        m.on_record_end();
         m.commit(&ctx).await?;
         assert_eq!(
             tline
@@ -1710,7 +1672,6 @@ mod tests {
         walingest
             .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
             .await?;
-        m.on_record_end();
         m.commit(&ctx).await?;
         assert_eq!(
             tline