Skip to content

Commit

Permalink
Support migration from existing nodes populating leaf_hash column
Browse files Browse the repository at this point in the history
  • Loading branch information
jbearer committed Oct 10, 2024
1 parent a4bb17d commit 4459ab8
Showing 1 changed file with 127 additions and 2 deletions.
129 changes: 127 additions & 2 deletions sequencer/src/persistence/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,12 @@ impl PersistenceOptions for Options {
type Persistence = Persistence;

async fn create(self) -> anyhow::Result<Persistence> {
Ok(Persistence {
let persistence = Persistence {
store_undecided_state: self.store_undecided_state,
db: SqlStorage::connect(self.try_into()?).await?,
})
};
persistence.migrate_quorum_proposal_leaf_hashes().await?;
Ok(persistence)
}

async fn reset(self) -> anyhow::Result<()> {
Expand All @@ -252,6 +254,41 @@ pub struct Persistence {
store_undecided_state: bool,
}

impl Persistence {
/// Ensure the `leaf_hash` column is populated for all existing quorum proposals.
///
/// This column was added in a migration, but because it requires computing a commitment of the
/// existing data, it is not easy to populate in the SQL migration itself. Thus, on startup, we
/// check if there are any just-migrated quorum proposals with a `NULL` value for this column,
/// and if so we populate the column manually.
async fn migrate_quorum_proposal_leaf_hashes(&self) -> anyhow::Result<()> {
let mut tx = self.db.write().await?;
let mut proposals = tx.query_static("SELECT * FROM quorum_proposals").await?;
let mut updates = vec![];
while let Some(row) = proposals.next().await {
let row = row?;
let hash: Option<String> = row.try_get("leaf_hash")?;
if hash.is_none() {
let view: i64 = row.try_get("view")?;
let data: Vec<u8> = row.try_get("data")?;
let proposal: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
bincode::deserialize(&data)?;
let leaf = Leaf::from_quorum_proposal(&proposal.data);
let leaf_hash = Committable::commit(&leaf);
tracing::info!(view, %leaf_hash, "populating quorum proposal leaf hash");
updates.push((view, leaf_hash.to_string()));
}
}

let params = updates
.iter()
.map(|(view, hash)| [sql_param(view), sql_param(hash)]);
tx.upsert("quorum_proposals", ["view", "leaf_hash"], ["view"], params)
.await?;
tx.commit().await
}
}

#[async_trait]
impl SequencerPersistence for Persistence {
fn into_catchup_provider(
Expand Down Expand Up @@ -615,3 +652,91 @@ mod generic_tests {

instantiate_persistence_tests!(Persistence);
}

#[cfg(test)]
mod test {
use super::*;
use crate::{persistence::testing::TestablePersistence, BLSPubKey, PubKey};
use espresso_types::{NodeState, ValidatedState};
use futures::stream::TryStreamExt;
use hotshot_example_types::node_types::TestVersions;
use hotshot_types::traits::signature_key::SignatureKey;

#[async_std::test]
async fn test_quorum_proposals_leaf_hash_migration() {
// Create some quorum proposals to test with.
let leaf = Leaf::genesis(&ValidatedState::default(), &NodeState::mock()).await;
let privkey = BLSPubKey::generated_from_seed_indexed([0; 32], 1).1;
let signature = PubKey::sign(&privkey, &[]).unwrap();
let mut quorum_proposal = Proposal {
data: QuorumProposal::<SeqTypes> {
block_header: leaf.block_header().clone(),
view_number: ViewNumber::genesis(),
justify_qc: QuorumCertificate::genesis::<TestVersions>(
&ValidatedState::default(),
&NodeState::mock(),
)
.await,
upgrade_certificate: None,
proposal_certificate: None,
},
signature,
_pd: Default::default(),
};

let qp1 = quorum_proposal.clone();

quorum_proposal.data.view_number = ViewNumber::new(1);
let qp2 = quorum_proposal.clone();

let qps = [qp1, qp2];

// Create persistence and add the quorum proposals with NULL leaf hash.
let db = Persistence::tmp_storage().await;
let persistence = Persistence::connect(&db).await;
let mut tx = persistence.db.write().await.unwrap();
let params = qps
.iter()
.map(|qp| {
(
qp.data.view_number.u64() as i64,
bincode::serialize(&qp).unwrap(),
)
})
.collect::<Vec<_>>();
tx.upsert(
"quorum_proposals",
["view", "data"],
["view"],
params
.iter()
.map(|(view, data)| [sql_param(view), sql_param(data)]),
)
.await
.unwrap();
tx.commit().await.unwrap();

// Create a new persistence and ensure the commitments get populated.
let persistence = Persistence::connect(&db).await;
let tx = persistence.db.read().await.unwrap();
let rows = tx
.query_static("SELECT * FROM quorum_proposals ORDER BY view ASC")
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(rows.len(), qps.len());
for (row, qp) in rows.into_iter().zip(qps) {
assert_eq!(row.get::<_, i64>("view"), qp.data.view_number.u64() as i64);
assert_eq!(
row.get::<_, Vec<u8>>("data"),
bincode::serialize(&qp).unwrap()
);
assert_eq!(
row.get::<_, String>("leaf_hash"),
Committable::commit(&Leaf::from_quorum_proposal(&qp.data)).to_string()
);
}
}
}

0 comments on commit 4459ab8

Please sign in to comment.