From 07c89fea38cbeff0d3f2e28129e4231143eee350 Mon Sep 17 00:00:00 2001 From: wacban Date: Mon, 13 Nov 2023 17:10:26 +0000 Subject: [PATCH 1/2] feat(resharding): simple nayduck test for resharding --- chain/chain/src/chain.rs | 2 + chain/client/src/client.rs | 6 -- chain/client/src/sync_jobs_actor.rs | 8 +- core/chain-configs/src/client_config.rs | 11 +- nightly/pytest-sanity.txt | 4 + pytest/lib/resharding_lib.py | 115 +++++++++++++++++++++ pytest/tests/sanity/resharding.py | 132 ++++++++++++++++++++++++ pytest/tests/sanity/state_sync_fail.py | 71 +------------ 8 files changed, 270 insertions(+), 79 deletions(-) create mode 100644 pytest/lib/resharding_lib.py create mode 100644 pytest/tests/sanity/resharding.py diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index ba5f08a7355..9719604bd05 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -2136,6 +2136,8 @@ impl Chain { "start_process_block_impl", height = block_height) .entered(); + + tracing::debug!(target: "chain", "start process block"); // 0) Before we proceed with any further processing, we first check that the block // hash and signature matches to make sure the block is indeed produced by the assigned // block producer. If not, we drop the block immediately diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 8bd24176f6a..ea793cbeff1 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -2386,10 +2386,6 @@ impl Client { let state_sync_timeout = self.config.state_sync_timeout; let epoch_id = self.chain.get_block(&sync_hash)?.header().epoch_id().clone(); - // TODO(resharding) what happens to the shards_to_split here when - // catchup_state_syncs already contains an entry for the sync hash? - // Does it get overwritten? Are we guaranteed that the existing - // entry contains the same data? let (state_sync, shards_to_split, blocks_catch_up_state) = self.catchup_state_syncs.entry(sync_hash).or_insert_with(|| { notify_state_sync = true; @@ -2516,8 +2512,6 @@ impl Client { .iter() .filter_map(|ShardInfo(shard_id, _)| self.should_split_shard(shard_id, me, prev_hash)) .collect(); - // For colour decorators to work, they need to printed directly. Otherwise the decorators get escaped, garble output and don't add colours. - debug!(target: "catchup", progress_per_shard = ?format_shard_sync_phase_per_shard(&shards_to_split, false), "Need to split states for shards"); Ok(shards_to_split) } diff --git a/chain/client/src/sync_jobs_actor.rs b/chain/client/src/sync_jobs_actor.rs index 55b1e584382..2b36b3f1231 100644 --- a/chain/client/src/sync_jobs_actor.rs +++ b/chain/client/src/sync_jobs_actor.rs @@ -13,9 +13,6 @@ use near_primitives::state_part::PartId; use near_primitives::state_sync::StatePartKey; use near_primitives::types::ShardId; use near_store::DBCol; -use std::time::Duration; - -const RESHARDING_RETRY_TIME: Duration = Duration::from_secs(30); pub(crate) struct SyncJobsActor { pub(crate) client_addr: actix::Addr, @@ -163,8 +160,9 @@ impl actix::Handler> for SyncJobsActor { // Actix implementation let's us send message to ourselves with a delay. // In case snapshots are not ready yet, we will retry resharding later. 
tracing::debug!(target: "client", ?state_split_request, "Snapshot missing, retrying resharding later"); - state_split_request.curr_poll_time += RESHARDING_RETRY_TIME; - context.notify_later(state_split_request.with_span_context(), RESHARDING_RETRY_TIME); + let retry_delay = state_split_request.config.retry_delay; + state_split_request.curr_poll_time += retry_delay; + context.notify_later(state_split_request.with_span_context(), retry_delay); } else { tracing::debug!(target: "client", ?state_split_request, "Starting resharding"); let response = Chain::build_state_for_split_shards(state_split_request); diff --git a/core/chain-configs/src/client_config.rs b/core/chain-configs/src/client_config.rs index 28299ab54af..8b7a639685e 100644 --- a/core/chain-configs/src/client_config.rs +++ b/core/chain-configs/src/client_config.rs @@ -168,17 +168,26 @@ pub struct StateSplitConfig { /// decreased if resharding is consuming too many resources and interfering /// with regular node operation. pub batch_size: bytesize::ByteSize, + /// The delay between writing batches to the db. The batch delay can be /// increased if resharding is consuming too many resources and interfering /// with regular node operation. pub batch_delay: Duration, + + /// The delay between attempts to start resharding while waiting for the + /// state snapshot to become available. + pub retry_delay: Duration, } impl Default for StateSplitConfig { fn default() -> Self { // Conservative default for a slower resharding that puts as little // extra load on the node as possible. - Self { batch_size: bytesize::ByteSize::kb(500), batch_delay: Duration::from_millis(100) } + Self { + batch_size: bytesize::ByteSize::kb(500), + batch_delay: Duration::from_millis(100), + retry_delay: Duration::from_secs(10), + } } } diff --git a/nightly/pytest-sanity.txt b/nightly/pytest-sanity.txt index c26e00b633c..f23f8204bb1 100644 --- a/nightly/pytest-sanity.txt +++ b/nightly/pytest-sanity.txt @@ -146,3 +146,7 @@ pytest sanity/meta_tx.py --features nightly # Tests for split storage and split storage migration pytest --timeout=600 sanity/split_storage.py pytest --timeout=600 sanity/split_storage.py --features nightly + +# Test for resharding +pytest --timeout=120 sanity/resharding.py +pytest --timeout=120 sanity/resharding.py --features nightly diff --git a/pytest/lib/resharding_lib.py b/pytest/lib/resharding_lib.py new file mode 100644 index 00000000000..ff2dc471370 --- /dev/null +++ b/pytest/lib/resharding_lib.py @@ -0,0 +1,115 @@ +# A library with the common constants and functions for testing resharding. + +V1_PROTOCOL_VERSION = 48 +V2_PROTOCOL_VERSION = 135 + +V0_SHARD_LAYOUT = { + "V0": { + "num_shards": 1, + "version": 0 + }, +} +V1_SHARD_LAYOUT = { + "V1": { + "boundary_accounts": [ + "aurora", "aurora-0", "kkuuue2akv_1630967379.near" + ], + "shards_split_map": [[0, 1, 2, 3]], + "to_parent_shard_map": [0, 0, 0, 0], + "version": 1 + } +} + + +# Append the genesis config changes that are required for testing resharding. +# This method will set the protocol version, shard layout and a few other +# configs so that it matches the protocol configuration as of right before the +# protocol version of the binary under test. 
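+#
+# A typical call, mirroring how the tests below use it (illustrative only):
+#
+#   genesis_config_changes = [["epoch_length", 5]]
+#   append_shard_layout_config_changes(
+#       genesis_config_changes, binary_protocol_version, logger)
+#   init_cluster(..., genesis_config_changes=genesis_config_changes, ...)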
+def append_shard_layout_config_changes( + genesis_config_changes, + binary_protocol_version, + logger=None, +): + genesis_config_changes.append(["use_production_config", True]) + + if binary_protocol_version >= V2_PROTOCOL_VERSION: + if logger: + logger.info("Testing migration from V1 to V2.") + # Set the initial protocol version to a version just before V2. + genesis_config_changes.append([ + "protocol_version", + V2_PROTOCOL_VERSION - 1, + ]) + genesis_config_changes.append([ + "shard_layout", + V1_SHARD_LAYOUT, + ]) + genesis_config_changes.append([ + "num_block_producer_seats_per_shard", + [1, 1, 1, 1], + ]) + genesis_config_changes.append([ + "avg_hidden_validator_seats_per_shard", + [0, 0, 0, 0], + ]) + return + + if binary_protocol_version >= V1_PROTOCOL_VERSION: + if logger: + logger.info("Testing migration from V0 to V1.") + # Set the initial protocol version to a version just before V1. + genesis_config_changes.append([ + "protocol_version", + V1_PROTOCOL_VERSION - 1, + ]) + genesis_config_changes.append([ + "shard_layout", + V0_SHARD_LAYOUT, + ]) + genesis_config_changes.append([ + "num_block_producer_seats_per_shard", + [100], + ]) + genesis_config_changes.append([ + "avg_hidden_validator_seats_per_shard", + [0], + ]) + return + + assert False + + +def get_genesis_shard_layout_version(binary_protocol_version): + if binary_protocol_version >= V2_PROTOCOL_VERSION: + return 1 + if binary_protocol_version >= V1_PROTOCOL_VERSION: + return 0 + + assert False + + +def get_target_shard_layout_version(binary_protocol_version): + if binary_protocol_version >= V2_PROTOCOL_VERSION: + return 2 + if binary_protocol_version >= V1_PROTOCOL_VERSION: + return 1 + + assert False + + +def get_genesis_num_shards(binary_protocol_version): + if binary_protocol_version >= V2_PROTOCOL_VERSION: + return 4 + if binary_protocol_version >= V1_PROTOCOL_VERSION: + return 1 + + assert False + + +def get_target_num_shards(binary_protocol_version): + if binary_protocol_version >= V2_PROTOCOL_VERSION: + return 5 + if binary_protocol_version >= V1_PROTOCOL_VERSION: + return 4 + + assert False diff --git a/pytest/tests/sanity/resharding.py b/pytest/tests/sanity/resharding.py new file mode 100644 index 00000000000..e9516a83ba8 --- /dev/null +++ b/pytest/tests/sanity/resharding.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python3 Small test for resharding. Spins up a few nodes from +# genesis with the previous shard layout, waits for a few epochs and verifies +# that the shard layout is upgraded. 
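+#
+# Depending on the protocol version of the binary under test this exercises
+# either the V0 -> V1 or the V1 -> V2 shard layout migration defined in
+# resharding_lib.py.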
+# Usage: +# python3 pytest/tests/sanity/resharding.py + +import unittest +import sys +import pathlib + +sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) + +from configured_logger import logger +from cluster import get_binary_protocol_version, init_cluster, load_config, spin_up_node +from utils import MetricsTracker, poll_blocks, wait_for_blocks +from resharding_lib import append_shard_layout_config_changes, get_genesis_num_shards, get_genesis_shard_layout_version, get_target_num_shards, get_target_shard_layout_version + + +class ReshardingTest(unittest.TestCase): + + def setUp(self) -> None: + self.epoch_length = 5 + self.config = load_config() + self.binary_protocol_version = get_binary_protocol_version(self.config) + assert self.binary_protocol_version is not None + + self.genesis_shard_layout_version = get_genesis_shard_layout_version( + self.binary_protocol_version) + self.target_shard_layout_version = get_target_shard_layout_version( + self.binary_protocol_version) + + self.genesis_num_shards = get_genesis_num_shards( + self.binary_protocol_version) + self.target_num_shards = get_target_num_shards( + self.binary_protocol_version) + + def __get_genesis_config_changes(self): + genesis_config_changes = [ + ["epoch_length", self.epoch_length], + ] + + append_shard_layout_config_changes( + genesis_config_changes, + self.binary_protocol_version, + logger, + ) + + return genesis_config_changes + + def __get_client_config_changes(self, num_nodes): + single = { + "tracked_shards": [0], + "state_split_config": { + "batch_size": 1000000, + # don't throttle resharding + "batch_delay": { + "secs": 0, + "nanos": 0, + }, + # retry often to start resharding as fast as possible + "retry_delay": { + "secs": 0, + "nanos": 500_000_000 + } + } + } + return {i: single for i in range(num_nodes)} + + def test_resharding(self): + logger.info("The resharding test is starting.") + num_nodes = 2 + + genesis_config_changes = self.__get_genesis_config_changes() + client_config_changes = self.__get_client_config_changes(num_nodes) + + near_root, [node0_dir, node1_dir] = init_cluster( + num_nodes=num_nodes, + num_observers=0, + num_shards=1, + config=self.config, + genesis_config_changes=genesis_config_changes, + client_config_changes=client_config_changes, + ) + + node0 = spin_up_node( + self.config, + near_root, + node0_dir, + 0, + ) + node1 = spin_up_node( + self.config, + near_root, + node1_dir, + 1, + boot_node=node0, + ) + + metrics_tracker = MetricsTracker(node0) + + for height, _ in poll_blocks(node0): + version = metrics_tracker.get_int_metric_value( + "near_shard_layout_version") + num_shards = metrics_tracker.get_int_metric_value( + "near_shard_layout_num_shards") + + logger.info( + f"#{height} shard layout version: {version}, num shards: {num_shards} " + ) + + # This may be flaky - it shouldn't - but it may. We collect metrics + # after the block is processed. If there is some delay the shard + # layout may change and the assertions below will fail. 
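+            #
+            # With the epoch_length of 5 set in setUp this means heights up to
+            # 11 (2 * epoch_length + 1) are expected to report the genesis
+            # layout and any later height the target layout.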
+ + if height <= 2 * self.epoch_length + 1: + self.assertEqual(version, self.genesis_shard_layout_version) + self.assertEqual(num_shards, self.genesis_num_shards) + else: + self.assertEqual(version, self.target_shard_layout_version) + self.assertEqual(num_shards, self.target_num_shards) + + if height >= 4 * self.epoch_length: + break + + node0.kill() + node1.kill() + + logger.info("The resharding test is finished.") + + +if __name__ == '__main__': + unittest.main() diff --git a/pytest/tests/sanity/state_sync_fail.py b/pytest/tests/sanity/state_sync_fail.py index 9ffa718b2a4..96e3a862758 100755 --- a/pytest/tests/sanity/state_sync_fail.py +++ b/pytest/tests/sanity/state_sync_fail.py @@ -17,89 +17,26 @@ from cluster import init_cluster, spin_up_node, load_config, get_binary_protocol_version from configured_logger import logger import requests +import resharding_lib import state_sync_lib import utils EPOCH_LENGTH = 20 START_AT_BLOCK = int(EPOCH_LENGTH * 2.5) -V1_PROTOCOL_VERSION = 48 -V2_PROTOCOL_VERSION = 135 - -V0_SHARD_LAYOUT = {"V0": {"num_shards": 1, "version": 0}} -V1_SHARD_LAYOUT = { - "V1": { - "boundary_accounts": [ - "aurora", "aurora-0", "kkuuue2akv_1630967379.near" - ], - "shards_split_map": [[0, 1, 2, 3]], - "to_parent_shard_map": [0, 0, 0, 0], - "version": 1 - } -} - - -def append_shard_layout_config_changes( - binary_protocol_version, - genesis_config_changes, -): - if binary_protocol_version >= V2_PROTOCOL_VERSION: - logger.info("Testing migration from V1 to V2.") - # Set the initial protocol version to a version just before V2. - genesis_config_changes.append([ - "protocol_version", - V2_PROTOCOL_VERSION - 1, - ]) - genesis_config_changes.append([ - "shard_layout", - V1_SHARD_LAYOUT, - ]) - genesis_config_changes.append([ - "num_block_producer_seats_per_shard", - [1, 1, 1, 1], - ]) - genesis_config_changes.append([ - "avg_hidden_validator_seats_per_shard", - [0, 0, 0, 0], - ]) - return - - if binary_protocol_version >= V1_PROTOCOL_VERSION: - logger.info("Testing migration from V0 to V1.") - # Set the initial protocol version to a version just before V1. - genesis_config_changes.append([ - "protocol_version", - V1_PROTOCOL_VERSION - 1, - ]) - genesis_config_changes.append([ - "shard_layout", - V0_SHARD_LAYOUT, - ]) - genesis_config_changes.append([ - "num_block_producer_seats_per_shard", - [100], - ]) - genesis_config_changes.append([ - "avg_hidden_validator_seats_per_shard", - [0], - ]) - return - - assert False - def get_genesis_config_changes(binary_protocol_version): genesis_config_changes = [ ["min_gas_price", 0], ["max_inflation_rate", [0, 1]], ["epoch_length", EPOCH_LENGTH], - ["use_production_config", True], ["block_producer_kickout_threshold", 80], ] - append_shard_layout_config_changes( - binary_protocol_version, + resharding_lib.append_shard_layout_config_changes( genesis_config_changes, + binary_protocol_version, + logger, ) return genesis_config_changes From e1ea81d39db05290a3c9694e52780b9f4b73c0b3 Mon Sep 17 00:00:00 2001 From: wacban Date: Mon, 13 Nov 2023 17:19:01 +0000 Subject: [PATCH 2/2] self code review --- pytest/tests/sanity/resharding.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pytest/tests/sanity/resharding.py b/pytest/tests/sanity/resharding.py index e9516a83ba8..97af3b0765c 100644 --- a/pytest/tests/sanity/resharding.py +++ b/pytest/tests/sanity/resharding.py @@ -1,6 +1,8 @@ -#!/usr/bin/env python3 Small test for resharding. 
Spins up a few nodes from -# genesis with the previous shard layout, waits for a few epochs and verifies -# that the shard layout is upgraded. +#!/usr/bin/env python3 + +# Small test for resharding. Spins up a few nodes from genesis with the previous +# shard layout, waits for a few epochs and verifies that the shard layout is +# upgraded. # Usage: # python3 pytest/tests/sanity/resharding.py @@ -12,7 +14,7 @@ from configured_logger import logger from cluster import get_binary_protocol_version, init_cluster, load_config, spin_up_node -from utils import MetricsTracker, poll_blocks, wait_for_blocks +from utils import MetricsTracker, poll_blocks from resharding_lib import append_shard_layout_config_changes, get_genesis_num_shards, get_genesis_shard_layout_version, get_target_num_shards, get_target_shard_layout_version
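
Note: a minimal sketch of how the state_split_config introduced in
client_config.rs can be overridden per node in a pytest cluster, mirroring the
client_config_changes used in resharding.py above. Durations use the same
{secs, nanos} encoding and the node index and values are illustrative only:

    # Defaults in StateSplitConfig: batch_size 500kb, batch_delay 100ms,
    # retry_delay 10s.
    client_config_changes = {
        0: {
            "state_split_config": {
                # Larger batches speed resharding up but put more load on the
                # node.
                "batch_size": 1_000_000,
                # No delay between batches, i.e. don't throttle resharding.
                "batch_delay": {"secs": 0, "nanos": 0},
                # Poll for the state snapshot every half second so that
                # resharding starts as soon as the snapshot is available.
                "retry_delay": {"secs": 0, "nanos": 500_000_000},
            }
        }
    }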