Skip to content

Commit

Permalink
[consensus] epoch manager refactoring; dag is coming season 1
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Sep 11, 2023
1 parent e8a6dad commit 1952707
Showing 1 changed file with 127 additions and 73 deletions.
200 changes: 127 additions & 73 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::{
},
network_interface::{ConsensusMsg, ConsensusNetworkClient},
payload_client::QuorumStoreClient,
payload_manager::PayloadManager,
persistent_liveness_storage::{LedgerRecoveryData, PersistentLivenessStorage, RecoveryData},
quorum_store::{
quorum_store_builder::{DirectMempoolInnerBuilder, InnerBuilder, QuorumStoreBuilder},
Expand Down Expand Up @@ -601,56 +602,11 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
tokio::spawn(recovery_manager.start(recovery_manager_rx, close_rx));
}

async fn start_round_manager(
async fn init_payload_provider(
&mut self,
recovery_data: RecoveryData,
epoch_state: EpochState,
onchain_consensus_config: OnChainConsensusConfig,
onchain_execution_config: OnChainExecutionConfig,
) {
let epoch = epoch_state.epoch;
counters::EPOCH.set(epoch_state.epoch as i64);
counters::CURRENT_EPOCH_VALIDATORS.set(epoch_state.verifier.len() as i64);
info!(
epoch = epoch_state.epoch,
validators = epoch_state.verifier.to_string(),
root_block = %recovery_data.root_block(),
"Starting new epoch",
);
let last_vote = recovery_data.last_vote();

info!(epoch = epoch, "Update SafetyRules");

let mut safety_rules =
MetricsSafetyRules::new(self.safety_rules_manager.client(), self.storage.clone());
if let Err(error) = safety_rules.perform_initialize() {
error!(
epoch = epoch,
error = error,
"Unable to initialize safety rules.",
);
}

info!(epoch = epoch, "Create RoundState");
let round_state =
self.create_round_state(self.time_service.clone(), self.timeout_sender.clone());

info!(epoch = epoch, "Create ProposerElection");
let proposer_election =
self.create_proposer_election(&epoch_state, &onchain_consensus_config);
let network_sender = NetworkSender::new(
self.author,
self.network_sender.clone(),
self.self_sender.clone(),
epoch_state.verifier.clone(),
);
let chain_health_backoff_config =
ChainHealthBackoffConfig::new(self.config.chain_health_backoff.clone());
let pipeline_backpressure_config =
PipelineBackpressureConfig::new(self.config.pipeline_backpressure.clone());

let safety_rules_container = Arc::new(Mutex::new(safety_rules));

epoch_state: &EpochState,
network_sender: NetworkSender,
) -> (Arc<PayloadManager>, QuorumStoreClient, QuorumStoreBuilder) {
// Start QuorumStore
let (consensus_to_quorum_store_tx, consensus_to_quorum_store_rx) =
mpsc::channel(self.config.intra_consensus_channel_buffer_size);
Expand Down Expand Up @@ -681,11 +637,6 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
};

let (payload_manager, quorum_store_msg_tx) = quorum_store_builder.init_payload_manager();
let transaction_shuffler =
create_transaction_shuffler(onchain_execution_config.transaction_shuffler_type());
let block_gas_limit = onchain_execution_config.block_gas_limit();
let transaction_deduper =
create_transaction_deduper(onchain_execution_config.transaction_deduper_type());
self.quorum_store_msg_tx = quorum_store_msg_tx;

let payload_client = QuorumStoreClient::new(
Expand All @@ -694,23 +645,120 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
self.config.wait_for_full_blocks_above_recent_fill_threshold,
self.config.wait_for_full_blocks_above_pending_blocks,
);
(payload_manager, payload_client, quorum_store_builder)
}

fn init_state_computer(
&self,
epoch_state: &EpochState,
payload_manager: Arc<PayloadManager>,
onchain_consensus_config: &OnChainConsensusConfig,
onchain_execution_config: &OnChainExecutionConfig,
commit_signer_provider: Arc<dyn CommitSignerProvider>,
) -> Arc<dyn StateComputer> {
let transaction_shuffler =
create_transaction_shuffler(onchain_execution_config.transaction_shuffler_type());
let block_gas_limit = onchain_execution_config.block_gas_limit();
let transaction_deduper =
create_transaction_deduper(onchain_execution_config.transaction_deduper_type());
self.commit_state_computer.new_epoch(
&epoch_state,
payload_manager.clone(),
epoch_state,
payload_manager,
transaction_shuffler,
block_gas_limit,
transaction_deduper,
);
let state_computer = if onchain_consensus_config.decoupled_execution() {

let state_computer =
if onchain_consensus_config.decoupled_execution() {
Arc::new(self.spawn_decoupled_execution(
safety_rules_container.clone(),
commit_signer_provider,
epoch_state.verifier.clone(),
))
} else {
self.commit_state_computer.clone()
};

state_computer
}

fn set_epoch_start_metrics(&self, epoch_state: &EpochState) {
counters::EPOCH.set(epoch_state.epoch as i64);
counters::CURRENT_EPOCH_VALIDATORS.set(epoch_state.verifier.len() as i64);

counters::TOTAL_VOTING_POWER.set(epoch_state.verifier.total_voting_power() as f64);
counters::VALIDATOR_VOTING_POWER.set(
epoch_state
.verifier
.get_voting_power(&self.author)
.unwrap_or(0) as f64,
);
epoch_state
.verifier
.get_ordered_account_addresses_iter()
.for_each(|peer_id| {
counters::ALL_VALIDATORS_VOTING_POWER
.with_label_values(&[&peer_id.to_string()])
.set(epoch_state.verifier.get_voting_power(&peer_id).unwrap_or(0) as i64)
});
}

async fn start_round_manager(
&mut self,
recovery_data: RecoveryData,
epoch_state: EpochState,
onchain_consensus_config: OnChainConsensusConfig,
onchain_execution_config: OnChainExecutionConfig,
) {
let epoch = epoch_state.epoch;
info!(
epoch = epoch_state.epoch,
validators = epoch_state.verifier.to_string(),
root_block = %recovery_data.root_block(),
"Starting new epoch",
);

info!(epoch = epoch, "Update SafetyRules");

let mut safety_rules =
MetricsSafetyRules::new(self.safety_rules_manager.client(), self.storage.clone());
if let Err(error) = safety_rules.perform_initialize() {
error!(
epoch = epoch,
error = error,
"Unable to initialize safety rules.",
);
}

info!(epoch = epoch, "Create RoundState");
let round_state =
self.create_round_state(self.time_service.clone(), self.timeout_sender.clone());

info!(epoch = epoch, "Create ProposerElection");
let proposer_election =
self.create_proposer_election(&epoch_state, &onchain_consensus_config);
let network_sender = self.init_network_sender(&epoch_state);
let chain_health_backoff_config =
ChainHealthBackoffConfig::new(self.config.chain_health_backoff.clone());
let pipeline_backpressure_config =
PipelineBackpressureConfig::new(self.config.pipeline_backpressure.clone());

let safety_rules_container = Arc::new(Mutex::new(safety_rules));

let (payload_manager, payload_client, quorum_store_builder) = self
.init_payload_provider(&epoch_state, network_sender.clone())
.await;
let state_computer = self.init_state_computer(
&epoch_state,
payload_manager.clone(),
&onchain_consensus_config,
&onchain_execution_config,
safety_rules_container.clone(),
);

info!(epoch = epoch, "Create BlockStore");
// Read the last vote, before "moving" `recovery_data`
let last_vote = recovery_data.last_vote();
let block_store = Arc::new(BlockStore::new(
Arc::clone(&self.storage),
recovery_data,
Expand Down Expand Up @@ -755,21 +803,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {

self.round_manager_tx = Some(round_manager_tx.clone());

counters::TOTAL_VOTING_POWER.set(epoch_state.verifier.total_voting_power() as f64);
counters::VALIDATOR_VOTING_POWER.set(
epoch_state
.verifier
.get_voting_power(&self.author)
.unwrap_or(0) as f64,
);
epoch_state
.verifier
.get_ordered_account_addresses_iter()
.for_each(|peer_id| {
counters::ALL_VALIDATORS_VOTING_POWER
.with_label_values(&[&peer_id.to_string()])
.set(epoch_state.verifier.get_voting_power(&peer_id).unwrap_or(0) as i64)
});
self.set_epoch_start_metrics(&epoch_state);

let mut round_manager = RoundManager::new(
epoch_state,
Expand All @@ -794,6 +828,15 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
self.spawn_block_retrieval_task(epoch, block_store);
}

fn init_network_sender(&mut self, epoch_state: &EpochState) -> NetworkSender {
NetworkSender::new(
self.author,
self.network_sender.clone(),
self.self_sender.clone(),
epoch_state.verifier.clone(),
)
}

async fn start_new_epoch(&mut self, payload: OnChainConfigPayload<P>) {
let validator_set: ValidatorSet = payload
.get()
Expand All @@ -820,6 +863,17 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
let consensus_config = onchain_consensus_config.unwrap_or_default();
let execution_config = onchain_execution_config
.unwrap_or_else(|_| OnChainExecutionConfig::default_if_missing());
self.start_new_epoch_with_joltean(epoch_state, consensus_config, execution_config)
.await

async fn start_new_epoch_with_joltean(
&mut self,
epoch_state: EpochState,
consensus_config: OnChainConsensusConfig,
execution_config: OnChainExecutionConfig,
) {
match self.storage.start() {
LivenessStorageData::FullRecoveryData(initial_data) => {
self.quorum_store_enabled = self.enable_quorum_store(&consensus_config);
self.recovery_mode = false;
self.start_round_manager(
Expand Down

0 comments on commit 1952707

Please sign in to comment.