Schema cache for log filtering (polkadot-evm#425)
* Implement `Copy` for EthereumStorageSchema

* Setup schema cache

* TODO comment

* Map genesis hash

* Sorted cache required

* Ignore caching existing schema

* Setup db migration

* Add functions to load and write to new db column

* Use db instead of AuxStore

* Semver

* Oops Decode

* fmt

* Fix fc-db changelog

* Use MetaDb instead of adding a new column

* Cleanup
tgmichel authored Aug 2, 2021
1 parent a883d3b commit 03a81c1
Showing 19 changed files with 231 additions and 22 deletions.
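At a glance, the cache this commit introduces is a list of (schema version, block hash) checkpoints kept in block order and SCALE-encoded into fc-db's meta column under the `PALLET_ETHEREUM_SCHEMA_CACHE` key. A minimal sketch of the data shape, using the type names from the diffs below (`genesis_hash` stands in for the node's genesis block hash):

    // Each entry records the best block hash at which a schema version first
    // appeared; the genesis block is always checkpointed as V1 by the cache task.
    let cache: Vec<(EthereumStorageSchema, H256)> = vec![
        (EthereumStorageSchema::V1, genesis_hash),
        // Further entries are appended whenever the runtime storage schema changes.
    ];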
8 changes: 5 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion client/consensus/CHANGELOG.md
@@ -1,3 +1,4 @@
# Changelog for `fc-consensus`

## Unreleased
## Unreleased
* Bump `fc-db` to `2.0.0-dev`
4 changes: 2 additions & 2 deletions client/consensus/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "fc-consensus"
version = "2.0.0"
version = "2.0.0-dev"
authors = ["Parity Technologies <admin@parity.io>"]
description = "Frontier consensus for substrate"
edition = "2018"
@@ -18,7 +18,7 @@ sp-block-builder = { version = "3.0.0", git = "https://github.com/paritytech/sub
sp-inherents = { version = "3.0.0", git = "https://github.com/paritytech/substrate.git", branch = "frontier" }
fp-consensus = { version = "1.0.0", path = "../../primitives/consensus" }
fp-rpc = { version = "2.0.0", path = "../../primitives/rpc" }
fc-db = { version = "1.0.0", path = "../db" }
fc-db = { version = "2.0.0-dev", path = "../db" }
sp-consensus = { version = "0.9.0", git = "https://github.com/paritytech/substrate.git", branch = "frontier" }
log = "0.4.8"
futures = { version = "0.3.1", features = ["compat"] }
7 changes: 6 additions & 1 deletion client/db/CHANGELOG.md
@@ -1,3 +1,8 @@
# Changelog for `fc-db`

## Unreleased
## Unreleased
* Introduce versioning and migration functions.
* New version 2.
* New column `ETHEREUM_SCHEMA_CACHE` in version 2.
* New dependency `fp-storage`.
* New dependency `pallet-ethereum`.
4 changes: 3 additions & 1 deletion client/db/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "fc-db"
version = "1.0.0"
version = "2.0.0-dev"
authors = ["Parity Technologies <admin@parity.io>"]
description = "Frontier database backend"
edition = "2018"
@@ -11,6 +11,8 @@ repository = "https://github.com/paritytech/frontier/"
sp-core = { version = "3.0.0", git = "https://github.com/paritytech/substrate.git", branch = "frontier" }
sp-database = { version = "3.0.0", git = "https://github.com/paritytech/substrate.git", branch = "frontier" }
sp-runtime = { version = "3.0.0", git = "https://github.com/paritytech/substrate.git", branch = "frontier" }
pallet-ethereum = { version = "3.0.0-dev", path = "../../frame/ethereum" }
fp-storage = { version = "2.0.0-dev", path = "../../primitives/storage"}
kvdb = "0.9.0"
kvdb-rocksdb = "0.11.0"
codec = { package = "parity-scale-codec", version = "2.0.0", features = ["derive"] }
33 changes: 33 additions & 0 deletions client/db/src/lib.rs
@@ -21,6 +21,8 @@ mod utils;
pub use sp_database::Database;

use codec::{Decode, Encode};
use fp_storage::PALLET_ETHEREUM_SCHEMA_CACHE;
use pallet_ethereum::EthereumStorageSchema;
use parking_lot::Mutex;
use sp_core::H256;
use sp_runtime::traits::Block as BlockT;
@@ -138,6 +140,37 @@ impl<Block: BlockT> MetaDb<Block> {

Ok(())
}

pub fn ethereum_schema(&self) -> Result<Option<Vec<(EthereumStorageSchema, H256)>>, String> {
match self
.db
.get(crate::columns::META, &PALLET_ETHEREUM_SCHEMA_CACHE.encode())
{
Some(raw) => Ok(Some(
Decode::decode(&mut &raw[..]).map_err(|e| format!("{:?}", e))?,
)),
None => Ok(None),
}
}

pub fn write_ethereum_schema(
&self,
new_cache: Vec<(EthereumStorageSchema, H256)>,
) -> Result<(), String> {
let mut transaction = sp_database::Transaction::new();

transaction.set(
crate::columns::META,
&PALLET_ETHEREUM_SCHEMA_CACHE.encode(),
&new_cache.encode(),
);

self.db
.commit(transaction)
.map_err(|e| format!("{:?}", e))?;

Ok(())
}
}

pub struct MappingCommitment<Block: BlockT> {
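The `frontier_backend_client::load_cached_schema` and `frontier_backend_client::write_cached_schema` helpers referenced in the `client/rpc` hunks below are not part of the rendered diff; a minimal sketch of what they plausibly look like, assuming `fc_db::Backend` exposes its `MetaDb` through a `meta()` accessor (an assumption, not shown in this commit):

    use pallet_ethereum::EthereumStorageSchema;
    use sp_core::H256;
    use sp_runtime::traits::Block as BlockT;

    pub fn load_cached_schema<B: BlockT>(
        backend: &fc_db::Backend<B>,
    ) -> Result<Option<Vec<(EthereumStorageSchema, H256)>>, String> {
        // A single read of the META column under the PALLET_ETHEREUM_SCHEMA_CACHE key.
        backend.meta().ethereum_schema()
    }

    pub fn write_cached_schema<B: BlockT>(
        backend: &fc_db::Backend<B>,
        new_cache: Vec<(EthereumStorageSchema, H256)>,
    ) -> Result<(), String> {
        // Overwrites the whole cache vector; callers append to the loaded cache first.
        backend.meta().write_ethereum_schema(new_cache)
    }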
1 change: 1 addition & 0 deletions client/mapping-sync/CHANGELOG.md
@@ -3,4 +3,5 @@
## Unreleased

* Added support for syncing mapping hashes mid-way.
* Bump `fc-db` to `2.0.0-dev`.
* Added support to optionally sync tips up to the best block number.
2 changes: 1 addition & 1 deletion client/mapping-sync/Cargo.toml
@@ -13,7 +13,7 @@ sc-client-api = { version = "3.0.0", git = "https://github.com/paritytech/substr
sp-api = { version = "3.0.0", git = "https://github.com/paritytech/substrate.git", branch = "frontier" }
fp-consensus = { version = "1.0.0", path = "../../primitives/consensus" }
fc-consensus = { version = "2.0.0-dev", path = "../consensus" }
fc-db = { version = "1.0.0", path = "../db" }
fc-db = { version = "2.0.0-dev", path = "../db" }
fp-rpc = { version = "2.0.0-dev", path = "../../primitives/rpc" }
futures = { version = "0.3.1", features = ["compat"] }
futures-timer = "3.0.1"
5 changes: 4 additions & 1 deletion client/rpc/CHANGELOG.md
@@ -5,4 +5,7 @@
* Fix `estimate_gas`: ensure that the provided gas limit is never larger than the current block's gas limit
* `EthPubSubApi::new` takes an additional `overrides` parameter.
* Fix `estimate_gas` inaccurate issue.
* Use pallet-ethereum 3.0.0-dev.
* Use pallet-ethereum 3.0.0-dev.
* `EthFilterApi::new` takes an additional `backend` parameter.
* Bump `fp-storage` to `2.0.0-dev`.
* Bump `fc-db` to `2.0.0-dev`.
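As the entry above notes, `EthFilterApi::new` now takes an additional `backend` parameter. A hedged sketch of how an RPC extension builder might construct it after this change; `io`, `client`, `frontier_backend`, `filter_pool`, `max_stored_filters`, `overrides`, and `max_past_logs` are assumed node-service values, and the argument order follows the constructor shown in the `eth.rs` diff below:

    // Register the filter API, now passing the frontier database backend so that
    // filter_range_logs can consult the schema cache.
    io.extend_with(EthFilterApiServer::to_delegate(EthFilterApi::new(
        client.clone(),
        frontier_backend.clone(),
        filter_pool.clone(),
        max_stored_filters,
        overrides.clone(),
        max_past_logs,
    )));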
4 changes: 2 additions & 2 deletions client/rpc/Cargo.toml
@@ -15,11 +15,11 @@ log = "0.4.8"
ethereum-types = "0.11.0"
evm = "0.27.0"
fc-consensus = { version = "2.0.0-dev", path = "../consensus" }
fc-db = { version = "1.0.0", path = "../db" }
fc-db = { version = "2.0.0-dev", path = "../db" }
fc-rpc-core = { version = "1.1.0-dev", path = "../rpc-core" }
fp-consensus = { version = "1.0.0", path = "../../primitives/consensus" }
fp-rpc = { version = "2.0.0-dev", path = "../../primitives/rpc" }
fp-storage = { version = "1.0.1-dev", path = "../../primitives/storage"}
fp-storage = { version = "2.0.0-dev", path = "../../primitives/storage"}
sp-io = { version = "3.0.0", git = "https://github.com/paritytech/substrate.git", branch = "frontier" }
sp-runtime = { version = "3.0.0", git = "https://github.com/paritytech/substrate.git", branch = "frontier" }
sp-api = { version = "3.0.0", git = "https://github.com/paritytech/substrate.git", branch = "frontier" }
135 changes: 130 additions & 5 deletions client/rpc/src/eth.rs
@@ -56,8 +56,9 @@ use std::{
};

use crate::overrides::OverrideHandle;
use codec::{self, Encode};
use codec::{self, Decode, Encode};
pub use fc_rpc_core::{EthApiServer, EthFilterApiServer, NetApiServer, Web3ApiServer};
use pallet_ethereum::EthereumStorageSchema;

pub struct EthApi<B: BlockT, C, P, CT, BE, H: ExHashT> {
pool: Arc<P>,
@@ -237,6 +238,7 @@ fn transaction_build(

fn filter_range_logs<B: BlockT, C, BE>(
client: &C,
backend: &fc_db::Backend<B>,
overrides: &OverrideHandle<B>,
ret: &mut Vec<Log>,
max_past_logs: u32,
@@ -268,10 +270,48 @@
};
let bloom_filter = FilteredParams::bloom_filter(&filter.address, &topics_input);

// Get schema cache. A single read from the backend's meta DB before the block range iteration.
// This prevents having to do an extra DB read per block in the range to get the actual schema.
let mut local_cache: BTreeMap<NumberFor<B>, EthereumStorageSchema> = BTreeMap::new();
if let Ok(Some(schema_cache)) = frontier_backend_client::load_cached_schema::<B>(backend) {
for (schema, hash) in schema_cache {
if let Ok(Some(header)) = client.header(BlockId::Hash(hash)) {
let number = *header.number();
local_cache.insert(number, schema);
}
}
}
let cache_keys: Vec<NumberFor<B>> = local_cache.keys().cloned().collect();
let mut default_schema: Option<&EthereumStorageSchema> = None;
if cache_keys.len() == 1 {
// There is only one schema and that's the one we use.
default_schema = local_cache.get(&cache_keys[0]);
}

while current_number >= from {
let id = BlockId::Number(current_number);

let schema = frontier_backend_client::onchain_storage_schema::<B, C, BE>(client, id);
let schema = match default_schema {
// If there is a single schema, we just assign.
Some(default_schema) => *default_schema,
_ => {
// If there are multiple schemas, we iterate over the - hopefully short - list
// of keys and assign the one belonging to the current_number.
// Because there is more than one schema, and current_number cannot be < 0,
// (i - 1) will always be >= 0.
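// For example, with a cache of [(V1, block 0), (V2, block 1_000)]: current_number
// 600 resolves to V1 through the loop below, while 1_500 matches no key and
// falls through to the per-block on-chain schema read in the fallback arm.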
let mut default_schema: Option<&EthereumStorageSchema> = None;
for (i, k) in cache_keys.iter().enumerate() {
if &current_number < k {
default_schema = local_cache.get(&cache_keys[i - 1]);
}
}
match default_schema {
Some(schema) => *schema,
// Fallback to DB read. This will happen e.g. when there is no cache
// task configured at service level.
_ => frontier_backend_client::onchain_storage_schema::<B, C, BE>(client, id),
}
}
};
let handler = overrides
.schemas
.get(&schema)
@@ -1315,6 +1355,7 @@

let _ = filter_range_logs(
self.client.as_ref(),
self.backend.as_ref(),
&self.overrides,
&mut ret,
self.max_past_logs,
@@ -1447,6 +1488,7 @@

pub struct EthFilterApi<B: BlockT, C, BE> {
client: Arc<C>,
backend: Arc<fc_db::Backend<B>>,
filter_pool: FilterPool,
max_stored_filters: usize,
overrides: Arc<OverrideHandle<B>>,
@@ -1465,13 +1507,15 @@
{
pub fn new(
client: Arc<C>,
backend: Arc<fc_db::Backend<B>>,
filter_pool: FilterPool,
max_stored_filters: usize,
overrides: Arc<OverrideHandle<B>>,
max_past_logs: u32,
) -> Self {
Self {
client: client.clone(),
backend: backend.clone(),
filter_pool,
max_stored_filters,
overrides,
@@ -1625,6 +1669,7 @@
let mut ret: Vec<Log> = Vec::new();
let _ = filter_range_logs(
self.client.as_ref(),
self.backend.as_ref(),
&self.overrides,
&mut ret,
self.max_past_logs,
@@ -1690,6 +1735,7 @@
let mut ret: Vec<Log> = Vec::new();
let _ = filter_range_logs(
self.client.as_ref(),
self.backend.as_ref(),
&self.overrides,
&mut ret,
self.max_past_logs,
@@ -1734,9 +1780,88 @@ pub struct EthTask<B, C>(PhantomData<(B, C)>);

impl<B, C> EthTask<B, C>
where
C: ProvideRuntimeApi<B> + BlockchainEvents<B>,
B: BlockT,
C: ProvideRuntimeApi<B> + BlockchainEvents<B> + HeaderBackend<B>,
B: BlockT<Hash = H256>,
{
/// Task that caches the best block hash at which each new EthereumStorageSchema was inserted into runtime storage.
pub async fn ethereum_schema_cache_task(client: Arc<C>, backend: Arc<fc_db::Backend<B>>) {
use fp_storage::PALLET_ETHEREUM_SCHEMA;
use log::warn;
use sp_storage::{StorageData, StorageKey};

if let Ok(None) = frontier_backend_client::load_cached_schema::<B>(backend.as_ref()) {
let mut cache: Vec<(EthereumStorageSchema, H256)> = Vec::new();
if let Ok(Some(header)) = client.header(BlockId::Number(Zero::zero())) {
cache.push((EthereumStorageSchema::V1, header.hash()));
let _ = frontier_backend_client::write_cached_schema::<B>(backend.as_ref(), cache)
.map_err(|err| {
warn!("Error schema cache insert for genesis: {:?}", err);
});
} else {
warn!("Error genesis header unreachable");
}
}

// Subscribe to changes for the pallet-ethereum Schema.
if let Ok(mut stream) = client.storage_changes_notification_stream(
Some(&[StorageKey(PALLET_ETHEREUM_SCHEMA.to_vec())]),
None,
) {
while let Some((hash, changes)) = stream.next().await {
// Make sure only block hashes marked as best are referencing cache checkpoints.
if hash == client.info().best_hash {
// Just map the change set to the actual data.
let storage: Vec<Option<StorageData>> = changes
.iter()
.filter_map(|(o_sk, _k, v)| {
if o_sk.is_none() {
Some(v.cloned())
} else {
None
}
})
.collect();
for change in storage {
if let Some(data) = change {
// Decode the wrapped blob, whose type is known.
let new_schema: EthereumStorageSchema =
Decode::decode(&mut &data.0[..]).unwrap();
// Cache the new entry and overwrite the stored cache value.
if let Ok(Some(old_cache)) =
frontier_backend_client::load_cached_schema::<B>(backend.as_ref())
{
let mut new_cache: Vec<(EthereumStorageSchema, H256)> = old_cache;
match &new_cache[..] {
[.., (schema, _)] if *schema == new_schema => {
warn!(
"Schema version already in AuxStore, ignoring: {:?}",
new_schema
);
}
_ => {
new_cache.push((new_schema, hash));
let _ = frontier_backend_client::write_cached_schema::<B>(
backend.as_ref(),
new_cache,
)
.map_err(|err| {
warn!(
"Error schema cache insert for genesis: {:?}",
err
);
});
}
}
} else {
warn!("Error schema cache is corrupted");
}
}
}
}
}
}
}

pub async fn pending_transaction_task(
client: Arc<C>,
pending_transactions: Arc<Mutex<HashMap<H256, PendingTransaction>>>,
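The cache itself is populated by `EthTask::ethereum_schema_cache_task`; the service-side wiring that spawns it is among the 19 changed files but is not rendered above. A hedged sketch of that wiring, assuming the usual Substrate `task_manager`, `client`, and `frontier_backend` handles from the node service:

    // Spawn the schema cache task as an essential background task; the task name
    // is illustrative and the handles come from the node's service builder.
    task_manager.spawn_essential_handle().spawn(
        "frontier-schema-cache-task",
        fc_rpc::EthTask::ethereum_schema_cache_task(
            Arc::clone(&client),
            Arc::clone(&frontier_backend),
        ),
    );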