Skip to content

Commit

Permalink
feat: Implement 'block header CBOR by hash' reducer (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmhrpr authored Aug 30, 2022
1 parent e702e5a commit db8d2c5
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 1 deletion.
53 changes: 53 additions & 0 deletions src/reducers/block_header_by_hash.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use pallas::ledger::traverse::MultiEraBlock;
use serde::Deserialize;

use crate::prelude::*;
use crate::{crosscut, model};

#[derive(Deserialize)]
pub struct Config {
pub key_prefix: Option<String>,
pub filter: Option<crosscut::filters::Predicate>,
}

pub struct Reducer {
config: Config,
policy: crosscut::policies::RuntimePolicy,
}

impl Reducer {
pub fn reduce_block<'b>(
&mut self,
block: &'b MultiEraBlock<'b>,
ctx: &model::BlockContext,
output: &mut super::OutputPort,
) -> Result<(), gasket::error::Error> {
if filter_matches_block!(self, block, ctx) {
let value = block
.header()
.cbor()
.to_vec();

let crdt = model::CRDTCommand::any_write_wins(
self.config.key_prefix.as_deref(),
block.hash(),
value
);

output.send(gasket::messaging::Message::from(crdt))?;
}

Ok(())
}
}

impl Config {
pub fn plugin(self, policy: &crosscut::policies::RuntimePolicy) -> super::Reducer {
let reducer = Reducer {
config: self,
policy: policy.clone(),
};

super::Reducer::BlockHeaderByHash(reducer)
}
}
20 changes: 20 additions & 0 deletions src/reducers/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,24 @@ macro_rules! filter_matches {
};
}

macro_rules! filter_matches_block {
($reducer:ident, $block:expr, $ctx:expr) => {
match &$reducer.config.filter {
Some(x) => {
// match the block if any of the contained txs satisfy the predicates
let mut ret = false;

for tx in $block.txs().into_iter() {
ret |= crosscut::filters::eval_predicate(x, $block, &tx, $ctx, &$reducer.policy).or_panic()?;
}

ret
}
// if we don't have a filter, everything goes through
None => true,
}
};
}

pub(crate) use filter_matches;
pub(crate) use filter_matches_block;
12 changes: 11 additions & 1 deletion src/reducers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub mod balance_by_address;
pub mod tx_by_hash;
#[cfg(feature = "unstable")]
pub mod tx_count_by_address;
#[cfg(feature = "unstable")]
pub mod block_header_by_hash;

#[derive(Deserialize)]
#[serde(tag = "type")]
Expand All @@ -39,6 +41,8 @@ pub enum Config {
TxByHash(tx_by_hash::Config),
#[cfg(feature = "unstable")]
TxCountByAddress(tx_count_by_address::Config),
#[cfg(feature = "unstable")]
BlockHeaderByHash(block_header_by_hash::Config),
}

impl Config {
Expand All @@ -56,6 +60,8 @@ impl Config {
Config::TxByHash(c) => c.plugin(),
#[cfg(feature = "unstable")]
Config::TxCountByAddress(c) => c.plugin(policy),
#[cfg(feature = "unstable")]
Config::BlockHeaderByHash(c) => c.plugin(policy),
}
}
}
Expand Down Expand Up @@ -111,6 +117,8 @@ pub enum Reducer {
TxByHash(tx_by_hash::Reducer),
#[cfg(feature = "unstable")]
TxCountByAddress(tx_count_by_address::Reducer),
#[cfg(feature = "unstable")]
BlockHeaderByHash(block_header_by_hash::Reducer),
}

impl Reducer {
Expand All @@ -133,6 +141,8 @@ impl Reducer {
Reducer::TxByHash(x) => x.reduce_block(block, output),
#[cfg(feature = "unstable")]
Reducer::TxCountByAddress(x) => x.reduce_block(block, ctx, output),
#[cfg(feature = "unstable")]
Reducer::BlockHeaderByHash(x) => x.reduce_block(block, ctx, output),
}
}
}
}

0 comments on commit db8d2c5

Please sign in to comment.