Skip to content

Commit

Permalink
all: Change how WritableStore tracks the block pointer and cursor
Browse files Browse the repository at this point in the history
  • Loading branch information
lutter committed Jan 12, 2022
1 parent a80d961 commit 025d77c
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 21 deletions.
2 changes: 1 addition & 1 deletion chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl Blockchain for Chain {
.new(o!("component" => "FirehoseBlockStream"));

let firehose_mapper = Arc::new(FirehoseMapper {});
let firehose_cursor = writable.block_cursor()?;
let firehose_cursor = writable.block_cursor();

Ok(Box::new(FirehoseBlockStream::new(
firehose_endpoint,
Expand Down
2 changes: 1 addition & 1 deletion chain/near/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl Blockchain for Chain {
.new(o!("component" => "FirehoseBlockStream"));

let firehose_mapper = Arc::new(FirehoseMapper {});
let firehose_cursor = store.block_cursor()?;
let firehose_cursor = store.block_cursor();

Ok(Box::new(FirehoseBlockStream::new(
firehose_endpoint,
Expand Down
10 changes: 3 additions & 7 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,11 +380,7 @@ where

// Initialize deployment_head with current deployment head. Any sort of trouble in
// getting the deployment head ptr leads to initializing with 0
let deployment_head = store
.block_ptr()
.ok()
.and_then(|ptr| ptr.map(|ptr| ptr.number))
.unwrap_or(0) as f64;
let deployment_head = store.block_ptr().map(|ptr| ptr.number).unwrap_or(0) as f64;
block_stream_metrics.deployment_head.set(deployment_head);

let host_builder = graph_runtime_wasm::RuntimeHostBuilder::new(
Expand Down Expand Up @@ -484,7 +480,7 @@ where
ctx.inputs.unified_api_version.clone(),
),
false => {
let start_block = ctx.inputs.store.block_ptr()?;
let start_block = ctx.inputs.store.block_ptr();

chain.new_polling_block_stream(
ctx.inputs.deployment.clone(),
Expand Down Expand Up @@ -626,7 +622,7 @@ where
if should_try_unfail_deterministic {
should_try_unfail_deterministic = false;

if let Some(current_ptr) = ctx.inputs.store.block_ptr()? {
if let Some(current_ptr) = ctx.inputs.store.block_ptr() {
if let Some(parent_ptr) =
ctx.inputs.triggers_adapter.parent_ptr(&current_ptr).await?
{
Expand Down
4 changes: 2 additions & 2 deletions graph/src/components/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1024,11 +1024,11 @@ pub trait SubgraphStore: Send + Sync + 'static {
#[async_trait]
pub trait WritableStore: Send + Sync + 'static {
/// Get a pointer to the most recently processed block in the subgraph.
fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError>;
fn block_ptr(&self) -> Option<BlockPtr>;

/// Returns the Firehose `cursor` this deployment is currently at in the block stream of events. This
/// is used when re-connecting a Firehose stream to start back exactly where we left off.
fn block_cursor(&self) -> Result<Option<String>, StoreError>;
fn block_cursor(&self) -> Option<String>;

/// Start an existing subgraph deployment.
fn start_subgraph_deployment(&self, logger: &Logger) -> Result<(), StoreError>;
Expand Down
4 changes: 2 additions & 2 deletions graph/tests/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ impl MockStore {
// The store trait must be implemented manually because mockall does not support async_trait, nor borrowing from arguments.
#[async_trait]
impl WritableStore for MockStore {
fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError> {
fn block_ptr(&self) -> Option<BlockPtr> {
unimplemented!()
}

fn block_cursor(&self) -> Result<Option<String>, StoreError> {
fn block_cursor(&self) -> Option<String> {
unimplemented!()
}

Expand Down
32 changes: 24 additions & 8 deletions store/postgres/src/writable.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::Mutex;
use std::time::Duration;
use std::{collections::BTreeMap, sync::Arc};

Expand Down Expand Up @@ -325,6 +326,8 @@ fn same_subgraph(mods: &Vec<EntityModification>, id: &DeploymentHash) -> bool {
#[allow(dead_code)]
pub struct WritableAgent {
store: Arc<WritableStore>,
block_ptr: Mutex<Option<BlockPtr>>,
block_cursor: Mutex<Option<String>>,
}

impl WritableAgent {
Expand All @@ -333,21 +336,26 @@ impl WritableAgent {
logger: Logger,
site: Arc<Site>,
) -> Result<Self, StoreError> {
let store = Arc::new(WritableStore::new(subgraph_store, logger, site)?);
let block_ptr = Mutex::new(store.block_ptr()?);
let block_cursor = Mutex::new(store.block_cursor()?);
Ok(Self {
store: Arc::new(WritableStore::new(subgraph_store, logger, site)?),
store,
block_ptr,
block_cursor,
})
}
}

#[allow(unused_variables)]
#[async_trait::async_trait]
impl WritableStoreTrait for WritableAgent {
fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError> {
self.store.block_ptr()
fn block_ptr(&self) -> Option<BlockPtr> {
self.block_ptr.lock().unwrap().clone()
}

fn block_cursor(&self) -> Result<Option<String>, StoreError> {
self.store.block_cursor()
fn block_cursor(&self) -> Option<String> {
self.block_cursor.lock().unwrap().clone()
}

fn start_subgraph_deployment(&self, logger: &Logger) -> Result<(), StoreError> {
Expand All @@ -356,6 +364,9 @@ impl WritableStoreTrait for WritableAgent {
}

fn revert_block_operations(&self, block_ptr_to: BlockPtr) -> Result<(), StoreError> {
*self.block_ptr.lock().unwrap() = Some(block_ptr_to.clone());
// FIXME: What about the firehose cursor? Why doesn't that get updated?

// TODO: If we haven't written the block yet, revert in memory. If
// we have, revert in the database
self.store.revert_block_operations(block_ptr_to)
Expand Down Expand Up @@ -396,13 +407,18 @@ impl WritableStoreTrait for WritableAgent {
deterministic_errors: Vec<SubgraphError>,
) -> Result<(), StoreError> {
self.store.transact_block_operations(
block_ptr_to,
firehose_cursor,
block_ptr_to.clone(),
firehose_cursor.clone(),
mods,
stopwatch,
data_sources,
deterministic_errors,
)
)?;

*self.block_ptr.lock().unwrap() = Some(block_ptr_to);
*self.block_cursor.lock().unwrap() = firehose_cursor;

Ok(())
}

fn get_many(
Expand Down

0 comments on commit 025d77c

Please sign in to comment.