Skip to content

Commit

Permalink
Fix cross shard graft (#4030)
Browse files Browse the repository at this point in the history
* fix(store): Partially revert 0c4417e

* fix(store): Fix cross shard grafting
  • Loading branch information
leoyvens authored Oct 4, 2022
1 parent ef75f31 commit 0f08b81
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 86 deletions.
159 changes: 75 additions & 84 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1305,104 +1305,95 @@ impl DeploymentStore {
&self,
logger: &Logger,
site: Arc<Site>,
graft_src: Option<(Arc<Layout>, BlockPtr)>,
graft_src: Option<(Arc<Layout>, BlockPtr, SubgraphDeploymentEntity)>,
) -> Result<(), StoreError> {
let dst = self.find_layout(site.cheap_clone())?;

// Do any cleanup to bring the subgraph into a known good state
if let Some((src, block)) = graft_src {
let conn = self.get_conn()?;
let dst_block_ptr = Self::block_ptr_with_conn(&conn, dst.site.cheap_clone())?;

// If the dst block is past the graft point, then the graft has already been completed.
if dst_block_ptr.map(|ptr| ptr.number) < Some(block.number) {
info!(
logger,
"Initializing graft by copying data from {} to {}",
src.catalog.site.namespace,
dst.catalog.site.namespace
);

let src_manifest_idx_and_name = self
.load_deployment(&src.site)?
.manifest
.template_idx_and_name()?;
let dst_manifest_idx_and_name = self
.load_deployment(&dst.site)?
.manifest
.template_idx_and_name()?;

// Copy subgraph data
// We allow both not copying tables at all from the source, as well
// as adding new tables in `self`; we only need to check that tables
// that actually need to be copied from the source are compatible
// with the corresponding tables in `self`
let copy_conn = crate::copy::Connection::new(
logger,
self.pool.clone(),
src.clone(),
dst.clone(),
block.clone(),
src_manifest_idx_and_name,
dst_manifest_idx_and_name,
)?;
let status = copy_conn.copy_data()?;
if status == crate::copy::Status::Cancelled {
return Err(StoreError::Canceled);
}
// If `graft_src` is `Some`, then there is a pending graft.
if let Some((src, block, src_deployment)) = graft_src {
info!(
logger,
"Initializing graft by copying data from {} to {}",
src.catalog.site.namespace,
dst.catalog.site.namespace
);

let src_manifest_idx_and_name = src_deployment.manifest.template_idx_and_name()?;
let dst_manifest_idx_and_name = self
.load_deployment(&dst.site)?
.manifest
.template_idx_and_name()?;

// Copy subgraph data
// We allow both not copying tables at all from the source, as well
// as adding new tables in `self`; we only need to check that tables
// that actually need to be copied from the source are compatible
// with the corresponding tables in `self`
let copy_conn = crate::copy::Connection::new(
logger,
self.pool.clone(),
src.clone(),
dst.clone(),
block.clone(),
src_manifest_idx_and_name,
dst_manifest_idx_and_name,
)?;
let status = copy_conn.copy_data()?;
if status == crate::copy::Status::Cancelled {
return Err(StoreError::Canceled);
}

let conn = self.get_conn()?;
conn.transaction(|| -> Result<(), StoreError> {
// Copy shared dynamic data sources and adjust their ID; if
// the subgraph uses private data sources, that is done by
// `copy::Connection::copy_data` since it requires access to
// the source schema which in sharded setups is only
// available while that function runs
let start = Instant::now();
let count = dynds::shared::copy(&conn, &src.site, &dst.site, block.number)?;
info!(logger, "Copied {} dynamic data sources", count;
let conn = self.get_conn()?;
conn.transaction(|| -> Result<(), StoreError> {
// Copy shared dynamic data sources and adjust their ID; if
// the subgraph uses private data sources, that is done by
// `copy::Connection::copy_data` since it requires access to
// the source schema which in sharded setups is only
// available while that function runs
let start = Instant::now();
let count = dynds::shared::copy(&conn, &src.site, &dst.site, block.number)?;
info!(logger, "Copied {} dynamic data sources", count;
"time_ms" => start.elapsed().as_millis());

// Copy errors across
let start = Instant::now();
let count = deployment::copy_errors(&conn, &src.site, &dst.site, &block)?;
info!(logger, "Copied {} existing errors", count;
// Copy errors across
let start = Instant::now();
let count = deployment::copy_errors(&conn, &src.site, &dst.site, &block)?;
info!(logger, "Copied {} existing errors", count;
"time_ms" => start.elapsed().as_millis());

catalog::copy_account_like(&conn, &src.site, &dst.site)?;

// Rewind the subgraph so that entity versions that are
// clamped in the future (beyond `block`) become valid for
// all blocks after `block`. `revert_block` gets rid of
// everything including the block passed to it. We want to
// preserve `block` and therefore revert `block+1`
let start = Instant::now();
let block_to_revert: BlockNumber = block
.number
.checked_add(1)
.expect("block numbers fit into an i32");
dst.revert_block(&conn, block_to_revert)?;
info!(logger, "Rewound subgraph to block {}", block.number;
catalog::copy_account_like(&conn, &src.site, &dst.site)?;

// Rewind the subgraph so that entity versions that are
// clamped in the future (beyond `block`) become valid for
// all blocks after `block`. `revert_block` gets rid of
// everything including the block passed to it. We want to
// preserve `block` and therefore revert `block+1`
let start = Instant::now();
let block_to_revert: BlockNumber = block
.number
.checked_add(1)
.expect("block numbers fit into an i32");
dst.revert_block(&conn, block_to_revert)?;
info!(logger, "Rewound subgraph to block {}", block.number;
"time_ms" => start.elapsed().as_millis());

let start = Instant::now();
deployment::set_entity_count(&conn, &dst.site, &dst.count_query)?;
info!(logger, "Counted the entities";
let start = Instant::now();
deployment::set_entity_count(&conn, &dst.site, &dst.count_query)?;
info!(logger, "Counted the entities";
"time_ms" => start.elapsed().as_millis());

// Analyze all tables for this deployment
for entity_name in dst.tables.keys() {
self.analyze_with_conn(site.cheap_clone(), entity_name.as_str(), &conn)?;
}
// Analyze all tables for this deployment
for entity_name in dst.tables.keys() {
self.analyze_with_conn(site.cheap_clone(), entity_name.as_str(), &conn)?;
}

// Set the block ptr to the graft point to signal that we successfully
// performed the graft
crate::deployment::forward_block_ptr(&conn, &dst.site.deployment, &block)?;
info!(logger, "Subgraph successfully initialized";
// Set the block ptr to the graft point to signal that we successfully
// performed the graft
crate::deployment::forward_block_ptr(&conn, &dst.site.deployment, &block)?;
info!(logger, "Subgraph successfully initialized";
"time_ms" => start.elapsed().as_millis());
Ok(())
})?;
}
Ok(())
})?;
}
// Make sure the block pointer is set. This is important for newly
// deployed subgraphs so that we respect the 'startBlock' setting
Expand Down
10 changes: 8 additions & 2 deletions store/postgres/src/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use graph::components::store::ReadStore;
use graph::data::subgraph::schema;
use graph::env::env_var;
use graph::prelude::{
BlockNumber, Entity, MetricsRegistry, Schema, SubgraphStore as _, BLOCK_NUMBER_MAX,
BlockNumber, Entity, MetricsRegistry, Schema, SubgraphDeploymentEntity, SubgraphStore as _,
BLOCK_NUMBER_MAX,
};
use graph::slog::info;
use graph::util::bounded_queue::BoundedQueue;
Expand Down Expand Up @@ -58,6 +59,10 @@ impl WritableSubgraphStore {
fn layout(&self, id: &DeploymentHash) -> Result<Arc<Layout>, StoreError> {
self.0.layout(id)
}

fn load_deployment(&self, site: &Site) -> Result<SubgraphDeploymentEntity, StoreError> {
self.0.load_deployment(site)
}
}

/// Write synchronously to the actual store, i.e., once a method returns,
Expand Down Expand Up @@ -171,7 +176,8 @@ impl SyncStore {
let graft_base = match self.writable.graft_pending(&self.site.deployment)? {
Some((base_id, base_ptr)) => {
let src = self.store.layout(&base_id)?;
Some((src, base_ptr))
let deployment_entity = self.store.load_deployment(&src.site)?;
Some((src, base_ptr, deployment_entity))
}
None => None,
};
Expand Down

0 comments on commit 0f08b81

Please sign in to comment.