Skip to content

Commit

Permalink
[State] Support state sync and provide RPC to sync state (#2614)
Browse files Browse the repository at this point in the history
* replace tx and event indexer sender from RoochAddress to AccountAddress, for reduce sender store size in indexer by using short_str_lossless

* save state change set

* draft implements indexer revert

* finish revert indexer

* implements sync state rpc

* display bitcoin_block_hash for l1 block

* refactor revert tx and rollback tx tool

* test revert and rollback tx
  • Loading branch information
baichuan3 committed Sep 13, 2024
1 parent e5e19af commit 400c55a
Show file tree
Hide file tree
Showing 36 changed files with 1,007 additions and 155 deletions.
149 changes: 125 additions & 24 deletions crates/rooch-db/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,29 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use anyhow::{Error, Result};
use anyhow::{anyhow, Error, Result};
use moveos_store::config_store::ConfigStore;
use moveos_store::transaction_store::TransactionStore as TxExecutionInfoStore;
use moveos_store::MoveOSStore;
use moveos_types::access_path::AccessPath;
use moveos_types::h256::H256;
use moveos_types::moveos_std::object::ObjectMeta;
use moveos_types::startup_info::StartupInfo;
use moveos_types::state_resolver::{RootObjectResolver, StateReader};
use prometheus::Registry;
use raw_store::metrics::DBMetrics;
use raw_store::{rocks::RocksDB, StoreInstance};
use rooch_config::store_config::StoreConfig;
use rooch_indexer::store::traits::IndexerStoreTrait;
use rooch_indexer::{indexer_reader::IndexerReader, IndexerStore};
use rooch_store::transaction_store::TransactionStore;
use rooch_store::state_store::StateStore;
use rooch_store::RoochStore;
use rooch_types::indexer::state::{
collect_revert_object_change_ids, handle_revert_object_change, IndexerObjectStateChangeSet,
IndexerObjectStatesIndexGenerator,
};
use rooch_types::sequencer::SequencerInfo;

#[derive(Clone)]
Expand Down Expand Up @@ -113,27 +122,15 @@ impl RoochDB {
}
let sequencer_info = ledger_tx_opt.unwrap().sequence_info;
let tx_order = sequencer_info.tx_order;
// check last order equals to sequencer tx order via tx_hash
if sequencer_info.tx_order != last_sequencer_info.last_order {
return Err(Error::msg(format!(
"the last order {} not match current sequencer info tx order {}, tx_hash {}",
last_sequencer_info.last_order, sequencer_info.tx_order, tx_hash
)));
}
assert_eq!(
sequencer_info.tx_order, last_sequencer_info.last_order,
"the last order {} should match current sequencer info tx order {}, tx_hash {}",
last_sequencer_info.last_order, sequencer_info.tx_order, tx_hash
);

// check only write tx sequence info succ, but not write tx execution info
let execution_info = self
.moveos_store
.transaction_store
.get_tx_execution_info(tx_hash)?;
if execution_info.is_some() {
return Err(Error::msg(format!(
"the tx execution info already exist via tx_hash {}, revert tx failed",
tx_hash
)));
}
self.do_revert_tx_ignore_check(tx_hash)?;

let previous_tx_order = last_order - 1;
let previous_tx_order = if tx_order > 0 { tx_order - 1 } else { 0 };
let previous_tx_hash_opt = self
.rooch_store
.transaction_store
Expand All @@ -155,16 +152,32 @@ impl RoochDB {
previous_tx_hash
)));
}
let previous_sequencer_info = previous_ledger_tx_opt.unwrap().sequence_info;

let previous_execution_info_opt = self
.moveos_store
.transaction_store
.get_tx_execution_info(previous_tx_hash)?;
if previous_execution_info_opt.is_none() {
return Err(Error::msg(format!(
"the previous execution info not exist via tx_hash {}, revert tx failed",
previous_tx_hash
)));
}
let previous_sequencer_info = previous_ledger_tx_opt.unwrap().sequence_info;
let previous_execution_info = previous_execution_info_opt.unwrap();
let revert_sequencer_info = SequencerInfo::new(
previous_tx_order,
previous_sequencer_info.tx_accumulator_info(),
);
self.rooch_store
.meta_store
.save_sequencer_info_ignore_check(revert_sequencer_info)?;
self.rooch_store.remove_transaction(tx_hash, tx_order)?;

let startup_info = StartupInfo::new(
previous_execution_info.state_root,
previous_execution_info.size,
);
self.moveos_store.save_startup_info(startup_info)?;

println!(
"revert tx succ, tx_hash: {:?}, tx_order {}",
Expand All @@ -173,4 +186,92 @@ impl RoochDB {

Ok(())
}

pub fn do_revert_tx_ignore_check(&self, tx_hash: H256) -> Result<()> {
let ledger_tx_opt = self
.rooch_store
.transaction_store
.get_transaction_by_hash(tx_hash)?;

if ledger_tx_opt.is_none() {
println!("the ledger tx not exist via tx_hash {}", tx_hash);
return Ok(());
}

let sequencer_info = ledger_tx_opt.unwrap().sequence_info;
let tx_order = sequencer_info.tx_order;
self.rooch_store
.transaction_store
.remove_transaction(tx_hash, tx_order)?;
self.moveos_store
.transaction_store
.remove_tx_execution_info(tx_hash)?;

// remove the state change set
let state_change_set_ext_opt = self.rooch_store.get_state_change_set(tx_order)?;
if state_change_set_ext_opt.is_some() {
self.rooch_store.remove_state_change_set(tx_order)?;
}

// revert the indexer
let previous_tx_order = if tx_order > 0 { tx_order - 1 } else { 0 };
let previous_state_change_set_ext_opt =
self.rooch_store.get_state_change_set(previous_tx_order)?;
if previous_state_change_set_ext_opt.is_some() && state_change_set_ext_opt.is_some() {
let previous_state_change_set_ext = previous_state_change_set_ext_opt.unwrap();
let state_change_set_ext = state_change_set_ext_opt.unwrap();

let mut object_ids = vec![];
for (_feild_key, object_change) in state_change_set_ext.state_change_set.changes.clone()
{
collect_revert_object_change_ids(object_change, &mut object_ids)?;
}

let root = ObjectMeta::root_metadata(
previous_state_change_set_ext.state_change_set.state_root,
previous_state_change_set_ext.state_change_set.global_size,
);
let resolver = RootObjectResolver::new(root, &self.moveos_store);
let object_mapping = resolver
.get_states(AccessPath::objects(object_ids))?
.into_iter()
.flatten()
.map(|v| (v.metadata.id.clone(), v.metadata))
.collect::<HashMap<_, _>>();

// 1. revert indexer transaction
self.indexer_store
.delete_transactions(vec![tx_order])
.map_err(|e| anyhow!(format!("Revert indexer transactions error: {:?}", e)))?;

// 2. revert indexer event
self.indexer_store
.delete_events(vec![tx_order])
.map_err(|e| anyhow!(format!("Revert indexer events error: {:?}", e)))?;

// 3. revert indexer full object state, including object_states, utxos and inscriptions
// indexer object state index generator
let mut state_index_generator = IndexerObjectStatesIndexGenerator::default();
let mut indexer_object_state_change_set = IndexerObjectStateChangeSet::default();

for (_feild_key, object_change) in state_change_set_ext.state_change_set.changes {
handle_revert_object_change(
&mut state_index_generator,
tx_order,
&mut indexer_object_state_change_set,
object_change,
&object_mapping,
)?;
}
self.indexer_store
.apply_object_states(indexer_object_state_change_set)
.map_err(|e| anyhow!(format!("Revert indexer states error: {:?}", e)))?;
}
println!(
"revert tx and indexer succ, tx_hash: {:?}, tx_order {}",
tx_hash, tx_order
);

Ok(())
}
}
27 changes: 24 additions & 3 deletions crates/rooch-executor/src/actor/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

use super::messages::{
ConvertL2TransactionData, DryRunTransactionMessage, DryRunTransactionResult,
ExecuteTransactionMessage, ExecuteTransactionResult, GetRootMessage, ValidateL1BlockMessage,
ValidateL1TxMessage, ValidateL2TxMessage,
ExecuteTransactionMessage, ExecuteTransactionResult, GetRootMessage, SaveStateChangeSetMessage,
ValidateL1BlockMessage, ValidateL1TxMessage, ValidateL2TxMessage,
};
use crate::metrics::ExecutorMetrics;
use anyhow::Result;
Expand All @@ -21,14 +21,15 @@ use moveos_types::module_binding::MoveFunctionCaller;
use moveos_types::move_std::option::MoveOption;
use moveos_types::moveos_std::object::ObjectMeta;
use moveos_types::moveos_std::tx_context::TxContext;
use moveos_types::state::ObjectState;
use moveos_types::state::{ObjectState, StateChangeSetExt};
use moveos_types::state_resolver::RootObjectResolver;
use moveos_types::transaction::{FunctionCall, MoveOSTransaction, VerifiedMoveAction};
use moveos_types::transaction::{MoveAction, VerifiedMoveOSTransaction};
use prometheus::Registry;
use rooch_event::actor::{EventActor, EventActorSubscribeMessage, GasUpgradeMessage};
use rooch_event::event::GasUpgradeEvent;
use rooch_genesis::FrameworksGasParameters;
use rooch_store::state_store::StateStore;
use rooch_store::RoochStore;
use rooch_types::address::{BitcoinAddress, MultiChainAddress};
use rooch_types::bitcoin::BitcoinModule;
Expand Down Expand Up @@ -416,6 +417,15 @@ impl ExecutorActor {
self.root = root;
self.moveos.flush_module_cache(is_upgrade)
}

pub fn save_state_change_set(
&mut self,
tx_order: u64,
state_change_set: StateChangeSetExt,
) -> Result<()> {
self.rooch_store
.save_state_change_set(tx_order, state_change_set)
}
}

#[async_trait]
Expand Down Expand Up @@ -505,6 +515,17 @@ impl Handler<GetRootMessage> for ExecutorActor {
}
}

#[async_trait]
impl Handler<SaveStateChangeSetMessage> for ExecutorActor {
async fn handle(
&mut self,
msg: SaveStateChangeSetMessage,
_ctx: &mut ActorContext,
) -> Result<()> {
self.save_state_change_set(msg.tx_order, msg.state_change_set)
}
}

impl MoveFunctionCaller for ExecutorActor {
fn call_function(&self, ctx: &TxContext, call: FunctionCall) -> Result<FunctionResult> {
Ok(self
Expand Down
21 changes: 20 additions & 1 deletion crates/rooch-executor/src/actor/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use moveos_types::function_return_value::AnnotatedFunctionResult;
use moveos_types::h256::H256;
use moveos_types::moveos_std::event::{AnnotatedEvent, Event, EventID};
use moveos_types::moveos_std::object::ObjectMeta;
use moveos_types::state::{AnnotatedState, FieldKey, ObjectState};
use moveos_types::state::{AnnotatedState, FieldKey, ObjectState, StateChangeSetExt};
use moveos_types::state_resolver::{AnnotatedStateKV, StateKV};
use moveos_types::transaction::TransactionExecutionInfo;
use moveos_types::transaction::TransactionOutput;
Expand Down Expand Up @@ -213,3 +213,22 @@ pub struct GetRootMessage {}
impl Message for GetRootMessage {
type Result = Result<ObjectState>;
}

#[derive(Debug, Serialize, Deserialize)]
pub struct SaveStateChangeSetMessage {
pub tx_order: u64,
pub state_change_set: StateChangeSetExt,
}

impl Message for SaveStateChangeSetMessage {
type Result = Result<()>;
}

#[derive(Debug, Serialize, Deserialize)]
pub struct GetStateChangeSetsMessage {
pub tx_orders: Vec<u64>,
}

impl Message for GetStateChangeSetsMessage {
type Result = Result<Vec<Option<StateChangeSetExt>>>;
}
20 changes: 17 additions & 3 deletions crates/rooch-executor/src/actor/reader_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

use super::messages::{
AnnotatedStatesMessage, ExecuteViewFunctionMessage, GetAnnotatedEventsByEventHandleMessage,
GetAnnotatedEventsByEventIDsMessage, GetEventsByEventHandleMessage, RefreshStateMessage,
StatesMessage,
GetAnnotatedEventsByEventIDsMessage, GetEventsByEventHandleMessage, GetStateChangeSetsMessage,
RefreshStateMessage, StatesMessage,
};
use crate::actor::messages::{
GetEventsByEventIDsMessage, GetTxExecutionInfosByHashMessage, ListAnnotatedStatesMessage,
Expand All @@ -24,7 +24,7 @@ use moveos_types::function_return_value::AnnotatedFunctionReturnValue;
use moveos_types::moveos_std::event::EventHandle;
use moveos_types::moveos_std::event::{AnnotatedEvent, Event};
use moveos_types::moveos_std::object::ObjectMeta;
use moveos_types::state::{AnnotatedState, ObjectState};
use moveos_types::state::{AnnotatedState, ObjectState, StateChangeSetExt};
use moveos_types::state_resolver::RootObjectResolver;
use moveos_types::state_resolver::{AnnotatedStateKV, AnnotatedStateReader, StateKV, StateReader};
use moveos_types::transaction::TransactionExecutionInfo;
Expand Down Expand Up @@ -323,3 +323,17 @@ impl Handler<EventData> for ReaderExecutorActor {
Ok(())
}
}

#[async_trait]
impl Handler<GetStateChangeSetsMessage> for ReaderExecutorActor {
async fn handle(
&mut self,
msg: GetStateChangeSetsMessage,
_ctx: &mut ActorContext,
) -> Result<Vec<Option<StateChangeSetExt>>> {
let GetStateChangeSetsMessage { tx_orders } = msg;
self.rooch_store
.state_store
.multi_get_state_change_set(tx_orders)
}
}
33 changes: 28 additions & 5 deletions crates/rooch-executor/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

use crate::actor::messages::{
ConvertL2TransactionData, DryRunTransactionResult, GetAnnotatedEventsByEventIDsMessage,
GetEventsByEventHandleMessage, GetEventsByEventIDsMessage, GetTxExecutionInfosByHashMessage,
ListAnnotatedStatesMessage, ListStatesMessage, RefreshStateMessage, ValidateL1BlockMessage,
ValidateL1TxMessage,
GetEventsByEventHandleMessage, GetEventsByEventIDsMessage, GetStateChangeSetsMessage,
GetTxExecutionInfosByHashMessage, ListAnnotatedStatesMessage, ListStatesMessage,
RefreshStateMessage, SaveStateChangeSetMessage, ValidateL1BlockMessage, ValidateL1TxMessage,
};
use crate::actor::reader_executor::ReaderExecutorActor;
use crate::actor::{
Expand All @@ -15,7 +15,7 @@ use crate::actor::{
StatesMessage, ValidateL2TxMessage,
},
};
use anyhow::Result;
use anyhow::{anyhow, Result};
use coerce::actor::ActorRef;
use move_core_types::account_address::AccountAddress;
use move_core_types::language_storage::StructTag;
Expand All @@ -26,7 +26,7 @@ use moveos_types::moveos_std::account::Account;
use moveos_types::moveos_std::event::{Event, EventID};
use moveos_types::moveos_std::object::ObjectMeta;
use moveos_types::moveos_std::tx_context::TxContext;
use moveos_types::state::FieldKey;
use moveos_types::state::{FieldKey, StateChangeSetExt};
use moveos_types::state_resolver::{AnnotatedStateKV, StateKV};
use moveos_types::transaction::FunctionCall;
use moveos_types::transaction::TransactionExecutionInfo;
Expand Down Expand Up @@ -237,6 +237,29 @@ impl ExecutorProxy {
self.refresh_state(root.metadata, false).await
}

pub async fn save_state_change_set(
&self,
tx_order: u64,
state_change_set: StateChangeSetExt,
) -> Result<()> {
self.actor
.notify(SaveStateChangeSetMessage {
tx_order,
state_change_set,
})
.await
.map_err(|e| anyhow!(format!("Save state change set error: {:?}", e)))
}

pub async fn get_state_change_sets(
&self,
tx_orders: Vec<u64>,
) -> Result<Vec<Option<StateChangeSetExt>>> {
self.reader_actor
.send(GetStateChangeSetsMessage { tx_orders })
.await?
}

pub async fn chain_id(&self) -> Result<ChainID> {
self.get_states(AccessPath::object(ChainID::chain_id_object_id()))
.await?
Expand Down
Loading

0 comments on commit 400c55a

Please sign in to comment.