Add possibility to have several connections to core from shard #42

Status: Closed · wants to merge 2 commits
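In brief, per the diff below: the shard's `Aggregator` becomes an `Arc<Vec<AggregatorInternal>>` instead of an `Arc<AggregatorInternal>`; `Aggregator::spawn` now takes a `NonZeroUsize` count and opens that many websocket connections to the telemetry core concurrently (collected through a `FuturesUnordered`); `subscribe_node` assigns each incoming node connection to one of the internal aggregators at random (`SliceRandom::choose`); the count is exposed as a new `-c`/`--core` CLI option defaulting to 1; and the second `shard1` service is dropped from `docker-compose.yml`.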
1 change: 1 addition & 0 deletions backend/Cargo.lock

Some generated files are not rendered by default.

7 changes: 4 additions & 3 deletions backend/telemetry_shard/Cargo.toml
```diff
@@ -17,14 +17,15 @@ hyper = "0.14.11"
 log = "0.4.14"
 num_cpus = "1.13.0"
 primitive-types = { version = "0.9.0", features = ["serde"] }
-serde = { version = "1.0.126", features = ["derive"] }
+rand = "0.8"
 serde_json = "1.0.64"
+serde = { version = "1.0.126", features = ["derive"] }
 simple_logger = "1.11.0"
 soketto = "0.6.0"
 structopt = "0.3.21"
 thiserror = "1.0.25"
-tokio = { version = "1.10.1", features = ["full"] }
 tokio-util = { version = "0.6", features = ["compat"] }
+tokio = { version = "1.10.1", features = ["full"] }
 
 [target.'cfg(not(target_env = "msvc"))'.dependencies]
-jemallocator = "0.3.2"
+jemallocator = "0.3.2"
```
84 changes: 52 additions & 32 deletions backend/telemetry_shard/src/aggregator.rs
```diff
@@ -21,10 +21,14 @@ use common::{
     node_types::BlockHash,
     AssignId,
 };
-use futures::{Sink, SinkExt};
-use std::collections::{HashMap, HashSet};
+use futures::{stream::FuturesUnordered, Sink, SinkExt, StreamExt};
+use rand::prelude::SliceRandom;
 use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
+use std::{
+    collections::{HashMap, HashSet},
+    num::NonZeroUsize,
+};
 
 /// A unique Id is assigned per websocket connection (or more accurately,
 /// per thing-that-subscribes-to-the-aggregator). That connection might send
```
```diff
@@ -86,7 +90,7 @@ pub type FromAggregator = internal_messages::FromShardAggregator;
 /// The aggregator loop handles incoming messages from nodes, or from the telemetry core.
 /// This is where we decide what effect messages will have.
 #[derive(Clone)]
-pub struct Aggregator(Arc<AggregatorInternal>);
+pub struct Aggregator(Arc<Vec<AggregatorInternal>>);
 
 struct AggregatorInternal {
     /// Nodes that connect are each assigned a unique connection ID. Nodes
```
```diff
@@ -101,26 +105,27 @@ struct AggregatorInternal {
 }
 
 impl Aggregator {
-    /// Spawn a new Aggregator. This connects to the telemetry backend
-    pub async fn spawn(telemetry_uri: http::Uri) -> anyhow::Result<Aggregator> {
+    async fn aggregator(telemetry_uri: http::Uri) -> flume::Sender<ToAggregator> {
         let (tx_to_aggregator, rx_from_external) = flume::bounded(10);
 
         // Establish a resilient connection to the core (this retries as needed):
         let (tx_to_telemetry_core, rx_from_telemetry_core) =
             create_ws_connection_to_core(telemetry_uri).await;
 
         // Forward messages from the telemetry core into the aggregator:
-        let tx_to_aggregator2 = tx_to_aggregator.clone();
-        tokio::spawn(async move {
-            while let Ok(msg) = rx_from_telemetry_core.recv_async().await {
-                let msg_to_aggregator = match msg {
-                    Message::Connected => ToAggregator::ConnectedToTelemetryCore,
-                    Message::Disconnected => ToAggregator::DisconnectedFromTelemetryCore,
-                    Message::Data(data) => ToAggregator::FromTelemetryCore(data),
-                };
-                if let Err(_) = tx_to_aggregator2.send_async(msg_to_aggregator).await {
-                    // This will close the ws channels, which themselves log messages.
-                    break;
+        tokio::spawn({
+            let tx_to_aggregator = tx_to_aggregator.clone();
+            async move {
+                while let Ok(msg) = rx_from_telemetry_core.recv_async().await {
+                    let msg_to_aggregator = match msg {
+                        Message::Connected => ToAggregator::ConnectedToTelemetryCore,
+                        Message::Disconnected => ToAggregator::DisconnectedFromTelemetryCore,
+                        Message::Data(data) => ToAggregator::FromTelemetryCore(data),
+                    };
+                    if let Err(_) = tx_to_aggregator.send_async(msg_to_aggregator).await {
+                        // This will close the ws channels, which themselves log messages.
+                        break;
+                    }
                 }
             }
         });
```
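The `tokio::spawn({ ... })` form introduced above is the block-expression clone idiom: the clone of the sender lives only inside the block that builds the future, so no separately named `tx_to_aggregator2` leaks into the surrounding scope. A minimal, self-contained sketch of the same idiom, using a tokio mpsc channel rather than the PR's flume channel:

```rust
// Sketch of the clone-capture idiom from the diff above: the block passed to
// tokio::spawn clones the sender, and the async move block captures the clone.
// Assumes only the tokio crate (with the "full" feature) as a dependency.
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(8);

    tokio::spawn({
        let tx = tx.clone(); // shadowed clone, scoped to this block
        async move {
            for n in 0..3 {
                if tx.send(n).await.is_err() {
                    break; // receiver dropped
                }
            }
        }
    });

    drop(tx); // drop the original so `recv` returns None once the task finishes
    while let Some(n) = rx.recv().await {
        println!("got {n}");
    }
}
```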
```diff
@@ -130,12 +135,26 @@ impl Aggregator {
             rx_from_external,
             tx_to_telemetry_core,
         ));
+        tx_to_aggregator
+    }
 
-        // Return a handle to our aggregator so that we can send in messages to it:
-        Ok(Aggregator(Arc::new(AggregatorInternal {
-            conn_id: AtomicU64::new(1),
-            tx_to_aggregator,
-        })))
+    /// Spawn a new Aggregator. This connects to the telemetry backend
+    pub async fn spawn(
+        telemetry_uri: http::Uri,
+        aggregators: NonZeroUsize,
+    ) -> anyhow::Result<Aggregator> {
+        let aggregators = std::iter::repeat_with(|| telemetry_uri.clone())
+            .take(aggregators.get())
+            .map(Self::aggregator)
+            .collect::<FuturesUnordered<_>>()
+            .map(|tx_to_aggregator| AggregatorInternal {
+                conn_id: AtomicU64::new(1),
+                tx_to_aggregator,
+            })
+            .collect()
+            .await;
+
+        Ok(Aggregator(Arc::new(aggregators)))
     }
 
     // This is spawned into a separate task and handles any messages coming
```
```diff
@@ -298,20 +317,21 @@
 
     /// Return a sink that a node can send messages into to be handled by the aggregator.
     pub fn subscribe_node(&self) -> impl Sink<FromWebsocket, Error = anyhow::Error> + Unpin {
+        let aggregator = self
+            .0
+            .choose(&mut rand::thread_rng())
+            .expect("Always at least one is available");
+
         // Assign a unique aggregator-local ID to each connection that subscribes, and pass
         // that along with every message to the aggregator loop:
-        let conn_id: ConnId = self
-            .0
+        let conn_id: ConnId = aggregator
             .conn_id
             .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-        let tx_to_aggregator = self.0.tx_to_aggregator.clone();
 
-        // Calling `send` on this Sink requires Unpin. There may be a nicer way than this,
-        // but pinning by boxing is the easy solution for now:
-        Box::pin(
-            tx_to_aggregator
-                .into_sink()
-                .with(move |msg| async move { Ok(ToAggregator::FromWebsocket(conn_id, msg)) }),
-        )
+        aggregator
+            .tx_to_aggregator
+            .clone()
+            .into_sink()
+            .with(move |msg| futures::future::ok(ToAggregator::FromWebsocket(conn_id, msg)))
     }
 }
```
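Taken together, these hunks implement a small fan-out: `spawn` builds N independent connections to the core concurrently, and `subscribe_node` routes each node to one of them at random. Below is a self-contained sketch of that pattern under the same crates the PR uses (flume, rand 0.8, futures, tokio); `Worker` and `make_worker` are hypothetical stand-ins for `AggregatorInternal` and `Aggregator::aggregator`:

```rust
// Sketch of the fan-out pattern: build N channel senders concurrently via
// FuturesUnordered, then pick one at random per subscriber.
// Assumed dependencies: tokio (features = ["full"]), flume, futures, rand = "0.8".
use futures::{stream::FuturesUnordered, StreamExt};
use rand::prelude::SliceRandom;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

struct Worker {
    conn_id: AtomicU64,
    tx: flume::Sender<String>,
}

// Stands in for `Aggregator::aggregator`: set up one worker loop and hand
// back the sender used to talk to it.
async fn make_worker() -> flume::Sender<String> {
    let (tx, rx) = flume::bounded::<String>(10);
    tokio::spawn(async move {
        while let Ok(msg) = rx.recv_async().await {
            println!("worker got: {msg}");
        }
    });
    tx
}

#[tokio::main]
async fn main() {
    let n = NonZeroUsize::new(4).unwrap();

    // Run all `make_worker` futures concurrently and collect the resulting
    // senders, mirroring the `spawn` implementation in this PR.
    let workers: Vec<Worker> = std::iter::repeat_with(make_worker)
        .take(n.get())
        .collect::<FuturesUnordered<_>>()
        .map(|tx| Worker {
            conn_id: AtomicU64::new(1),
            tx,
        })
        .collect()
        .await;
    let workers = Arc::new(workers);

    // Each new subscriber is assigned to a random worker, like `subscribe_node`.
    let worker = workers
        .choose(&mut rand::thread_rng())
        .expect("n is non-zero, so there is at least one worker");
    let id = worker.conn_id.fetch_add(1, Ordering::Relaxed);
    worker.tx.send_async(format!("hello from conn {id}")).await.unwrap();

    // Give the worker task a moment to drain before main exits.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
```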
6 changes: 5 additions & 1 deletion backend/telemetry_shard/src/main.rs
```diff
@@ -24,6 +24,7 @@ mod real_ip;
 use std::{
     collections::HashMap,
     net::IpAddr,
+    num::NonZeroUsize,
     time::{Duration, Instant},
 };
 
```
```diff
@@ -72,6 +73,9 @@ struct Opts {
         default_value = "ws://127.0.0.1:8000/shard_submit/"
     )]
     core_url: Uri,
+    /// Number of aggregators
+    #[structopt(short = "c", long = "core", default_value = "1")]
+    aggregators: NonZeroUsize,
     /// How many different nodes is a given connection to the /submit endpoint allowed to
     /// tell us about before we ignore the rest?
     ///
```
```diff
@@ -138,7 +142,7 @@ fn main() {
 /// Declare our routes and start the server.
 async fn start_server(opts: Opts) -> anyhow::Result<()> {
     let block_list = BlockedAddrs::new(Duration::from_secs(opts.node_block_seconds));
-    let aggregator = Aggregator::spawn(opts.core_url).await?;
+    let aggregator = Aggregator::spawn(opts.core_url, opts.aggregators).await?;
     let socket_addr = opts.socket;
     let max_nodes_per_connection = opts.max_nodes_per_connection;
     let bytes_per_second = opts.max_node_data_per_second;
```
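With the new option, a shard can be started with, say, `--core 4` (or `-c 4`) to hold four aggregator connections; because the field is a `NonZeroUsize`, `--core 0` fails at argument parsing rather than producing an empty aggregator list for `subscribe_node`'s `expect` to trip over. A minimal sketch of that parse behaviour, assuming structopt 0.3's `FromStr`-based parsing; this pared-down `Opts` is illustrative, not the shard's full options struct:

```rust
// Sketch of how the new flag parses. Assumes structopt = "0.3" as a dependency.
use std::num::NonZeroUsize;
use structopt::StructOpt;

#[derive(StructOpt)]
struct Opts {
    /// Number of aggregators
    #[structopt(short = "c", long = "core", default_value = "1")]
    aggregators: NonZeroUsize,
}

fn main() {
    // e.g. `telemetry_shard --core 4`
    let opts = Opts::from_iter(["telemetry_shard", "--core", "4"]);
    assert_eq!(opts.aggregators.get(), 4);

    // `0` is rejected at parse time because NonZeroUsize's FromStr fails on it.
    assert!(Opts::from_iter_safe(["telemetry_shard", "--core", "0"]).is_err());
}
```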
7 changes: 0 additions & 7 deletions docker-compose.yml
```diff
@@ -49,13 +49,6 @@ services:
     expose:
       - 8001
 
-  shard1:
-    <<: *shard
-    ports:
-      - 127.0.0.1:8002:80
-    expose:
-      - 8002
-
   core:
     build:
       dockerfile: Dockerfile
```