Skip to content

Commit

Permalink
fix(store): Check if graft is done (#4027)
Browse files Browse the repository at this point in the history
* fix(store): Check if graft is done

* refactor: Add context to `load_deployment` error
  • Loading branch information
leoyvens authored Oct 4, 2022
1 parent e12ef3d commit 0c4417e
Showing 1 changed file with 84 additions and 76 deletions.
160 changes: 84 additions & 76 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use diesel::connection::SimpleConnection;
use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::r2d2::{ConnectionManager, PooledConnection};
use graph::anyhow::Context;
use graph::blockchain::block_stream::FirehoseCursor;
use graph::components::store::{EntityKey, EntityType, PruneReporter, StoredDynamicDataSource};
use graph::components::versions::VERSIONS;
Expand Down Expand Up @@ -211,7 +212,8 @@ impl DeploymentStore {
site: &Site,
) -> Result<SubgraphDeploymentEntity, StoreError> {
let conn = self.get_conn()?;
detail::deployment_entity(&conn, site)
Ok(detail::deployment_entity(&conn, site)
.with_context(|| format!("Deployment details not found for {}", site.deployment))?)
}

// Remove the data and metadata for the deployment `site`. This operation
Expand Down Expand Up @@ -1309,92 +1311,98 @@ impl DeploymentStore {

// Do any cleanup to bring the subgraph into a known good state
if let Some((src, block)) = graft_src {
info!(
logger,
"Initializing graft by copying data from {} to {}",
src.catalog.site.namespace,
dst.catalog.site.namespace
);

let src_manifest_idx_and_name = self
.load_deployment(&src.site)?
.manifest
.template_idx_and_name()?;
let dst_manifest_idx_and_name = self
.load_deployment(&dst.site)?
.manifest
.template_idx_and_name()?;

// Copy subgraph data
// We allow both not copying tables at all from the source, as well
// as adding new tables in `self`; we only need to check that tables
// that actually need to be copied from the source are compatible
// with the corresponding tables in `self`
let copy_conn = crate::copy::Connection::new(
logger,
self.pool.clone(),
src.clone(),
dst.clone(),
block.clone(),
src_manifest_idx_and_name,
dst_manifest_idx_and_name,
)?;
let status = copy_conn.copy_data()?;
if status == crate::copy::Status::Cancelled {
return Err(StoreError::Canceled);
}

let conn = self.get_conn()?;
conn.transaction(|| -> Result<(), StoreError> {
// Copy shared dynamic data sources and adjust their ID; if
// the subgraph uses private data sources, that is done by
// `copy::Connection::copy_data` since it requires access to
// the source schema which in sharded setups is only
// available while that function runs
let start = Instant::now();
let count = dynds::shared::copy(&conn, &src.site, &dst.site, block.number)?;
info!(logger, "Copied {} dynamic data sources", count;
let dst_block_ptr = Self::block_ptr_with_conn(&conn, dst.site.cheap_clone())?;

// If the dst block is at or past the graft point, then the graft has already been completed.
// A missing dst block pointer (`None`) compares below any `Some`, so the graft still runs.
if dst_block_ptr.map(|ptr| ptr.number) < Some(block.number) {
info!(
logger,
"Initializing graft by copying data from {} to {}",
src.catalog.site.namespace,
dst.catalog.site.namespace
);

let src_manifest_idx_and_name = self
.load_deployment(&src.site)?
.manifest
.template_idx_and_name()?;
let dst_manifest_idx_and_name = self
.load_deployment(&dst.site)?
.manifest
.template_idx_and_name()?;

// Copy subgraph data
// We allow both not copying tables at all from the source, as well
// as adding new tables in `self`; we only need to check that tables
// that actually need to be copied from the source are compatible
// with the corresponding tables in `self`
let copy_conn = crate::copy::Connection::new(
logger,
self.pool.clone(),
src.clone(),
dst.clone(),
block.clone(),
src_manifest_idx_and_name,
dst_manifest_idx_and_name,
)?;
let status = copy_conn.copy_data()?;
if status == crate::copy::Status::Cancelled {
return Err(StoreError::Canceled);
}

let conn = self.get_conn()?;
conn.transaction(|| -> Result<(), StoreError> {
// Copy shared dynamic data sources and adjust their ID; if
// the subgraph uses private data sources, that is done by
// `copy::Connection::copy_data` since it requires access to
// the source schema which in sharded setups is only
// available while that function runs
let start = Instant::now();
let count = dynds::shared::copy(&conn, &src.site, &dst.site, block.number)?;
info!(logger, "Copied {} dynamic data sources", count;
"time_ms" => start.elapsed().as_millis());

// Copy errors across
let start = Instant::now();
let count = deployment::copy_errors(&conn, &src.site, &dst.site, &block)?;
info!(logger, "Copied {} existing errors", count;
// Copy errors across
let start = Instant::now();
let count = deployment::copy_errors(&conn, &src.site, &dst.site, &block)?;
info!(logger, "Copied {} existing errors", count;
"time_ms" => start.elapsed().as_millis());

catalog::copy_account_like(&conn, &src.site, &dst.site)?;

// Rewind the subgraph so that entity versions that are
// clamped in the future (beyond `block`) become valid for
// all blocks after `block`. `revert_block` gets rid of
// everything including the block passed to it. We want to
// preserve `block` and therefore revert `block+1`
let start = Instant::now();
let block_to_revert: BlockNumber = block
.number
.checked_add(1)
.expect("block numbers fit into an i32");
dst.revert_block(&conn, block_to_revert)?;
info!(logger, "Rewound subgraph to block {}", block.number;
catalog::copy_account_like(&conn, &src.site, &dst.site)?;

// Rewind the subgraph so that entity versions that are
// clamped in the future (beyond `block`) become valid for
// all blocks after `block`. `revert_block` gets rid of
// everything including the block passed to it. We want to
// preserve `block` and therefore revert `block+1`
let start = Instant::now();
let block_to_revert: BlockNumber = block
.number
.checked_add(1)
.expect("block numbers fit into an i32");
dst.revert_block(&conn, block_to_revert)?;
info!(logger, "Rewound subgraph to block {}", block.number;
"time_ms" => start.elapsed().as_millis());

let start = Instant::now();
deployment::set_entity_count(&conn, &dst.site, &dst.count_query)?;
info!(logger, "Counted the entities";
let start = Instant::now();
deployment::set_entity_count(&conn, &dst.site, &dst.count_query)?;
info!(logger, "Counted the entities";
"time_ms" => start.elapsed().as_millis());

// Analyze all tables for this deployment
for entity_name in dst.tables.keys() {
self.analyze_with_conn(site.cheap_clone(), entity_name.as_str(), &conn)?;
}
// Analyze all tables for this deployment
for entity_name in dst.tables.keys() {
self.analyze_with_conn(site.cheap_clone(), entity_name.as_str(), &conn)?;
}

// Set the block ptr to the graft point to signal that we successfully
// performed the graft
crate::deployment::forward_block_ptr(&conn, &dst.site.deployment, &block)?;
info!(logger, "Subgraph successfully initialized";
// Set the block ptr to the graft point to signal that we successfully
// performed the graft
crate::deployment::forward_block_ptr(&conn, &dst.site.deployment, &block)?;
info!(logger, "Subgraph successfully initialized";
"time_ms" => start.elapsed().as_millis());
Ok(())
})?;
Ok(())
})?;
}
}
// Make sure the block pointer is set. This is important for newly
// deployed subgraphs so that we respect the 'startBlock' setting
Expand Down

0 comments on commit 0c4417e

Please sign in to comment.