Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
multithreaded pow mining worker
Browse files Browse the repository at this point in the history
  • Loading branch information
Wizdave97 committed Sep 5, 2021
1 parent 17ce41a commit cdb68cd
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 232 deletions.
12 changes: 8 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions client/consensus/pow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ parking_lot = "0.11.1"
derive_more = "0.99.2"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.9.0"}
async-trait = "0.1.50"
tokio = { version = "1.10.1", features = ["sync"] }
tokio-stream = { version = "0.1.7", features = ['sync'] }
futures-core = "0.3.16"
193 changes: 145 additions & 48 deletions client/consensus/pow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,24 @@
mod worker;

pub use crate::worker::{MiningBuild, MiningMetadata, MiningWorker};
pub use crate::worker::{MiningBuild, MiningData, MiningMetadata, MiningDataStream};

use crate::worker::UntilImportedOrTimeout;
use codec::{Decode, Encode};
use futures::{Future, StreamExt};
use futures::{Future, StreamExt, future};
use log::*;
use parking_lot::Mutex;
use prometheus_endpoint::Registry;
use sc_client_api::{self, backend::AuxStore, BlockOf, BlockchainEvents};
use sc_consensus::{
BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxBlockImport,
BoxJustificationImport, ForkChoiceStrategy, ImportResult, Verifier,
BoxJustificationImport, ForkChoiceStrategy, ImportResult, StateAction, StorageChanges,
Verifier,
};
use sp_api::ProvideRuntimeApi;
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use sp_blockchain::{well_known_cache_keys::Id as CacheKeyId, HeaderBackend, ProvideCache};
use sp_consensus::{
CanAuthorWith, Environment, Error as ConsensusError, Proposer, SelectChain, SyncOracle,
BlockOrigin, CanAuthorWith, Environment, Error as ConsensusError, Proposer, SelectChain,
SyncOracle,
};
use sp_consensus_pow::{Seal, TotalDifficulty, POW_ENGINE_ID};
use sp_core::ExecutionContext;
Expand All @@ -72,6 +72,7 @@ use std::{
borrow::Cow, cmp::Ordering, collections::HashMap, marker::PhantomData, sync::Arc,
time::Duration,
};
use tokio_stream::wrappers::ReceiverStream;

#[derive(derive_more::Display, Debug)]
pub enum Error<B: BlockT> {
Expand Down Expand Up @@ -502,6 +503,7 @@ where
Ok(BasicQueue::new(verifier, block_import, justification_import, spawner, registry))
}

type SealStream = ReceiverStream<Seal>;
/// Start the mining worker for PoW. This function provides the necessary helper functions that can
/// be used to implement a miner. However, it does not do the CPU-intensive mining itself.
///
Expand All @@ -511,55 +513,72 @@ where
///
/// `pre_runtime` is a parameter that allows a custom additional pre-runtime digest to be inserted
/// for blocks being built. This can encode authorship information, or just be a graffiti.
pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, L, CIDP, CAW>(
block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>,
pub fn start_mining_worker<B, C, S, A, E, SO, L, CIDP, CAW>(
mut block_import: BoxBlockImport<B, sp_api::TransactionFor<C, B>>,
client: Arc<C>,
select_chain: S,
algorithm: Algorithm,
algorithm: A,
mut env: E,
mut sync_oracle: SO,
justification_sync_link: L,
mut justification_sync_link: L,
pre_runtime: Option<Vec<u8>>,
create_inherent_data_providers: CIDP,
timeout: Duration,
build_time: Duration,
can_author_with: CAW,
) -> (
Arc<Mutex<MiningWorker<Block, Algorithm, C, L, <E::Proposer as Proposer<Block>>::Proof>>>,
impl Future<Output = ()>,
)
) -> (MiningDataStream<<B as BlockT>::Hash, A::Difficulty>, impl Future<Output = ()>)
where
Block: BlockT,
C: ProvideRuntimeApi<Block> + BlockchainEvents<Block> + 'static,
S: SelectChain<Block> + 'static,
Algorithm: PowAlgorithm<Block> + Clone,
Algorithm::Difficulty: Send + 'static,
E: Environment<Block> + Send + Sync + 'static,
B: BlockT,
B::Hash: Unpin,
C: ProvideRuntimeApi<B> + BlockchainEvents<B> + 'static,
S: SelectChain<B> + 'static,
A: PowAlgorithm<B> + Clone,
A::Difficulty: Send + Sync + Unpin + 'static,
E: Environment<B> + Send + Sync + 'static,
E::Error: std::fmt::Debug,
E::Proposer: Proposer<Block, Transaction = sp_api::TransactionFor<C, Block>>,
E::Proposer: Proposer<B, Transaction = sp_api::TransactionFor<C, B>>,
SO: SyncOracle + Clone + Send + Sync + 'static,
L: sc_consensus::JustificationSyncLink<Block>,
CIDP: CreateInherentDataProviders<Block, ()>,
CAW: CanAuthorWith<Block> + Clone + Send + 'static,
L: sc_consensus::JustificationSyncLink<B>,
CIDP: CreateInherentDataProviders<B, ()>,
CAW: CanAuthorWith<B> + Clone + Send + 'static,
{
let mut timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout);
let worker = Arc::new(Mutex::new(MiningWorker {
build: None,
algorithm: algorithm.clone(),
block_import,
justification_sync_link,
}));
let worker_ret = worker.clone();
use futures::future::Either;

// Create a spmc channel here
let (producer, mining_data_stream) = MiningDataStream::new();

// Create channel for receiving a seal from the node
let mut seal_channel: Option<SealStream> = None;
let mut import_stream = client.import_notification_stream()
.filter(|block| future::ready(!matches!(block.origin, BlockOrigin::Own)));

let mut build = None;

// authorship
let task = async move {
loop {
if timer.next().await.is_none() {
break
}
if let Some(mut channel) = seal_channel.take() {
let result = futures::future::select(channel.next(), import_stream.next()).await;

match result {
// we only care about this case.
Either::Left((Some(seal), _)) => {
if let Some(mining_build) = build.take() {
do_import_block(
seal,
mining_build,
&algorithm,
&mut block_import,
&mut justification_sync_link,
)
.await
}
}
_ => {}
}
};

if sync_oracle.is_major_syncing() {
debug!(target: "pow", "Skipping proposal due to sync.");
worker.lock().on_major_syncing();
continue
}

Expand Down Expand Up @@ -587,13 +606,6 @@ where
continue
}

if worker.lock().best_hash() == Some(best_hash) {
continue
}

// The worker is locked for the duration of the whole proposing period. Within this
// period, the mining target is outdated and useless anyway.

let difficulty = match algorithm.difficulty(best_hash) {
Ok(x) => x,
Err(err) => {
Expand Down Expand Up @@ -636,7 +648,7 @@ where
},
};

let mut inherent_digest = Digest::<Block::Hash>::default();
let mut inherent_digest = Digest::<B::Hash>::default();
if let Some(pre_runtime) = &pre_runtime {
inherent_digest.push(DigestItem::PreRuntime(POW_ENGINE_ID, pre_runtime.to_vec()));
}
Expand Down Expand Up @@ -672,7 +684,11 @@ where
},
};

let build = MiningBuild::<Block, Algorithm, C, _> {
let (sender, consumer) = tokio::sync::mpsc::channel(10);

seal_channel = Some(ReceiverStream::new(consumer));

let mining_build = MiningBuild::<B, A, C, _> {
metadata: MiningMetadata {
best_hash,
pre_hash: proposal.block.header().hash(),
Expand All @@ -682,11 +698,19 @@ where
proposal,
};

worker.lock().on_build(build);
let result =
producer.send(Some(MiningData { metadata: mining_build.metadata.clone(), sender }));

if result.is_err() {
// Terminate task since all receivers have been dropped and in essence all mining threaded
return
}

build = Some(mining_build);
}
};

(worker_ret, task)
(mining_data_stream, task)
}

/// Find PoW pre-runtime.
Expand Down Expand Up @@ -722,3 +746,76 @@ fn fetch_seal<B: BlockT>(
_ => return Err(Error::<B>::HeaderUnsealed(hash).into()),
}
}

pub async fn do_import_block<B, C, A, P, L>(
seal: Seal,
build: MiningBuild<B, A, C, P>,
algorithm: &A,
block_import: &mut BoxBlockImport<B, sp_api::TransactionFor<C, B>>,
justification_sync_link: &mut L,
) where
B: BlockT,
C: ProvideRuntimeApi<B> + BlockchainEvents<B> + 'static,
A: PowAlgorithm<B> + Clone,
A::Difficulty: Send + 'static,
L: sc_consensus::JustificationSyncLink<B>,
{
match algorithm.verify(
&BlockId::Hash(build.metadata.best_hash),
&build.metadata.pre_hash,
build.metadata.pre_runtime.as_ref().map(|v| &v[..]),
&seal,
build.metadata.difficulty,
) {
Ok(true) => (),
Ok(false) => {
warn!(
target: "pow",
"Unable to import mined block: seal is invalid",
);
}
Err(err) => {
warn!(
target: "pow",
"Unable to import mined block: {:?}",
err,
);
}
}

let seal = DigestItem::Seal(POW_ENGINE_ID, seal);
let (header, body) = build.proposal.block.deconstruct();

let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
import_block.post_digests.push(seal);
import_block.body = Some(body);
import_block.state_action =
StateAction::ApplyChanges(StorageChanges::Changes(build.proposal.storage_changes));

let intermediate =
PowIntermediate::<A::Difficulty> { difficulty: Some(build.metadata.difficulty) };

import_block
.intermediates
.insert(Cow::from(INTERMEDIATE_KEY), Box::new(intermediate) as Box<_>);

let header = import_block.post_header();
match block_import.import_block(import_block, HashMap::default()).await {
Ok(res) => {
res.handle_justification(&header.hash(), *header.number(), justification_sync_link);

info!(
target: "pow",
"✅ Successfully mined block on top of: {}",
build.metadata.best_hash
);
}
Err(err) => {
warn!(
target: "pow",
"Unable to import mined block: {:?}",
err,
);
}
}
}
Loading

0 comments on commit cdb68cd

Please sign in to comment.