Skip to content

Commit

Permalink
Recover old open ledger transactions to the queue:
Browse files Browse the repository at this point in the history
* Recover to the open ledger once, then to the queue.
* If transaction fails to queue for any reason, drop it.
* New result codes for transactions that can not queue.
* Add minimum queue size
* RIPD-1530
* fix XRPLF#2215
  • Loading branch information
ximinez committed Sep 19, 2017
1 parent 3bfd9de commit 9b0c6b6
Show file tree
Hide file tree
Showing 12 changed files with 236 additions and 66 deletions.
9 changes: 8 additions & 1 deletion src/ripple/app/consensus/RCLConsensus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ RCLConsensus::relay(RCLCxTx const& tx)
// If we didn't relay this transaction recently, relay it to all peers
if (app_.getHashRouter().shouldRelay(tx.id()))
{
JLOG(j_.debug()) << "Relaying disputed tx " << tx.id();
auto const slice = tx.tx_.slice();
protocol::TMTransaction msg;
msg.set_rawtransaction(slice.data(), slice.size());
Expand All @@ -163,6 +164,10 @@ RCLConsensus::relay(RCLCxTx const& tx)
app_.overlay().foreach (send_always(
std::make_shared<Message>(msg, protocol::mtTRANSACTION)));
}
else
{
JLOG(j_.debug()) << "Not relaying disputedtx " << tx.id();
}
}
void
RCLConsensus::propose(RCLCxPeerPos::Proposal const& proposal)
Expand Down Expand Up @@ -303,6 +308,8 @@ RCLConsensus::onClose(
// Build SHAMap containing all transactions in our open ledger
for (auto const& tx : initialLedger->txs)
{
JLOG(j_.trace()) << "Adding open ledger TX " <<
tx.first->getTransactionID();
Serializer s(2048);
tx.first->add(s);
initialSet->addItem(
Expand Down Expand Up @@ -474,7 +481,7 @@ RCLConsensus::doAccept(
{
JLOG(j_.debug())
<< "Test applying disputed transaction that did"
<< " not get in";
<< " not get in " << it.second.tx().id();

SerialIter sit(it.second.tx().tx_.slice());
auto txn = std::make_shared<STTx const>(sit);
Expand Down
48 changes: 45 additions & 3 deletions src/ripple/app/ledger/impl/OpenLedger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
#include <BeastConfig.h>
#include <ripple/app/ledger/OpenLedger.h>
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/HashRouter.h>
#include <ripple/app/misc/TxQ.h>
#include <ripple/app/tx/apply.h>
#include <ripple/ledger/CachedView.h>
#include <ripple/overlay/Message.h>
#include <ripple/overlay/Overlay.h>
#include <ripple/overlay/predicates.h>
#include <ripple/protocol/Feature.h>
#include <boost/range/adaptor/transformed.hpp>

Expand Down Expand Up @@ -117,6 +121,29 @@ OpenLedger::accept(Application& app, Rules const& rules,
for (auto const& item : locals)
app.getTxQ().apply(app, *next,
item.second, flags, j_);

// If we didn't relay this transaction recently, relay it to all peers
for (auto const& txpair : next->txs)
{
auto const& tx = txpair.first;
auto const txId = tx->getTransactionID();
if (auto const toSkip = app.getHashRouter().shouldRelay(txId))
{
JLOG(j_.debug()) << "Relaying recovered tx " << txId;
protocol::TMTransaction msg;
Serializer s;

tx->add(s);
msg.set_rawtransaction(&s.getData().front(), s.getLength());
msg.set_status(protocol::tsNEW);
msg.set_receivetimestamp(
app.timeKeeper().now().time_since_epoch().count());
app.overlay().foreach(send_if_not(
std::make_shared<Message>(msg, protocol::mtTRANSACTION),
peer_in_set(*toSkip)));
}
}

// Switch to the new open view
std::lock_guard<
std::mutex> lock2(current_mutex_);
Expand All @@ -143,9 +170,24 @@ OpenLedger::apply_one (Application& app, OpenView& view,
{
if (retry)
flags = flags | tapRETRY;
auto const result = ripple::apply(
app, view, *tx, flags, j);
if (result.second)
auto const result = [&]
{
if (app.getHashRouter().shouldRecover(tx->getTransactionID()))
{
return ripple::apply(app, view, *tx, flags, j);
}
else
{
// If the transaction can't get into the queue for any reason,
// drop it. If other nodes / validators have it, it'll be in
// their proposed set. If it's stuck on this node, then clean
// up the open ledger.
return app.getTxQ().apply(
app, view, tx, flags | tapPREFER_QUEUE, j);
}
}();
if (result.second ||
result.first == terQUEUED)
return Result::success;
if (isTefFailure (result.first) ||
isTemMalformed (result.first) ||
Expand Down
3 changes: 2 additions & 1 deletion src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,8 @@ class ApplicationImp
, mFeeTrack (std::make_unique<LoadFeeTrack>(logs_->journal("LoadManager")))

, mHashRouter (std::make_unique<HashRouter>(
stopwatch(), HashRouter::getDefaultHoldTime ()))
stopwatch(), HashRouter::getDefaultHoldTime (),
HashRouter::getDefaultRecoverLimit ()))

, mValidations (make_Validations (*this))

Expand Down
10 changes: 10 additions & 0 deletions src/ripple/app/misc/HashRouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,14 @@ HashRouter::shouldRelay (uint256 const& key)
return s.releasePeerSet();
}

bool
HashRouter::shouldRecover(uint256 const& key)
{
std::lock_guard <std::mutex> lock(mutex_);

auto& s = emplace(key).first;

return s.shouldRecover(recoverLimit_);
}

} // ripple
34 changes: 31 additions & 3 deletions src/ripple/app/misc/HashRouter.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ class HashRouter
static char const* getCountedObjectName () { return "HashRouterEntry"; }

Entry ()
: flags_ (0)
{
}

Expand Down Expand Up @@ -107,12 +106,26 @@ class HashRouter
return true;
}

/** Determines if this item should be recovered from the open ledger.
Counts the number of times the item has been recovered.
If it hits the limit, reset the counter and return false.
Else, increment the counter and return true.
@note The limit must be > 0
*/
bool shouldRecover(std::uint32_t limit)
{
return ++recoveries_ % limit != 0;
}

private:
int flags_;
int flags_ = 0;
std::set <PeerShortID> peers_;
// This could be generalized to a map, if more
// than one flag needs to expire independently.
boost::optional<Stopwatch::time_point> relayed_;
std::uint32_t recoveries_ = 0;
};

public:
Expand All @@ -123,9 +136,16 @@ class HashRouter
return 300s;
}

HashRouter (Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds)
static inline std::uint32_t getDefaultRecoverLimit()
{
return 1;
}

HashRouter (Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds,
std::uint32_t recoverLimit)
: suppressionMap_(clock)
, holdTime_ (entryHoldTimeInSeconds)
, recoverLimit_ (recoverLimit + 1u)
{
}

Expand Down Expand Up @@ -164,6 +184,12 @@ class HashRouter
*/
boost::optional<std::set<PeerShortID>> shouldRelay(uint256 const& key);

/** Determines whether the hashed item should be recovered
@return `bool` indicates whether the item should be relayed
*/
bool shouldRecover(uint256 const& key);

private:
// pair.second indicates whether the entry was created
std::pair<Entry&, bool> emplace (uint256 const&);
Expand All @@ -175,6 +201,8 @@ class HashRouter
hardened_hash<strong_hash>> suppressionMap_;

std::chrono::seconds const holdTime_;

std::uint32_t const recoverLimit_;
};

} // ripple
Expand Down
1 change: 1 addition & 0 deletions src/ripple/app/misc/TxQ.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class TxQ
struct Setup
{
std::size_t ledgersInQueue = 20;
std::size_t queueSizeMin = 2000;
std::uint32_t retrySequencePercent = 25;
// TODO: eahennis. Can we remove the multi tx factor?
std::int32_t multiTxnPercent = -90;
Expand Down
Loading

0 comments on commit 9b0c6b6

Please sign in to comment.