Change 'need_bootstrappers' to have dynamic threshold #143

Merged
merged 1 commit into from
Apr 8, 2024
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Add `BucketConfig::min_peers` to the configuration [#135]

### Changed

- Change `need_bootstrappers` to have a dynamic threshold [#135]

## [0.6.0] - 2023-11-01

### Added
@@ -126,6 +134,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[#115]: https://github.com/dusk-network/kadcast/issues/115
[#117]: https://github.com/dusk-network/kadcast/issues/117
[#123]: https://github.com/dusk-network/kadcast/issues/123
[#135]: https://github.com/dusk-network/kadcast/issues/135

<!-- Releases -->

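The two changelog entries land together: `BucketConfig::min_peers` is the new knob, and `need_bootstrappers` (changed in src/maintainer.rs below) reads it instead of a hard-coded constant. A minimal sketch of overriding the threshold from calling code follows; the field path matches the `config.bucket.min_peers` access in src/lib.rs below, but how a `Config` is created or loaded, its module path, and whether these fields are public to external callers are assumptions, not something this PR shows.

```rust
// Sketch only: field path mirrors `config.bucket.min_peers` from the diff;
// obtaining the `Config` value and the `kadcast::config` path are assumed.
fn raise_integration_threshold(config: &mut kadcast::config::Config) {
    // Keep contacting bootstrappers until at least 8 alive peers are known.
    config.bucket.min_peers = 8;
}
```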
18 changes: 18 additions & 0 deletions src/config.rs
@@ -35,6 +35,13 @@ pub const DEFAULT_SEND_RETRY_COUNT: u8 = 3;
pub const DEFAULT_SEND_RETRY_SLEEP_MILLIS: u64 = 5;
pub const DEFAULT_BLOCKLIST_REFRESH_SECS: u64 = 10;

/// Default minimum peers required for network integration without bootstrapping
pub const DEFAULT_MIN_PEERS_FOR_INTEGRATION: usize = 3;

const fn default_min_peers() -> usize {
DEFAULT_MIN_PEERS_FOR_INTEGRATION
}

#[derive(Clone, Serialize, Deserialize)]
pub struct Config {
/// KadcastID
@@ -122,6 +129,16 @@ pub struct BucketConfig {
/// Default value [BUCKET_DEFAULT_TTL_SECS]
#[serde(with = "humantime_serde")]
pub bucket_ttl: Duration,

/// Minimum number of nodes at which bootstrapping is not required
///
/// This value represents the minimum number of nodes required for a peer
/// to consider bootstrapping unnecessary and seamlessly integrate into
/// the existing network.
///
/// Default value [DEFAULT_MIN_PEERS_FOR_INTEGRATION]
#[serde(default = "default_min_peers")]
pub min_peers: usize,
}

impl Default for BucketConfig {
@@ -132,6 +149,7 @@ impl Default for BucketConfig {
),
node_ttl: Duration::from_millis(BUCKET_DEFAULT_NODE_TTL_MILLIS),
bucket_ttl: Duration::from_secs(BUCKET_DEFAULT_TTL_SECS),
min_peers: default_min_peers(),
}
}
}
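The `#[serde(default = "default_min_peers")]` attribute is what keeps existing configuration files working: a file that never mentions `min_peers` deserializes with the compiled-in default of 3, while an explicit value overrides it. Below is a standalone sketch of that serde pattern using a hypothetical `Demo` struct rather than kadcast's actual `BucketConfig` (which has additional required fields not fully shown here); it assumes the `serde` derive feature and the `toml` crate.

```rust
// Illustration of the field-level serde default used for `min_peers`.
// `Demo` is hypothetical; only the attribute pattern mirrors the diff above.
use serde::Deserialize;

const DEFAULT_MIN_PEERS_FOR_INTEGRATION: usize = 3;

const fn default_min_peers() -> usize {
    DEFAULT_MIN_PEERS_FOR_INTEGRATION
}

#[derive(Deserialize, Debug)]
struct Demo {
    #[serde(default = "default_min_peers")]
    min_peers: usize,
}

fn main() {
    // Omitted field -> compiled-in default.
    let omitted: Demo = toml::from_str("").unwrap();
    assert_eq!(omitted.min_peers, DEFAULT_MIN_PEERS_FOR_INTEGRATION);

    // Explicit field -> overrides the default.
    let explicit: Demo = toml::from_str("min_peers = 8").unwrap();
    assert_eq!(explicit.min_peers, 8);
}
```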
9 changes: 8 additions & 1 deletion src/lib.rs
@@ -105,6 +105,7 @@ impl Peer {
};
let nodes = config.bootstrapping_nodes.clone();
let idle_time = config.bucket.bucket_ttl;
let min_peers = config.bucket.min_peers;
MessageHandler::start(
table.clone(),
inbound_channel_rx,
@@ -118,7 +119,13 @@
config,
blocklist,
);
- TableMaintainer::start(nodes, table, outbound_channel_tx, idle_time);
+ TableMaintainer::start(
+     nodes,
+     table,
+     outbound_channel_tx,
+     idle_time,
+     min_peers,
+ );
task::spawn(Peer::notifier(listener_channel_rx, listener));
Ok(peer)
}
21 changes: 8 additions & 13 deletions src/maintainer.rs
@@ -30,6 +30,7 @@ impl TableMaintainer {
ktable: RwLock<Tree<PeerInfo>>,
outbound_sender: Sender<MessageBeanOut>,
idle_time: Duration,
min_peers: usize,
) {
tokio::spawn(async move {
let my_ip = *ktable.read().await.root().value().address();
@@ -42,20 +43,14 @@
my_ip,
header,
};
- maintainer.monitor_buckets(idle_time).await;
+ maintainer.monitor_buckets(idle_time, min_peers).await;
});
}

/// Check if the peer needs to contact the bootstrappers in order to join the
/// network
- async fn need_bootstrappers(&self) -> bool {
-     let binary_key = self.header.binary_id().as_binary();
-     self.ktable
-         .read()
-         .await
-         .closest_peers::<10>(binary_key)
-         .count()
-         < 3
+ async fn need_bootstrappers(&self, min_peers: usize) -> bool {
+     self.ktable.read().await.alive_nodes().count() < min_peers
}

/// Return a vector containing the Socket Addresses bound to the provided
@@ -74,8 +69,8 @@
}

/// Try to contact the bootstrapper nodes until no longer needed
- async fn contact_bootstrappers(&self) {
-     while self.need_bootstrappers().await {
+ async fn contact_bootstrappers(&self, min_peers: usize) {
+     while self.need_bootstrappers(min_peers).await {
info!("TableMaintainer::contact_bootstrappers");
let bootstrapping_nodes_addr = self.bootstrapping_nodes_addr();
let binary_key = self.header.binary_id().as_binary();
@@ -98,10 +93,10 @@
/// 1. Contact bootstrappers (if needed)
/// 2. Ping idle buckets
/// 3. Remove idle nodes from buckets
- async fn monitor_buckets(&self, idle_time: Duration) {
+ async fn monitor_buckets(&self, idle_time: Duration, min_peers: usize) {
info!("TableMaintainer::monitor_buckets started");
loop {
- self.contact_bootstrappers().await;
+ self.contact_bootstrappers(min_peers).await;
info!("TableMaintainer::monitor_buckets back to sleep");

tokio::time::sleep(idle_time).await;
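The behavioural core of the PR is this last file: `need_bootstrappers` used to count at most the 10 peers closest to the node's own key and compare against a hard-coded 3, whereas it now counts all alive nodes and compares against the configurable `min_peers`; `contact_bootstrappers` keeps polling that predicate until it turns false. A standalone sketch of the new predicate follows, with `alive_count` standing in for `ktable.read().await.alive_nodes().count()` (not kadcast's actual API surface).

```rust
// Standalone illustration of the dynamic threshold; no kadcast types involved.
fn need_bootstrappers(alive_count: usize, min_peers: usize) -> bool {
    alive_count < min_peers
}

fn main() {
    // With the default `min_peers` of 3 the numeric threshold is unchanged,
    // although the counted set grows from "10 closest peers" to "all alive nodes".
    assert!(need_bootstrappers(2, 3));
    assert!(!need_bootstrappers(3, 3));

    // Larger deployments can demand a fuller routing table before the
    // maintainer stops contacting its bootstrappers.
    assert!(need_bootstrappers(10, 16));
}
```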