Skip to content

Commit

Permalink
[resharding] Add pytest for checking RPC calls after resharding (#10296)
Browse files Browse the repository at this point in the history
This PR adds a basic pytest to check if the RPC calls to tx endpoint
work after a resharding event.

Note that this test currently assumes that the node tracks all shards.
Future improvements, where we may need to redirect the request to a node
that tracks the specific shard we are interested in, will come later.

Resolving #5047
  • Loading branch information
Shreyan Gupta authored Dec 5, 2023
1 parent 2fcbeb7 commit 6ce6eb2
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 85 deletions.
4 changes: 2 additions & 2 deletions nightly/pytest-sanity.txt
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ pytest --timeout=600 sanity/split_storage.py --features nightly
# Test for resharding
pytest --timeout=120 sanity/resharding.py
pytest --timeout=120 sanity/resharding.py --features nightly

# Test for resharding error handling
pytest --timeout=120 sanity/resharding_error_handling.py
pytest --timeout=120 sanity/resharding_error_handling.py --features nightly
pytest --timeout=120 sanity/resharding_rpc_tx.py
pytest --timeout=120 sanity/resharding_rpc_tx.py --features nightly
39 changes: 39 additions & 0 deletions pytest/lib/resharding_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,20 @@
}


def get_genesis_config_changes(epoch_length,
                               binary_protocol_version,
                               logger=None):
    """Build the genesis config overrides used by the resharding tests.

    The list starts with the ``epoch_length`` override and is then handed,
    together with ``binary_protocol_version`` and ``logger``, to
    ``append_shard_layout_config_changes``, which appends the overrides
    required for testing resharding to it.

    Returns the resulting list of ``[key, value]`` overrides.
    """
    changes = [["epoch_length", epoch_length]]
    append_shard_layout_config_changes(changes, binary_protocol_version,
                                       logger)
    return changes


# Append the genesis config changes that are required for testing resharding.
# This method will set the protocol version, shard layout and a few other
# configs so that it matches the protocol configuration as of right before the
Expand Down Expand Up @@ -122,3 +136,28 @@ def get_epoch_offset(binary_protocol_version):
return 0

assert False


def get_client_config_changes(num_nodes, initial_delay=None):
    """Build the client config overrides for every node of a resharding test.

    Args:
        num_nodes: Number of nodes; the result has one entry per node,
            keyed by the node index ``0 .. num_nodes - 1``.
        initial_delay: Optional number of seconds. When it is not ``None``
            an ``"initial_delay"`` entry (``secs=initial_delay``,
            ``nanos=0``) is added to ``"state_split_config"``.

    Returns:
        A dict mapping node index to that node's config overrides. Every
        node gets its own freshly built dict, so adjusting the config of one
        node never leaks into the config of another.
    """

    def build_node_config():
        # Built from scratch on every call so that no nested dict or list is
        # shared between nodes.
        state_split_config = {
            "batch_size": 1000000,
            # don't throttle resharding
            "batch_delay": {
                "secs": 0,
                "nanos": 0,
            },
            # retry often to start resharding as fast as possible
            "retry_delay": {
                "secs": 0,
                "nanos": 100_000_000,
            },
        }
        if initial_delay is not None:
            state_split_config["initial_delay"] = {
                "secs": initial_delay,
                "nanos": 0,
            }
        return {
            "tracked_shards": [0],
            "state_split_config": state_split_config,
        }

    return {i: build_node_config() for i in range(num_nodes)}
39 changes: 4 additions & 35 deletions pytest/tests/sanity/resharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from configured_logger import logger
from cluster import get_binary_protocol_version, init_cluster, load_config, spin_up_node
from utils import MetricsTracker, poll_blocks
from resharding_lib import append_shard_layout_config_changes, get_epoch_offset, get_genesis_num_shards, get_genesis_shard_layout_version, get_target_num_shards, get_target_shard_layout_version
from resharding_lib import get_genesis_shard_layout_version, get_target_shard_layout_version, get_genesis_num_shards, get_target_num_shards, get_epoch_offset, get_genesis_config_changes, get_client_config_changes


class ReshardingTest(unittest.TestCase):
Expand All @@ -38,44 +38,13 @@ def setUp(self) -> None:

self.epoch_offset = get_epoch_offset(self.binary_protocol_version)

def __get_genesis_config_changes(self):
    """Return the genesis config overrides for this test.

    Starts from the ``epoch_length`` override and lets
    ``append_shard_layout_config_changes`` add the overrides required for
    testing resharding for ``self.binary_protocol_version``.
    """
    changes = [["epoch_length", self.epoch_length]]
    append_shard_layout_config_changes(changes,
                                       self.binary_protocol_version, logger)
    return changes

def __get_client_config_changes(self, num_nodes):
single = {
"tracked_shards": [0],
"state_split_config": {
"batch_size": 1000000,
# don't throttle resharding
"batch_delay": {
"secs": 0,
"nanos": 0,
},
# retry often to start resharding as fast as possible
"retry_delay": {
"secs": 0,
"nanos": 100_000_000
}
}
}
return {i: single for i in range(num_nodes)}

def test_resharding(self):
logger.info("The resharding test is starting.")
num_nodes = 2

genesis_config_changes = self.__get_genesis_config_changes()
client_config_changes = self.__get_client_config_changes(num_nodes)
genesis_config_changes = get_genesis_config_changes(
self.epoch_length, self.binary_protocol_version, logger)
client_config_changes = get_client_config_changes(num_nodes)

near_root, [node0_dir, node1_dir] = init_cluster(
num_nodes=num_nodes,
Expand Down
53 changes: 5 additions & 48 deletions pytest/tests/sanity/resharding_error_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
from configured_logger import logger
from cluster import corrupt_state_snapshot, get_binary_protocol_version, init_cluster, load_config, spin_up_node
from utils import MetricsTracker, poll_blocks, wait_for_blocks
from resharding_lib import append_shard_layout_config_changes, get_genesis_num_shards, get_genesis_shard_layout_version, get_target_num_shards, get_target_shard_layout_version
from resharding_lib import get_genesis_shard_layout_version, get_target_shard_layout_version, get_genesis_num_shards, get_target_num_shards, get_genesis_config_changes, get_client_config_changes


# TODO(resharding): refactor the resharding tests to re-use the common logic
class ReshardingErrorHandlingTest(unittest.TestCase):

def setUp(self) -> None:
Expand All @@ -39,47 +38,6 @@ def setUp(self) -> None:
self.target_num_shards = get_target_num_shards(
self.binary_protocol_version)

def __get_genesis_config_changes(self):
    """Return the genesis config overrides for this test.

    Starts from the ``epoch_length`` override and lets
    ``append_shard_layout_config_changes`` add the overrides required for
    testing resharding for ``self.binary_protocol_version``.
    """
    changes = [["epoch_length", self.epoch_length]]
    append_shard_layout_config_changes(changes,
                                       self.binary_protocol_version, logger)
    return changes

def __get_client_config_changes(self, num_nodes):
single = {
"tracked_shards": [0],
# arbitrary long initial delay to not trigger resharding
# will get overwritten before restarting the node
"state_split_config": self.__get_state_split_config(10)
}
return {i: single for i in range(num_nodes)}

def __get_state_split_config(self, initial_delay):
return {
"batch_size": 1000000,
# don't throttle resharding
"batch_delay": {
"secs": 0,
"nanos": 0,
},
# retry often to start resharding as fast as possible
"retry_delay": {
"secs": 0,
"nanos": 100_000_000
},
"initial_delay": {
"secs": initial_delay,
"nanos": 0
},
}

# timeline by block number
# epoch_length + 2 - snapshot is requested
# epoch_length + 3 - snapshot is finished
Expand All @@ -91,8 +49,9 @@ def test_resharding(self):
logger.info("The resharding test is starting.")
num_nodes = 2

genesis_config_changes = self.__get_genesis_config_changes()
client_config_changes = self.__get_client_config_changes(num_nodes)
genesis_config_changes = get_genesis_config_changes(
self.epoch_length, self.binary_protocol_version, logger)
client_config_changes = get_client_config_changes(num_nodes, 10)

near_root, [node0_dir, node1_dir] = init_cluster(
num_nodes=num_nodes,
Expand Down Expand Up @@ -134,9 +93,7 @@ def test_resharding(self):
logger.info(f"corrupted state snapshot\n{output}")

# Update the initial delay to start resharding as soon as possible.
client_config_changes = {
"state_split_config": self.__get_state_split_config(initial_delay=0)
}
client_config_changes = get_client_config_changes(1, 0)[0]
node0.change_config(client_config_changes)
node1.change_config(client_config_changes)

Expand Down
119 changes: 119 additions & 0 deletions pytest/tests/sanity/resharding_rpc_tx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#!/usr/bin/env python3
"""
Testing RPC call to transaction status after a resharding event.
We create two accounts that we know will fall into different shards after resharding.
We submit transfer transactions between the accounts and verify the transaction status after resharding.
"""

import sys
import unittest
import pathlib

sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib'))

from configured_logger import logger
import cluster
from resharding_lib import get_target_shard_layout_version, get_target_num_shards, get_genesis_config_changes, get_client_config_changes
import transaction
from utils import MetricsTracker, poll_blocks
import key

STARTING_AMOUNT = 123 * (10**24)


class ReshardingRpcTx(unittest.TestCase):
    """Checks that the tx status RPC still works after a resharding event.

    Two accounts are created, transfers are sent between them before
    resharding, and once the node reports the target shard layout the stored
    transaction status is queried again and compared with the original
    response.
    """

    def setUp(self) -> None:
        self.epoch_length = 5
        self.config = cluster.load_config()
        self.binary_protocol_version = cluster.get_binary_protocol_version(
            self.config)
        self.assertIsNotNone(self.binary_protocol_version)

        self.target_shard_layout_version = get_target_shard_layout_version(
            self.binary_protocol_version)
        self.target_num_shards = get_target_num_shards(
            self.binary_protocol_version)

    def __send_tx_and_check(self, signed_tx):
        """ Send a signed transaction, wait for it and check it succeeded. """
        response = self.node.send_tx_and_wait(signed_tx, timeout=20)
        self.assertNotIn('error', response, response)
        self.assertNotIn('Failure', response['result']['status'], response)
        return response

    def __setup_account(self, account_id, nonce):
        """ Create an account with full access key and balance. """
        encoded_block_hash = self.node.get_latest_block().hash_bytes
        account = key.Key.from_random(account_id)
        account_tx = transaction.sign_create_account_with_full_access_key_and_balance_tx(
            self.node.signer_key, account.account_id, account, STARTING_AMOUNT,
            nonce, encoded_block_hash)
        self.__send_tx_and_check(account_tx)
        return account

    def __submit_transfer_tx(self, from_key, to_account_id, nonce):
        """ Submit a transfer transaction and wait for the response. """
        encoded_block_hash = self.node.get_latest_block().hash_bytes
        payment_tx = transaction.sign_payment_tx(from_key, to_account_id, 100,
                                                 nonce, encoded_block_hash)
        return self.__send_tx_and_check(payment_tx)

    def __verify_tx_status(self, transfer_response, sender_account_id):
        """ Check that the tx status RPC returns the original response. """
        tx_hash = transfer_response['result']['transaction']['hash']
        response = self.node.get_tx(tx_hash, sender_account_id)
        self.assertEqual(response, transfer_response)

    def test_resharding_rpc_tx(self):
        num_nodes = 2
        genesis_config_changes = get_genesis_config_changes(
            self.epoch_length, self.binary_protocol_version, logger)
        client_config_changes = get_client_config_changes(num_nodes)
        nodes = cluster.start_cluster(
            num_nodes=num_nodes,
            num_observers=0,
            num_shards=1,
            config=self.config,
            genesis_config_changes=genesis_config_changes,
            client_config_changes=client_config_changes)
        self.node = nodes[0]

        # The shard boundaries are at "kkuuue2akv_1630967379.near" and
        # "tge-lockup.sweat" for shards 3 and 4.
        # We would like to create accounts that end up in different shards:
        # the first account is in shard 3 both before and after resharding,
        # the second account is in shard 4 after resharding.
        account0 = self.__setup_account('setup_test_account.test0', 1)
        account1 = self.__setup_account('z_setup_test_account.test0', 2)

        # Poll one block to create the accounts.
        # poll_blocks is consumed as an iterator further down, so merely
        # calling it presumably does no work; pull one block explicitly.
        # NOTE(review): confirm that poll_blocks is lazy.
        next(iter(poll_blocks(self.node)))

        # Submit transfer transactions between the accounts; their status is
        # verified after resharding.
        response0 = self.__submit_transfer_tx(account0, account1.account_id,
                                              6000001)
        response1 = self.__submit_transfer_tx(account1, account0.account_id,
                                              12000001)

        metrics_tracker = MetricsTracker(self.node)
        for height, _ in poll_blocks(self.node):
            # wait for resharding to complete
            if height < 2 * self.epoch_length:
                continue

            # Quick check whether resharding is completed
            version = metrics_tracker.get_int_metric_value(
                "near_shard_layout_version")
            num_shards = metrics_tracker.get_int_metric_value(
                "near_shard_layout_num_shards")
            self.assertEqual(version, self.target_shard_layout_version)
            self.assertEqual(num_shards, self.target_num_shards)

            # Verify the transaction status after resharding, querying each
            # transaction with each of the two account ids.
            for response in (response0, response1):
                for account in (account0, account1):
                    self.__verify_tx_status(response, account.account_id)
            break


if __name__ == '__main__':
unittest.main()

0 comments on commit 6ce6eb2

Please sign in to comment.