Skip to content

Commit

Permalink
Progress?
Browse files Browse the repository at this point in the history
  • Loading branch information
robin-near committed Nov 10, 2023
1 parent 6e0ad94 commit 2efe57d
Show file tree
Hide file tree
Showing 18 changed files with 410 additions and 622 deletions.
307 changes: 22 additions & 285 deletions chain/client/src/adapter.rs
Original file line number Diff line number Diff line change
@@ -1,90 +1,13 @@
use crate::client_actor::ClientActor;
use crate::view_client::ViewClientActor;
use near_async::actix::AddrWithAutoSpanContextExt;
use near_async::messaging::IntoSender;
use near_network::client::{BlockRequest, BlockResponse, ClientSenderForNetwork};
use near_network::types::{
NetworkInfo, PartialEncodedChunkForwardMsg, PartialEncodedChunkRequestMsg,
PartialEncodedChunkResponseMsg, ReasonForBan, StateResponseInfo,
PartialEncodedChunkForwardMsg, PartialEncodedChunkRequestMsg, PartialEncodedChunkResponseMsg,
};
use near_o11y::WithSpanContextExt;
use near_primitives::block::{Approval, Block, BlockHeader};
use near_primitives::challenge::Challenge;
use near_primitives::errors::InvalidTxError;
use near_primitives::hash::CryptoHash;
use near_primitives::network::PeerId;
use near_primitives::sharding::PartialEncodedChunk;
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::{AccountId, ShardId};
use near_primitives::views::FinalExecutionOutcomeView;

/// Transaction status query
#[derive(actix::Message, Debug)]
#[rtype(result = "Option<Box<FinalExecutionOutcomeView>>")]
pub(crate) struct TxStatusRequest {
pub tx_hash: CryptoHash,
pub signer_account_id: AccountId,
}

/// Transaction status response
#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub(crate) struct TxStatusResponse(pub Box<FinalExecutionOutcomeView>);

/// Request a block.
#[derive(actix::Message, Debug)]
#[rtype(result = "Option<Box<Block>>")]
pub(crate) struct BlockRequest(pub CryptoHash);

/// Block response.
#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub struct BlockResponse {
pub block: Block,
pub peer_id: PeerId,
pub was_requested: bool,
}

#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub struct BlockApproval(pub Approval, pub PeerId);

/// Request headers.
#[derive(actix::Message, Debug)]
#[rtype(result = "Option<Vec<BlockHeader>>")]
pub(crate) struct BlockHeadersRequest(pub Vec<CryptoHash>);

/// Headers response.
#[derive(actix::Message, Debug)]
#[rtype(result = "Result<(),ReasonForBan>")]
pub(crate) struct BlockHeadersResponse(pub Vec<BlockHeader>, pub PeerId);

/// State request header.
#[derive(actix::Message, Debug)]
#[rtype(result = "Option<StateResponse>")]
pub struct StateRequestHeader {
pub shard_id: ShardId,
pub sync_hash: CryptoHash,
}

/// State request part.
#[derive(actix::Message, Debug)]
#[rtype(result = "Option<StateResponse>")]
pub struct StateRequestPart {
pub shard_id: ShardId,
pub sync_hash: CryptoHash,
pub part_id: u64,
}

/// Response to state request.
#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub struct StateResponse(pub Box<StateResponseInfo>);

#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub struct SetNetworkInfo(pub NetworkInfo);

#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub(crate) struct RecvChallenge(pub Challenge);

#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
Expand All @@ -105,211 +28,25 @@ pub(crate) struct RecvPartialEncodedChunkResponse(
#[rtype(result = "()")]
pub(crate) struct RecvPartialEncodedChunkRequest(pub PartialEncodedChunkRequestMsg, pub CryptoHash);

#[derive(actix::Message, Debug)]
#[rtype(result = "ProcessTxResponse")]
pub struct ProcessTxRequest {
pub transaction: SignedTransaction,
pub is_forwarded: bool,
pub check_only: bool,
}

#[derive(actix::MessageResponse, Debug, PartialEq, Eq)]
pub enum ProcessTxResponse {
/// No response.
NoResponse,
/// Valid transaction inserted into mempool as response to Transaction.
ValidTx,
/// Invalid transaction inserted into mempool as response to Transaction.
InvalidTx(InvalidTxError),
/// The request is routed to other shards
RequestRouted,
/// The node being queried does not track the shard needed and therefore cannot provide userful
/// response.
DoesNotTrackShard,
}

pub struct Adapter {
/// Address of the client actor.
pub fn client_sender_for_network(
client_addr: actix::Addr<ClientActor>,
/// Address of the view client actor.
view_client_addr: actix::Addr<ViewClientActor>,
}

impl Adapter {
pub fn new(
client_addr: actix::Addr<ClientActor>,
view_client_addr: actix::Addr<ViewClientActor>,
) -> Self {
Self { client_addr, view_client_addr }
}
}

#[async_trait::async_trait]
impl near_network::client::Client for Adapter {
async fn tx_status_request(
&self,
account_id: AccountId,
tx_hash: CryptoHash,
) -> Option<Box<FinalExecutionOutcomeView>> {
match self
.view_client_addr
.send(TxStatusRequest { tx_hash, signer_account_id: account_id }.with_span_context())
.await
{
Ok(res) => res,
Err(err) => {
tracing::error!("mailbox error: {err}");
None
}
}
}

async fn tx_status_response(&self, tx_result: FinalExecutionOutcomeView) {
match self
.view_client_addr
.send(TxStatusResponse(Box::new(tx_result.clone())).with_span_context())
.await
{
Ok(()) => {}
Err(err) => {
tracing::error!("mailbox error: {err}");
}
}
}

async fn state_request_header(
&self,
shard_id: ShardId,
sync_hash: CryptoHash,
) -> Result<Option<StateResponseInfo>, ReasonForBan> {
match self
.view_client_addr
.send(StateRequestHeader { shard_id, sync_hash }.with_span_context())
.await
{
Ok(Some(StateResponse(resp))) => Ok(Some(*resp)),
Ok(None) => Ok(None),
Err(err) => {
tracing::error!("mailbox error: {err}");
Ok(None)
}
}
}

async fn state_request_part(
&self,
shard_id: ShardId,
sync_hash: CryptoHash,
part_id: u64,
) -> Result<Option<StateResponseInfo>, ReasonForBan> {
match self
.view_client_addr
.send(StateRequestPart { shard_id, sync_hash, part_id }.with_span_context())
.await
{
Ok(Some(StateResponse(resp))) => Ok(Some(*resp)),
Ok(None) => Ok(None),
Err(err) => {
tracing::error!("mailbox error: {err}");
Ok(None)
}
}
}

async fn state_response(&self, info: StateResponseInfo) {
match self.client_addr.send(StateResponse(Box::new(info)).with_span_context()).await {
Ok(()) => {}
Err(err) => tracing::error!("mailbox error: {err}"),
}
}

async fn block_approval(&self, approval: Approval, peer_id: PeerId) {
match self.client_addr.send(BlockApproval(approval, peer_id).with_span_context()).await {
Ok(()) => {}
Err(err) => tracing::error!("mailbox error: {err}"),
}
}

async fn transaction(&self, transaction: SignedTransaction, is_forwarded: bool) {
match self
.client_addr
.send(
ProcessTxRequest { transaction, is_forwarded, check_only: false }
.with_span_context(),
)
.await
{
Ok(ProcessTxResponse::InvalidTx(err)) => {
tracing::warn!(target: "network", ?err, "Received invalid tx");
// TODO: count as malicious behavior?
}
Ok(_) => {}
Err(err) => {
tracing::error!("mailbox error: {err}");
}
}
}

async fn block_request(&self, hash: CryptoHash) -> Option<Box<Block>> {
match self.view_client_addr.send(BlockRequest(hash).with_span_context()).await {
Ok(res) => res,
Err(err) => {
tracing::error!("mailbox error: {err}");
None
}
}
}

async fn block_headers_request(&self, hashes: Vec<CryptoHash>) -> Option<Vec<BlockHeader>> {
match self.view_client_addr.send(BlockHeadersRequest(hashes).with_span_context()).await {
Ok(headers) => headers,
Err(err) => {
tracing::error!("mailbox error: {err}");
None
}
}
}

async fn block(&self, block: Block, peer_id: PeerId, was_requested: bool) {
match self
.client_addr
.send(BlockResponse { block, peer_id, was_requested }.with_span_context())
.await
{
Ok(()) => {}
Err(err) => tracing::error!("mailbox error: {err}"),
}
}

async fn block_headers(
&self,
headers: Vec<BlockHeader>,
peer_id: PeerId,
) -> Result<(), ReasonForBan> {
match self
.client_addr
.send(BlockHeadersResponse(headers, peer_id).with_span_context())
.await
{
Ok(res) => res,
Err(err) => {
tracing::error!("mailbox error: {err}");
Ok(())
}
}
}

async fn challenge(&self, challenge: Challenge) {
match self.client_addr.send(RecvChallenge(challenge).with_span_context()).await {
Ok(()) => {}
Err(err) => tracing::error!("mailbox error: {err}"),
}
}

async fn network_info(&self, info: NetworkInfo) {
match self.client_addr.send(SetNetworkInfo(info).with_span_context()).await {
Ok(()) => {}
Err(err) => tracing::error!("mailbox error: {err}"),
}
) -> ClientSenderForNetwork {
let client_addr = client_addr.with_auto_span_context();
let view_client_addr = view_client_addr.with_auto_span_context();
ClientSenderForNetwork {
block: client_addr.clone().into_sender(),
block_headers: client_addr.clone().into_sender(),
block_approval: client_addr.clone().into_sender(),
block_headers_request: view_client_addr.clone().into_sender(),
block_request: view_client_addr.clone().into_sender(),
challenge: client_addr.clone().into_sender(),
network_info: client_addr.clone().into_sender(),
state_request_header: view_client_addr.clone().into_sender(),
state_request_part: view_client_addr.clone().into_sender(),
state_response: client_addr.clone().into_sender(),
transaction: client_addr.clone().into_sender(),
tx_status_request: view_client_addr.clone().into_sender(),
tx_status_response: view_client_addr.clone().into_sender(),
}
}
8 changes: 4 additions & 4 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@
//! Unfortunately, this is not the case today. We are in the process of refactoring ClientActor
//! https://github.com/near/nearcore/issues/7899

use crate::adapter::{
BlockApproval, BlockHeadersResponse, BlockResponse, ProcessTxRequest, ProcessTxResponse,
RecvChallenge, SetNetworkInfo, StateResponse,
};
#[cfg(feature = "test_features")]
use crate::client::AdvProduceBlocksMode;
use crate::client::{Client, EPOCH_START_INFO_BLOCKS};
Expand Down Expand Up @@ -47,6 +43,10 @@ use near_client_primitives::types::{
};
use near_epoch_manager::shard_tracker::ShardTracker;
use near_epoch_manager::EpochManagerAdapter;
use near_network::client::{
BlockApproval, BlockHeadersResponse, BlockResponse, ProcessTxRequest, ProcessTxResponse,
RecvChallenge, SetNetworkInfo, StateResponse,
};
use near_network::types::ReasonForBan;
use near_network::types::{
NetworkInfo, NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest,
Expand Down
8 changes: 4 additions & 4 deletions chain/client/src/view_client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
//! Readonly view of the chain and state of the database.
//! Useful for querying from RPC.

use crate::adapter::{
BlockHeadersRequest, BlockRequest, StateRequestHeader, StateRequestPart, StateResponse,
TxStatusRequest, TxStatusResponse,
};
use crate::{
metrics, sync, GetChunk, GetExecutionOutcomeResponse, GetNextLightClientBlock, GetStateChanges,
GetStateChangesInBlock, GetValidatorInfo, GetValidatorOrdered,
Expand All @@ -29,6 +25,10 @@ use near_client_primitives::types::{
};
use near_epoch_manager::shard_tracker::ShardTracker;
use near_epoch_manager::EpochManagerAdapter;
use near_network::client::{
BlockHeadersRequest, BlockRequest, StateRequestHeader, StateRequestPart, StateResponse,
TxStatusRequest, TxStatusResponse,
};
use near_network::types::{
NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest, StateResponseInfo,
StateResponseInfoV2,
Expand Down
Loading

0 comments on commit 2efe57d

Please sign in to comment.