Skip to content

Commit

Permalink
store: Reorganize write queue and requests to be more self-contained
Browse files Browse the repository at this point in the history
  • Loading branch information
lutter committed Mar 19, 2022
1 parent 00d9714 commit 44bbae5
Showing 1 changed file with 49 additions and 46 deletions.
95 changes: 49 additions & 46 deletions store/postgres/src/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ impl SyncStore {
block_ptr_to: &BlockPtr,
firehose_cursor: Option<&str>,
mods: &[EntityModification],
stopwatch: StopwatchMetrics,
stopwatch: &StopwatchMetrics,
data_sources: &[StoredDynamicDataSource],
deterministic_errors: &[SubgraphError],
) -> Result<(), StoreError> {
Expand Down Expand Up @@ -402,6 +402,8 @@ impl BlockTracker {
/// queued
enum Request {
Write {
store: Arc<SyncStore>,
stopwatch: StopwatchMetrics,
/// The block at which we are writing the changes
block_ptr: BlockPtr,
firehose_cursor: Option<String>,
Expand All @@ -410,12 +412,41 @@ enum Request {
deterministic_errors: Vec<SubgraphError>,
},
Revert {
store: Arc<SyncStore>,
/// The subgraph head will be at this block pointer after the revert
block_ptr: BlockPtr,
firehose_cursor: Option<String>,
},
}

impl Request {
fn execute(&self) -> Result<(), StoreError> {
match self {
Request::Write {
store,
stopwatch,
block_ptr: block_ptr_to,
firehose_cursor,
mods,
data_sources,
deterministic_errors,
} => store.transact_block_operations(
block_ptr_to,
firehose_cursor.as_deref(),
mods,
stopwatch,
data_sources,
deterministic_errors,
),
Request::Revert {
store,
block_ptr,
firehose_cursor,
} => store.revert_block_operations(block_ptr.clone(), firehose_cursor.as_deref()),
}
}
}

/// A queue that asynchronously writes requests queued with `push` to the
/// underlying store and allows retrieving information that is a combination
/// of queued changes and changes already committed to the store.
Expand All @@ -436,6 +467,8 @@ struct Queue {
/// happens, no more changes will be written, and any attempt to write
/// or revert will result in an error
poisoned: AtomicBool,

stopwatch: StopwatchMetrics,
}

impl Queue {
Expand All @@ -446,34 +479,7 @@ impl Queue {
capacity: usize,
registry: Arc<dyn MetricsRegistry>,
) -> Arc<Self> {
fn execute(
req: Arc<Request>,
store: Arc<SyncStore>,
stopwatch: StopwatchMetrics,
) -> Result<(), StoreError> {
match req.as_ref() {
Request::Write {
block_ptr: block_ptr_to,
firehose_cursor,
mods,
data_sources,
deterministic_errors,
} => store.transact_block_operations(
block_ptr_to,
firehose_cursor.as_deref(),
mods,
stopwatch.cheap_clone(),
data_sources,
deterministic_errors,
),
Request::Revert {
block_ptr,
firehose_cursor,
} => store.revert_block_operations(block_ptr.clone(), firehose_cursor.as_deref()),
}
}

async fn start_writer(queue: Arc<Queue>, stopwatch: StopwatchMetrics) {
async fn start_writer(queue: Arc<Queue>) {
loop {
// We peek at the front of the queue, rather than pop it
// right away, so that query methods like `get` have access
Expand All @@ -482,12 +488,7 @@ impl Queue {
// the write transaction commits, causing them to return
// incorrect results.
let req = queue.queue.peek().await;
let store = queue.store.cheap_clone();
let stopwatch = stopwatch.cheap_clone();
let res = graph::spawn_blocking_allow_panic(move || {
execute(req, store, stopwatch.cheap_clone())
})
.await;
let res = graph::spawn_blocking_allow_panic(move || req.execute()).await;
// The request has been handled. It's now safe to remove it
// from the queue
queue.queue.pop().await;
Expand All @@ -508,24 +509,23 @@ impl Queue {

let queue = BoundedQueue::with_capacity(capacity);
let write_err = Mutex::new(None);

// Use a separate instance of the `StopwatchMetrics` for background
// work since that has its own call hierarchy, and using the
// foreground metrics will lead to incorrect nesting of sections
let stopwatch =
StopwatchMetrics::new(logger, store.site.deployment.clone(), "writer", registry);

let queue = Self {
store,
queue,
write_err,
poisoned: AtomicBool::new(false),
stopwatch,
};
let queue = Arc::new(queue);

// Use a separate instance of the `StopwatchMetrics` for background
// work since that has its own call hierarchy, and using the
// foreground metrics will lead to incorrect nesting of sections
let stopwatch = StopwatchMetrics::new(
logger,
queue.store.site.deployment.clone(),
"writer",
registry,
);
graph::spawn(start_writer(queue.cheap_clone(), stopwatch));
graph::spawn(start_writer(queue.cheap_clone()));

queue
}
Expand Down Expand Up @@ -750,12 +750,14 @@ impl Writer {
&block_ptr_to,
firehose_cursor.as_deref(),
&mods,
stopwatch,
&stopwatch,
&data_sources,
&deterministic_errors,
),
Writer::Async(queue) => {
let req = Request::Write {
store: queue.store.cheap_clone(),
stopwatch: queue.stopwatch.cheap_clone(),
block_ptr: block_ptr_to,
firehose_cursor,
mods,
Expand All @@ -777,6 +779,7 @@ impl Writer {
Writer::Async(queue) => {
let firehose_cursor = firehose_cursor.map(|c| c.to_string());
let req = Request::Revert {
store: queue.store.cheap_clone(),
block_ptr: block_ptr_to,
firehose_cursor,
};
Expand Down

0 comments on commit 44bbae5

Please sign in to comment.