fixed broadcasting AccountData after node restart. (near#8407)
AccountData is broadcast by a validator node periodically (every 15 min).
The AccountData with the greatest `version` field is considered the freshest.
We do not store the latest broadcast version in storage, so each time the node is restarted it starts with version = 1, unless it learns the latest broadcast version (from the previous execution) from the network.
This means that the correct AccountData may be broadcast only 15 min after the restart.

This in turn means that the validator might not be reachable (messages cannot be routed to it) for 15 min, which is unacceptable. We fix that by making the validator node broadcast a new version of its AccountData as soon as it learns about stale AccountData (but with a higher version) from the network.

Alternative solutions considered are described in accounts_data/mod.rs (see the documentation of the is_new() function).
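The restart problem described above can be sketched in a few lines. This is a minimal standalone Rust sketch under an assumed simplification (peers keep only the highest version per account and drop anything not strictly newer); `accepts` is a hypothetical helper, not part of the real codebase:

```rust
// Hypothetical simplification of how peers treat incoming AccountData:
// only a strictly higher version than the one already stored is accepted.
fn accepts(stored_version: u64, incoming_version: u64) -> bool {
    incoming_version > stored_version
}

fn main() {
    let network_stored = 42; // version broadcast before the restart
    // After a restart the node starts from version 1 again, so its fresh
    // AccountData is rejected by every peer that remembers version 42...
    assert!(!accepts(network_stored, 1));
    // ...until it learns version 42 from the network and re-signs with 43,
    // which is exactly what this commit makes happen immediately.
    assert!(accepts(network_stored, 43));
}
```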
pompon0 authored and marcelo-gonzalez committed Jan 26, 2023
1 parent c9909ff commit 220bec3
Showing 16 changed files with 524 additions and 151 deletions.
134 changes: 129 additions & 5 deletions chain/network/src/accounts_data/mod.rs
@@ -27,9 +27,11 @@
use crate::concurrency;
use crate::concurrency::arc_mutex::ArcMutex;
use crate::network_protocol;
use crate::network_protocol::SignedAccountData;
use crate::network_protocol::{AccountData, SignedAccountData, VersionedAccountData};
use crate::time;
use crate::types::AccountKeys;
use near_crypto::PublicKey;
use near_primitives::validator_signer::ValidatorSigner;
use rayon::iter::ParallelBridge;
use std::collections::HashMap;
use std::sync::Arc;
@@ -47,6 +49,17 @@ pub(crate) enum Error {
SingleAccountMultipleData,
}

/// Most up-to-date AccountData of this node and a signer
/// to sign it with when there is a need to override some
/// already signed data received from the network. See `Cache::set_local`
/// for more details.
#[derive(Clone)]
pub struct LocalData {
pub signer: Arc<dyn ValidatorSigner>,
pub data: Arc<AccountData>,
}

/// See module-level documentation.
#[derive(Clone)]
pub struct CacheSnapshot {
/// Map from account ID to account key.
@@ -62,24 +75,119 @@ pub struct CacheSnapshot {
/// as cache is collecting data only about the accounts from `keys`,
/// and data about the particular account might be not known at the given moment.
pub data: im::HashMap<PublicKey, Arc<SignedAccountData>>,

pub local: Option<LocalData>,
}

impl CacheSnapshot {
/// Checks if `(d.version,d.timestamp)` is newer (greater) than
/// `(old.version,old.timestamp)`, where `old` is the AccountData for
/// `d.account_key` already stored in the Cache.
/// It returns `false` in case `d.account_key` is not in `self.keys`,
/// because it means that the `Cache` is not interested in this data at all.
///
/// Note that when the node is restarted, it forgets
/// which version it has signed last, so it will again start from version
/// 0, until it learns from the network about data it already signed in the
/// previous execution. It means that a node may sign two different data with the exact same
/// version. To avoid an inconsistent state of the network (i.e. a situation in which
/// some nodes will learn about one data with the given version, and some about the other)
/// we introduce a tie breaker in the form of a UTC timestamp of signing (note that here
/// we do not rely on clock monotonicity, which is not guaranteed for a UTC clock;
/// we just assume that a timestamp collision is unlikely).
///
/// The alternatives to using a timestamp would be:
/// * adding a random_minor_version to AccountData, specifically to avoid collisions
/// (so we would be comparing `(version,random_minor_version)` instead)
/// * using some crypto hash function `h` and compare `(version,h(data))`. Assuming that `h`
///   behaves like a random oracle, the semantics would be equivalent to
///   `random_minor_version`, except that if a node signs exactly the same data as in the
///   previous run, then there will be a collision. But in such a case it doesn't matter,
///   since the data is the same.
/// * storing `version` of the last signed AccountData in persistent storage,
///   so that the node literally never signs AccountData with colliding versions.
///   This approach is fragile as long as validators migrate their nodes without copying over
///   the whole storage.
fn is_new(&self, d: &SignedAccountData) -> bool {
self.keys.contains(&d.account_key)
&& match self.data.get(&d.account_key) {
Some(old) if old.version >= d.version => false,
Some(old) if (old.version, old.timestamp) >= (d.version, d.timestamp) => false,
_ => true,
}
}
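The comparison in `is_new` leans on Rust's lexicographic ordering of tuples: version is compared first, and the UTC signing timestamp only breaks ties. A standalone sketch (simplified `(version, timestamp)` pairs; `is_newer` is a hypothetical helper, not the real method):

```rust
// Sketch of the (version, timestamp) tie-breaker. Rust tuples compare
// lexicographically, so the timestamp only matters when versions are equal.
fn is_newer(old: (u64, i64), candidate: (u64, i64)) -> bool {
    candidate > old
}

fn main() {
    assert!(!is_newer((5, 100), (5, 100))); // identical -> not newer
    assert!(is_newer((5, 100), (5, 101)));  // same version, later timestamp wins
    assert!(is_newer((5, 999), (6, 0)));    // higher version wins regardless of time
}
```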

fn try_insert(&mut self, d: Arc<SignedAccountData>) -> Option<Arc<SignedAccountData>> {
/// Inserts `d` into `self.data`, if
/// * `d.account_key` is in `self.keys` AND
/// * `d.version > self.data[d.account_key].version`.
/// If `d` would override the local data for this node, an AccountData based on `self.local`
/// is signed and inserted instead, to roll back the overriding change (this can happen in
/// case the node has been restarted and we observe the old value emitted by the previous run).
/// It returns the newly inserted value (or None if nothing changed).
/// The returned value should be broadcast to the network.
fn try_insert(
&mut self,
clock: &time::Clock,
d: Arc<SignedAccountData>,
) -> Option<Arc<SignedAccountData>> {
if !self.is_new(&d) {
return None;
}
let d = match &self.local {
Some(local) if d.account_key == local.signer.public_key() => Arc::new(
VersionedAccountData {
data: local.data.as_ref().clone(),
account_key: local.signer.public_key().clone(),
version: d.version + 1,
timestamp: clock.now_utc(),
}
.sign(local.signer.as_ref())
.unwrap(),
),
_ => d,
};
self.data.insert(d.account_key.clone(), d.clone());
Some(d)
}
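The rollback branch of `try_insert` is the heart of the fix: when the incoming data is signed with this node's own key, the node does not store it, but instead stores (and broadcasts) its current local data at a strictly higher version. A simplified sketch with hypothetical unsigned types (`Entry`, `Local`, `resolve` are illustrative; the real code re-signs a `VersionedAccountData` with the validator signer):

```rust
#[derive(Clone, Debug)]
struct Entry {
    account_key: String,
    version: u64,
    payload: String,
}

struct Local {
    account_key: String,
    payload: String,
}

// Returns the entry that actually gets stored and should be broadcast.
fn resolve(local: Option<&Local>, incoming: Entry) -> Entry {
    match local {
        // Incoming data claims to be ours (e.g. emitted by a previous run):
        // override it with our current payload at a strictly higher version.
        Some(l) if incoming.account_key == l.account_key => Entry {
            account_key: l.account_key.clone(),
            version: incoming.version + 1,
            payload: l.payload.clone(),
        },
        // Data about other accounts is stored as-is.
        _ => incoming,
    }
}

fn main() {
    let local = Local { account_key: "pk0".into(), payload: "current".into() };
    let stale = Entry { account_key: "pk0".into(), version: 3, payload: "stale".into() };
    let stored = resolve(Some(&local), stale);
    assert_eq!(stored.version, 4);
    assert_eq!(stored.payload, "current");
}
```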

/// Set the information about this node's account (i.e. AccountData for this node).
/// It should be called whenever AccountData for the current node changes. This function
/// is expected to be called periodically, even if AccountData doesn't change.
///
/// If `self.signer` is in `self.keys` then it means that this node is a TIER1 node and
/// the new AccountData should be broadcasted immediately - set_local() will return a value
/// to be broadcasted then. Even if the AccountData didn't change since the last call to
/// set_local(), a value to be broadcasted will be returned (just with newer timestamp).
/// It is important to tell the network that "yes, I'm still alive, my AccountData didn't
/// change" (eventually we might want to use the timestamp to set an expiration date on AccountData).
///
/// If `self.signer` is not in `self.keys` (this is not a TIER1 node), set_local() returns
/// None.
fn set_local(
&mut self,
clock: &time::Clock,
local: LocalData,
) -> Option<Arc<SignedAccountData>> {
let account_key = local.signer.public_key();
let result = match self.keys.contains(&account_key) {
false => None,
true => {
let d = Arc::new(
VersionedAccountData {
data: local.data.as_ref().clone(),
account_key: account_key.clone(),
version: self.data.get(&account_key).map_or(0, |d| d.version) + 1,
timestamp: clock.now_utc(),
}
.sign(local.signer.as_ref())
.unwrap(),
);
self.data.insert(account_key, d.clone());
Some(d)
}
};
self.local = Some(local);
result
}
}
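Note how `set_local` computes the version: it bumps past whatever is currently stored for this account (falling back to 0 when nothing is stored), so each periodic call produces data that every peer will consider fresher. A sketch of just that bookkeeping (the `next_version` helper is hypothetical; the real code also signs the result):

```rust
use std::collections::HashMap;

// Sketch of set_local's version bookkeeping over a simplified
// account_key -> stored_version map.
fn next_version(data: &HashMap<String, u64>, account_key: &str) -> u64 {
    // Bump past the stored version; start from 1 when nothing is stored yet,
    // which is why a freshly restarted node begins at version 1.
    data.get(account_key).copied().unwrap_or(0) + 1
}

fn main() {
    let mut data = HashMap::new();
    assert_eq!(next_version(&data, "node0"), 1); // nothing stored yet
    data.insert("node0".to_string(), 1);
    assert_eq!(next_version(&data, "node0"), 2); // bump past the stored version
}
```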

pub(crate) struct Cache(ArcMutex<CacheSnapshot>);
@@ -90,12 +198,17 @@ impl Cache {
keys_by_id: Arc::new(AccountKeys::default()),
keys: im::HashSet::new(),
data: im::HashMap::new(),
local: None,
}))
}

/// Updates the set of important accounts and their public keys.
/// The AccountData which is no longer important is dropped.
/// Returns true iff the set of accounts actually changed.
/// TODO(gprusak): note that local data won't be generated, even if it could be
/// (i.e. in case self.local.signer was not present in the old key set, but is in the new)
/// so a call to set_local afterwards is required to do that. For now it is fine because
/// the Cache owner is expected to call set_local periodically anyway.
pub fn set_keys(&self, keys_by_id: Arc<AccountKeys>) -> bool {
self.0
.try_update(|mut inner| {
@@ -122,7 +235,6 @@ impl Cache {
) -> (Vec<Arc<SignedAccountData>>, Option<Error>) {
// Filter out non-interesting data, so that we never check signatures for valid non-interesting data.
// Bad peers may force us to check signatures for fake data anyway, but we will ban them after first invalid signature.
// It locks epochs for reading for a short period.
let mut new_data = HashMap::new();
let inner = self.0.load();
for d in data {
@@ -161,19 +273,31 @@ impl Cache {
(data, None)
}

pub fn set_local(
self: &Arc<Self>,
clock: &time::Clock,
local: LocalData,
) -> Option<Arc<SignedAccountData>> {
self.0.update(|mut inner| {
let data = inner.set_local(clock, local);
(data, inner)
})
}

/// Verifies the signatures and inserts verified data to the cache.
/// Returns the data inserted and optionally a verification error.
/// WriteLock is acquired only for the final update (after verification).
pub async fn insert(
self: &Arc<Self>,
clock: &time::Clock,
data: Vec<Arc<SignedAccountData>>,
) -> (Vec<Arc<SignedAccountData>>, Option<Error>) {
let this = self.clone();
// Execute verification on the rayon threadpool.
let (data, err) = this.verify(data).await;
// Insert the successfully verified data, even if an error has been encountered.
let inserted = self.0.update(|mut inner| {
let inserted = data.into_iter().filter_map(|d| inner.try_insert(d)).collect();
let inserted = data.into_iter().filter_map(|d| inner.try_insert(clock, d)).collect();
(inserted, inner)
});
// Return the inserted data.
