From db8d2c5a3a18e904797d0b2f912bf4db51436f45 Mon Sep 17 00:00:00 2001 From: Harper Date: Tue, 30 Aug 2022 21:52:39 +0100 Subject: [PATCH] feat: Implement 'block header CBOR by hash' reducer (#70) --- src/reducers/block_header_by_hash.rs | 53 ++++++++++++++++++++++++++++ src/reducers/macros.rs | 20 +++++++++++ src/reducers/mod.rs | 12 ++++++- 3 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 src/reducers/block_header_by_hash.rs diff --git a/src/reducers/block_header_by_hash.rs b/src/reducers/block_header_by_hash.rs new file mode 100644 index 00000000..c7c38ab4 --- /dev/null +++ b/src/reducers/block_header_by_hash.rs @@ -0,0 +1,53 @@ +use pallas::ledger::traverse::MultiEraBlock; +use serde::Deserialize; + +use crate::prelude::*; +use crate::{crosscut, model}; + +#[derive(Deserialize)] +pub struct Config { + pub key_prefix: Option, + pub filter: Option, +} + +pub struct Reducer { + config: Config, + policy: crosscut::policies::RuntimePolicy, +} + +impl Reducer { + pub fn reduce_block<'b>( + &mut self, + block: &'b MultiEraBlock<'b>, + ctx: &model::BlockContext, + output: &mut super::OutputPort, + ) -> Result<(), gasket::error::Error> { + if filter_matches_block!(self, block, ctx) { + let value = block + .header() + .cbor() + .to_vec(); + + let crdt = model::CRDTCommand::any_write_wins( + self.config.key_prefix.as_deref(), + block.hash(), + value + ); + + output.send(gasket::messaging::Message::from(crdt))?; + } + + Ok(()) + } +} + +impl Config { + pub fn plugin(self, policy: &crosscut::policies::RuntimePolicy) -> super::Reducer { + let reducer = Reducer { + config: self, + policy: policy.clone(), + }; + + super::Reducer::BlockHeaderByHash(reducer) + } +} diff --git a/src/reducers/macros.rs b/src/reducers/macros.rs index cace91a4..6d4d2262 100644 --- a/src/reducers/macros.rs +++ b/src/reducers/macros.rs @@ -9,4 +9,24 @@ macro_rules! filter_matches { }; } +macro_rules! filter_matches_block { + ($reducer:ident, $block:expr, $ctx:expr) => { + match &$reducer.config.filter { + Some(x) => { + // match the block if any of the contained txs satisfy the predicates + let mut ret = false; + + for tx in $block.txs().into_iter() { + ret |= crosscut::filters::eval_predicate(x, $block, &tx, $ctx, &$reducer.policy).or_panic()?; + } + + ret + } + // if we don't have a filter, everything goes through + None => true, + } + }; +} + pub(crate) use filter_matches; +pub(crate) use filter_matches_block; diff --git a/src/reducers/mod.rs b/src/reducers/mod.rs index e28d68d0..34e6d4ff 100644 --- a/src/reducers/mod.rs +++ b/src/reducers/mod.rs @@ -23,6 +23,8 @@ pub mod balance_by_address; pub mod tx_by_hash; #[cfg(feature = "unstable")] pub mod tx_count_by_address; +#[cfg(feature = "unstable")] +pub mod block_header_by_hash; #[derive(Deserialize)] #[serde(tag = "type")] @@ -39,6 +41,8 @@ pub enum Config { TxByHash(tx_by_hash::Config), #[cfg(feature = "unstable")] TxCountByAddress(tx_count_by_address::Config), + #[cfg(feature = "unstable")] + BlockHeaderByHash(block_header_by_hash::Config), } impl Config { @@ -56,6 +60,8 @@ impl Config { Config::TxByHash(c) => c.plugin(), #[cfg(feature = "unstable")] Config::TxCountByAddress(c) => c.plugin(policy), + #[cfg(feature = "unstable")] + Config::BlockHeaderByHash(c) => c.plugin(policy), } } } @@ -111,6 +117,8 @@ pub enum Reducer { TxByHash(tx_by_hash::Reducer), #[cfg(feature = "unstable")] TxCountByAddress(tx_count_by_address::Reducer), + #[cfg(feature = "unstable")] + BlockHeaderByHash(block_header_by_hash::Reducer), } impl Reducer { @@ -133,6 +141,8 @@ impl Reducer { Reducer::TxByHash(x) => x.reduce_block(block, output), #[cfg(feature = "unstable")] Reducer::TxCountByAddress(x) => x.reduce_block(block, ctx, output), + #[cfg(feature = "unstable")] + Reducer::BlockHeaderByHash(x) => x.reduce_block(block, ctx, output), } } -} +} \ No newline at end of file