feat(resharding): simple nayduck test for resharding #10162

Merged: 2 commits, Nov 14, 2023

2 changes: 2 additions & 0 deletions chain/chain/src/chain.rs
@@ -2136,6 +2136,8 @@ impl Chain {
"start_process_block_impl",
height = block_height)
.entered();

tracing::debug!(target: "chain", "start process block");
// 0) Before we proceed with any further processing, we first check that the block
// hash and signature match to make sure the block is indeed produced by the assigned
// block producer. If not, we drop the block immediately.
6 changes: 0 additions & 6 deletions chain/client/src/client.rs
@@ -2386,10 +2386,6 @@ impl Client {
let state_sync_timeout = self.config.state_sync_timeout;
let epoch_id = self.chain.get_block(&sync_hash)?.header().epoch_id().clone();

// TODO(resharding) what happens to the shards_to_split here when
// catchup_state_syncs already contains an entry for the sync hash?
// Does it get overwritten? Are we guaranteed that the existing
// entry contains the same data?
Comment on lines -2389 to -2392
Contributor Author

Checked that accidentally. It's fine.

let (state_sync, shards_to_split, blocks_catch_up_state) =
self.catchup_state_syncs.entry(sync_hash).or_insert_with(|| {
notify_state_sync = true;
@@ -2516,8 +2512,6 @@ impl Client {
.iter()
.filter_map(|ShardInfo(shard_id, _)| self.should_split_shard(shard_id, me, prev_hash))
.collect();
// For colour decorators to work, they need to be printed directly. Otherwise the decorators get escaped, garble the output and don't add colours.
debug!(target: "catchup", progress_per_shard = ?format_shard_sync_phase_per_shard(&shards_to_split, false), "Need to split states for shards");
Comment on lines -2519 to -2520
Contributor Author

This log line doesn't make much sense.

  • It is logged multiple times, including after resharding has started, but it doesn't actually reflect the current status of the resharding.
  • There is a different log line just a bit later that prints the current status anyway.

Ok(shards_to_split)
}

8 changes: 3 additions & 5 deletions chain/client/src/sync_jobs_actor.rs
@@ -13,9 +13,6 @@ use near_primitives::state_part::PartId;
use near_primitives::state_sync::StatePartKey;
use near_primitives::types::ShardId;
use near_store::DBCol;
use std::time::Duration;

const RESHARDING_RETRY_TIME: Duration = Duration::from_secs(30);

pub(crate) struct SyncJobsActor {
pub(crate) client_addr: actix::Addr<ClientActor>,
@@ -163,8 +160,9 @@ impl actix::Handler<WithSpanContext<StateSplitRequest>> for SyncJobsActor {
// The Actix implementation lets us send a message to ourselves with a delay.
// In case snapshots are not ready yet, we will retry resharding later.
tracing::debug!(target: "client", ?state_split_request, "Snapshot missing, retrying resharding later");
state_split_request.curr_poll_time += RESHARDING_RETRY_TIME;
context.notify_later(state_split_request.with_span_context(), RESHARDING_RETRY_TIME);
let retry_delay = state_split_request.config.retry_delay;
state_split_request.curr_poll_time += retry_delay;
context.notify_later(state_split_request.with_span_context(), retry_delay);
} else {
tracing::debug!(target: "client", ?state_split_request, "Starting resharding");
let response = Chain::build_state_for_split_shards(state_split_request);
11 changes: 10 additions & 1 deletion core/chain-configs/src/client_config.rs
@@ -168,17 +168,26 @@ pub struct StateSplitConfig {
/// decreased if resharding is consuming too many resources and interfering
/// with regular node operation.
pub batch_size: bytesize::ByteSize,

/// The delay between writing batches to the db. The batch delay can be
/// increased if resharding is consuming too many resources and interfering
/// with regular node operation.
pub batch_delay: Duration,

/// The delay between attempts to start resharding while waiting for the
/// state snapshot to become available.
pub retry_delay: Duration,
}

impl Default for StateSplitConfig {
fn default() -> Self {
// Conservative default for a slower resharding that puts as little
// extra load on the node as possible.
Self { batch_size: bytesize::ByteSize::kb(500), batch_delay: Duration::from_millis(100) }
Self {
batch_size: bytesize::ByteSize::kb(500),
batch_delay: Duration::from_millis(100),
retry_delay: Duration::from_secs(10),
Contributor

30 sec?

Contributor

Fine with any value you find appropriate. I don't think it's going to matter too much as long as it's in the domain of a couple of seconds.

}
}
}

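For illustration, here is a minimal sketch of how these knobs could be turned the other way, to throttle resharding on a busy node, using the same state_split_config override format that the new pytest test below passes through client_config_changes. The throttled_state_split_config name and the specific values are only examples, not recommended settings.

# Illustrative only: throttle resharding rather than speeding it up, using the
# same client config override format as the resharding test below.
throttled_state_split_config = {
    "state_split_config": {
        # Smaller batches mean less work per write to the db.
        "batch_size": 100_000,
        # A longer pause between batches leaves more room for regular block processing.
        "batch_delay": {
            "secs": 0,
            "nanos": 500_000_000,
        },
        # Poll less aggressively for the state snapshot to become available.
        "retry_delay": {
            "secs": 30,
            "nanos": 0,
        },
    }
}

# Applied to every node in a test cluster:
# client_config_changes = {i: throttled_state_split_config for i in range(num_nodes)}
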
4 changes: 4 additions & 0 deletions nightly/pytest-sanity.txt
@@ -146,3 +146,7 @@ pytest sanity/meta_tx.py --features nightly
# Tests for split storage and split storage migration
pytest --timeout=600 sanity/split_storage.py
pytest --timeout=600 sanity/split_storage.py --features nightly

# Test for resharding
pytest --timeout=120 sanity/resharding.py
pytest --timeout=120 sanity/resharding.py --features nightly
115 changes: 115 additions & 0 deletions pytest/lib/resharding_lib.py
@@ -0,0 +1,115 @@
# A library with the common constants and functions for testing resharding.

V1_PROTOCOL_VERSION = 48
V2_PROTOCOL_VERSION = 135

V0_SHARD_LAYOUT = {
    "V0": {
        "num_shards": 1,
        "version": 0
    },
}
V1_SHARD_LAYOUT = {
    "V1": {
        "boundary_accounts": [
            "aurora", "aurora-0", "kkuuue2akv_1630967379.near"
        ],
        "shards_split_map": [[0, 1, 2, 3]],
        "to_parent_shard_map": [0, 0, 0, 0],
        "version": 1
    }
}


# Append the genesis config changes that are required for testing resharding.
# This method will set the protocol version, shard layout and a few other
# configs so that it matches the protocol configuration as of right before the
# protocol version of the binary under test.
def append_shard_layout_config_changes(
    genesis_config_changes,
    binary_protocol_version,
    logger=None,
):
    genesis_config_changes.append(["use_production_config", True])

    if binary_protocol_version >= V2_PROTOCOL_VERSION:
        if logger:
            logger.info("Testing migration from V1 to V2.")
        # Set the initial protocol version to a version just before V2.
        genesis_config_changes.append([
            "protocol_version",
            V2_PROTOCOL_VERSION - 1,
        ])
        genesis_config_changes.append([
            "shard_layout",
            V1_SHARD_LAYOUT,
        ])
        genesis_config_changes.append([
            "num_block_producer_seats_per_shard",
            [1, 1, 1, 1],
        ])
        genesis_config_changes.append([
            "avg_hidden_validator_seats_per_shard",
            [0, 0, 0, 0],
        ])
        return

    if binary_protocol_version >= V1_PROTOCOL_VERSION:
        if logger:
            logger.info("Testing migration from V0 to V1.")
        # Set the initial protocol version to a version just before V1.
        genesis_config_changes.append([
            "protocol_version",
            V1_PROTOCOL_VERSION - 1,
        ])
        genesis_config_changes.append([
            "shard_layout",
            V0_SHARD_LAYOUT,
        ])
        genesis_config_changes.append([
            "num_block_producer_seats_per_shard",
            [100],
        ])
        genesis_config_changes.append([
            "avg_hidden_validator_seats_per_shard",
            [0],
        ])
        return

    assert False


def get_genesis_shard_layout_version(binary_protocol_version):
    if binary_protocol_version >= V2_PROTOCOL_VERSION:
        return 1
    if binary_protocol_version >= V1_PROTOCOL_VERSION:
        return 0

    assert False


def get_target_shard_layout_version(binary_protocol_version):
    if binary_protocol_version >= V2_PROTOCOL_VERSION:
        return 2
    if binary_protocol_version >= V1_PROTOCOL_VERSION:
        return 1

    assert False


def get_genesis_num_shards(binary_protocol_version):
    if binary_protocol_version >= V2_PROTOCOL_VERSION:
        return 4
    if binary_protocol_version >= V1_PROTOCOL_VERSION:
        return 1

    assert False


def get_target_num_shards(binary_protocol_version):
    if binary_protocol_version >= V2_PROTOCOL_VERSION:
        return 5
    if binary_protocol_version >= V1_PROTOCOL_VERSION:
        return 4

    assert False
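
To make the effect of append_shard_layout_config_changes concrete, this is roughly what the list ends up containing for a binary whose protocol version is at least V2_PROTOCOL_VERSION (135); the epoch_length entry is appended by the caller beforehand, as in the test below, and the value 5 is just the epoch length that test uses.

# Approximate contents for binary_protocol_version >= V2_PROTOCOL_VERSION:
genesis_config_changes = [
    ["epoch_length", 5],  # added by the caller before invoking the helper
    ["use_production_config", True],
    ["protocol_version", 134],  # V2_PROTOCOL_VERSION - 1
    ["shard_layout", V1_SHARD_LAYOUT],  # start from the four-shard V1 layout
    ["num_block_producer_seats_per_shard", [1, 1, 1, 1]],
    ["avg_hidden_validator_seats_per_shard", [0, 0, 0, 0]],
]
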
134 changes: 134 additions & 0 deletions pytest/tests/sanity/resharding.py
@@ -0,0 +1,134 @@
#!/usr/bin/env python3

# Small test for resharding. Spins up a few nodes from genesis with the previous
# shard layout, waits for a few epochs and verifies that the shard layout is
# upgraded.
# Usage:
# python3 pytest/tests/sanity/resharding.py

import unittest
import sys
import pathlib

sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib'))

from configured_logger import logger
from cluster import get_binary_protocol_version, init_cluster, load_config, spin_up_node
from utils import MetricsTracker, poll_blocks
from resharding_lib import append_shard_layout_config_changes, get_genesis_num_shards, get_genesis_shard_layout_version, get_target_num_shards, get_target_shard_layout_version


class ReshardingTest(unittest.TestCase):

    def setUp(self) -> None:
        self.epoch_length = 5
        self.config = load_config()
        self.binary_protocol_version = get_binary_protocol_version(self.config)
        assert self.binary_protocol_version is not None

        self.genesis_shard_layout_version = get_genesis_shard_layout_version(
            self.binary_protocol_version)
        self.target_shard_layout_version = get_target_shard_layout_version(
            self.binary_protocol_version)

        self.genesis_num_shards = get_genesis_num_shards(
            self.binary_protocol_version)
        self.target_num_shards = get_target_num_shards(
            self.binary_protocol_version)

    def __get_genesis_config_changes(self):
        genesis_config_changes = [
            ["epoch_length", self.epoch_length],
        ]

        append_shard_layout_config_changes(
            genesis_config_changes,
            self.binary_protocol_version,
            logger,
        )

        return genesis_config_changes

    def __get_client_config_changes(self, num_nodes):
        single = {
            "tracked_shards": [0],
            "state_split_config": {
                "batch_size": 1000000,
                # don't throttle resharding
                "batch_delay": {
                    "secs": 0,
                    "nanos": 0,
                },
                # retry often to start resharding as fast as possible
                "retry_delay": {
                    "secs": 0,
                    "nanos": 500_000_000
                }
            }
        }
        return {i: single for i in range(num_nodes)}

    def test_resharding(self):
        logger.info("The resharding test is starting.")
        num_nodes = 2

        genesis_config_changes = self.__get_genesis_config_changes()
        client_config_changes = self.__get_client_config_changes(num_nodes)

        near_root, [node0_dir, node1_dir] = init_cluster(
            num_nodes=num_nodes,
            num_observers=0,
            num_shards=1,
            config=self.config,
            genesis_config_changes=genesis_config_changes,
            client_config_changes=client_config_changes,
        )

        node0 = spin_up_node(
            self.config,
            near_root,
            node0_dir,
            0,
        )
        node1 = spin_up_node(
            self.config,
            near_root,
            node1_dir,
            1,
            boot_node=node0,
        )

        metrics_tracker = MetricsTracker(node0)

        for height, _ in poll_blocks(node0):
            version = metrics_tracker.get_int_metric_value(
                "near_shard_layout_version")
            num_shards = metrics_tracker.get_int_metric_value(
                "near_shard_layout_num_shards")

            logger.info(
                f"#{height} shard layout version: {version}, num shards: {num_shards} "
            )

            # This may be flaky - it shouldn't - but it may. We collect metrics
            # after the block is processed. If there is some delay the shard
            # layout may change and the assertions below will fail.

            if height <= 2 * self.epoch_length + 1:
                self.assertEqual(version, self.genesis_shard_layout_version)
                self.assertEqual(num_shards, self.genesis_num_shards)
            else:
                self.assertEqual(version, self.target_shard_layout_version)
                self.assertEqual(num_shards, self.target_num_shards)

            if height >= 4 * self.epoch_length:
                break

        node0.kill()
        node1.kill()

        logger.info("The resharding test is finished.")


if __name__ == '__main__':
    unittest.main()