Skip to content

Commit

Permalink
core: Refactor OffchainMonitor::add_data_source
Browse files Browse the repository at this point in the history
  • Loading branch information
leoyvens committed Aug 15, 2022
1 parent c3cdeec commit 8e1de3c
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 20 deletions.
24 changes: 18 additions & 6 deletions core/src/subgraph/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ use self::instance::SubgraphInstance;

pub type SharedInstanceKeepAliveMap = Arc<RwLock<HashMap<DeploymentId, CancelGuard>>>;

// The context keeps track of mutable in-memory state that is retained across blocks.
//
// Currently most of the changes are applied in `runner.rs`, but ideally more of that would be
// refactored into the context so it wouldn't need `pub` fields. The entity cache should probably
// also be moved here.
pub(crate) struct IndexingContext<C, T>
where
T: RuntimeHostBuilder<C>,
Expand All @@ -35,7 +40,7 @@ where
instance: SubgraphInstance<C, T>,
pub instances: SharedInstanceKeepAliveMap,
pub filter: C::TriggerFilter,
pub(crate) offchain_monitor: OffchainMonitor,
pub offchain_monitor: OffchainMonitor,
trigger_processor: Box<dyn TriggerProcessor<C, T>>,
}

Expand Down Expand Up @@ -109,6 +114,13 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
.await
}

// Removes data source hosts with a creation block greater than or equal to `reverted_block`,
// so that they are no longer candidates for `process_trigger`.
//
// This does not currently affect the `offchain_monitor` or the `filter`, so they will continue
// to include data sources that have been reverted. This is not ideal for performance, but it
// does not affect correctness, since triggers that have no matching host will be ignored by
// `process_trigger`.
pub fn revert_data_sources(&mut self, reverted_block: BlockNumber) {
// Delegates entirely to the wrapped `SubgraphInstance`; no other context state is touched.
self.instance.revert_data_sources(reverted_block)
}
Expand All @@ -118,14 +130,16 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
logger: &Logger,
data_source: DataSource<C>,
) -> Result<Option<Arc<T::Host>>, Error> {
if let DataSource::Offchain(ds) = &data_source {
self.offchain_monitor.add_source(&ds.source)?;
}
self.instance.add_dynamic_data_source(logger, data_source)
}
}

pub(crate) struct OffchainMonitor {
ipfs_monitor: PollingMonitor<Cid>,
ipfs_monitor_rx: mpsc::Receiver<(Cid, Bytes)>,
data_sources: Vec<offchain::DataSource>,
}

impl OffchainMonitor {
Expand All @@ -145,15 +159,13 @@ impl OffchainMonitor {
Self {
ipfs_monitor,
ipfs_monitor_rx,
data_sources: Vec::new(),
}
}

pub fn add_data_source(&mut self, ds: offchain::DataSource) -> Result<(), Error> {
match ds.source {
fn add_source(&mut self, source: &offchain::Source) -> Result<(), Error> {
match source {
offchain::Source::Ipfs(cid) => self.ipfs_monitor.monitor(cid.clone()),
};
self.data_sources.push(ds);
Ok(())
}

Expand Down
7 changes: 5 additions & 2 deletions core/src/subgraph/context/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,18 @@ where
// we use the same order here as in the subgraph manifest to make the
// event processing behavior predictable
for ds in manifest.data_sources {
// TODO: This is duplicating code from `IndexingContext::add_dynamic_data_source` and
// `SubgraphInstance::add_dynamic_data_source`. Ideally this should be refactored into
// `IndexingContext`.

let runtime = ds.runtime();
let module_bytes = match runtime {
None => continue,
Some(ref module_bytes) => module_bytes,
};

// Create services for static offchain data sources
if let DataSource::Offchain(ds) = &ds {
offchain_monitor.add_data_source(ds.clone())?;
offchain_monitor.add_source(&ds.source)?;
}

let host = this.new_host(logger.cheap_clone(), ds, module_bytes)?;
Expand Down
13 changes: 1 addition & 12 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,6 @@ where
let (data_sources, runtime_hosts) =
self.create_dynamic_data_sources(block_state.drain_created_data_sources())?;

for ds in &data_sources {
if let DataSource::Offchain(ds) = ds {
self.ctx.offchain_monitor.add_data_source(ds.clone())?
}
}

let filter = C::TriggerFilter::from_data_sources(
data_sources.iter().filter_map(DataSource::as_onchain),
);
Expand Down Expand Up @@ -883,13 +877,8 @@ where
.set(subgraph_ptr.number as f64);

// Revert the in-memory state:
// - Remove hosts for reverted dynamic data sources.
// - Revert any dynamic data sources.
// - Clear the entity cache.
//
// Note that we do not currently revert the filters, which means the filters
// will be broader than necessary. This is not ideal for performance, but is not
// incorrect since we will discard triggers that match the filters but do not
// match any data sources.
self.ctx.revert_data_sources(subgraph_ptr.number);
self.state.entity_lfu_cache = LfuCache::new();

Expand Down

0 comments on commit 8e1de3c

Please sign in to comment.