Skip to content

Commit

Permalink
feat(resharding): dynamic config (#10290)
Browse files Browse the repository at this point in the history
make the resharding config dynamic
  • Loading branch information
wacban authored Dec 4, 2023
1 parent bc3590c commit 71d0ea8
Show file tree
Hide file tree
Showing 12 changed files with 66 additions and 25 deletions.
9 changes: 6 additions & 3 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use chrono::Duration;
use crossbeam_channel::{unbounded, Receiver, Sender};
use itertools::Itertools;
use lru::LruCache;
use near_chain_configs::StateSplitConfig;
use near_chain_configs::{MutableConfigValue, StateSplitConfig};
#[cfg(feature = "new_epoch_sync")]
use near_chain_primitives::error::epoch_sync::EpochSyncInfoError;
use near_chain_primitives::error::{BlockKnownError, Error, LogTransientStorageError};
Expand Down Expand Up @@ -502,7 +502,7 @@ pub struct Chain {
/// A callback to initiate state snapshot.
snapshot_callbacks: Option<SnapshotCallbacks>,

pub(crate) state_split_config: near_chain_configs::StateSplitConfig,
pub(crate) state_split_config: MutableConfigValue<near_chain_configs::StateSplitConfig>,
}

impl Drop for Chain {
Expand Down Expand Up @@ -595,7 +595,10 @@ impl Chain {
pending_state_patch: Default::default(),
requested_state_parts: StateRequestTracker::new(),
snapshot_callbacks: None,
state_split_config: StateSplitConfig::default(),
state_split_config: MutableConfigValue::new(
StateSplitConfig::default(),
"state_split_config",
),
})
}

Expand Down
8 changes: 5 additions & 3 deletions chain/chain/src/resharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::metrics::{
};
use crate::Chain;
use itertools::Itertools;
use near_chain_configs::StateSplitConfig;
use near_chain_configs::{MutableConfigValue, StateSplitConfig};
use near_chain_primitives::error::Error;
use near_primitives::errors::StorageError::StorageInconsistentState;
use near_primitives::hash::CryptoHash;
Expand Down Expand Up @@ -52,7 +52,7 @@ pub struct StateSplitRequest {
// Time we've spent polling for the state snapshot to be ready. We autofail after a certain time.
pub curr_poll_time: Duration,
// Configuration for resharding. Can be used to throttle resharding if needed.
pub config: StateSplitConfig,
pub config: MutableConfigValue<StateSplitConfig>,
}

// Skip `runtime_adapter`, because it's a complex object that has complex logic
Expand Down Expand Up @@ -221,7 +221,7 @@ impl Chain {
state_root,
next_epoch_shard_layout,
curr_poll_time: Duration::ZERO,
config: self.state_split_config,
config: self.state_split_config.clone(),
});

RESHARDING_STATUS
Expand All @@ -235,6 +235,7 @@ impl Chain {
pub fn retry_build_state_for_split_shards(state_split_request: &StateSplitRequest) -> bool {
let StateSplitRequest { tries, prev_prev_hash, curr_poll_time, config, .. } =
state_split_request;
let config = config.get();

// Do not retry if we have spent more than max_poll_time
// The error would be caught in build_state_for_split_shards and propagated to client actor
Expand Down Expand Up @@ -290,6 +291,7 @@ impl Chain {
config,
..
} = state_split_request;
let config = config.get();

tracing::debug!(target: "resharding", ?config, ?shard_uid, "build_state_for_split_shards_impl starting");

Expand Down
8 changes: 6 additions & 2 deletions chain/chain/src/tests/simple_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ fn build_chain() {

mock_clock_guard.add_utc(timestamp(0, 0, 3, 444));
mock_clock_guard.add_utc(timestamp(0, 0, 0, 0)); // Client startup timestamp.
mock_clock_guard.add_utc(timestamp(0, 0, 0, 0)); // Client startup timestamp.
mock_clock_guard.add_instant(Instant::now());

// this step may fail when adding a new dynamic config
// the dynamic config uses the static clock to update metrics
// for every new field one extra utc timestamp should be added
let (mut chain, _, _, signer) = setup();

assert_eq!(mock_clock_guard.utc_call_count(), 2);
assert_eq!(mock_clock_guard.utc_call_count(), 3);
assert_eq!(mock_clock_guard.instant_call_count(), 1);
assert_eq!(chain.head().unwrap().height, 0);

Expand Down Expand Up @@ -72,7 +76,7 @@ fn build_chain() {
assert_eq!(chain.head().unwrap().height, i as u64);
}

assert_eq!(mock_clock_guard.utc_call_count(), 10);
assert_eq!(mock_clock_guard.utc_call_count(), 11);
assert_eq!(mock_clock_guard.instant_call_count(), 17);
assert_eq!(chain.head().unwrap().height, 4);

Expand Down
9 changes: 7 additions & 2 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::HashMap;
use borsh::{BorshDeserialize, BorshSerialize};
use chrono::DateTime;
use chrono::Utc;
use near_chain_configs::MutableConfigValue;
use near_chain_configs::StateSplitConfig;
use near_primitives::sandbox::state_patch::SandboxStatePatch;
use near_store::flat::FlatStorageManager;
Expand Down Expand Up @@ -211,15 +212,19 @@ pub struct ChainConfig {
/// Number of threads to execute background migration work.
/// Currently used for flat storage background creation.
pub background_migration_threads: usize,
pub state_split_config: StateSplitConfig,
/// The resharding configuration.
pub state_split_config: MutableConfigValue<StateSplitConfig>,
}

impl ChainConfig {
pub fn test() -> Self {
Self {
save_trie_changes: true,
background_migration_threads: 1,
state_split_config: StateSplitConfig::default(),
state_split_config: MutableConfigValue::new(
StateSplitConfig::default(),
"state_split_config",
),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl Client {
let chain_config = ChainConfig {
save_trie_changes: config.save_trie_changes,
background_migration_threads: config.client_background_migration_threads,
state_split_config: config.state_split_config,
state_split_config: config.state_split_config.clone(),
};
let chain = Chain::new(
epoch_manager.clone(),
Expand Down
5 changes: 3 additions & 2 deletions chain/client/src/sync_jobs_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,10 @@ impl actix::Handler<WithSpanContext<StateSplitRequest>> for SyncJobsActor {
context: &mut Self::Context,
) -> Self::Result {
let (_span, mut state_split_request) = handler_debug_span!(target: "resharding", msg);
let config = state_split_request.config.get();

// Wait for the initial delay. It should only be used in tests.
let initial_delay = state_split_request.config.initial_delay;
let initial_delay = config.initial_delay;
if state_split_request.curr_poll_time == Duration::ZERO && initial_delay > Duration::ZERO {
tracing::debug!(target: "resharding", ?state_split_request, ?initial_delay, "Waiting for the initial delay");
state_split_request.curr_poll_time += initial_delay;
Expand All @@ -170,7 +171,7 @@ impl actix::Handler<WithSpanContext<StateSplitRequest>> for SyncJobsActor {
if Chain::retry_build_state_for_split_shards(&state_split_request) {
// Actix implementation let's us send message to ourselves with a delay.
// In case snapshots are not ready yet, we will retry resharding later.
let retry_delay = state_split_request.config.retry_delay;
let retry_delay = config.retry_delay;
tracing::debug!(target: "resharding", ?state_split_request, ?retry_delay, "Snapshot missing, retrying resharding later");
state_split_request.curr_poll_time += retry_delay;
context.notify_later(state_split_request.with_span_context(), retry_delay);
Expand Down
17 changes: 13 additions & 4 deletions chain/client/src/test_utils/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use near_chain::state_snapshot_actor::SnapshotCallbacks;
use near_chain::test_utils::{KeyValueRuntime, MockEpochManager, ValidatorSchedule};
use near_chain::types::{ChainConfig, RuntimeAdapter};
use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode};
use near_chain_configs::{ClientConfig, StateSplitConfig};
use near_chain_configs::{ClientConfig, MutableConfigValue, StateSplitConfig};
use near_chunks::adapter::ShardsManagerRequestFromClient;
use near_chunks::client::ShardsManagerResponse;
use near_chunks::shards_manager_actor::start_shards_manager;
Expand Down Expand Up @@ -114,7 +114,10 @@ pub fn setup(
ChainConfig {
save_trie_changes: true,
background_migration_threads: 1,
state_split_config: StateSplitConfig::default(),
state_split_config: MutableConfigValue::new(
StateSplitConfig::default(),
"state_split_config",
),
},
None,
)
Expand Down Expand Up @@ -239,7 +242,10 @@ pub fn setup_only_view(
ChainConfig {
save_trie_changes: true,
background_migration_threads: 1,
state_split_config: StateSplitConfig::default(),
state_split_config: MutableConfigValue::new(
StateSplitConfig::default(),
"state_split_config",
),
},
None,
)
Expand Down Expand Up @@ -1011,7 +1017,10 @@ pub fn setup_synchronous_shards_manager(
ChainConfig {
save_trie_changes: true,
background_migration_threads: 1,
state_split_config: StateSplitConfig::default(),
state_split_config: MutableConfigValue::new(
StateSplitConfig::default(),
"state_split_config",
),
}, // irrelevant
None,
)
Expand Down
9 changes: 6 additions & 3 deletions core/chain-configs/src/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl SyncConfig {
}
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug)]
#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug, PartialEq)]
#[serde(default)]
pub struct StateSplitConfig {
/// The soft limit on the size of a single batch. The batch size can be
Expand Down Expand Up @@ -301,7 +301,7 @@ pub struct ClientConfig {
// Allows more detailed logging, for example a list of orphaned blocks.
pub enable_multiline_logging: bool,
// Configuration for resharding.
pub state_split_config: StateSplitConfig,
pub state_split_config: MutableConfigValue<StateSplitConfig>,
/// If the node is not a chunk producer within that many blocks, then route
/// to upcoming chunk producers.
pub tx_routing_height_horizon: BlockHeightDelta,
Expand Down Expand Up @@ -379,7 +379,10 @@ impl ClientConfig {
state_sync: StateSyncConfig::default(),
transaction_pool_size_limit: None,
enable_multiline_logging: false,
state_split_config: StateSplitConfig::default(),
state_split_config: MutableConfigValue::new(
StateSplitConfig::default(),
"state_split_config",
),
tx_routing_height_horizon: 4,
}
}
Expand Down
7 changes: 6 additions & 1 deletion core/chain-configs/src/updateable_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ use serde::{Deserialize, Serialize, Serializer};
use std::fmt::Debug;
use std::sync::{Arc, Mutex};

use crate::StateSplitConfig;

/// A wrapper for a config value that can be updated while the node is running.
/// When initializing sub-objects (e.g. `ShardsManager`), please make sure to
/// pass this wrapper instead of passing a value from a single moment in time.
/// See `expected_shutdown` for an example how to use it.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct MutableConfigValue<T> {
value: Arc<Mutex<T>>,
// For metrics.
Expand Down Expand Up @@ -88,4 +90,7 @@ impl<T: Copy + PartialEq + Debug> MutableConfigValue<T> {
pub struct UpdateableClientConfig {
/// Graceful shutdown at expected block height.
pub expected_shutdown: Option<BlockHeight>,

// Configuration for resharding.
pub state_split_config: StateSplitConfig,
}
5 changes: 4 additions & 1 deletion nearcore/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,10 @@ impl NearConfig {
state_sync: config.state_sync.unwrap_or_default(),
transaction_pool_size_limit: config.transaction_pool_size_limit,
enable_multiline_logging: config.enable_multiline_logging.unwrap_or(true),
state_split_config: config.state_split_config,
state_split_config: MutableConfigValue::new(
config.state_split_config,
"state_split_config",
),
tx_routing_height_horizon: config.tx_routing_height_horizon,
},
network_config: NetworkConfig::new(
Expand Down
5 changes: 4 additions & 1 deletion nearcore/src/dyn_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ pub fn read_updateable_configs(
pub fn get_updateable_client_config(config: Config) -> UpdateableClientConfig {
// All fields that can be updated while the node is running should be explicitly set here.
// Keep this list in-sync with `core/dyn-configs/README.md`.
UpdateableClientConfig { expected_shutdown: config.expected_shutdown }
UpdateableClientConfig {
expected_shutdown: config.expected_shutdown,
state_split_config: config.state_split_config,
}
}

fn read_log_config(home_dir: &Path) -> Result<Option<LogConfig>, UpdateableConfigLoaderError> {
Expand Down
7 changes: 5 additions & 2 deletions tools/speedy_sync/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use borsh::{BorshDeserialize, BorshSerialize};
use near_chain::types::{ChainConfig, Tip};
use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode};
use near_chain_configs::{GenesisValidationMode, StateSplitConfig};
use near_chain_configs::{GenesisValidationMode, MutableConfigValue, StateSplitConfig};
use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig};
use near_epoch_manager::types::EpochInfoAggregator;
use near_epoch_manager::EpochManager;
Expand Down Expand Up @@ -243,7 +243,10 @@ fn load_snapshot(load_cmd: LoadCmd) {
ChainConfig {
save_trie_changes: config.client_config.save_trie_changes,
background_migration_threads: 1,
state_split_config: StateSplitConfig::default(),
state_split_config: MutableConfigValue::new(
StateSplitConfig::default(),
"state_split_config",
),
},
None,
)
Expand Down

0 comments on commit 71d0ea8

Please sign in to comment.