Skip to content

Commit

Permalink
Add ledger sequence to peer proposals. Track proposals by sequence
Browse files Browse the repository at this point in the history
number and use only peer proposals for the current working sequence
number for consensus evaluation.
  • Loading branch information
mtrippled committed Jul 11, 2024
1 parent 22ed888 commit 7aa2819
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 22 deletions.
1 change: 1 addition & 0 deletions src/test/csf/Peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ struct Peer
using PeerPosition_t = Position;
using Result = ConsensusResult<Peer>;
using NodeKey = Validation::NodeKey;
// using clock_type = beast::abstract_clock<std::chrono::steady_clock>;

//! Logging support that prefixes messages with the peer ID
beast::WrappedSink sink;
Expand Down
2 changes: 1 addition & 1 deletion src/xrpld/app/consensus/RCLConsensus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ RCLConsensus::RCLConsensus(
LedgerMaster& ledgerMaster,
LocalTxs& localTxs,
InboundTransactions& inboundTransactions,
Consensus<Adaptor>::clock_type const& clock,
Consensus<Adaptor>::clock_type& clock,
ValidatorKeys const& validatorKeys,
beast::Journal journal)
: adaptor_(
Expand Down
2 changes: 1 addition & 1 deletion src/xrpld/app/consensus/RCLConsensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ class RCLConsensus
LedgerMaster& ledgerMaster,
LocalTxs& localTxs,
InboundTransactions& inboundTransactions,
Consensus<Adaptor>::clock_type const& clock,
Consensus<Adaptor>::clock_type& clock,
ValidatorKeys const& validatorKeys,
beast::Journal journal);

Expand Down
199 changes: 179 additions & 20 deletions src/xrpld/consensus/Consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <xrpld/consensus/LedgerTiming.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/chrono.h>
#include <xrpl/beast/container/aged_unordered_map.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/json/json_writer.h>
#include <boost/logic/tribool.hpp>
Expand All @@ -35,6 +36,7 @@
#include <deque>
#include <optional>
#include <sstream>
#include <stack>

namespace ripple {

Expand Down Expand Up @@ -335,7 +337,7 @@ class Consensus
@param adaptor The instance of the adaptor class
@param j The journal to log debug output
*/
Consensus(clock_type const& clock, Adaptor& adaptor, beast::Journal j);
Consensus(clock_type& clock, Adaptor& adaptor, beast::Journal j);

/** Kick-off the next round of consensus.
Expand Down Expand Up @@ -580,7 +582,15 @@ class Consensus
Ledger_t previousLedger_;

// Transaction Sets, indexed by hash of transaction tree
hash_map<typename TxSet_t::ID, const TxSet_t> acquired_;
using AcquiredType = beast::aged_unordered_map<
typename TxSet_t::ID,
const TxSet_t,
clock_type::clock_type,
beast::uhash<>>;
AcquiredType acquired_;

// Tx sets that can be purged only once there is a new consensus round.
std::stack<typename TxSet_t::ID> acquiredPurge_;

std::optional<Result> result_;
ConsensusCloseTimes rawCloseTimes_;
Expand All @@ -592,8 +602,18 @@ class Consensus
hash_map<NodeID_t, PeerPosition_t> currPeerPositions_;

// Recently received peer positions, available when transitioning between
// ledgers or rounds
hash_map<NodeID_t, std::deque<PeerPosition_t>> recentPeerPositions_;
// ledgers or rounds. Collected by ledger sequence. This allows us to
// know which positions are likely relevant to the ledger on which we are
// currently working. Also allows us to catch up faster if we fall behind
// the rest of the network since we won't need to re-aquire proposals
// and related transaction sets.
std::map<typename Ledger_t::Seq, hash_map<NodeID_t, PeerPosition_t>>
recentPeerPositions_;

// These are for peers not using code that adds a ledger sequence
// to the proposal message. TODO This should be removed eventually when
// the network fully upgrades.
hash_map<NodeID_t, std::deque<PeerPosition_t>> recentPeerPositionsLegacy_;

// The number of proposers who participated in the last consensus round
std::size_t prevProposers_ = 0;
Expand All @@ -607,10 +627,10 @@ class Consensus

template <class Adaptor>
Consensus<Adaptor>::Consensus(
clock_type const& clock,
clock_type& clock,
Adaptor& adaptor,
beast::Journal journal)
: adaptor_(adaptor), clock_(clock), j_{journal}
: adaptor_(adaptor), clock_(clock), acquired_(clock), j_{journal}
{
JLOG(j_.debug()) << "Creating consensus object";
}
Expand All @@ -636,8 +656,21 @@ Consensus<Adaptor>::startRound(
prevCloseTime_ = rawCloseTimes_.self;
}

// Clear positions that we know will not ever be necessary again.
auto it = recentPeerPositions_.begin();
while (it != recentPeerPositions_.end() && it->first <= prevLedger.seq())
it = recentPeerPositions_.erase(it);
// Get rid of untrusted positions for the current working ledger.
auto currentPositions =
recentPeerPositions_.find(prevLedger.seq() + typename Ledger_t::Seq{1});
if (currentPositions != recentPeerPositions_.end())
{
for (NodeID_t const& n : nowUntrusted)
currentPositions->second.erase(n);
}

for (NodeID_t const& n : nowUntrusted)
recentPeerPositions_.erase(n);
recentPeerPositionsLegacy_.erase(n);

ConsensusMode startMode =
proposing ? ConsensusMode::proposing : ConsensusMode::observing;
Expand Down Expand Up @@ -679,8 +712,30 @@ Consensus<Adaptor>::startRoundInternal(
convergePercent_ = 0;
haveCloseTimeConsensus_ = false;
openTime_.reset(clock_.now());
currPeerPositions_.clear();
acquired_.clear();

// beast::aged_unordered_map::erase by key is broken and
// is not used anywhere in the existing codebase.
while (!acquiredPurge_.empty())
{
auto found = acquired_.find(acquiredPurge_.top());
if (found != acquired_.end())
acquired_.erase(found);
acquiredPurge_.pop();
}
for (auto it = currPeerPositions_.begin(); it != currPeerPositions_.end();)
{
if (auto found = acquired_.find(it->second.proposal().position());
found != acquired_.end())
{
acquired_.erase(found);
}
it = currPeerPositions_.erase(it);
}

// Hold up to 30 minutes worth of acquired tx sets. This to help
// catch up quickly from extended de-sync periods.
beast::expire(acquired_, std::chrono::minutes(30));

rawCloseTimes_.peers.clear();
rawCloseTimes_.self = {};
deadNodes_.clear();
Expand Down Expand Up @@ -708,14 +763,45 @@ Consensus<Adaptor>::peerProposal(
auto const& peerID = newPeerPos.proposal().nodeID();

// Always need to store recent positions
if (newPeerPos.proposal().ledgerSeq().has_value())
{
// Ignore proposals from prior ledgers.
typename Ledger_t::Seq const& propLedgerSeq =
*newPeerPos.proposal().ledgerSeq();
if (propLedgerSeq <= previousLedger_.seq())
return false;

auto& bySeq = recentPeerPositions_[propLedgerSeq];
{
auto peerProp = bySeq.find(peerID);
if (peerProp == bySeq.end())
{
bySeq.emplace(peerID, newPeerPos);
}
else
{
// Only store if it's the latest proposal from this peer for the
// consensus round in the proposal.
if (newPeerPos.proposal().proposeSeq() <=
peerProp->second.proposal().proposeSeq())
{
return false;
}
peerProp->second = newPeerPos;
}
}
}
else
{
auto& props = recentPeerPositions_[peerID];
// legacy proposal with no ledger sequence
auto& props = recentPeerPositionsLegacy_[peerID];

if (props.size() >= 10)
props.pop_front();

props.push_back(newPeerPos);
}

return peerProposalInternal(now, newPeerPos);
}

Expand All @@ -725,10 +811,6 @@ Consensus<Adaptor>::peerProposalInternal(
NetClock::time_point const& now,
PeerPosition_t const& newPeerPos)
{
// Nothing to do for now if we are currently working on a ledger
if (phase_ == ConsensusPhase::accepted)
return false;

now_ = now;

auto const& newPeerProp = newPeerPos.proposal();
Expand All @@ -737,6 +819,20 @@ Consensus<Adaptor>::peerProposalInternal(
{
JLOG(j_.debug()) << "Got proposal for " << newPeerProp.prevLedger()
<< " but we are on " << prevLedgerID_;

if (!acquired_.count(newPeerProp.position()))
{
// acquireTxSet will return the set if it is available, or
// spawn a request for it and return nullopt/nullptr. It will call
// gotTxSet once it arrives. If we're behind, this should save
// time when we catch up.
if (auto set = adaptor_.acquireTxSet(newPeerProp.position()))
gotTxSet(now_, *set);
else
JLOG(j_.debug()) << "Do not have tx set for peer";
}

// There's nothing else to do with this proposal currently.
return false;
}

Expand Down Expand Up @@ -770,16 +866,39 @@ Consensus<Adaptor>::peerProposalInternal(
it.second.unVote(peerID);
}
if (peerPosIt != currPeerPositions_.end())
{
// Remove from acquired_ or else it will consume space for
// awhile. beast::aged_unordered_map::erase by key is broken and
// is not used anywhere in the existing codebase.
if (auto found =
acquired_.find(peerPosIt->second.proposal().position());
found != acquired_.end())
{
acquiredPurge_.push(
peerPosIt->second.proposal().position());
}
currPeerPositions_.erase(peerID);
}
deadNodes_.insert(peerID);

return true;
}

if (peerPosIt != currPeerPositions_.end())
peerPosIt->second = newPeerPos;
{
// Remove from acquired_ or else it will consume space for awhile.
// beast::aged_unordered_container::erase by key is broken and
// is not used anywhere in the existing codebase.
if (auto found = acquired_.find(newPeerPos.proposal().position());
found != acquired_.end())
{
acquiredPurge_.push(newPeerPos.proposal().position());
}
}
else
{
currPeerPositions_.emplace(peerID, newPeerPos);
}
}

if (newPeerProp.isInitial())
Expand All @@ -805,9 +924,9 @@ Consensus<Adaptor>::peerProposalInternal(
else
JLOG(j_.debug()) << "Don't have tx set for peer";
}
else if (result_)
// else if (result_)
{
updateDisputes(newPeerProp.nodeID(), ait->second);
// updateDisputes(newPeerProp.nodeID(), ait->second);
}
}

Expand Down Expand Up @@ -1025,8 +1144,18 @@ Consensus<Adaptor>::handleWrongLedger(typename Ledger_t::ID const& lgrId)
result_->disputes.clear();
result_->compares.clear();
}

currPeerPositions_.clear();
for (auto it = currPeerPositions_.begin();
it != currPeerPositions_.end();)
{
// beast::aged_unordered_map::erase by key is broken and
// is not used anywhere in the existing codebase.
if (auto found = acquired_.find(it->second.proposal().position());
found != acquired_.end())
{
acquiredPurge_.push(it->second.proposal().position());
}
it = currPeerPositions_.erase(it);
}
rawCloseTimes_.peers.clear();
deadNodes_.clear();

Expand Down Expand Up @@ -1077,8 +1206,30 @@ template <class Adaptor>
void
Consensus<Adaptor>::playbackProposals()
{
for (auto const& it : recentPeerPositions_)
// Only use proposals for the ledger sequence we're currently working on.
auto const currentPositions = recentPeerPositions_.find(
previousLedger_.seq() + typename Ledger_t::Seq{1});
if (currentPositions != recentPeerPositions_.end())
{
for (auto const& [peerID, pos] : currentPositions->second)
{
if (pos.proposal().prevLedger() == prevLedgerID_ &&
peerProposalInternal(now_, pos))
{
adaptor_.share(pos);
}
}
}

// It's safe to do this--if a proposal is based on the wrong ledger,
// then peerProposalInternal() will not replace it in currPeerPositions_.
// TODO Eventually, remove code to check for non-existent ledger sequence
// in peer proposal messages and make that parameter required in
// the protobuf definition. Do this only after the network is running on
// rippled versions with that parameter set in peer proposals. This
// can be done once an amendment for another feature forces that kind
// of upgrade, but this particular feature does not require an amendment.
for (auto const& it : recentPeerPositionsLegacy_) {
for (auto const& pos : it.second)
{
if (pos.proposal().prevLedger() == prevLedgerID_)
Expand Down Expand Up @@ -1384,6 +1535,14 @@ Consensus<Adaptor>::updateOurPositions()
JLOG(j_.warn()) << "Removing stale proposal from " << peerID;
for (auto& dt : result_->disputes)
dt.second.unVote(peerID);
// Remove from acquired_ or else it will consume space for
// awhile. beast::aged_unordered_map::erase by key is broken and
// is not used anywhere in the existing codebase.
if (auto found = acquired_.find(peerProp.position());
found != acquired_.end())
{
acquiredPurge_.push(peerProp.position());
}
it = currPeerPositions_.erase(it);
}
else
Expand Down

0 comments on commit 7aa2819

Please sign in to comment.