Improve substreams error handling #5160

Merged
merged 1 commit on Feb 1, 2024
2 changes: 1 addition & 1 deletion chain/near/src/chain.rs
@@ -95,7 +95,7 @@ impl BlockStreamBuilder<Chain> for NearStreamBuilder {
deployment.hash,
chain.chain_client(),
subgraph_current_block,
- block_cursor.as_ref().clone(),
+ block_cursor.clone(),
mapper,
package.modules.clone(),
NEAR_FILTER_MODULE_NAME.to_string(),
4 changes: 2 additions & 2 deletions chain/substreams/examples/substreams.rs
@@ -1,5 +1,5 @@
use anyhow::{format_err, Context, Error};
- use graph::blockchain::block_stream::BlockStreamEvent;
+ use graph::blockchain::block_stream::{BlockStreamEvent, FirehoseCursor};
use graph::blockchain::client::ChainClient;
use graph::blockchain::substreams_block_stream::SubstreamsBlockStream;
use graph::endpoint::EndpointMetrics;
@@ -67,7 +67,7 @@ async fn main() -> Result<(), Error> {
DeploymentHash::new("substreams".to_string()).unwrap(),
client,
None,
- None,
+ FirehoseCursor::None,
Arc::new(Mapper {
schema: None,
skip_empty_blocks: false,
34 changes: 27 additions & 7 deletions chain/substreams/src/block_ingestor.rs
@@ -2,6 +2,7 @@ use std::{sync::Arc, time::Duration};

use crate::mapper::Mapper;
use anyhow::{Context, Error};
+ use graph::blockchain::block_stream::{BlockStreamError, FirehoseCursor};
use graph::blockchain::{
client::ChainClient, substreams_block_stream::SubstreamsBlockStream, BlockIngestor,
};
@@ -65,11 +66,12 @@ impl SubstreamsBlockIngestor {
/// Consumes the incoming stream of blocks infinitely until it hits an error. In which case
/// the error is logged right away and the latest available cursor is returned
/// upstream for future consumption.
+ /// If an error is returned it indicates a fatal/deterministic error which should not be retried.
async fn process_blocks(
&self,
- cursor: String,
+ cursor: FirehoseCursor,
mut stream: SubstreamsBlockStream<super::Chain>,
- ) -> String {
+ ) -> Result<FirehoseCursor, BlockStreamError> {
let mut latest_cursor = cursor;

while let Some(message) = stream.next().await {
@@ -90,6 +92,9 @@ impl SubstreamsBlockIngestor {
trace!(self.logger, "Received undo block to ingest, skipping");
continue;
}
+ Err(e) if e.is_deterministic() => {
+     return Err(e);
+ }
Err(e) => {
info!(
self.logger,
@@ -105,14 +110,15 @@
break;
}

- latest_cursor = cursor.to_string()
+ latest_cursor = cursor
}

error!(
self.logger,
"Stream blocks complete unexpectedly, expecting stream to always stream blocks"
);
- latest_cursor
+
+ Ok(latest_cursor)
}

async fn process_new_block(
@@ -139,7 +145,7 @@ impl BlockIngestor for SubstreamsBlockIngestor {
schema: None,
skip_empty_blocks: false,
});
- let mut latest_cursor = self.fetch_head_cursor().await;
+ let mut latest_cursor = FirehoseCursor::from(self.fetch_head_cursor().await);
let mut backoff =
ExponentialBackoff::new(Duration::from_millis(250), Duration::from_secs(30));
let package = Package::decode(SUBSTREAMS_HEAD_TRACKER_BYTES.to_vec().as_ref()).unwrap();
@@ -149,7 +155,7 @@
DeploymentHash::default(),
self.client.cheap_clone(),
None,
- Some(latest_cursor.clone()),
+ latest_cursor.clone(),
mapper.cheap_clone(),
package.modules.clone(),
"map_blocks".to_string(),
@@ -160,7 +166,21 @@
);

// Consume the stream of blocks until an error is hit
- latest_cursor = self.process_blocks(latest_cursor, stream).await;
+ // If the error is retryable it will print the error and return the cursor
+ // therefore if we get an error here it has to be a fatal error.
+ // This is a bit brittle and should probably be improved at some point.
+ let res = self.process_blocks(latest_cursor, stream).await;
+ match res {
+     Ok(cursor) => latest_cursor = cursor,
+     Err(BlockStreamError::Fatal(e)) => {
+         error!(
+             self.logger,
+             "fatal error while ingesting substream blocks: {}", e
+         );
+         return;
+     }
+     _ => unreachable!("Nobody should ever see this error message, something is wrong"),
+ }

// If we reach this point, we must wait a bit before retrying
backoff.sleep_async().await;
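The control flow this file now implements is easier to see in isolation. Below is a minimal, self-contained sketch of the ingestor's retry loop, with simplified stand-in types and a stubbed `process_blocks`; it is not the real graph-node API, only the shape of the change: retryable errors stay inside `process_blocks`, and only fatal (deterministic) errors bubble up and stop ingestion.

```rust
use std::time::Duration;

// Simplified stand-ins for the real graph-node types.
#[derive(Debug)]
enum BlockStreamError {
    Unknown(String),
    Fatal(String),
}

#[derive(Clone, Default, Debug)]
struct FirehoseCursor(Option<String>);

// Stub: consume blocks until the stream errors. Retryable errors would be
// logged internally and the latest cursor returned; only fatal/deterministic
// errors surface as `Err`. Here it fails immediately so the example ends.
async fn process_blocks(_cursor: FirehoseCursor) -> Result<FirehoseCursor, BlockStreamError> {
    Err(BlockStreamError::Fatal(
        "substreams reported a deterministic failure".into(),
    ))
}

async fn run(mut latest_cursor: FirehoseCursor) {
    loop {
        match process_blocks(latest_cursor.clone()).await {
            // Retryable path: remember the newest cursor and go around again.
            Ok(cursor) => latest_cursor = cursor,
            // Deterministic failure: retrying would fail the same way, so stop.
            Err(BlockStreamError::Fatal(msg)) => {
                eprintln!("fatal error while ingesting substream blocks: {msg}");
                return;
            }
            // `process_blocks` only ever surfaces `Fatal`; anything else is a bug.
            Err(e) => unreachable!("unexpected error variant: {e:?}"),
        }
        // Back off before reconnecting (the real code uses ExponentialBackoff).
        tokio::time::sleep(Duration::from_millis(250)).await;
    }
}

#[tokio::main]
async fn main() {
    run(FirehoseCursor::default()).await;
}
```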
4 changes: 2 additions & 2 deletions chain/substreams/src/block_stream.rs
@@ -53,7 +53,7 @@ impl BlockStreamBuilderTrait<Chain> for BlockStreamBuilder {
deployment.hash,
chain.chain_client(),
subgraph_current_block,
- block_cursor.as_ref().clone(),
+ block_cursor.clone(),
Arc::new(WasmBlockMapper {
handler: handler.clone(),
}),
@@ -69,7 +69,7 @@ impl BlockStreamBuilderTrait<Chain> for BlockStreamBuilder {
deployment.hash,
chain.chain_client(),
subgraph_current_block,
- block_cursor.as_ref().clone(),
+ block_cursor.clone(),
Arc::new(Mapper {
schema: Some(schema),
skip_empty_blocks: true,
39 changes: 34 additions & 5 deletions core/src/subgraph/runner.rs
@@ -4,7 +4,9 @@ use crate::subgraph::inputs::IndexingInputs;
use crate::subgraph::state::IndexingState;
use crate::subgraph::stream::new_block_stream;
use atomic_refcell::AtomicRefCell;
- use graph::blockchain::block_stream::{BlockStreamEvent, BlockWithTriggers, FirehoseCursor};
+ use graph::blockchain::block_stream::{
+     BlockStreamError, BlockStreamEvent, BlockWithTriggers, FirehoseCursor,
+ };
use graph::blockchain::{Block, BlockTime, Blockchain, DataSource as _, TriggerFilter as _};
use graph::components::store::{EmptyStore, GetScope, ReadStore, StoredDynamicDataSource};
use graph::components::{
@@ -206,7 +208,7 @@
&self.metrics.subgraph,
)
.await?
- .map_err(CancelableError::Error)
+ .map_err(CancelableError::from)
.cancelable(&block_stream_canceler, || Err(CancelableError::Cancel));

// Keep the stream's cancel guard around to be able to shut it down when the subgraph
@@ -910,7 +912,7 @@
{
async fn handle_stream_event(
&mut self,
- event: Option<Result<BlockStreamEvent<C>, CancelableError<Error>>>,
+ event: Option<Result<BlockStreamEvent<C>, CancelableError<BlockStreamError>>>,
cancel_handle: &CancelHandle,
) -> Result<Action, Error> {
let action = match event {
@@ -1087,7 +1089,7 @@ trait StreamEventHandler<C: Blockchain> {
) -> Result<Action, Error>;
async fn handle_err(
&mut self,
- err: CancelableError<Error>,
+ err: CancelableError<BlockStreamError>,
cancel_handle: &CancelHandle,
) -> Result<Action, Error>;
fn needs_restart(&self, revert_to_ptr: BlockPtr, subgraph_ptr: BlockPtr) -> bool;
@@ -1399,14 +1401,41 @@ where

async fn handle_err(
&mut self,
- err: CancelableError<Error>,
+ err: CancelableError<BlockStreamError>,
cancel_handle: &CancelHandle,
) -> Result<Action, Error> {
if cancel_handle.is_canceled() {
debug!(&self.logger, "Subgraph block stream shut down cleanly");
return Ok(Action::Stop);
}

+ let err = match err {
+     CancelableError::Error(BlockStreamError::Fatal(msg)) => {
+         error!(
+             &self.logger,
+             "The block stream encountered a substreams fatal error and will not retry: {}",
+             msg
+         );
+
+         // If substreams returns a deterministic error we may not necessarily have a specific block
+         // but we should not retry since it will keep failing.
+         self.inputs
+             .store
+             .fail_subgraph(SubgraphError {
+                 subgraph_id: self.inputs.deployment.hash.clone(),
+                 message: msg,
+                 block_ptr: None,
+                 handler: None,
+                 deterministic: true,
+             })
+             .await
+             .context("Failed to set subgraph status to `failed`")?;
+
+         return Ok(Action::Stop);
+     }
+     e => e,
+ };

debug!(
&self.logger,
"Block stream produced a non-fatal error";
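Condensed, the decision added to `handle_err` looks like the sketch below. The types are simplified stand-ins, and the real code additionally records a deterministic `SubgraphError` via `store.fail_subgraph` before stopping; only the classification logic is shown here.

```rust
// Simplified stand-ins for the runner's types.
enum BlockStreamError {
    Unknown(String),
    Fatal(String),
}

enum CancelableError<E> {
    Cancel,
    Error(E),
}

#[derive(Debug, PartialEq)]
enum Action {
    Stop,     // mark the deployment failed (deterministic) and stop the runner
    Continue, // stay on the existing non-fatal retry path
}

// Mirrors the new branch in `handle_err`: a substreams fatal error is
// deterministic, so it fails the subgraph and stops; everything else keeps
// the previous non-fatal handling.
fn classify(err: CancelableError<BlockStreamError>, canceled: bool) -> Action {
    if canceled {
        return Action::Stop; // block stream shut down cleanly
    }
    match err {
        CancelableError::Error(BlockStreamError::Fatal(_msg)) => {
            // Real code: store.fail_subgraph(SubgraphError { deterministic: true,
            // block_ptr: None, .. }).await before stopping.
            Action::Stop
        }
        _ => Action::Continue,
    }
}

fn main() {
    let fatal = CancelableError::Error(BlockStreamError::Fatal("bad module".into()));
    assert_eq!(classify(fatal, false), Action::Stop);

    let transient = CancelableError::Error(BlockStreamError::Unknown("timeout".into()));
    assert_eq!(classify(transient, false), Action::Continue);
}
```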
33 changes: 24 additions & 9 deletions graph/src/blockchain/block_stream.rs
@@ -26,21 +26,22 @@ pub const FIREHOSE_BUFFER_STREAM_SIZE: usize = 1;
pub const SUBSTREAMS_BUFFER_STREAM_SIZE: usize = 1;

pub struct BufferedBlockStream<C: Blockchain> {
- inner: Pin<Box<dyn Stream<Item = Result<BlockStreamEvent<C>, Error>> + Send>>,
+ inner: Pin<Box<dyn Stream<Item = Result<BlockStreamEvent<C>, BlockStreamError>> + Send>>,
}

impl<C: Blockchain + 'static> BufferedBlockStream<C> {
pub fn spawn_from_stream(
size_hint: usize,
stream: Box<dyn BlockStream<C>>,
) -> Box<dyn BlockStream<C>> {
- let (sender, receiver) = mpsc::channel::<Result<BlockStreamEvent<C>, Error>>(size_hint);
+ let (sender, receiver) =
+     mpsc::channel::<Result<BlockStreamEvent<C>, BlockStreamError>>(size_hint);
crate::spawn(async move { BufferedBlockStream::stream_blocks(stream, sender).await });

Box::new(BufferedBlockStream::new(receiver))
}

- pub fn new(mut receiver: Receiver<Result<BlockStreamEvent<C>, Error>>) -> Self {
+ pub fn new(mut receiver: Receiver<Result<BlockStreamEvent<C>, BlockStreamError>>) -> Self {
let inner = stream! {
loop {
let event = match receiver.recv().await {
@@ -59,7 +60,7 @@ impl<C: Blockchain + 'static> BufferedBlockStream<C> {

pub async fn stream_blocks(
mut stream: Box<dyn BlockStream<C>>,
- sender: Sender<Result<BlockStreamEvent<C>, Error>>,
+ sender: Sender<Result<BlockStreamEvent<C>, BlockStreamError>>,
) -> Result<(), Error> {
while let Some(event) = stream.next().await {
match sender.send(event).await {
@@ -84,7 +85,7 @@ impl<C: Blockchain> BlockStream<C> for BufferedBlockStream<C> {
}

impl<C: Blockchain> Stream for BufferedBlockStream<C> {
- type Item = Result<BlockStreamEvent<C>, Error>;
+ type Item = Result<BlockStreamEvent<C>, BlockStreamError>;

fn poll_next(
mut self: Pin<&mut Self>,
@@ -95,7 +96,7 @@ impl<C: Blockchain> Stream for BufferedBlockStream<C> {
}

pub trait BlockStream<C: Blockchain>:
- Stream<Item = Result<BlockStreamEvent<C>, Error>> + Unpin + Send
+ Stream<Item = Result<BlockStreamEvent<C>, BlockStreamError>> + Unpin + Send
{
fn buffer_size_hint(&self) -> usize;
}
@@ -482,6 +483,20 @@ pub enum SubstreamsError {
UnexpectedStoreDeltaOutput,
}

+ #[derive(Debug, Error)]
+ pub enum BlockStreamError {
+     #[error("block stream error")]
+     Unknown(#[from] anyhow::Error),
+     #[error("block stream fatal error")]
+     Fatal(String),
+ }
+
+ impl BlockStreamError {
+     pub fn is_deterministic(&self) -> bool {
+         matches!(self, Self::Fatal(_))
+     }
+ }

#[derive(Debug)]
pub enum BlockStreamEvent<C: Blockchain> {
// The payload is the block the subgraph should revert to, so it becomes the new subgraph head.
@@ -576,7 +591,6 @@ pub trait ChainHeadUpdateListener: Send + Sync + 'static {
mod test {
use std::{collections::HashSet, task::Poll};

- use anyhow::Error;
use futures03::{Stream, StreamExt, TryStreamExt};

use crate::{
@@ -585,7 +599,8 @@
};

use super::{
- BlockStream, BlockStreamEvent, BlockWithTriggers, BufferedBlockStream, FirehoseCursor,
+ BlockStream, BlockStreamError, BlockStreamEvent, BlockWithTriggers, BufferedBlockStream,
+ FirehoseCursor,
};

#[derive(Debug)]
@@ -600,7 +615,7 @@
}

impl Stream for TestStream {
- type Item = Result<BlockStreamEvent<MockBlockchain>, Error>;
+ type Item = Result<BlockStreamEvent<MockBlockchain>, BlockStreamError>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
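The new `BlockStreamError` type is small enough to exercise on its own. The sketch below uses the same enum shape as the diff above; the `connect` and `fetch_block` helpers are made up for illustration.

```rust
use anyhow::anyhow;
use thiserror::Error;

// Same shape as the enum added in this diff.
#[derive(Debug, Error)]
pub enum BlockStreamError {
    #[error("block stream error")]
    Unknown(#[from] anyhow::Error),
    #[error("block stream fatal error")]
    Fatal(String),
}

impl BlockStreamError {
    pub fn is_deterministic(&self) -> bool {
        matches!(self, Self::Fatal(_))
    }
}

// Hypothetical transient failure that still uses plain anyhow errors.
fn connect() -> anyhow::Result<()> {
    Err(anyhow!("transient network hiccup"))
}

// Hypothetical caller: transient failures convert automatically through `?`
// (thanks to `#[from] anyhow::Error`), while deterministic substreams
// failures are raised explicitly as `Fatal`.
fn fetch_block(height: u64) -> Result<(), BlockStreamError> {
    if height == 0 {
        connect()?; // becomes BlockStreamError::Unknown
    }
    if height == 42 {
        return Err(BlockStreamError::Fatal("invalid module output".to_string()));
    }
    Ok(())
}

fn main() {
    // Only `Fatal` counts as deterministic, i.e. not worth retrying.
    assert!(fetch_block(42).unwrap_err().is_deterministic());
    assert!(!fetch_block(0).unwrap_err().is_deterministic());
}
```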
8 changes: 4 additions & 4 deletions graph/src/blockchain/firehose_block_stream.rs
@@ -1,5 +1,5 @@
use super::block_stream::{
- BlockStream, BlockStreamEvent, FirehoseMapper, FIREHOSE_BUFFER_STREAM_SIZE,
+ BlockStream, BlockStreamError, BlockStreamEvent, FirehoseMapper, FIREHOSE_BUFFER_STREAM_SIZE,
};
use super::client::ChainClient;
use super::Blockchain;
@@ -100,7 +100,7 @@ impl FirehoseBlockStreamMetrics {
}

pub struct FirehoseBlockStream<C: Blockchain> {
- stream: Pin<Box<dyn Stream<Item = Result<BlockStreamEvent<C>, Error>> + Send>>,
+ stream: Pin<Box<dyn Stream<Item = Result<BlockStreamEvent<C>, BlockStreamError>> + Send>>,
}

impl<C> FirehoseBlockStream<C>
@@ -156,7 +156,7 @@ fn stream_blocks<C: Blockchain, F: FirehoseMapper<C>>(
subgraph_current_block: Option<BlockPtr>,
logger: Logger,
metrics: FirehoseBlockStreamMetrics,
- ) -> impl Stream<Item = Result<BlockStreamEvent<C>, Error>> {
+ ) -> impl Stream<Item = Result<BlockStreamEvent<C>, BlockStreamError>> {
let mut subgraph_current_block = subgraph_current_block;
let mut start_block_num = subgraph_current_block
.as_ref()
@@ -406,7 +406,7 @@ async fn process_firehose_response<C: Blockchain, F: FirehoseMapper<C>>(
}

impl<C: Blockchain> Stream for FirehoseBlockStream<C> {
- type Item = Result<BlockStreamEvent<C>, Error>;
+ type Item = Result<BlockStreamEvent<C>, BlockStreamError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx)
12 changes: 6 additions & 6 deletions graph/src/blockchain/polling_block_stream.rs
@@ -8,8 +8,8 @@ use std::task::{Context, Poll};
use std::time::Duration;

use super::block_stream::{
- BlockStream, BlockStreamEvent, BlockWithTriggers, ChainHeadUpdateStream, FirehoseCursor,
- TriggersAdapter, BUFFERED_BLOCK_STREAM_SIZE,
+ BlockStream, BlockStreamError, BlockStreamEvent, BlockWithTriggers, ChainHeadUpdateStream,
+ FirehoseCursor, TriggersAdapter, BUFFERED_BLOCK_STREAM_SIZE,
};
use super::{Block, BlockPtr, Blockchain};

@@ -470,7 +470,7 @@ impl<C: Blockchain> BlockStream<C> for PollingBlockStream<C> {
}

impl<C: Blockchain> Stream for PollingBlockStream<C> {
- type Item = Result<BlockStreamEvent<C>, Error>;
+ type Item = Result<BlockStreamEvent<C>, BlockStreamError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let result = loop {
@@ -599,8 +599,8 @@ impl<C: Blockchain> Stream for PollingBlockStream<C> {
// Chain head update stream ended
Poll::Ready(None) => {
// Should not happen
- return Poll::Ready(Some(Err(anyhow::anyhow!(
- "chain head update stream ended unexpectedly"
+ return Poll::Ready(Some(Err(BlockStreamError::from(
+     anyhow::anyhow!("chain head update stream ended unexpectedly"),
))));
}

@@ -610,6 +610,6 @@ impl<C: Blockchain> Stream for PollingBlockStream<C> {
}
};

- result
+ result.map_err(BlockStreamError::from)
}
}
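The closing `map_err(BlockStreamError::from)` works because the `Unknown` variant derives `#[from] anyhow::Error`, so existing anyhow-based results can be lifted without rewriting their call sites. A standalone sketch of that conversion follows; `poll_chain_head` is a made-up stand-in for the polling logic.

```rust
use anyhow::anyhow;
use thiserror::Error;

// Same enum shape as in graph/src/blockchain/block_stream.rs above.
#[derive(Debug, Error)]
pub enum BlockStreamError {
    #[error("block stream error")]
    Unknown(#[from] anyhow::Error),
    #[error("block stream fatal error")]
    Fatal(String),
}

// Made-up polling step that still produces plain anyhow errors.
fn poll_chain_head(available: bool) -> Result<u64, anyhow::Error> {
    if available {
        Ok(1234)
    } else {
        Err(anyhow!("chain head update stream ended unexpectedly"))
    }
}

fn main() {
    // `#[from]` generates `impl From<anyhow::Error> for BlockStreamError`,
    // so passing the conversion as a plain function to `map_err` is enough
    // to lift the old error type into the stream's new item type.
    let lifted: Result<u64, BlockStreamError> =
        poll_chain_head(false).map_err(BlockStreamError::from);
    assert!(matches!(lifted, Err(BlockStreamError::Unknown(_))));
}
```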