Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added version in state sync #293

Merged
merged 2 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions grovedb/src/replication.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{
collections::{BTreeMap, BTreeSet},
fmt,
str::Utf8Error,
};

use grovedb_merk::{
Expand All @@ -19,6 +18,8 @@ use crate::{replication, Error, GroveDb, Transaction, TransactionArg};

pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN];

pub const CURRENT_STATE_SYNC_VERSION: u16 = 1;

// Struct governing state sync
pub struct StateSyncInfo<'db> {
// Current Chunk restorer
Expand All @@ -32,6 +33,8 @@ pub struct StateSyncInfo<'db> {
pub pending_chunks: BTreeSet<Vec<u8>>,
// Number of processed chunks in current prefix (Path digest)
pub num_processed_chunks: usize,
// Version of state sync protocol,
pub version: u16,
}

// Struct containing information about current subtrees found in GroveDB
Expand Down Expand Up @@ -121,6 +124,7 @@ impl GroveDb {
current_prefix: None,
pending_chunks,
num_processed_chunks: 0,
version: CURRENT_STATE_SYNC_VERSION,
}
}

Expand Down Expand Up @@ -210,7 +214,15 @@ impl GroveDb {
&self,
global_chunk_id: &[u8],
tx: TransactionArg,
version: u16,
) -> Result<Vec<Op>, Error> {
// For now, only CURRENT_STATE_SYNC_VERSION is supported
if version != CURRENT_STATE_SYNC_VERSION {
return Err(Error::CorruptedData(
"Unsupported state sync protocol version".to_string(),
));
}

let chunk_prefix_length: usize = 32;
if global_chunk_id.len() < chunk_prefix_length {
return Err(Error::CorruptedData(
Expand Down Expand Up @@ -265,7 +277,7 @@ impl GroveDb {
}
Some(t) => {
let merk = self
.open_transactional_merk_at_path(path.into(), &t, None)
.open_transactional_merk_at_path(path.into(), t, None)
.value?;

if merk.is_empty_tree().unwrap() {
Expand Down Expand Up @@ -311,7 +323,20 @@ impl GroveDb {
mut state_sync_info: StateSyncInfo<'db>,
app_hash: CryptoHash,
tx: &'db Transaction,
version: u16,
) -> Result<(Vec<Vec<u8>>, StateSyncInfo), Error> {
// For now, only CURRENT_STATE_SYNC_VERSION is supported
if version != CURRENT_STATE_SYNC_VERSION {
return Err(Error::CorruptedData(
"Unsupported state sync protocol version".to_string(),
));
}
if version != state_sync_info.version {
return Err(Error::CorruptedData(
"Unsupported state sync protocol version".to_string(),
));
}

let mut res = vec![];

match (
Expand Down Expand Up @@ -359,7 +384,20 @@ impl GroveDb {
mut state_sync_info: StateSyncInfo<'db>,
chunk: (&[u8], Vec<Op>),
tx: &'db Transaction,
version: u16,
) -> Result<(Vec<Vec<u8>>, StateSyncInfo), Error> {
// For now, only CURRENT_STATE_SYNC_VERSION is supported
if version != CURRENT_STATE_SYNC_VERSION {
return Err(Error::CorruptedData(
"Unsupported state sync protocol version".to_string(),
));
}
if version != state_sync_info.version {
return Err(Error::CorruptedData(
"Unsupported state sync protocol version".to_string(),
));
}

let mut res = vec![];

let (global_chunk_id, chunk_data) = chunk;
Expand Down
9 changes: 5 additions & 4 deletions tutorials/src/bin/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use grovedb::{operations::insert::InsertOptions, Element, GroveDb, PathQuery, Qu
use grovedb::reference_path::ReferencePathType;
use rand::{distributions::Alphanumeric, Rng, };
use grovedb::element::SumValue;
use grovedb::replication::CURRENT_STATE_SYNC_VERSION;
use grovedb_path::{SubtreePath};

const MAIN_ΚΕΥ: &[u8] = b"key_main";
Expand Down Expand Up @@ -203,7 +204,7 @@ fn query_db(db: &GroveDb, path: &[&[u8]], key: Vec<u8>) {
let path_query = PathQuery::new_unsized(path_vec, query.clone());

let (elements, _) = db
.query_item_value(&path_query, true, None)
.query_item_value(&path_query, true, false,true, None)
.unwrap()
.expect("expected successful get_path_query");
for e in elements.into_iter() {
Expand All @@ -226,15 +227,15 @@ fn sync_db_demo(
target_tx: &Transaction,
) -> Result<(), grovedb::Error> {
let app_hash = source_db.root_hash(None).value.unwrap();
let (chunk_ids, mut state_sync_info) = target_db.start_snapshot_syncing(state_sync_info, app_hash, target_tx)?;
let (chunk_ids, mut state_sync_info) = target_db.start_snapshot_syncing(state_sync_info, app_hash, target_tx, CURRENT_STATE_SYNC_VERSION)?;

let mut chunk_queue : VecDeque<Vec<u8>> = VecDeque::new();

chunk_queue.extend(chunk_ids);

while let Some(chunk_id) = chunk_queue.pop_front() {
let ops = source_db.fetch_chunk(chunk_id.as_slice(), None)?;
let (more_chunks, new_state_sync_info) = target_db.apply_chunk(state_sync_info, (chunk_id.as_slice(), ops), target_tx)?;
let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?;
let (more_chunks, new_state_sync_info) = target_db.apply_chunk(state_sync_info, (chunk_id.as_slice(), ops), target_tx, CURRENT_STATE_SYNC_VERSION)?;
state_sync_info = new_state_sync_info;
chunk_queue.extend(more_chunks);
}
Expand Down
Loading