feat(congestion): reject new transactions on RPC level (#11419)
Summary: This PR introduces a new failure mode on the RPC level for
transactions submitted under congestion. The error is a new
`InvalidTxError` variant called `ShardCongested`, with a single field
`shard_id` referencing the congested shard.

## Details

With [cross-shard congestion
control](near/NEPs#539) being stabilized soon,
we must deal with the case when a shard rejects new transactions.

On the chunk producer level, all transactions going to a congested shard
will be dropped. This keeps the memory requirements of chunk producers
bounded. Further, we decided to go for a relatively low threshold in
order to keep the latency of accepted transactions low, rejecting new
transactions as soon as we hit 25% congestion on a specific shard.
Consequently, when shards are congested, it will not be long before
transactions are rejected.
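
To illustrate the threshold described above, here is a minimal sketch of the check, not the actual nearcore implementation. It assumes the congestion level is already available as a fraction between 0.0 and 1.0. The constant `REJECT_TX_CONGESTION_THRESHOLD`, the helper `check_receiver_congestion`, and the trimmed-down `InvalidTxError` enum are hypothetical stand-ins for this example. In the real code the level is derived from `CongestionInfo` and the runtime config inside `CongestionControl`.

```rust
/// Hypothetical constant mirroring the 25% threshold from the commit message.
const REJECT_TX_CONGESTION_THRESHOLD: f64 = 0.25;

/// Trimmed-down stand-in for the real `InvalidTxError` enum (assumption:
/// only the new variant is modelled here).
#[derive(Debug, PartialEq)]
enum InvalidTxError {
    ShardCongested { shard_id: u32 },
}

/// Simplified stand-in for `CongestionControl::shard_accepts_transactions`.
/// Assumption: a shard stops accepting as soon as it reaches the threshold.
fn shard_accepts_transactions(congestion_level: f64) -> bool {
    congestion_level < REJECT_TX_CONGESTION_THRESHOLD
}

/// Returns `Some(ShardCongested)` if the receiver shard is known to be
/// congested. `None` as the congestion level means no congestion info is
/// available, in which case the transaction is not rejected by this check.
fn check_receiver_congestion(
    shard_id: u32,
    congestion_level: Option<f64>,
) -> Option<InvalidTxError> {
    match congestion_level {
        Some(level) if !shard_accepts_transactions(level) => {
            Some(InvalidTxError::ShardCongested { shard_id })
        }
        _ => None,
    }
}
```

Passing the congestion level as an `Option` follows the shape of the new `receiver_congestion_info: Option<ExtendedCongestionInfo>` parameter of `validate_tx`: when no information is available, the check is skipped.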

This has consequences for the users. On the positive side, they will no
longer have to wait for a long time not knowing whether their transaction
will be accepted. Either it is executed within a bounded time
(at most 20 blocks after inclusion) or it is rejected immediately.

But on the negative side, when a shard is congested, they will have to
actively retry sending the transaction until it gets accepted. We hope
that this can be automated by wallets, which can also provide useful
live updates to the user about what is happening. But for this, they
will need to understand and handle the new error `ShardCongested`
differently from existing errors. The key difference is that the same
signed transaction can be sent again and will be accepted if congestion
has gone down.
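
As a rough illustration of the retry behaviour wallets could automate, here is a minimal sketch under stated assumptions. `SubmitError`, `submit_with_retry`, and the `submit` closure are hypothetical names for this example and not part of nearcore or any wallet SDK. The closure stands in for re-sending the same signed transaction over RPC. Only `ShardCongested` triggers a retry, while any other error is returned to the caller immediately.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical client-side error type. Only `ShardCongested` is retryable.
#[derive(Debug, PartialEq)]
enum SubmitError {
    ShardCongested { shard_id: u32 },
    Other(String),
}

/// Calls `submit` until it succeeds, fails with a non-retryable error, or
/// `max_attempts` is exhausted. Returns the number of attempts on success.
fn submit_with_retry<F>(
    mut submit: F,
    max_attempts: u32,
    initial_delay: Duration,
) -> Result<u32, SubmitError>
where
    F: FnMut() -> Result<(), SubmitError>,
{
    let mut delay = initial_delay;
    let mut last_err = SubmitError::Other("no attempts were made".to_string());
    for attempt in 1..=max_attempts {
        match submit() {
            Ok(()) => return Ok(attempt),
            // The same signed transaction can be sent again once congestion
            // has gone down, so back off and retry.
            Err(err @ SubmitError::ShardCongested { .. }) => {
                last_err = err;
                if attempt < max_attempts {
                    sleep(delay);
                    delay = delay.saturating_mul(2);
                }
            }
            // Every other error is treated as final in this sketch.
            Err(other) => return Err(other),
        }
    }
    Err(last_err)
}
```

The exponential backoff is a design choice of this sketch and not something the PR prescribes. A wallet would also want to stop retrying once the transaction's validity period has expired.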
jakmeier authored Jun 4, 2024
1 parent ec55a69 commit dd1e278
Showing 9 changed files with 121 additions and 10 deletions.
16 changes: 16 additions & 0 deletions chain/chain/src/runtime/mod.rs
@@ -643,9 +643,25 @@ impl RuntimeAdapter for NightshadeRuntime {
verify_signature: bool,
epoch_id: &EpochId,
current_protocol_version: ProtocolVersion,
receiver_congestion_info: Option<ExtendedCongestionInfo>,
) -> Result<Option<InvalidTxError>, Error> {
let runtime_config = self.runtime_config_store.get_config(current_protocol_version);

if let Some(congestion_info) = receiver_congestion_info {
let congestion_control = CongestionControl::new(
runtime_config.congestion_control_config,
congestion_info.congestion_info,
congestion_info.missed_chunks_count,
);
if !congestion_control.shard_accepts_transactions() {
let receiver_shard =
self.account_id_to_shard_uid(transaction.transaction.receiver_id(), epoch_id)?;
return Ok(Some(InvalidTxError::ShardCongested {
shard_id: receiver_shard.shard_id,
}));
}
}

if let Some(state_root) = state_root {
let shard_uid =
self.account_id_to_shard_uid(transaction.transaction.signer_id(), epoch_id)?;
3 changes: 2 additions & 1 deletion chain/chain/src/test_utils/kv_runtime.rs
@@ -18,7 +18,7 @@ use near_primitives::account::{AccessKey, Account};
use near_primitives::apply::ApplyChunkReason;
use near_primitives::block::Tip;
use near_primitives::block_header::{Approval, ApprovalInner};
use near_primitives::congestion_info::CongestionInfo;
use near_primitives::congestion_info::{CongestionInfo, ExtendedCongestionInfo};
use near_primitives::epoch_manager::block_info::BlockInfo;
use near_primitives::epoch_manager::epoch_info::EpochInfo;
use near_primitives::epoch_manager::EpochConfig;
@@ -1089,6 +1089,7 @@ impl RuntimeAdapter for KeyValueRuntime {
_verify_signature: bool,
_epoch_id: &EpochId,
_current_protocol_version: ProtocolVersion,
_receiver_congestion_info: Option<ExtendedCongestionInfo>,
) -> Result<Option<InvalidTxError>, Error> {
Ok(None)
}
1 change: 1 addition & 0 deletions chain/chain/src/types.rs
@@ -417,6 +417,7 @@ pub trait RuntimeAdapter: Send + Sync {
verify_signature: bool,
epoch_id: &EpochId,
current_protocol_version: ProtocolVersion,
receiver_congestion_info: Option<ExtendedCongestionInfo>,
) -> Result<Option<InvalidTxError>, Error>;

/// Returns an ordered list of valid transactions from the pool up the given limits.
28 changes: 24 additions & 4 deletions chain/client/src/client.rs
@@ -2270,7 +2270,8 @@ impl Client {
) -> Result<ProcessTxResponse, Error> {
let head = self.chain.head()?;
let me = self.validator_signer.as_ref().map(|vs| vs.validator_id());
let cur_block_header = self.chain.head_header()?;
let cur_block = self.chain.get_head_block()?;
let cur_block_header = cur_block.header();
let transaction_validity_period = self.chain.transaction_validity_period;
// here it is fine to use `cur_block_header` as it is a best effort estimate. If the transaction
// were to be included, the block that the chunk points to will have height >= height of
@@ -2285,12 +2286,23 @@ impl Client {
}
let gas_price = cur_block_header.next_gas_price();
let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(&head.last_block_hash)?;

let receiver_shard =
self.epoch_manager.account_id_to_shard_id(tx.transaction.receiver_id(), &epoch_id)?;
let receiver_congestion_info =
cur_block.shards_congestion_info().get(&receiver_shard).copied();
let protocol_version = self.epoch_manager.get_epoch_protocol_version(&epoch_id)?;

if let Some(err) = self
.runtime_adapter
.validate_tx(gas_price, None, tx, true, &epoch_id, protocol_version)
.validate_tx(
gas_price,
None,
tx,
true,
&epoch_id,
protocol_version,
receiver_congestion_info,
)
.expect("no storage errors")
{
debug!(target: "client", tx_hash = ?tx.get_hash(), ?err, "Invalid tx during basic validation");
@@ -2322,7 +2334,15 @@ impl Client {
};
if let Some(err) = self
.runtime_adapter
.validate_tx(gas_price, Some(state_root), tx, false, &epoch_id, protocol_version)
.validate_tx(
gas_price,
Some(state_root),
tx,
false,
&epoch_id,
protocol_version,
receiver_congestion_info,
)
.expect("no storage errors")
{
debug!(target: "client", ?err, "Invalid tx");
10 changes: 9 additions & 1 deletion chain/jsonrpc/res/rpc_errors_schema.json
@@ -570,7 +570,8 @@
"ActionsValidation",
"TransactionSizeExceeded",
"InvalidTransactionVersion",
"StorageError"
"StorageError",
"ShardCongested"
],
"props": {}
},
@@ -773,6 +774,13 @@
"subtypes": [],
"props": {}
},
"ShardCongested": {
"name": "ShardCongested",
"subtypes": [],
"props": {
"shard_id": ""
}
},
"SignerDoesNotExist": {
"name": "SignerDoesNotExist",
"subtypes": [],
8 changes: 8 additions & 0 deletions core/primitives/src/errors.rs
@@ -213,6 +213,11 @@ pub enum InvalidTxError {
InvalidTransactionVersion,
// Error occurred during storage access
StorageError(StorageError),
/// The receiver shard of the transaction is too congested to accept new
/// transactions at the moment.
ShardCongested {
shard_id: u32,
},
}

impl From<StorageError> for InvalidTxError {
@@ -620,6 +625,9 @@ impl Display for InvalidTxError {
InvalidTxError::StorageError(error) => {
write!(f, "Storage error: {}", error)
}
InvalidTxError::ShardCongested { shard_id } => {
write!(f, "Shard {shard_id} is currently congested and rejects new transactions.")
}
}
}
}
61 changes: 59 additions & 2 deletions integration-tests/src/tests/client/features/congestion_control.rs
@@ -1,3 +1,4 @@
use assert_matches::assert_matches;
use near_chain_configs::Genesis;
use near_client::test_utils::TestEnv;
use near_client::ProcessTxResponse;
@@ -6,7 +7,9 @@ use near_o11y::testonly::init_test_logger;
use near_parameters::{RuntimeConfig, RuntimeConfigStore};
use near_primitives::account::id::AccountId;
use near_primitives::congestion_info::{CongestionControl, CongestionInfo};
use near_primitives::errors::{ActionErrorKind, FunctionCallError, TxExecutionError};
use near_primitives::errors::{
ActionErrorKind, FunctionCallError, InvalidTxError, TxExecutionError,
};
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardLayout;
use near_primitives::sharding::{ShardChunk, ShardChunkHeader};
@@ -292,7 +295,11 @@ fn submit_n_100tgas_fns(env: &mut TestEnv, n: u32, nonce: &mut u64, signer: &InM
let fn_tx = new_fn_call_100tgas(nonce, signer, *block.hash());
// this only adds the tx to the pool, no chain progress is made
let response = env.clients[0].process_tx(fn_tx, false, false);
assert_eq!(response, ProcessTxResponse::ValidTx);
match response {
ProcessTxResponse::ValidTx
| ProcessTxResponse::InvalidTx(InvalidTxError::ShardCongested { .. }) => (),
other => panic!("unexpected result from submitting tx: {other:?}"),
}
}
}

@@ -565,3 +572,53 @@ fn measure_tx_limit(
local_tx_included_with_congestion,
)
}

/// Test that RPC clients stop accepting transactions when the receiver is
/// congested.
#[test]
fn test_rpc_client_rejection() {
let sender_id: AccountId = "test0".parse().unwrap();
let mut env = setup_runtime(sender_id.clone(), PROTOCOL_VERSION);

// prepare a contract to call
setup_contract(&mut env);

let signer = InMemorySigner::from_seed(sender_id.clone(), KeyType::ED25519, sender_id.as_str());
let mut nonce = 10;

// Check we can send transactions at the start.
let fn_tx = new_fn_call_100tgas(
&mut nonce,
&signer,
*env.clients[0].chain.head_header().unwrap().hash(),
);
let response = env.clients[0].process_tx(fn_tx, false, false);
assert_eq!(response, ProcessTxResponse::ValidTx);

// Congest the network with a burst of 100 PGas.
submit_n_100tgas_fns(&mut env, 1_000, &mut nonce, &signer);

// Allow transactions to enter the chain and enough receipts to arrive at
// the receiver shard for it to become congested.
let tip = env.clients[0].chain.head().unwrap();
for i in 1..10 {
env.produce_block(0, tip.height + i);
}

// Check that congestion control rejects new transactions.
let fn_tx = new_fn_call_100tgas(
&mut nonce,
&signer,
*env.clients[0].chain.head_header().unwrap().hash(),
);
let response = env.clients[0].process_tx(fn_tx, false, false);

if ProtocolFeature::CongestionControl.enabled(PROTOCOL_VERSION) {
assert_matches!(
response,
ProcessTxResponse::InvalidTx(InvalidTxError::ShardCongested { .. })
);
} else {
assert_eq!(response, ProcessTxResponse::ValidTx);
}
}
2 changes: 1 addition & 1 deletion integration-tests/src/tests/client/resharding.rs
@@ -239,7 +239,7 @@ impl TestReshardingEnv {
.validator_seats(num_validators)
.real_stores()
.epoch_managers_with_test_overrides(epoch_config_test_overrides)
.nightshade_runtimes(&genesis)
.nightshade_runtimes_congestion_control_disabled(&genesis)
.track_all_shards()
.build();
assert_eq!(env.validators.len(), num_validators);
2 changes: 1 addition & 1 deletion integration-tests/src/tests/client/sync_state_nodes.rs
@@ -570,7 +570,7 @@ fn test_dump_epoch_missing_chunk_in_last_block() {
.clients_count(2)
.use_state_snapshots()
.real_stores()
.nightshade_runtimes(&genesis)
.nightshade_runtimes_congestion_control_disabled(&genesis)
.build();

let genesis_block = env.clients[0].chain.get_block_by_height(0).unwrap();