Skip to content

Commit

Permalink
offchain: Remove dses that have been processed
Browse files Browse the repository at this point in the history
  • Loading branch information
leoyvens committed Aug 18, 2022
1 parent 47a6ece commit 76d856c
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 14 deletions.
16 changes: 9 additions & 7 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::subgraph::stream::new_block_stream;
use atomic_refcell::AtomicRefCell;
use graph::blockchain::block_stream::{BlockStreamEvent, BlockWithTriggers, FirehoseCursor};
use graph::blockchain::{Block, Blockchain, TriggerFilter as _};
use graph::components::store::StoredDynamicDataSource;
use graph::components::{
store::ModificationsAndCache,
subgraph::{CausalityRegion, MappingError, ProofOfIndexing, SharedProofOfIndexing},
Expand Down Expand Up @@ -327,11 +328,10 @@ where

// Check for offchain events and process them, including their entity modifications in the
// set to be transacted.
{
let offchain_events = self.ctx.offchain_monitor.ready_offchain_events()?;
let offchain_mods = self.handle_offchain_triggers(offchain_events).await?;
mods.extend(offchain_mods);
}
let offchain_events = self.ctx.offchain_monitor.ready_offchain_events()?;
let (offchain_mods, offchain_to_remove) =
self.handle_offchain_triggers(offchain_events).await?;
mods.extend(offchain_mods);

// Put the cache back in the state, asserting that the placeholder cache was not used.
assert!(self.state.entity_lfu_cache.is_empty());
Expand Down Expand Up @@ -385,6 +385,7 @@ where
data_sources,
deterministic_errors,
self.inputs.manifest_idx_and_name.clone(),
offchain_to_remove,
)
.await
.context("Failed to transact block operations")?;
Expand Down Expand Up @@ -564,7 +565,7 @@ where
async fn handle_offchain_triggers(
&mut self,
triggers: Vec<offchain::TriggerData>,
) -> Result<Vec<EntityModification>, Error> {
) -> Result<(Vec<EntityModification>, Vec<StoredDynamicDataSource>), Error> {
// TODO: Dont expose store with onchain entites
let mut block_state =
BlockState::<C>::new(self.inputs.store.cheap_clone(), LfuCache::new());
Expand Down Expand Up @@ -604,7 +605,8 @@ where
"Attempted to create data source in offchain data source handler. This is not yet supported.",
);

Ok(block_state.entity_cache.as_modifications()?.modifications)
let mods = block_state.entity_cache.as_modifications()?.modifications;
Ok((mods, block_state.offchain_to_remove))
}
}

Expand Down
7 changes: 7 additions & 0 deletions core/src/subgraph/trigger_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ where
.await?;
let elapsed = start.elapsed().as_secs_f64();
subgraph_metrics.observe_trigger_processing_duration(elapsed);

if host.data_source().as_offchain().is_some() {
// Remove this offchain data source since it has just been processed.
state
.offchain_to_remove
.push(host.data_source().as_stored_dynamic_data_source());
}
}

if let Some(proof_of_indexing) = proof_of_indexing {
Expand Down
1 change: 1 addition & 0 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ pub trait WritableStore: Send + Sync + 'static {
data_sources: Vec<StoredDynamicDataSource>,
deterministic_errors: Vec<SubgraphError>,
manifest_idx_and_name: Vec<(u32, String)>,
offchain_to_remove: Vec<StoredDynamicDataSource>,
) -> Result<(), StoreError>;

/// Look up multiple entities as of the latest block. Returns a map of
Expand Down
13 changes: 11 additions & 2 deletions graph/src/components/subgraph/instance.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::{
blockchain::Blockchain, components::store::WritableStore,
data::subgraph::schema::SubgraphError, data_source::DataSourceTemplate, prelude::*,
blockchain::Blockchain,
components::store::{StoredDynamicDataSource, WritableStore},
data::subgraph::schema::SubgraphError,
data_source::DataSourceTemplate,
prelude::*,
util::lfu_cache::LfuCache,
};

Expand All @@ -21,6 +24,9 @@ pub struct BlockState<C: Blockchain> {
// Data sources created in the current handler.
handler_created_data_sources: Vec<DataSourceTemplateInfo<C>>,

// offchain data sources to be removed because they've been processed.
pub offchain_to_remove: Vec<StoredDynamicDataSource>,

// Marks whether a handler is currently executing.
in_handler: bool,
}
Expand All @@ -35,6 +41,7 @@ impl<C: Blockchain> BlockState<C> {
deterministic_errors: Vec::new(),
created_data_sources: Vec::new(),
handler_created_data_sources: Vec::new(),
offchain_to_remove: Vec::new(),
in_handler: false,
}
}
Expand All @@ -47,6 +54,7 @@ impl<C: Blockchain> BlockState<C> {
deterministic_errors,
created_data_sources,
handler_created_data_sources,
offchain_to_remove,
in_handler,
} = self;

Expand All @@ -56,6 +64,7 @@ impl<C: Blockchain> BlockState<C> {
}
deterministic_errors.extend(other.deterministic_errors);
entity_cache.extend(other.entity_cache);
offchain_to_remove.extend(other.offchain_to_remove);
}

pub fn has_errors(&self) -> bool {
Expand Down
6 changes: 2 additions & 4 deletions graph/src/data_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,8 @@ impl<C: Blockchain> DataSource<C> {
match (self, other) {
(Self::Onchain(a), Self::Onchain(b)) => a.is_duplicate_of(b),
(Self::Offchain(a), Self::Offchain(b)) => {
a.kind == b.kind
&& a.name == b.name
&& a.source == b.source
&& a.context == b.context
// See also: data-source-is-duplicate-of
a.manifest_idx == b.manifest_idx && a.source == b.source && a.context == b.context
}
_ => false,
}
Expand Down
1 change: 1 addition & 0 deletions graph/tests/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl WritableStore for MockStore {
_: Vec<StoredDynamicDataSource>,
_: Vec<SubgraphError>,
_: Vec<(u32, String)>,
_: Vec<StoredDynamicDataSource>,
) -> Result<(), StoreError> {
unimplemented!()
}
Expand Down
1 change: 0 additions & 1 deletion graphql/tests/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,6 @@ fn mixed_parent_child_id() {
};
let data = extract_data!(result).unwrap();
assert_eq!(data, exp);
dbg!(&data);
})
}

Expand Down
3 changes: 3 additions & 0 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,7 @@ impl DeploymentStore {
data_sources: &[StoredDynamicDataSource],
deterministic_errors: &[SubgraphError],
manifest_idx_and_name: &[(u32, String)],
offchain_to_remove: &[StoredDynamicDataSource],
) -> Result<StoreEvent, StoreError> {
// All operations should apply only to data or metadata for this subgraph
if mods
Expand Down Expand Up @@ -1019,6 +1020,8 @@ impl DeploymentStore {
manifest_idx_and_name,
)?;

dynds::remove_offchain(&conn, &site, offchain_to_remove)?;

if !deterministic_errors.is_empty() {
deployment::insert_subgraph_errors(
&conn,
Expand Down
18 changes: 18 additions & 0 deletions store/postgres/src/dynds/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use diesel::PgConnection;
use graph::{
blockchain::BlockPtr,
components::store::StoredDynamicDataSource,
constraint_violation,
prelude::{BlockNumber, StoreError},
};

Expand Down Expand Up @@ -56,3 +57,20 @@ pub(crate) fn revert(
false => shared::revert(conn, &site.deployment, block),
}
}

/// Remove the given offchain data sources from `site`'s deployment by
/// delegating to [`DataSourcesTable::remove_offchain`].
///
/// This is only supported for deployments whose schema version keeps
/// data sources in a private table; for the legacy shared schema this
/// returns a constraint violation. Callers that pass an empty slice get
/// `Ok(())` without touching the database, so deployments on the shared
/// schema that never produce offchain data sources are unaffected.
pub(crate) fn remove_offchain(
    conn: &PgConnection,
    site: &Site,
    data_sources: &[StoredDynamicDataSource],
) -> Result<(), StoreError> {
    // Fast path: nothing to remove. This also keeps the shared-schema
    // error below from firing for deployments with no offchain dses.
    if data_sources.is_empty() {
        return Ok(());
    }

    // match-on-bool mirrors the style of `revert` above.
    match site.schema_version.private_data_sources() {
        true => DataSourcesTable::new(site.namespace.clone()).remove_offchain(conn, data_sources),
        false => Err(constraint_violation!(
            "shared schema does not support data source offchain_found",
        )),
    }
}
44 changes: 44 additions & 0 deletions store/postgres/src/dynds/private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,48 @@ impl DataSourcesTable {

Ok(count)
}

// Remove offchain data sources by checking for equality. Their range will be set to the empty range.
pub(super) fn remove_offchain(
&self,
conn: &PgConnection,
data_sources: &[StoredDynamicDataSource],
) -> Result<(), StoreError> {
for ds in data_sources {
let StoredDynamicDataSource {
manifest_idx,
param,
context,
creation_block,
} = ds;

let query = format!(
"update {} set block_range = 'empty'::int4range \
where manifest_idx = $1
and param is not distinct from $2
and context is not distinct from $3
and lower(block_range) is not distinct from $4",
self.qname
);

let count = sql_query(query)
.bind::<Integer, _>(*manifest_idx as i32)
.bind::<Nullable<Binary>, _>(param.as_ref().map(|p| &**p))
.bind::<Nullable<Jsonb>, _>(context)
.bind::<Nullable<Integer>, _>(creation_block)
.execute(conn)?;

if count > 1 {
// Data source deduplication enforces this invariant.
// See also: data-source-is-duplicate-of
return Err(constraint_violation!(
"expected to remove at most one offchain data source but removed {}, ds: {:?}",
count,
ds
));
}
}

Ok(())
}
}
15 changes: 15 additions & 0 deletions store/postgres/src/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ impl SyncStore {
data_sources: &[StoredDynamicDataSource],
deterministic_errors: &[SubgraphError],
manifest_idx_and_name: &[(u32, String)],
offchain_to_remove: &[StoredDynamicDataSource],
) -> Result<(), StoreError> {
fn same_subgraph(mods: &[EntityModification], id: &DeploymentHash) -> bool {
mods.iter().all(|md| &md.entity_key().subgraph_id == id)
Expand All @@ -272,6 +273,7 @@ impl SyncStore {
data_sources,
deterministic_errors,
manifest_idx_and_name,
offchain_to_remove,
)?;

let _section = stopwatch.start_section("send_store_event");
Expand Down Expand Up @@ -428,6 +430,7 @@ enum Request {
data_sources: Vec<StoredDynamicDataSource>,
deterministic_errors: Vec<SubgraphError>,
manifest_idx_and_name: Vec<(u32, String)>,
offchain_to_remove: Vec<StoredDynamicDataSource>,
},
RevertTo {
store: Arc<SyncStore>,
Expand All @@ -449,6 +452,7 @@ impl Request {
data_sources,
deterministic_errors,
manifest_idx_and_name,
offchain_to_remove,
} => store.transact_block_operations(
block_ptr_to,
firehose_cursor,
Expand All @@ -457,6 +461,7 @@ impl Request {
data_sources,
deterministic_errors,
manifest_idx_and_name,
offchain_to_remove,
),
Request::RevertTo {
store,
Expand Down Expand Up @@ -763,10 +768,15 @@ impl Queue {
Request::Write {
block_ptr,
data_sources,
offchain_to_remove,
..
} => {
if tracker.visible(block_ptr) {
dds.extend(data_sources.clone());
dds = dds
.into_iter()
.filter(|dds| offchain_to_remove.contains(dds))
.collect();
}
}
Request::RevertTo { .. } => { /* nothing to do */ }
Expand Down Expand Up @@ -819,6 +829,7 @@ impl Writer {
data_sources: Vec<StoredDynamicDataSource>,
deterministic_errors: Vec<SubgraphError>,
manifest_idx_and_name: Vec<(u32, String)>,
offchain_to_remove: Vec<StoredDynamicDataSource>,
) -> Result<(), StoreError> {
match self {
Writer::Sync(store) => store.transact_block_operations(
Expand All @@ -829,6 +840,7 @@ impl Writer {
&data_sources,
&deterministic_errors,
&manifest_idx_and_name,
&offchain_to_remove,
),
Writer::Async(queue) => {
let req = Request::Write {
Expand All @@ -840,6 +852,7 @@ impl Writer {
data_sources,
deterministic_errors,
manifest_idx_and_name,
offchain_to_remove,
};
queue.push(req).await
}
Expand Down Expand Up @@ -1019,6 +1032,7 @@ impl WritableStoreTrait for WritableStore {
data_sources: Vec<StoredDynamicDataSource>,
deterministic_errors: Vec<SubgraphError>,
manifest_idx_and_name: Vec<(u32, String)>,
offchain_to_remove: Vec<StoredDynamicDataSource>,
) -> Result<(), StoreError> {
self.writer
.write(
Expand All @@ -1029,6 +1043,7 @@ impl WritableStoreTrait for WritableStore {
data_sources,
deterministic_errors,
manifest_idx_and_name,
offchain_to_remove,
)
.await?;

Expand Down
2 changes: 2 additions & 0 deletions store/test-store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ pub async fn transact_errors(
Vec::new(),
errs,
Vec::new(),
Vec::new(),
)
.await?;
flush(deployment).await
Expand Down Expand Up @@ -307,6 +308,7 @@ pub async fn transact_entities_and_dynamic_data_sources(
data_sources,
Vec::new(),
manifest_idx_and_name,
Vec::new(),
)
.await
}
Expand Down

0 comments on commit 76d856c

Please sign in to comment.