Skip to content

Commit

Permalink
[store] add update_cold_head (#8059)
Browse files Browse the repository at this point in the history
Adding update_cold_head function and integration test for it
Changing cold_db argument from dyn Database to ColdDB in cold storage functions
Making NodeStorage's ColdDB generic for testing purposes. Thus adding Phantom field for the case when cold_store feature is not provided
Adding NodeStorage::new_with_cold for testing purposes
Adding NodeStorage::cold_db to access cold_storage as ColdDB
  • Loading branch information
posvyatokum authored Nov 23, 2022
1 parent 21cd263 commit 37232db
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 49 deletions.
66 changes: 45 additions & 21 deletions core/store/src/cold_storage.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::columns::DBKeyType;
use crate::refcount::add_positive_refcount;
use crate::db::ColdDB;
use crate::trie::TrieRefcountChange;
use crate::{DBCol, DBTransaction, Database, Store, TrieChanges};
use crate::{DBCol, DBTransaction, Database, Store, TrieChanges, HEAD_KEY};

use borsh::BorshDeserialize;
use near_primitives::block::Block;
use borsh::{BorshDeserialize, BorshSerialize};
use near_primitives::block::{Block, BlockHeader, Tip};
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardLayout;
use near_primitives::sharding::ShardChunk;
Expand All @@ -23,6 +23,7 @@ struct StoreWithCache<'a> {
}

/// Updates provided cold database from provided hot store with information about block at `height`.
/// Block at `height` has to be final.
/// Wraps hot store in `StoreWithCache` for optimizing reads.
///
/// First, we read from hot store information necessary
Expand All @@ -40,8 +41,8 @@ struct StoreWithCache<'a> {
/// 1. add it to `DBCol::is_cold` list
/// 2. define `DBCol::key_type` for it (if it isn't already defined)
/// 3. add new clause in `get_keys_from_store` for new key types used for this column (if there are any)
pub fn update_cold_db(
cold_db: &dyn Database,
pub fn update_cold_db<D: Database>(
cold_db: &ColdDB<D>,
hot_store: &Store,
shard_layout: &ShardLayout,
height: &BlockHeight,
Expand All @@ -68,8 +69,8 @@ pub fn update_cold_db(
/// Gets values for given keys in a column from provided hot_store.
/// Creates a transaction based on that values with set DBOp s.
/// Writes that transaction to cold_db.
fn copy_from_store(
cold_db: &dyn Database,
fn copy_from_store<D: Database>(
cold_db: &ColdDB<D>,
hot_store: &mut StoreWithCache,
col: DBCol,
keys: Vec<StoreKey>,
Expand All @@ -84,30 +85,53 @@ fn copy_from_store(
// added.
let data = hot_store.get(col, &key)?;
if let Some(value) = data {
// Database checks col.is_rc() on read and write
// And in every way expects rc columns to be written with rc
//
// TODO: As an optimisation, we might consider breaking the
// abstraction layer. Since we’re always writing to cold database,
// rather than using `cold_db: &dyn Database` argument we could have
// `cold_db: &ColdDB` and then some custom function which lets us
// write raw bytes.
if col.is_rc() {
transaction.update_refcount(
col,
key,
add_positive_refcount(&value, std::num::NonZeroU32::new(1).unwrap()),
);
} else {
transaction.set(col, key, value);
}
transaction.set(col, key, value);
}
}
cold_db.write(transaction)?;
return Ok(());
}

pub fn test_cold_genesis_update(cold_db: &dyn Database, hot_store: &Store) -> io::Result<()> {
/// Sets the HEAD key in the BlockMisc column of the cold database to a Tip
/// that reflects the provided `height`.
///
/// This function should be used after all of the blocks from genesis up to
/// `height` inclusive have been copied to the cold database.
///
/// This method relies on the fact that BlockHeight and BlockHeader are not garbage collectable:
/// to construct the Tip we query `hot_store` for the block hash and block header.
/// If this is to change, the caller should be careful that `height` has not yet been
/// garbage collected in hot storage.
pub fn update_cold_head<D: Database>(
    cold_db: &ColdDB<D>,
    hot_store: &Store,
    height: &BlockHeight,
) -> io::Result<()> {
    tracing::debug!(target: "store", "update HEAD of cold db to {}", height);

    let mut store = StoreWithCache { store: hot_store, cache: StoreCache::new() };

    // BlockHeight column maps little-endian-encoded heights to block hashes.
    let height_key = height.to_le_bytes();
    let block_hash_key = store.get_or_err(DBCol::BlockHeight, &height_key)?.as_slice().to_vec();
    let header = store.get_ser_or_err::<BlockHeader>(DBCol::BlockHeader, &block_hash_key)?;

    let mut transaction = DBTransaction::new();
    transaction.set(DBCol::BlockMisc, HEAD_KEY.to_vec(), Tip::from_header(&header).try_to_vec()?);
    cold_db.write(transaction)?;
    Ok(())
}

pub fn test_cold_genesis_update<D: Database>(
cold_db: &ColdDB<D>,
hot_store: &Store,
) -> io::Result<()> {
let mut store_with_cache = StoreWithCache { store: hot_store, cache: StoreCache::new() };
for col in DBCol::iter() {
if col.is_cold() {
Expand Down
58 changes: 40 additions & 18 deletions core/store/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fs::File;
use std::io::{BufReader, BufWriter, Read, Write};
use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use std::{fmt, io};
Expand Down Expand Up @@ -73,12 +74,13 @@ pub enum Temperature {
/// will provide interface to access hot and cold storage. This is in contrast
/// to [`Store`] which will abstract access to only one of the temperatures of
/// the storage.
pub struct NodeStorage {
pub struct NodeStorage<D = crate::db::RocksDB> {
hot_storage: Arc<dyn Database>,
#[cfg(feature = "cold_store")]
cold_storage: Option<Arc<crate::db::ColdDB>>,
cold_storage: Option<Arc<crate::db::ColdDB<D>>>,
#[cfg(not(feature = "cold_store"))]
cold_storage: Option<std::convert::Infallible>,
_phantom: PhantomData<D>,
}

/// Node’s single storage source.
Expand Down Expand Up @@ -117,6 +119,21 @@ impl NodeStorage {
)
}

/// Constructs a new object backed by the given RocksDB instances.
///
/// The hot database is always present.  When the `cold_store` feature is
/// enabled, an optional cold database is wrapped in [`crate::db::ColdDB`],
/// which also needs a handle to the hot storage.
fn from_rocksdb(
    hot_storage: crate::db::RocksDB,
    #[cfg(feature = "cold_store")] cold_storage: Option<crate::db::RocksDB>,
    #[cfg(not(feature = "cold_store"))] cold_storage: Option<std::convert::Infallible>,
) -> Self {
    let hot_storage = Arc::new(hot_storage);
    #[cfg(feature = "cold_store")]
    let cold_storage = cold_storage
        .map(|cold_db| Arc::new(crate::db::ColdDB::new(hot_storage.clone(), cold_db)));
    // Without the cold_store feature the option holds Infallible, so this
    // closure can never actually run.
    #[cfg(not(feature = "cold_store"))]
    let cold_storage = cold_storage.map(|_| unreachable!());
    Self { hot_storage, cold_storage, _phantom: PhantomData }
}

/// Initialises an opener for a new temporary test store.
///
/// As per the name, this is meant for tests only. The created store will
Expand Down Expand Up @@ -147,24 +164,11 @@ impl NodeStorage {
/// possibly [`crate::test_utils::create_test_store`] (depending whether you
/// need [`NodeStorage`] or [`Store`] object.
pub fn new(storage: Arc<dyn Database>) -> Self {
Self { hot_storage: storage, cold_storage: None }
}

/// Constructs new object backed by given database.
fn from_rocksdb(
hot_storage: crate::db::RocksDB,
#[cfg(feature = "cold_store")] cold_storage: Option<crate::db::RocksDB>,
#[cfg(not(feature = "cold_store"))] cold_storage: Option<std::convert::Infallible>,
) -> Self {
let hot_storage = Arc::new(hot_storage);
#[cfg(feature = "cold_store")]
let cold_storage = cold_storage
.map(|cold_db| Arc::new(crate::db::ColdDB::new(hot_storage.clone(), cold_db)));
#[cfg(not(feature = "cold_store"))]
let cold_storage = cold_storage.map(|_| unreachable!());
Self { hot_storage, cold_storage }
Self { hot_storage: storage, cold_storage: None, _phantom: PhantomData {} }
}
}

impl<D: Database + 'static> NodeStorage<D> {
/// Returns storage for given temperature.
///
/// Some data live only in hot and some only in cold storage (which is at
Expand Down Expand Up @@ -219,7 +223,9 @@ impl NodeStorage {
Temperature::Cold => self.cold_storage.unwrap(),
}
}
}

impl<D> NodeStorage<D> {
/// Returns whether the storage has a cold database.
pub fn has_cold(&self) -> bool {
self.cold_storage.is_some()
Expand All @@ -237,6 +243,22 @@ impl NodeStorage {
metadata::DbKind::Hot | metadata::DbKind::Cold => unreachable!(),
})
}

/// Constructs storage from the given hot database and a cold database built
/// on top of it.  Intended for tests, where `D` is typically an in-memory
/// database implementation.
#[cfg(feature = "cold_store")]
pub fn new_with_cold(hot: Arc<dyn Database>, cold: D) -> Self {
    Self {
        hot_storage: hot.clone(),
        // ColdDB keeps a handle to the hot storage as well as owning `cold`.
        cold_storage: Some(Arc::new(crate::db::ColdDB::new(hot, cold))),
        _phantom: PhantomData,
    }
}

/// Returns the cold database wrapper, or a `NotFound` error when this
/// storage was constructed without cold storage.
#[cfg(feature = "cold_store")]
pub fn cold_db(&self) -> io::Result<&Arc<crate::db::ColdDB<D>>> {
    // ok_or_else avoids constructing the error value on the happy path.
    self.cold_storage
        .as_ref()
        .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "ColdDB Not Found"))
}
}

impl Store {
Expand Down
6 changes: 6 additions & 0 deletions core/store/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ pub fn create_test_node_storage() -> NodeStorage {
NodeStorage::new(TestDB::new())
}

/// Creates an in-memory node storage with ColdDB<TestDB>.
///
/// Hot storage is a fresh TestDB and cold storage wraps a second, independent
/// TestDB, so tests can exercise cold-storage code paths without RocksDB.
#[cfg(feature = "cold_store")]
pub fn create_test_node_storage_with_cold() -> NodeStorage<TestDB> {
NodeStorage::new_with_cold(TestDB::new(), TestDB::default())
}

/// Creates an in-memory database.
pub fn create_test_store() -> Store {
create_test_node_storage().get_store(crate::Temperature::Hot)
Expand Down
59 changes: 49 additions & 10 deletions integration-tests/src/tests/client/cold_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ use near_chain_configs::Genesis;
use near_client::test_utils::TestEnv;
use near_crypto::{InMemorySigner, KeyType};
use near_o11y::testonly::init_test_logger;
use near_primitives::block::Tip;
use near_primitives::sharding::ShardChunk;
use near_primitives::transaction::{
Action, DeployContractAction, FunctionCallAction, SignedTransaction,
};
use near_store::cold_storage::{test_cold_genesis_update, test_get_store_reads, update_cold_db};
use near_store::db::TestDB;
use near_store::{DBCol, NodeStorage, Store, Temperature};
use near_store::cold_storage::{
test_cold_genesis_update, test_get_store_reads, update_cold_db, update_cold_head,
};
use near_store::test_utils::create_test_node_storage_with_cold;
use near_store::{DBCol, Store, Temperature, HEAD_KEY};
use nearcore::config::GenesisExt;
use strum::IntoEnumIterator;

Expand Down Expand Up @@ -56,8 +59,6 @@ fn check_iter(
fn test_storage_after_commit_of_cold_update() {
init_test_logger();

let cold_db = TestDB::new();

let epoch_length = 5;
let max_height = epoch_length * 4;

Expand All @@ -70,9 +71,13 @@ fn test_storage_after_commit_of_cold_update() {
.runtime_adapters(create_nightshade_runtimes(&genesis, 1))
.build();

// TODO construct cold_db with appropriate hot storage
let store = create_test_node_storage_with_cold();

let mut last_hash = *env.clients[0].chain.genesis().hash();

test_cold_genesis_update(&*cold_db, &env.clients[0].runtime_adapter.store()).unwrap();
test_cold_genesis_update(&*store.cold_db().unwrap(), &env.clients[0].runtime_adapter.store())
.unwrap();

let state_reads = test_get_store_reads(DBCol::State);
let state_changes_reads = test_get_store_reads(DBCol::StateChanges);
Expand Down Expand Up @@ -129,7 +134,7 @@ fn test_storage_after_commit_of_cold_update() {
env.process_block(0, block.clone(), Provenance::PRODUCED);

update_cold_db(
&*cold_db,
&*store.cold_db().unwrap(),
&env.clients[0].runtime_adapter.store(),
&env.clients[0]
.runtime_adapter
Expand All @@ -152,8 +157,6 @@ fn test_storage_after_commit_of_cold_update() {
// assert that we don't read StateChanges from db again after iter_prefix
assert_eq!(state_changes_reads, test_get_store_reads(DBCol::StateChanges));

let cold_store = NodeStorage::new(cold_db).get_store(Temperature::Hot);

// We still need to filter out one chunk
let mut no_check_rules: Vec<Box<dyn Fn(DBCol, &Box<[u8]>) -> bool>> = vec![];
no_check_rules.push(Box::new(move |col, value| -> bool {
Expand All @@ -170,7 +173,7 @@ fn test_storage_after_commit_of_cold_update() {
if col.is_cold() {
let num_checks = check_iter(
&env.clients[0].runtime_adapter.store(),
&cold_store,
&store.get_store(Temperature::Cold),
col,
&no_check_rules,
);
Expand All @@ -183,3 +186,39 @@ fn test_storage_after_commit_of_cold_update() {
}
}
}

/// Produces 10 * 5 blocks, advancing the cold storage HEAD after each one.
/// After every update, verifies that the Tip stored under HEAD in the cold db
/// equals the Tip stored under HEAD in the hot store.
#[test]
fn test_cold_db_head_update() {
    init_test_logger();

    let epoch_length = 5;
    let max_height = epoch_length * 10;

    let mut genesis = Genesis::test(vec!["test0".parse().unwrap(), "test1".parse().unwrap()], 1);
    genesis.config.epoch_length = epoch_length;

    let mut chain_genesis = ChainGenesis::test();
    chain_genesis.epoch_length = epoch_length;

    let mut env = TestEnv::builder(chain_genesis)
        .runtime_adapters(create_nightshade_runtimes(&genesis, 1))
        .build();

    let storage = create_test_node_storage_with_cold();

    for height in 1..max_height {
        env.produce_block(0, height);

        let hot_store = env.clients[0].runtime_adapter.store();
        update_cold_head(&*storage.cold_db().unwrap(), &hot_store, &height).unwrap();

        let cold_head = storage
            .get_store(Temperature::Cold)
            .get_ser::<Tip>(DBCol::BlockMisc, HEAD_KEY)
            .unwrap();
        let hot_head = hot_store.get_ser::<Tip>(DBCol::BlockMisc, HEAD_KEY).unwrap();
        assert_eq!(&cold_head, &hot_head);
    }
}

0 comments on commit 37232db

Please sign in to comment.