Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Network] Adding AptosPerf Client #6054

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/lint-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ jobs:
- uses: taiki-e/install-action@v1.5.6
with:
tool: nextest
- run: cargo nextest run --profile ci --locked --workspace --exclude smoke-test --exclude aptos-testcases --retries 3 --no-fail-fast
- run: RUST_MIN_STACK=4194304 cargo nextest run --profile ci --locked --workspace --exclude smoke-test --exclude aptos-testcases --retries 3 --no-fail-fast
env:
INDEXER_DATABASE_URL: postgresql://postgres@localhost/postgres

Expand Down
88 changes: 25 additions & 63 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ assert_approx_eq = "1.1.0"
assert_unordered = "0.1.1"
async-stream = "0.3"
async-trait = "0.1.53"
axum = "0.5.16"
axum = "0.6.1"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason we chose axum over hyper? I see that it's used by the http_test, but I'm just wondering why we didn't choose to reuse hyper given that all of our other services use it.

base64 = "0.13.0"
backtrace = "0.3.58"
bcs = { git = "https://github.com/aptos-labs/bcs.git", rev = "d31fab9d81748e2594be5cd5cdf845786a30562d" }
Expand Down Expand Up @@ -345,7 +345,7 @@ hkdf = "0.10.0"
hostname = "0.3.1"
http = "0.2.3"
httpmock = "0.6"
hyper = { version = "0.14.18", features = ["full"] }
hyper = { version = "0.14.23", features = ["full"] }
hyper-tls = "0.5.0"
include_dir = { version = "0.7.2", features = ["glob"] }
indicatif = "0.15.0"
Expand Down
2 changes: 2 additions & 0 deletions config/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,10 +434,12 @@ impl NodeConfig {

if let Some(network) = self.validator_network.as_mut() {
network.listen_address = crate::utils::get_available_port_in_multiaddr(true);
network.randomize_ports();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you also move the randomization of the listen_address into the new function? That way, everything is kept in one place.

}

for network in self.full_node_networks.iter_mut() {
network.listen_address = crate::utils::get_available_port_in_multiaddr(true);
network.randomize_ports();
}
}

Expand Down
16 changes: 16 additions & 0 deletions config/src/config/network_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub const INBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with
pub const INBOUND_TCP_TX_BUFFER_SIZE: u32 = 512 * 1024; // 1MB use a bigger spoon
pub const OUTBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with 500ms latency
pub const OUTBOUND_TCP_TX_BUFFER_SIZE: u32 = 1024 * 1024; // 1MB use a bigger spoon
pub const ENABLE_APTOS_NETPERF_CLIENT: bool = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be disabled by default, no?

pub const DEFAULT_APTOS_NETPERF_CLIENT_PORT: u16 = 9105;

#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
Expand All @@ -80,6 +82,7 @@ pub struct NetworkConfig {
// Select this to enforce that both peers should authenticate each other, otherwise
// authentication only occurs for outgoing connections.
pub mutual_authentication: bool,
pub netperf_client_port: Option<u16>,
pub network_id: NetworkId,
pub runtime_threads: Option<usize>,
pub inbound_rx_buffer_size_bytes: Option<u32>,
Expand Down Expand Up @@ -120,7 +123,19 @@ impl Default for NetworkConfig {
}
}

fn netperf_client_port(enabled: bool) -> Option<u16> {
if enabled {
Some(DEFAULT_APTOS_NETPERF_CLIENT_PORT)
} else {
None
}
}

impl NetworkConfig {
pub fn randomize_ports(&mut self) {
self.netperf_client_port = Some(utils::get_available_port());
}

pub fn network_with_id(network_id: NetworkId) -> NetworkConfig {
let mutual_authentication = network_id.is_validator_network();
let mut config = Self {
Expand All @@ -129,6 +144,7 @@ impl NetworkConfig {
identity: Identity::None,
listen_address: "/ip4/0.0.0.0/tcp/6180".parse().unwrap(),
mutual_authentication,
netperf_client_port: netperf_client_port(ENABLE_APTOS_NETPERF_CLIENT),
network_id,
runtime_threads: None,
seed_addrs: HashMap::new(),
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl NetworkSender {
// Broadcast message over direct-send to all other validators.
if let Err(err) = self
.consensus_network_client
.send_to_many(other_validators.into_iter(), msg)
.send_to_many(other_validators.into_iter(), &msg)
{
warn!(error = ?err, "Error broadcasting message");
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/network_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl<NetworkClient: NetworkClientInterface<ConsensusMsg>> ConsensusNetworkClient
pub fn send_to_many(
&self,
peers: impl Iterator<Item = PeerId>,
message: ConsensusMsg,
message: &ConsensusMsg,
) -> Result<(), Error> {
let peer_network_ids: Vec<PeerNetworkId> = peers
.map(|peer| self.get_peer_network_id_for_peer(peer))
Expand Down
1 change: 1 addition & 0 deletions consensus/src/payload_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tokio::sync::oneshot;

/// Responsible to extract the transactions out of the payload and notify QuorumStore about commits.
/// If QuorumStore is enabled, has to ask BatchReader for the transaction behind the proofs of availability in the payload.
#[allow(dead_code)]
pub enum PayloadManager {
DirectMempool,
InQuorumStore(BatchReader, Mutex<Sender<PayloadRequest>>),
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ type NotificationType = (
Vec<ContractEvent>,
);

#[allow(dead_code)]
type CommitType = (u64, Round, Vec<Payload>);

/// Basic communication with the Execution module;
/// implements StateComputer traits.
#[allow(dead_code)]
pub struct ExecutionProxy {
executor: Arc<dyn BlockExecutorTrait>,
txn_notifier: Arc<dyn TxnNotifier>,
Expand Down
2 changes: 2 additions & 0 deletions crates/aptos-compression/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub enum CompressionClient {
Consensus,
Mempool,
StateSync,
NetPerf,
}

impl CompressionClient {
Expand All @@ -29,6 +30,7 @@ impl CompressionClient {
Self::Consensus => "consensus",
Self::Mempool => "mempool",
Self::StateSync => "state_sync",
Self::NetPerf => "netperf",
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/channel/src/message_queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl<K: Eq + Hash + Clone, T> PerKeyQueue<K, T> {
// For example, many of our queues have a max capacity of 1024. To
// handle a single rpc from a transient peer, we would end up
// allocating ~ 96 b * 1024 ~ 64 Kib per queue.
//TODO: Reconsider this. Maybe true for VFN but not Validators
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: space after // (I think this is a convention in our codebase, although, it's annoying the formatter doesn't enforce it 😢). There's a bunch of them that need to change.

.or_insert_with(|| VecDeque::with_capacity(1));

// Add the key to our round-robin queue if it's not already there
Expand Down
2 changes: 2 additions & 0 deletions network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ aptos-short-hex-str = { workspace = true }
aptos-time-service = { workspace = true }
aptos-types = { workspace = true }
async-trait = { workspace = true }
axum = { workspace = true }
bcs = { workspace = true }
bytes = { workspace = true }
dashmap = { worksapce = true }
futures = { workspace = true }
futures-util = { workspace = true }
hex = { workspace = true }
Expand Down
Loading