Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

Permalink
chore: Cleanup some older DHT code and docs to simplify interfaces an…
Browse files Browse the repository at this point in the history
…d tests. (#139)
  • Loading branch information
jsantell authored Nov 11, 2022
1 parent 42f612b commit abf9cf6
Show file tree
Hide file tree
Showing 12 changed files with 186 additions and 207 deletions.
9 changes: 6 additions & 3 deletions rust/noosphere-ns/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ use noosphere_storage::{db::SphereDb, interface::Store};
use std::net::Ipv4Addr;
use ucan_key_support::ed25519::Ed25519KeyMaterial;

#[cfg(doc)]
use libp2p::kad::KademliaConfig;

/// [NameSystemBuilder] is the primary external interface for
/// creating a new [NameSystem]. `key_material` and `store`
/// must be provided.
Expand Down Expand Up @@ -89,7 +92,7 @@ where

/// How long, in seconds, published records are replicated to
/// peers. Should be significantly shorter than `record_ttl`.
/// See [KademliaConfig::set_publication_interval](https://docs.rs/libp2p/latest/libp2p/kad/struct.KademliaConfig.html#method.set_publication_interval).
/// See [KademliaConfig::set_publication_interval] and [KademliaConfig::set_provider_publication_interval].
pub fn publication_interval(mut self, interval: u32) -> Self {
self.dht_config.publication_interval = interval;
self
Expand All @@ -103,15 +106,15 @@ where

/// How long, in seconds, records remain valid for. Should be significantly
/// longer than `publication_interval`.
/// See [KademliaConfig::set_record_ttl](https://docs.rs/libp2p/latest/libp2p/kad/struct.KademliaConfig.html#method.set_record_ttl).
/// See [KademliaConfig::set_record_ttl] and [KademliaConfig::set_provider_record_ttl].
pub fn record_ttl(mut self, interval: u32) -> Self {
self.dht_config.record_ttl = interval;
self
}

/// How long, in seconds, stored records are replicated to
/// peers. Should be significantly shorter than `publication_interval`.
/// See [KademliaConfig::set_replication_interval](https://docs.rs/libp2p/latest/libp2p/kad/struct.KademliaConfig.html#method.set_replication_interval).
/// See [KademliaConfig::set_replication_interval].
pub fn replication_interval(mut self, interval: u32) -> Self {
self.dht_config.replication_interval = interval;
self
Expand Down
9 changes: 6 additions & 3 deletions rust/noosphere-ns/src/dht/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use libp2p::Multiaddr;

#[cfg(doc)]
use libp2p::kad::KademliaConfig;

#[derive(Clone, Debug)]
pub struct DHTConfig {
/// If bootstrap peers are provided, how often,
Expand All @@ -16,19 +19,19 @@ pub struct DHTConfig {
pub peer_dialing_interval: u64,
/// How long, in seconds, published records are replicated to
/// peers. Should be significantly shorter than `record_ttl`.
/// See [KademliaConfig::set_publication_interval](https://docs.rs/libp2p/latest/libp2p/kad/struct.KademliaConfig.html#method.set_publication_interval).
/// See [KademliaConfig::set_publication_interval] and [KademliaConfig::set_provider_publication_interval].
pub publication_interval: u32,
/// How long, in seconds, until an unsuccessful
/// DHT query times out.
pub query_timeout: u32,
/// How long, in seconds, stored records are replicated to
/// peers. Should be significantly shorter than `publication_interval`.
/// See [KademliaConfig::set_replication_interval](https://docs.rs/libp2p/latest/libp2p/kad/struct.KademliaConfig.html#method.set_replication_interval).
/// See [KademliaConfig::set_replication_interval].
/// Only applies to value records.
pub replication_interval: u32,
/// How long, in seconds, records remain valid for. Should be significantly
/// longer than `publication_interval`.
/// See [KademliaConfig::set_record_ttl](https://docs.rs/libp2p/latest/libp2p/kad/struct.KademliaConfig.html#method.set_record_ttl).
/// See [KademliaConfig::set_record_ttl] and [KademliaConfig::set_provider_record_ttl].
pub record_ttl: u32,
}

Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-ns/src/dht/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ mod validator;
pub use config::DHTConfig;
pub use errors::DHTError;
pub use node::{DHTNode, DHTStatus};
pub use types::DHTNetworkInfo;
pub use types::{DHTNetworkInfo, DHTRecord};
pub use validator::{DefaultRecordValidator, RecordValidator};
37 changes: 20 additions & 17 deletions rust/noosphere-ns/src/dht/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::dht::{
channel::message_channel,
errors::DHTError,
processor::DHTProcessor,
types::{DHTMessageClient, DHTNetworkInfo, DHTRequest, DHTResponse},
types::{DHTMessageClient, DHTNetworkInfo, DHTRecord, DHTRequest, DHTResponse},
utils::key_material_to_libp2p_keypair,
DHTConfig, DefaultRecordValidator, RecordValidator,
};
Expand Down Expand Up @@ -204,41 +204,44 @@ where
ensure_response!(response, DHTResponse::GetNetworkInfo(info) => Ok(info))
}

/// Sets the record keyed by `name` with `value` and propagates
/// Sets the record keyed by `key` with `value` and propagates
/// to peers.
/// Fails if node is not in an active state or cannot set the record
/// on any peers.
pub async fn set_record(&self, name: Vec<u8>, value: Vec<u8>) -> Result<Vec<u8>, DHTError> {
let request = DHTRequest::SetRecord { name, value };
pub async fn put_record(&self, key: &[u8], value: &[u8]) -> Result<Vec<u8>, DHTError> {
let request = DHTRequest::PutRecord {
key: key.to_vec(),
value: value.to_vec(),
};
let response = self.send_request(request).await?;
ensure_response!(response, DHTResponse::SetRecord { name } => Ok(name))
ensure_response!(response, DHTResponse::PutRecord { key } => Ok(key))
}

/// Fetches the record keyed by `name` from the network.
/// Fetches the record keyed by `key` from the network.
/// Return value may be `Ok(None)` if query finished without finding
/// any matching values.
/// Fails if node is not in an active state.
pub async fn get_record(&self, name: Vec<u8>) -> Result<(Vec<u8>, Option<Vec<u8>>), DHTError> {
let request = DHTRequest::GetRecord { name };
pub async fn get_record(&self, key: &[u8]) -> Result<DHTRecord, DHTError> {
let request = DHTRequest::GetRecord { key: key.to_vec() };
let response = self.send_request(request).await?;
ensure_response!(response, DHTResponse::GetRecord { name, value, .. } => Ok((name, value)))
ensure_response!(response, DHTResponse::GetRecord(record) => Ok(record))
}

/// Instructs the node to tell its peers that it is providing
/// the record for `name`.
/// the record for `key`.
/// Fails if node is not in an active state.
pub async fn start_providing(&self, name: Vec<u8>) -> Result<(), DHTError> {
let request = DHTRequest::StartProviding { name };
pub async fn start_providing(&self, key: &[u8]) -> Result<(), DHTError> {
let request = DHTRequest::StartProviding { key: key.to_vec() };
let response = self.send_request(request).await?;
ensure_response!(response, DHTResponse::StartProviding { name: _ } => Ok(()))
ensure_response!(response, DHTResponse::StartProviding { key: _ } => Ok(()))
}

/// Queries the network to find peers that are providing `name`.
/// Queries the network to find peers that are providing `key`.
/// Fails if node is not in an active state.
pub async fn get_providers(&self, name: Vec<u8>) -> Result<Vec<libp2p::PeerId>, DHTError> {
let request = DHTRequest::GetProviders { name };
pub async fn get_providers(&self, key: &[u8]) -> Result<Vec<libp2p::PeerId>, DHTError> {
let request = DHTRequest::GetProviders { key: key.to_vec() };
let response = self.send_request(request).await?;
ensure_response!(response, DHTResponse::GetProviders { providers, name: _ } => Ok(providers))
ensure_response!(response, DHTResponse::GetProviders { providers, key: _ } => Ok(providers))
}

async fn send_request(&self, request: DHTRequest) -> Result<DHTResponse, DHTError> {
Expand Down
39 changes: 18 additions & 21 deletions rust/noosphere-ns/src/dht/processor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::dht::{
errors::DHTError,
swarm::{build_swarm, DHTEvent, DHTSwarm, DHTSwarmEvent},
types::{DHTMessage, DHTMessageProcessor, DHTRequest, DHTResponse},
types::{DHTMessage, DHTMessageProcessor, DHTRecord, DHTRequest, DHTResponse},
DHTConfig, RecordValidator,
};
use libp2p::{
Expand Down Expand Up @@ -155,12 +155,12 @@ where

// Process client requests.
match message.request {
DHTRequest::GetProviders { ref name } => {
DHTRequest::GetProviders { ref key } => {
store_request!(
self,
message,
Ok::<kad::QueryId, DHTError>(
self.swarm.behaviour_mut().kad.get_providers(Key::new(name))
self.swarm.behaviour_mut().kad.get_providers(Key::new(key))
)
);
}
Expand All @@ -181,36 +181,33 @@ where
let info = self.swarm.network_info();
message.respond(Ok(DHTResponse::GetNetworkInfo(info.into())));
}
DHTRequest::StartProviding { ref name } => {
DHTRequest::StartProviding { ref key } => {
store_request!(
self,
message,
self.swarm
.behaviour_mut()
.kad
.start_providing(Key::new(name))
.start_providing(Key::new(key))
);
}
DHTRequest::GetRecord { ref name } => {
DHTRequest::GetRecord { ref key } => {
store_request!(
self,
message,
Ok::<kad::QueryId, DHTError>(
self.swarm
.behaviour_mut()
.kad
.get_record(Key::new(name), Quorum::One)
.get_record(Key::new(key), Quorum::One)
)
);
}
DHTRequest::SetRecord {
ref name,
ref value,
} => {
DHTRequest::PutRecord { ref key, ref value } => {
let value_owned = value.to_owned();
if self.validate(value).await {
let record = Record {
key: Key::new(name),
key: Key::new(key),
value: value_owned,
publisher: None,
expires: None,
Expand Down Expand Up @@ -291,10 +288,10 @@ where
// We don't want to propagate validation errors for all
// possible invalid records, but handle it similarly as if
// no record at all was found.
message.respond(Ok(DHTResponse::GetRecord {
name: key.to_vec(),
message.respond(Ok(DHTResponse::GetRecord(DHTRecord {
key: key.to_vec(),
value: if is_valid { Some(value) } else { None },
}));
})));
};
}
}
Expand All @@ -304,18 +301,18 @@ where
kad::GetRecordError::NotFound { key, .. } => {
// Not finding a record is not an `Err` response,
// but simply a successful query with a `None` result.
message.respond(Ok(DHTResponse::GetRecord {
name: key.to_vec(),
message.respond(Ok(DHTResponse::GetRecord(DHTRecord {
key: key.to_vec(),
value: None,
}))
})))
}
e => message.respond(Err(DHTError::from(e))),
};
}
}
QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => {
if let Some(message) = self.requests.remove(&id) {
message.respond(Ok(DHTResponse::SetRecord { name: key.to_vec() }));
message.respond(Ok(DHTResponse::PutRecord { key: key.to_vec() }));
}
}
QueryResult::PutRecord(Err(e)) => {
Expand All @@ -340,7 +337,7 @@ where
}
QueryResult::StartProviding(Ok(kad::AddProviderOk { key })) => {
if let Some(message) = self.requests.remove(&id) {
message.respond(Ok(DHTResponse::StartProviding { name: key.to_vec() }));
message.respond(Ok(DHTResponse::StartProviding { key: key.to_vec() }));
}
}
QueryResult::StartProviding(Err(e)) => {
Expand All @@ -356,7 +353,7 @@ where
if let Some(message) = self.requests.remove(&id) {
message.respond(Ok(DHTResponse::GetProviders {
providers: providers.into_iter().collect(),
name: key.to_vec(),
key: key.to_vec(),
}));
}
}
Expand Down
Loading

0 comments on commit abf9cf6

Please sign in to comment.