From 6b0dc5e9f872ed802ecfeec262485d673e508a66 Mon Sep 17 00:00:00 2001 From: Jordan Santell Date: Fri, 28 Oct 2022 12:39:20 -0700 Subject: [PATCH] dev: initial work on NameSystem, wrapping the underlying DHT network. --- Cargo.lock | 196 ++++------- rust/noosphere-ns/Cargo.toml | 6 +- rust/noosphere-ns/src/builder.rs | 157 +++++++++ rust/noosphere-ns/src/dht/channel.rs | 2 +- rust/noosphere-ns/src/dht/config.rs | 7 + rust/noosphere-ns/src/dht/node.rs | 55 ++- rust/noosphere-ns/src/dht/processor.rs | 67 ++-- rust/noosphere-ns/src/dht/swarm.rs | 5 +- rust/noosphere-ns/src/dht/types.rs | 8 +- rust/noosphere-ns/src/lib.rs | 19 ++ rust/noosphere-ns/src/name_system.rs | 195 +++++++++++ rust/noosphere-ns/src/records.rs | 318 ++++++++++++++++++ .../{integration_test.rs => dht_test.rs} | 18 +- rust/noosphere-ns/tests/ns_test.rs | 110 ++++++ rust/noosphere-ns/tests/utils/mod.rs | 39 +-- 15 files changed, 968 insertions(+), 234 deletions(-) create mode 100644 rust/noosphere-ns/src/builder.rs create mode 100644 rust/noosphere-ns/src/name_system.rs create mode 100644 rust/noosphere-ns/src/records.rs rename rust/noosphere-ns/tests/{integration_test.rs => dht_test.rs} (90%) create mode 100644 rust/noosphere-ns/tests/ns_test.rs diff --git a/Cargo.lock b/Cargo.lock index 6c3a3287f..6d1d909b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1002,21 +1002,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "foreign-types" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" -dependencies = [ - "foreign-types-shared", -] - -[[package]] -name = "foreign-types-shared" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" - [[package]] name = "forest_hash_utils" version = "0.1.0" @@ -1473,16 +1458,16 @@ dependencies = [ ] [[package]] -name = "hyper-tls" -version = "0.5.0" +name = "hyper-rustls" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac" dependencies = [ - "bytes", + "http", "hyper", - "native-tls", + "rustls", "tokio", - "tokio-native-tls", + "tokio-rustls", ] [[package]] @@ -2135,24 +2120,6 @@ dependencies = [ "unsigned-varint", ] -[[package]] -name = "native-tls" -version = "0.2.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd7e2f3618557f980e0b17e8856252eee3c97fa12c54dff0ca290fb6266ca4a9" -dependencies = [ - "lazy_static", - "libc", - "log", - "openssl", - "openssl-probe", - "openssl-sys", - "schannel", - "security-framework", - "security-framework-sys", - "tempfile", -] - [[package]] name = "netlink-packet-core" version = "0.4.2" @@ -2424,14 +2391,18 @@ name = "noosphere-ns" version = "0.1.0-alpha.1" dependencies = [ "anyhow", + "cid", "futures", "libp2p", "noosphere-core", "rand 0.8.5", + "serde", + "serde_json", "test-log", "tokio", "tracing", "tracing-subscriber", + "ucan", "ucan-key-support", ] @@ -2599,51 +2570,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" -[[package]] -name = "openssl" -version = "0.10.42" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12fc0523e3bd51a692c8850d075d74dc062ccf251c0110668cbd921917118a13" -dependencies = [ - "bitflags", - "cfg-if", - "foreign-types", - "libc", - "once_cell", - "openssl-macros", - "openssl-sys", -] - -[[package]] -name = "openssl-macros" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "openssl-probe" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" - -[[package]] -name = "openssl-sys" -version = "0.9.76" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5230151e44c0f05157effb743e8d517472843121cf9243e8b81393edb5acd9ce" -dependencies = [ - "autocfg", - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "os_str_bytes" version = "6.3.0" @@ -2828,12 +2754,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "pkg-config" -version = "0.3.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" - [[package]] name = "polling" version = "2.3.0" @@ -3167,25 +3087,27 @@ dependencies = [ "http", "http-body", "hyper", - "hyper-tls", + "hyper-rustls", "ipnet", "js-sys", "log", "mime", - "native-tls", "once_cell", "percent-encoding", "pin-project-lite", + "rustls", + "rustls-pemfile", "serde", "serde_json", "serde_urlencoded", "tokio", - "tokio-native-tls", + "tokio-rustls", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", + "webpki-roots", "winreg 0.10.1", ] @@ -3288,6 +3210,27 @@ dependencies = [ "semver 1.0.14", ] +[[package]] +name = "rustls" +version = "0.20.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c" +dependencies = [ + "log", + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55" +dependencies = [ + "base64", +] + [[package]] name = "rustversion" version = "1.0.9" @@ -3311,16 +3254,6 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09" -[[package]] -name = "schannel" -version = "0.1.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88d6731146462ea25d9244b2ed5fd1d716d25c52e4d54aa4fb0f3c4e9854dbe2" -dependencies = [ - "lazy_static", - "windows-sys 0.36.1", -] - [[package]] name = "scoped-tls" version = "1.0.0" @@ -3334,26 +3267,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] -name = "security-framework" -version = "2.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bc1bb97804af6631813c55739f771071e0f2ed33ee20b68c86ec505d906356c" -dependencies = [ - "bitflags", - "core-foundation", - "core-foundation-sys", - "libc", - "security-framework-sys", -] - -[[package]] -name = "security-framework-sys" -version = "2.6.1" +name = "sct" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0160a13a177a45bfb43ce71c01580998474f556ad854dcbca936dd2841a5c556" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" dependencies = [ - "core-foundation-sys", - "libc", + "ring", + "untrusted", ] [[package]] @@ -3949,13 +3869,14 @@ dependencies = [ ] [[package]] -name = "tokio-native-tls" -version = "0.3.0" +name = "tokio-rustls" +version = "0.23.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" dependencies = [ - "native-tls", + "rustls", "tokio", + "webpki", ] [[package]] @@ -4340,12 +4261,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "vcpkg" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" - [[package]] name = "version_check" version = "0.9.4" @@ -4486,6 +4401,25 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "webpki-roots" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368bfe657969fb01238bb756d351dcade285e0f6fcbd36dcb23359a5169975be" +dependencies = [ + "webpki", +] + [[package]] name = "wepoll-ffi" version = "0.1.2" diff --git a/rust/noosphere-ns/Cargo.toml b/rust/noosphere-ns/Cargo.toml index d7ab99fe9..ddf4045eb 100644 --- a/rust/noosphere-ns/Cargo.toml +++ b/rust/noosphere-ns/Cargo.toml @@ -24,15 +24,19 @@ readme = "README.md" [dependencies] anyhow = "^1" tracing = "0.1" +cid = "~0.8" +serde = "^1" +serde_json = "^1" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] noosphere-core = { version = "0.1.0-alpha.1", path = "../noosphere-core" } ucan-key-support = { version = "0.7.0-alpha.1" } futures = "0.3.1" tokio = { version = "1.15", features = ["io-util", "io-std", "sync", "macros", "rt", "rt-multi-thread"] } -tracing-subscriber = { version = "~0.3", features = ["env-filter"] } libp2p = { version = "0.49.0", default-features = false, features = [ "identify", "dns", "tcp", "tokio", "noise", "mplex", "yamux", "kad" ] } +ucan = { version = "0.7.0-alpha.1" } [dev-dependencies] rand = { version = "0.8.5" } test-log = { version = "0.2.11", default-features = false, features = ["trace"] } +tracing-subscriber = { version = "~0.3", features = ["env-filter"] } diff --git a/rust/noosphere-ns/src/builder.rs b/rust/noosphere-ns/src/builder.rs new file mode 100644 index 000000000..d7f63ff71 --- /dev/null +++ b/rust/noosphere-ns/src/builder.rs @@ -0,0 +1,157 @@ +use crate::{dht::DHTConfig, name_system::NameSystem}; +use anyhow::{anyhow, Result}; +use libp2p::{self, Multiaddr}; +use std::collections::HashMap; +use std::net::Ipv4Addr; +use ucan_key_support::ed25519::Ed25519KeyMaterial; + +/// [NameSystemBuilder] is the primary external interface for +/// creating a new [NameSystem]. +/// +/// # Examples +/// +/// ``` +/// use noosphere_core::authority::generate_ed25519_key; +/// use noosphere_ns::{NameSystem, NameSystemBuilder}; +/// +/// let key_material = generate_ed25519_key(); +/// let ns = NameSystemBuilder::default() +/// .key_material(&key_material) +/// .listening_port(30000) +/// .build().expect("valid config"); +/// +/// ``` +pub struct NameSystemBuilder<'a> { + bootstrap_peers: Option<&'a Vec>, + dht_config: DHTConfig, + key_material: Option<&'a Ed25519KeyMaterial>, + ttl: u64, +} + +impl<'a> NameSystemBuilder<'a> { + /// If bootstrap peers are provided, how often, + /// in seconds, should the bootstrap process execute + /// to keep routing tables fresh. + pub fn bootstrap_interval(mut self, interval: u64) -> Self { + self.dht_config.bootstrap_interval = interval; + self + } + + /// Peer addresses to query to update routing tables + /// during bootstrap. A standalone bootstrap node would + /// have this field empty. + pub fn bootstrap_peers(mut self, peers: &'a Vec) -> Self { + self.bootstrap_peers = Some(peers); + self + } + + /// Public/private keypair for DHT node. + pub fn key_material(mut self, key_material: &'a Ed25519KeyMaterial) -> Self { + self.key_material = Some(key_material); + self + } + + /// Port to listen for incoming TCP connections. + pub fn listening_port(mut self, port: u16) -> Self { + let mut address = Multiaddr::empty(); + address.push(libp2p::multiaddr::Protocol::Ip4(Ipv4Addr::new( + 127, 0, 0, 1, + ))); + address.push(libp2p::multiaddr::Protocol::Tcp(port)); + self.dht_config.listening_address = Some(address); + self + } + + /// How frequently, in seconds, the DHT attempts to + /// dial peers found in its kbucket. Outside of tests, + /// should not be lower than 5 seconds. + pub fn peer_dialing_interval(mut self, interval: u64) -> Self { + self.dht_config.peer_dialing_interval = interval; + self + } + + /// How long, in seconds, until a network query times out. + pub fn query_timeout(mut self, timeout: u32) -> Self { + self.dht_config.query_timeout = timeout; + self + } + + /// Default Time To Live (TTL) for records propagated to the network. + pub fn ttl(mut self, ttl: u64) -> Self { + self.ttl = ttl; + self + } + + /// Build a [NameSystem] based off of the provided configuration. + pub fn build(mut self) -> Result> { + let key_material = self + .key_material + .take() + .ok_or_else(|| anyhow!("key_material required."))?; + Ok(NameSystem { + bootstrap_peers: self.bootstrap_peers.take(), + dht: None, + dht_config: self.dht_config, + key_material, + ttl: self.ttl, + hosted_records: HashMap::new(), + resolved_records: HashMap::new(), + }) + } +} + +impl<'a> Default for NameSystemBuilder<'a> { + fn default() -> Self { + Self { + bootstrap_peers: None, + ttl: 60 * 60 * 24, // 1 day + dht_config: DHTConfig::default(), + key_material: None, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use noosphere_core::authority::generate_ed25519_key; + + #[test] + fn test_name_system_builder() -> Result<(), anyhow::Error> { + let key_material = generate_ed25519_key(); + let bootstrap_peers: Vec = vec![ + "/ip4/127.0.0.50/tcp/33333/p2p/12D3KooWH8WgH9mgbMXrKX4veokUznvEn6Ycwg4qaGNi83nLkoUK" + .parse()?, + "/ip4/127.0.0.50/tcp/33334/p2p/12D3KooWMWo6tNGRx1G4TNqvr4SnHyVXSReC3tdX6zoJothXxV2c" + .parse()?, + ]; + + let ns = NameSystemBuilder::default() + .listening_port(30000) + .key_material(&key_material) + .bootstrap_peers(&bootstrap_peers) + .bootstrap_interval(33) + .peer_dialing_interval(11) + .query_timeout(22) + .ttl(3600) + .build()?; + + assert_eq!(ns.key_material.0.as_ref(), key_material.0.as_ref()); + assert_eq!(ns.ttl, 3600); + assert_eq!(ns.bootstrap_peers.as_ref().unwrap().len(), 2); + assert_eq!(ns.bootstrap_peers.as_ref().unwrap()[0], bootstrap_peers[0],); + assert_eq!(ns.bootstrap_peers.as_ref().unwrap()[1], bootstrap_peers[1]); + assert_eq!( + ns.dht_config.listening_address.as_ref().unwrap(), + &"/ip4/127.0.0.1/tcp/30000".parse()? + ); + assert_eq!(ns.dht_config.bootstrap_interval, 33); + assert_eq!(ns.dht_config.peer_dialing_interval, 11); + assert_eq!(ns.dht_config.query_timeout, 22); + + if let Ok(_) = NameSystemBuilder::default().build() { + panic!("key_material required."); + } + Ok(()) + } +} diff --git a/rust/noosphere-ns/src/dht/channel.rs b/rust/noosphere-ns/src/dht/channel.rs index 05e1c516e..df12ce288 100644 --- a/rust/noosphere-ns/src/dht/channel.rs +++ b/rust/noosphere-ns/src/dht/channel.rs @@ -2,7 +2,6 @@ use core::{fmt, result::Result}; use tokio; use tokio::sync::{mpsc, mpsc::error::SendError, oneshot, oneshot::error::RecvError}; - impl std::error::Error for ChannelError {} impl fmt::Display for ChannelError { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -77,6 +76,7 @@ impl MessageClient { rx.await.map_err(|e| e.into()) } + #[allow(clippy::type_complexity)] fn send_request_impl( &self, request: Q, diff --git a/rust/noosphere-ns/src/dht/config.rs b/rust/noosphere-ns/src/dht/config.rs index a59484abc..6a7daef9f 100644 --- a/rust/noosphere-ns/src/dht/config.rs +++ b/rust/noosphere-ns/src/dht/config.rs @@ -1,9 +1,15 @@ +use libp2p::Multiaddr; + #[derive(Clone, Debug)] pub struct DHTConfig { /// If bootstrap peers are provided, how often, /// in seconds, should the bootstrap process execute /// to keep routing tables fresh. pub bootstrap_interval: u64, + /// The local network interface and TCP port to listen + /// for incoming DHT connections. If `None`, can run + /// a limited set of queries on the network. + pub listening_address: Option, /// How frequently, in seconds, the DHT attempts to /// dial peers found in its kbucket. Outside of tests, /// should not be lower than 5 seconds. @@ -18,6 +24,7 @@ impl Default for DHTConfig { fn default() -> Self { Self { bootstrap_interval: 5 * 60, + listening_address: None, peer_dialing_interval: 5, query_timeout: 5 * 60, } diff --git a/rust/noosphere-ns/src/dht/node.rs b/rust/noosphere-ns/src/dht/node.rs index e28139ceb..173274b50 100644 --- a/rust/noosphere-ns/src/dht/node.rs +++ b/rust/noosphere-ns/src/dht/node.rs @@ -6,7 +6,8 @@ use crate::dht::{ utils::key_material_to_libp2p_keypair, DHTConfig, }; -use std::{net::SocketAddr, time::Duration}; +use libp2p; +use std::time::Duration; use tokio; use ucan_key_support::ed25519::Ed25519KeyMaterial; @@ -36,43 +37,33 @@ pub struct DHTNode { thread_handle: Option>>, keypair: libp2p::identity::Keypair, peer_id: libp2p::PeerId, - p2p_address: libp2p::Multiaddr, + p2p_address: Option, bootstrap_peers: Option>, } impl DHTNode { /// Creates a new [DHTNode]. - /// `listening_address` is a [std::net::SocketAddr] that is used to listen for incoming - /// connections. /// `bootstrap_peers` is a collection of [String]s in [libp2p::Multiaddr] form of initial /// peers to connect to during bootstrapping. This collection would be empty in the /// standalone bootstrap node scenario. + /// `config` is a [DHTConfig] of various configurations for the node. pub fn new( key_material: &Ed25519KeyMaterial, - listening_address: &SocketAddr, - bootstrap_peers: Option<&Vec>, + bootstrap_peers: Option<&Vec>, config: &DHTConfig, ) -> Result { let keypair = key_material_to_libp2p_keypair(key_material)?; let peer_id = libp2p::PeerId::from(keypair.public()); - let p2p_address = { - let mut multiaddr: libp2p::Multiaddr = listening_address.ip().into(); - multiaddr.push(libp2p::multiaddr::Protocol::Tcp(listening_address.port())); - multiaddr.push(libp2p::multiaddr::Protocol::P2p(peer_id.into())); - multiaddr - }; - - let peers: Option> = if let Some(peers) = bootstrap_peers { - Some( - peers - .iter() - .map(|p| p.parse()) - .collect::, libp2p::multiaddr::Error>>() - .map_err(|e| DHTError::Error(e.to_string()))?, - ) - } else { - None - }; + let peers: Option> = bootstrap_peers.map(|peers| peers.to_vec()); + + let p2p_address: Option = + if let Some(listening_address) = config.listening_address.as_ref() { + let mut p2p_address = listening_address.to_owned(); + p2p_address.push(libp2p::multiaddr::Protocol::P2p(peer_id.into())); + Some(p2p_address) + } else { + None + }; Ok(DHTNode { keypair, @@ -114,13 +105,9 @@ impl DHTNode { } /// Adds additional bootstrap peers. Can only be executed before calling [DHTNode::run]. - pub fn add_peers(&mut self, new_peers: &Vec) -> Result<(), DHTError> { + pub fn add_peers(&mut self, new_peers: &[libp2p::Multiaddr]) -> Result<(), DHTError> { self.ensure_state(DHTStatus::Initialized)?; - let mut new_peers_list: Vec = new_peers - .iter() - .map(|p| p.parse()) - .collect::, libp2p::multiaddr::Error>>() - .map_err(|e| DHTError::Error(e.to_string()))?; + let mut new_peers_list: Vec = new_peers.to_vec(); if let Some(ref mut peers) = self.bootstrap_peers { peers.append(&mut new_peers_list); @@ -142,8 +129,8 @@ impl DHTNode { } /// Returns the listening address of this node. - pub fn p2p_address(&self) -> &libp2p::Multiaddr { - &self.p2p_address + pub fn p2p_address(&self) -> Option<&libp2p::Multiaddr> { + self.p2p_address.as_ref() } pub fn status(&self) -> DHTStatus { @@ -199,10 +186,10 @@ impl DHTNode { /// Return value may be `Ok(None)` if query finished without finding /// any matching values. /// Fails if node is not in an active state. - pub async fn get_record(&self, name: Vec) -> Result>, DHTError> { + pub async fn get_record(&self, name: Vec) -> Result<(Vec, Option>), DHTError> { let request = DHTRequest::GetRecord { name }; let response = self.send_request(request).await?; - ensure_response!(response, DHTResponse::GetRecord { value, .. } => Ok(Some(value))) + ensure_response!(response, DHTResponse::GetRecord { name, value, .. } => Ok((name, value))) } /// Instructs the node to tell its peers that it is providing diff --git a/rust/noosphere-ns/src/dht/processor.rs b/rust/noosphere-ns/src/dht/processor.rs index 9e30627ff..590a31482 100644 --- a/rust/noosphere-ns/src/dht/processor.rs +++ b/rust/noosphere-ns/src/dht/processor.rs @@ -27,8 +27,8 @@ use tokio; /// should only interface with a [DHTProcessor] via [DHTNode]. pub struct DHTProcessor { config: DHTConfig, - p2p_address: libp2p::Multiaddr, peer_id: PeerId, + p2p_address: Option, processor: DHTMessageProcessor, swarm: DHTSwarm, requests: HashMap, @@ -59,7 +59,7 @@ impl DHTProcessor { pub(crate) fn spawn( keypair: &libp2p::identity::Keypair, peer_id: &PeerId, - p2p_address: &libp2p::Multiaddr, + p2p_address: &Option, bootstrap_peers: &Option>, config: &DHTConfig, processor: DHTMessageProcessor, @@ -136,7 +136,7 @@ impl DHTProcessor { _ = bootstrap_tick.tick() => self.execute_bootstrap()?, _ = peer_dialing_tick.tick() => self.dial_next_peer(), } - }; + } Ok(()) } @@ -169,9 +169,7 @@ impl DHTProcessor { } */ DHTRequest::Bootstrap => { - message.respond( - self.execute_bootstrap().map(|_| DHTResponse::Success), - ); + message.respond(self.execute_bootstrap().map(|_| DHTResponse::Success)); } DHTRequest::GetNetworkInfo => { let info = self.swarm.network_info(); @@ -199,6 +197,7 @@ impl DHTProcessor { publisher: None, expires: None, }; + info!("PUTTING RECORD {:#?}", record); store_request!(self, message, behaviour.kad.put_record(record, Quorum::One)); } }; @@ -263,7 +262,7 @@ impl DHTProcessor { if let Some(message) = self.requests.remove(&id) { message.respond(Ok(DHTResponse::GetRecord { name: key.to_vec(), - value, + value: Some(value), })); } } @@ -350,7 +349,13 @@ impl DHTProcessor { } => {} kad::InboundRequest::PutRecord { source, record, .. } => match record { Some(rec) => { - if self.swarm.behaviour_mut().kad.store_mut().put(rec.clone()).is_err() + if self + .swarm + .behaviour_mut() + .kad + .store_mut() + .put(rec.clone()) + .is_err() { warn!("InboundRequest::PutRecord failed: {:?} {:?}", rec, source); } @@ -377,22 +382,19 @@ impl DHTProcessor { } fn process_identify_event(&mut self, event: IdentifyEvent) { - match event { - IdentifyEvent::Received { peer_id, info } => { - if info - .protocols - .iter() - .any(|p| p.as_bytes() == kad::protocol::DEFAULT_PROTO_NAME) - { - for addr in &info.listen_addrs { - self.swarm - .behaviour_mut() - .kad - .add_address(&peer_id, addr.clone()); - } + if let IdentifyEvent::Received { peer_id, info } = event { + if info + .protocols + .iter() + .any(|p| p.as_bytes() == kad::protocol::DEFAULT_PROTO_NAME) + { + for addr in &info.listen_addrs { + self.swarm + .behaviour_mut() + .kad + .add_address(&peer_id, addr.clone()); } } - _ => {} } } @@ -433,11 +435,17 @@ impl DHTProcessor { } fn start_listening(&mut self) -> Result<(), DHTError> { - let addr = self.p2p_address.clone(); - dht_event_trace(self, &format!("Start listening on {}", addr)); - self.swarm - .listen_on(addr).map(|_| ()) - .map_err(DHTError::from) + match self.p2p_address.as_ref() { + Some(p2p_address) => { + let addr = p2p_address.to_owned(); + dht_event_trace(self, &format!("Start listening on {}", addr)); + self.swarm + .listen_on(addr) + .map(|_| ()) + .map_err(DHTError::from) + } + None => Ok(()), + } } fn execute_bootstrap(&mut self) -> Result<(), DHTError> { @@ -456,7 +464,6 @@ impl fmt::Debug for DHTProcessor { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("DHTNode") .field("peer_id", &self.peer_id) - .field("p2p_address", &self.p2p_address) .field("config", &self.config) .finish() } @@ -476,9 +483,7 @@ fn dht_event_trace(processor: &DHTProcessor, data: &T) { let peer_id_b58 = processor.peer_id.to_base58(); trace!( "\nFrom ..{:#?}..\n{:#?}", - peer_id_b58 - .get(8..14) - .unwrap_or("INVALID PEER ID"), + peer_id_b58.get(8..14).unwrap_or("INVALID PEER ID"), data ); } diff --git a/rust/noosphere-ns/src/dht/swarm.rs b/rust/noosphere-ns/src/dht/swarm.rs index dfb7148fb..f15667bb8 100644 --- a/rust/noosphere-ns/src/dht/swarm.rs +++ b/rust/noosphere-ns/src/dht/swarm.rs @@ -7,7 +7,7 @@ use libp2p::{ dns, identify::{Behaviour as Identify, Config as IdentifyConfig, Event as IdentifyEvent}, identity::Keypair, - kad::{self, Kademlia, KademliaConfig, KademliaEvent}, + kad::{self, Kademlia, KademliaConfig, KademliaEvent, KademliaStoreInserts}, mplex, noise, swarm::SwarmBuilder, swarm::{self, ConnectionHandler, IntoConnectionHandler, SwarmEvent}, @@ -52,6 +52,7 @@ impl DHTBehaviour { let kad = { let mut cfg = KademliaConfig::default(); cfg.set_query_timeout(Duration::from_secs(config.query_timeout.into())); + cfg.set_record_filtering(KademliaStoreInserts::FilterBoth); // TODO(#99): Use SphereFS storage let store = kad::record::store::MemoryStore::new(local_peer_id.to_owned()); @@ -60,7 +61,7 @@ impl DHTBehaviour { let identify = { let config = IdentifyConfig::new("ipfs/1.0.0".into(), keypair.public()) - .with_agent_version(format!("noosphere-p2p/{}", env!("CARGO_PKG_VERSION"))); + .with_agent_version(format!("noosphere-ns/{}", env!("CARGO_PKG_VERSION"))); Identify::new(config) }; diff --git a/rust/noosphere-ns/src/dht/types.rs b/rust/noosphere-ns/src/dht/types.rs index 483c85d9a..81ac17917 100644 --- a/rust/noosphere-ns/src/dht/types.rs +++ b/rust/noosphere-ns/src/dht/types.rs @@ -71,7 +71,7 @@ pub enum DHTResponse { GetNetworkInfo(DHTNetworkInfo), GetRecord { name: Vec, - value: Vec, + value: Option>, }, SetRecord { name: Vec, @@ -96,7 +96,11 @@ impl fmt::Display for DHTResponse { fmt, "DHTResponse::GetRecord {{ name={:?}, value={:?} }}", str::from_utf8(name), - str::from_utf8(value) + if value.is_some() { + str::from_utf8(value.as_ref().unwrap()) + } else { + Ok("None") + } ), DHTResponse::SetRecord { name } => write!( fmt, diff --git a/rust/noosphere-ns/src/lib.rs b/rust/noosphere-ns/src/lib.rs index 3d81109a3..dd6032f15 100644 --- a/rust/noosphere-ns/src/lib.rs +++ b/rust/noosphere-ns/src/lib.rs @@ -1,5 +1,24 @@ +#[cfg(not(target_arch = "wasm32"))] #[macro_use] extern crate tracing; #[cfg(not(target_arch = "wasm32"))] pub mod dht; + +#[cfg(not(target_arch = "wasm32"))] +mod builder; +#[cfg(not(target_arch = "wasm32"))] +mod name_system; +#[cfg(not(target_arch = "wasm32"))] +mod records; + +#[cfg(not(target_arch = "wasm32"))] +pub use builder::NameSystemBuilder; +#[cfg(not(target_arch = "wasm32"))] +pub use cid::Cid; +#[cfg(not(target_arch = "wasm32"))] +pub use libp2p::Multiaddr; +#[cfg(not(target_arch = "wasm32"))] +pub use name_system::NameSystem; +#[cfg(not(target_arch = "wasm32"))] +pub use records::NSRecord; diff --git a/rust/noosphere-ns/src/name_system.rs b/rust/noosphere-ns/src/name_system.rs new file mode 100644 index 000000000..e975f79e0 --- /dev/null +++ b/rust/noosphere-ns/src/name_system.rs @@ -0,0 +1,195 @@ +use crate::{ + dht::{DHTConfig, DHTNode}, + records::NSRecord, +}; +use anyhow::{anyhow, Result}; +use cid::Cid; +use futures::future::try_join_all; +use libp2p::Multiaddr; +use std::collections::HashMap; +use ucan_key_support::ed25519::Ed25519KeyMaterial; + +/// The [NameSystem] is responsible for both propagating and resolving address records +/// in the Noosphere DHT. Hosted records can be set via `set_record(identity, link)`, propagating the +/// record immediately, and repropagated every `propagation_interval` seconds. Records +/// can be resolved via `get_record(identity)`. +pub struct NameSystem<'a> { + /// Bootstrap peers for the DHT network. + pub(crate) bootstrap_peers: Option<&'a Vec>, + pub(crate) dht: Option, + pub(crate) dht_config: DHTConfig, + /// Key of the NameSystem's sphere. + pub(crate) key_material: &'a Ed25519KeyMaterial, + /// In seconds, the Time-To-Live (TTL) duration of records set + /// in the network, and implicitly, how often the records are + /// propagated. + pub(crate) ttl: u64, + /// Map of sphere DIDs to [NSRecord] hosted/propagated by this name system. + pub(crate) hosted_records: HashMap, + /// Map of resolved sphere DIDs to resolved [NSRecord]. + pub(crate) resolved_records: HashMap, +} + +impl<'a> NameSystem<'a> { + /// Initializes and attempts to connect to the network. + pub async fn connect(&mut self) -> Result<()> { + let mut dht = DHTNode::new(self.key_material, self.bootstrap_peers, &self.dht_config)?; + dht.run().map_err(|e| anyhow!(e.to_string()))?; + dht.bootstrap().await.map_err(|e| anyhow!(e.to_string()))?; + dht.wait_for_peers(1) + .await + .map_err(|e| anyhow!(e.to_string()))?; + self.dht = Some(dht); + Ok(()) + } + + /// Disconnect and deallocate connections to the network. + pub fn disconnect(&mut self) -> Result<()> { + if let Some(mut dht) = self.dht.take() { + dht.terminate()?; + } + Ok(()) + } + + /// Propagates all hosted records on nearby peers in the DHT network. + /// in the DHT network. Automatically called every `ttl` seconds (TBD), + /// but can be manually called push updated records to the network. + /// + /// Can fail if NameSystem is not connected or if no peers can be found. + pub async fn propagate_records(&self) -> Result<()> { + if self.dht.is_none() { + return Err(anyhow!("not connected")); + } + if self.hosted_records.is_empty() { + return Ok(()); + } + + let pending_tasks: Vec<_> = self + .hosted_records + .iter() + .map(|(identity, record)| self.dht_set_record(identity, record)) + .collect(); + try_join_all(pending_tasks).await?; + Ok(()) + } + + /// Propagates the corresponding managed sphere's content Cid on nearby peers + /// in the DHT network. + /// + /// Can fail if NameSystem is not connected or if no peers can be found. + pub async fn set_record(&mut self, identity: &String, link: &Cid) -> Result<()> { + if self.dht.is_none() { + return Err(anyhow!("not connected")); + } + + let record = NSRecord::new_from_ttl(link.to_owned(), self.ttl)?; + self.dht_set_record(identity, &record).await?; + self.hosted_records.insert(identity.to_owned(), record); + Ok(()) + } + + /// Gets the content Cid for the provided sphere identity. + pub async fn get_record(&mut self, identity: &String) -> Option<&Cid> { + // Round about way of checking for local valid record before + // querying the DHT network due to the borrow checker. + // https://stackoverflow.com/questions/70779967/rust-borrow-checker-and-early-returns# + let has_valid_record: bool = { + if let Some(record) = self.resolved_records.get(identity) { + !record.is_expired() + } else { + false + } + }; + + // No non-expired record found locally, query the network. + if !has_valid_record { + match self.dht_get_record(identity).await { + Ok((_, Some(record))) => { + self.resolved_records.insert(identity.to_owned(), record); + } + Ok((_, None)) => {} + Err(_) => {} + } + } + match self.resolved_records.get(identity) { + Some(record) => Some(&record.cid), + None => None, + } + } + + /// Clears out the internal cache of resolved records. + pub fn flush_records(&mut self) { + self.resolved_records.drain(); + } + + /// Clears out the internal cache of resolved records + /// for the matched identity. Returned value indicates whether + /// a record was successfully removed. + pub fn flush_records_for_identity(&mut self, identity: &String) -> bool { + self.resolved_records.remove(identity).is_some() + } + + pub fn p2p_address(&self) -> Option<&Multiaddr> { + if let Some(dht) = &self.dht { + dht.p2p_address() + } else { + None + } + } + + /// Queries the DHT for a record for the given sphere identity. + /// If no record is found, no error is returned. + /// + /// Returns an error if not connected to the DHT network. + async fn dht_get_record(&self, identity: &String) -> Result<(String, Option)> { + let dht = self.dht.as_ref().ok_or_else(|| anyhow!("not connected"))?; + match dht.get_record(identity.to_owned().into_bytes()).await { + Ok((_, result)) => match result { + Some(value) => { + // Validation/correctness and filtering through + // the most recent values can be performed here + let record = NSRecord::from_bytes(value)?; + info!("NameSystem: GetRecord: {} {}", identity, &record.cid); + Ok((identity.to_owned(), Some(record))) + } + None => { + warn!("NameSystem: GetRecord: No record found for {}.", identity); + Ok((identity.to_owned(), None)) + } + }, + Err(e) => { + warn!("NameSystem: GetRecord: Failure for {} {:?}.", identity, e); + Err(anyhow!(e.to_string())) + } + } + } + + /// Propagates and serializes the record on peers in the DHT network. + /// + /// Can fail if NameSystem is not connected or if no peers can be found. + async fn dht_set_record(&self, identity: &String, record: &NSRecord) -> Result<()> { + let dht = self.dht.as_ref().ok_or_else(|| anyhow!("not connected"))?; + + match dht + .set_record(identity.to_owned().into_bytes(), record.to_bytes()?) + .await + { + Ok(_) => { + info!("NameSystem: SetRecord: {}", identity); + Ok(()) + } + Err(e) => { + warn!("NameSystem: SetRecord: Failure for {} {:?}.", identity, e); + Err(anyhow!(e.to_string())) + } + } + } +} + +impl<'a> Drop for NameSystem<'a> { + fn drop(&mut self) { + if let Err(e) = self.disconnect() { + error!("{}", e.to_string()); + } + } +} diff --git a/rust/noosphere-ns/src/records.rs b/rust/noosphere-ns/src/records.rs new file mode 100644 index 000000000..c5dbfa64a --- /dev/null +++ b/rust/noosphere-ns/src/records.rs @@ -0,0 +1,318 @@ +use anyhow::{anyhow, Error}; +use cid::Cid; +use serde::{ + de::{self, Deserializer, MapAccess, SeqAccess, Visitor}, + ser::{SerializeStruct, Serializer}, + Deserialize, Serialize, +}; + +use std::{ + fmt, + str::FromStr, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +/// An [NSRecord] is a struct representing a record stored in the +/// Noosphere Name System's DHT containing a [cid::Cid] of the +/// result, as well as a TTL expiration as seconds from Unix epoch. +/// +/// # Serialization +/// +/// When transmitting records across the network, they're encoded +/// as JSON-formatted UTF-8 strings. The [NSRecord::to_bytes] +/// and [NSRecord::from_bytes] methods respectively handle the +/// serialization and deserialization in this format. +/// +/// Fields are mapped by: +/// +/// `cid` => `"cid": String` +/// `expires` => `"exp": Number` +/// +/// An example of the serialized payload structure and +/// conversion looks like: +/// +/// ``` +/// use noosphere_ns::NSRecord; +/// use cid::{Cid, multihash::{Code, MultihashDigest}}; +/// use std::str::FromStr; +/// +/// let cid_str = "bafkreibme22gw2h7y2h7tg2fhqotaqjucnbc24deqo72b6mkl2egezxhvy"; +/// let expires = 1667262626; +/// +/// let record = NSRecord::new(Cid::from_str(cid_str).unwrap(), expires); +/// assert_eq!(record.cid.to_string(), cid_str); +/// assert_eq!(record.expires, expires); +/// +/// let bytes = record.to_bytes().unwrap(); +/// assert_eq!(&String::from_utf8(bytes.clone()).unwrap(), "{\"cid\":\"bafkreibme22gw2h7y2h7tg2fhqotaqjucnbc24deqo72b6mkl2egezxhvy\",\"exp\":1667262626}"); +/// assert_eq!(NSRecord::from_bytes(bytes).unwrap(), record); +/// ``` +#[derive(Debug, Eq, PartialEq, Clone)] +pub struct NSRecord { + pub cid: Cid, + pub expires: u64, +} + +impl NSRecord { + /// Creates a new [NSRecord]. + pub fn new(cid: Cid, expires: u64) -> Self { + Self { cid, expires } + } + + /// Creates a new [NSRecord] with an expiration `ttl` seconds from now. + pub fn new_from_ttl(cid: Cid, ttl: u64) -> Result { + let expires: u64 = SystemTime::now() + .checked_add(Duration::new(ttl, 0)) + .ok_or_else(|| anyhow!("Duration overflow."))? + .duration_since(UNIX_EPOCH) + .map_err(|e| anyhow!(e.to_string()))? + .as_secs(); + Ok(Self { cid, expires }) + } + + /// Creates a new [NSRecord] from serialized bytes. See [NSRecord] + /// for serialization details. + pub fn from_bytes(bytes: Vec) -> Result { + let string = String::from_utf8(bytes).map_err(|e| anyhow!(e.to_string()))?; + serde_json::from_str(&string).map_err(|e| anyhow!(e.to_string())) + } + + /// Serializes the record into bytes. See [NSRecord] for + /// serialization details. + pub fn to_bytes(&self) -> Result, Error> { + let bytes = serde_json::to_vec(self).map_err(|e| anyhow!(e.to_string()))?; + Ok(bytes) + } + + /// Validates the [NSRecord] based off of its expiration time + /// compared to the current system time. + pub fn is_expired(&self) -> bool { + match SystemTime::now().duration_since(UNIX_EPOCH) { + Ok(duration) => duration.as_secs() >= self.expires, + Err(_) => false, + } + } +} + +impl fmt::Display for NSRecord { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + match serde_json::to_string(self) { + Ok(record_str) => write!(f, "{}", record_str), + Err(_) => write!(f, "{{ INVALID_RECORD }}"), + } + } +} + +/// Serialization for NSRecords. While [cid::Cid] has built-in serde +/// support under a feature flag, we roll our own to store the Cid +/// as a string rather than bytes. +impl Serialize for NSRecord { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("NSRecord", 2)?; + state.serialize_field("cid", &self.cid.to_string())?; + state.serialize_field("exp", &self.expires)?; + state.end() + } +} + +/// Deserialization for NSRecords. While [cid::Cid] has built-in serde +/// support under a feature flag, we roll our own to store the Cid +/// as a string rather than bytes. +/// For more details on custom deserialization: https://serde.rs/deserialize-struct.html +impl<'de> Deserialize<'de> for NSRecord { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + const FIELDS: &[&str] = &["cid", "exp"]; + enum Field { + Cid, + Expires, + } + + impl<'de> Deserialize<'de> for Field { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct FieldVisitor; + + impl<'de> Visitor<'de> for FieldVisitor { + type Value = Field; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("`cid` or `exp`") + } + + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + match value { + "cid" => Ok(Field::Cid), + "exp" => Ok(Field::Expires), + _ => Err(de::Error::unknown_field(value, FIELDS)), + } + } + } + + deserializer.deserialize_identifier(FieldVisitor) + } + } + + struct NSRecordVisitor; + + impl<'de> Visitor<'de> for NSRecordVisitor { + type Value = NSRecord; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("struct NSRecord") + } + + // This handler is not used with serde_json, but for sequence-based + // deserializers e.g. postcard + fn visit_seq(self, mut seq: V) -> Result + where + V: SeqAccess<'de>, + { + let cid = seq + .next_element()? + .ok_or_else(|| de::Error::invalid_length(0, &self))?; + let expires = seq + .next_element()? + .ok_or_else(|| de::Error::invalid_length(1, &self))?; + Ok(NSRecord::new(cid, expires)) + } + + fn visit_map(self, mut map: V) -> Result + where + V: MapAccess<'de>, + { + let mut cid = None; + let mut expires = None; + while let Some(key) = map.next_key()? { + match key { + Field::Cid => { + if cid.is_some() { + return Err(de::Error::duplicate_field("cid")); + } + cid = Some( + Cid::from_str(map.next_value::<&str>()?) + .map_err(de::Error::custom)?, + ); + } + Field::Expires => { + if expires.is_some() { + return Err(de::Error::duplicate_field("exp")); + } + expires = Some(map.next_value()?); + } + } + } + let cid = cid.ok_or_else(|| de::Error::missing_field("cid"))?; + let expires = expires.ok_or_else(|| de::Error::missing_field("exp"))?; + Ok(NSRecord::new(cid, expires)) + } + } + + deserializer.deserialize_struct("NSRecord", FIELDS, NSRecordVisitor) + } +} + +#[cfg(test)] +mod test { + use super::*; + use cid::multihash::{Code, MultihashDigest}; + use std::str::FromStr; + use std::time::{Duration, SystemTime, UNIX_EPOCH}; + + fn new_cid(s: &[u8]) -> Cid { + Cid::new_v1(0x55, Code::Sha2_256.digest(s)) + } + + #[test] + fn test_nsrecord_new() -> Result<(), Box> { + let cid = new_cid(b"foo"); + let expires: u64 = SystemTime::now() + .checked_add(Duration::new(3600, 0)) + .expect("valid duration") + .duration_since(UNIX_EPOCH)? + .as_secs(); + let record = NSRecord::new(cid, expires); + assert_eq!(record.cid, cid, "NSRecord::new() cid works"); + assert_eq!(record.expires, expires, "NSRecord::new() expires works"); + Ok(()) + } + + #[test] + fn test_nsrecord_new_from_ttl() -> Result<(), Box> { + let cid = new_cid(b"foo"); + let ttl = 3600; + let expected_expiration: u64 = SystemTime::now() + .checked_add(Duration::new(ttl, 0)) + .expect("valid duration") + .duration_since(UNIX_EPOCH)? + .as_secs(); + let record = NSRecord::new_from_ttl(cid, ttl)?; + assert_eq!(record.cid, cid); + assert!(record.expires.abs_diff(expected_expiration) < 5); + Ok(()) + } + + #[test] + fn test_nsrecord_from_bytes() -> Result<(), Box> { + let record = NSRecord::from_bytes( + String::from( + r#"{ + "cid": "bafkreibme22gw2h7y2h7tg2fhqotaqjucnbc24deqo72b6mkl2egezxhvy", + "exp": 1667262626 + }"#, + ) + .into_bytes(), + )?; + + assert_eq!( + record.cid.to_string(), + "bafkreibme22gw2h7y2h7tg2fhqotaqjucnbc24deqo72b6mkl2egezxhvy" + ); + assert_eq!(record.expires, 1667262626); + Ok(()) + } + + #[test] + fn test_nsrecord_to_bytes() -> Result<(), Box> { + let cid_str = "bafkreibme22gw2h7y2h7tg2fhqotaqjucnbc24deqo72b6mkl2egezxhvy"; + let record = NSRecord::new(Cid::from_str(cid_str)?, 1667262626); + let bytes = record.to_bytes()?; + + assert_eq!(&String::from_utf8(bytes.clone())?, "{\"cid\":\"bafkreibme22gw2h7y2h7tg2fhqotaqjucnbc24deqo72b6mkl2egezxhvy\",\"exp\":1667262626}"); + let de_record = NSRecord::from_bytes(bytes)?; + assert_eq!(de_record.cid.to_string(), cid_str); + assert_eq!(de_record.expires, 1667262626); + Ok(()) + } + + #[test] + fn test_nsrecord_is_expired() -> Result<(), Box> { + let record = NSRecord::new_from_ttl(new_cid(b"foo"), 3600)?; + assert!(!record.is_expired()); + + let record = NSRecord::new( + new_cid(b"foo"), + 60 * 60 * 24 * 365, /* a year after unix epoch */ + ); + assert!(record.is_expired()); + Ok(()) + } + + #[test] + fn test_nsrecord_to_string() -> Result<(), Box> { + let cid_str = "bafkreibme22gw2h7y2h7tg2fhqotaqjucnbc24deqo72b6mkl2egezxhvy"; + let record = NSRecord::new(Cid::from_str(cid_str)?, 1667262626); + assert_eq!(record.to_string(), "{\"cid\":\"bafkreibme22gw2h7y2h7tg2fhqotaqjucnbc24deqo72b6mkl2egezxhvy\",\"exp\":1667262626}"); + Ok(()) + } +} diff --git a/rust/noosphere-ns/tests/integration_test.rs b/rust/noosphere-ns/tests/dht_test.rs similarity index 90% rename from rust/noosphere-ns/tests/integration_test.rs rename to rust/noosphere-ns/tests/dht_test.rs index bbacaa4c1..bdd834af0 100644 --- a/rust/noosphere-ns/tests/integration_test.rs +++ b/rust/noosphere-ns/tests/dht_test.rs @@ -3,19 +3,15 @@ use noosphere_ns::dht::{DHTError, DHTNetworkInfo, DHTNode, DHTStatus}; use std::str; -mod utils; +pub mod utils; use noosphere_core::authority::generate_ed25519_key; -use utils::{create_test_config, generate_listening_addr, initialize_network, swarm_command}; + +use utils::{create_test_config, initialize_network, swarm_command}; /// Testing a detached DHTNode as a server with no peers. #[test_log::test(tokio::test)] async fn test_dhtnode_base_case() -> Result<(), DHTError> { - let mut node = DHTNode::new( - &generate_ed25519_key(), - &generate_listening_addr(), - None, - &create_test_config(), - )?; + let mut node = DHTNode::new(&generate_ed25519_key(), None, &create_test_config())?; assert_eq!(node.status(), DHTStatus::Initialized, "DHT is initialized"); node.run()?; assert_eq!(node.status(), DHTStatus::Active, "DHT is active"); @@ -85,7 +81,11 @@ async fn test_dhtnode_simple() -> Result<(), DHTError> { let result = client_b .get_record(String::from("foo").into_bytes()) .await?; - assert_eq!(str::from_utf8(result.as_ref().unwrap()).unwrap(), "bar"); + assert_eq!(str::from_utf8(&result.0).expect("parseable"), "foo"); + assert_eq!( + str::from_utf8(result.1.as_ref().unwrap()).expect("parseable"), + "bar" + ); Ok(()) } diff --git a/rust/noosphere-ns/tests/ns_test.rs b/rust/noosphere-ns/tests/ns_test.rs new file mode 100644 index 000000000..1bc775162 --- /dev/null +++ b/rust/noosphere-ns/tests/ns_test.rs @@ -0,0 +1,110 @@ +#![cfg(not(target_arch = "wasm32"))] +#![cfg(test)] +pub mod utils; +use anyhow::{anyhow, Result}; +use cid::{ + multihash::{Code, MultihashDigest}, + Cid, +}; +use noosphere_core::authority::generate_ed25519_key; +use noosphere_ns::{NameSystem, NameSystemBuilder}; + +use utils::{create_bootstrap_nodes, wait_ms}; + +fn new_cid(bytes: &[u8]) -> Cid { + const RAW: u64 = 0x55; + Cid::new_v1(RAW, Code::Sha2_256.digest(bytes)) +} + +#[test_log::test(tokio::test)] +async fn test_name_system() -> Result<()> { + let bootstrap_node = create_bootstrap_nodes(1) + .map_err(|e| anyhow!(e.to_string()))? + .pop() + .unwrap(); + let bootstrap_addresses = vec![bootstrap_node.p2p_address().unwrap().to_owned()]; + + let sphere_1_cid_1 = new_cid(b"00000000"); + let sphere_1_cid_2 = new_cid(b"11111111"); + let sphere_1_cid_3 = new_cid(b"22222222"); + let sphere_1_id = String::from("did:sphere_1"); + let sphere_2_cid_1 = new_cid(b"99999999"); + let sphere_2_cid_2 = new_cid(b"88888888"); + let sphere_2_id = String::from("did:sphere_2"); + + let ns_1_key = generate_ed25519_key(); + let ns_2_key = generate_ed25519_key(); + + let mut ns_1: NameSystem = NameSystemBuilder::default() + .key_material(&ns_1_key) + .listening_port(30000) + .ttl(3600) + .peer_dialing_interval(1) + .bootstrap_peers(&bootstrap_addresses) + .build()?; + + let mut ns_2: NameSystem = NameSystemBuilder::default() + .key_material(&ns_2_key) + .listening_port(30001) + .ttl(1) + .peer_dialing_interval(1) + .bootstrap_peers(&bootstrap_addresses) + .build()?; + + ns_1.connect().await?; + ns_2.connect().await?; + + // Test propagating records from ns_1 to ns_2 + ns_1.set_record(&sphere_1_id, &sphere_1_cid_1).await?; + + // `None` for a record that cannot be found + let cid = ns_2.get_record(&String::from("unknown")).await; + assert_eq!(cid, None, "no record found"); + + // Baseline fetching record from the network. + let cid = ns_2.get_record(&sphere_1_id).await.expect("to be some"); + assert_eq!(cid, &sphere_1_cid_1, "first record found"); + + // Use cache if record is not expired + ns_1.set_record(&sphere_1_id, &sphere_1_cid_2).await?; + let cid = ns_2.get_record(&sphere_1_id).await.expect("to be some"); + assert_eq!(cid, &sphere_1_cid_1, "record found in cache"); + + // Flush records and fetch latest value from network. + ns_2.flush_records(); + let cid = ns_2.get_record(&sphere_1_id).await.expect("to be some"); + assert_eq!( + cid, &sphere_1_cid_2, + "latest record is found from network after flushing all records" + ); + + // Flush records by identity and fetch latest value from network. + ns_1.set_record(&sphere_1_id, &sphere_1_cid_3).await?; + assert!(!ns_2.flush_records_for_identity(&String::from("invalid did"))); + assert!(ns_2.flush_records_for_identity(&sphere_1_id)); + let cid = ns_2.get_record(&sphere_1_id).await.expect("to be some"); + assert_eq!( + cid, &sphere_1_cid_3, + "latest record is found from network after flushing record" + ); + + // Now testing propagating records from ns_2 to ns_1, + // with a much shorter TTL + ns_2.set_record(&sphere_2_id, &sphere_2_cid_1).await?; + + // Baseline fetching record from the network. + let cid = ns_1.get_record(&sphere_2_id).await.expect("to be some"); + assert_eq!(cid, &sphere_2_cid_1, "first record found"); + + // Fetch record from network if local record is expired. + ns_2.set_record(&sphere_2_id, &sphere_2_cid_2).await?; + // Wait to ensure record is expired ;_; + wait_ms(1000).await; + let cid = ns_1.get_record(&sphere_2_id).await.expect("to be some"); + assert_eq!( + cid, &sphere_2_cid_2, + "record found from network from local expired record" + ); + + Ok(()) +} diff --git a/rust/noosphere-ns/tests/utils/mod.rs b/rust/noosphere-ns/tests/utils/mod.rs index 26239276d..eee47be0e 100644 --- a/rust/noosphere-ns/tests/utils/mod.rs +++ b/rust/noosphere-ns/tests/utils/mod.rs @@ -1,25 +1,26 @@ #![cfg(test)] use futures::future::try_join_all; - +use libp2p::{self, Multiaddr}; use noosphere_core::authority::generate_ed25519_key; use noosphere_ns::dht::{DHTConfig, DHTError, DHTNode}; use rand::{thread_rng, Rng}; use std::future::Future; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::time::Duration; -pub fn generate_listening_addr() -> SocketAddr { - SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - thread_rng().gen_range(49152..65535), +fn generate_listening_addr() -> Multiaddr { + format!( + "/ip4/127.0.0.1/tcp/{}", + thread_rng().gen_range(49152..65535) ) + .parse() + .expect("parseable") } pub async fn wait_ms(ms: u64) { tokio::time::sleep(Duration::from_millis(ms)).await; } -pub async fn await_or_timeout( +async fn await_or_timeout( timeout_ms: u64, future: impl Future, message: String, @@ -33,6 +34,7 @@ pub async fn await_or_timeout( pub fn create_test_config() -> DHTConfig { let mut config = DHTConfig::default(); config.peer_dialing_interval = 1; + config.listening_address = Some(generate_listening_addr()); config } @@ -48,28 +50,21 @@ where try_join_all(futures).await } -pub fn create_client_nodes_with_bootstrap_peers( +fn create_client_nodes_with_bootstrap_peers( bootstrap_count: usize, client_count: usize, ) -> Result<(Vec, Vec), DHTError> { let bootstrap_nodes = create_bootstrap_nodes(bootstrap_count)?; - let bootstrap_addresses: Vec = bootstrap_nodes + let bootstrap_addresses: Vec = bootstrap_nodes .iter() - // Remap Multiaddr to String for DHTNode interface - .map(|node| node.p2p_address().to_string()) + .map(|node| node.p2p_address().unwrap().to_owned()) .collect(); let mut client_nodes: Vec = vec![]; for _ in 0..client_count { let key_material = generate_ed25519_key(); - let listening_address = generate_listening_addr(); let config = create_test_config(); - let mut node = DHTNode::new( - &key_material, - &listening_address, - Some(&bootstrap_addresses), - &config, - )?; + let mut node = DHTNode::new(&key_material, Some(&bootstrap_addresses), &config)?; node.run()?; client_nodes.push(node); } @@ -80,14 +75,12 @@ pub fn create_client_nodes_with_bootstrap_peers( /// bootstrap nodes as bootstrap peers. pub fn create_bootstrap_nodes(count: usize) -> Result, DHTError> { let mut nodes: Vec = vec![]; - let mut addresses: Vec = vec![]; + let mut addresses: Vec = vec![]; for _ in 0..count { let key_material = generate_ed25519_key(); - let listening_address = generate_listening_addr(); let config = create_test_config(); - let node = DHTNode::new(&key_material, &listening_address, None, &config)?; - // Remap Multiaddr to String for DHTNode interface - addresses.push(node.p2p_address().to_string()); + let node = DHTNode::new(&key_material, None, &config)?; + addresses.push(node.p2p_address().unwrap().to_owned()); nodes.push(node); }