From 28580bb76178bde5e4174dd5f73b1aed0e706b87 Mon Sep 17 00:00:00 2001 From: Filipe Azevedo Date: Tue, 21 Jun 2022 23:46:10 +0100 Subject: [PATCH] add partial account filters for near (#3669) --- chain/near/src/adapter.rs | 169 +++++++++- chain/near/src/chain.rs | 373 ++++++++++++++++++++- chain/near/src/data_source.rs | 77 ++++- graph/proto/near/transforms.proto | 13 + graph/src/firehose/sf.near.transform.v1.rs | 16 + 5 files changed, 630 insertions(+), 18 deletions(-) diff --git a/chain/near/src/adapter.rs b/chain/near/src/adapter.rs index 33c47e9bc51..1b3969251a7 100644 --- a/chain/near/src/adapter.rs +++ b/chain/near/src/adapter.rs @@ -1,9 +1,10 @@ use std::collections::HashSet; use crate::capabilities::NodeCapabilities; +use crate::data_source::PartialAccounts; use crate::{data_source::DataSource, Chain}; use graph::blockchain as bc; -use graph::firehose::BasicReceiptFilter; +use graph::firehose::{BasicReceiptFilter, PrefixSuffixPair}; use graph::prelude::*; use prost::Message; use prost_types::Any; @@ -54,6 +55,14 @@ impl bc::TriggerFilter for TriggerFilter { let filter = BasicReceiptFilter { accounts: receipt.accounts.into_iter().collect(), + prefix_and_suffix_pairs: receipt + .partial_accounts + .iter() + .map(|(prefix, suffix)| PrefixSuffixPair { + prefix: prefix.clone().unwrap_or("".to_string()), + suffix: suffix.clone().unwrap_or("".to_string()), + }) + .collect(), }; vec![Any { @@ -70,36 +79,125 @@ pub(crate) type Account = String; #[derive(Clone, Debug, Default)] pub(crate) struct NearReceiptFilter { pub accounts: HashSet, + pub partial_accounts: HashSet<(Option, Option)>, } impl NearReceiptFilter { pub fn matches(&self, account: &String) -> bool { - self.accounts.contains(account) + let NearReceiptFilter { + accounts, + partial_accounts, + } = self; + + if accounts.contains(account) { + return true; + } + + partial_accounts.iter().any(|partial| match partial { + (Some(prefix), Some(suffix)) => { + account.starts_with(prefix) && account.ends_with(suffix) + } + (Some(prefix), None) => account.starts_with(prefix), + (None, Some(suffix)) => account.ends_with(suffix), + (None, None) => unreachable!(), + }) } pub fn is_empty(&self) -> bool { - let NearReceiptFilter { accounts } = self; + let NearReceiptFilter { + accounts, + partial_accounts, + } = self; - accounts.is_empty() + accounts.is_empty() && partial_accounts.is_empty() } pub fn from_data_sources<'a>(iter: impl IntoIterator) -> Self { - let accounts: Vec = iter + struct Source { + account: Option, + partial_accounts: Option, + } + + // Select any ds with either partial or exact accounts. + let sources: Vec = iter .into_iter() .filter(|data_source| { - data_source.source.account.is_some() + (data_source.source.account.is_some() || data_source.source.accounts.is_some()) && !data_source.mapping.receipt_handlers.is_empty() }) - .map(|ds| ds.source.account.as_ref().unwrap().clone()) + .map(|ds| Source { + account: ds.source.account.clone(), + partial_accounts: ds.source.accounts.clone(), + }) + .collect(); + + // Handle exact matches + let accounts: Vec = sources + .iter() + .filter(|s| s.account.is_some()) + .map(|s| s.account.as_ref().cloned().unwrap()) + .collect(); + + // Parse all the partial accounts, produces all possible combinations of the values + // eg: + // prefix [a,b] and suffix [d] would produce [a,d], [b,d] + // prefix [a] and suffix [c,d] would produce [a,c], [a,d] + // prefix [] and suffix [c, d] would produce [None, c], [None, d] + // prefix [a,b] and suffix [] would produce [a, None], [b, None] + let partial_accounts: Vec<(Option, Option)> = sources + .iter() + .filter(|s| s.partial_accounts.is_some()) + .map(|s| { + let partials = s.partial_accounts.as_ref().unwrap(); + + let mut pairs: Vec<(Option, Option)> = vec![]; + let prefixes: Vec> = if partials.prefixes.is_empty() { + vec![None] + } else { + partials + .prefixes + .iter() + .filter(|s| !s.is_empty()) + .map(|s| Some(s.clone())) + .collect() + }; + + let suffixes: Vec> = if partials.suffixes.is_empty() { + vec![None] + } else { + partials + .suffixes + .iter() + .filter(|s| !s.is_empty()) + .map(|s| Some(s.clone())) + .collect() + }; + + for prefix in prefixes.into_iter() { + for suffix in suffixes.iter() { + pairs.push((prefix.clone(), suffix.clone())) + } + } + + pairs + }) + .flatten() .collect(); Self { accounts: HashSet::from_iter(accounts), + partial_accounts: HashSet::from_iter(partial_accounts), } } pub fn extend(&mut self, other: NearReceiptFilter) { - self.accounts.extend(other.accounts); + let NearReceiptFilter { + accounts, + partial_accounts, + } = self; + + accounts.extend(other.accounts); + partial_accounts.extend(other.partial_accounts); } } @@ -130,7 +228,10 @@ mod test { use super::NearBlockFilter; use crate::adapter::{TriggerFilter, BASIC_RECEIPT_FILTER_TYPE_URL}; - use graph::{blockchain::TriggerFilter as _, firehose::BasicReceiptFilter}; + use graph::{ + blockchain::TriggerFilter as _, + firehose::{BasicReceiptFilter, PrefixSuffixPair}, + }; use prost::Message; use prost_types::Any; @@ -142,6 +243,7 @@ mod test { }, receipt_filter: super::NearReceiptFilter { accounts: HashSet::new(), + partial_accounts: HashSet::new(), }, }; assert_eq!(filter.to_firehose_filter(), vec![]); @@ -155,6 +257,7 @@ mod test { }, receipt_filter: super::NearReceiptFilter { accounts: HashSet::from_iter(vec!["acc1".into(), "acc2".into(), "acc3".into()]), + partial_accounts: HashSet::new(), }, }; @@ -170,6 +273,7 @@ mod test { }, receipt_filter: super::NearReceiptFilter { accounts: HashSet::from_iter(vec!["acc1".into(), "acc2".into(), "acc3".into()]), + partial_accounts: HashSet::new(), }, }; @@ -188,6 +292,53 @@ mod test { ); } + #[test] + fn near_trigger_partial_filter() { + let filter = TriggerFilter { + block_filter: NearBlockFilter { + trigger_every_block: false, + }, + receipt_filter: super::NearReceiptFilter { + accounts: HashSet::from_iter(vec!["acc1".into()]), + partial_accounts: HashSet::from_iter(vec![ + (Some("acc1".into()), None), + (None, Some("acc2".into())), + (Some("acc3".into()), Some("acc4".into())), + ]), + }, + }; + + let filter = filter.to_firehose_filter(); + assert_eq!(filter.len(), 1); + + let firehose_filter = decode_filter(filter); + assert_eq!(firehose_filter.accounts, vec![String::from("acc1"),],); + + let expected_pairs = vec![ + PrefixSuffixPair { + prefix: "acc3".to_string(), + suffix: "acc4".to_string(), + }, + PrefixSuffixPair { + prefix: "".to_string(), + suffix: "acc2".to_string(), + }, + PrefixSuffixPair { + prefix: "acc1".to_string(), + suffix: "".to_string(), + }, + ]; + + let pairs = firehose_filter.prefix_and_suffix_pairs; + assert_eq!(pairs.len(), 3); + assert_eq!( + true, + expected_pairs.iter().all(|x| pairs.contains(x)), + "{:?}", + pairs + ); + } + fn decode_filter(firehose_filter: Vec) -> BasicReceiptFilter { let firehose_filter = firehose_filter[0].clone(); assert_eq!( diff --git a/chain/near/src/chain.rs b/chain/near/src/chain.rs index f351db45bbb..79760af1dc4 100644 --- a/chain/near/src/chain.rs +++ b/chain/near/src/chain.rs @@ -404,11 +404,12 @@ impl FirehoseMapperTrait for FirehoseMapper { #[cfg(test)] mod test { - use std::{collections::HashSet, vec}; + use std::{collections::HashSet, sync::Arc, vec}; use graph::{ - blockchain::{block_stream::BlockWithTriggers, TriggersAdapter as _}, - prelude::tokio, + blockchain::{block_stream::BlockWithTriggers, DataSource as _, TriggersAdapter as _}, + prelude::{tokio, Link}, + semver::Version, slog::{self, o, Logger}, }; @@ -417,15 +418,322 @@ mod test { codec::{ self, execution_outcome, receipt::{self}, - BlockHeader, DataReceiver, ExecutionOutcome, ExecutionOutcomeWithId, + Block, BlockHeader, DataReceiver, ExecutionOutcome, ExecutionOutcomeWithId, IndexerExecutionOutcomeWithReceipt, IndexerShard, ReceiptAction, SuccessValueExecutionStatus, }, + data_source::{DataSource, Mapping, PartialAccounts, ReceiptHandler, NEAR_KIND}, + trigger::{NearTrigger, ReceiptWithOutcome}, Chain, }; use super::TriggersAdapter; + #[test] + fn validate_empty() { + let ds = new_data_source(None, None); + let errs = ds.validate(); + assert_eq!(errs.len(), 1, "{:?}", ds); + assert_eq!(errs[0].to_string(), "subgraph source address is required"); + } + + #[test] + fn validate_empty_account_none_partial() { + let ds = new_data_source(None, Some(PartialAccounts::default())); + let errs = ds.validate(); + assert_eq!(errs.len(), 1, "{:?}", ds); + assert_eq!(errs[0].to_string(), "subgraph source address is required"); + } + + #[test] + fn validate_empty_account() { + let ds = new_data_source( + None, + Some(PartialAccounts { + prefixes: vec![], + suffixes: vec!["x.near".to_string()], + }), + ); + let errs = ds.validate(); + assert_eq!(errs.len(), 0, "{:?}", ds); + } + + #[test] + fn validate_empty_prefix_and_suffix_values() { + let ds = new_data_source( + None, + Some(PartialAccounts { + prefixes: vec!["".to_string()], + suffixes: vec!["".to_string()], + }), + ); + let errs: Vec = ds + .validate() + .into_iter() + .map(|err| err.to_string()) + .collect(); + assert_eq!(errs.len(), 2, "{:?}", ds); + + let expected_errors = vec![ + "partial account prefixes can't have empty values".to_string(), + "partial account suffixes can't have empty values".to_string(), + ]; + assert_eq!( + true, + expected_errors.iter().all(|err| errs.contains(err)), + "{:?}", + errs + ); + } + + #[test] + fn validate_empty_partials() { + let ds = new_data_source(Some("x.near".to_string()), None); + let errs = ds.validate(); + assert_eq!(errs.len(), 0, "{:?}", ds); + } + + #[test] + fn receipt_filter_from_ds() { + struct Case { + name: String, + account: Option, + partial_accounts: Option, + expected: HashSet<(Option, Option)>, + } + + let cases = vec![ + Case { + name: "2 prefix && 1 suffix".into(), + account: None, + partial_accounts: Some(PartialAccounts { + prefixes: vec!["a".to_string(), "b".to_string()], + suffixes: vec!["d".to_string()], + }), + expected: HashSet::from_iter(vec![ + (Some("a".to_string()), Some("d".to_string())), + (Some("b".to_string()), Some("d".to_string())), + ]), + }, + Case { + name: "1 prefix && 2 suffix".into(), + account: None, + partial_accounts: Some(PartialAccounts { + prefixes: vec!["a".to_string()], + suffixes: vec!["c".to_string(), "d".to_string()], + }), + expected: HashSet::from_iter(vec![ + (Some("a".to_string()), Some("c".to_string())), + (Some("a".to_string()), Some("d".to_string())), + ]), + }, + Case { + name: "no prefix".into(), + account: None, + partial_accounts: Some(PartialAccounts { + prefixes: vec![], + suffixes: vec!["c".to_string(), "d".to_string()], + }), + expected: HashSet::from_iter(vec![ + (None, Some("c".to_string())), + (None, Some("d".to_string())), + ]), + }, + Case { + name: "no suffix".into(), + account: None, + partial_accounts: Some(PartialAccounts { + prefixes: vec!["a".to_string(), "b".to_string()], + suffixes: vec![], + }), + expected: HashSet::from_iter(vec![ + (Some("a".to_string()), None), + (Some("b".to_string()), None), + ]), + }, + ]; + + for case in cases.into_iter() { + let ds1 = new_data_source(case.account, None); + let ds2 = new_data_source(None, case.partial_accounts); + + let receipt = NearReceiptFilter::from_data_sources(vec![&ds1, &ds2]); + assert_eq!( + receipt.partial_accounts.len(), + case.expected.len(), + "name: {}\npartial_accounts: {:?}", + case.name, + receipt.partial_accounts, + ); + assert_eq!( + true, + case.expected + .iter() + .all(|x| receipt.partial_accounts.contains(&x)), + "name: {}\npartial_accounts: {:?}", + case.name, + receipt.partial_accounts, + ); + } + } + + #[test] + fn data_source_match_and_decode() { + struct Request { + account: String, + matches: bool, + } + struct Case { + name: String, + account: Option, + partial_accounts: Option, + expected: Vec, + } + + let cases = vec![ + Case { + name: "2 prefix && 1 suffix".into(), + account: None, + partial_accounts: Some(PartialAccounts { + prefixes: vec!["a".to_string(), "b".to_string()], + suffixes: vec!["d".to_string()], + }), + expected: vec![ + Request { + account: "ssssssd".to_string(), + matches: false, + }, + Request { + account: "asasdasdas".to_string(), + matches: false, + }, + Request { + account: "asd".to_string(), + matches: true, + }, + Request { + account: "bsd".to_string(), + matches: true, + }, + ], + }, + Case { + name: "1 prefix && 2 suffix".into(), + account: None, + partial_accounts: Some(PartialAccounts { + prefixes: vec!["a".to_string()], + suffixes: vec!["c".to_string(), "d".to_string()], + }), + expected: vec![ + Request { + account: "ssssssd".to_string(), + matches: false, + }, + Request { + account: "asasdasdas".to_string(), + matches: false, + }, + Request { + account: "asdc".to_string(), + matches: true, + }, + Request { + account: "absd".to_string(), + matches: true, + }, + ], + }, + Case { + name: "no prefix with exact match".into(), + account: Some("bsda".to_string()), + partial_accounts: Some(PartialAccounts { + prefixes: vec![], + suffixes: vec!["c".to_string(), "d".to_string()], + }), + expected: vec![ + Request { + account: "ssssss".to_string(), + matches: false, + }, + Request { + account: "asasdasdas".to_string(), + matches: false, + }, + Request { + account: "asdasdasdasdc".to_string(), + matches: true, + }, + Request { + account: "bsd".to_string(), + matches: true, + }, + Request { + account: "bsda".to_string(), + matches: true, + }, + ], + }, + Case { + name: "no suffix with exact match".into(), + account: Some("zbsd".to_string()), + partial_accounts: Some(PartialAccounts { + prefixes: vec!["a".to_string(), "b".to_string()], + suffixes: vec![], + }), + expected: vec![ + Request { + account: "ssssssd".to_string(), + matches: false, + }, + Request { + account: "zasdasdas".to_string(), + matches: false, + }, + Request { + account: "asa".to_string(), + matches: true, + }, + Request { + account: "bsb".to_string(), + matches: true, + }, + Request { + account: "zbsd".to_string(), + matches: true, + }, + ], + }, + ]; + + let logger = Logger::root(slog::Discard, o!()); + for case in cases.into_iter() { + let ds = new_data_source(case.account, case.partial_accounts); + let filter = NearReceiptFilter::from_data_sources(vec![&ds]); + + for req in case.expected { + let res = filter.matches(&req.account); + assert_eq!( + res, req.matches, + "name: {} request:{} failed", + case.name, req.account + ); + + let block = Arc::new(new_success_block(11, &req.account)); + let receipt = Arc::new(new_receipt_with_outcome(&req.account, block.clone())); + let res = ds + .match_and_decode(&NearTrigger::Receipt(receipt.clone()), &block, &logger) + .expect("unable to process block"); + assert_eq!( + req.matches, + res.is_some(), + "case name: {} req: {}", + case.name, + req.account + ); + } + } + } + #[tokio::test] async fn test_trigger_filter_empty() { let account1: String = "account1".into(); @@ -482,6 +790,7 @@ mod test { let filter = TriggerFilter { receipt_filter: NearReceiptFilter { accounts: HashSet::from_iter(vec![account1]), + partial_accounts: HashSet::new(), }, ..Default::default() }; @@ -543,4 +852,60 @@ mod test { ..Default::default() } } + + fn new_data_source( + account: Option, + partial_accounts: Option, + ) -> DataSource { + DataSource { + kind: NEAR_KIND.to_string(), + network: None, + name: "asd".to_string(), + source: crate::data_source::Source { + account, + start_block: 10, + accounts: partial_accounts, + }, + mapping: Mapping { + api_version: Version::parse("1.0.0").expect("unable to parse version"), + language: "".to_string(), + entities: vec![], + block_handlers: vec![], + receipt_handlers: vec![ReceiptHandler { + handler: "asdsa".to_string(), + }], + runtime: Arc::new(vec![]), + link: Link::default(), + }, + context: Arc::new(None), + creation_block: None, + } + } + + fn new_receipt_with_outcome(receiver_id: &String, block: Arc) -> ReceiptWithOutcome { + ReceiptWithOutcome { + outcome: ExecutionOutcomeWithId { + outcome: Some(ExecutionOutcome { + status: Some(execution_outcome::Status::SuccessValue( + SuccessValueExecutionStatus::default(), + )), + + ..Default::default() + }), + ..Default::default() + }, + receipt: codec::Receipt { + receipt: Some(receipt::Receipt::Action(ReceiptAction { + output_data_receivers: vec![DataReceiver { + receiver_id: receiver_id.clone(), + ..Default::default() + }], + ..Default::default() + })), + receiver_id: receiver_id.clone(), + ..Default::default() + }, + block, + } + } } diff --git a/chain/near/src/data_source.rs b/chain/near/src/data_source.rs index dda1e59a6ae..320c7bb46b1 100644 --- a/chain/near/src/data_source.rs +++ b/chain/near/src/data_source.rs @@ -14,7 +14,7 @@ use graph::{ use std::{convert::TryFrom, sync::Arc}; use crate::chain::Chain; -use crate::trigger::NearTrigger; +use crate::trigger::{NearTrigger, ReceiptWithOutcome}; pub const NEAR_KIND: &str = "near"; @@ -49,6 +49,38 @@ impl blockchain::DataSource for DataSource { return Ok(None); } + fn account_matches(ds: &DataSource, receipt: &Arc) -> bool { + if Some(&receipt.receipt.receiver_id) == ds.source.account.as_ref() { + return true; + } + + if let Some(partial_accounts) = &ds.source.accounts { + let matches_prefix = if partial_accounts.prefixes.is_empty() { + true + } else { + partial_accounts + .prefixes + .iter() + .any(|prefix| receipt.receipt.receiver_id.starts_with(prefix)) + }; + + let matches_suffix = if partial_accounts.suffixes.is_empty() { + true + } else { + partial_accounts + .suffixes + .iter() + .any(|suffix| receipt.receipt.receiver_id.ends_with(suffix)) + }; + + if matches_prefix && matches_suffix { + return true; + } + } + + false + } + let handler = match trigger { // A block trigger matches if a block handler is present. NearTrigger::Block(_) => match self.handler_for_block() { @@ -59,7 +91,7 @@ impl blockchain::DataSource for DataSource { // A receipt trigger matches if the receiver matches `source.account` and a receipt // handler is present. NearTrigger::Receipt(receipt) => { - if Some(&receipt.receipt.receiver_id) != self.source.account.as_ref() { + if !account_matches(self, receipt) { return Ok(None); } @@ -147,11 +179,31 @@ impl blockchain::DataSource for DataSource { // Validate that there is a `source` address if there are receipt handlers let no_source_address = self.address().is_none(); + + // Validate that there are no empty PartialAccount. + let no_partial_addresses = match &self.source.accounts { + None => true, + Some(addrs) => addrs.is_empty(), + }; + let has_receipt_handlers = !self.mapping.receipt_handlers.is_empty(); - if no_source_address && has_receipt_handlers { + + // Validate not both address and partial addresses are empty. + if (no_source_address && no_partial_addresses) && has_receipt_handlers { errors.push(SubgraphManifestValidationError::SourceAddressRequired.into()); }; + // Validate empty lines not allowed in suffix or prefix + if let Some(partial_accounts) = self.source.accounts.as_ref() { + if partial_accounts.prefixes.iter().any(|x| x.is_empty()) { + errors.push(anyhow!("partial account prefixes can't have empty values")) + } + + if partial_accounts.suffixes.iter().any(|x| x.is_empty()) { + errors.push(anyhow!("partial account suffixes can't have empty values")) + } + } + // Validate that there are no more than one of both block handlers and receipt handlers if self.mapping.block_handlers.len() > 1 { errors.push(anyhow!("data source has duplicated block handlers")); @@ -390,13 +442,28 @@ pub struct MappingBlockHandler { #[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)] pub struct ReceiptHandler { - handler: String, + pub(crate) handler: String, +} + +#[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize, Default)] +pub(crate) struct PartialAccounts { + #[serde(default)] + pub(crate) prefixes: Vec, + #[serde(default)] + pub(crate) suffixes: Vec, +} + +impl PartialAccounts { + pub fn is_empty(&self) -> bool { + self.prefixes.is_empty() && self.suffixes.is_empty() + } } #[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)] pub(crate) struct Source { - // A data source that does not have an account can only have block handlers. + // A data source that does not have an account or accounts can only have block handlers. pub(crate) account: Option, #[serde(rename = "startBlock", default)] pub(crate) start_block: BlockNumber, + pub(crate) accounts: Option, } diff --git a/graph/proto/near/transforms.proto b/graph/proto/near/transforms.proto index 04e7fa34636..6dfe138c8f7 100644 --- a/graph/proto/near/transforms.proto +++ b/graph/proto/near/transforms.proto @@ -5,4 +5,17 @@ option go_package = "github.com/streamingfast/sf-near/pb/sf/near/transform/v1;pb message BasicReceiptFilter { repeated string accounts = 1; + repeated PrefixSuffixPair prefix_and_suffix_pairs = 2; +} + +// PrefixSuffixPair applies a logical AND to prefix and suffix when both fields are non-empty. +// * {prefix="hello",suffix="world"} will match "hello.world" but not "hello.friend" +// * {prefix="hello",suffix=""} will match both "hello.world" and "hello.friend" +// * {prefix="",suffix="world"} will match both "hello.world" and "good.day.world" +// * {prefix="",suffix=""} is invalid +// +// Note that the suffix will usually have a TLD, ex: "mydomain.near" or "mydomain.testnet" +message PrefixSuffixPair { + string prefix = 1; + string suffix = 2; } diff --git a/graph/src/firehose/sf.near.transform.v1.rs b/graph/src/firehose/sf.near.transform.v1.rs index d7df07deddc..86972b6cc54 100644 --- a/graph/src/firehose/sf.near.transform.v1.rs +++ b/graph/src/firehose/sf.near.transform.v1.rs @@ -2,4 +2,20 @@ pub struct BasicReceiptFilter { #[prost(string, repeated, tag="1")] pub accounts: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(message, repeated, tag="2")] + pub prefix_and_suffix_pairs: ::prost::alloc::vec::Vec, +} +/// PrefixSuffixPair applies a logical AND to prefix and suffix when both fields are non-empty. +/// * {prefix="hello",suffix="world"} will match "hello.world" but not "hello.friend" +/// * {prefix="hello",suffix=""} will match both "hello.world" and "hello.friend" +/// * {prefix="",suffix="world"} will match both "hello.world" and "good.day.world" +/// * {prefix="",suffix=""} is invalid +/// +/// Note that the suffix will usually have a TLD, ex: "mydomain.near" or "mydomain.testnet" +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PrefixSuffixPair { + #[prost(string, tag="1")] + pub prefix: ::prost::alloc::string::String, + #[prost(string, tag="2")] + pub suffix: ::prost::alloc::string::String, }