Skip to content

Commit

Permalink
feat: Remove Mutex from NNS ApiServer for concurrency (subconscio…
Browse files Browse the repository at this point in the history
…usnetwork#357)

* Support defining partial DhtConfig values via TOML
* Update NsRecord's Debug formatter.
* Add logs in docker startup scripts to display RUST_LOG, NOOSPHERE_LOG
* Remove vestigial tracing_subscriber dependency
* Add orb, orb_ns to default logging in initialize_tracing().
  • Loading branch information
jsantell authored May 5, 2023
1 parent fc5e42f commit 2347d10
Show file tree
Hide file tree
Showing 15 changed files with 122 additions and 58 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions images/orb-ns/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@ if [[ -z "$2" ]]; then
exit 1
fi

echo "RUST_LOG=${RUST_LOG}"
echo "NOOSPHERE_LOG=${NOOSPHERE_LOG}"

orb-ns run --config $CONFIG_FILE
3 changes: 3 additions & 0 deletions images/orb/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@ if ! [ -z "$NS_API" ]; then
ARGS="${ARGS} --name-resolver-api ${NS_API}"
fi

echo "RUST_LOG=${RUST_LOG}"
echo "NOOSPHERE_LOG=${NOOSPHERE_LOG}"

orb serve $ARGS
2 changes: 1 addition & 1 deletion rust/noosphere-cli/tests/peer_to_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ async fn start_name_system_server(ipfs_url: &Url) -> Result<(JoinHandle<()>, Url
tokio::spawn(async move {
let mut network = NameSystemNetwork::generate(2, store).await.unwrap();
let node = network.nodes_mut().pop().unwrap();
start_name_system_api_server(Arc::new(Mutex::new(node)), listener)
start_name_system_api_server(Arc::new(node), listener)
.await
.unwrap();
}),
Expand Down
2 changes: 2 additions & 0 deletions rust/noosphere-core/src/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub static NOOSPHERE_LOG_LEVEL_CRATES: &[&str] = &[
"noosphere_car",
"noosphere_api",
"noosphere_ns",
"orb",
"orb_ns",
"tower_http",
];

Expand Down
2 changes: 2 additions & 0 deletions rust/noosphere-gateway/src/worker/name_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use ucan::crypto::KeyMaterial;
use url::Url;

const PERIODIC_PUBLISH_INTERVAL_SECONDS: u64 = 5 * 60;
/// How many seconds between queueing up an address
/// to resolve from the name system.
const PERIODIC_RESOLVER_INTERVAL_SECONDS: u64 = 60;

pub struct NameSystemConfiguration {
Expand Down
3 changes: 1 addition & 2 deletions rust/noosphere-ns/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ toml = { version = "~0.5", optional = true }
# noosphere_ns::server
axum = { version = "~0.5", features = ["json", "headers", "macros"], optional = true }
reqwest = { version = "~0.11", default-features = false, features = ["json", "rustls-tls"], optional = true }
tracing-subscriber = { workspace = true, optional = true }
tower-http = { version = "~0.3", features = ["trace"], optional = true }
url = { version = "^2", features = [ "serde" ], optional = true }

Expand All @@ -65,7 +64,7 @@ tempdir = { version = "~0.3" }

[features]
default = ["orb-ns", "api-server"]
api-server = ["axum", "reqwest", "url", "tracing-subscriber", "tower-http"]
api-server = ["axum", "reqwest", "url", "tower-http"]
orb-ns = ["clap", "noosphere", "home", "toml", "noosphere-ipfs"]

[[bin]]
Expand Down
11 changes: 11 additions & 0 deletions rust/noosphere-ns/src/bin/orb-ns/runner/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ listening_address = 10000
peers = [
"/ip4/127.0.0.1/tcp/10001"
]
[dht_config]
query_timeout = 55
"#,
)
.await?;
Expand All @@ -207,6 +210,14 @@ peers = [
)
.await?;

assert!(
config.dht_config.query_timeout == 55,
"expected explicit DhtConfig properties to apply"
);
assert!(
config.dht_config.bootstrap_interval == 5 * 60,
"expected default DhtConfig properties to apply"
);
assert!(
keys_equal(&config.key_material, &key_1).await?,
"expected key material"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use std::{
task,
time::Duration,
};
use tokio::sync::Mutex;
use url::Url;

#[cfg(feature = "api-server")]
Expand All @@ -22,7 +21,7 @@ use noosphere_ns::server::ApiServer;
struct ApiServer;
#[cfg(not(feature = "api-server"))]
impl ApiServer {
pub fn serve(_ns: Arc<Mutex<NameSystem>>, _listener: TcpListener) -> Self {
pub fn serve(_ns: Arc<NameSystem>, _listener: TcpListener) -> Self {
ApiServer {}
}
}
Expand All @@ -35,7 +34,7 @@ impl ApiServer {
pub struct NameSystemRunner {
#[serde(skip_serializing)]
#[allow(dead_code)]
name_system: Arc<Mutex<NameSystem>>,
name_system: Arc<NameSystem>,
#[serde(skip_serializing)]
#[allow(dead_code)]
api_thread: Option<ApiServer>,
Expand Down Expand Up @@ -80,7 +79,7 @@ impl NameSystemRunner {
node.add_peers(config.peers.to_owned()).await?;
node.bootstrap().await?;

let wrapped_node = Arc::new(Mutex::new(node));
let wrapped_node = Arc::new(node);

let (api_address, api_thread) = if cfg!(feature = "api-server") {
if let Some(requested_addr) = config.api_address.take() {
Expand Down
53 changes: 45 additions & 8 deletions rust/noosphere-ns/src/dht/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,77 @@ pub struct DhtConfig {
/// If bootstrap peers are provided, how often,
/// in seconds, should the bootstrap process execute
/// to keep routing tables fresh.
#[serde(default = "default_bootstrap_interval")]
pub bootstrap_interval: u64,
/// How frequently, in seconds, the DHT attempts to
/// dial peers found in its kbucket. Outside of tests,
/// should not be lower than 5 seconds.
#[serde(default = "default_peer_dialing_interval")]
pub peer_dialing_interval: u64,
/// How long, in seconds, published records are replicated to
/// peers. Should be significantly shorter than `record_ttl`.
/// See [KademliaConfig::set_publication_interval] and [KademliaConfig::set_provider_publication_interval].
#[serde(default = "default_publication_interval")]
pub publication_interval: u32,
/// How long, in seconds, until an unsuccessful
/// DHT query times out.
#[serde(default = "default_query_timeout")]
pub query_timeout: u32,
/// How long, in seconds, stored records are replicated to
/// peers. Should be significantly shorter than `publication_interval`.
/// See [KademliaConfig::set_replication_interval].
/// Only applies to value records.
#[serde(default = "default_replication_interval")]
pub replication_interval: u32,
/// How long, in seconds, records remain valid for. Should be significantly
/// longer than `publication_interval`.
/// See [KademliaConfig::set_record_ttl] and [KademliaConfig::set_provider_record_ttl].
#[serde(default = "default_record_ttl")]
pub record_ttl: u32,
}

// We break up defaults into individual functions to support deserializing
// via serde when `DhtConfig` is used as a nested value. Otherwise,
// `[dht_config] query_timeout = 60` would require defining all other fields.

fn default_bootstrap_interval() -> u64 {
5 * 60 // 5 mins
}

fn default_peer_dialing_interval() -> u64 {
if cfg!(test) {
1
} else {
5
}
}

fn default_publication_interval() -> u32 {
60 * 60 * 24 // 1 day
}

fn default_query_timeout() -> u32 {
5 * 60 // 5 mins
}

fn default_replication_interval() -> u32 {
60 * 60 // 1 hour
}

fn default_record_ttl() -> u32 {
60 * 60 * 24 * 3 // 3 days
}

impl Default for DhtConfig {
/// Creates a new [DHTConfig] with defaults applied.
/// Creates a new [DhtConfig] with defaults applied.
fn default() -> Self {
let peer_dialing_interval = if cfg!(test) { 1 } else { 5 };
Self {
bootstrap_interval: 5 * 60, // 5 mins
peer_dialing_interval,
publication_interval: 60 * 60 * 24, // 1 day
query_timeout: 5 * 60, // 5 mins
replication_interval: 60 * 60, // 1 hour
record_ttl: 60 * 60 * 24 * 3, // 3 days
bootstrap_interval: default_bootstrap_interval(),
peer_dialing_interval: default_peer_dialing_interval(),
publication_interval: default_publication_interval(),
query_timeout: default_query_timeout(),
replication_interval: default_replication_interval(),
record_ttl: default_record_ttl(),
}
}
}
21 changes: 19 additions & 2 deletions rust/noosphere-ns/src/records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ use serde::{
ser::{self, Serialize, Serializer},
};
use serde_json::Value;
use std::{convert::TryFrom, fmt::Display, str, str::FromStr};
use std::{
convert::TryFrom,
fmt::{Debug, Display},
str,
str::FromStr,
};
use ucan::{
builder::UcanBuilder,
crypto::KeyMaterial,
Expand Down Expand Up @@ -59,7 +64,7 @@ use ucan::{chain::ProofChain, crypto::did::DidParser, Ucan};
/// }]
/// }
/// ```
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct NsRecord {
/// The wrapped UCAN token describing this record.
pub(crate) token: Ucan,
Expand Down Expand Up @@ -221,6 +226,18 @@ impl NsRecord {
}
}

impl Debug for NsRecord {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let link = self.link.map(|cid| cid.to_string());
write!(
f,
"NsRecord {{ \"sphere\": \"{}\", \"link\": \"{:?}\" }}",
self.token.audience(),
link
)
}
}

impl Display for NsRecord {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let link = self.link.map(|cid| cid.to_string());
Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-ns/src/server/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ mod test {
.await
.unwrap();

let ns = Arc::new(Mutex::new(ns));
let ns = Arc::new(ns);
let server = ApiServer::serve(ns, api_listener);
let data = DataPlaceholder {
_server: server,
Expand Down
52 changes: 22 additions & 30 deletions rust/noosphere-ns/src/server/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use axum::{
use noosphere_core::data::Did;
use serde::Deserialize;
use std::sync::Arc;
use tokio::sync::Mutex;

pub struct JsonErr(StatusCode, String);
impl IntoResponse for JsonErr {
Expand All @@ -23,10 +22,9 @@ impl IntoResponse for JsonErr {
type JsonResponse<T> = Result<Json<T>, JsonErr>;

pub async fn get_network_info(
Extension(name_system): Extension<Arc<Mutex<NameSystem>>>,
Extension(name_system): Extension<Arc<NameSystem>>,
) -> JsonResponse<NetworkInfo> {
let ns = name_system.lock().await;
let network_info = ns
let network_info = name_system
.network_info()
.await
.map_err(move |error| JsonErr(StatusCode::INTERNAL_SERVER_ERROR, error.to_string()))?;
Expand All @@ -38,68 +36,64 @@ pub async fn get_peer_id(Extension(peer_id): Extension<PeerId>) -> JsonResponse<
}

pub async fn get_peers(
Extension(name_system): Extension<Arc<Mutex<NameSystem>>>,
Extension(name_system): Extension<Arc<NameSystem>>,
) -> JsonResponse<Vec<Peer>> {
let ns = name_system.lock().await;
let peers = ns
let peers = name_system
.peers()
.await
.map_err(move |error| JsonErr(StatusCode::INTERNAL_SERVER_ERROR, error.to_string()))?;
Ok(Json(peers))
}

pub async fn post_peers(
Extension(name_system): Extension<Arc<Mutex<NameSystem>>>,
Extension(name_system): Extension<Arc<NameSystem>>,
Path(addr): Path<String>,
) -> JsonResponse<()> {
let ns = name_system.lock().await;
let peer_addr = parse_multiaddr(&addr)?;
ns.add_peers(vec![peer_addr])
name_system
.add_peers(vec![peer_addr])
.await
.map_err(move |error| JsonErr(StatusCode::INTERNAL_SERVER_ERROR, error.to_string()))?;
Ok(Json(()))
}

pub async fn post_listener(
Extension(name_system): Extension<Arc<Mutex<NameSystem>>>,
Extension(name_system): Extension<Arc<NameSystem>>,
Path(addr): Path<String>,
) -> JsonResponse<Multiaddr> {
let ns = name_system.lock().await;
let listener = parse_multiaddr(&addr)?;
let address = ns
let address = name_system
.listen(listener)
.await
.map_err(move |error| JsonErr(StatusCode::INTERNAL_SERVER_ERROR, error.to_string()))?;
Ok(Json(address))
}

pub async fn delete_listener(
Extension(name_system): Extension<Arc<Mutex<NameSystem>>>,
Extension(name_system): Extension<Arc<NameSystem>>,
) -> JsonResponse<()> {
let ns = name_system.lock().await;
ns.stop_listening()
name_system
.stop_listening()
.await
.map_err(move |error| JsonErr(StatusCode::INTERNAL_SERVER_ERROR, error.to_string()))?;
Ok(Json(()))
}

pub async fn get_address(
Extension(name_system): Extension<Arc<Mutex<NameSystem>>>,
Extension(name_system): Extension<Arc<NameSystem>>,
) -> JsonResponse<Option<Multiaddr>> {
let ns = name_system.lock().await;
let address = ns
let address = name_system
.address()
.await
.map_err(move |error| JsonErr(StatusCode::INTERNAL_SERVER_ERROR, error.to_string()))?;
Ok(Json(address))
}

pub async fn get_record(
Extension(name_system): Extension<Arc<Mutex<NameSystem>>>,
Extension(name_system): Extension<Arc<NameSystem>>,
Path(did): Path<Did>,
) -> JsonResponse<Option<NsRecord>> {
let ns = name_system.lock().await;
let record = ns
let record = name_system
.get_record(&did)
.await
.map_err(move |error| JsonErr(StatusCode::INTERNAL_SERVER_ERROR, error.to_string()))?;
Expand All @@ -112,22 +106,20 @@ pub struct PostRecordQuery {
}

pub async fn post_record(
Extension(name_system): Extension<Arc<Mutex<NameSystem>>>,
Extension(name_system): Extension<Arc<NameSystem>>,
Json(record): Json<NsRecord>,
Query(query): Query<PostRecordQuery>,
) -> JsonResponse<()> {
let ns = name_system.lock().await;
ns.put_record(record, query.quorum)
name_system
.put_record(record, query.quorum)
.await
.map_err(move |error| JsonErr(StatusCode::INTERNAL_SERVER_ERROR, error.to_string()))?;
Ok(Json(()))
}

pub async fn bootstrap(
Extension(name_system): Extension<Arc<Mutex<NameSystem>>>,
) -> JsonResponse<()> {
let ns = name_system.lock().await;
ns.bootstrap()
pub async fn bootstrap(Extension(name_system): Extension<Arc<NameSystem>>) -> JsonResponse<()> {
name_system
.bootstrap()
.await
.map_err(move |error| JsonErr(StatusCode::INTERNAL_SERVER_ERROR, error.to_string()))?;
Ok(Json(()))
Expand Down
Loading

0 comments on commit 2347d10

Please sign in to comment.