Skip to content

Commit

Permalink
Merge pull request #1097 from jbesraa/2024-08-13-refactor-poolsv2
Browse files Browse the repository at this point in the history
Move`PoolSv2` lib code out of `main.rs`
  • Loading branch information
plebhash authored Aug 30, 2024
2 parents fd71db1 + 14691d5 commit 1848fc6
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 98 deletions.
76 changes: 76 additions & 0 deletions roles/pool/src/lib/mining_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ pub struct CoinbaseOutput {
output_script_value: String,
}

impl CoinbaseOutput {
pub fn new(output_script_type: String, output_script_value: String) -> Self {
Self {
output_script_type,
output_script_value,
}
}
}

impl TryFrom<&CoinbaseOutput> for CoinbaseOutput_ {
type Error = Error;

Expand Down Expand Up @@ -96,6 +105,73 @@ pub struct Configuration {
pub test_only_listen_adress_plain: String,
}

pub struct TemplateProviderConfig {
address: String,
authority_public_key: Option<Secp256k1PublicKey>,
}

impl TemplateProviderConfig {
pub fn new(address: String, authority_public_key: Option<Secp256k1PublicKey>) -> Self {
Self {
address,
authority_public_key,
}
}
}

pub struct AuthorityConfig {
pub public_key: Secp256k1PublicKey,
pub secret_key: Secp256k1SecretKey,
}

impl AuthorityConfig {
pub fn new(public_key: Secp256k1PublicKey, secret_key: Secp256k1SecretKey) -> Self {
Self {
public_key,
secret_key,
}
}
}

pub struct ConnectionConfig {
listen_address: String,
cert_validity_sec: u64,
signature: String,
}

impl ConnectionConfig {
pub fn new(listen_address: String, cert_validity_sec: u64, signature: String) -> Self {
Self {
listen_address,
cert_validity_sec,
signature,
}
}
}

impl Configuration {
pub fn new(
pool_connection: ConnectionConfig,
template_provider: TemplateProviderConfig,
authority_config: AuthorityConfig,
coinbase_outputs: Vec<CoinbaseOutput>,
#[cfg(feature = "test_only_allow_unencrypted")] test_only_listen_adress_plain: String,
) -> Self {
Self {
listen_address: pool_connection.listen_address,
tp_address: template_provider.address,
tp_authority_public_key: template_provider.authority_public_key,
authority_public_key: authority_config.public_key,
authority_secret_key: authority_config.secret_key,
cert_validity_sec: pool_connection.cert_validity_sec,
coinbase_outputs,
pool_signature: pool_connection.signature,
#[cfg(feature = "test_only_allow_unencrypted")]
test_only_listen_adress_plain,
}
}
}

#[derive(Debug)]
pub struct Downstream {
// Either group or channel id
Expand Down
108 changes: 108 additions & 0 deletions roles/pool/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,111 @@ pub mod error;
pub mod mining_pool;
pub mod status;
pub mod template_receiver;

use async_channel::{bounded, unbounded};

use mining_pool::{get_coinbase_output, Configuration, Pool};
use template_receiver::TemplateRx;
use tracing::{error, info, warn};

use tokio::select;

pub struct PoolSv2 {
config: Configuration,
}

impl PoolSv2 {
pub fn new(config: Configuration) -> PoolSv2 {
PoolSv2 { config }
}
pub async fn start(self) {
let config = self.config.clone();
let (status_tx, status_rx) = unbounded();
let (s_new_t, r_new_t) = bounded(10);
let (s_prev_hash, r_prev_hash) = bounded(10);
let (s_solution, r_solution) = bounded(10);
let (s_message_recv_signal, r_message_recv_signal) = bounded(10);
let coinbase_output_result = get_coinbase_output(&config);
let coinbase_output_len = match coinbase_output_result {
Ok(coinbase_output) => coinbase_output.len() as u32,
Err(err) => {
error!("Failed to get Coinbase output: {:?}", err);
return;
}
};
let tp_authority_public_key = config.tp_authority_public_key;
let template_rx_res = TemplateRx::connect(
config.tp_address.parse().unwrap(),
s_new_t,
s_prev_hash,
r_solution,
r_message_recv_signal,
status::Sender::Upstream(status_tx.clone()),
coinbase_output_len,
tp_authority_public_key,
)
.await;

if let Err(e) = template_rx_res {
error!("Could not connect to Template Provider: {}", e);
return;
}

let pool = Pool::start(
config.clone(),
r_new_t,
r_prev_hash,
s_solution,
s_message_recv_signal,
status::Sender::DownstreamListener(status_tx),
);

// Start the error handling loop
// See `./status.rs` and `utils/error_handling` for information on how this operates
loop {
let task_status = select! {
task_status = status_rx.recv() => task_status,
interrupt_signal = tokio::signal::ctrl_c() => {
match interrupt_signal {
Ok(()) => {
info!("Interrupt received");
},
Err(err) => {
error!("Unable to listen for interrupt signal: {}", err);
// we also shut down in case of error
},
}
break;
}
};
let task_status: status::Status = task_status.unwrap();

match task_status.state {
// Should only be sent by the downstream listener
status::State::DownstreamShutdown(err) => {
error!(
"SHUTDOWN from Downstream: {}\nTry to restart the downstream listener",
err
);
break;
}
status::State::TemplateProviderShutdown(err) => {
error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err);
break;
}
status::State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
}
status::State::DownstreamInstanceDropped(downstream_id) => {
warn!("Dropping downstream instance {} from pool", downstream_id);
if pool
.safe_lock(|p| p.remove_downstream(downstream_id))
.is_err()
{
break;
}
}
}
}
}
}
101 changes: 3 additions & 98 deletions roles/pool/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
#![allow(special_module_name)]
use async_channel::{bounded, unbounded};

use tracing::{error, info, warn};
mod lib;
use lib::{
mining_pool::{get_coinbase_output, Configuration, Pool},
status,
template_receiver::TemplateRx,
};

use ext_config::{Config, File, FileFormat};
use tokio::select;
pub use lib::{mining_pool::Configuration, status, PoolSv2};
use tracing::error;

mod args {
use std::path::PathBuf;
Expand Down Expand Up @@ -106,93 +99,5 @@ async fn main() {
return;
}
};

let (status_tx, status_rx) = unbounded();
let (s_new_t, r_new_t) = bounded(10);
let (s_prev_hash, r_prev_hash) = bounded(10);
let (s_solution, r_solution) = bounded(10);
let (s_message_recv_signal, r_message_recv_signal) = bounded(10);
info!("Pool INITIALIZING with config: {:?}", &args.config_path);
let coinbase_output_result = get_coinbase_output(&config);
let coinbase_output_len = match coinbase_output_result {
Ok(coinbase_output) => coinbase_output.len() as u32,
Err(err) => {
error!("Failed to get coinbase output: {:?}", err);
return;
}
};
let tp_authority_public_key = config.tp_authority_public_key;
let template_rx_res = TemplateRx::connect(
config.tp_address.parse().unwrap(),
s_new_t,
s_prev_hash,
r_solution,
r_message_recv_signal,
status::Sender::Upstream(status_tx.clone()),
coinbase_output_len,
tp_authority_public_key,
)
.await;

if let Err(e) = template_rx_res {
error!("Could not connect to Template Provider: {}", e);
return;
}

let pool = Pool::start(
config.clone(),
r_new_t,
r_prev_hash,
s_solution,
s_message_recv_signal,
status::Sender::DownstreamListener(status_tx),
);

// Start the error handling loop
// See `./status.rs` and `utils/error_handling` for information on how this operates
loop {
let task_status = select! {
task_status = status_rx.recv() => task_status,
interrupt_signal = tokio::signal::ctrl_c() => {
match interrupt_signal {
Ok(()) => {
info!("Interrupt received");
},
Err(err) => {
error!("Unable to listen for interrupt signal: {}", err);
// we also shut down in case of error
},
}
break;
}
};
let task_status: status::Status = task_status.unwrap();

match task_status.state {
// Should only be sent by the downstream listener
status::State::DownstreamShutdown(err) => {
error!(
"SHUTDOWN from Downstream: {}\nTry to restart the downstream listener",
err
);
break;
}
status::State::TemplateProviderShutdown(err) => {
error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err);
break;
}
status::State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
}
status::State::DownstreamInstanceDropped(downstream_id) => {
warn!("Dropping downstream instance {} from pool", downstream_id);
if pool
.safe_lock(|p| p.remove_downstream(downstream_id))
.is_err()
{
break;
}
}
}
}
PoolSv2::new(config).start().await;
}

0 comments on commit 1848fc6

Please sign in to comment.