Skip to content

Commit

Permalink
Revert "Fix candidate set address state handling (#1709)"
Browse files Browse the repository at this point in the history
This reverts commit 5424e1d.
  • Loading branch information
teor2345 authored Feb 23, 2021
1 parent 304d768 commit bc42772
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 424 deletions.
262 changes: 103 additions & 159 deletions zebra-network/src/address_book.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,44 +10,52 @@ use std::{
use chrono::{DateTime, Utc};
use tracing::Span;

use crate::{constants, types::MetaAddr, PeerAddrState};
use crate::{
constants,
types::{MetaAddr, PeerServices},
};

/// A database of peers, their advertised services, and information on when they
/// were last seen.
#[derive(Debug)]
pub struct AddressBook {
/// Each known peer address has a matching `MetaAddr`
by_addr: HashMap<SocketAddr, MetaAddr>,

/// The span for operations on this address book.
by_addr: HashMap<SocketAddr, (DateTime<Utc>, PeerServices)>,
by_time: BTreeSet<MetaAddr>,
span: Span,
}

#[allow(clippy::len_without_is_empty)]
impl AddressBook {
/// Construct an `AddressBook` with the given [`tracing::Span`].
pub fn new(span: Span) -> AddressBook {
let constructor_span = span.clone();
let _guard = constructor_span.enter();

let new_book = AddressBook {
AddressBook {
by_addr: HashMap::default(),
by_time: BTreeSet::default(),
span,
};

new_book.update_metrics();
new_book
}
}

/// Get the contents of `self` in random order with sanitized timestamps.
pub fn sanitized(&self) -> Vec<MetaAddr> {
use rand::seq::SliceRandom;
let _guard = self.span.enter();
let mut peers = self.peers().map(MetaAddr::sanitize).collect::<Vec<_>>();
peers.shuffle(&mut rand::thread_rng());
peers
}

/// Check consistency of the address book invariants or panic, doing work
/// quadratic in the address book size.
#[cfg(test)]
fn assert_consistency(&self) {
for (a, (t, s)) in self.by_addr.iter() {
for meta in self.by_time.iter().filter(|meta| meta.addr == *a) {
if meta.last_seen != *t || meta.services != *s {
panic!("meta {:?} is not {:?}, {:?}, {:?}", meta, a, t, s);
}
}
}
}

/// Returns true if the address book has an entry for `addr`.
pub fn contains_addr(&self, addr: &SocketAddr) -> bool {
let _guard = self.span.enter();
Expand All @@ -57,205 +65,131 @@ impl AddressBook {
/// Returns the entry corresponding to `addr`, or `None` if it does not exist.
pub fn get_by_addr(&self, addr: SocketAddr) -> Option<MetaAddr> {
let _guard = self.span.enter();
self.by_addr.get(&addr).cloned()
let (last_seen, services) = self.by_addr.get(&addr).cloned()?;
Some(MetaAddr {
addr,
last_seen,
services,
})
}

/// Add `new` to the address book, updating the previous entry if `new` is
/// more recent or discarding `new` if it is stale.
///
/// ## Note
///
/// All changes should go through `update` or `take`, to ensure accurate metrics.
pub fn update(&mut self, new: MetaAddr) {
let _guard = self.span.enter();
trace!(
?new,
total_peers = self.by_addr.len(),
recent_peers = self.recently_live_peers().count(),
data.total = self.by_time.len(),
data.recent = (self.by_time.len() - self.disconnected_peers().count()),
);
#[cfg(test)]
self.assert_consistency();

if let Some(prev) = self.get_by_addr(new.addr) {
if prev.last_seen > new.last_seen {
return;
} else {
self.by_time
.take(&prev)
.expect("cannot have by_addr entry without by_time entry");
}
}
self.by_time.insert(new);
self.by_addr.insert(new.addr, (new.last_seen, new.services));

self.by_addr.insert(new.addr, new);
self.update_metrics();
}

/// Removes the entry with `addr`, returning it if it exists
///
/// ## Note
///
/// All changes should go through `update` or `take`, to ensure accurate metrics.
fn take(&mut self, removed_addr: SocketAddr) -> Option<MetaAddr> {
let _guard = self.span.enter();
trace!(
?removed_addr,
total_peers = self.by_addr.len(),
recent_peers = self.recently_live_peers().count(),
);

if let Some(entry) = self.by_addr.remove(&removed_addr) {
self.update_metrics();
Some(entry)
} else {
None
}
#[cfg(test)]
self.assert_consistency();
}

/// Compute a cutoff time that can determine whether an entry
/// in an address book being updated with peer message timestamps
/// represents a likely-dead (or hung) peer, or a potentially-connected peer.
/// represents a known-disconnected peer or a potentially-connected peer.
///
/// [`constants::LIVE_PEER_DURATION`] represents the time interval in which
/// we should receive at least one message from a peer, or close the
/// connection. Therefore, if the last-seen timestamp is older than
/// [`constants::LIVE_PEER_DURATION`] ago, we know we should have
/// disconnected from it. Otherwise, we could potentially be connected to it.
fn liveness_cutoff_time() -> DateTime<Utc> {
/// we are guaranteed to receive at least one message from a peer or close
/// the connection. Therefore, if the last-seen timestamp is older than
/// [`constants::LIVE_PEER_DURATION`] ago, we know we must have disconnected
/// from it. Otherwise, we could potentially be connected to it.
fn cutoff_time() -> DateTime<Utc> {
// chrono uses signed durations while stdlib uses unsigned durations
use chrono::Duration as CD;
Utc::now() - CD::from_std(constants::LIVE_PEER_DURATION).unwrap()
}

/// Returns true if the given [`SocketAddr`] has recently sent us a message.
pub fn recently_live_addr(&self, addr: &SocketAddr) -> bool {
let _guard = self.span.enter();
match self.by_addr.get(addr) {
None => false,
// NeverAttempted, Failed, and AttemptPending peers should never be live
Some(peer) => {
peer.last_connection_state == PeerAddrState::Responded
&& peer.last_seen > AddressBook::liveness_cutoff_time()
}
/// Used for range bounds, see cutoff_time
fn cutoff_meta() -> MetaAddr {
use std::net::{IpAddr, Ipv4Addr};
MetaAddr {
last_seen: AddressBook::cutoff_time(),
// The ordering on MetaAddrs is newest-first, then arbitrary,
// so any fields will do here.
addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
services: PeerServices::default(),
}
}

/// Returns true if the given [`SocketAddr`] is pending a reconnection
/// attempt.
pub fn pending_reconnection_addr(&self, addr: &SocketAddr) -> bool {
/// Returns true if the given [`SocketAddr`] could potentially be connected
/// to a node feeding timestamps into this address book.
pub fn is_potentially_connected(&self, addr: &SocketAddr) -> bool {
let _guard = self.span.enter();
match self.by_addr.get(addr) {
None => false,
Some(peer) => peer.last_connection_state == PeerAddrState::AttemptPending,
Some((ref last_seen, _)) => last_seen > &AddressBook::cutoff_time(),
}
}

/// Returns true if the given [`SocketAddr`] might be connected to a node
/// feeding timestamps into this address book.
pub fn maybe_connected_addr(&self, addr: &SocketAddr) -> bool {
self.recently_live_addr(addr) || self.pending_reconnection_addr(addr)
}

/// Return an iterator over all peers.
///
/// Returns peers in reconnection attempt order, then recently live peers in
/// an arbitrary order.
/// Return an iterator over all peers, ordered from most recently seen to
/// least recently seen.
pub fn peers(&'_ self) -> impl Iterator<Item = MetaAddr> + '_ {
let _guard = self.span.enter();
self.reconnection_peers()
.chain(self.maybe_connected_peers())
}

/// Return an iterator over peers that are due for a reconnection attempt,
/// in reconnection attempt order.
pub fn reconnection_peers(&'_ self) -> impl Iterator<Item = MetaAddr> + '_ {
let _guard = self.span.enter();

// TODO: optimise, if needed, or get rid of older peers

// Skip live peers, and peers pending a reconnect attempt, then sort using BTreeSet
self.by_addr
.values()
.filter(move |peer| !self.maybe_connected_addr(&peer.addr))
.collect::<BTreeSet<_>>()
.into_iter()
.cloned()
self.by_time.iter().rev().cloned()
}

/// Return an iterator over all the peers in `state`, in arbitrary order.
pub fn state_peers(&'_ self, state: PeerAddrState) -> impl Iterator<Item = MetaAddr> + '_ {
/// Return an iterator over peers known to be disconnected, ordered from most
/// recently seen to least recently seen.
pub fn disconnected_peers(&'_ self) -> impl Iterator<Item = MetaAddr> + '_ {
let _guard = self.span.enter();
use std::ops::Bound::{Excluded, Unbounded};

self.by_addr
.values()
.filter(move |peer| peer.last_connection_state == state)
self.by_time
.range((Excluded(Self::cutoff_meta()), Unbounded))
.rev()
.cloned()
}

/// Return an iterator over peers that might be connected, in arbitrary
/// order.
pub fn maybe_connected_peers(&'_ self) -> impl Iterator<Item = MetaAddr> + '_ {
/// Return an iterator over peers that could potentially be connected, ordered from most
/// recently seen to least recently seen.
pub fn potentially_connected_peers(&'_ self) -> impl Iterator<Item = MetaAddr> + '_ {
let _guard = self.span.enter();
use std::ops::Bound::{Included, Unbounded};

self.by_addr
.values()
.filter(move |peer| self.maybe_connected_addr(&peer.addr))
self.by_time
.range((Unbounded, Included(Self::cutoff_meta())))
.rev()
.cloned()
}

/// Return an iterator over peers we've seen recently, in arbitrary order.
pub fn recently_live_peers(&'_ self) -> impl Iterator<Item = MetaAddr> + '_ {
let _guard = self.span.enter();

self.by_addr
.values()
.filter(move |peer| self.recently_live_addr(&peer.addr))
.cloned()
/// Returns an iterator that drains entries from the address book, removing
/// them in order from most recent to least recent.
pub fn drain_newest(&'_ mut self) -> impl Iterator<Item = MetaAddr> + '_ {
Drain {
book: self,
newest_first: true,
}
}

/// Returns an iterator that drains entries from the address book.
///
/// Removes entries in reconnection attempt then arbitrary order,
/// see [`peers`] for details.
pub fn drain(&'_ mut self) -> impl Iterator<Item = MetaAddr> + '_ {
Drain { book: self }
/// Returns an iterator that drains entries from the address book, removing
/// them in order from least recent to most recent.
pub fn drain_oldest(&'_ mut self) -> impl Iterator<Item = MetaAddr> + '_ {
Drain {
book: self,
newest_first: false,
}
}

/// Returns the number of entries in this address book.
pub fn len(&self) -> usize {
self.by_addr.len()
}

/// Update the metrics for this address book.
fn update_metrics(&self) {
let _guard = self.span.enter();

let responded = self.state_peers(PeerAddrState::Responded).count();
let never_attempted = self.state_peers(PeerAddrState::NeverAttempted).count();
let failed = self.state_peers(PeerAddrState::Failed).count();
let pending = self.state_peers(PeerAddrState::AttemptPending).count();

let recently_live = self.recently_live_peers().count();
let recently_stopped_responding = responded
.checked_sub(recently_live)
.expect("all recently live peers must have responded");

// TODO: rename to address_book.responded.recently_live
metrics::gauge!("candidate_set.recently_live", recently_live as f64);
// TODO: rename to address_book.responded.stopped_responding
metrics::gauge!(
"candidate_set.disconnected",
recently_stopped_responding as f64
);

// TODO: rename to address_book.[state_name]
metrics::gauge!("candidate_set.responded", responded as f64);
metrics::gauge!("candidate_set.gossiped", never_attempted as f64);
metrics::gauge!("candidate_set.failed", failed as f64);
metrics::gauge!("candidate_set.pending", pending as f64);

debug!(
%recently_live,
%recently_stopped_responding,
%responded,
%never_attempted,
%failed,
%pending,
"address book peers"
);
self.by_time.len()
}
}

Expand All @@ -272,13 +206,23 @@ impl Extend<MetaAddr> for AddressBook {

struct Drain<'a> {
book: &'a mut AddressBook,
newest_first: bool,
}

impl<'a> Iterator for Drain<'a> {
type Item = MetaAddr;

fn next(&mut self) -> Option<Self::Item> {
let next_item_addr = self.book.peers().next()?.addr;
self.book.take(next_item_addr)
let next_item = if self.newest_first {
*self.book.by_time.iter().next()?
} else {
*self.book.by_time.iter().rev().next()?
};
self.book.by_time.remove(&next_item);
self.book
.by_addr
.remove(&next_item.addr)
.expect("cannot have by_time entry without by_addr entry");
Some(next_item)
}
}
1 change: 0 additions & 1 deletion zebra-network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ pub use crate::{
address_book::AddressBook,
config::Config,
isolated::connect_isolated,
meta_addr::PeerAddrState,
peer_set::init,
policies::{RetryErrors, RetryLimit},
protocol::internal::{Request, Response},
Expand Down
Loading

0 comments on commit bc42772

Please sign in to comment.