diff --git a/Cargo.lock b/Cargo.lock index 143fff087..d5b6fba0d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4220,7 +4220,7 @@ dependencies = [ [[package]] name = "hotshot-query-service" version = "0.1.39" -source = "git+https://github.com/EspressoSystems/hotshot-query-service?tag=rc-0.1.40#0a2481db04df0a2076613546413f592e160ec1be" +source = "git+https://github.com/EspressoSystems/hotshot-query-service?tag=rc-0.1.40#a41c0a50aef228b1a0ea017bdb4341d06c902725" dependencies = [ "anyhow", "ark-serialize", @@ -7589,7 +7589,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.12.1", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.66", @@ -8712,6 +8712,7 @@ dependencies = [ "jf-vid", "libp2p", "num-traits", + "num_enum", "paste", "portpicker", "pretty_assertions", @@ -8724,6 +8725,7 @@ dependencies = [ "serde_json", "sha2 0.10.8", "snafu 0.8.3", + "static_assertions", "strum", "surf-disco", "tagged-base64", diff --git a/builder/src/lib.rs b/builder/src/lib.rs index b2f62ce92..883a66e74 100644 --- a/builder/src/lib.rs +++ b/builder/src/lib.rs @@ -9,9 +9,7 @@ use futures::{ stream::{Stream, StreamExt}, }; use hotshot::{ - traits::{ - election::static_committee::GeneralStaticCommittee, implementations::NetworkingMetricsValue, - }, + traits::election::static_committee::GeneralStaticCommittee, types::{SignatureKey, SystemContextHandle}, HotShotInitializer, Memberships, Networks, SystemContext, }; @@ -379,8 +377,7 @@ pub mod testing { let network = Arc::new(MemoryNetwork::new( config.my_own_validator_config.public_key, - NetworkingMetricsValue::new(metrics), - self.master_map.clone(), + &self.master_map, None, )); let networks = Networks { diff --git a/builder/src/permissioned.rs b/builder/src/permissioned.rs index 30dd571f1..93a5d6073 100644 --- a/builder/src/permissioned.rs +++ b/builder/src/permissioned.rs @@ -12,8 +12,8 @@ use hotshot::{ traits::{ election::static_committee::GeneralStaticCommittee, implementations::{ - derive_libp2p_peer_id, CombinedNetworks, KeyPair, Libp2pNetwork, - NetworkingMetricsValue, PushCdnNetwork, Topic, WrappedSignatureKey, + derive_libp2p_peer_id, CdnMetricsValue, CombinedNetworks, KeyPair, Libp2pNetwork, + PushCdnNetwork, Topic, WrappedSignatureKey, }, }, types::{SignatureKey, SystemContextHandle}, @@ -209,6 +209,7 @@ pub async fn init_node Result<()> { @@ -111,6 +123,7 @@ async fn main() -> Result<()> { public_advertise_endpoint: args.public_advertise_endpoint, private_bind_endpoint: args.private_bind_endpoint, private_advertise_endpoint: args.private_advertise_endpoint, + global_memory_pool_size: Some(args.global_memory_pool_size), }; // Create new `Broker` diff --git a/sequencer/src/bin/cdn-marshal.rs b/sequencer/src/bin/cdn-marshal.rs index a9646d8b7..4a5030deb 100644 --- a/sequencer/src/bin/cdn-marshal.rs +++ b/sequencer/src/bin/cdn-marshal.rs @@ -4,7 +4,7 @@ use anyhow::Result; use cdn_marshal::{Config, Marshal}; use clap::Parser; -use sequencer::{network::cdn::ProductionDef, SeqTypes}; +use sequencer::{network::cdn::ProductionDef, options::parse_size, SeqTypes}; use tracing_subscriber::EnvFilter; #[derive(Parser, Debug)] @@ -38,6 +38,17 @@ struct Args { /// If not provided, a local, pinned CA is used #[arg(long, env = "ESPRESSO_CDN_MARSHAL_CA_KEY_PATH")] ca_key_path: Option, + + /// The size of the global memory pool. This is the maximum number of bytes that + /// can be allocated at once for all connections. A connection will block if it + /// tries to allocate more than this amount until some memory is freed. + #[arg( + long, + default_value = "1GB", + value_parser = parse_size, + env = "ESPRESSO_CDN_MARSHAL_GLOBAL_MEMORY_POOL_SIZE" + )] + global_memory_pool_size: usize, } #[async_std::main] @@ -64,6 +75,7 @@ async fn main() -> Result<()> { metrics_bind_endpoint: args.metrics_bind_endpoint, ca_cert_path: args.ca_cert_path, ca_key_path: args.ca_key_path, + global_memory_pool_size: Some(args.global_memory_pool_size), }; // Create new `Marshal` from the config diff --git a/sequencer/src/bin/dev-cdn.rs b/sequencer/src/bin/dev-cdn.rs index 73def9e41..064142d71 100644 --- a/sequencer/src/bin/dev-cdn.rs +++ b/sequencer/src/bin/dev-cdn.rs @@ -75,6 +75,7 @@ async fn main() -> Result<()> { ca_cert_path: None, ca_key_path: None, + global_memory_pool_size: Some(1024 * 1024 * 1024), }; // Configure the marshal @@ -84,6 +85,7 @@ async fn main() -> Result<()> { discovery_endpoint: discovery_endpoint.clone(), ca_cert_path: None, ca_key_path: None, + global_memory_pool_size: Some(1024 * 1024 * 1024), }; // Create a new `Broker` diff --git a/sequencer/src/lib.rs b/sequencer/src/lib.rs index 6192f8d28..5f6fbb9f6 100644 --- a/sequencer/src/lib.rs +++ b/sequencer/src/lib.rs @@ -42,8 +42,8 @@ use hotshot::{ traits::{ election::static_committee::GeneralStaticCommittee, implementations::{ - derive_libp2p_peer_id, KeyPair, MemoryNetwork, NetworkingMetricsValue, PushCdnNetwork, - Topic, WrappedSignatureKey, + derive_libp2p_peer_id, CdnMetricsValue, KeyPair, MemoryNetwork, PushCdnNetwork, Topic, + WrappedSignatureKey, }, }, types::SignatureKey, @@ -445,6 +445,7 @@ pub async fn init_node( public_key: WrappedSignatureKey(my_config.public_key), private_key: my_config.private_key.clone(), }, + CdnMetricsValue::new(metrics), ) .with_context(|| "Failed to create CDN network")?; @@ -458,6 +459,7 @@ pub async fn init_node( // We need the private key so we can derive our Libp2p keypair // (using https://docs.rs/blake3/latest/blake3/fn.derive_key.html) &my_config.private_key, + hotshot::traits::implementations::Libp2pMetricsValue::new(metrics), ) .await .with_context(|| "Failed to create libp2p network")?; @@ -503,12 +505,6 @@ pub async fn init_node( _pd: Default::default(), }; - // The web server network doesn't have any metrics. By creating and dropping a - // `NetworkingMetricsValue`, we ensure the networking metrics are created, but just not - // populated, so that monitoring software built to work with network-related metrics doesn't - // crash horribly just because we're not using the P2P network yet. - let _ = NetworkingMetricsValue::new(metrics); - let mut genesis_state = ValidatedState { chain_config: genesis.chain_config.into(), ..Default::default() @@ -789,8 +785,7 @@ pub mod testing { let network = Arc::new(MemoryNetwork::new( config.my_own_validator_config.public_key, - NetworkingMetricsValue::new(metrics), - self.master_map.clone(), + &self.master_map, None, )); let networks = Networks { diff --git a/sequencer/src/network/cdn.rs b/sequencer/src/network/cdn.rs index e0b998e41..a3e6b7074 100644 --- a/sequencer/src/network/cdn.rs +++ b/sequencer/src/network/cdn.rs @@ -3,16 +3,33 @@ use std::marker::PhantomData; use bincode::Options; use cdn_broker::reexports::{ - connection::{ - protocols::{Quic, Tcp}, - NoMiddleware, TrustedMiddleware, UntrustedMiddleware, - }, + connection::protocols::{Quic, Tcp}, crypto::signature::{Serializable, SignatureScheme}, - def::{ConnectionDef, RunDef}, + def::{ConnectionDef, RunDef, Topic as TopicTrait}, discovery::{Embedded, Redis}, }; -use hotshot::{traits::implementations::Topic, types::SignatureKey}; +use hotshot::{traits::implementations::Topic as HotShotTopic, types::SignatureKey}; use hotshot_types::{traits::node_implementation::NodeType, utils::bincode_opts}; +use num_enum::{IntoPrimitive, TryFromPrimitive}; +use static_assertions::const_assert_eq; + +/// The enum for the topics we can subscribe to in the Push CDN +#[repr(u8)] +#[derive(IntoPrimitive, TryFromPrimitive, Clone, PartialEq, Eq)] +pub enum Topic { + /// The global topic + Global = 0, + /// The DA topic + Da = 1, +} + +// Make sure the topics are the same as defined in `HotShot`. +const_assert_eq!(Topic::Global as u8, HotShotTopic::Global as u8); +const_assert_eq!(Topic::Da as u8, HotShotTopic::Da as u8); + +/// Implement the `TopicTrait` for our `Topic` enum. This lets us define +/// compatible topics at the broker-level. Others will be rejected. +impl TopicTrait for Topic {} /// A wrapped `SignatureKey`. We need to implement the Push CDN's `SignatureScheme` /// trait in order to sign and verify messages to/from the CDN. @@ -70,7 +87,6 @@ pub struct UserDef(PhantomData); impl ConnectionDef for UserDef { type Scheme = WrappedSignatureKey; type Protocol = Quic; - type Middleware = UntrustedMiddleware; } /// The broker definition for the Push CDN. @@ -79,7 +95,6 @@ pub struct BrokerDef(PhantomData); impl ConnectionDef for BrokerDef { type Scheme = WrappedSignatureKey; type Protocol = Tcp; - type Middleware = TrustedMiddleware; } /// The client definition for the Push CDN. Uses the Quic @@ -90,7 +105,6 @@ pub struct ClientDef(PhantomData); impl ConnectionDef for ClientDef { type Scheme = WrappedSignatureKey; type Protocol = Quic; - type Middleware = NoMiddleware; } /// The testing run definition for the Push CDN.