Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pageserver: ingest pre-serialized batches of values #9579

Merged
merged 14 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libs/wal_decoder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition.workspace = true
license.workspace = true

[features]
testing = []
testing = ["pageserver_api/testing"]

[dependencies]
anyhow.workspace = true
Expand Down
113 changes: 18 additions & 95 deletions libs/wal_decoder/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@
//! raw bytes which represent a raw Postgres WAL record.

use crate::models::*;
use bytes::{Buf, Bytes, BytesMut};
use pageserver_api::key::rel_block_to_key;
use pageserver_api::record::NeonWalRecord;
use crate::serialized_batch::SerializedValueBatch;
use bytes::{Buf, Bytes};
use pageserver_api::reltag::{RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use pageserver_api::value::Value;
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
use postgres_ffi::walrecord::*;
use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ};
use utils::lsn::Lsn;

impl InterpretedWalRecord {
Expand All @@ -21,116 +19,41 @@ impl InterpretedWalRecord {
pub fn from_bytes_filtered(
buf: Bytes,
shard: &ShardIdentity,
lsn: Lsn,
record_end_lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<InterpretedWalRecord> {
let mut decoded = DecodedWALRecord::default();
decode_wal_record(buf, &mut decoded, pg_version)?;
let xid = decoded.xl_xid;

let flush_uncommitted = if decoded.is_dbase_create_copy(pg_version) {
FlushUncommittedRecords::Yes
} else {
FlushUncommittedRecords::No
};

let metadata_record = MetadataRecord::from_decoded(&decoded, lsn, pg_version)?;

let mut blocks = Vec::default();
for blk in decoded.blocks.iter() {
let rel = RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum,
};

let key = rel_block_to_key(rel, blk.blkno);

if !key.is_valid_key_on_write_path() {
anyhow::bail!("Unsupported key decoded at LSN {}: {}", lsn, key);
}

let key_is_local = shard.is_key_local(&key);

tracing::debug!(
lsn=%lsn,
key=%key,
"ingest: shard decision {}",
if !key_is_local { "drop" } else { "keep" },
);

if !key_is_local {
if shard.is_shard_zero() {
// Shard 0 tracks relation sizes. Although we will not store this block, we will observe
// its blkno in case it implicitly extends a relation.
blocks.push((key.to_compact(), None));
}

continue;
}

// Instead of storing full-page-image WAL record,
// it is better to store extracted image: we can skip wal-redo
// in this case. Also some FPI records may contain multiple (up to 32) pages,
// so them have to be copied multiple times.
//
let value = if blk.apply_image
&& blk.has_image
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
&& (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
// compression of WAL is not yet supported: fall back to storing the original WAL record
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)
// do not materialize null pages because them most likely be soon replaced with real data
&& blk.bimg_len != 0
{
// Extract page image from FPI record
let img_len = blk.bimg_len as usize;
let img_offs = blk.bimg_offset as usize;
let mut image = BytesMut::with_capacity(BLCKSZ as usize);
// TODO(vlad): skip the copy
image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);

if blk.hole_length != 0 {
let tail = image.split_off(blk.hole_offset as usize);
image.resize(image.len() + blk.hole_length as usize, 0u8);
image.unsplit(tail);
}
//
// Match the logic of XLogReadBufferForRedoExtended:
// The page may be uninitialized. If so, we can't set the LSN because
// that would corrupt the page.
//
if !page_is_new(&image) {
page_set_lsn(&mut image, lsn)
}
assert_eq!(image.len(), BLCKSZ as usize);

Value::Image(image.freeze())
} else {
Value::WalRecord(NeonWalRecord::Postgres {
will_init: blk.will_init || blk.apply_image,
rec: decoded.record.clone(),
})
};

blocks.push((key.to_compact(), Some(value)));
}
let metadata_record = MetadataRecord::from_decoded(&decoded, record_end_lsn, pg_version)?;
let batch = SerializedValueBatch::from_decoded_filtered(
decoded,
shard,
record_end_lsn,
pg_version,
)?;

Ok(InterpretedWalRecord {
metadata_record,
blocks,
lsn,
batch,
end_lsn: record_end_lsn,
flush_uncommitted,
xid: decoded.xl_xid,
xid,
})
}
}

impl MetadataRecord {
fn from_decoded(
decoded: &DecodedWALRecord,
lsn: Lsn,
record_end_lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<Option<MetadataRecord>> {
// Note: this doesn't actually copy the bytes since
Expand All @@ -151,7 +74,7 @@ impl MetadataRecord {
Ok(None)
}
pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version),
pg_constants::RM_XACT_ID => Self::decode_xact_record(&mut buf, decoded, lsn),
pg_constants::RM_XACT_ID => Self::decode_xact_record(&mut buf, decoded, record_end_lsn),
pg_constants::RM_MULTIXACT_ID => {
Self::decode_multixact_record(&mut buf, decoded, pg_version)
}
Expand All @@ -163,7 +86,7 @@ impl MetadataRecord {
//
// Alternatively, one can make the checkpoint part of the subscription protocol
// to the pageserver. This should work fine, but can be done at a later point.
pg_constants::RM_XLOG_ID => Self::decode_xlog_record(&mut buf, decoded, lsn),
pg_constants::RM_XLOG_ID => Self::decode_xlog_record(&mut buf, decoded, record_end_lsn),
pg_constants::RM_LOGICALMSG_ID => {
Self::decode_logical_message_record(&mut buf, decoded)
}
Expand Down
1 change: 1 addition & 0 deletions libs/wal_decoder/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod decoder;
pub mod models;
pub mod serialized_batch;
16 changes: 8 additions & 8 deletions libs/wal_decoder/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
//! ready for the pageserver to interpret. They are derived from the original
//! WAL records, so that each struct corresponds closely to one WAL record of
//! a specific kind. They contain the same information as the original WAL records,
//! just decoded into structs and fields for easier access.
//! but the values are already serialized in a [`SerializedValueBatch`], which
//! is the format that the pageserver is expecting them in.
//!
//! The ingestion code uses these structs to help with parsing the WAL records,
//! and it splits them into a stream of modifications to the key-value pairs that
Expand All @@ -25,16 +26,16 @@
//! |--> write to KV store within the pageserver

use bytes::Bytes;
use pageserver_api::key::CompactKey;
use pageserver_api::reltag::{RelTag, SlruKind};
use pageserver_api::value::Value;
use postgres_ffi::walrecord::{
XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet,
XlSmgrTruncate, XlXactParsedRecord,
};
use postgres_ffi::{Oid, TransactionId};
use utils::lsn::Lsn;

use crate::serialized_batch::SerializedValueBatch;

pub enum FlushUncommittedRecords {
Yes,
No,
Expand All @@ -45,12 +46,11 @@ pub struct InterpretedWalRecord {
/// Optional metadata record - may cause writes to metadata keys
/// in the storage engine
pub metadata_record: Option<MetadataRecord>,
/// Images or deltas for blocks modified in the original WAL record.
/// The [`Value`] is optional to avoid sending superfluous data to
/// shard 0 for relation size tracking.
pub blocks: Vec<(CompactKey, Option<Value>)>,
/// A pre-serialized batch along with the required metadata for ingestion
/// by the pageserver
pub batch: SerializedValueBatch,
VladLazar marked this conversation as resolved.
Show resolved Hide resolved
/// Byte offset within WAL for the end of the original PG WAL record
pub lsn: Lsn,
pub end_lsn: Lsn,
/// Whether to flush all uncommitted modifications to the storage engine
/// before ingesting this record. This is currently only used for legacy PG
/// database creations which read pages from a template database. Such WAL
Expand Down
Loading
Loading