Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: manifest improvements #303

Merged
merged 10 commits into from
Oct 13, 2022
1 change: 1 addition & 0 deletions src/storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Storage engine implementation.
#![feature(map_first_last)]
mod arrow_stream;
mod background;
mod chunk;
Expand Down
6 changes: 6 additions & 0 deletions src/storage/src/manifest/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct RawColumnFamiliesMetadata {

/// Manifest action payload describing a change to a region's metadata.
///
/// It is carried by `RegionMetaAction::Change` and derives `Serialize`/`Deserialize`.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct RegionChange {
/// Sequence number recorded together with this metadata change.
// NOTE(review): presumably the region's committed sequence at the moment the
// metadata was changed — confirm against the code that builds this action.
pub committed_sequence: SequenceNumber,
/// The region metadata in its raw form.
pub metadata: RawRegionMetadata,
}

Expand Down Expand Up @@ -94,6 +95,11 @@ impl RegionMetaActionList {
impl MetaAction for RegionMetaActionList {
type Error = error::Error;

/// Places the given protocol action at the head of this action list.
///
/// Every action already in the list is kept and shifted one position back.
fn set_protocol(&mut self, action: ProtocolAction) {
    // By convention the protocol action leads the action list, so it is
    // inserted at index 0 rather than appended.
    let protocol_action = RegionMetaAction::Protocol(action);
    self.actions.insert(0, protocol_action);
}

/// Records `version` as the previous manifest version of this action list,
/// overwriting whatever value was stored before.
fn set_prev_version(&mut self, version: ManifestVersion) {
self.prev_version = version;
}
Expand Down
12 changes: 11 additions & 1 deletion src/storage/src/manifest/impl_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl<M: MetaAction<Error = Error>> ManifestImplInner<M> {
self.version.load(Ordering::Relaxed)
}

async fn save(&self, action_list: M) -> Result<ManifestVersion> {
async fn save(&self, mut action_list: M) -> Result<ManifestVersion> {
let protocol = self.protocol.load();

ensure!(
Expand All @@ -151,6 +151,16 @@ impl<M: MetaAction<Error = Error>> ManifestImplInner<M> {

let version = self.inc_version();

if version == 0 || protocol.min_writer_version < self.supported_writer_version {
let new_protocol = ProtocolAction {
min_reader_version: self.supported_reader_version,
min_writer_version: self.supported_writer_version,
};
action_list.set_protocol(new_protocol.clone());

self.protocol.store(Arc::new(new_protocol));
}

logging::debug!(
"Save region metadata action: {:?}, version: {}",
action_list,
Expand Down
24 changes: 20 additions & 4 deletions src/storage/src/manifest/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod tests {
use std::sync::Arc;

use object_store::{backend::fs, ObjectStore};
use store_api::manifest::action::ProtocolAction;
use store_api::manifest::{Manifest, MetaActionIterator, MAX_VERSION};
use tempdir::TempDir;

Expand Down Expand Up @@ -45,6 +46,7 @@ mod tests {
.update(RegionMetaActionList::with_action(RegionMetaAction::Change(
RegionChange {
metadata: region_meta.as_ref().into(),
committed_sequence: 99,
},
)))
.await
Expand All @@ -54,15 +56,22 @@ mod tests {

let (v, action_list) = iter.next_action().await.unwrap().unwrap();
assert_eq!(0, v);
assert_eq!(1, action_list.actions.len());
let action = &action_list.actions[0];
assert_eq!(2, action_list.actions.len());
let protocol = &action_list.actions[0];
assert!(matches!(
protocol,
RegionMetaAction::Protocol(ProtocolAction { .. })
));

let action = &action_list.actions[1];

match action {
RegionMetaAction::Change(c) => {
assert_eq!(
RegionMetadata::try_from(c.metadata.clone()).unwrap(),
*region_meta
);
assert_eq!(c.committed_sequence, 99);
}
_ => unreachable!(),
}
Expand All @@ -79,14 +88,21 @@ mod tests {
let mut iter = manifest.scan(0, MAX_VERSION).await.unwrap();
let (v, action_list) = iter.next_action().await.unwrap().unwrap();
assert_eq!(0, v);
assert_eq!(1, action_list.actions.len());
let action = &action_list.actions[0];
assert_eq!(2, action_list.actions.len());
let protocol = &action_list.actions[0];
assert!(matches!(
protocol,
RegionMetaAction::Protocol(ProtocolAction { .. })
));

let action = &action_list.actions[1];
match action {
RegionMetaAction::Change(c) => {
assert_eq!(
RegionMetadata::try_from(c.metadata.clone()).unwrap(),
*region_meta
);
assert_eq!(c.committed_sequence, 99);
}
_ => unreachable!(),
}
Expand Down
75 changes: 53 additions & 22 deletions src/storage/src/region.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
#[cfg(test)]
mod tests;
mod writer;

use std::collections::BTreeMap;
use std::sync::Arc;

use async_trait::async_trait;
use common_telemetry::logging;
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::manifest::{
self, action::ProtocolAction, Manifest, ManifestVersion, MetaActionIterator,
};
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
use store_api::storage::{
AlterRequest, OpenOptions, ReadContext, Region, RegionId, RegionMeta, WriteContext,
WriteResponse,
AlterRequest, OpenOptions, ReadContext, Region, RegionId, RegionMeta, SequenceNumber,
WriteContext, WriteResponse,
};

use crate::error::{self, Error, Result};
use crate::flush::{FlushSchedulerRef, FlushStrategyRef};
use crate::manifest::{
action::{RegionChange, RegionMetaAction, RegionMetaActionList},
action::{RawRegionMetadata, RegionChange, RegionMetaAction, RegionMetaActionList},
region::RegionManifest,
};
use crate::memtable::MemtableBuilderRef;
Expand All @@ -28,7 +26,7 @@ pub use crate::region::writer::{AlterContext, RegionWriter, RegionWriterRef, Wri
use crate::snapshot::SnapshotImpl;
use crate::sst::AccessLayerRef;
use crate::version::VersionEdit;
use crate::version::{Version, VersionControl, VersionControlRef};
use crate::version::{Version, VersionControl, VersionControlRef, INIT_COMMITTED_SEQUENCE};
use crate::wal::Wal;
use crate::write_batch::WriteBatch;

Expand Down Expand Up @@ -95,6 +93,8 @@ pub struct StoreConfig<S> {
pub flush_strategy: FlushStrategyRef,
}

/// Region metadata changes collected while recovering from the manifest.
///
/// Each entry is keyed by the `committed_sequence` of a `RegionChange`; the value
/// pairs the manifest version the change was read from with the raw metadata it
/// carried. Being a `BTreeMap`, entries are ordered by sequence number.
pub type RecoveredMetadataMap = BTreeMap<SequenceNumber, (ManifestVersion, RawRegionMetadata)>;

impl<S: LogStore> RegionImpl<S> {
/// Create a new region and also persist the region metadata to manifest.
///
Expand All @@ -108,12 +108,12 @@ impl<S: LogStore> RegionImpl<S> {
// the manifest.
let manifest_version = store_config
.manifest
.update(RegionMetaActionList::new(vec![
RegionMetaAction::Protocol(ProtocolAction::new()),
RegionMetaAction::Change(RegionChange {
.update(RegionMetaActionList::with_action(RegionMetaAction::Change(
RegionChange {
metadata: metadata.as_ref().into(),
}),
]))
committed_sequence: INIT_COMMITTED_SEQUENCE,
},
)))
.await?;

let version = Version::with_manifest_version(metadata, manifest_version);
Expand Down Expand Up @@ -156,18 +156,40 @@ impl<S: LogStore> RegionImpl<S> {
_opts: &OpenOptions,
) -> Result<Option<RegionImpl<S>>> {
// Load the version metadata from the manifest.
let version = match Self::recover_from_manifest(&store_config.manifest).await? {
None => return Ok(None),
Some(version) => version,
};
let (version, mut recovered_metadata) =
match Self::recover_from_manifest(&store_config.manifest).await? {
(None, _) => return Ok(None),
(Some(v), m) => (v, m),
};

logging::debug!(
"Region recovered version from manifest, version: {:?}",
version
);

let metadata = version.metadata().clone();
let flushed_sequence = version.flushed_sequence();
let version_control = Arc::new(VersionControl::with_version(version));

let recovered_metadata_after_flushed =
recovered_metadata.split_off(&(flushed_sequence + 1));
// apply the last flushed metadata
if let Some((sequence, (manifest_version, metadata))) = recovered_metadata.pop_last() {
let metadata = Arc::new(
metadata
.try_into()
.context(error::InvalidRawRegionSnafu { region: &name })?,
);
version_control.freeze_mutable_and_apply_metadata(metadata, manifest_version);

logging::debug!(
"Applied the last flushed metadata to region: {}, sequence: {}, manifest: {}",
name,
sequence,
manifest_version,
);
}

let wal = Wal::new(metadata.id(), store_config.log_store);
let shared = Arc::new(SharedData {
id: metadata.id(),
Expand All @@ -186,7 +208,9 @@ impl<S: LogStore> RegionImpl<S> {
manifest: &store_config.manifest,
};
// Replay all unflushed data.
writer.replay(writer_ctx).await?;
writer
.replay(recovered_metadata_after_flushed, writer_ctx)
.await?;

let inner = Arc::new(RegionInner {
shared,
Expand All @@ -201,13 +225,17 @@ impl<S: LogStore> RegionImpl<S> {
Ok(Some(RegionImpl { inner }))
}

async fn recover_from_manifest(manifest: &RegionManifest) -> Result<Option<Version>> {
async fn recover_from_manifest(
manifest: &RegionManifest,
) -> Result<(Option<Version>, RecoveredMetadataMap)> {
let (start, end) = Self::manifest_scan_range();
let mut iter = manifest.scan(start, end).await?;

let mut version = None;
let mut actions = Vec::new();
let mut last_manifest_version = manifest::MIN_VERSION;
let mut recovered_metadata = BTreeMap::new();

while let Some((manifest_version, action_list)) = iter.next_action().await? {
last_manifest_version = manifest_version;

Expand All @@ -227,8 +255,10 @@ impl<S: LogStore> RegionImpl<S> {
version = Self::replay_edit(manifest_version, action, version);
}
}
(RegionMetaAction::Change(_), Some(_)) => {
unimplemented!("alter schema is not implemented")
(RegionMetaAction::Change(c), Some(v)) => {
recovered_metadata
.insert(c.committed_sequence, (manifest_version, c.metadata));
version = Some(v);
}
(action, None) => {
actions.push((manifest_version, action));
Expand All @@ -249,7 +279,7 @@ impl<S: LogStore> RegionImpl<S> {
manifest.update_state(last_manifest_version + 1, protocol.clone());
}

Ok(version)
Ok((version, recovered_metadata))
}

fn manifest_scan_range() -> (ManifestVersion, ManifestVersion) {
Expand Down Expand Up @@ -351,6 +381,7 @@ impl<S: LogStore> RegionInner<S> {
// FIXME(yingwen): [alter] The schema may be outdated.
let metadata = self.in_memory_metadata();
let schema = metadata.schema();

// Only compare column schemas.
ensure!(
schema.column_schemas() == request.schema().column_schemas(),
Expand Down
27 changes: 21 additions & 6 deletions src/storage/src/region/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub fn new_metadata(region_name: &str, enable_version_column: bool) -> RegionMet
/// Test region with schema (timestamp, v0).
pub struct TesterBase<S: LogStore> {
pub region: RegionImpl<S>,
write_ctx: WriteContext,
pub write_ctx: WriteContext,
pub read_ctx: ReadContext,
}

Expand Down Expand Up @@ -197,6 +197,7 @@ async fn test_recover_region_manifets() {
assert!(RegionImpl::<NoopLogStore>::recover_from_manifest(&manifest)
.await
.unwrap()
.0
.is_none());

{
Expand All @@ -205,6 +206,7 @@ async fn test_recover_region_manifets() {
.update(RegionMetaActionList::with_action(RegionMetaAction::Change(
RegionChange {
metadata: region_meta.as_ref().into(),
committed_sequence: 40,
},
)))
.await
Expand All @@ -217,13 +219,26 @@ async fn test_recover_region_manifets() {
]))
.await
.unwrap();

manifest
.update(RegionMetaActionList::with_action(RegionMetaAction::Change(
RegionChange {
metadata: region_meta.as_ref().into(),
committed_sequence: 42,
},
)))
.await
.unwrap();
}

// try to recover
let version = RegionImpl::<NoopLogStore>::recover_from_manifest(&manifest)
.await
.unwrap()
.unwrap();
let (version, recovered_metadata) =
RegionImpl::<NoopLogStore>::recover_from_manifest(&manifest)
.await
.unwrap();

assert_eq!(42, *recovered_metadata.first_key_value().unwrap().0);
let version = version.unwrap();
assert_eq!(*version.metadata(), region_meta);
assert_eq!(version.flushed_sequence(), 2);
assert_eq!(version.manifest_version(), 1);
Expand All @@ -236,5 +251,5 @@ async fn test_recover_region_manifets() {
assert!(version.mutable_memtables().is_empty());

// check manifest state
assert_eq!(2, manifest.last_version());
assert_eq!(3, manifest.last_version());
}
Loading