Skip to content

Commit

Permalink
add substreams config provider (#3897)
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas authored Sep 6, 2022
1 parent 5009eff commit c1c7523
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 86 deletions.
4 changes: 3 additions & 1 deletion node/resources/tests/full_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ ingestor = "index_0"
[chains.mainnet]
shard = "primary"
provider = [
{ label = "mainnet-0", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }
{ label = "mainnet-0", url = "http://rpc.mainnet.io", features = ["archive", "traces"] },
{ label = "firehose", details = { type = "firehose", url = "http://localhost:9000", features = [] }},
{ label = "substreams", details = { type = "substreams", url = "http://localhost:9000", features = [] }},
]

[chains.ropsten]
Expand Down
41 changes: 41 additions & 0 deletions node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,47 @@ pub async fn create_ethereum_networks(
Ok(parsed_networks)
}

pub fn create_substreams_networks(
logger: Logger,
config: &Config,
) -> BTreeMap<BlockchainKind, FirehoseNetworks> {
debug!(
logger,
"Creating firehose networks [{} chains, ingestor {}]",
config.chains.chains.len(),
config.chains.ingestor,
);

let mut networks_by_kind = BTreeMap::new();

for (name, chain) in &config.chains.chains {
for provider in &chain.providers {
if let ProviderDetails::Substreams(ref firehose) = provider.details {
info!(
logger,
"Configuring firehose endpoint";
"provider" => &provider.label,
);

let endpoint = FirehoseEndpoint::new(
&provider.label,
&firehose.url,
firehose.token.clone(),
firehose.filters_enabled(),
firehose.conn_pool_size,
);

let parsed_networks = networks_by_kind
.entry(chain.protocol)
.or_insert_with(|| FirehoseNetworks::new());
parsed_networks.insert(name.to_string(), Arc::new(endpoint));
}
}
}

networks_by_kind
}

pub fn create_firehose_networks(
logger: Logger,
config: &Config,
Expand Down
30 changes: 27 additions & 3 deletions node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ pub struct Provider {
pub enum ProviderDetails {
Firehose(FirehoseProvider),
Web3(Web3Provider),
Substreams(FirehoseProvider),
}

const FIREHOSE_FILTER_FEATURE: &str = "filters";
Expand Down Expand Up @@ -623,8 +624,7 @@ impl Web3Provider {
pub fn limit_for(&self, node: &NodeId) -> usize {
self.rules
.iter()
.filter_map(|l| l.limit_for(node))
.next()
.find_map(|l| l.limit_for(node))
.unwrap_or(usize::MAX)
}
}
Expand All @@ -637,7 +637,8 @@ impl Provider {
validate_name(&self.label).context("illegal provider name")?;

match self.details {
ProviderDetails::Firehose(ref mut firehose) => {
ProviderDetails::Firehose(ref mut firehose)
| ProviderDetails::Substreams(ref mut firehose) => {
firehose.url = shellexpand::env(&firehose.url)?.into_owned();

// A Firehose url must be a valid Uri since gRPC library we use (Tonic)
Expand Down Expand Up @@ -1339,6 +1340,29 @@ mod tests {
);
}

#[test]
fn it_works_on_substreams_provider_from_toml() {
let actual = toml::from_str(
r#"
label = "bananas"
details = { type = "substreams", url = "http://localhost:9000", features = [] }
"#,
)
.unwrap();

assert_eq!(
Provider {
label: "bananas".to_owned(),
details: ProviderDetails::Substreams(FirehoseProvider {
url: "http://localhost:9000".to_owned(),
token: None,
features: BTreeSet::new(),
conn_pool_size: 10,
}),
},
actual
);
}
#[test]
fn it_works_on_new_firehose_provider_from_toml_no_features() {
let actual = toml::from_str(
Expand Down
114 changes: 32 additions & 82 deletions node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ use ethereum::{
};
use git_testament::{git_testament, render_testament};
use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
use graph::blockchain::{
Block as BlockchainBlock, Blockchain, BlockchainKind, BlockchainMap, ChainIdentifier,
};
use graph::blockchain::{Block as BlockchainBlock, Blockchain, BlockchainKind, BlockchainMap};
use graph::components::store::BlockStore;
use graph::data::graphql::effort::LoadManager;
use graph::env::EnvVars;
Expand All @@ -28,7 +26,7 @@ use graph_core::{
use graph_graphql::prelude::GraphQlRunner;
use graph_node::chain::{
connect_ethereum_networks, connect_firehose_networks, create_ethereum_networks,
create_firehose_networks, create_ipfs_clients,
create_firehose_networks, create_ipfs_clients, create_substreams_networks,
};
use graph_node::config::Config;
use graph_node::opt;
Expand Down Expand Up @@ -236,6 +234,12 @@ async fn main() {
create_firehose_networks(logger.clone(), &config)
};

let substreams_networks_by_kind = if query_only {
BTreeMap::new()
} else {
create_substreams_networks(logger.clone(), &config)
};

let graphql_metrics_registry = metrics_registry.clone();

let contention_logger = logger.clone();
Expand Down Expand Up @@ -291,30 +295,11 @@ async fn main() {
)
.await;

let substreams_networks = firehose_networks_by_kind
.remove(&BlockchainKind::Substreams)
.unwrap_or_else(|| FirehoseNetworks::new());

let substreams_idents: Vec<(String, Vec<ChainIdentifier>)> = substreams_networks
.flatten()
.into_iter()
.map(|(ident, _)| {
(
ident,
vec![ChainIdentifier {
net_version: "0".to_string(),
genesis_block_hash: BlockHash::zero(),
}],
)
})
.collect();

let network_identifiers = ethereum_idents
.into_iter()
.chain(arweave_idents)
.chain(near_idents)
.chain(cosmos_idents)
.chain(substreams_idents)
.collect();

let network_store = store_builder.network_store(network_identifiers);
Expand All @@ -334,10 +319,12 @@ async fn main() {
node_id.clone(),
metrics_registry.clone(),
firehose_networks_by_kind.get(&BlockchainKind::Ethereum),
substreams_networks_by_kind.get(&BlockchainKind::Ethereum),
&eth_networks,
network_store.as_ref(),
chain_head_update_listener,
&logger_factory,
metrics_registry.clone(),
);

let near_chains = near_networks_as_chains(
Expand All @@ -358,15 +345,6 @@ async fn main() {
metrics_registry.clone(),
);

let _substreams_chains = substreams_networks_as_chains(
&mut blockchain_map,
&logger,
&substreams_networks,
network_store.as_ref(),
&logger_factory,
metrics_registry.clone(),
);

let blockchain_map = Arc::new(blockchain_map);

let load_manager = Arc::new(LoadManager::new(
Expand Down Expand Up @@ -658,10 +636,12 @@ fn ethereum_networks_as_chains(
node_id: NodeId,
registry: Arc<MetricsRegistry>,
firehose_networks: Option<&FirehoseNetworks>,
substreams_networks: Option<&FirehoseNetworks>,
eth_networks: &EthereumNetworks,
store: &Store,
chain_head_update_listener: Arc<ChainHeadUpdateListener>,
logger_factory: &LoggerFactory,
metrics_registry: Arc<MetricsRegistry>,
) -> HashMap<String, Arc<ethereum::Chain>> {
let chains: Vec<_> = eth_networks
.networks
Expand Down Expand Up @@ -727,6 +707,26 @@ fn ethereum_networks_as_chains(
blockchain_map.insert::<graph_chain_ethereum::Chain>(network_name, chain)
}

if let Some(substreams_networks) = substreams_networks {
for (network_name, firehose_endpoints) in substreams_networks.networks.iter() {
let chain_store = blockchain_map
.get::<graph_chain_ethereum::Chain>(network_name.clone())
.expect("any substreams endpoint needs an rpc or firehose chain defined")
.chain_store();

blockchain_map.insert::<substreams::Chain>(
network_name.clone(),
Arc::new(substreams::Chain::new(
logger_factory.clone(),
firehose_endpoints.clone(),
metrics_registry.clone(),
chain_store,
Arc::new(substreams::BlockStreamBuilder::new()),
)),
);
}
}

HashMap::from_iter(chains)
}

Expand Down Expand Up @@ -830,56 +830,6 @@ fn near_networks_as_chains(
HashMap::from_iter(chains)
}

/// Return the hashmap of SUBSTREAMS chains and also add them to `blockchain_map`.
fn substreams_networks_as_chains(
blockchain_map: &mut BlockchainMap,
logger: &Logger,
firehose_networks: &FirehoseNetworks,
store: &Store,
logger_factory: &LoggerFactory,
metrics_registry: Arc<MetricsRegistry>,
) -> HashMap<String, FirehoseChain<substreams::Chain>> {
let chains: Vec<_> = firehose_networks
.networks
.iter()
.filter_map(|(chain_id, endpoints)| {
store
.block_store()
.chain_store(chain_id)
.map(|chain_store| (chain_id, chain_store, endpoints))
.or_else(|| {
error!(
logger,
"No store configured for SUBSTREAMS chain {}; ignoring this chain",
chain_id
);
None
})
})
.map(|(chain_id, chain_store, endpoints)| {
(
chain_id.clone(),
FirehoseChain {
chain: Arc::new(substreams::Chain::new(
logger_factory.clone(),
endpoints.clone(),
metrics_registry.clone(),
chain_store,
Arc::new(substreams::BlockStreamBuilder::new()),
)),
firehose_endpoints: endpoints.clone(),
},
)
})
.collect();

for (chain_id, firehose_chain) in chains.iter() {
blockchain_map.insert::<substreams::Chain>(chain_id.clone(), firehose_chain.chain.clone())
}

HashMap::from_iter(chains)
}

fn start_block_ingestor(
logger: &Logger,
logger_factory: &LoggerFactory,
Expand Down

0 comments on commit c1c7523

Please sign in to comment.