Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: [MR-636] Add size limits as fields to the stream builder #3885

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
4 changes: 3 additions & 1 deletion rs/messaging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ pub(crate) mod routing;
mod scheduling;
mod state_machine;

pub use message_routing::{MessageRoutingImpl, SyncMessageRouting};
pub use message_routing::{
MessageRoutingImpl, SyncMessageRouting, MAX_STREAM_MESSAGES, TARGET_STREAM_SIZE_BYTES,
};
63 changes: 60 additions & 3 deletions rs/messaging/src/message_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,18 @@ const CRITICAL_ERROR_FAILED_TO_READ_REGISTRY: &str = "mr_failed_to_read_registry
pub const CRITICAL_ERROR_NON_INCREASING_BATCH_TIME: &str = "mr_non_increasing_batch_time";
pub const CRITICAL_ERROR_INDUCT_RESPONSE_FAILED: &str = "mr_induct_response_failed";

/// Desired byte size of an outgoing stream.
///
/// At most `MAX_STREAM_MESSAGES` are enqueued into a stream; but only until its
/// `count_bytes()` is greater than or equal to `TARGET_STREAM_SIZE_BYTES`.
pub const TARGET_STREAM_SIZE_BYTES: usize = 10 * 1024 * 1024;

/// Maximum number of messages in a stream.
///
/// At most `MAX_STREAM_MESSAGES` are enqueued into a stream; but only until its
/// `count_bytes()` is greater than or equal to `TARGET_STREAM_SIZE_BYTES`.
pub const MAX_STREAM_MESSAGES: usize = 10_000;

/// Records the timestamp when all messages before the given index (down to the
/// previous `MessageTime`) were first added to / learned about in a stream.
struct MessageTime {
Expand Down Expand Up @@ -629,6 +641,44 @@ impl BatchProcessorImpl {
log: ReplicaLogger,
registry: Arc<dyn RegistryClient>,
malicious_flags: MaliciousFlags,
) -> BatchProcessorImpl {
Self::with_stream_limits_for_testing(
state_manager,
certified_stream_store,
ingress_history_writer,
scheduler,
hypervisor_config,
cycles_account_manager,
subnet_id,
// Do NOT replace these constants. Stream limits must remain constant on mainnet,
// otherwise the payload builder might mistakenly identify subnets as dishonest.
// Changes must be carefully considered.
michael-weigelt marked this conversation as resolved.
Show resolved Hide resolved
MAX_STREAM_MESSAGES,
TARGET_STREAM_SIZE_BYTES,
metrics,
metrics_registry,
log,
registry,
malicious_flags,
)
}

#[allow(clippy::too_many_arguments)]
fn with_stream_limits_for_testing(
state_manager: Arc<dyn StateManager<State = ReplicatedState>>,
certified_stream_store: Arc<dyn CertifiedStreamStore>,
ingress_history_writer: Arc<dyn IngressHistoryWriter<State = ReplicatedState> + 'static>,
scheduler: Box<dyn Scheduler<State = ReplicatedState>>,
hypervisor_config: HypervisorConfig,
cycles_account_manager: Arc<CyclesAccountManager>,
subnet_id: SubnetId,
max_stream_messages: usize,
target_stream_size_bytes: usize,
metrics: MessageRoutingMetrics,
metrics_registry: &MetricsRegistry,
log: ReplicaLogger,
registry: Arc<dyn RegistryClient>,
malicious_flags: MaliciousFlags,
) -> BatchProcessorImpl {
let time_in_stream_metrics = Arc::new(Mutex::new(LatencyMetrics::new_time_in_stream(
metrics_registry,
Expand Down Expand Up @@ -657,6 +707,8 @@ impl BatchProcessorImpl {
));
let stream_builder = Box::new(routing::stream_builder::StreamBuilderImpl::new(
subnet_id,
max_stream_messages,
target_stream_size_bytes,
metrics_registry,
&metrics,
time_in_stream_metrics,
Expand Down Expand Up @@ -1512,6 +1564,8 @@ impl MessageRoutingImpl {
) -> Self {
let stream_builder = Box::new(routing::stream_builder::StreamBuilderImpl::new(
subnet_id,
MAX_STREAM_MESSAGES,
TARGET_STREAM_SIZE_BYTES,
metrics_registry,
&MessageRoutingMetrics::new(metrics_registry),
Arc::new(Mutex::new(LatencyMetrics::new_time_in_stream(
Expand Down Expand Up @@ -1597,8 +1651,7 @@ impl MessageRouting for MessageRoutingImpl {
}
}

/// An MessageRouting implementation that processes batches synchronously. Primarily used for
/// testing.
/// An MessageRouting implementation that processes batches synchronously. Used for state machine tests.
pub struct SyncMessageRouting {
batch_processor: Arc<Mutex<dyn BatchProcessor>>,
state_manager: Arc<dyn StateManager<State = ReplicatedState>>,
Expand All @@ -1616,21 +1669,25 @@ impl SyncMessageRouting {
hypervisor_config: HypervisorConfig,
cycles_account_manager: Arc<CyclesAccountManager>,
subnet_id: SubnetId,
max_stream_messages: usize,
target_stream_size_bytes: usize,
metrics_registry: &MetricsRegistry,
log: ReplicaLogger,
registry: Arc<dyn RegistryClient>,
malicious_flags: MaliciousFlags,
) -> Self {
let metrics = MessageRoutingMetrics::new(metrics_registry);

let batch_processor = BatchProcessorImpl::new(
let batch_processor = BatchProcessorImpl::with_stream_limits_for_testing(
state_manager.clone(),
certified_stream_store,
ingress_history_writer,
scheduler,
hypervisor_config,
cycles_account_manager,
subnet_id,
max_stream_messages,
target_stream_size_bytes,
metrics,
metrics_registry,
log.clone(),
Expand Down
32 changes: 16 additions & 16 deletions rs/messaging/src/routing/stream_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,6 @@ struct StreamBuilderMetrics {
pub critical_error_induct_response_failed: IntCounter,
}

/// Desired byte size of an outgoing stream.
///
/// At most `MAX_STREAM_MESSAGES` are enqueued into a stream; but only until its
/// `count_bytes()` is greater than or equal to `TARGET_STREAM_SIZE_BYTES`.
const TARGET_STREAM_SIZE_BYTES: usize = 10 * 1024 * 1024;

/// Maximum number of messages in a stream.
///
/// At most `MAX_STREAM_MESSAGES` are enqueued into a stream; but only until its
/// `count_bytes()` is greater than or equal to `TARGET_STREAM_SIZE_BYTES`.
const MAX_STREAM_MESSAGES: usize = 10_000;

const METRIC_STREAM_MESSAGES: &str = "mr_stream_messages";
const METRIC_STREAM_BYTES: &str = "mr_stream_bytes";
const METRIC_STREAM_BEGIN: &str = "mr_stream_begin";
Expand Down Expand Up @@ -173,8 +161,14 @@ pub(crate) trait StreamBuilder: Send {
fn build_streams(&self, state: ReplicatedState) -> ReplicatedState;
}

/// Builds streams according to the specified limits.
///
/// At most `max_stream_messages` are enqueued into a stream; but only until its
/// `count_bytes()` is greater than or equal to `target_stream_size_bytes`.
pub(crate) struct StreamBuilderImpl {
subnet_id: SubnetId,
max_stream_messages: usize,
target_stream_size_bytes: usize,
metrics: StreamBuilderMetrics,
time_in_stream_metrics: Arc<Mutex<LatencyMetrics>>,

Expand All @@ -187,6 +181,8 @@ pub(crate) struct StreamBuilderImpl {
impl StreamBuilderImpl {
pub(crate) fn new(
subnet_id: SubnetId,
max_stream_messages: usize,
target_stream_size_bytes: usize,
metrics_registry: &MetricsRegistry,
message_routing_metrics: &MessageRoutingMetrics,
time_in_stream_metrics: Arc<Mutex<LatencyMetrics>>,
Expand All @@ -195,6 +191,8 @@ impl StreamBuilderImpl {
) -> Self {
Self {
subnet_id,
max_stream_messages,
target_stream_size_bytes,
metrics: StreamBuilderMetrics::new(metrics_registry, message_routing_metrics),
time_in_stream_metrics,
best_effort_responses,
Expand Down Expand Up @@ -270,9 +268,7 @@ impl StreamBuilderImpl {
.observe(msg.payload_size_bytes().get() as f64);
}

/// Implementation of `StreamBuilder::build_streams()` that takes a
/// `target_stream_size_bytes` argument to limit how many messages will be
/// routed into each stream.
/// Implementation of `StreamBuilder::build_streams()`.
fn build_streams_impl(
&self,
mut state: ReplicatedState,
Expand Down Expand Up @@ -588,6 +584,10 @@ impl StreamBuilderImpl {

impl StreamBuilder for StreamBuilderImpl {
fn build_streams(&self, state: ReplicatedState) -> ReplicatedState {
self.build_streams_impl(state, MAX_STREAM_MESSAGES, TARGET_STREAM_SIZE_BYTES)
self.build_streams_impl(
state,
self.max_stream_messages,
self.target_stream_size_bytes,
)
}
}
6 changes: 5 additions & 1 deletion rs/messaging/src/routing/stream_builder/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::message_routing::MessageRoutingMetrics;
use crate::message_routing::{
MessageRoutingMetrics, MAX_STREAM_MESSAGES, TARGET_STREAM_SIZE_BYTES,
};

use super::*;
use ic_base_types::NumSeconds;
Expand Down Expand Up @@ -1005,6 +1007,8 @@ fn new_fixture(log: &ReplicaLogger) -> (StreamBuilderImpl, ReplicatedState, Metr
let metrics_registry = MetricsRegistry::new();
let stream_builder = StreamBuilderImpl::new(
LOCAL_SUBNET,
MAX_STREAM_MESSAGES,
TARGET_STREAM_SIZE_BYTES,
&metrics_registry,
&MessageRoutingMetrics::new(&metrics_registry),
Arc::new(Mutex::new(LatencyMetrics::new_time_in_stream(
Expand Down
28 changes: 27 additions & 1 deletion rs/state_machine_tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use ic_management_canister_types_private::{
SignWithSchnorrReply, TakeCanisterSnapshotArgs, UpdateSettingsArgs, UploadChunkArgs,
UploadChunkReply,
};
use ic_messaging::SyncMessageRouting;
use ic_messaging::{SyncMessageRouting, MAX_STREAM_MESSAGES, TARGET_STREAM_SIZE_BYTES};
use ic_metrics::MetricsRegistry;
use ic_protobuf::{
registry::{
Expand Down Expand Up @@ -908,6 +908,8 @@ pub struct StateMachineBuilder {
subnet_size: usize,
nns_subnet_id: Option<SubnetId>,
subnet_id: Option<SubnetId>,
max_stream_messages: Option<usize>,
target_stream_size_bytes: Option<usize>,
routing_table: RoutingTable,
chain_keys_enabled_status: BTreeMap<MasterPublicKeyId, bool>,
ecdsa_signature_fee: Option<Cycles>,
Expand Down Expand Up @@ -938,6 +940,8 @@ impl StateMachineBuilder {
subnet_size: SMALL_APP_SUBNET_MAX_SIZE,
nns_subnet_id: None,
subnet_id: None,
max_stream_messages: None,
target_stream_size_bytes: None,
routing_table: RoutingTable::new(),
chain_keys_enabled_status: Default::default(),
ecdsa_signature_fee: None,
Expand Down Expand Up @@ -1049,6 +1053,20 @@ impl StateMachineBuilder {
}
}

pub fn with_max_stream_messages(self, max_stream_messages: usize) -> Self {
Self {
max_stream_messages: Some(max_stream_messages),
..self
}
}

pub fn with_target_stream_size_bytes(self, target_stream_size_bytes: usize) -> Self {
Self {
target_stream_size_bytes: Some(target_stream_size_bytes),
..self
}
}

pub fn with_master_ecdsa_public_key(self) -> Self {
self.with_chain_key(MasterPublicKeyId::Ecdsa(EcdsaKeyId {
curve: EcdsaCurve::Secp256k1,
Expand Down Expand Up @@ -1154,6 +1172,8 @@ impl StateMachineBuilder {
self.subnet_type,
self.subnet_size,
self.subnet_id,
self.max_stream_messages,
self.target_stream_size_bytes,
self.chain_keys_enabled_status,
self.ecdsa_signature_fee,
self.schnorr_signature_fee,
Expand Down Expand Up @@ -1480,6 +1500,8 @@ impl StateMachine {
subnet_type: SubnetType,
subnet_size: usize,
subnet_id: Option<SubnetId>,
max_stream_messages: Option<usize>,
target_stream_size_bytes: Option<usize>,
chain_keys_enabled_status: BTreeMap<MasterPublicKeyId, bool>,
ecdsa_signature_fee: Option<Cycles>,
schnorr_signature_fee: Option<Cycles>,
Expand Down Expand Up @@ -1527,6 +1549,8 @@ impl StateMachine {
let public_key_der = threshold_sig_public_key_to_der(public_key).unwrap();
let subnet_id =
subnet_id.unwrap_or(PrincipalId::new_self_authenticating(&public_key_der).into());
let max_stream_messages = max_stream_messages.unwrap_or(MAX_STREAM_MESSAGES);
let target_stream_size_bytes = target_stream_size_bytes.unwrap_or(TARGET_STREAM_SIZE_BYTES);
let registry_client = make_nodes_registry(
subnet_id,
subnet_type,
Expand Down Expand Up @@ -1639,6 +1663,8 @@ impl StateMachine {
hypervisor_config,
cycles_account_manager.clone(),
subnet_id,
max_stream_messages,
target_stream_size_bytes,
&metrics_registry,
replica_logger.clone(),
Arc::clone(&registry_client) as _,
Expand Down