store: Fix off-by-one bug in BlockTracker, and test queue processing
lutter committed Mar 30, 2022
1 parent cc56aa0 commit 703294a
Showing 3 changed files with 221 additions and 10 deletions.
store/postgres/src/lib.rs: 3 additions, 0 deletions
@@ -59,6 +59,9 @@ pub mod layout_for_tests {
make_dummy_site, Connection, Mirror, Namespace, EVENT_TAP, EVENT_TAP_ENABLED,
};
pub use crate::relational::*;
+pub mod writable {
+    pub use crate::writable::test_support::allow_steps;
+}
}

pub use self::block_store::BlockStore;
store/postgres/src/writable.rs: 51 additions, 10 deletions
@@ -358,11 +358,20 @@ impl SyncStore
}
}

-/// Track block numbers we see in a few methods that traverse the queue
+/// Track block numbers we see in a few methods that traverse the queue to
+/// help determine which changes in the queue will actually be visible in
+/// the database once the whole queue has been processed and the block
+/// number at which queries should run so that they only consider data that
+/// is not affected by any requests currently queued.
+///
+/// The tracker relies on `update` being called in the order newest request
+/// in the queue to oldest request so that reverts are seen before the
+/// writes that they revert.
struct BlockTracker {
-/// The smallest block number for which we saw a revert
+/// The smallest block number that has been reverted to
revert: BlockNumber,
-/// The smallest block number for which the queue has an entry
+/// The largest block number that is not affected by entries in the
+/// queue
block: BlockNumber,
}

@@ -377,9 +386,11 @@ impl BlockTracker
fn update(&mut self, req: &Request) {
match req {
Request::Write { block_ptr, .. } => {
-self.block = self.block.min(block_ptr.number);
+self.block = self.block.min(block_ptr.number - 1);
}
Request::Revert { block_ptr, .. } => {
+// `block_ptr` is the block pointer we are reverting _to_,
+// and is not affected by the revert
self.revert = self.revert.min(block_ptr.number);
self.block = self.block.min(block_ptr.number);
}
@@ -390,17 +401,13 @@
/// of any writes that might have happened concurrently but have already
/// been accounted for by inspecting the in-memory queue
fn query_block(&self) -> BlockNumber {
-if self.block == BLOCK_NUMBER_MAX {
-    BLOCK_NUMBER_MAX
-} else {
-    self.block - 1
-}
+self.block
}

/// Return `true` if a write at this block will be visible, i.e., not
/// reverted by a previous queue entry
fn visible(&self, block_ptr: &BlockPtr) -> bool {
-self.revert > block_ptr.number
+self.revert >= block_ptr.number
}
}
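
To make the off-by-one concrete, here is a small self-contained sketch (mine, not part of the commit) that models the tracker's arithmetic after this change; plain `i32` block numbers stand in for `BlockPtr`, and all names are illustrative.

    // Not graph-node code: a self-contained model of BlockTracker's logic
    // after this fix. MAX plays the role of BLOCK_NUMBER_MAX.
    const MAX: i32 = i32::MAX;

    struct Tracker {
        // smallest block number that has been reverted to
        revert: i32,
        // largest block number not affected by queued entries
        block: i32,
    }

    impl Tracker {
        fn new() -> Self {
            Tracker { revert: MAX, block: MAX }
        }

        // The queue is walked newest-to-oldest, so a revert is seen before
        // the writes it undoes.
        fn write(&mut self, number: i32) {
            // A write at block N leaves data up to block N - 1 untouched.
            self.block = self.block.min(number - 1);
        }

        fn revert_to(&mut self, number: i32) {
            // `number` is the block we revert _to_; its data survives.
            self.revert = self.revert.min(number);
            self.block = self.block.min(number);
        }
    }

    fn main() {
        // Queue, oldest to newest: write@4, write@5, revert to block 3.
        // The tracker sees the entries in reverse order.
        let mut t = Tracker::new();
        t.revert_to(3);
        t.write(5);
        t.write(4);

        // Queries should run at block 3, since everything after block 3 is
        // either still queued or about to be reverted.
        assert_eq!(t.block, 3);

        // A write at block 3 itself is still visible (3 >= 3) ...
        assert!(t.revert >= 3);
        // ... while a write at block 4 is not: it will be reverted.
        assert!(t.revert < 4);
    }

In this scenario the old code would have returned 2 from `query_block`, one block below the revert target, and the old `>` comparison in `visible` would have wrongly treated a write at block 3, the block being reverted to, as reverted.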

@@ -477,6 +484,37 @@ struct Queue {
stopwatch: StopwatchMetrics,
}

+/// Support for controlling the background writer (pause/resume) only for
+/// use in tests. In release builds, the checks that pause the writer are
+/// compiled out. Before `allow_steps` is called, the background writer is
+/// allowed to process as many requests as it can
+#[cfg(debug_assertions)]
+pub(crate) mod test_support {
+    use std::sync::atomic::{AtomicBool, Ordering};

+    use graph::{prelude::lazy_static, util::bounded_queue::BoundedQueue};

+    lazy_static! {
+        static ref DO_STEP: AtomicBool = AtomicBool::new(false);
+        static ref ALLOWED_STEPS: BoundedQueue<()> = BoundedQueue::with_capacity(1_000);
+    }

+    pub(super) async fn take_step() {
+        if DO_STEP.load(Ordering::SeqCst) {
+            ALLOWED_STEPS.pop().await
+        }
+    }

+    /// Allow the writer to process `steps` requests. After calling this,
+    /// the writer will only process the number of requests it is allowed to
+    pub async fn allow_steps(steps: usize) {
+        for _ in 0..steps {
+            ALLOWED_STEPS.push(()).await
+        }
+        DO_STEP.store(true, Ordering::SeqCst);
+    }
+}

impl Queue {
/// Create a new queue and spawn a task that processes write requests
fn start(
@@ -487,6 +525,9 @@
) -> Arc<Self> {
async fn start_writer(queue: Arc<Queue>, logger: Logger) {
loop {
+#[cfg(debug_assertions)]
+test_support::take_step().await;

// We peek at the front of the queue, rather than pop it
// right away, so that query methods like `get` have access
// to the data while it is being written. If we popped here,
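
The gate above is simple enough to model outside graph-node. The following std-only sketch (my own illustration, not the crate's API) shows the same idea in miniature: a worker runs freely until stepping is enabled, then consumes one unit permit per request. The test file below wraps the crate's real `allow_steps` in `pause_writer`/`resume_writer` helpers to drive the background writer the same way.

    // Not graph-node code: a miniature version of the step gate, using a
    // Condvar-guarded permit counter where the real code uses an async
    // BoundedQueue of `()` permits.
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Condvar, Mutex};
    use std::thread;
    use std::time::Duration;

    struct Gate {
        stepping: AtomicBool,
        permits: Mutex<usize>,
        cv: Condvar,
    }

    impl Gate {
        fn new() -> Self {
            Gate {
                stepping: AtomicBool::new(false),
                permits: Mutex::new(0),
                cv: Condvar::new(),
            }
        }

        // Worker side: before stepping is enabled this is a no-op; after,
        // it blocks until a permit is available and consumes it.
        fn take_step(&self) {
            if self.stepping.load(Ordering::SeqCst) {
                let mut permits = self.permits.lock().unwrap();
                while *permits == 0 {
                    permits = self.cv.wait(permits).unwrap();
                }
                *permits -= 1;
            }
        }

        // Test side: allow `steps` more requests, then pause the worker.
        fn allow_steps(&self, steps: usize) {
            *self.permits.lock().unwrap() += steps;
            self.stepping.store(true, Ordering::SeqCst);
            self.cv.notify_all();
        }
    }

    fn main() {
        let gate = Arc::new(Gate::new());
        let worker = {
            let gate = gate.clone();
            thread::spawn(move || {
                for i in 0..5 {
                    gate.take_step();
                    println!("processed request {i}");
                }
            })
        };
        thread::sleep(Duration::from_millis(20)); // runs freely at first
        gate.allow_steps(0); // pause: further take_step() calls block
        gate.allow_steps(5); // hand out enough permits to finish
        worker.join().unwrap();
    }
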
store/postgres/tests/writable.rs: 167 additions, 0 deletions
@@ -0,0 +1,167 @@
use graph::data::subgraph::schema::DeploymentCreate;
use lazy_static::lazy_static;
use std::marker::PhantomData;
use test_store::*;

use graph::components::store::{DeploymentLocator, WritableStore};
use graph::components::store::{EntityKey, EntityType};
use graph::data::subgraph::*;
use graph::prelude::*;
use graph::semver::Version;
use graph_store_postgres::layout_for_tests::writable;
use graph_store_postgres::{Store as DieselStore, SubgraphStore as DieselSubgraphStore};
use web3::types::H256;

const SCHEMA_GQL: &str = "
type Counter @entity {
id: ID!,
count: Int,
}
";

const COUNTER: &str = "Counter";

lazy_static! {
static ref TEST_SUBGRAPH_ID_STRING: String = String::from("writableSubgraph");
static ref TEST_SUBGRAPH_ID: DeploymentHash =
DeploymentHash::new(TEST_SUBGRAPH_ID_STRING.as_str()).unwrap();
static ref TEST_SUBGRAPH_SCHEMA: Schema =
Schema::parse(SCHEMA_GQL, TEST_SUBGRAPH_ID.clone()).expect("Failed to parse user schema");
}

/// Inserts test data into the store.
///
/// Create a new empty subgraph with schema `SCHEMA_GQL`
async fn insert_test_data(store: Arc<DieselSubgraphStore>) -> DeploymentLocator {
let manifest = SubgraphManifest::<graph_chain_ethereum::Chain> {
id: TEST_SUBGRAPH_ID.clone(),
spec_version: Version::new(1, 0, 0),
features: Default::default(),
description: None,
repository: None,
schema: TEST_SUBGRAPH_SCHEMA.clone(),
data_sources: vec![],
graft: None,
templates: vec![],
chain: PhantomData,
};

// Create SubgraphDeploymentEntity
let deployment = DeploymentCreate::new(&manifest, None);
let name = SubgraphName::new("test/writable").unwrap();
let node_id = NodeId::new("test").unwrap();
let deployment = store
.create_subgraph_deployment(
name,
&TEST_SUBGRAPH_SCHEMA,
deployment,
node_id,
NETWORK_NAME.to_string(),
SubgraphVersionSwitchingMode::Instant,
)
.unwrap();
deployment
}

/// Removes test data from the database behind the store.
fn remove_test_data(store: Arc<DieselSubgraphStore>) {
store
.delete_all_entities_for_test_use_only()
.expect("deleting test entities succeeds");
}

/// Test harness for running database integration tests.
fn run_test<R, F>(test: F)
where
F: FnOnce(Arc<DieselStore>, Arc<dyn WritableStore>, DeploymentLocator) -> R + Send + 'static,
R: std::future::Future<Output = ()> + Send + 'static,
{
run_test_sequentially(|store| async move {
let subgraph_store = store.subgraph_store();
// Reset state before starting
remove_test_data(subgraph_store.clone());

// Seed database with test data
let deployment = insert_test_data(subgraph_store.clone()).await;
let writable = store
.subgraph_store()
.writable(LOGGER.clone(), deployment.id)
.await
.expect("we can get a writable store");

// Run test and wait for the background writer to finish its work so
// it won't conflict with the next test
test(store, writable.clone(), deployment).await;
writable.flush().await.unwrap();
});
}

fn block_pointer(number: u8) -> BlockPtr {
let hash = H256::from([number; 32]);
BlockPtr::from((hash, number as BlockNumber))
}

fn count_key(deployment: &DeploymentLocator, id: &str) -> EntityKey {
EntityKey {
subgraph_id: deployment.hash.clone(),
entity_type: EntityType::from(COUNTER),
entity_id: id.to_owned(),
}
}

async fn insert_count(store: &Arc<DieselSubgraphStore>, deployment: &DeploymentLocator, count: u8) {
let data = entity! {
id: "1",
count: count as i32
};
let entity_op = EntityOperation::Set {
key: count_key(deployment, &data.get("id").unwrap().to_string()),
data,
};
transact_entity_operations(store, deployment, block_pointer(count), vec![entity_op])
.await
.unwrap();
}

async fn pause_writer(deployment: &DeploymentLocator) {
flush(&deployment).await.unwrap();
writable::allow_steps(0).await;
}

async fn resume_writer(deployment: &DeploymentLocator, steps: usize) {
writable::allow_steps(steps).await;
flush(&deployment).await.unwrap();
}

#[test]
fn tracker() {
run_test(|store, writable, deployment| async move {
let subgraph_store = store.subgraph_store();

let read_count = || {
let counter = writable.get(&count_key(&deployment, "1")).unwrap().unwrap();
counter.get("count").unwrap().as_int().unwrap()
};
for count in 1..4 {
insert_count(&subgraph_store, &deployment, count).await;
}
pause_writer(&deployment).await;

// Test reading back with a pending write
insert_count(&subgraph_store, &deployment, 4).await;
assert_eq!(4, read_count());
resume_writer(&deployment, 1).await;
assert_eq!(4, read_count());

// Test reading back with a pending revert
writable
.revert_block_operations(block_pointer(2), None)
.await
.unwrap();

assert_eq!(2, read_count());

resume_writer(&deployment, 1).await;
assert_eq!(2, read_count());
})
}
