-
Notifications
You must be signed in to change notification settings - Fork 618
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(resharding): simple nayduck test for resharding #10162
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2386,10 +2386,6 @@ impl Client { | |
let state_sync_timeout = self.config.state_sync_timeout; | ||
let epoch_id = self.chain.get_block(&sync_hash)?.header().epoch_id().clone(); | ||
|
||
// TODO(resharding) what happens to the shards_to_split here when | ||
// catchup_state_syncs already contains an entry for the sync hash? | ||
// Does it get overwritten? Are we guaranteed that the existing | ||
// entry contains the same data? | ||
let (state_sync, shards_to_split, blocks_catch_up_state) = | ||
self.catchup_state_syncs.entry(sync_hash).or_insert_with(|| { | ||
notify_state_sync = true; | ||
|
@@ -2516,8 +2512,6 @@ impl Client { | |
.iter() | ||
.filter_map(|ShardInfo(shard_id, _)| self.should_split_shard(shard_id, me, prev_hash)) | ||
.collect(); | ||
// For colour decorators to work, they need to printed directly. Otherwise the decorators get escaped, garble output and don't add colours. | ||
debug!(target: "catchup", progress_per_shard = ?format_shard_sync_phase_per_shard(&shards_to_split, false), "Need to split states for shards"); | ||
Comment on lines
-2519
to
-2520
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This log line doesn't make much sense.
|
||
Ok(shards_to_split) | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -168,17 +168,26 @@ pub struct StateSplitConfig { | |
/// decreased if resharding is consuming too many resources and interfering | ||
/// with regular node operation. | ||
pub batch_size: bytesize::ByteSize, | ||
|
||
/// The delay between writing batches to the db. The batch delay can be | ||
/// increased if resharding is consuming too many resources and interfering | ||
/// with regular node operation. | ||
pub batch_delay: Duration, | ||
|
||
/// The delay between attempts to start resharding while waiting for the | ||
/// state snapshot to become available. | ||
pub retry_delay: Duration, | ||
} | ||
|
||
impl Default for StateSplitConfig { | ||
fn default() -> Self { | ||
// Conservative default for a slower resharding that puts as little | ||
// extra load on the node as possible. | ||
Self { batch_size: bytesize::ByteSize::kb(500), batch_delay: Duration::from_millis(100) } | ||
Self { | ||
batch_size: bytesize::ByteSize::kb(500), | ||
batch_delay: Duration::from_millis(100), | ||
retry_delay: Duration::from_secs(10), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 30 sec? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fine with any value you find appropriate. I don't think it's going to matter too much as long as it's in the domain of a couple of seconds. |
||
} | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
# A library with the common constants and functions for testing resharding. | ||
|
||
V1_PROTOCOL_VERSION = 48 | ||
V2_PROTOCOL_VERSION = 135 | ||
|
||
V0_SHARD_LAYOUT = { | ||
"V0": { | ||
"num_shards": 1, | ||
"version": 0 | ||
}, | ||
} | ||
V1_SHARD_LAYOUT = { | ||
"V1": { | ||
"boundary_accounts": [ | ||
"aurora", "aurora-0", "kkuuue2akv_1630967379.near" | ||
], | ||
"shards_split_map": [[0, 1, 2, 3]], | ||
"to_parent_shard_map": [0, 0, 0, 0], | ||
"version": 1 | ||
} | ||
} | ||
|
||
|
||
# Append the genesis config changes that are required for testing resharding. | ||
# This method will set the protocol version, shard layout and a few other | ||
# configs so that it matches the protocol configuration as of right before the | ||
# protocol version of the binary under test. | ||
def append_shard_layout_config_changes( | ||
genesis_config_changes, | ||
binary_protocol_version, | ||
logger=None, | ||
): | ||
genesis_config_changes.append(["use_production_config", True]) | ||
|
||
if binary_protocol_version >= V2_PROTOCOL_VERSION: | ||
if logger: | ||
logger.info("Testing migration from V1 to V2.") | ||
# Set the initial protocol version to a version just before V2. | ||
genesis_config_changes.append([ | ||
"protocol_version", | ||
V2_PROTOCOL_VERSION - 1, | ||
]) | ||
genesis_config_changes.append([ | ||
"shard_layout", | ||
V1_SHARD_LAYOUT, | ||
]) | ||
genesis_config_changes.append([ | ||
"num_block_producer_seats_per_shard", | ||
[1, 1, 1, 1], | ||
]) | ||
genesis_config_changes.append([ | ||
"avg_hidden_validator_seats_per_shard", | ||
[0, 0, 0, 0], | ||
]) | ||
return | ||
|
||
if binary_protocol_version >= V1_PROTOCOL_VERSION: | ||
if logger: | ||
logger.info("Testing migration from V0 to V1.") | ||
# Set the initial protocol version to a version just before V1. | ||
genesis_config_changes.append([ | ||
"protocol_version", | ||
V1_PROTOCOL_VERSION - 1, | ||
]) | ||
genesis_config_changes.append([ | ||
"shard_layout", | ||
V0_SHARD_LAYOUT, | ||
]) | ||
genesis_config_changes.append([ | ||
"num_block_producer_seats_per_shard", | ||
[100], | ||
]) | ||
genesis_config_changes.append([ | ||
"avg_hidden_validator_seats_per_shard", | ||
[0], | ||
]) | ||
return | ||
|
||
assert False | ||
|
||
|
||
def get_genesis_shard_layout_version(binary_protocol_version): | ||
if binary_protocol_version >= V2_PROTOCOL_VERSION: | ||
return 1 | ||
if binary_protocol_version >= V1_PROTOCOL_VERSION: | ||
return 0 | ||
|
||
assert False | ||
|
||
|
||
def get_target_shard_layout_version(binary_protocol_version): | ||
if binary_protocol_version >= V2_PROTOCOL_VERSION: | ||
return 2 | ||
if binary_protocol_version >= V1_PROTOCOL_VERSION: | ||
return 1 | ||
|
||
assert False | ||
|
||
|
||
def get_genesis_num_shards(binary_protocol_version): | ||
if binary_protocol_version >= V2_PROTOCOL_VERSION: | ||
return 4 | ||
if binary_protocol_version >= V1_PROTOCOL_VERSION: | ||
return 1 | ||
|
||
assert False | ||
|
||
|
||
def get_target_num_shards(binary_protocol_version): | ||
if binary_protocol_version >= V2_PROTOCOL_VERSION: | ||
return 5 | ||
if binary_protocol_version >= V1_PROTOCOL_VERSION: | ||
return 4 | ||
|
||
assert False |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
#!/usr/bin/env python3 | ||
|
||
# Small test for resharding. Spins up a few nodes from genesis with the previous | ||
# shard layout, waits for a few epochs and verifies that the shard layout is | ||
# upgraded. | ||
# Usage: | ||
# python3 pytest/tests/sanity/resharding.py | ||
|
||
import unittest | ||
import sys | ||
import pathlib | ||
|
||
sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) | ||
|
||
from configured_logger import logger | ||
from cluster import get_binary_protocol_version, init_cluster, load_config, spin_up_node | ||
from utils import MetricsTracker, poll_blocks | ||
from resharding_lib import append_shard_layout_config_changes, get_genesis_num_shards, get_genesis_shard_layout_version, get_target_num_shards, get_target_shard_layout_version | ||
|
||
|
||
class ReshardingTest(unittest.TestCase): | ||
|
||
def setUp(self) -> None: | ||
self.epoch_length = 5 | ||
self.config = load_config() | ||
self.binary_protocol_version = get_binary_protocol_version(self.config) | ||
assert self.binary_protocol_version is not None | ||
|
||
self.genesis_shard_layout_version = get_genesis_shard_layout_version( | ||
self.binary_protocol_version) | ||
self.target_shard_layout_version = get_target_shard_layout_version( | ||
self.binary_protocol_version) | ||
|
||
self.genesis_num_shards = get_genesis_num_shards( | ||
self.binary_protocol_version) | ||
self.target_num_shards = get_target_num_shards( | ||
self.binary_protocol_version) | ||
|
||
def __get_genesis_config_changes(self): | ||
genesis_config_changes = [ | ||
["epoch_length", self.epoch_length], | ||
] | ||
|
||
append_shard_layout_config_changes( | ||
genesis_config_changes, | ||
self.binary_protocol_version, | ||
logger, | ||
) | ||
|
||
return genesis_config_changes | ||
|
||
def __get_client_config_changes(self, num_nodes): | ||
single = { | ||
"tracked_shards": [0], | ||
"state_split_config": { | ||
"batch_size": 1000000, | ||
# don't throttle resharding | ||
"batch_delay": { | ||
"secs": 0, | ||
"nanos": 0, | ||
}, | ||
# retry often to start resharding as fast as possible | ||
"retry_delay": { | ||
"secs": 0, | ||
"nanos": 500_000_000 | ||
} | ||
} | ||
} | ||
return {i: single for i in range(num_nodes)} | ||
|
||
def test_resharding(self): | ||
logger.info("The resharding test is starting.") | ||
num_nodes = 2 | ||
|
||
genesis_config_changes = self.__get_genesis_config_changes() | ||
client_config_changes = self.__get_client_config_changes(num_nodes) | ||
|
||
near_root, [node0_dir, node1_dir] = init_cluster( | ||
num_nodes=num_nodes, | ||
num_observers=0, | ||
num_shards=1, | ||
config=self.config, | ||
genesis_config_changes=genesis_config_changes, | ||
client_config_changes=client_config_changes, | ||
) | ||
|
||
node0 = spin_up_node( | ||
self.config, | ||
near_root, | ||
node0_dir, | ||
0, | ||
) | ||
node1 = spin_up_node( | ||
self.config, | ||
near_root, | ||
node1_dir, | ||
1, | ||
boot_node=node0, | ||
) | ||
|
||
metrics_tracker = MetricsTracker(node0) | ||
|
||
for height, _ in poll_blocks(node0): | ||
version = metrics_tracker.get_int_metric_value( | ||
"near_shard_layout_version") | ||
num_shards = metrics_tracker.get_int_metric_value( | ||
"near_shard_layout_num_shards") | ||
|
||
logger.info( | ||
f"#{height} shard layout version: {version}, num shards: {num_shards} " | ||
) | ||
|
||
# This may be flaky - it shouldn't - but it may. We collect metrics | ||
# after the block is processed. If there is some delay the shard | ||
# layout may change and the assertions below will fail. | ||
|
||
if height <= 2 * self.epoch_length + 1: | ||
self.assertEqual(version, self.genesis_shard_layout_version) | ||
self.assertEqual(num_shards, self.genesis_num_shards) | ||
else: | ||
self.assertEqual(version, self.target_shard_layout_version) | ||
self.assertEqual(num_shards, self.target_num_shards) | ||
|
||
if height >= 4 * self.epoch_length: | ||
break | ||
|
||
node0.kill() | ||
node1.kill() | ||
|
||
logger.info("The resharding test is finished.") | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checked that accidentaly. It's fine.