Skip to content

Commit

Permalink
[TestLoop] Multinode test loop example with type erased TestLoop.
Browse files Browse the repository at this point in the history
  • Loading branch information
robin-near committed Jun 4, 2024
1 parent b38a0c2 commit 1149acc
Show file tree
Hide file tree
Showing 14 changed files with 1,156 additions and 285 deletions.
56 changes: 54 additions & 2 deletions chain/chain/src/test_utils/test_loop.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::state_snapshot_actor::{
StateSnapshotActor, StateSnapshotSenderForClientMessage,
StateSnapshotSenderForStateSnapshotMessage,
StateSnapshotActor, StateSnapshotSenderForClient, StateSnapshotSenderForClientMessage,
StateSnapshotSenderForStateSnapshot, StateSnapshotSenderForStateSnapshotMessage,
};
use near_async::messaging::Handler;
use near_async::test_loop::event_handler::LoopEventHandler;
use near_async::v2::{self, LoopStream};

pub fn forward_state_snapshot_messages_from_state_snapshot(
) -> LoopEventHandler<StateSnapshotActor, StateSnapshotSenderForStateSnapshotMessage> {
Expand All @@ -18,3 +19,54 @@ pub fn forward_state_snapshot_messages_from_client(
StateSnapshotSenderForClientMessage::_0(msg) => actor.handle(msg),
})
}

pub struct LoopStateSnapshotActorBuilder {
pub from_state_snapshot_stream: LoopStream<StateSnapshotSenderForStateSnapshotMessage>,
pub from_state_snapshot: StateSnapshotSenderForStateSnapshot,
pub from_client_stream: LoopStream<StateSnapshotSenderForClientMessage>,
pub from_client: StateSnapshotSenderForClient,
}

pub struct LoopStateSnapshotActor {
pub actor: v2::LoopData<StateSnapshotActor>,
pub from_state_snapshot: StateSnapshotSenderForStateSnapshot,
pub from_client: StateSnapshotSenderForClient,
}

pub fn loop_state_snapshot_actor_builder(test: &mut v2::TestLoop) -> LoopStateSnapshotActorBuilder {
let from_state_snapshot = test.new_stream();
let from_state_snapshot_multi_sender = from_state_snapshot.wrapped_multi_sender();
let from_client = test.new_stream();
let from_client_multi_sender = from_client.wrapped_multi_sender();
LoopStateSnapshotActorBuilder {
from_state_snapshot_stream: from_state_snapshot,
from_state_snapshot: from_state_snapshot_multi_sender,
from_client_stream: from_client,
from_client: from_client_multi_sender,
}
}

impl LoopStateSnapshotActorBuilder {
pub fn build(
self,
test: &mut v2::TestLoop,
actor: StateSnapshotActor,
) -> LoopStateSnapshotActor {
let actor = test.add_data(actor);
self.from_state_snapshot_stream.handle1_legacy(
test,
actor,
forward_state_snapshot_messages_from_state_snapshot(),
);
self.from_client_stream.handle1_legacy(
test,
actor,
forward_state_snapshot_messages_from_client(),
);
LoopStateSnapshotActor {
actor,
from_state_snapshot: self.from_state_snapshot,
from_client: self.from_client,
}
}
}
148 changes: 147 additions & 1 deletion chain/chunks/src/test_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,21 @@ use crate::{
test_utils::{default_tip, tip},
};
use near_async::test_loop::delay_sender::DelaySender;
use near_async::time;
use near_async::test_loop::futures::TestLoopDelayedActionRunner;
use near_async::time::Clock;
use near_async::v2::{LoopData, LoopStream};
use near_async::{
messaging::Sender,
test_loop::event_handler::{LoopEventHandler, TryIntoOrSelf},
};
use near_async::{time, v2};
use near_chain::{types::Tip, Chain};
use near_epoch_manager::{
shard_tracker::{ShardTracker, TrackedConfig},
test_utils::{record_block, setup_epoch_manager_with_block_and_chunk_producers},
EpochManagerAdapter, EpochManagerHandle,
};
use near_network::types::PeerManagerAdapterMessage;
use near_network::{
shards_manager::ShardsManagerRequestFromNetwork,
test_loop::SupportsRoutingLookup,
Expand Down Expand Up @@ -52,6 +55,62 @@ pub fn forward_network_request_to_shards_manager(
})
}

#[derive(Clone)]
pub struct LoopShardsManagerActorBuilder {
pub from_client_stream: LoopStream<ShardsManagerRequestFromClient>,
pub from_network_stream: LoopStream<ShardsManagerRequestFromNetwork>,
pub from_client: Sender<ShardsManagerRequestFromClient>,
pub from_network: Sender<ShardsManagerRequestFromNetwork>,
}

pub fn loop_shards_manager_actor_builder(test: &mut v2::TestLoop) -> LoopShardsManagerActorBuilder {
let from_client_stream = test.new_stream();
let from_client = from_client_stream.sender();
let from_network_stream = test.new_stream();
let from_network = from_network_stream.sender();
LoopShardsManagerActorBuilder {
from_client_stream,
from_network_stream,
from_client,
from_network,
}
}

#[derive(Clone)]
pub struct LoopShardsManagerActor {
pub actor: LoopData<ShardsManagerActor>,
pub from_client: Sender<ShardsManagerRequestFromClient>,
pub from_network: Sender<ShardsManagerRequestFromNetwork>,
pub delayed_action_runner: TestLoopDelayedActionRunner<ShardsManagerActor>,
}

impl LoopShardsManagerActorBuilder {
pub fn build(
self,
test: &mut v2::TestLoop,
actor: ShardsManagerActor,
) -> LoopShardsManagerActor {
let actor = test.add_data(actor);
let delayed_action_runner = test.new_delayed_actions_runner(actor);
self.from_client_stream.handle1_legacy(
test,
actor,
forward_client_request_to_shards_manager(),
);
self.from_network_stream.handle1_legacy(
test,
actor,
forward_network_request_to_shards_manager(),
);
LoopShardsManagerActor {
actor,
from_client: self.from_client,
from_network: self.from_network,
delayed_action_runner,
}
}
}

/// Routes network messages that are issued by ShardsManager to other instances
/// in a multi-instance test.
///
Expand Down Expand Up @@ -144,6 +203,93 @@ pub fn route_shards_manager_network_messages<
})
}

pub fn route_network_message_to_shards_manager(
request: NetworkRequests,
originator: &AccountId,
senders: &HashMap<AccountId, DelaySender<ShardsManagerRequestFromNetwork>>,
network_delay: time::Duration,
clock: &Clock,
route_back_lookup: &mut HashMap<CryptoHash, AccountId>,
) -> Result<(), NetworkRequests> {
match request {
NetworkRequests::PartialEncodedChunkRequest { target, request, .. } => {
let target_account = target.account_id.as_ref().unwrap();
let route_back = CryptoHash::hash_borsh(route_back_lookup.len() as u64);
route_back_lookup.insert(route_back, originator.clone());
senders[target_account].send_with_delay(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkRequest {
partial_encoded_chunk_request: request,
route_back,
},
network_delay,
);
Ok(())
}
NetworkRequests::PartialEncodedChunkResponse { route_back, response } => {
let target_account = route_back_lookup.get(&route_back).expect("Route back not found");
senders[target_account].send_with_delay(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkResponse {
partial_encoded_chunk_response: response,
received_time: clock.now().into(), // TODO: use clock
}
.into(),
network_delay,
);
Ok(())
}
NetworkRequests::PartialEncodedChunkMessage { account_id, partial_encoded_chunk } => {
senders[&account_id].send_with_delay(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk(
partial_encoded_chunk.into(),
)
.into(),
network_delay,
);
Ok(())
}
NetworkRequests::PartialEncodedChunkForward { account_id, forward } => {
senders[&account_id].send_with_delay(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward(forward).into(),
network_delay,
);
Ok(())
}
other_message => Err(other_message),
}
}

pub fn handle_shards_manager_network_routing(
stream: &LoopStream<PeerManagerAdapterMessage>,
originator: &AccountId,
test: &mut v2::TestLoop,
senders: &HashMap<AccountId, DelaySender<ShardsManagerRequestFromNetwork>>,
network_delay: time::Duration,
route_back_lookup: LoopData<HashMap<CryptoHash, AccountId>>,
) {
let clock = test.clock();
let originator = originator.clone();
let senders = senders.clone();
stream.handle1(test, route_back_lookup, move |msg, route_back_lookup| {
let PeerManagerAdapterMessage::_request_sender(request) = msg else { return Err(msg) };
let PeerManagerMessageRequest::NetworkRequests(request) = request else {
return Err(PeerManagerAdapterMessage::_request_sender(request));
};
match route_network_message_to_shards_manager(
request,
&originator,
&senders,
network_delay,
&clock,
route_back_lookup,
) {
Ok(()) => Ok(()),
Err(request) => Err(PeerManagerAdapterMessage::_request_sender(
PeerManagerMessageRequest::NetworkRequests(request),
)),
}
});
}

// NOTE: this is no longer needed for TestLoop, but some other non-TestLoop tests depend on it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ShardsManagerResendChunkRequests;
Expand Down
Loading

0 comments on commit 1149acc

Please sign in to comment.