From d50e30c1f72bb56c4230f872452e563d5e284730 Mon Sep 17 00:00:00 2001 From: robin-near <111538878+robin-near@users.noreply.github.com> Date: Thu, 7 Mar 2024 08:23:44 -0800 Subject: [PATCH] [TestLoop Refactoring] Basic multi-node Client test based on TestLoop (#10717) An integration test written in TestLoop that runs 4 validators while propagating network messages Block and Approval. We can see from the visualizer that the four nodes produce blocks together: image image Now, there's an issue - the blocks don't have any chunks. And that is because the chunks never finish processing, since processing of the chunks happen in a rayon thread that isn't supported by test loop yet. The blocks are still produced anyway after waiting long enough (in virtual time). The test completes in 0.19s. Btw, I'm not attempting to make the test modular or clean yet. My goal right now is just to work towards a functional integration test that does state sync and later in-memory trie. --- chain/chunks/src/test/basic.rs | 8 +- chain/chunks/src/test/multi.rs | 8 +- .../test_utils/client_actions_test_utils.rs | 40 ++ chain/network/src/test_loop.rs | 5 + .../src/examples/actix_component_test.rs | 4 +- .../src/examples/async_component_test.rs | 2 +- core/async/src/examples/sum_numbers_test.rs | 10 +- core/async/src/test_loop.rs | 48 +-- core/async/src/test_loop/adhoc.rs | 11 +- core/async/src/test_loop/delay_sender.rs | 49 +++ core/chain-configs/src/updateable_config.rs | 1 + .../src/tests/client/features.rs | 1 + .../features/multinode_test_loop_example.rs | 350 ++++++++++++++++++ .../features/simple_test_loop_example.rs | 17 +- 14 files changed, 487 insertions(+), 67 deletions(-) create mode 100644 integration-tests/src/tests/client/features/multinode_test_loop_example.rs diff --git a/chain/chunks/src/test/basic.rs b/chain/chunks/src/test/basic.rs index e2c32a4819b..bcebb48534e 100644 --- a/chain/chunks/src/test/basic.rs +++ b/chain/chunks/src/test/basic.rs @@ -42,6 +42,12 @@ struct TestData { network_events: Vec, } +impl AsMut for TestData { + fn as_mut(&mut self) -> &mut Self { + self + } +} + impl TestData { fn new(shards_manager: ShardsManager, chain: MockChainForShardsManager) -> Self { Self { shards_manager, chain, client_events: vec![], network_events: vec![] } @@ -177,7 +183,7 @@ fn test_chunk_forward() { test.register_handler(forward_client_request_to_shards_manager().widen()); test.register_handler(forward_network_request_to_shards_manager().widen()); test.register_handler(periodically_resend_chunk_requests(CHUNK_REQUEST_RETRY).widen()); - test.register_handler(handle_adhoc_events()); + test.register_handler(handle_adhoc_events::().widen()); // We'll produce a single chunk whose next chunk producer is a chunk-only // producer, so that we can test that the chunk is forwarded to the next diff --git a/chain/chunks/src/test/multi.rs b/chain/chunks/src/test/multi.rs index aad1dc5b015..24e8bf84dd3 100644 --- a/chain/chunks/src/test/multi.rs +++ b/chain/chunks/src/test/multi.rs @@ -41,6 +41,12 @@ struct TestData { account_id: AccountId, } +impl AsMut for TestData { + fn as_mut(&mut self) -> &mut Self { + self + } +} + #[derive(EnumTryInto, Debug, EnumFrom)] enum TestEvent { Adhoc(AdhocEvent), @@ -99,7 +105,7 @@ fn basic_setup(config: BasicSetupConfig) -> ShardsManagerTestLoop { .collect::>(); let mut test = builder.build(data); for idx in 0..test.data.len() { - test.register_handler(handle_adhoc_events().for_index(idx)); + test.register_handler(handle_adhoc_events::().widen().for_index(idx)); test.register_handler(forward_client_request_to_shards_manager().widen().for_index(idx)); test.register_handler(forward_network_request_to_shards_manager().widen().for_index(idx)); test.register_handler(capture_events::().widen().for_index(idx)); diff --git a/chain/client/src/test_utils/client_actions_test_utils.rs b/chain/client/src/test_utils/client_actions_test_utils.rs index 31ce0fe9973..7f06bd79ad7 100644 --- a/chain/client/src/test_utils/client_actions_test_utils.rs +++ b/chain/client/src/test_utils/client_actions_test_utils.rs @@ -2,6 +2,46 @@ use crate::client_actions::{ClientActionHandler, ClientActions, ClientSenderForC use crate::sync_jobs_actions::ClientSenderForSyncJobsMessage; use near_async::test_loop::event_handler::LoopEventHandler; use near_chunks::client::ShardsManagerResponse; +use near_network::client::ClientSenderForNetworkMessage; + +pub fn forward_client_messages_from_network_to_client_actions( +) -> LoopEventHandler { + LoopEventHandler::new(|msg, client_actions: &mut ClientActions, _| { + match msg { + ClientSenderForNetworkMessage::_state_response(msg) => { + (msg.callback)(Ok(client_actions.handle(msg.message))); + } + ClientSenderForNetworkMessage::_block_approval(msg) => { + (msg.callback)(Ok(client_actions.handle(msg.message))); + } + ClientSenderForNetworkMessage::_transaction(msg) => { + (msg.callback)(Ok(client_actions.handle(msg.message))); + } + ClientSenderForNetworkMessage::_block(msg) => { + (msg.callback)(Ok(client_actions.handle(msg.message))); + } + ClientSenderForNetworkMessage::_block_headers(msg) => { + (msg.callback)(Ok(client_actions.handle(msg.message))); + } + ClientSenderForNetworkMessage::_challenge(msg) => { + (msg.callback)(Ok(client_actions.handle(msg.message))); + } + ClientSenderForNetworkMessage::_network_info(msg) => { + (msg.callback)(Ok(client_actions.handle(msg.message))); + } + ClientSenderForNetworkMessage::_chunk_state_witness(msg) => { + (msg.callback)(Ok(client_actions.handle(msg.message))); + } + ClientSenderForNetworkMessage::_chunk_endorsement(msg) => { + (msg.callback)(Ok(client_actions.handle(msg.message))); + } + _ => { + return Err(msg); + } + } + Ok(()) + }) +} pub fn forward_client_messages_from_client_to_client_actions( ) -> LoopEventHandler { diff --git a/chain/network/src/test_loop.rs b/chain/network/src/test_loop.rs index 3a53b754538..f35cccfd612 100644 --- a/chain/network/src/test_loop.rs +++ b/chain/network/src/test_loop.rs @@ -5,6 +5,7 @@ use near_primitives::types::AccountId; /// This trait is just a helper for looking up the index. pub trait SupportsRoutingLookup { fn index_for_account(&self, account: &AccountId) -> usize; + fn num_accounts(&self) -> usize; } impl> SupportsRoutingLookup for Vec { @@ -13,4 +14,8 @@ impl> SupportsRoutingLookup for Vec { .position(|data| data.as_ref() == account) .unwrap_or_else(|| panic!("Account not found: {}", account)) } + + fn num_accounts(&self) -> usize { + self.len() + } } diff --git a/core/async/src/examples/actix_component_test.rs b/core/async/src/examples/actix_component_test.rs index a5463cd9fde..a3917305399 100644 --- a/core/async/src/examples/actix_component_test.rs +++ b/core/async/src/examples/actix_component_test.rs @@ -48,7 +48,7 @@ fn test_actix_component() { dummy: (), example: ExampleComponent::new(builder.sender().into_sender()), outer: OuterComponent::new( - builder.wrapped_multi_sender::(), + builder.sender().into_wrapped_multi_sender::(), ), periodic_requests_captured: vec![], }; @@ -66,7 +66,7 @@ fn test_actix_component() { test.register_handler(example_handler().widen()); // We need to redo whatever the ExampleActor does in its `started` method. - test.data.example.start(&mut test.delayed_action_runner()); + test.data.example.start(&mut test.sender().into_delayed_action_runner()); // Send some requests; this can be done in the asynchronous context. test.future_spawner().spawn("wait for 5", { let res = test.data.outer.call_example_component_for_response(5); diff --git a/core/async/src/examples/async_component_test.rs b/core/async/src/examples/async_component_test.rs index bd51cd492e4..d8fec354ffa 100644 --- a/core/async/src/examples/async_component_test.rs +++ b/core/async/src/examples/async_component_test.rs @@ -51,7 +51,7 @@ fn inner_request_handler( fn test_async_component() { let builder = TestLoopBuilder::::new(); let sender = builder.sender(); - let future_spawner = builder.future_spawner(); + let future_spawner = builder.sender().into_future_spawner(); let mut test = builder.build(TestData { dummy: (), output: vec![], diff --git a/core/async/src/examples/sum_numbers_test.rs b/core/async/src/examples/sum_numbers_test.rs index b01eca9e36e..428c8db81d0 100644 --- a/core/async/src/examples/sum_numbers_test.rs +++ b/core/async/src/examples/sum_numbers_test.rs @@ -12,12 +12,18 @@ use crate::{ use super::sum_numbers::{ReportSumMsg, SumNumbersComponent, SumRequest}; -#[derive(derive_more::AsMut, derive_more::AsRef)] +#[derive(derive_more::AsMut)] struct TestData { summer: SumNumbersComponent, sums: Vec, } +impl AsMut for TestData { + fn as_mut(&mut self) -> &mut Self { + self + } +} + #[derive(Debug, EnumTryInto, EnumFrom)] enum TestEvent { Request(SumRequest), @@ -71,7 +77,7 @@ fn test_simple_with_adhoc() { let mut test = builder.build(data); test.register_handler(forward_sum_request().widen()); test.register_handler(capture_events::().widen()); - test.register_handler(handle_adhoc_events()); + test.register_handler(handle_adhoc_events::().widen()); // It is preferrable to put as much setup logic as possible into an adhoc // event (queued by .run below), so that as much logic as possible is diff --git a/core/async/src/test_loop.rs b/core/async/src/test_loop.rs index 173e9122a79..a99c90279c9 100644 --- a/core/async/src/test_loop.rs +++ b/core/async/src/test_loop.rs @@ -68,16 +68,10 @@ pub mod multi_instance; use self::{ delay_sender::DelaySender, event_handler::LoopEventHandler, - futures::{ - TestLoopDelayedActionEvent, TestLoopDelayedActionRunner, TestLoopFutureSpawner, - TestLoopTask, - }, -}; -use crate::{break_apart::BreakApart, time}; -use crate::{ - messaging::{IntoMultiSender, IntoSender}, - test_loop::event_handler::LoopHandlerContext, + futures::{TestLoopFutureSpawner, TestLoopTask}, }; +use crate::test_loop::event_handler::LoopHandlerContext; +use crate::time; use near_o11y::{testonly::init_test_logger, tracing::log::info}; use serde::Serialize; use std::{ @@ -192,40 +186,11 @@ impl TestLoopBuilder { self.pending_events_sender.clone() } - /// A shortcut for a common use case, where we use an enum message to - /// represent all the possible messages that a multisender may be used to - /// send. - /// - /// This assumes that S is a multisender with the derive - /// `#[derive(MultiSendMessage, ...)]`, which creates the enum - /// `MyMultiSenderMessage` (where `MyMultiSender` is the name of the struct - /// being derived from). - /// - /// To use, first include in the test loop event enum a case for - /// `MyMultiSenderMessage`. Then, call this function to get a multisender, - /// like - /// `builder.wrapped_multi_sender()`. - pub fn wrapped_multi_sender(&self) -> S - where - DelaySender: IntoSender, - BreakApart: IntoMultiSender, - { - self.sender().into_sender().break_apart().into_multi_sender() - } - /// Returns a clock that will always return the current virtual time. pub fn clock(&self) -> time::Clock { self.clock.clock() } - /// Returns a FutureSpawner that can be used to spawn futures into the loop. - pub fn future_spawner(&self) -> TestLoopFutureSpawner - where - Event: From>, - { - self.sender().narrow() - } - pub fn build(self, data: Data) -> TestLoop { TestLoop::new(self.pending_events, self.pending_events_sender, self.clock, data) } @@ -356,13 +321,6 @@ impl TestLoop { { self.sender().narrow() } - - pub fn delayed_action_runner(&self) -> TestLoopDelayedActionRunner - where - Event: From>, - { - TestLoopDelayedActionRunner { sender: self.sender().narrow() } - } } impl Drop for TestLoop { diff --git a/core/async/src/test_loop/adhoc.rs b/core/async/src/test_loop/adhoc.rs index f8e51e3ee26..dee459cffa0 100644 --- a/core/async/src/test_loop/adhoc.rs +++ b/core/async/src/test_loop/adhoc.rs @@ -1,7 +1,4 @@ -use super::{ - delay_sender::DelaySender, - event_handler::{LoopEventHandler, TryIntoOrSelf}, -}; +use super::{delay_sender::DelaySender, event_handler::LoopEventHandler}; use crate::messaging::CanSend; use crate::time; use std::fmt::Debug; @@ -54,10 +51,8 @@ impl> + 'static> AdhocEventSender>>( -) -> LoopEventHandler { - LoopEventHandler::new(|event: Event, data, _ctx| { - let event = event.try_into_or_self()?; +pub fn handle_adhoc_events() -> LoopEventHandler> { + LoopEventHandler::new(|event: AdhocEvent, data, _ctx| { (event.handler)(data); Ok(()) }) diff --git a/core/async/src/test_loop/delay_sender.rs b/core/async/src/test_loop/delay_sender.rs index cc9ec5f3b7a..205166afa2a 100644 --- a/core/async/src/test_loop/delay_sender.rs +++ b/core/async/src/test_loop/delay_sender.rs @@ -1,7 +1,12 @@ +use crate::break_apart::BreakApart; use crate::messaging; +use crate::messaging::{IntoMultiSender, IntoSender}; +use crate::test_loop::futures::{TestLoopDelayedActionEvent, TestLoopDelayedActionRunner}; use crate::time; use std::sync::Arc; +use super::futures::{TestLoopFutureSpawner, TestLoopTask}; + /// Interface to send an event with a delay (in virtual time). It can be /// converted to a Sender for any message type that can be converted into /// the event type, so that a DelaySender given by the test loop may be passed @@ -23,6 +28,14 @@ impl DelaySender { self.0(event, delay); } + pub fn with_additional_delay(&self, delay: time::Duration) -> DelaySender + where + Event: 'static, + { + let f = self.0.clone(); + Self(Arc::new(move |event, other_delay| f(event, delay + other_delay))) + } + pub fn narrow(self) -> DelaySender where Event: From + 'static, @@ -31,6 +44,42 @@ impl DelaySender { self.send_with_delay(event.into(), delay) }) } + + /// A shortcut for a common use case, where we use an enum message to + /// represent all the possible messages that a multisender may be used to + /// send. + /// + /// This assumes that S is a multisender with the derive + /// `#[derive(MultiSendMessage, ...)]`, which creates the enum + /// `MyMultiSenderMessage` (where `MyMultiSender` is the name of the struct + /// being derived from). + /// + /// To use, first include in the test loop event enum a case for + /// `MyMultiSenderMessage`. Then, call this function to get a multisender, + /// like + /// `builder.wrapped_multi_sender()`. + pub fn into_wrapped_multi_sender(self) -> S + where + Self: IntoSender, + BreakApart: IntoMultiSender, + { + self.into_sender().break_apart().into_multi_sender() + } + + pub fn into_delayed_action_runner(self) -> TestLoopDelayedActionRunner + where + Event: From> + 'static, + { + TestLoopDelayedActionRunner { sender: self.narrow() } + } + + /// Returns a FutureSpawner that can be used to spawn futures into the loop. + pub fn into_future_spawner(self) -> TestLoopFutureSpawner + where + Event: From> + 'static, + { + self.narrow() + } } impl DelaySender<(usize, Event)> { diff --git a/core/chain-configs/src/updateable_config.rs b/core/chain-configs/src/updateable_config.rs index b53394a54c8..833315ab160 100644 --- a/core/chain-configs/src/updateable_config.rs +++ b/core/chain-configs/src/updateable_config.rs @@ -98,5 +98,6 @@ pub struct UpdateableClientConfig { pub resharding_config: ReshardingConfig, /// Time limit for adding transactions in produce_chunk() + #[serde(with = "near_async::time::serde_opt_duration_as_std")] pub produce_chunk_add_transactions_time_limit: Option, } diff --git a/integration-tests/src/tests/client/features.rs b/integration-tests/src/tests/client/features.rs index ccc2ad93033..c6e8c0cf4a4 100644 --- a/integration-tests/src/tests/client/features.rs +++ b/integration-tests/src/tests/client/features.rs @@ -15,6 +15,7 @@ mod increase_deployment_cost; mod increase_storage_compute_cost; mod limit_contract_functions_number; mod lower_storage_key_limit; +mod multinode_test_loop_example; mod nearvm; #[cfg(feature = "protocol_feature_nonrefundable_transfer_nep491")] mod nonrefundable_transfer; diff --git a/integration-tests/src/tests/client/features/multinode_test_loop_example.rs b/integration-tests/src/tests/client/features/multinode_test_loop_example.rs new file mode 100644 index 00000000000..0e514145ccb --- /dev/null +++ b/integration-tests/src/tests/client/features/multinode_test_loop_example.rs @@ -0,0 +1,350 @@ +use derive_enum_from_into::{EnumFrom, EnumTryInto}; +use near_async::messaging::{noop, IntoMultiSender, IntoSender, MessageWithCallback, SendAsync}; +use near_async::test_loop::adhoc::{handle_adhoc_events, AdhocEvent, AdhocEventSender}; +use near_async::test_loop::event_handler::{ + ignore_events, LoopEventHandler, LoopHandlerContext, TryIntoOrSelf, +}; +use near_async::test_loop::futures::{ + drive_delayed_action_runners, drive_futures, TestLoopDelayedActionEvent, TestLoopTask, +}; +use near_async::test_loop::TestLoopBuilder; +use near_async::time::Duration; +use near_chain::chunks_store::ReadOnlyChunksStore; +use near_chain::ChainGenesis; +use near_chain_configs::{ClientConfig, Genesis, GenesisConfig, GenesisRecords}; +use near_chunks::adapter::ShardsManagerRequestFromClient; +use near_chunks::client::ShardsManagerResponse; +use near_chunks::test_loop::forward_client_request_to_shards_manager; +use near_chunks::ShardsManager; +use near_client::client_actions::{ + ClientActions, ClientSenderForClientMessage, SyncJobsSenderForClientMessage, +}; +use near_client::sync_jobs_actions::{ + ClientSenderForSyncJobsMessage, SyncJobsActions, SyncJobsSenderForSyncJobsMessage, +}; +use near_client::test_utils::client_actions_test_utils::{ + forward_client_messages_from_client_to_client_actions, + forward_client_messages_from_network_to_client_actions, + forward_client_messages_from_shards_manager, + forward_client_messages_from_sync_jobs_to_client_actions, +}; +use near_client::test_utils::sync_jobs_test_utils::forward_sync_jobs_messages_from_client_to_sync_jobs_actions; +use near_client::{Client, SyncAdapter, SyncMessage}; +use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig}; +use near_epoch_manager::EpochManager; +use near_network::client::{ + BlockApproval, BlockResponse, ClientSenderForNetwork, ClientSenderForNetworkMessage, +}; +use near_network::test_loop::SupportsRoutingLookup; +use near_network::types::{ + NetworkRequests, PeerManagerMessageRequest, PeerManagerMessageResponse, SetChainInfo, +}; +use near_primitives::network::PeerId; +use near_primitives::shard_layout::ShardLayout; +use near_primitives::state_record::StateRecord; +use near_primitives::test_utils::{create_test_signer, create_user_test_signer}; +use near_primitives::types::{AccountId, AccountInfo}; +use near_primitives::version::PROTOCOL_VERSION; +use near_primitives_core::account::{AccessKey, Account}; +use near_primitives_core::hash::CryptoHash; +use near_store::genesis::initialize_genesis_state; +use near_store::test_utils::create_test_store; +use nearcore::NightshadeRuntime; +use std::path::Path; +use std::sync::{Arc, RwLock}; + +#[derive(derive_more::AsMut, derive_more::AsRef)] +struct TestData { + pub dummy: (), + pub account: AccountId, + pub client: ClientActions, + pub sync_jobs: SyncJobsActions, + pub shards_manager: ShardsManager, +} + +impl AsMut for TestData { + fn as_mut(&mut self) -> &mut Self { + self + } +} + +#[derive(EnumTryInto, Debug, EnumFrom)] +#[allow(clippy::large_enum_variant)] +enum TestEvent { + Task(Arc), + Adhoc(AdhocEvent), + ClientDelayedActions(TestLoopDelayedActionEvent), + ClientEventFromNetwork(ClientSenderForNetworkMessage), + ClientEventFromClient(ClientSenderForClientMessage), + ClientEventFromSyncJobs(ClientSenderForSyncJobsMessage), + ClientEventFromShardsManager(ShardsManagerResponse), + SyncJobsEventFromClient(SyncJobsSenderForClientMessage), + SyncJobsEventFromSyncJobs(SyncJobsSenderForSyncJobsMessage), + ShardsManagerRequestFromClient(ShardsManagerRequestFromClient), + ClientEventFromStateSyncAdapter(SyncMessage), + NetworkMessage(PeerManagerMessageRequest), + NetworkMessageForResult( + MessageWithCallback, + ), + SetChainInfo(SetChainInfo), +} + +const ONE_NEAR: u128 = 1_000_000_000_000_000_000_000_000; + +// TODO(robin-near): Complete this test so that it will actually run a chain. +// TODO(robin-near): Make this a multi-node test. +// TODO(robin-near): Make the network layer send messages. +#[test] +fn test_client_with_multi_test_loop() { + const NUM_CLIENTS: usize = 4; + let builder = TestLoopBuilder::<(usize, TestEvent)>::new(); + + let validator_stake = 1000000 * ONE_NEAR; + let initial_balance = 10000 * ONE_NEAR; + let accounts = + (0..100).map(|i| format!("account{}", i).parse().unwrap()).collect::>(); + + // TODO: Make some builder for genesis. + let mut genesis_config = GenesisConfig { + protocol_version: PROTOCOL_VERSION, + genesis_height: 10000, + shard_layout: ShardLayout::v1( + vec!["account3", "account5", "account7"] + .into_iter() + .map(|a| a.parse().unwrap()) + .collect(), + None, + 1, + ), + min_gas_price: 0, + max_gas_price: 0, + gas_limit: 100000000000000, + transaction_validity_period: 1000, + validators: (0..NUM_CLIENTS) + .map(|idx| AccountInfo { + account_id: accounts[idx].clone(), + amount: validator_stake, + public_key: create_test_signer(accounts[idx].as_str()).public_key(), + }) + .collect(), + epoch_length: 10, + protocol_treasury_account: accounts[NUM_CLIENTS].clone(), + num_block_producer_seats: 4, + minimum_validators_per_shard: 1, + num_block_producer_seats_per_shard: vec![4, 4, 4, 4], + ..Default::default() + }; + let mut records = Vec::new(); + for (i, account) in accounts.iter().enumerate() { + // The staked amount must be consistent with validators from genesis. + let staked = if i < NUM_CLIENTS { validator_stake } else { 0 }; + records.push(StateRecord::Account { + account_id: account.clone(), + account: Account::new( + initial_balance, + staked, + 0, + CryptoHash::default(), + 0, + PROTOCOL_VERSION, + ), + }); + records.push(StateRecord::AccessKey { + account_id: account.clone(), + public_key: create_user_test_signer(&account).public_key, + access_key: AccessKey::full_access(), + }); + // The total supply must be correct to pass validation. + genesis_config.total_supply += initial_balance + staked; + } + let genesis = Genesis::new(genesis_config, GenesisRecords(records)).unwrap(); + + let mut datas = Vec::new(); + for idx in 0..NUM_CLIENTS { + let mut client_config = ClientConfig::test(true, 600, 2000, 4, false, true, false, false); + client_config.max_block_wait_delay = Duration::seconds(6); + let store = create_test_store(); + initialize_genesis_state(store.clone(), &genesis, None); + + let sync_jobs_actions = SyncJobsActions::new( + builder + .sender() + .for_index(idx) + .into_wrapped_multi_sender::(), + builder + .sender() + .for_index(idx) + .into_wrapped_multi_sender::(), + ); + let chain_genesis = ChainGenesis::new(&genesis.config); + let epoch_manager = EpochManager::new_arc_handle(store.clone(), &genesis.config); + let shard_tracker = ShardTracker::new(TrackedConfig::AllShards, epoch_manager.clone()); + let state_sync_adapter = Arc::new(RwLock::new(SyncAdapter::new( + builder.sender().for_index(idx).into_sender(), + builder.sender().for_index(idx).into_sender(), + ))); + let runtime_adapter = NightshadeRuntime::test( + Path::new("."), + store.clone(), + &genesis.config, + epoch_manager.clone(), + ); + + let client = Client::new( + builder.clock(), + client_config.clone(), + chain_genesis, + epoch_manager.clone(), + shard_tracker.clone(), + state_sync_adapter, + runtime_adapter, + builder.sender().for_index(idx).into_multi_sender(), + builder.sender().for_index(idx).into_sender(), + Some(Arc::new(create_test_signer(accounts[idx].as_str()))), + true, + [0; 32], + None, + ) + .unwrap(); + + let shards_manager = ShardsManager::new( + builder.clock(), + Some(accounts[idx].clone()), + epoch_manager, + shard_tracker, + builder.sender().for_index(idx).into_sender(), + builder.sender().for_index(idx).into_sender(), + ReadOnlyChunksStore::new(store), + client.chain.head().unwrap(), + client.chain.header_head().unwrap(), + ); + + let client_actions = ClientActions::new( + builder.clock(), + client, + builder + .sender() + .for_index(idx) + .into_wrapped_multi_sender::(), + client_config, + PeerId::random(), + builder.sender().for_index(idx).into_multi_sender(), + None, + noop().into_sender(), + None, + Default::default(), + None, + builder + .sender() + .for_index(idx) + .into_wrapped_multi_sender::(), + Box::new(builder.sender().for_index(idx).into_future_spawner()), + ) + .unwrap(); + + let data = TestData { + dummy: (), + account: accounts[idx].clone(), + client: client_actions, + sync_jobs: sync_jobs_actions, + shards_manager, + }; + datas.push(data); + } + + let mut test = builder.build(datas); + for idx in 0..NUM_CLIENTS { + test.register_handler(handle_adhoc_events::().widen().for_index(idx)); + test.register_handler( + forward_client_messages_from_network_to_client_actions().widen().for_index(idx), + ); + test.register_handler( + forward_client_messages_from_client_to_client_actions().widen().for_index(idx), + ); + test.register_handler( + forward_client_messages_from_sync_jobs_to_client_actions().widen().for_index(idx), + ); + test.register_handler(forward_client_messages_from_shards_manager().widen().for_index(idx)); + test.register_handler( + forward_sync_jobs_messages_from_client_to_sync_jobs_actions( + test.sender().for_index(idx).into_future_spawner(), + ) + .widen() + .for_index(idx), + ); + test.register_handler(drive_futures().widen().for_index(idx)); + test.register_handler( + drive_delayed_action_runners::().widen().for_index(idx), + ); + test.register_handler(forward_client_request_to_shards_manager().widen().for_index(idx)); + test.register_handler(ignore_events::().widen().for_index(idx)); + } + test.register_handler(route_network_messages_to_client(Duration::milliseconds(10))); + + for idx in 0..NUM_CLIENTS { + let mut delayed_action_runner = test.sender().for_index(idx).into_delayed_action_runner(); + test.sender().for_index(idx).send_adhoc_event("start_client", move |data| { + data.client.start(&mut delayed_action_runner); + }); + } + test.run_for(Duration::seconds(10)); +} + +/// Handles outgoing network messages, and turns them into incoming client messages. +pub fn route_network_messages_to_client< + Data: SupportsRoutingLookup, + Event: TryIntoOrSelf + + From + + From, +>( + network_delay: Duration, +) -> LoopEventHandler { + // let mut route_back_lookup: HashMap = HashMap::new(); + // let mut next_hash: u64 = 0; + LoopEventHandler::new( + move |event: (usize, Event), + data: &mut Data, + context: &LoopHandlerContext<(usize, Event)>| { + let (idx, event) = event; + let message = event.try_into_or_self().map_err(|event| (idx, event.into()))?; + let PeerManagerMessageRequest::NetworkRequests(request) = message else { + return Err((idx, message.into())); + }; + + let client_senders = (0..data.num_accounts()) + .map(|idx| { + context + .sender + .with_additional_delay(network_delay) + .for_index(idx) + .into_wrapped_multi_sender::() + }) + .collect::>(); + + match request { + NetworkRequests::Block { block } => { + for other_idx in 0..data.num_accounts() { + if other_idx != idx { + drop(client_senders[other_idx].send_async(BlockResponse { + block: block.clone(), + peer_id: PeerId::random(), + was_requested: false, + })); + } + } + } + NetworkRequests::Approval { approval_message } => { + let other_idx = data.index_for_account(&approval_message.target); + drop( + client_senders[other_idx] + .send_async(BlockApproval(approval_message.approval, PeerId::random())), + ); + } + // TODO: Support more network message types as we expand the test. + _ => return Err((idx, PeerManagerMessageRequest::NetworkRequests(request).into())), + } + + Ok(()) + }, + ) +} diff --git a/integration-tests/src/tests/client/features/simple_test_loop_example.rs b/integration-tests/src/tests/client/features/simple_test_loop_example.rs index 5169b5148fe..28453a3e5c3 100644 --- a/integration-tests/src/tests/client/features/simple_test_loop_example.rs +++ b/integration-tests/src/tests/client/features/simple_test_loop_example.rs @@ -76,8 +76,8 @@ const ONE_NEAR: u128 = 1_000_000_000_000_000_000_000_000; fn test_client_with_simple_test_loop() { let builder = TestLoopBuilder::::new(); let sync_jobs_actions = SyncJobsActions::new( - builder.wrapped_multi_sender::(), - builder.wrapped_multi_sender::(), + builder.sender().into_wrapped_multi_sender::(), + builder.sender().into_wrapped_multi_sender::(), ); let client_config = ClientConfig::test( true, @@ -196,7 +196,7 @@ fn test_client_with_simple_test_loop() { let client_actions = ClientActions::new( builder.clock(), client, - builder.wrapped_multi_sender::(), + builder.sender().into_wrapped_multi_sender::(), client_config, PeerId::random(), noop().into_multi_sender(), @@ -205,8 +205,8 @@ fn test_client_with_simple_test_loop() { None, Default::default(), None, - builder.wrapped_multi_sender::(), - Box::new(builder.future_spawner()), + builder.sender().into_wrapped_multi_sender::(), + Box::new(builder.sender().into_future_spawner()), ) .unwrap(); @@ -222,14 +222,17 @@ fn test_client_with_simple_test_loop() { test.register_handler(forward_client_messages_from_sync_jobs_to_client_actions().widen()); test.register_handler(forward_client_messages_from_shards_manager().widen()); test.register_handler( - forward_sync_jobs_messages_from_client_to_sync_jobs_actions(test.future_spawner()).widen(), + forward_sync_jobs_messages_from_client_to_sync_jobs_actions( + test.sender().into_future_spawner(), + ) + .widen(), ); test.register_handler(drive_futures().widen()); test.register_handler(drive_delayed_action_runners::().widen()); test.register_handler(forward_client_request_to_shards_manager().widen()); // TODO: handle additional events. - test.delayed_action_runner::().run_later( + test.sender().into_delayed_action_runner::().run_later( "start_client", Duration::ZERO, |client, runner| {