Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

dev: initial work on NameSystem, wrapping the underlying DHT network. #122

Merged
merged 6 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 84 additions & 120 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion rust/noosphere-core/src/authority/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl TryFrom<String> for SphereAction {
}
}

#[derive(Clone, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SphereReference {
pub did: String,
}
Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-core/src/data/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl DelegationIpld {
}
}

/// See https://github.com/ucan-wg/spec#66-revocation
/// See <https://github.com/ucan-wg/spec#66-revocation>
/// TODO(ucan-wg/spec#112): Verify the form of this
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize, Hash)]
pub struct RevocationIpld {
Expand Down
16 changes: 11 additions & 5 deletions rust/noosphere-ns/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,23 @@ homepage = "https://github.com/subconsciousnetwork/noosphere"
readme = "README.md"

[dependencies]
anyhow = "^1"
tracing = "0.1"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
noosphere-core = { version = "0.1.0-alpha.1", path = "../noosphere-core" }
ucan-key-support = { version = "0.7.0-alpha.1" }
anyhow = "^1"
tracing = "0.1"
cid = "~0.8"
serde = "^1"
serde_json = "^1"
futures = "0.3.1"
ucan = { version = "0.7.0-alpha.1" }
ucan-key-support = { version = "0.7.0-alpha.1" }
tokio = { version = "1.15", features = ["io-util", "io-std", "sync", "macros", "rt", "rt-multi-thread"] }
tracing-subscriber = { version = "~0.3", features = ["env-filter"] }
libp2p = { version = "0.49.0", default-features = false, features = [ "identify", "dns", "tcp", "tokio", "noise", "mplex", "yamux", "kad" ] }
noosphere-storage = { version = "0.1.0-alpha.1", path = "../noosphere-storage" }
noosphere-core = { version = "0.1.0-alpha.1", path = "../noosphere-core" }

[dev-dependencies]
rand = { version = "0.8.5" }
test-log = { version = "0.2.11", default-features = false, features = ["trace"] }
tracing-subscriber = { version = "~0.3", features = ["env-filter"] }
libipld-cbor = "~0.14"
202 changes: 202 additions & 0 deletions rust/noosphere-ns/src/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
use crate::{dht::DHTConfig, name_system::NameSystem};
use anyhow::{anyhow, Result};
use libp2p::{self, Multiaddr};
use noosphere_storage::{db::SphereDb, interface::Store};
use std::net::Ipv4Addr;
use ucan_key_support::ed25519::Ed25519KeyMaterial;

/// [NameSystemBuilder] is the primary external interface for
/// creating a new [NameSystem]. `key_material` and `store`
/// must be provided.
///
/// # Examples
///
/// ```
/// use noosphere_core::authority::generate_ed25519_key;
/// use noosphere_storage::{db::SphereDb, memory::{MemoryStore, MemoryStorageProvider}};
/// use noosphere_ns::{NameSystem, NameSystemBuilder};
/// use tokio;
///
/// #[tokio::main]
/// async fn main() {
/// let key_material = generate_ed25519_key();
/// let store = SphereDb::new(&MemoryStorageProvider::default()).await.unwrap();
///
/// let ns = NameSystemBuilder::default()
/// .key_material(&key_material)
/// .store(&store)
/// .listening_port(30000)
/// .build().expect("valid config");
///
/// assert!(NameSystemBuilder::<MemoryStore>::default().build().is_err(), "key_material and store must be provided.");
/// }
/// ```
pub struct NameSystemBuilder<S>
where
S: Store,
{
bootstrap_peers: Option<Vec<Multiaddr>>,
dht_config: DHTConfig,
key_material: Option<Ed25519KeyMaterial>,
store: Option<SphereDb<S>>,
propagation_interval: u64,
}

impl<S> NameSystemBuilder<S>
where
S: Store,
{
/// If bootstrap peers are provided, how often,
/// in seconds, should the bootstrap process execute
/// to keep routing tables fresh.
pub fn bootstrap_interval(mut self, interval: u64) -> Self {
self.dht_config.bootstrap_interval = interval;
self
}

/// Peer addresses to query to update routing tables
/// during bootstrap. A standalone bootstrap node would
/// have this field empty.
pub fn bootstrap_peers(mut self, peers: &Vec<Multiaddr>) -> Self {
self.bootstrap_peers = Some(peers.to_owned());
self
}

/// Public/private keypair for DHT node.
pub fn key_material(mut self, key_material: &Ed25519KeyMaterial) -> Self {
self.key_material = Some(key_material.to_owned());
self
}

/// Port to listen for incoming TCP connections. If not specified,
/// an open port is automatically chosen.
pub fn listening_port(mut self, port: u16) -> Self {
let mut address = Multiaddr::empty();
address.push(libp2p::multiaddr::Protocol::Ip4(Ipv4Addr::new(
127, 0, 0, 1,
)));
address.push(libp2p::multiaddr::Protocol::Tcp(port));
self.dht_config.listening_address = Some(address);
self
}

/// How frequently, in seconds, the DHT attempts to
/// dial peers found in its kbucket. Outside of tests,
/// should not be lower than 5 seconds.
pub fn peer_dialing_interval(mut self, interval: u64) -> Self {
self.dht_config.peer_dialing_interval = interval;
self
}

/// How long, in seconds, until a network query times out.
pub fn query_timeout(mut self, timeout: u32) -> Self {
self.dht_config.query_timeout = timeout;
self
}

/// The Noosphere Store to use for reading and writing sphere data.
pub fn store(mut self, store: &SphereDb<S>) -> Self {
self.store = Some(store.to_owned());
self
}

/// Default interval for hosted records to be propagated to the network.
pub fn propagation_interval(mut self, propagation_interval: u64) -> Self {
self.propagation_interval = propagation_interval;
self
}

/// Build a [NameSystem] based off of the provided configuration.
pub fn build(mut self) -> Result<NameSystem<S>> {
let key_material = self
.key_material
.take()
.ok_or_else(|| anyhow!("key_material required."))?;
let store = self
.store
.take()
.ok_or_else(|| anyhow!("store required."))?;
Ok(NameSystem::new(
key_material,
store,
self.bootstrap_peers.take(),
self.dht_config,
self.propagation_interval,
))
}
}

impl<S> Default for NameSystemBuilder<S>
where
S: Store,
{
fn default() -> Self {
Self {
bootstrap_peers: None,
dht_config: DHTConfig::default(),
key_material: None,
store: None,
propagation_interval: 60 * 60 * 24, // 1 day
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use noosphere_core::authority::generate_ed25519_key;
use noosphere_storage::{
db::SphereDb,
memory::{MemoryStorageProvider, MemoryStore},
};

#[tokio::test]
async fn test_name_system_builder() -> Result<(), anyhow::Error> {
let key_material = generate_ed25519_key();
let store = SphereDb::new(&MemoryStorageProvider::default())
.await
.unwrap();
let bootstrap_peers: Vec<Multiaddr> = vec![
"/ip4/127.0.0.50/tcp/33333/p2p/12D3KooWH8WgH9mgbMXrKX4veokUznvEn6Ycwg4qaGNi83nLkoUK"
.parse()?,
"/ip4/127.0.0.50/tcp/33334/p2p/12D3KooWMWo6tNGRx1G4TNqvr4SnHyVXSReC3tdX6zoJothXxV2c"
.parse()?,
];

let ns = NameSystemBuilder::default()
.listening_port(30000)
.key_material(&key_material)
.store(&store)
.bootstrap_peers(&bootstrap_peers)
.bootstrap_interval(33)
.peer_dialing_interval(11)
.query_timeout(22)
.propagation_interval(3600)
.build()?;

assert_eq!(ns.key_material.0.as_ref(), key_material.0.as_ref());
assert_eq!(ns._propagation_interval, 3600);
assert_eq!(ns.bootstrap_peers.as_ref().unwrap().len(), 2);
assert_eq!(ns.bootstrap_peers.as_ref().unwrap()[0], bootstrap_peers[0],);
assert_eq!(ns.bootstrap_peers.as_ref().unwrap()[1], bootstrap_peers[1]);
assert_eq!(
ns.dht_config.listening_address.as_ref().unwrap(),
&"/ip4/127.0.0.1/tcp/30000".parse()?
);
assert_eq!(ns.dht_config.bootstrap_interval, 33);
assert_eq!(ns.dht_config.peer_dialing_interval, 11);
assert_eq!(ns.dht_config.query_timeout, 22);

if NameSystemBuilder::default().store(&store).build().is_ok() {
panic!("key_material required.");
}
if NameSystemBuilder::<MemoryStore>::default()
.key_material(&key_material)
.build()
.is_ok()
{
panic!("store required.");
}
Ok(())
}
}
2 changes: 1 addition & 1 deletion rust/noosphere-ns/src/dht/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use core::{fmt, result::Result};
use tokio;
use tokio::sync::{mpsc, mpsc::error::SendError, oneshot, oneshot::error::RecvError};


impl std::error::Error for ChannelError {}
impl fmt::Display for ChannelError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down Expand Up @@ -77,6 +76,7 @@ impl<Q, S, E> MessageClient<Q, S, E> {
rx.await.map_err(|e| e.into())
}

#[allow(clippy::type_complexity)]
fn send_request_impl(
&self,
request: Q,
Expand Down
7 changes: 7 additions & 0 deletions rust/noosphere-ns/src/dht/config.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use libp2p::Multiaddr;

#[derive(Clone, Debug)]
pub struct DHTConfig {
/// If bootstrap peers are provided, how often,
/// in seconds, should the bootstrap process execute
/// to keep routing tables fresh.
pub bootstrap_interval: u64,
/// The local network interface and TCP port to listen
/// for incoming DHT connections. If `None`, can run
/// a limited set of queries on the network.
pub listening_address: Option<Multiaddr>,
/// How frequently, in seconds, the DHT attempts to
/// dial peers found in its kbucket. Outside of tests,
/// should not be lower than 5 seconds.
Expand All @@ -18,6 +24,7 @@ impl Default for DHTConfig {
fn default() -> Self {
Self {
bootstrap_interval: 5 * 60,
listening_address: None,
peer_dialing_interval: 5,
query_timeout: 5 * 60,
}
Expand Down
55 changes: 21 additions & 34 deletions rust/noosphere-ns/src/dht/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use crate::dht::{
utils::key_material_to_libp2p_keypair,
DHTConfig,
};
use std::{net::SocketAddr, time::Duration};
use libp2p;
use std::time::Duration;
use tokio;
use ucan_key_support::ed25519::Ed25519KeyMaterial;

Expand Down Expand Up @@ -36,43 +37,33 @@ pub struct DHTNode {
thread_handle: Option<tokio::task::JoinHandle<Result<(), DHTError>>>,
keypair: libp2p::identity::Keypair,
peer_id: libp2p::PeerId,
p2p_address: libp2p::Multiaddr,
p2p_address: Option<libp2p::Multiaddr>,
bootstrap_peers: Option<Vec<libp2p::Multiaddr>>,
}

impl DHTNode {
/// Creates a new [DHTNode].
/// `listening_address` is a [std::net::SocketAddr] that is used to listen for incoming
/// connections.
/// `bootstrap_peers` is a collection of [String]s in [libp2p::Multiaddr] form of initial
/// peers to connect to during bootstrapping. This collection would be empty in the
/// standalone bootstrap node scenario.
/// `config` is a [DHTConfig] of various configurations for the node.
pub fn new(
key_material: &Ed25519KeyMaterial,
listening_address: &SocketAddr,
bootstrap_peers: Option<&Vec<String>>,
bootstrap_peers: Option<&Vec<libp2p::Multiaddr>>,
config: &DHTConfig,
) -> Result<Self, DHTError> {
let keypair = key_material_to_libp2p_keypair(key_material)?;
let peer_id = libp2p::PeerId::from(keypair.public());
let p2p_address = {
let mut multiaddr: libp2p::Multiaddr = listening_address.ip().into();
multiaddr.push(libp2p::multiaddr::Protocol::Tcp(listening_address.port()));
multiaddr.push(libp2p::multiaddr::Protocol::P2p(peer_id.into()));
multiaddr
};

let peers: Option<Vec<libp2p::Multiaddr>> = if let Some(peers) = bootstrap_peers {
Some(
peers
.iter()
.map(|p| p.parse())
.collect::<Result<Vec<libp2p::Multiaddr>, libp2p::multiaddr::Error>>()
.map_err(|e| DHTError::Error(e.to_string()))?,
)
} else {
None
};
let peers: Option<Vec<libp2p::Multiaddr>> = bootstrap_peers.map(|peers| peers.to_vec());

let p2p_address: Option<libp2p::Multiaddr> =
if let Some(listening_address) = config.listening_address.as_ref() {
let mut p2p_address = listening_address.to_owned();
p2p_address.push(libp2p::multiaddr::Protocol::P2p(peer_id.into()));
Some(p2p_address)
} else {
None
};

Ok(DHTNode {
keypair,
Expand Down Expand Up @@ -114,13 +105,9 @@ impl DHTNode {
}

/// Adds additional bootstrap peers. Can only be executed before calling [DHTNode::run].
pub fn add_peers(&mut self, new_peers: &Vec<String>) -> Result<(), DHTError> {
pub fn add_peers(&mut self, new_peers: &[libp2p::Multiaddr]) -> Result<(), DHTError> {
self.ensure_state(DHTStatus::Initialized)?;
let mut new_peers_list: Vec<libp2p::Multiaddr> = new_peers
.iter()
.map(|p| p.parse())
.collect::<Result<Vec<libp2p::Multiaddr>, libp2p::multiaddr::Error>>()
.map_err(|e| DHTError::Error(e.to_string()))?;
let mut new_peers_list: Vec<libp2p::Multiaddr> = new_peers.to_vec();

if let Some(ref mut peers) = self.bootstrap_peers {
peers.append(&mut new_peers_list);
Expand All @@ -142,8 +129,8 @@ impl DHTNode {
}

/// Returns the listening address of this node.
pub fn p2p_address(&self) -> &libp2p::Multiaddr {
&self.p2p_address
pub fn p2p_address(&self) -> Option<&libp2p::Multiaddr> {
self.p2p_address.as_ref()
}

pub fn status(&self) -> DHTStatus {
Expand Down Expand Up @@ -199,10 +186,10 @@ impl DHTNode {
/// Return value may be `Ok(None)` if query finished without finding
/// any matching values.
/// Fails if node is not in an active state.
pub async fn get_record(&self, name: Vec<u8>) -> Result<Option<Vec<u8>>, DHTError> {
pub async fn get_record(&self, name: Vec<u8>) -> Result<(Vec<u8>, Option<Vec<u8>>), DHTError> {
let request = DHTRequest::GetRecord { name };
let response = self.send_request(request).await?;
ensure_response!(response, DHTResponse::GetRecord { value, .. } => Ok(Some(value)))
ensure_response!(response, DHTResponse::GetRecord { name, value, .. } => Ok((name, value)))
}

/// Instructs the node to tell its peers that it is providing
Expand Down
Loading