From 3e71a4c731b270a964a20338efe5f946fcaada4b Mon Sep 17 00:00:00 2001 From: i1i1 Date: Thu, 29 Sep 2022 13:46:05 +0300 Subject: [PATCH 1/2] Add multiple aggregators --- backend/Cargo.lock | 1 + backend/telemetry_shard/Cargo.toml | 3 +- backend/telemetry_shard/src/aggregator.rs | 84 ++++++++++++++--------- backend/telemetry_shard/src/main.rs | 6 +- 4 files changed, 60 insertions(+), 34 deletions(-) diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 4673e614..bd54388f 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1671,6 +1671,7 @@ dependencies = [ "log", "num_cpus", "primitive-types", + "rand", "serde", "serde_json", "simple_logger", diff --git a/backend/telemetry_shard/Cargo.toml b/backend/telemetry_shard/Cargo.toml index cce698ad..aa8484cc 100644 --- a/backend/telemetry_shard/Cargo.toml +++ b/backend/telemetry_shard/Cargo.toml @@ -25,6 +25,7 @@ structopt = "0.3.21" thiserror = "1.0.25" tokio = { version = "1.10.1", features = ["full"] } tokio-util = { version = "0.6", features = ["compat"] } +rand = "0.8" [target.'cfg(not(target_env = "msvc"))'.dependencies] -jemallocator = "0.3.2" \ No newline at end of file +jemallocator = "0.3.2" diff --git a/backend/telemetry_shard/src/aggregator.rs b/backend/telemetry_shard/src/aggregator.rs index 7b7145d2..a9a7bb55 100644 --- a/backend/telemetry_shard/src/aggregator.rs +++ b/backend/telemetry_shard/src/aggregator.rs @@ -21,10 +21,14 @@ use common::{ node_types::BlockHash, AssignId, }; -use futures::{Sink, SinkExt}; -use std::collections::{HashMap, HashSet}; +use futures::{stream::FuturesUnordered, Sink, SinkExt, StreamExt}; +use rand::prelude::SliceRandom; use std::sync::atomic::AtomicU64; use std::sync::Arc; +use std::{ + collections::{HashMap, HashSet}, + num::NonZeroUsize, +}; /// A unique Id is assigned per websocket connection (or more accurately, /// per thing-that-subscribes-to-the-aggregator). 
That connection might send @@ -86,7 +90,7 @@ pub type FromAggregator = internal_messages::FromShardAggregator; /// The aggregator loop handles incoming messages from nodes, or from the telemetry core. /// this is where we decide what effect messages will have. #[derive(Clone)] -pub struct Aggregator(Arc); +pub struct Aggregator(Arc>); struct AggregatorInternal { /// Nodes that connect are each assigned a unique connection ID. Nodes @@ -101,8 +105,7 @@ struct AggregatorInternal { } impl Aggregator { - /// Spawn a new Aggregator. This connects to the telemetry backend - pub async fn spawn(telemetry_uri: http::Uri) -> anyhow::Result { + async fn aggregator(telemetry_uri: http::Uri) -> flume::Sender { let (tx_to_aggregator, rx_from_external) = flume::bounded(10); // Establish a resiliant connection to the core (this retries as needed): @@ -110,17 +113,19 @@ impl Aggregator { create_ws_connection_to_core(telemetry_uri).await; // Forward messages from the telemetry core into the aggregator: - let tx_to_aggregator2 = tx_to_aggregator.clone(); - tokio::spawn(async move { - while let Ok(msg) = rx_from_telemetry_core.recv_async().await { - let msg_to_aggregator = match msg { - Message::Connected => ToAggregator::ConnectedToTelemetryCore, - Message::Disconnected => ToAggregator::DisconnectedFromTelemetryCore, - Message::Data(data) => ToAggregator::FromTelemetryCore(data), - }; - if let Err(_) = tx_to_aggregator2.send_async(msg_to_aggregator).await { - // This will close the ws channels, which themselves log messages. 
- break; + tokio::spawn({ + let tx_to_aggregator = tx_to_aggregator.clone(); + async move { + while let Ok(msg) = rx_from_telemetry_core.recv_async().await { + let msg_to_aggregator = match msg { + Message::Connected => ToAggregator::ConnectedToTelemetryCore, + Message::Disconnected => ToAggregator::DisconnectedFromTelemetryCore, + Message::Data(data) => ToAggregator::FromTelemetryCore(data), + }; + if let Err(_) = tx_to_aggregator.send_async(msg_to_aggregator).await { + // This will close the ws channels, which themselves log messages. + break; + } } } }); @@ -130,12 +135,26 @@ impl Aggregator { rx_from_external, tx_to_telemetry_core, )); + tx_to_aggregator + } - // Return a handle to our aggregator so that we can send in messages to it: - Ok(Aggregator(Arc::new(AggregatorInternal { - conn_id: AtomicU64::new(1), - tx_to_aggregator, - }))) + /// Spawn a new Aggregator. This connects to the telemetry backend + pub async fn spawn( + telemetry_uri: http::Uri, + aggregators: NonZeroUsize, + ) -> anyhow::Result { + let aggregators = std::iter::repeat_with(|| telemetry_uri.clone()) + .take(aggregators.get()) + .map(Self::aggregator) + .collect::>() + .map(|tx_to_aggregator| AggregatorInternal { + conn_id: AtomicU64::new(1), + tx_to_aggregator, + }) + .collect() + .await; + + Ok(Aggregator(Arc::new(aggregators))) } // This is spawned into a separate task and handles any messages coming @@ -298,20 +317,21 @@ impl Aggregator { /// Return a sink that a node can send messages into to be handled by the aggregator. 
pub fn subscribe_node(&self) -> impl Sink + Unpin { + let aggregator = self + .0 + .choose(&mut rand::thread_rng()) + .expect("Always at least one is available"); + // Assign a unique aggregator-local ID to each connection that subscribes, and pass // that along with every message to the aggregator loop: - let conn_id: ConnId = self - .0 + let conn_id: ConnId = aggregator .conn_id .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let tx_to_aggregator = self.0.tx_to_aggregator.clone(); - - // Calling `send` on this Sink requires Unpin. There may be a nicer way than this, - // but pinning by boxing is the easy solution for now: - Box::pin( - tx_to_aggregator - .into_sink() - .with(move |msg| async move { Ok(ToAggregator::FromWebsocket(conn_id, msg)) }), - ) + + aggregator + .tx_to_aggregator + .clone() + .into_sink() + .with(move |msg| futures::future::ok(ToAggregator::FromWebsocket(conn_id, msg))) } } diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs index f377bb89..5a90a79c 100644 --- a/backend/telemetry_shard/src/main.rs +++ b/backend/telemetry_shard/src/main.rs @@ -24,6 +24,7 @@ mod real_ip; use std::{ collections::HashMap, net::IpAddr, + num::NonZeroUsize, time::{Duration, Instant}, }; @@ -72,6 +73,9 @@ struct Opts { default_value = "ws://127.0.0.1:8000/shard_submit/" )] core_url: Uri, + /// Number of aggregators + #[structopt(long = "aggregators", default_value = "1")] + aggregators: NonZeroUsize, /// How many different nodes is a given connection to the /submit endpoint allowed to /// tell us about before we ignore the rest? /// @@ -138,7 +142,7 @@ fn main() { /// Declare our routes and start the server.
async fn start_server(opts: Opts) -> anyhow::Result<()> { let block_list = BlockedAddrs::new(Duration::from_secs(opts.node_block_seconds)); - let aggregator = Aggregator::spawn(opts.core_url).await?; + let aggregator = Aggregator::spawn(opts.core_url, opts.aggregators).await?; let socket_addr = opts.socket; let max_nodes_per_connection = opts.max_nodes_per_connection; let bytes_per_second = opts.max_node_data_per_second; From 205392dffe4703f8fa6de0b541ff0b5bd79454db Mon Sep 17 00:00:00 2001 From: i1i1 Date: Fri, 30 Sep 2022 15:35:19 +0300 Subject: [PATCH 2/2] Remove second shard --- backend/telemetry_shard/Cargo.toml | 6 +++--- docker-compose.yml | 7 ------- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/backend/telemetry_shard/Cargo.toml b/backend/telemetry_shard/Cargo.toml index aa8484cc..e7eab7f2 100644 --- a/backend/telemetry_shard/Cargo.toml +++ b/backend/telemetry_shard/Cargo.toml @@ -17,15 +17,15 @@ hyper = "0.14.11" log = "0.4.14" num_cpus = "1.13.0" primitive-types = { version = "0.9.0", features = ["serde"] } -serde = { version = "1.0.126", features = ["derive"] } +rand = "0.8" serde_json = "1.0.64" +serde = { version = "1.0.126", features = ["derive"] } simple_logger = "1.11.0" soketto = "0.6.0" structopt = "0.3.21" thiserror = "1.0.25" -tokio = { version = "1.10.1", features = ["full"] } tokio-util = { version = "0.6", features = ["compat"] } -rand = "0.8" +tokio = { version = "1.10.1", features = ["full"] } [target.'cfg(not(target_env = "msvc"))'.dependencies] jemallocator = "0.3.2" diff --git a/docker-compose.yml b/docker-compose.yml index 08e5a606..480fb938 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -49,13 +49,6 @@ services: expose: - 8001 - shard1: - <<: *shard - ports: - - 127.0.0.1:8002:80 - expose: - - 8002 - core: build: dockerfile: Dockerfile