diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs
index b193befe8b6..965474b6bd6 100644
--- a/core/src/subgraph/instance_manager.rs
+++ b/core/src/subgraph/instance_manager.rs
@@ -8,13 +8,13 @@ use graph::data::store::EntityVersion;
 use graph::data::subgraph::{UnifiedMappingApiVersion, MAX_SPEC_VERSION};
 use graph::prelude::TryStreamExt;
 use graph::prelude::{SubgraphInstanceManager as SubgraphInstanceManagerTrait, *};
-use graph::util::lfu_cache::LfuCache;
+use graph::util::{backoff::ExponentialBackoff, lfu_cache::LfuCache};
 use graph::{blockchain::block_stream::BlockStreamMetrics, components::store::WritableStore};
 use graph::{blockchain::block_stream::BlockWithTriggers, data::subgraph::SubgraphFeature};
 use graph::{
     blockchain::NodeCapabilities,
     blockchain::TriggersAdapter,
-    data::subgraph::schema::{SubgraphError, POI_OBJECT},
+    data::subgraph::schema::{SubgraphError, SubgraphHealth, POI_OBJECT},
 };
 use graph::{
     blockchain::{block_stream::BlockStreamEvent, Blockchain, TriggerFilter as _},
@@ -27,9 +27,11 @@ use graph::{
 use lazy_static::lazy_static;
 use std::collections::{BTreeSet, HashMap};
 use std::sync::{Arc, RwLock};
-use std::time::Instant;
+use std::time::{Duration, Instant};
 use tokio::task;
 
+const MINUTE: Duration = Duration::from_secs(60);
+
 lazy_static! {
     /// Size limit of the entity LFU cache, in bytes.
     // Multiplied by 1000 because the env var is in KB.
@@ -43,6 +45,14 @@ lazy_static! {
     // Used for testing Graph Node itself.
     pub static ref DISABLE_FAIL_FAST: bool =
         std::env::var("GRAPH_DISABLE_FAIL_FAST").is_ok();
+
+    /// Ceiling for the backoff retry of non-deterministic errors, in seconds.
+    pub static ref SUBGRAPH_ERROR_RETRY_CEIL_SECS: Duration =
+        std::env::var("GRAPH_SUBGRAPH_ERROR_RETRY_CEIL_SECS")
+            .unwrap_or((MINUTE * 30).as_secs().to_string())
+            .parse::<u64>()
+            .map(Duration::from_secs)
+            .expect("invalid GRAPH_SUBGRAPH_ERROR_RETRY_CEIL_SECS");
 }
 
 type SharedInstanceKeepAliveMap = Arc<RwLock<HashMap<DeploymentId, CancelGuard>>>;
@@ -462,6 +472,10 @@ where
     let mut should_try_unfail_deterministic = true;
     let mut should_try_unfail_non_deterministic = true;
 
+    // Exponential backoff that starts with two minutes and keeps
+    // increasing its timeout exponentially until it reaches the ceiling.
+    let mut backoff = ExponentialBackoff::new(MINUTE * 2, *SUBGRAPH_ERROR_RETRY_CEIL_SECS);
+
     loop {
         debug!(logger, "Starting or restarting subgraph");
 
@@ -634,18 +648,29 @@ where
 
             match res {
                 Ok(needs_restart) => {
-                    // Runs only once
+                    // Keep trying to unfail the subgraph every time it advances block(s), until
+                    // its health is no longer Failed.
                     if should_try_unfail_non_deterministic {
-                        should_try_unfail_non_deterministic = false;
-
                         // If the deployment head advanced, we can unfail
                         // the non-deterministic error (if there's any).
                         ctx.inputs
                             .store
                             .unfail_non_deterministic_error(&block_ptr)?;
-                    }
 
-                    deployment_failed.set(0.0);
+                        match ctx.inputs.store.health(&ctx.inputs.deployment.hash).await? {
+                            SubgraphHealth::Failed => {
+                                // If the unfail call didn't change the subgraph health, we keep
+                                // `should_try_unfail_non_deterministic` as `true` until it's
+                                // actually unfailed.
+                            }
+                            SubgraphHealth::Healthy | SubgraphHealth::Unhealthy => {
+                                // Stop trying to unfail.
+                                should_try_unfail_non_deterministic = false;
+                                deployment_failed.set(0.0);
+                                backoff.reset();
+                            }
+                        };
+                    }
 
                     // Notify the BlockStream implementation that a block was succesfully consumed
                     // and that its internal cursoring mechanism can be saved to memory.
@@ -674,24 +699,74 @@ where
 
                 // Handle unexpected stream errors by marking the subgraph as failed.
                 Err(e) => {
+                    deployment_failed.set(1.0);
+
                     let message = format!("{:#}", e).replace("\n", "\t");
                     let err = anyhow!("{}, code: {}", message, LogCode::SubgraphSyncingFailure);
 
+                    let deterministic = e.is_deterministic();
                     let error = SubgraphError {
                         subgraph_id: id_for_err.clone(),
                         message,
                         block_ptr: Some(block_ptr),
                         handler: None,
-                        deterministic: e.is_deterministic(),
+                        deterministic,
                     };
 
-                    deployment_failed.set(1.0);
-                    store_for_err
-                        .fail_subgraph(error)
-                        .await
-                        .context("Failed to set subgraph status to `failed`")?;
+                    match deterministic {
+                        true => {
+                            // Fail subgraph:
+                            // - Change status/health.
+                            // - Save the error to the database.
+                            store_for_err
+                                .fail_subgraph(error)
+                                .await
+                                .context("Failed to set subgraph status to `failed`")?;
+
+                            return Err(err);
+                        }
+                        false => {
+                            // Shouldn't fail subgraph if it's already failed for non-deterministic
+                            // reasons.
+                            //
+                            // If we don't do this check we would keep adding the same error to the
+                            // database.
+                            let should_fail_subgraph =
+                                ctx.inputs.store.health(&ctx.inputs.deployment.hash).await?
+                                    != SubgraphHealth::Failed;
+
+                            if should_fail_subgraph {
+                                // Fail subgraph:
+                                // - Change status/health.
+                                // - Save the error to the database.
+                                store_for_err
+                                    .fail_subgraph(error)
+                                    .await
+                                    .context("Failed to set subgraph status to `failed`")?;
+                            }
+
+                            // Retry logic below:
+
+                            // Cancel the stream for real.
+                            ctx.state
+                                .instances
+                                .write()
+                                .unwrap()
+                                .remove(&ctx.inputs.deployment.id);
 
-                    return Err(err);
+                            error!(logger, "Subgraph failed for non-deterministic error: {}", e;
+                                "attempt" => backoff.attempt,
+                                "retry_delay_s" => backoff.delay().as_secs());
+
+                            // Sleep before restarting.
+                            backoff.sleep_async().await;
+
+                            should_try_unfail_non_deterministic = true;
+
+                            // And restart the subgraph.
+                            break;
+                        }
+                    }
                 }
             }
         }
diff --git a/graph/src/components/store.rs b/graph/src/components/store.rs
index 0b292bc1a55..e2d6bba1a3b 100644
--- a/graph/src/components/store.rs
+++ b/graph/src/components/store.rs
@@ -1074,6 +1074,8 @@ pub trait WritableStore: Send + Sync + 'static {
     /// Report the name of the shard in which the subgraph is stored. This
     /// should only be used for reporting and monitoring
     fn shard(&self) -> &str;
+
+    async fn health(&self, id: &DeploymentHash) -> Result<SubgraphHealth, StoreError>;
 }
 
 #[async_trait]
@@ -1264,6 +1266,10 @@ impl WritableStore for MockStore {
     fn shard(&self) -> &str {
         unimplemented!()
     }
+
+    async fn health(&self, _: &DeploymentHash) -> Result<SubgraphHealth, StoreError> {
+        unimplemented!()
+    }
 }
 
 pub trait BlockStore: Send + Sync + 'static {
diff --git a/graph/src/data/subgraph/schema.rs b/graph/src/data/subgraph/schema.rs
index 7185fd99672..47bf8fbfcfd 100644
--- a/graph/src/data/subgraph/schema.rs
+++ b/graph/src/data/subgraph/schema.rs
@@ -44,9 +44,8 @@ impl SubgraphHealth {
 
     pub fn is_failed(&self) -> bool {
         match self {
-            SubgraphHealth::Healthy => false,
-            SubgraphHealth::Unhealthy => false,
             SubgraphHealth::Failed => true,
+            SubgraphHealth::Healthy | SubgraphHealth::Unhealthy => false,
         }
     }
 }
diff --git a/graph/src/util/backoff.rs b/graph/src/util/backoff.rs
index 1ac3d66254d..1a5d3b29f78 100644
--- a/graph/src/util/backoff.rs
+++ b/graph/src/util/backoff.rs
@@ -45,4 +45,8 @@ impl ExponentialBackoff {
         self.attempt += 1;
         delay
     }
+
+    pub fn reset(&mut self) {
+        self.attempt = 0;
+    }
 }
diff --git a/store/postgres/src/deployment.rs b/store/postgres/src/deployment.rs
index 640fb1768f3..f6a2bfb6fd3 100644
--- a/store/postgres/src/deployment.rs
+++ b/store/postgres/src/deployment.rs
@@ -36,10 +36,9 @@ pub enum SubgraphHealth {
 
 impl SubgraphHealth {
     fn is_failed(&self) -> bool {
-        match self {
-            Self::Failed => true,
-            Self::Healthy | Self::Unhealthy => false,
-        }
+        use graph::data::subgraph::schema::SubgraphHealth as H;
+
+        H::from(*self).is_failed()
     }
 }
 
@@ -627,6 +626,19 @@ fn check_health(
         .map_err(|e| e.into())
 }
 
+pub(crate) fn health(
+    conn: &PgConnection,
+    id: &DeploymentHash,
+) -> Result<SubgraphHealth, StoreError> {
+    use subgraph_deployment as d;
+
+    d::table
+        .filter(d::deployment.eq(id.as_str()))
+        .select(d::health)
+        .get_result(conn)
+        .map_err(|e| e.into())
+}
+
 /// Reverts the errors and updates the subgraph health if necessary.
 pub(crate) fn revert_subgraph_errors(
     conn: &PgConnection,
diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs
index 9b75dd69e2a..1d6535d01c8 100644
--- a/store/postgres/src/deployment_store.rs
+++ b/store/postgres/src/deployment_store.rs
@@ -1321,48 +1321,43 @@ impl DeploymentStore {
             // Deployment head (current_ptr) advanced more than the error.
             // That means it's healthy, and the non-deterministic error got
             // solved (didn't happen on another try).
-            //
-            // This should be the scenario where the unfail happens, however
-            // for now we unfail in all cases that non-deterministic errors
-            // were found and the deployment head advanced.
             (Bound::Included(error_block_number), _)
                 if current_ptr.number >= error_block_number =>
             {
+                info!(
+                    self.logger,
+                    "Unfailing the deployment status";
+                    "subgraph_id" => deployment_id,
+                );
+
+                // Unfail the deployment.
+                deployment::update_deployment_status(
+                    conn,
+                    deployment_id,
+                    deployment::SubgraphHealth::Healthy,
+                    None,
+                )?;
+
+                // Delete the fatal error.
+                deployment::delete_error(conn, &subgraph_error.id)?;
+
+                Ok(())
             }
-            // The deployment head is still before where non-deterministic error happened.
-            //
-            // Technically we shouldn't unfail the subgraph and delete the error
-            // until it's head actually passed the error block range. But for
-            // now we'll only log this and keep the old behavior.
+            // NOOP, the deployment head is still before where non-deterministic error happened.
             block_range => {
                 info!(
                     self.logger,
-                    "Subgraph error is still ahead of deployment head";
+                    "Subgraph error is still ahead of deployment head, nothing to unfail";
                     "subgraph_id" => deployment_id,
                     "block_number" => format!("{}", current_ptr.number),
                     "block_hash" => format!("{}", current_ptr.hash),
                     "error_block_range" => format!("{:?}", block_range),
                     "error_block_hash" => subgraph_error.block_hash.as_ref().map(|hash| format!("0x{}", hex::encode(hash))),
                 );
-            }
-        };
 
-        info!(
-            self.logger,
-            "Unfailing the deployment status";
-            "subgraph_id" => deployment_id,
-        );
-
-        // Unfail the deployment.
-        deployment::update_deployment_status(
-            conn,
-            deployment_id,
-            deployment::SubgraphHealth::Healthy,
-            None,
-        )?;
-
-        // Delete the fatal error.
-        deployment::delete_error(conn, &subgraph_error.id)
+                Ok(())
+            }
+        }
     })
 }
 
@@ -1379,4 +1374,13 @@ impl DeploymentStore {
                 "shard" => self.pool.shard.as_str())
         });
     }
+
+    pub(crate) async fn health(
+        &self,
+        id: &DeploymentHash,
+    ) -> Result<deployment::SubgraphHealth, StoreError> {
+        let id = id.clone();
+        self.with_conn(move |conn, _| deployment::health(&conn, &id).map_err(Into::into))
+            .await
+    }
 }
diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs
index f3ad257d71c..6a671e83dd5 100644
--- a/store/postgres/src/subgraph_store.rs
+++ b/store/postgres/src/subgraph_store.rs
@@ -16,7 +16,7 @@ use graph::{
     },
     constraint_violation,
     data::query::QueryTarget,
-    data::subgraph::schema::SubgraphError,
+    data::subgraph::schema::{self, SubgraphError},
     data::{
         store::{EntityVersion, Vid},
         subgraph::status,
@@ -1325,6 +1325,13 @@ impl WritableStoreTrait for WritableStore {
     fn shard(&self) -> &str {
         self.site.shard.as_str()
     }
+
+    async fn health(&self, id: &DeploymentHash) -> Result<schema::SubgraphHealth, StoreError> {
+        self.retry_async("health", || async {
+            self.writable.health(id).await.map(Into::into)
+        })
+        .await
+    }
 }
 
 fn same_subgraph(mods: &Vec<EntityModification>, id: &DeploymentHash) -> bool {
diff --git a/store/postgres/tests/subgraph.rs b/store/postgres/tests/subgraph.rs
index a4c83d05d89..1b9133b36a8 100644
--- a/store/postgres/tests/subgraph.rs
+++ b/store/postgres/tests/subgraph.rs
@@ -966,15 +966,14 @@ fn fail_unfail_non_deterministic_error_noop() {
         // Fail the subgraph with a non-deterministic error, but with an advanced block.
         writable.fail_subgraph(error).await.unwrap();
 
-        // Since the block range of the block won't match the deployment head, this would be NOOP,
-        // but we're skipping the confidence check for now.
+        // Since the block range of the error won't match the deployment head, this will be a NOOP.
         writable.unfail_non_deterministic_error(&BLOCKS[1]).unwrap();
 
-        // Unfail ocurrs as expected.
-        assert_eq!(count(), 1);
+        // The state stays the same, except for a new error added to the database.
+        assert_eq!(count(), 2);
         let vi = get_version_info(&store, NAME);
         assert_eq!(&*NAME, vi.deployment_id.as_str());
-        assert_eq!(false, vi.failed);
+        assert_eq!(true, vi.failed);
         assert_eq!(Some(1), vi.latest_ethereum_block_number);
 
         test_store::remove_subgraphs();