Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

Permalink
feat!: initial work on NameSystem, wrapping the underlying DHT networ…
Browse files Browse the repository at this point in the history
…k. (#122)

* Follow up documentation fixes for noosphere_ns
* Refactor internal record representation as a wrapper around a UCAN auth token representing permission to publish.
* Add tests for NSRecord::validate for delegated capabilities
* Use non-ref NSRecord as input/output of NameSystem so the tokens can be stored/validated later, and enable NameSystem to evict expired records from cache.
* Add delegated UCAN tokens to NS tests to more accurately reflect actual usage of the name system.
  • Loading branch information
jsantell authored Nov 7, 2022
1 parent c063059 commit 656fb23
Show file tree
Hide file tree
Showing 18 changed files with 1,411 additions and 241 deletions.
204 changes: 84 additions & 120 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion rust/noosphere-core/src/authority/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl TryFrom<String> for SphereAction {
}
}

#[derive(Clone, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SphereReference {
pub did: String,
}
Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-core/src/data/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl DelegationIpld {
}
}

/// See https://github.com/ucan-wg/spec#66-revocation
/// See <https://github.com/ucan-wg/spec#66-revocation>
/// TODO(ucan-wg/spec#112): Verify the form of this
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize, Hash)]
pub struct RevocationIpld {
Expand Down
16 changes: 11 additions & 5 deletions rust/noosphere-ns/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,23 @@ homepage = "https://github.com/subconsciousnetwork/noosphere"
readme = "README.md"

[dependencies]
anyhow = "^1"
tracing = "0.1"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
noosphere-core = { version = "0.1.0-alpha.1", path = "../noosphere-core" }
ucan-key-support = { version = "0.7.0-alpha.1" }
anyhow = "^1"
tracing = "0.1"
cid = "~0.8"
serde = "^1"
serde_json = "^1"
futures = "0.3.1"
ucan = { version = "0.7.0-alpha.1" }
ucan-key-support = { version = "0.7.0-alpha.1" }
tokio = { version = "1.15", features = ["io-util", "io-std", "sync", "macros", "rt", "rt-multi-thread"] }
tracing-subscriber = { version = "~0.3", features = ["env-filter"] }
libp2p = { version = "0.49.0", default-features = false, features = [ "identify", "dns", "tcp", "tokio", "noise", "mplex", "yamux", "kad" ] }
noosphere-storage = { version = "0.1.0-alpha.1", path = "../noosphere-storage" }
noosphere-core = { version = "0.1.0-alpha.1", path = "../noosphere-core" }

[dev-dependencies]
rand = { version = "0.8.5" }
test-log = { version = "0.2.11", default-features = false, features = ["trace"] }
tracing-subscriber = { version = "~0.3", features = ["env-filter"] }
libipld-cbor = "~0.14"
202 changes: 202 additions & 0 deletions rust/noosphere-ns/src/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
use crate::{dht::DHTConfig, name_system::NameSystem};
use anyhow::{anyhow, Result};
use libp2p::{self, Multiaddr};
use noosphere_storage::{db::SphereDb, interface::Store};
use std::net::Ipv4Addr;
use ucan_key_support::ed25519::Ed25519KeyMaterial;

/// [NameSystemBuilder] is the primary external interface for
/// creating a new [NameSystem]. `key_material` and `store`
/// must be provided.
///
/// # Examples
///
/// ```
/// use noosphere_core::authority::generate_ed25519_key;
/// use noosphere_storage::{db::SphereDb, memory::{MemoryStore, MemoryStorageProvider}};
/// use noosphere_ns::{NameSystem, NameSystemBuilder};
/// use tokio;
///
/// #[tokio::main]
/// async fn main() {
/// let key_material = generate_ed25519_key();
/// let store = SphereDb::new(&MemoryStorageProvider::default()).await.unwrap();
///
/// let ns = NameSystemBuilder::default()
/// .key_material(&key_material)
/// .store(&store)
/// .listening_port(30000)
/// .build().expect("valid config");
///
/// assert!(NameSystemBuilder::<MemoryStore>::default().build().is_err(), "key_material and store must be provided.");
/// }
/// ```
pub struct NameSystemBuilder<S>
where
S: Store,
{
bootstrap_peers: Option<Vec<Multiaddr>>,
dht_config: DHTConfig,
key_material: Option<Ed25519KeyMaterial>,
store: Option<SphereDb<S>>,
propagation_interval: u64,
}

impl<S> NameSystemBuilder<S>
where
S: Store,
{
/// If bootstrap peers are provided, how often,
/// in seconds, should the bootstrap process execute
/// to keep routing tables fresh.
pub fn bootstrap_interval(mut self, interval: u64) -> Self {
self.dht_config.bootstrap_interval = interval;
self
}

/// Peer addresses to query to update routing tables
/// during bootstrap. A standalone bootstrap node would
/// have this field empty.
pub fn bootstrap_peers(mut self, peers: &Vec<Multiaddr>) -> Self {
self.bootstrap_peers = Some(peers.to_owned());
self
}

/// Public/private keypair for DHT node.
pub fn key_material(mut self, key_material: &Ed25519KeyMaterial) -> Self {
self.key_material = Some(key_material.to_owned());
self
}

/// Port to listen for incoming TCP connections. If not specified,
/// an open port is automatically chosen.
pub fn listening_port(mut self, port: u16) -> Self {
let mut address = Multiaddr::empty();
address.push(libp2p::multiaddr::Protocol::Ip4(Ipv4Addr::new(
127, 0, 0, 1,
)));
address.push(libp2p::multiaddr::Protocol::Tcp(port));
self.dht_config.listening_address = Some(address);
self
}

/// How frequently, in seconds, the DHT attempts to
/// dial peers found in its kbucket. Outside of tests,
/// should not be lower than 5 seconds.
pub fn peer_dialing_interval(mut self, interval: u64) -> Self {
self.dht_config.peer_dialing_interval = interval;
self
}

/// How long, in seconds, until a network query times out.
pub fn query_timeout(mut self, timeout: u32) -> Self {
self.dht_config.query_timeout = timeout;
self
}

/// The Noosphere Store to use for reading and writing sphere data.
pub fn store(mut self, store: &SphereDb<S>) -> Self {
self.store = Some(store.to_owned());
self
}

/// Default interval for hosted records to be propagated to the network.
pub fn propagation_interval(mut self, propagation_interval: u64) -> Self {
self.propagation_interval = propagation_interval;
self
}

/// Build a [NameSystem] based off of the provided configuration.
pub fn build(mut self) -> Result<NameSystem<S>> {
let key_material = self
.key_material
.take()
.ok_or_else(|| anyhow!("key_material required."))?;
let store = self
.store
.take()
.ok_or_else(|| anyhow!("store required."))?;
Ok(NameSystem::new(
key_material,
store,
self.bootstrap_peers.take(),
self.dht_config,
self.propagation_interval,
))
}
}

impl<S> Default for NameSystemBuilder<S>
where
S: Store,
{
fn default() -> Self {
Self {
bootstrap_peers: None,
dht_config: DHTConfig::default(),
key_material: None,
store: None,
propagation_interval: 60 * 60 * 24, // 1 day
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use noosphere_core::authority::generate_ed25519_key;
use noosphere_storage::{
db::SphereDb,
memory::{MemoryStorageProvider, MemoryStore},
};

#[tokio::test]
async fn test_name_system_builder() -> Result<(), anyhow::Error> {
let key_material = generate_ed25519_key();
let store = SphereDb::new(&MemoryStorageProvider::default())
.await
.unwrap();
let bootstrap_peers: Vec<Multiaddr> = vec![
"/ip4/127.0.0.50/tcp/33333/p2p/12D3KooWH8WgH9mgbMXrKX4veokUznvEn6Ycwg4qaGNi83nLkoUK"
.parse()?,
"/ip4/127.0.0.50/tcp/33334/p2p/12D3KooWMWo6tNGRx1G4TNqvr4SnHyVXSReC3tdX6zoJothXxV2c"
.parse()?,
];

let ns = NameSystemBuilder::default()
.listening_port(30000)
.key_material(&key_material)
.store(&store)
.bootstrap_peers(&bootstrap_peers)
.bootstrap_interval(33)
.peer_dialing_interval(11)
.query_timeout(22)
.propagation_interval(3600)
.build()?;

assert_eq!(ns.key_material.0.as_ref(), key_material.0.as_ref());
assert_eq!(ns._propagation_interval, 3600);
assert_eq!(ns.bootstrap_peers.as_ref().unwrap().len(), 2);
assert_eq!(ns.bootstrap_peers.as_ref().unwrap()[0], bootstrap_peers[0],);
assert_eq!(ns.bootstrap_peers.as_ref().unwrap()[1], bootstrap_peers[1]);
assert_eq!(
ns.dht_config.listening_address.as_ref().unwrap(),
&"/ip4/127.0.0.1/tcp/30000".parse()?
);
assert_eq!(ns.dht_config.bootstrap_interval, 33);
assert_eq!(ns.dht_config.peer_dialing_interval, 11);
assert_eq!(ns.dht_config.query_timeout, 22);

if NameSystemBuilder::default().store(&store).build().is_ok() {
panic!("key_material required.");
}
if NameSystemBuilder::<MemoryStore>::default()
.key_material(&key_material)
.build()
.is_ok()
{
panic!("store required.");
}
Ok(())
}
}
2 changes: 1 addition & 1 deletion rust/noosphere-ns/src/dht/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use core::{fmt, result::Result};
use tokio;
use tokio::sync::{mpsc, mpsc::error::SendError, oneshot, oneshot::error::RecvError};


impl std::error::Error for ChannelError {}
impl fmt::Display for ChannelError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down Expand Up @@ -77,6 +76,7 @@ impl<Q, S, E> MessageClient<Q, S, E> {
rx.await.map_err(|e| e.into())
}

#[allow(clippy::type_complexity)]
fn send_request_impl(
&self,
request: Q,
Expand Down
7 changes: 7 additions & 0 deletions rust/noosphere-ns/src/dht/config.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use libp2p::Multiaddr;

#[derive(Clone, Debug)]
pub struct DHTConfig {
/// If bootstrap peers are provided, how often,
/// in seconds, should the bootstrap process execute
/// to keep routing tables fresh.
pub bootstrap_interval: u64,
/// The local network interface and TCP port to listen
/// for incoming DHT connections. If `None`, can run
/// a limited set of queries on the network.
pub listening_address: Option<Multiaddr>,
/// How frequently, in seconds, the DHT attempts to
/// dial peers found in its kbucket. Outside of tests,
/// should not be lower than 5 seconds.
Expand All @@ -18,6 +24,7 @@ impl Default for DHTConfig {
fn default() -> Self {
Self {
bootstrap_interval: 5 * 60,
listening_address: None,
peer_dialing_interval: 5,
query_timeout: 5 * 60,
}
Expand Down
55 changes: 21 additions & 34 deletions rust/noosphere-ns/src/dht/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use crate::dht::{
utils::key_material_to_libp2p_keypair,
DHTConfig,
};
use std::{net::SocketAddr, time::Duration};
use libp2p;
use std::time::Duration;
use tokio;
use ucan_key_support::ed25519::Ed25519KeyMaterial;

Expand Down Expand Up @@ -36,43 +37,33 @@ pub struct DHTNode {
thread_handle: Option<tokio::task::JoinHandle<Result<(), DHTError>>>,
keypair: libp2p::identity::Keypair,
peer_id: libp2p::PeerId,
p2p_address: libp2p::Multiaddr,
p2p_address: Option<libp2p::Multiaddr>,
bootstrap_peers: Option<Vec<libp2p::Multiaddr>>,
}

impl DHTNode {
/// Creates a new [DHTNode].
/// `listening_address` is a [std::net::SocketAddr] that is used to listen for incoming
/// connections.
/// `bootstrap_peers` is a collection of [String]s in [libp2p::Multiaddr] form of initial
/// peers to connect to during bootstrapping. This collection would be empty in the
/// standalone bootstrap node scenario.
/// `config` is a [DHTConfig] of various configurations for the node.
pub fn new(
key_material: &Ed25519KeyMaterial,
listening_address: &SocketAddr,
bootstrap_peers: Option<&Vec<String>>,
bootstrap_peers: Option<&Vec<libp2p::Multiaddr>>,
config: &DHTConfig,
) -> Result<Self, DHTError> {
let keypair = key_material_to_libp2p_keypair(key_material)?;
let peer_id = libp2p::PeerId::from(keypair.public());
let p2p_address = {
let mut multiaddr: libp2p::Multiaddr = listening_address.ip().into();
multiaddr.push(libp2p::multiaddr::Protocol::Tcp(listening_address.port()));
multiaddr.push(libp2p::multiaddr::Protocol::P2p(peer_id.into()));
multiaddr
};

let peers: Option<Vec<libp2p::Multiaddr>> = if let Some(peers) = bootstrap_peers {
Some(
peers
.iter()
.map(|p| p.parse())
.collect::<Result<Vec<libp2p::Multiaddr>, libp2p::multiaddr::Error>>()
.map_err(|e| DHTError::Error(e.to_string()))?,
)
} else {
None
};
let peers: Option<Vec<libp2p::Multiaddr>> = bootstrap_peers.map(|peers| peers.to_vec());

let p2p_address: Option<libp2p::Multiaddr> =
if let Some(listening_address) = config.listening_address.as_ref() {
let mut p2p_address = listening_address.to_owned();
p2p_address.push(libp2p::multiaddr::Protocol::P2p(peer_id.into()));
Some(p2p_address)
} else {
None
};

Ok(DHTNode {
keypair,
Expand Down Expand Up @@ -114,13 +105,9 @@ impl DHTNode {
}

/// Adds additional bootstrap peers. Can only be executed before calling [DHTNode::run].
pub fn add_peers(&mut self, new_peers: &Vec<String>) -> Result<(), DHTError> {
pub fn add_peers(&mut self, new_peers: &[libp2p::Multiaddr]) -> Result<(), DHTError> {
self.ensure_state(DHTStatus::Initialized)?;
let mut new_peers_list: Vec<libp2p::Multiaddr> = new_peers
.iter()
.map(|p| p.parse())
.collect::<Result<Vec<libp2p::Multiaddr>, libp2p::multiaddr::Error>>()
.map_err(|e| DHTError::Error(e.to_string()))?;
let mut new_peers_list: Vec<libp2p::Multiaddr> = new_peers.to_vec();

if let Some(ref mut peers) = self.bootstrap_peers {
peers.append(&mut new_peers_list);
Expand All @@ -142,8 +129,8 @@ impl DHTNode {
}

/// Returns the listening address of this node.
pub fn p2p_address(&self) -> &libp2p::Multiaddr {
&self.p2p_address
pub fn p2p_address(&self) -> Option<&libp2p::Multiaddr> {
self.p2p_address.as_ref()
}

pub fn status(&self) -> DHTStatus {
Expand Down Expand Up @@ -199,10 +186,10 @@ impl DHTNode {
/// Return value may be `Ok(None)` if query finished without finding
/// any matching values.
/// Fails if node is not in an active state.
pub async fn get_record(&self, name: Vec<u8>) -> Result<Option<Vec<u8>>, DHTError> {
pub async fn get_record(&self, name: Vec<u8>) -> Result<(Vec<u8>, Option<Vec<u8>>), DHTError> {
let request = DHTRequest::GetRecord { name };
let response = self.send_request(request).await?;
ensure_response!(response, DHTResponse::GetRecord { value, .. } => Ok(Some(value)))
ensure_response!(response, DHTResponse::GetRecord { name, value, .. } => Ok((name, value)))
}

/// Instructs the node to tell its peers that it is providing
Expand Down
Loading

0 comments on commit 656fb23

Please sign in to comment.