pageserver: cut over to batching in interpreted wal records
This is a bigger bite than I would have liked, but this stuff is
tightly coupled.

The main goal of this commit is to use the `SerializedValueBatch`
in `InterpretedWalRecord`. This requires that `DatadirModification`
maintain a `SerializedValueBatch` and extend it as new WAL records
come in (to avoid flushing to disk on every record). In turn, this
cascaded into a number of modifications to `DatadirModification`
(see the sketch after this list):
1. Replace `pending_data_pages` and `pending_zero_data_pages` with
`pending_data_batch`.
2. Remove `pending_zero_data_pages` and its cousin
`on_wal_record_end`.
3. Rename `pending_bytes` to `pending_metadata_bytes`, since that is
all it tracks now.
4. Adapt various utility methods such as `len`,
`approx_pending_bytes`, and `has_dirty_data_pages`.
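
To make the new shape concrete, here is a minimal sketch of the reworked
struct. Only the names `pending_data_batch`, `pending_metadata_bytes`, and
`approx_pending_bytes` come from this commit; the stand-in types, the
`Option` wrapping, and the method body are illustrative assumptions, not
the real pageserver code:

```rust
use std::collections::HashMap;

// Stand-ins so the sketch compiles on its own; the real types live in the
// pageserver and wal_decoder crates.
type CompactKey = i128;
type Lsn = u64;
struct SerializedValueBatch {
    raw: Vec<u8>, // pre-serialized values destined for the ephemeral file
}

/// Sketch of `DatadirModification` after this commit (simplified).
struct DatadirModification {
    /// Replaces `pending_data_pages` and `pending_zero_data_pages`:
    /// data-page writes accumulate here and are flushed as one batch.
    pending_data_batch: Option<SerializedValueBatch>,
    /// Metadata writes are still buffered individually, per key.
    pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, Vec<u8>)>>,
    /// Renamed from `pending_bytes`; it now tracks metadata bytes only.
    pending_metadata_bytes: usize,
}

impl DatadirModification {
    /// Approximate buffered size. Zero pages that will later fill
    /// relation-extension gaps are deliberately not accounted for.
    fn approx_pending_bytes(&self) -> usize {
        let data = self.pending_data_batch.as_ref().map_or(0, |b| b.raw.len());
        data + self.pending_metadata_bytes
    }
}
```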

The removal of `pending_zero_data_pages` and the optimisation
associated with it (items 1 and 2) deserves more detail.

Previously, all zero data pages went through `pending_zero_data_pages`.
We wrote zero data pages when filling gaps caused by relation extension
(case A) and when handling special WAL records (case B). If the same
WAL record contained a non-zero write for an entry in
`pending_zero_data_pages`, we skipped the zero write.
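
For reference, the old flow looked roughly like this sketch. The names
`pending_zero_data_pages`, `pending_data_pages`, and `on_wal_record_end`
are from the code being removed; everything else is a simplified
assumption:

```rust
use std::collections::HashSet;

type Key = u64; // stand-in for the real compact key type
const BLCKSZ: usize = 8192;

struct OldModification {
    pending_zero_data_pages: HashSet<Key>,
    pending_data_pages: Vec<(Key, Vec<u8>)>,
}

impl OldModification {
    /// A non-zero write supersedes a zero write queued for the same key
    /// within the same WAL record: this is the skipped-zero optimisation.
    fn put_data(&mut self, key: Key, img: Vec<u8>) {
        self.pending_zero_data_pages.remove(&key);
        self.pending_data_pages.push((key, img));
    }

    /// At the end of each record, surviving zero writes become real pages.
    fn on_wal_record_end(&mut self) {
        for key in self.pending_zero_data_pages.drain() {
            self.pending_data_pages.push((key, vec![0u8; BLCKSZ]));
        }
    }
}
```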

Case A: We handle relation extensions differently now. Once the batch
grows large enough, we identify the gaps and fill them right before
flushing to the ephemeral file, so the optimisation is no longer
required. It also means that we don't account for the zero data pages
when approximating the size of the batch. I think this is fine: mid to
long term we shouldn't be writing zeroes anyway.
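
A sketch of the new gap-filling step for case A. This only captures the
idea; the real code works on `SerializedValueBatch` internals, and the
types and helper below are assumptions:

```rust
use std::collections::HashMap;

type RelTag = u32; // stand-in for pageserver_api::reltag::RelTag
const BLCKSZ: usize = 8192;

/// Before flushing a large batch, every block between a relation's
/// previously known size and a block written by the batch must exist on
/// disk, so it is materialized as a zero page unless the batch writes it.
fn fill_gaps(
    known_sizes: &HashMap<RelTag, u32>,
    writes: &[(RelTag, u32, Vec<u8>)],
) -> Vec<(RelTag, u32, Vec<u8>)> {
    let mut zeroes: Vec<(RelTag, u32, Vec<u8>)> = Vec::new();
    for (rel, blkno, _) in writes {
        let old_size = known_sizes.get(rel).copied().unwrap_or(0);
        for gap in old_size..*blkno {
            let written = writes.iter().any(|(r, b, _)| r == rel && *b == gap);
            let filled = zeroes.iter().any(|(r, b, _)| r == rel && *b == gap);
            if !written && !filled {
                zeroes.push((*rel, gap, vec![0u8; BLCKSZ]));
            }
        }
    }
    zeroes
}
```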

Case B: When the handling of a special record needs to zero out a key,
it just adds that write to the current batch. I inspected the code, and
I don't think the optimisation ever kicked in here.
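
The new case B path is just an append; a sketch under the same
simplifying assumptions as above:

```rust
type Key = u64; // stand-in for the real compact key type
const BLCKSZ: usize = 8192;

/// A special-record handler that needs to zero out a key now appends the
/// zero page straight to the batch that is currently being built.
fn zero_out(batch: &mut Vec<(Key, Vec<u8>)>, key: Key) {
    batch.push((key, vec![0u8; BLCKSZ]));
}
```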
VladLazar committed Oct 31, 2024
1 parent 2af6b50 commit 241f79d
Showing 4 changed files with 169 additions and 214 deletions.
96 changes: 7 additions & 89 deletions libs/wal_decoder/src/decoder.rs
@@ -2,15 +2,13 @@
//! raw bytes which represent a raw Postgres WAL record.
use crate::models::*;
use bytes::{Buf, Bytes, BytesMut};
use pageserver_api::key::rel_block_to_key;
use pageserver_api::record::NeonWalRecord;
use crate::serialized_batch::SerializedValueBatch;
use bytes::{Buf, Bytes};
use pageserver_api::reltag::{RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use pageserver_api::value::Value;
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
use postgres_ffi::walrecord::*;
use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ};
use utils::lsn::Lsn;

impl InterpretedWalRecord {
@@ -26,6 +24,7 @@ impl InterpretedWalRecord {
) -> anyhow::Result<InterpretedWalRecord> {
let mut decoded = DecodedWALRecord::default();
decode_wal_record(buf, &mut decoded, pg_version)?;
let xid = decoded.xl_xid;

let flush_uncommitted = if decoded.is_dbase_create_copy(pg_version) {
FlushUncommittedRecords::Yes
@@ -34,95 +33,14 @@ };
};

let metadata_record = MetadataRecord::from_decoded(&decoded, lsn, pg_version)?;

let mut blocks = Vec::default();
for blk in decoded.blocks.iter() {
let rel = RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum,
};

let key = rel_block_to_key(rel, blk.blkno);

if !key.is_valid_key_on_write_path() {
anyhow::bail!("Unsupported key decoded at LSN {}: {}", lsn, key);
}

let key_is_local = shard.is_key_local(&key);

tracing::debug!(
lsn=%lsn,
key=%key,
"ingest: shard decision {}",
if !key_is_local { "drop" } else { "keep" },
);

if !key_is_local {
if shard.is_shard_zero() {
// Shard 0 tracks relation sizes. Although we will not store this block, we will observe
// its blkno in case it implicitly extends a relation.
blocks.push((key.to_compact(), None));
}

continue;
}

// Instead of storing full-page-image WAL record,
// it is better to store extracted image: we can skip wal-redo
// in this case. Also some FPI records may contain multiple (up to 32) pages,
// so them have to be copied multiple times.
//
let value = if blk.apply_image
&& blk.has_image
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
&& (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
// compression of WAL is not yet supported: fall back to storing the original WAL record
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)
// do not materialize null pages because them most likely be soon replaced with real data
&& blk.bimg_len != 0
{
// Extract page image from FPI record
let img_len = blk.bimg_len as usize;
let img_offs = blk.bimg_offset as usize;
let mut image = BytesMut::with_capacity(BLCKSZ as usize);
// TODO(vlad): skip the copy
image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);

if blk.hole_length != 0 {
let tail = image.split_off(blk.hole_offset as usize);
image.resize(image.len() + blk.hole_length as usize, 0u8);
image.unsplit(tail);
}
//
// Match the logic of XLogReadBufferForRedoExtended:
// The page may be uninitialized. If so, we can't set the LSN because
// that would corrupt the page.
//
if !page_is_new(&image) {
page_set_lsn(&mut image, lsn)
}
assert_eq!(image.len(), BLCKSZ as usize);

Value::Image(image.freeze())
} else {
Value::WalRecord(NeonWalRecord::Postgres {
will_init: blk.will_init || blk.apply_image,
rec: decoded.record.clone(),
})
};

blocks.push((key.to_compact(), Some(value)));
}
let batch = SerializedValueBatch::from_decoded_filtered(decoded, shard, lsn, pg_version)?;

Ok(InterpretedWalRecord {
metadata_record,
blocks,
batch,
lsn,
flush_uncommitted,
xid: decoded.xl_xid,
xid,
})
}
}
11 changes: 5 additions & 6 deletions libs/wal_decoder/src/models.rs
@@ -25,16 +25,16 @@
//! |--> write to KV store within the pageserver
use bytes::Bytes;
use pageserver_api::key::CompactKey;
use pageserver_api::reltag::{RelTag, SlruKind};
use pageserver_api::value::Value;
use postgres_ffi::walrecord::{
XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet,
XlSmgrTruncate, XlXactParsedRecord,
};
use postgres_ffi::{Oid, TransactionId};
use utils::lsn::Lsn;

use crate::serialized_batch::SerializedValueBatch;

pub enum FlushUncommittedRecords {
Yes,
No,
@@ -45,10 +45,9 @@ pub struct InterpretedWalRecord {
/// Optional metadata record - may cause writes to metadata keys
/// in the storage engine
pub metadata_record: Option<MetadataRecord>,
/// Images or deltas for blocks modified in the original WAL record.
/// The [`Value`] is optional to avoid sending superfluous data to
/// shard 0 for relation size tracking.
pub blocks: Vec<(CompactKey, Option<Value>)>,
/// A pre-serialized batch along with the required metadata for ingestion
/// by the pageserver
pub batch: SerializedValueBatch,
/// Byte offset within WAL for the end of the original PG WAL record
pub lsn: Lsn,
/// Whether to flush all uncommitted modifications to the storage engine
