Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --throttle arg to throttle mining, and add --altlogs for simpler timestamp logs #23

Merged
merged 1 commit into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
340 changes: 263 additions & 77 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ arc-swap = "1.6.0"
keccak = { version = "0.1", optional = true }
parking = { package = "parking_lot", version = "0.12", optional = true }
shuttle = { version = "0.6", optional = true }
chrono = "0.4"

[features]
default = ["keccak?/asm"]
Expand Down
8 changes: 7 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ pub struct Opt {
#[clap(long = "mine-when-not-synced", display_order = 8)]
/// Mine even when kaspad says it is not synced, only useful when passing `--allow-submit-block-when-not-synced` to kaspad [default: false]
pub mine_when_not_synced: bool,
#[clap(long = "throttle", display_order = 9)]
/// Throttle (milliseconds) between each pow hash generation (used for development testing)
pub throttle: Option<u64>,
#[clap(long, display_order = 10)]
/// Output logs in alternative format (same as kaspad)
pub altlogs: bool,
}

fn parse_devfund_percent(s: &str) -> Result<u16, &'static str> {
Expand Down Expand Up @@ -74,7 +80,7 @@ impl Opt {
let port = self.port();
self.kaspad_address = format!("grpc://{}:{}", self.kaspad_address, port);
}
log::info!("kaspad address: {}", self.kaspad_address);
log::info!("Kaspad address: {}", self.kaspad_address);

Ok(())
}
Expand Down
16 changes: 10 additions & 6 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use crate::proto::kaspad_message::Payload;
use crate::proto::rpc_client::RpcClient;
use crate::proto::{GetBlockTemplateRequestMessage, GetInfoRequestMessage, KaspadMessage};
use crate::{miner::MinerManager, Error, ShutdownHandler};
use crate::{
miner::MinerManager,
proto::{
kaspad_message::Payload, rpc_client::RpcClient, GetBlockTemplateRequestMessage, GetInfoRequestMessage,
KaspadMessage,
},
Error, ShutdownHandler,
};
use log::{error, info, warn};
use tokio::sync::mpsc::{self, error::SendError, Sender};
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -86,7 +90,7 @@ impl KaspadHandler {
(None, true, None) => error!("No block and No Error!"),
},
Payload::SubmitBlockResponse(res) => match res.error {
None => info!("block submitted successfully!"),
None => info!("Block submitted successfully!"),
Some(e) => warn!("Failed submitting block: {:?}", e),
},
Payload::GetBlockResponse(msg) => {
Expand All @@ -100,7 +104,7 @@ impl KaspadHandler {
None => info!("Registered for block notifications"),
Some(e) => error!("Failed registering for block notifications: {:?}", e),
},
msg => info!("got unknown msg: {:?}", msg),
msg => info!("Got unknown msg: {:?}", msg),
}
Ok(())
}
Expand Down
8 changes: 4 additions & 4 deletions src/kaspad_messages.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::proto::{
kaspad_message::Payload, GetBlockTemplateRequestMessage, GetInfoRequestMessage, KaspadMessage,
NotifyBlockAddedRequestMessage, RpcBlock, SubmitBlockRequestMessage,
};
use crate::{
pow::{self, HeaderHasher},
proto::{
kaspad_message::Payload, GetBlockTemplateRequestMessage, GetInfoRequestMessage, KaspadMessage,
NotifyBlockAddedRequestMessage, RpcBlock, SubmitBlockRequestMessage,
},
Hash,
};

Expand Down
41 changes: 28 additions & 13 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
#![cfg_attr(all(test, feature = "bench"), feature(test))]

use std::error::Error as StdError;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};

use chrono::Local;
use clap::Parser;
use log::{info, warn};
use std::error::Error as StdError;
use std::{
io::Write,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};

use crate::cli::Opt;
use crate::client::KaspadHandler;
use crate::miner::MinerManager;
use crate::proto::NotifyBlockAddedRequestMessage;
use crate::target::Uint256;
use crate::{
cli::Opt, client::KaspadHandler, miner::MinerManager, proto::NotifyBlockAddedRequestMessage, target::Uint256,
};

mod cli;
mod client;
Expand Down Expand Up @@ -59,7 +61,18 @@ impl Drop for ShutdownOnDrop {
async fn main() -> Result<(), Error> {
let mut opt: Opt = Opt::parse();
opt.process()?;
env_logger::builder().filter_level(opt.log_level()).parse_default_env().init();

let mut builder = env_logger::builder();
builder.filter_level(opt.log_level()).parse_default_env();
if opt.altlogs {
builder.format(|buf, record| {
let timestamp = Local::now().format("%Y-%m-%d %H:%M:%S%.3f%:z");
writeln!(buf, "{} [{:>5}] {}", timestamp, record.level(), record.args())
});
}
builder.init();

let throttle = opt.throttle.map(Duration::from_millis);
let shutdown = ShutdownHandler(Arc::new(AtomicBool::new(false)));
let _shutdown_when_dropped = shutdown.arm();

Expand All @@ -78,7 +91,9 @@ async fn main() -> Result<(), Error> {
}
client.client_send(NotifyBlockAddedRequestMessage {}).await?;
client.client_get_block_template().await?;
let mut miner_manager = MinerManager::new(client.send_channel.clone(), opt.num_threads, shutdown.clone());

let mut miner_manager =
MinerManager::new(client.send_channel.clone(), opt.num_threads, throttle, shutdown.clone());
client.listen(&mut miner_manager, shutdown.clone()).await?;
warn!("Disconnected from kaspad, retrying");
}
Expand Down
66 changes: 49 additions & 17 deletions src/miner.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
use std::num::Wrapping;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use crate::proto::{KaspadMessage, RpcBlock};
use crate::swap_rust::WatchSwap;
use crate::{pow, Error, ShutdownHandler};
use crate::{
pow,
proto::{KaspadMessage, RpcBlock},
swap_rust::WatchSwap,
Error, ShutdownHandler,
};
use log::{info, warn};
use rand::{thread_rng, RngCore};
use tokio::sync::mpsc::Sender;
use tokio::task::{self, JoinHandle};
use tokio::time::MissedTickBehavior;
use std::{
num::Wrapping,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use tokio::{
sync::mpsc::Sender,
task::{self, JoinHandle},
time::MissedTickBehavior,
};

type MinerHandler = std::thread::JoinHandle<Result<(), Error>>;

Expand Down Expand Up @@ -40,12 +48,24 @@ pub fn get_num_cpus(n_cpus: Option<u16>) -> u16 {
const LOG_RATE: Duration = Duration::from_secs(10);

impl MinerManager {
pub fn new(send_channel: Sender<KaspadMessage>, n_cpus: Option<u16>, shutdown: ShutdownHandler) -> Self {
pub fn new(
send_channel: Sender<KaspadMessage>,
n_cpus: Option<u16>,
throttle: Option<Duration>,
shutdown: ShutdownHandler,
) -> Self {
let hashes_tried = Arc::new(AtomicU64::new(0));
let watch = WatchSwap::empty();
let handles =
Self::launch_cpu_threads(send_channel.clone(), Arc::clone(&hashes_tried), watch.clone(), shutdown, n_cpus)
.collect();
let handles = Self::launch_cpu_threads(
send_channel.clone(),
hashes_tried.clone(),
watch.clone(),
shutdown,
n_cpus,
throttle,
)
.collect();

Self {
handles,
block_channel: watch,
Expand All @@ -63,11 +83,18 @@ impl MinerManager {
work_channel: WatchSwap<pow::State>,
shutdown: ShutdownHandler,
n_cpus: Option<u16>,
throttle: Option<Duration>,
) -> impl Iterator<Item = MinerHandler> {
let n_cpus = get_num_cpus(n_cpus);
info!("launching: {} cpu miners", n_cpus);
info!("Launching: {} cpu miners", n_cpus);
(0..n_cpus).map(move |_| {
Self::launch_cpu_miner(send_channel.clone(), work_channel.clone(), hashes_tried.clone(), shutdown.clone())
Self::launch_cpu_miner(
send_channel.clone(),
work_channel.clone(),
hashes_tried.clone(),
throttle,
shutdown.clone(),
)
})
}

Expand All @@ -94,6 +121,7 @@ impl MinerManager {
send_channel: Sender<KaspadMessage>,
mut block_channel: WatchSwap<pow::State>,
hashes_tried: Arc<AtomicU64>,
throttle: Option<Duration>,
shutdown: ShutdownHandler,
) -> MinerHandler {
// We mark it cold as the function is not called often, and it's not in the hot path
Expand Down Expand Up @@ -129,6 +157,10 @@ impl MinerManager {
state = new_state.as_deref().cloned();
}
}

if let Some(sleep_duration) = throttle {
std::thread::sleep(sleep_duration)
}
}
})
}
Expand Down
6 changes: 4 additions & 2 deletions src/pow/heavy_hash.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::pow::{hasher::HeavyHasher, xoshiro::XoShiRo256PlusPlus};
use crate::Hash;
use crate::{
pow::{hasher::HeavyHasher, xoshiro::XoShiRo256PlusPlus},
Hash,
};
use std::mem::MaybeUninit;

#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Clone)]
Expand Down