From 7aa2819c595e6bc93c63fa67e43fe8e558b749fc Mon Sep 17 00:00:00 2001
From: Mark Travis
Date: Wed, 10 Jul 2024 17:30:05 -0700
Subject: [PATCH] Add ledger sequence to peer proposals.

Track proposals by sequence number and use only peer proposals for the
current working sequence number for consensus evaluation.
---
 src/test/csf/Peer.h                      |   1 +
 src/xrpld/app/consensus/RCLConsensus.cpp |   2 +-
 src/xrpld/app/consensus/RCLConsensus.h   |   2 +-
 src/xrpld/consensus/Consensus.h          | 199 ++++++++++++++++++++---
 4 files changed, 182 insertions(+), 22 deletions(-)

diff --git a/src/test/csf/Peer.h b/src/test/csf/Peer.h
index 95f60780714..2fd81c80301 100644
--- a/src/test/csf/Peer.h
+++ b/src/test/csf/Peer.h
@@ -161,6 +161,7 @@ struct Peer
     using PeerPosition_t = Position;
     using Result = ConsensusResult;
     using NodeKey = Validation::NodeKey;
+    // using clock_type = beast::abstract_clock;
 
     //! Logging support that prefixes messages with the peer ID
     beast::WrappedSink sink;
diff --git a/src/xrpld/app/consensus/RCLConsensus.cpp b/src/xrpld/app/consensus/RCLConsensus.cpp
index 1b3ae5c9a0b..e3553bf9003 100644
--- a/src/xrpld/app/consensus/RCLConsensus.cpp
+++ b/src/xrpld/app/consensus/RCLConsensus.cpp
@@ -55,7 +55,7 @@ RCLConsensus::RCLConsensus(
     LedgerMaster& ledgerMaster,
     LocalTxs& localTxs,
     InboundTransactions& inboundTransactions,
-    Consensus::clock_type const& clock,
+    Consensus::clock_type& clock,
     ValidatorKeys const& validatorKeys,
     beast::Journal journal)
     : adaptor_(
diff --git a/src/xrpld/app/consensus/RCLConsensus.h b/src/xrpld/app/consensus/RCLConsensus.h
index 893e5cf0847..850fb2d3b9f 100644
--- a/src/xrpld/app/consensus/RCLConsensus.h
+++ b/src/xrpld/app/consensus/RCLConsensus.h
@@ -421,7 +421,7 @@ class RCLConsensus
         LedgerMaster& ledgerMaster,
         LocalTxs& localTxs,
         InboundTransactions& inboundTransactions,
-        Consensus::clock_type const& clock,
+        Consensus::clock_type& clock,
         ValidatorKeys const& validatorKeys,
         beast::Journal journal);
diff --git a/src/xrpld/consensus/Consensus.h b/src/xrpld/consensus/Consensus.h
index 35d916c7543..35ee3ce242b 100644
--- a/src/xrpld/consensus/Consensus.h
+++ b/src/xrpld/consensus/Consensus.h
@@ -27,6 +27,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -35,6 +36,7 @@
 #include
 #include
 #include
+#include
 
 namespace ripple {
 
@@ -335,7 +337,7 @@ class Consensus
         @param adaptor The instance of the adaptor class
         @param j The journal to log debug output
     */
-    Consensus(clock_type const& clock, Adaptor& adaptor, beast::Journal j);
+    Consensus(clock_type& clock, Adaptor& adaptor, beast::Journal j);
 
     /** Kick-off the next round of consensus.
 
@@ -580,7 +582,15 @@ class Consensus
     Ledger_t previousLedger_;
 
     // Transaction Sets, indexed by hash of transaction tree
-    hash_map acquired_;
+    using AcquiredType = beast::aged_unordered_map<
+        typename TxSet_t::ID,
+        const TxSet_t,
+        clock_type::clock_type,
+        beast::uhash<>>;
+    AcquiredType acquired_;
+
+    // Tx sets that can be purged only once there is a new consensus round.
+    std::stack acquiredPurge_;
 
     std::optional result_;
     ConsensusCloseTimes rawCloseTimes_;
@@ -592,8 +602,18 @@ class Consensus
     hash_map currPeerPositions_;
 
     // Recently received peer positions, available when transitioning between
-    // ledgers or rounds
-    hash_map> recentPeerPositions_;
+    // ledgers or rounds. Collected by ledger sequence. This allows us to
+    // know which positions are likely relevant to the ledger on which we are
+    // currently working. Also allows us to catch up faster if we fall behind
+    // the rest of the network since we won't need to re-acquire proposals
+    // and related transaction sets.
+    std::map>
+        recentPeerPositions_;
+
+    // These are for peers not using code that adds a ledger sequence
+    // to the proposal message. TODO This should be removed eventually when
+    // the network fully upgrades.
+    hash_map> recentPeerPositionsLegacy_;
 
     // The number of proposers who participated in the last consensus round
     std::size_t prevProposers_ = 0;
@@ -607,10 +627,10 @@ template
 Consensus::Consensus(
-    clock_type const& clock,
+    clock_type& clock,
     Adaptor& adaptor,
     beast::Journal journal)
-    : adaptor_(adaptor), clock_(clock), j_{journal}
+    : adaptor_(adaptor), clock_(clock), acquired_(clock), j_{journal}
 {
     JLOG(j_.debug()) << "Creating consensus object";
 }
@@ -636,8 +656,21 @@ Consensus::startRound(
         prevCloseTime_ = rawCloseTimes_.self;
     }
 
+    // Clear positions that we know will not ever be necessary again.
+    auto it = recentPeerPositions_.begin();
+    while (it != recentPeerPositions_.end() && it->first <= prevLedger.seq())
+        it = recentPeerPositions_.erase(it);
+    // Get rid of untrusted positions for the current working ledger.
+    auto currentPositions =
+        recentPeerPositions_.find(prevLedger.seq() + typename Ledger_t::Seq{1});
+    if (currentPositions != recentPeerPositions_.end())
+    {
+        for (NodeID_t const& n : nowUntrusted)
+            currentPositions->second.erase(n);
+    }
+
     for (NodeID_t const& n : nowUntrusted)
-        recentPeerPositions_.erase(n);
+        recentPeerPositionsLegacy_.erase(n);
 
     ConsensusMode startMode =
         proposing ? ConsensusMode::proposing : ConsensusMode::observing;
@@ -679,8 +712,30 @@ Consensus::startRoundInternal(
     convergePercent_ = 0;
     haveCloseTimeConsensus_ = false;
     openTime_.reset(clock_.now());
-    currPeerPositions_.clear();
-    acquired_.clear();
+
+    // beast::aged_unordered_map::erase by key is broken and
+    // is not used anywhere in the existing codebase.
+    while (!acquiredPurge_.empty())
+    {
+        auto found = acquired_.find(acquiredPurge_.top());
+        if (found != acquired_.end())
+            acquired_.erase(found);
+        acquiredPurge_.pop();
+    }
+    for (auto it = currPeerPositions_.begin(); it != currPeerPositions_.end();)
+    {
+        if (auto found = acquired_.find(it->second.proposal().position());
+            found != acquired_.end())
+        {
+            acquired_.erase(found);
+        }
+        it = currPeerPositions_.erase(it);
+    }
+
+    // Hold up to 30 minutes' worth of acquired tx sets. This is to help
+    // catch up quickly after extended de-sync periods.
+    beast::expire(acquired_, std::chrono::minutes(30));
+
     rawCloseTimes_.peers.clear();
     rawCloseTimes_.self = {};
     deadNodes_.clear();
@@ -708,14 +763,45 @@ Consensus::peerProposal(
     auto const& peerID = newPeerPos.proposal().nodeID();
 
     // Always need to store recent positions
+    if (newPeerPos.proposal().ledgerSeq().has_value())
+    {
+        // Ignore proposals from prior ledgers.
+        typename Ledger_t::Seq const& propLedgerSeq =
+            *newPeerPos.proposal().ledgerSeq();
+        if (propLedgerSeq <= previousLedger_.seq())
+            return false;
+
+        auto& bySeq = recentPeerPositions_[propLedgerSeq];
+        {
+            auto peerProp = bySeq.find(peerID);
+            if (peerProp == bySeq.end())
+            {
+                bySeq.emplace(peerID, newPeerPos);
+            }
+            else
+            {
+                // Only store if it's the latest proposal from this peer for
+                // the consensus round in the proposal.
+                if (newPeerPos.proposal().proposeSeq() <=
+                    peerProp->second.proposal().proposeSeq())
+                {
+                    return false;
+                }
+                peerProp->second = newPeerPos;
+            }
+        }
+    }
+    else
     {
-        auto& props = recentPeerPositions_[peerID];
+        // legacy proposal with no ledger sequence
+        auto& props = recentPeerPositionsLegacy_[peerID];
 
         if (props.size() >= 10)
             props.pop_front();
 
         props.push_back(newPeerPos);
     }
+
     return peerProposalInternal(now, newPeerPos);
 }
@@ -725,10 +811,6 @@ Consensus::peerProposalInternal(
     NetClock::time_point const& now,
     PeerPosition_t const& newPeerPos)
 {
-    // Nothing to do for now if we are currently working on a ledger
-    if (phase_ == ConsensusPhase::accepted)
-        return false;
-
     now_ = now;
 
     auto const& newPeerProp = newPeerPos.proposal();
@@ -737,6 +819,20 @@
     {
         JLOG(j_.debug()) << "Got proposal for " << newPeerProp.prevLedger()
                          << " but we are on " << prevLedgerID_;
+
+        if (!acquired_.count(newPeerProp.position()))
+        {
+            // acquireTxSet will return the set if it is available, or
+            // spawn a request for it and return nullopt/nullptr. It will call
+            // gotTxSet once it arrives. If we're behind, this should save
+            // time when we catch up.
+            if (auto set = adaptor_.acquireTxSet(newPeerProp.position()))
+                gotTxSet(now_, *set);
+            else
+                JLOG(j_.debug()) << "Do not have tx set for peer";
+        }
+
+        // There's nothing else to do with this proposal currently.
         return false;
     }
@@ -770,16 +866,39 @@
                     it.second.unVote(peerID);
             }
             if (peerPosIt != currPeerPositions_.end())
+            {
+                // Remove from acquired_ or else it will consume space for
+                // a while. beast::aged_unordered_map::erase by key is broken and
+                // is not used anywhere in the existing codebase.
+                if (auto found =
+                        acquired_.find(peerPosIt->second.proposal().position());
+                    found != acquired_.end())
+                {
+                    acquiredPurge_.push(
+                        peerPosIt->second.proposal().position());
+                }
                 currPeerPositions_.erase(peerID);
+            }
             deadNodes_.insert(peerID);
 
             return true;
         }
 
         if (peerPosIt != currPeerPositions_.end())
-            peerPosIt->second = newPeerPos;
+        {
+            // Remove from acquired_ or else it will consume space for a while.
+            // beast::aged_unordered_container::erase by key is broken and
+            // is not used anywhere in the existing codebase.
+            if (auto found = acquired_.find(newPeerPos.proposal().position());
+                found != acquired_.end())
+            {
+                acquiredPurge_.push(newPeerPos.proposal().position());
+            }
+        }
         else
+        {
             currPeerPositions_.emplace(peerID, newPeerPos);
+        }
     }
 
     if (newPeerProp.isInitial())
@@ -805,9 +924,9 @@
             else
                 JLOG(j_.debug()) << "Don't have tx set for peer";
         }
-        else if (result_)
+// else if (result_)
         {
-            updateDisputes(newPeerProp.nodeID(), ait->second);
+// updateDisputes(newPeerProp.nodeID(), ait->second);
         }
     }
@@ -1025,8 +1144,18 @@ Consensus::handleWrongLedger(typename Ledger_t::ID const& lgrId)
         result_->disputes.clear();
         result_->compares.clear();
     }
-
-    currPeerPositions_.clear();
+    for (auto it = currPeerPositions_.begin();
+         it != currPeerPositions_.end();)
+    {
+        // beast::aged_unordered_map::erase by key is broken and
+        // is not used anywhere in the existing codebase.
+        if (auto found = acquired_.find(it->second.proposal().position());
+            found != acquired_.end())
+        {
+            acquiredPurge_.push(it->second.proposal().position());
+        }
+        it = currPeerPositions_.erase(it);
+    }
 
     rawCloseTimes_.peers.clear();
     deadNodes_.clear();
@@ -1077,8 +1206,30 @@ template
 void
 Consensus::playbackProposals()
 {
-    for (auto const& it : recentPeerPositions_)
+    // Only use proposals for the ledger sequence we're currently working on.
+    auto const currentPositions = recentPeerPositions_.find(
+        previousLedger_.seq() + typename Ledger_t::Seq{1});
+    if (currentPositions != recentPeerPositions_.end())
+    {
+        for (auto const& [peerID, pos] : currentPositions->second)
+        {
+            if (pos.proposal().prevLedger() == prevLedgerID_ &&
+                peerProposalInternal(now_, pos))
+            {
+                adaptor_.share(pos);
+            }
+        }
+    }
+
+    // It's safe to do this--if a proposal is based on the wrong ledger,
+    // then peerProposalInternal() will not replace it in currPeerPositions_.
+    // TODO Eventually, remove code to check for non-existent ledger sequence
+    // in peer proposal messages and make that parameter required in
+    // the protobuf definition. Do this only after the network is running on
+    // rippled versions with that parameter set in peer proposals. This
+    // can be done once an amendment for another feature forces that kind
+    // of upgrade, but this particular feature does not require an amendment.
+    for (auto const& it : recentPeerPositionsLegacy_)
     {
         for (auto const& pos : it.second)
         {
             if (pos.proposal().prevLedger() == prevLedgerID_)
@@ -1384,6 +1535,14 @@ Consensus::updateOurPositions()
                 JLOG(j_.warn()) << "Removing stale proposal from " << peerID;
                 for (auto& dt : result_->disputes)
                     dt.second.unVote(peerID);
+                // Remove from acquired_ or else it will consume space for
+                // a while. beast::aged_unordered_map::erase by key is broken and
+                // is not used anywhere in the existing codebase.
+                if (auto found = acquired_.find(peerProp.position());
+                    found != acquired_.end())
+                {
+                    acquiredPurge_.push(peerProp.position());
+                }
                 it = currPeerPositions_.erase(it);
             }
             else
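For illustration only, a minimal standalone sketch of the per-sequence bookkeeping this patch introduces (not part of the patch; NodeID, Proposal, and the plain integer sequence type are simplified stand-ins for NodeID_t, PeerPosition_t, and typename Ledger_t::Seq): proposals are grouped by the ledger sequence they target, a later propose sequence from the same peer replaces an earlier one, and when a new round starts every group at or below the just-closed ledger is dropped.

#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <unordered_map>

// Simplified stand-ins for the real consensus types (assumption, for
// illustration only).
using NodeID = std::string;
struct Proposal
{
    std::uint32_t ledgerSeq;   // ledger this proposal is trying to build
    std::uint32_t proposeSeq;  // peer's proposal revision within that round
};

// Proposals grouped by target ledger sequence, mirroring the role of
// recentPeerPositions_ in the patch.
std::map<std::uint32_t, std::unordered_map<NodeID, Proposal>> bySeq;

// Keep only the newest proposal per peer per ledger sequence.
bool
store(NodeID const& peer, Proposal const& p)
{
    auto& group = bySeq[p.ledgerSeq];
    auto it = group.find(peer);
    if (it != group.end() && p.proposeSeq <= it->second.proposeSeq)
        return false;  // stale or duplicate revision
    group[peer] = p;
    return true;
}

// At the start of a round that builds ledger `working`, discard every
// group for sequences that have already closed.
void
prune(std::uint32_t working)
{
    bySeq.erase(bySeq.begin(), bySeq.lower_bound(working));
}

int
main()
{
    store("n1", {5, 0});
    store("n1", {5, 1});  // replaces the revision-0 proposal for ledger 5
    store("n2", {6, 0});  // retained for the next round
    prune(6);             // ledger 5 has closed; we now work on ledger 6
    std::cout << bySeq.size() << '\n';  // prints 1: only the ledger-6 group
}

In the patch itself the same pruning is driven by startRound() (erasing recentPeerPositions_ entries at or below prevLedger.seq()), and the newer-revision check is applied in peerProposal() before a stored position is replaced.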