Skip to content

Commit

Permalink
2.1.0-rc1-debugrelay-8
Browse files Browse the repository at this point in the history
Squashed previous debugrelay commits and rebased
onto 2.1.0-rc1.

Diagnostics for relaying transactions. Should show whether or not
each transaction was relayed to each peer once committed locally.

Log relay better.

Log peer messages and charges.

Diags for receiving tx.

Log outgoing peer messages.

Log every caller of Peer::send(). Try to find why ledger_data and
validations messages are being spammed.

Bump version to 2.0.0-rcX-debugrelay-2

2.0.0-rcX-debugrelay-3: logs for batch apply timer.

2.0.0-rcX-debugrelay-4: log batch size and dispatch state.

a2.0.0-rcX-debugrelay-5: log condition for running transactionBatch()

2.0.0-rcX-debugrelay-6: log whether duplicate messages are being
relayed and whether peers' send queue is being modified correctly.

2.0.0-rcX-debugrelay-7: atomically count all peer message queue
modifications.
  • Loading branch information
mtrippled committed Feb 12, 2024
1 parent da68651 commit 0d2afea
Show file tree
Hide file tree
Showing 17 changed files with 208 additions and 32 deletions.
1 change: 1 addition & 0 deletions src/ripple/app/consensus/RCLConsensus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,7 @@ RCLConsensus::Adaptor::notify(
}
s.set_firstseq(uMin);
s.set_lastseq(uMax);
JLOG(j_.debug()) << "debugrelay send() 32";
app_.overlay().foreach(
send_always(std::make_shared<Message>(s, protocol::mtSTATUS_CHANGE)));
JLOG(j_.trace()) << "send status change to peer";
Expand Down
1 change: 1 addition & 0 deletions src/ripple/app/ledger/impl/InboundLedger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
if (auto p = app_.overlay().findPeerByShortID(id))
{
mByHash = false;
JLOG(journal_.debug()) << "debugrelay send() 1";
p->send(packet);
}
});
Expand Down
2 changes: 2 additions & 0 deletions src/ripple/app/ledger/impl/LedgerMaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,7 @@ LedgerMaster::getFetchPack(LedgerIndex missing, InboundLedger::Reason reason)
tmBH.set_ledgerhash(haveHash->begin(), 32);
auto packet = std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS);

JLOG(m_journal.debug()) << "debugrelay send() 2";
target->send(packet);
JLOG(m_journal.trace()) << "Requested fetch pack for " << missing;
}
Expand Down Expand Up @@ -2400,6 +2401,7 @@ LedgerMaster::makeFetchPack(
<< "Built fetch pack with " << reply.objects().size() << " nodes ("
<< msg->getBufferSize() << " bytes)";

JLOG(m_journal.debug()) << "debugrelay send() 3";
peer->send(msg);
}
catch (std::exception const& ex)
Expand Down
3 changes: 2 additions & 1 deletion src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,8 @@ class ApplicationImp : public Application, public BasicApp

, hashRouter_(std::make_unique<HashRouter>(
stopwatch(),
HashRouter::getDefaultHoldTime()))
HashRouter::getDefaultHoldTime(),
logs_->journal("HashRouter")))

, mValidations(
ValidationParms(),
Expand Down
18 changes: 16 additions & 2 deletions src/ripple/app/misc/HashRouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
//==============================================================================

#include <ripple/app/misc/HashRouter.h>
#include <ripple/basics/Log.h>
#include <sstream>

namespace ripple {

Expand Down Expand Up @@ -122,10 +124,22 @@ HashRouter::shouldRelay(uint256 const& key)

auto& s = emplace(key).first;

std::stringstream ss;
ss << "shouldRelay " << key;
std::optional<std::set<PeerShortID>> ret;
if (!s.shouldRelay(suppressionMap_.clock().now(), holdTime_))
return {};
{
ss << " not recently relayed.";
}
else
{
ss << " recently relayed.";
ret = s.releasePeerSet();
}

return s.releasePeerSet();
ss << " relaying: " << (ret.has_value() ? "true" : "false");
JLOG(j_.debug()) << ss.str();
return ret;
}

} // namespace ripple
6 changes: 5 additions & 1 deletion src/ripple/app/misc/HashRouter.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <ripple/basics/base_uint.h>
#include <ripple/basics/chrono.h>
#include <ripple/beast/container/aged_unordered_map.h>
#include <ripple/beast/utility/Journal.h>

#include <optional>

Expand Down Expand Up @@ -143,8 +144,10 @@ class HashRouter
return 300s;
}

HashRouter(Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds)
HashRouter(Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds,
beast::Journal j)
: suppressionMap_(clock), holdTime_(entryHoldTimeInSeconds)
, j_(j)
{
}

Expand Down Expand Up @@ -221,6 +224,7 @@ class HashRouter
suppressionMap_;

std::chrono::seconds const holdTime_;
beast::Journal j_;
};

} // namespace ripple
Expand Down
74 changes: 65 additions & 9 deletions src/ripple/app/misc/NetworkOPs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
#include <algorithm>
#include <mutex>
#include <optional>
#include <sstream>
#include <string>
#include <tuple>
#include <unordered_map>
Expand Down Expand Up @@ -611,7 +612,8 @@ class NetworkOPsImp final : public NetworkOPs
boost::asio::steady_timer& timer,
std::chrono::milliseconds const& expiry_time,
std::function<void()> onExpire,
std::function<void()> onError);
std::function<void()> onError,
std::optional<std::string> label = std::nullopt);
void
setHeartbeatTimer();
void
Expand Down Expand Up @@ -938,15 +940,30 @@ NetworkOPsImp::setTimer(
boost::asio::steady_timer& timer,
const std::chrono::milliseconds& expiry_time,
std::function<void()> onExpire,
std::function<void()> onError)
std::function<void()> onError,
std::optional<std::string> label)
{
if (label.has_value())
{
JLOG(m_journal.debug()) << "setTimer setting " << *label;
}
// Only start the timer if waitHandlerCounter_ is not yet joined.
if (auto optionalCountedHandler = waitHandlerCounter_.wrap(
[this, onExpire, onError](boost::system::error_code const& e) {
[this, label, onExpire, onError](boost::system::error_code const& e) {
if ((e.value() == boost::system::errc::success) &&
(!m_job_queue.isStopped()))
{
if (label.has_value())
{
JLOG(m_journal.debug())
<< "setTimer onExpire() starting " << *label;
}
onExpire();
if (label.has_value())
{
JLOG(m_journal.debug())
<< "setTimer onExpire() completed " << *label;
}
}
// Recover as best we can if an unexpected error occurs.
if (e.value() != boost::system::errc::success &&
Expand All @@ -956,12 +973,31 @@ NetworkOPsImp::setTimer(
JLOG(m_journal.error())
<< "Timer got error '" << e.message()
<< "'. Restarting timer.";
if (label.has_value())
{
JLOG(m_journal.debug())
<< "setTimer onError() starting " << *label;
}
onError();
if (label.has_value())
{
JLOG(m_journal.debug())
<< "setTimer onError() completed " << *label;
}
}
}))
{
if (label.has_value())
{
JLOG(m_journal.debug())
<< "setTimer setting expiration and waiting " << *label;
}
timer.expires_from_now(expiry_time);
timer.async_wait(std::move(*optionalCountedHandler));
if (label.has_value())
{
JLOG(m_journal.debug()) << "setTimer finished waiting " << *label;
}
}
}

Expand Down Expand Up @@ -1106,6 +1142,7 @@ NetworkOPsImp::processClusterTimer()
node.set_name(to_string(item.address));
node.set_cost(item.balance);
}
JLOG(m_journal.debug()) << "debugrelay send() 37";
app_.overlay().foreach(send_if(
std::make_shared<Message>(cluster, protocol::mtCLUSTER),
peer_in_cluster()));
Expand Down Expand Up @@ -1195,6 +1232,7 @@ NetworkOPsImp::processTransaction(
bool bLocal,
FailHard failType)
{
JLOG(m_journal.debug()) << "processTransaction " << transaction->getID();
auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN");
auto const newFlags = app_.getHashRouter().getFlags(transaction->getID());

Expand All @@ -1221,7 +1259,7 @@ NetworkOPsImp::processTransaction(
// Not concerned with local checks at this point.
if (validity == Validity::SigBad)
{
JLOG(m_journal.info()) << "Transaction has bad signature: " << reason;
JLOG(m_journal.info()) << "Transaction has bad signature: " << transaction->getID() << " " << reason;
transaction->setStatus(INVALID);
transaction->setResult(temBAD_SIGNATURE);
app_.getHashRouter().setFlags(transaction->getID(), SF_BAD);
Expand Down Expand Up @@ -1251,6 +1289,7 @@ NetworkOPsImp::doTransactionAsync(
mTransactions.push_back(
TransactionStatus(transaction, bUnlimited, false, failType));
transaction->setApplying();
JLOG(m_journal.debug()) << "processTransaction batch size " << transaction->getID() << " " << mTransactions.size();

if (mDispatchState == DispatchState::none)
{
Expand Down Expand Up @@ -1328,6 +1367,7 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
mDispatchState = DispatchState::running;

batchLock.unlock();
JLOG(m_journal.debug()) << "apply batch size " << transactions.size();

{
std::unique_lock masterLock{app_.getMasterMutex(), std::defer_lock};
Expand All @@ -1348,6 +1388,7 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
if (e.failType == FailHard::yes)
flags |= tapFAIL_HARD;

JLOG(m_journal.debug()) << "apply in batch " << e.transaction->getID();
auto const result = app_.getTxQ().apply(
app_, view, e.transaction->getSTransaction(), flags, j);
e.result = result.first;
Expand Down Expand Up @@ -1399,7 +1440,7 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
if (e.result == tesSUCCESS)
{
JLOG(m_journal.debug())
<< "Transaction is now included in open ledger";
<< "Transaction is now included in open ledger " << e.transaction->getID();
e.transaction->setStatus(INCLUDED);

auto const& txCur = e.transaction->getSTransaction();
Expand All @@ -1416,14 +1457,14 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
else if (e.result == tefPAST_SEQ)
{
// duplicate or conflict
JLOG(m_journal.info()) << "Transaction is obsolete";
JLOG(m_journal.info()) << "Transaction is obsolete " << e.transaction->getID();
e.transaction->setStatus(OBSOLETE);
}
else if (e.result == terQUEUED)
{
JLOG(m_journal.debug())
<< "Transaction is likely to claim a"
<< " fee, but is queued until fee drops";
<< " fee, but is queued until fee drops " << e.transaction->getID();

e.transaction->setStatus(HELD);
// Add to held transactions, because it could get
Expand All @@ -1439,7 +1480,7 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
{
// transaction should be held
JLOG(m_journal.debug())
<< "Transaction should be held: " << e.result;
<< "Transaction should be held: " << e.result << " " << e.transaction->getID();
e.transaction->setStatus(HELD);
m_ledgerMaster.addHeldTransaction(e.transaction);
e.transaction->setKept();
Expand All @@ -1448,7 +1489,7 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
else
{
JLOG(m_journal.debug())
<< "Status other than success " << e.result;
<< "Status other than success " << e.result << " " << e.transaction->getID();
e.transaction->setStatus(INVALID);
}

Expand All @@ -1463,6 +1504,15 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
e.transaction->setKept();
}

std::stringstream ss;
ss << "DEBUGRELAY apply tx " << e.transaction->getID()
<< ". applied,mMode,failtype != yes,local,result == terQUEUED,enforceFailHard: "
<< e.applied << ',' << strOperatingMode() << ','
<< (e.failType != FailHard::yes) << ','
<< e.local << ','
<< (e.result == terQUEUED) << ','
<< enforceFailHard;

if ((e.applied ||
((mMode != OperatingMode::FULL) &&
(e.failType != FailHard::yes) && e.local) ||
Expand All @@ -1471,9 +1521,11 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
{
auto const toSkip =
app_.getHashRouter().shouldRelay(e.transaction->getID());
ss << ". Possibly relaying. toSkip: " << toSkip.has_value();

if (toSkip)
{
ss << ". Attempting to relay.";
protocol::TMTransaction tx;
Serializer s;

Expand All @@ -1489,6 +1541,8 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
}
}

JLOG(m_journal.debug()) << ss.str();

if (validatedLedgerIndex)
{
auto [fee, accountSeq, availableSeq] =
Expand Down Expand Up @@ -1800,6 +1854,7 @@ NetworkOPsImp::switchLastClosedLedger(
newLCL->info().parentHash.begin(), newLCL->info().parentHash.size());
s.set_ledgerhash(newLCL->info().hash.begin(), newLCL->info().hash.size());

JLOG(m_journal.debug()) << "debugrelay send() 33";
app_.overlay().foreach(
send_always(std::make_shared<Message>(s, protocol::mtSTATUS_CHANGE)));
}
Expand Down Expand Up @@ -1885,6 +1940,7 @@ NetworkOPsImp::mapComplete(std::shared_ptr<SHAMap> const& map, bool fromAcquire)
protocol::TMHaveTransactionSet msg;
msg.set_hash(map->getHash().as_uint256().begin(), 256 / 8);
msg.set_status(protocol::tsHAVE);
JLOG(m_journal.debug()) << "debugrelay send() 34";
app_.overlay().foreach(
send_always(std::make_shared<Message>(msg, protocol::mtHAVE_SET)));

Expand Down
1 change: 1 addition & 0 deletions src/ripple/app/misc/impl/ValidatorList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,7 @@ ValidatorList::sendValidatorList(
{
if (message.message)
{
JLOG(j.debug()) << "debugrelay send() 4";
peer.send(message.message);
hashRouter.addSuppressionPeer(message.hash, peer.id());
sent = true;
Expand Down
1 change: 1 addition & 0 deletions src/ripple/nodestore/impl/DatabaseShardImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2205,6 +2205,7 @@ DatabaseShardImp::updatePeers(std::lock_guard<std::mutex> const& lock) const
app_.getOPs().getOperatingMode() != OperatingMode::DISCONNECTED)
{
auto const message{getShardInfo(lock)->makeMessage(app_)};
JLOG(j_.debug()) << "debugrelay send() 35";
app_.overlay().foreach(send_always(std::make_shared<Message>(
message, protocol::mtPEER_SHARD_INFO_V2)));
}
Expand Down
1 change: 1 addition & 0 deletions src/ripple/overlay/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ class Message : public std::enable_shared_from_this<Message>
* @param in Payload header pointer
* @return Message type
*/
public:
int
getType(std::uint8_t const* in) const;
};
Expand Down
Loading

0 comments on commit 0d2afea

Please sign in to comment.