Skip to content

Commit

Permalink
Reset BlockStream on error (#3316)
Browse files Browse the repository at this point in the history
* refactor: Make unfail methods return an outcome enum

* refactor: Remove unnecessary .health call and use unfail outcome instead

* fix: Make deterministic unfailure restart the BlockStream

* refactor: Move deterministic fail out of the runner loop

* fix: Update local subgraph block_ptr cache on unfailure
  • Loading branch information
evaporei authored and neysofu committed Mar 8, 2022
1 parent ff2c4ec commit e1543cb
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 80 deletions.
79 changes: 31 additions & 48 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,33 @@ where
let store_for_err = self.inputs.store.cheap_clone();
let logger = self.ctx.state.logger.cheap_clone();
let id_for_err = self.inputs.deployment.hash.clone();
let mut should_try_unfail_deterministic = true;
let mut should_try_unfail_non_deterministic = true;
let mut synced = false;
let mut skip_ptr_updates_timer = Instant::now();

// If a subgraph failed for deterministic reasons, before start indexing, we first
// revert the deployment head. It should lead to the same result since the error was
// deterministic.
if let Some(current_ptr) = self.inputs.store.block_ptr() {
if let Some(parent_ptr) = self
.inputs
.triggers_adapter
.parent_ptr(&current_ptr)
.await?
{
// This reverts the deployment head to the parent_ptr if
// deterministic errors happened.
//
// There's no point in calling it if we have no current or parent block
// pointers, because there would be: no block to revert to or to search
// errors from (first execution).
let _outcome = self
.inputs
.store
.unfail_deterministic_error(&current_ptr, &parent_ptr)?;
}
}

// Exponential backoff that starts with two minutes and keeps
// increasing its timeout exponentially until it reaches the ceiling.
let mut backoff = ExponentialBackoff::new(MINUTE * 2, *SUBGRAPH_ERROR_RETRY_CEIL_SECS);
Expand Down Expand Up @@ -240,34 +262,6 @@ where
let start = Instant::now();
let deployment_failed = self.ctx.block_stream_metrics.deployment_failed.clone();

// If a subgraph failed for deterministic reasons, before processing a new block, we
// revert the deployment head. It should lead to the same result since the error was
// deterministic.
//
// As an optimization we check this only on the first run.
if should_try_unfail_deterministic {
should_try_unfail_deterministic = false;

if let Some(current_ptr) = self.inputs.store.block_ptr() {
if let Some(parent_ptr) = self
.inputs
.triggers_adapter
.parent_ptr(&current_ptr)
.await?
{
// This reverts the deployment head to the parent_ptr if
// deterministic errors happened.
//
// There's no point in calling it if we have no current or parent block
// pointers, because there would be: no block to revert to or to search
// errors from (first execution).
self.inputs
.store
.unfail_deterministic_error(&current_ptr, &parent_ptr)?;
}
}
}

let res = self
.process_block(
&logger,
Expand Down Expand Up @@ -304,28 +298,17 @@ where
if should_try_unfail_non_deterministic {
// If the deployment head advanced, we can unfail
// the non-deterministic error (if there's any).
self.inputs
let outcome = self
.inputs
.store
.unfail_non_deterministic_error(&block_ptr)?;

match self
.inputs
.store
.health(&self.inputs.deployment.hash)
.await?
{
SubgraphHealth::Failed => {
// If the unfail call didn't change the subgraph health, we keep
// `should_try_unfail_non_deterministic` as `true` until it's
// actually unfailed.
}
SubgraphHealth::Healthy | SubgraphHealth::Unhealthy => {
// Stop trying to unfail.
should_try_unfail_non_deterministic = false;
deployment_failed.set(0.0);
backoff.reset();
}
};
if let UnfailOutcome::Unfailed = outcome {
// Stop trying to unfail.
should_try_unfail_non_deterministic = false;
deployment_failed.set(0.0);
backoff.reset();
}
}

if needs_restart {
Expand Down
13 changes: 11 additions & 2 deletions graph/src/components/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,12 @@ pub enum EntityOperation {
Remove { key: EntityKey },
}

#[derive(Debug, PartialEq)]
pub enum UnfailOutcome {
Noop,
Unfailed,
}

#[derive(Error, Debug)]
pub enum StoreError {
#[error("store error: {0}")]
Expand Down Expand Up @@ -1078,11 +1084,14 @@ pub trait WritableStore: Send + Sync + 'static {
&self,
current_ptr: &BlockPtr,
parent_ptr: &BlockPtr,
) -> Result<(), StoreError>;
) -> Result<UnfailOutcome, StoreError>;

/// If a non-deterministic error happened and the current deployment head is past the error
/// block range, this function unfails the subgraph and deletes the error.
fn unfail_non_deterministic_error(&self, current_ptr: &BlockPtr) -> Result<(), StoreError>;
fn unfail_non_deterministic_error(
&self,
current_ptr: &BlockPtr,
) -> Result<UnfailOutcome, StoreError>;

/// Set subgraph status to failed with the given error as the cause.
async fn fail_subgraph(&self, error: SubgraphError) -> Result<(), StoreError>;
Expand Down
4 changes: 2 additions & 2 deletions graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ pub mod prelude {
EntityChangeOperation, EntityCollection, EntityFilter, EntityKey, EntityLink,
EntityModification, EntityOperation, EntityOrder, EntityQuery, EntityRange, EntityWindow,
EthereumCallCache, ParentLink, PoolWaitStats, QueryStore, QueryStoreManager, StoreError,
StoreEvent, StoreEventStream, StoreEventStreamBox, SubgraphStore, WindowAttribute,
BLOCK_NUMBER_MAX, SUBSCRIPTION_THROTTLE_INTERVAL,
StoreEvent, StoreEventStream, StoreEventStreamBox, SubgraphStore, UnfailOutcome,
WindowAttribute, BLOCK_NUMBER_MAX, SUBSCRIPTION_THROTTLE_INTERVAL,
};
pub use crate::components::subgraph::{
BlockState, DataSourceTemplateInfo, HostMetrics, RuntimeHost, RuntimeHostBuilder,
Expand Down
10 changes: 7 additions & 3 deletions graph/tests/entity_cache.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_trait::async_trait;
use graph::blockchain::BlockPtr;
use graph::data::subgraph::schema::{SubgraphError, SubgraphHealth};
use graph::prelude::{Schema, StopwatchMetrics, StoreError};
use graph::prelude::{Schema, StopwatchMetrics, StoreError, UnfailOutcome};
use lazy_static::lazy_static;
use slog::Logger;
use std::collections::BTreeMap;
Expand Down Expand Up @@ -66,11 +66,15 @@ impl WritableStore for MockStore {
unimplemented!()
}

fn unfail_deterministic_error(&self, _: &BlockPtr, _: &BlockPtr) -> Result<(), StoreError> {
fn unfail_deterministic_error(
&self,
_: &BlockPtr,
_: &BlockPtr,
) -> Result<UnfailOutcome, StoreError> {
unimplemented!()
}

fn unfail_non_deterministic_error(&self, _: &BlockPtr) -> Result<(), StoreError> {
fn unfail_non_deterministic_error(&self, _: &BlockPtr) -> Result<UnfailOutcome, StoreError> {
unimplemented!()
}

Expand Down
28 changes: 16 additions & 12 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use graph::prelude::{
anyhow, debug, info, lazy_static, o, warn, web3, ApiSchema, AttributeNames, BlockNumber,
BlockPtr, CheapClone, DeploymentHash, DeploymentState, Entity, EntityKey, EntityModification,
EntityQuery, Error, Logger, QueryExecutionError, Schema, StopwatchMetrics, StoreError,
StoreEvent, Value, BLOCK_NUMBER_MAX,
StoreEvent, UnfailOutcome, Value, BLOCK_NUMBER_MAX,
};
use graph_graphql::prelude::api_schema;
use web3::types::Address;
Expand Down Expand Up @@ -1316,7 +1316,7 @@ impl DeploymentStore {
site: Arc<Site>,
current_ptr: &BlockPtr,
parent_ptr: &BlockPtr,
) -> Result<(), StoreError> {
) -> Result<UnfailOutcome, StoreError> {
let conn = &self.get_conn()?;
let deployment_id = &site.deployment;

Expand All @@ -1325,12 +1325,12 @@ impl DeploymentStore {
let subgraph_error = match detail::fatal_error(conn, deployment_id)? {
Some(fatal_error) => fatal_error,
// If the subgraph is not failed then there is nothing to do.
None => return Ok(()),
None => return Ok(UnfailOutcome::Noop),
};

// Confidence check
if !subgraph_error.deterministic {
return Ok(()); // Nothing to do
return Ok(UnfailOutcome::Noop); // Nothing to do
}

use deployment::SubgraphHealth::*;
Expand Down Expand Up @@ -1366,6 +1366,8 @@ impl DeploymentStore {

// Unfail the deployment.
deployment::update_deployment_status(conn, deployment_id, prev_health, None)?;

Ok(UnfailOutcome::Unfailed)
}
// Found error, but not for deployment head, we don't need to
// revert the block operations.
Expand All @@ -1379,6 +1381,8 @@ impl DeploymentStore {
"error_block_hash" => format!("0x{}", hex::encode(&hash_bytes)),
"deployment_head" => format!("{}", current_ptr.hash),
);

Ok(UnfailOutcome::Noop)
}
// Same as branch above, if you find this warning in the logs,
// something is wrong, this shouldn't happen.
Expand All @@ -1387,10 +1391,10 @@ impl DeploymentStore {
"subgraph_id" => deployment_id,
"error_id" => &subgraph_error.id,
);
}
};

Ok(())
Ok(UnfailOutcome::Noop)
}
}
})
}

Expand All @@ -1408,7 +1412,7 @@ impl DeploymentStore {
&self,
site: Arc<Site>,
current_ptr: &BlockPtr,
) -> Result<(), StoreError> {
) -> Result<UnfailOutcome, StoreError> {
let conn = &self.get_conn()?;
let deployment_id = &site.deployment;

Expand All @@ -1417,12 +1421,12 @@ impl DeploymentStore {
let subgraph_error = match detail::fatal_error(conn, deployment_id)? {
Some(fatal_error) => fatal_error,
// If the subgraph is not failed then there is nothing to do.
None => return Ok(()),
None => return Ok(UnfailOutcome::Noop),
};

// Confidence check
if subgraph_error.deterministic {
return Ok(()); // Nothing to do
return Ok(UnfailOutcome::Noop); // Nothing to do
}

match subgraph_error.block_range {
Expand All @@ -1449,7 +1453,7 @@ impl DeploymentStore {
// Delete the fatal error.
deployment::delete_error(conn, &subgraph_error.id)?;

Ok(())
Ok(UnfailOutcome::Unfailed)
}
// NOOP, the deployment head is still before where non-deterministic error happened.
block_range => {
Expand All @@ -1463,7 +1467,7 @@ impl DeploymentStore {
"error_block_hash" => subgraph_error.block_hash.as_ref().map(|hash| format!("0x{}", hex::encode(hash))),
);

Ok(())
Ok(UnfailOutcome::Noop)
}
}
})
Expand Down
18 changes: 13 additions & 5 deletions store/postgres/src/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use graph::{
prelude::StoreEvent,
prelude::{
lazy_static, BlockPtr, DeploymentHash, EntityKey, EntityModification, Error, Logger,
StopwatchMetrics, StoreError,
StopwatchMetrics, StoreError, UnfailOutcome,
},
slog::{error, warn},
util::backoff::ExponentialBackoff,
Expand Down Expand Up @@ -188,14 +188,17 @@ impl WritableStore {
&self,
current_ptr: &BlockPtr,
parent_ptr: &BlockPtr,
) -> Result<(), StoreError> {
) -> Result<UnfailOutcome, StoreError> {
self.retry("unfail_deterministic_error", || {
self.writable
.unfail_deterministic_error(self.site.clone(), current_ptr, parent_ptr)
})
}

fn unfail_non_deterministic_error(&self, current_ptr: &BlockPtr) -> Result<(), StoreError> {
fn unfail_non_deterministic_error(
&self,
current_ptr: &BlockPtr,
) -> Result<UnfailOutcome, StoreError> {
self.retry("unfail_non_deterministic_error", || {
self.writable
.unfail_non_deterministic_error(self.site.clone(), current_ptr)
Expand Down Expand Up @@ -402,12 +405,17 @@ impl WritableStoreTrait for WritableAgent {
&self,
current_ptr: &BlockPtr,
parent_ptr: &BlockPtr,
) -> Result<(), StoreError> {
) -> Result<UnfailOutcome, StoreError> {
*self.block_ptr.lock().unwrap() = Some(parent_ptr.clone());

self.store
.unfail_deterministic_error(current_ptr, parent_ptr)
}

fn unfail_non_deterministic_error(&self, current_ptr: &BlockPtr) -> Result<(), StoreError> {
fn unfail_non_deterministic_error(
&self,
current_ptr: &BlockPtr,
) -> Result<UnfailOutcome, StoreError> {
self.store.unfail_non_deterministic_error(current_ptr)
}

Expand Down
Loading

0 comments on commit e1543cb

Please sign in to comment.