Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pageserver: fix vectored read image layer skip #9015

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8470,4 +8470,127 @@ mod tests {

Ok(())
}

// Regression test for https://github.com/neondatabase/neon/issues/9012
// Create an image arrangement where we have to read at different LSN ranges
// from a delta layer. This is achieved by overlapping an image layer on top of
// a delta layer. Like so:
//
// A B
// +----------------+ -> delta_layer
// | | ^ lsn
// | =========|-> nested_image_layer |
// | C | |
// +----------------+ |
// ======== -> baseline_image_layer +-------> key
//
//
// When querying the key range [A, B) we need to read at different LSN ranges
// for [A, C) and [C, B). This test checks that the described edge case is handled correctly.
#[tokio::test]
async fn test_vectored_read_with_nested_image_layer() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_vectored_read_with_nested_image_layer").await?;
let (tenant, ctx) = harness.load().await;

fn get_key(id: u32) -> Key {
let mut key = Key::from_hex("110000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}

let mut expected_key_values = HashMap::new();

let baseline_image_layer_lsn = Lsn(0x10);
let mut baseline_img_layer = Vec::new();
for i in 0..5 {
let key = get_key(i);
let value = format!("value {i}@{baseline_image_layer_lsn}");

let removed = expected_key_values.insert(key, value.clone());
assert!(removed.is_none());

baseline_img_layer.push((key, Bytes::from(value)));
}

let nested_image_layer_lsn = Lsn(0x50);
let mut nested_img_layer = Vec::new();
for i in 5..10 {
let key = get_key(i);
let value = format!("value {i}@{nested_image_layer_lsn}");

let removed = expected_key_values.insert(key, value.clone());
assert!(removed.is_none());

nested_img_layer.push((key, Bytes::from(value)));
}

let mut delta_layer_spec = Vec::default();
let delta_layer_start_lsn = Lsn(0x20);
let mut delta_layer_end_lsn = delta_layer_start_lsn;

for i in 0..10 {
let key = get_key(i);
let key_in_nested = nested_img_layer
.iter()
.any(|(key_with_img, _)| *key_with_img == key);
let lsn = {
if key_in_nested {
Lsn(nested_image_layer_lsn.0 + 0x10)
} else {
delta_layer_start_lsn
}
};

let delta = format!("@{lsn}");
delta_layer_spec.push((
key,
lsn,
Value::WalRecord(NeonWalRecord::wal_append(&delta)),
));
delta_layer_end_lsn = std::cmp::max(delta_layer_start_lsn, lsn);

expected_key_values
.get_mut(&key)
.expect("An image exists for each key")
.push_str(delta.as_str());
}

delta_layer_end_lsn = Lsn(delta_layer_end_lsn.0 + 1);

assert!(
nested_image_layer_lsn > delta_layer_start_lsn
&& nested_image_layer_lsn < delta_layer_end_lsn
);

let tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
baseline_image_layer_lsn,
DEFAULT_PG_VERSION,
&ctx,
vec![DeltaLayerTestDesc::new_with_inferred_key_range(
delta_layer_start_lsn..delta_layer_end_lsn,
delta_layer_spec,
)], // delta layers
vec![
(baseline_image_layer_lsn, baseline_img_layer),
(nested_image_layer_lsn, nested_img_layer),
], // image layers
delta_layer_end_lsn,
)
.await?;

let keyspace = KeySpace::single(get_key(0)..get_key(10));
let results = tline
.get_vectored(keyspace, delta_layer_end_lsn, &ctx)
.await
.expect("No vectored errors");
for (key, res) in results {
let value = res.expect("No key errors");
let expected_value = expected_key_values.remove(&key).expect("No unknown keys");
assert_eq!(value, Bytes::from(expected_value));
}

Ok(())
}
}
63 changes: 42 additions & 21 deletions pageserver/src/tenant/storage_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ pub(crate) enum LayerId {
/// Layer wrapper for the read path. Note that it is valid
/// to use these layers even after external operations have
/// been performed on them (compaction, freeze, etc.).
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) enum ReadableLayer {
PersistentLayer(Layer),
InMemoryLayer(Arc<InMemoryLayer>),
Expand All @@ -292,6 +292,8 @@ struct ReadDesc {
layer_id: LayerId,
/// Lsn range for the read, used for selecting the next read
lsn_range: Range<Lsn>,
/// This read's index in [`LayerKeyspace::reads`];
read_id: LayerKeyspaceReadId,
}

/// Data structure which maintains a fringe of layers for the
Expand All @@ -310,9 +312,13 @@ pub(crate) struct LayerFringe {
#[derive(Debug)]
struct LayerKeyspace {
layer: ReadableLayer,
target_keyspace: KeySpaceRandomAccum,
next_read_id: LayerKeyspaceReadId,
reads: HashMap<LayerKeyspaceReadId, (Range<Lsn>, KeySpace)>,
}

#[derive(PartialEq, Eq, Hash, Debug, Clone, Copy)]
struct LayerKeyspaceReadId(usize);

impl LayerFringe {
pub(crate) fn new() -> Self {
LayerFringe {
Expand All @@ -327,22 +333,24 @@ impl LayerFringe {
None => return None,
};

let removed = self.layers.remove_entry(&read_desc.layer_id);
let mut entry = match self.layers.entry(read_desc.layer_id) {
Entry::Occupied(o) => o,
Entry::Vacant(_) => unreachable!("fringe internals are always consistent"),
};

match removed {
Some((
_,
LayerKeyspace {
layer,
mut target_keyspace,
},
)) => Some((
layer,
target_keyspace.consume_keyspace(),
read_desc.lsn_range,
)),
None => unreachable!("fringe internals are always consistent"),
let (lsn_range, keyspace) = entry
.get_mut()
.reads
.remove(&read_desc.read_id)
.expect("fringe internals are always consistent");

let layer = entry.get().layer.clone();

if entry.get().reads.is_empty() {
entry.remove();
}

Some((layer, keyspace, lsn_range))
}

pub(crate) fn update(
Expand All @@ -355,18 +363,31 @@ impl LayerFringe {
let entry = self.layers.entry(layer_id.clone());
match entry {
Entry::Occupied(mut entry) => {
entry.get_mut().target_keyspace.add_keyspace(keyspace);
let read_id = {
let r = &mut entry.get_mut().next_read_id;
let read_id = *r;
*r = LayerKeyspaceReadId(r.0 + 1);
read_id
};
self.planned_reads_by_lsn.push(ReadDesc {
lsn_range: lsn_range.clone(),
layer_id: layer_id.clone(),
read_id,
});
let replaced = entry.get_mut().reads.insert(read_id, (lsn_range, keyspace));
assert!(replaced.is_none());
}
Entry::Vacant(entry) => {
let read_id = LayerKeyspaceReadId(0);
self.planned_reads_by_lsn.push(ReadDesc {
lsn_range,
lsn_range: lsn_range.clone(),
layer_id: layer_id.clone(),
read_id,
});
let mut accum = KeySpaceRandomAccum::new();
accum.add_keyspace(keyspace);
entry.insert(LayerKeyspace {
layer,
target_keyspace: accum,
next_read_id: LayerKeyspaceReadId(1),
reads: [(read_id, (lsn_range, keyspace))].into(),
});
}
}
Expand Down
Loading