Skip to content

Commit

Permalink
Latest: 2.2.0-b1-debugrelay-12
Browse files Browse the repository at this point in the history
-------------------------

squashed these:

2.1.0-rc1-debugrelay-8
Squashed previous debugrelay commits and rebased
onto 2.1.0-rc1.

Diagnostics for relaying transactions. Should show whether or not
each transaction was relayed to each peer once committed locally.

Log relay better.

Log peer messages and charges.

Diags for receiving tx.

Log outgoing peer messages.

Log every caller of Peer::send(). Try to find why ledger_data and
validations messages are being spammed.

Bump version to 2.0.0-rcX-debugrelay-2

2.0.0-rcX-debugrelay-3: logs for batch apply timer.

2.0.0-rcX-debugrelay-4: log batch size and dispatch state.

2.0.0-rcX-debugrelay-5: log condition for running transactionBatch()

2.0.0-rcX-debugrelay-6: log whether duplicate messages are being
relayed and whether peers' send queue is being modified correctly.

2.0.0-rcX-debugrelay-7: atomically count all peer message queue
modifications.

2.1.0-rc1-debugrelay-9: log ledger advancing.

2.1.0-rc1-debugrelay-10: Log why ledgers are not being acquired and
why complete_ledgers is not being updated.

2.1.0-rc1-debugrelay-11:
Logs for finding new ledgers to publish.

2.2.0-b1-debugrelay-12: Fix stringstream crash.
  • Loading branch information
mtrippled committed Apr 4, 2024
1 parent 4bcbf70 commit d27906b
Show file tree
Hide file tree
Showing 17 changed files with 304 additions and 45 deletions.
1 change: 1 addition & 0 deletions src/ripple/app/consensus/RCLConsensus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,7 @@ RCLConsensus::Adaptor::notify(
}
s.set_firstseq(uMin);
s.set_lastseq(uMax);
JLOG(j_.debug()) << "debugrelay send() 32";
app_.overlay().foreach(
send_always(std::make_shared<Message>(s, protocol::mtSTATUS_CHANGE)));
JLOG(j_.trace()) << "send status change to peer";
Expand Down
1 change: 1 addition & 0 deletions src/ripple/app/ledger/impl/InboundLedger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
if (auto p = app_.overlay().findPeerByShortID(id))
{
mByHash = false;
JLOG(journal_.debug()) << "debugrelay send() 1";
p->send(packet);
}
});
Expand Down
111 changes: 98 additions & 13 deletions src/ripple/app/ledger/impl/LedgerMaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ shouldAcquire(
return minimumOnline.has_value() && candidateLedger >= *minimumOnline;
}();

JLOG(j.trace()) << "Missing ledger " << candidateLedger
JLOG(j.debug()) << "shouldAcquire Missing ledger " << candidateLedger
<< (ret ? " should" : " should NOT") << " be acquired";
return ret;
}
Expand Down Expand Up @@ -893,6 +893,7 @@ LedgerMaster::getFetchPack(LedgerIndex missing, InboundLedger::Reason reason)
tmBH.set_ledgerhash(haveHash->begin(), 32);
auto packet = std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS);

JLOG(m_journal.debug()) << "debugrelay send() 2";
target->send(packet);
JLOG(m_journal.trace()) << "Requested fetch pack for " << missing;
}
Expand Down Expand Up @@ -1362,24 +1363,35 @@ std::vector<std::shared_ptr<Ledger const>>
LedgerMaster::findNewLedgersToPublish(
std::unique_lock<std::recursive_mutex>& sl)
{
std::stringstream ss;
ss << "findNewLedgersToPublish ";

std::vector<std::shared_ptr<Ledger const>> ret;

JLOG(m_journal.trace()) << "findNewLedgersToPublish<";

ss << " mValidLedger.empty()? " << mValidLedger.empty();
// No valid ledger, nothing to do
if (mValidLedger.empty())
{
JLOG(m_journal.trace()) << "No valid journal, nothing to publish.";
ss << " returning";
JLOG(m_journal.debug()) << ss.str();
return {};
}

ss << " mPubLedger? " << (mPubLedger ? "exists " : "doesn't exist ");
if (!mPubLedger)
{
JLOG(m_journal.info())
<< "First published ledger will be " << mValidLedgerSeq;
ss << " first published ledger will be mValidLedgerSeq " << mValidLedgerSeq;
JLOG(m_journal.debug()) << ss.str();
return {mValidLedger.get()};
}

ss << " mValidLedgerSeq,mPubLedgerSeq,MAX_LEDGER_GAP: "
<< mValidLedgerSeq << ',' << mPubLedgerSeq << ',' << MAX_LEDGER_GAP;
if (mValidLedgerSeq > (mPubLedgerSeq + MAX_LEDGER_GAP))
{
JLOG(m_journal.warn()) << "Gap in validated ledger stream "
Expand All @@ -1389,13 +1401,17 @@ LedgerMaster::findNewLedgersToPublish(
ret.push_back(valLedger);
setPubLedger(valLedger);
app_.getOrderBookDB().setup(valLedger);
ss << " returning valid ledger " << valLedger->info().seq;
JLOG(m_journal.debug()) << ss.str();

return {valLedger};
}

if (mValidLedgerSeq <= mPubLedgerSeq)
{
JLOG(m_journal.trace()) << "No valid journal, nothing to publish.";
ss << " mValidLedgerSeq <= mPubLedgerSeq, nothing to puslish";
JLOG(m_journal.debug()) << ss.str();
return {};
}

Expand All @@ -1405,11 +1421,14 @@ LedgerMaster::findNewLedgersToPublish(
auto valLedger = mValidLedger.get();
std::uint32_t valSeq = valLedger->info().seq;

ss << "LEDGER REPLAY configured? " << app_.config().LEDGER_REPLAY << ", ";
ss << " now iterating through ledgers from " << pubSeq << " to " << valSeq << " inclusive: ";
ScopedUnlock sul{sl};
try
{
for (std::uint32_t seq = pubSeq; seq <= valSeq; ++seq)
{
ss << " seq " << seq;
JLOG(m_journal.trace())
<< "Trying to fetch/publish valid ledger " << seq;

Expand All @@ -1419,16 +1438,21 @@ LedgerMaster::findNewLedgersToPublish(
// VFALCO TODO Restructure this code so that zero is not
// used.
if (!hash)
{
ss << " no hash setting to beast::zero ";
hash = beast::zero; // kludge
}
if (seq == valSeq)
{
// We need to publish the ledger we just fully validated
ss << " publish the ledger we just fully validated (seq == valSeq) ";
ledger = valLedger;
}
else if (hash->isZero())
{
JLOG(m_journal.fatal()) << "Ledger: " << valSeq
<< " does not have hash for " << seq;
ss << " hash is zero, no hash available ";
assert(false);
}
else
Expand All @@ -1438,20 +1462,29 @@ LedgerMaster::findNewLedgersToPublish(

if (!app_.config().LEDGER_REPLAY)
{
ss << " acqCount+1,ledger_fetch_size_: " << (acqCount + 1)
<< ',' << ledger_fetch_size_ << " ";
// Can we try to acquire the ledger we need?
if (!ledger && (++acqCount < ledger_fetch_size_))
{
ss << "acquire() " << seq << " ";
ledger = app_.getInboundLedgers().acquire(
*hash, seq, InboundLedger::Reason::GENERIC);
}
}

// Did we acquire the next ledger we need to publish?
ss << " we have ledger? " << (ledger ? "yes " : "no ") << " it's seq,pubSeq: "
<< ledger->info().seq << ',' << pubSeq << " ";
if (ledger && (ledger->info().seq == pubSeq))
{
ss << " yes, set it validated, add to return vector and increment pubSeq to " << pubSeq + 1 << " ";
ledger->setValidated();
ret.push_back(ledger);
++pubSeq;
}
}
ss << ". ready to publish " << ret.size() << " ledgers. ";

JLOG(m_journal.trace())
<< "ready to publish " << ret.size() << " ledgers.";
Expand All @@ -1461,10 +1494,12 @@ LedgerMaster::findNewLedgersToPublish(
JLOG(m_journal.error())
<< "Exception while trying to find ledgers to publish: "
<< ex.what();
ss << " exception while trying to find ledgers to publish: " << ex.what() << " ";
}

if (app_.config().LEDGER_REPLAY)
{
ss << " narrow down and attempt to replay ledgers. ";
/* Narrow down the gap of ledgers, and try to replay them.
* When replaying a ledger gap, if the local node has
* the start ledger, it saves an expensive InboundLedger
Expand All @@ -1485,7 +1520,8 @@ LedgerMaster::findNewLedgersToPublish(
{
auto numberLedgers =
finishLedger->seq() - startLedger->seq() + 1;
JLOG(m_journal.debug())
// JLOG(m_journal.debug())
ss
<< "Publish LedgerReplays " << numberLedgers
<< " ledgers, from seq=" << startLedger->info().seq << ", "
<< startLedger->info().hash
Expand All @@ -1500,6 +1536,7 @@ LedgerMaster::findNewLedgersToPublish(
}
}

ss << ss.str();
return ret;
}

Expand Down Expand Up @@ -1974,33 +2011,52 @@ LedgerMaster::fetchForHistory(
{
assert(hash->isNonZero());
auto ledger = getLedgerByHash(*hash);
std::stringstream ss;
ss << "fetchForHistory seq: " << missing;
if (ledger)
ss << " hash: " << ledger->info().hash;
else
ss << " no ledger ";
if (!ledger)
{
if (!app_.getInboundLedgers().isFailure(*hash))
{
ledger =
app_.getInboundLedgers().acquire(*hash, missing, reason);
ss << " inbound ledgers ok so far. ledger: " << ledger
<< " missing: " << missing
<< " fetch_seq: " << fetch_seq_
<< " earliest ledger seq: " << app_.getNodeStore().earliestLedgerSeq();
if (!ledger && missing != fetch_seq_ &&
missing > app_.getNodeStore().earliestLedgerSeq())
{
JLOG(m_journal.trace())
ss << " want fetch pack. ";
JLOG(m_journal.debug())
<< "fetchForHistory want fetch pack " << missing;
fetch_seq_ = missing;
getFetchPack(missing, reason);
}
else
JLOG(m_journal.trace())
{
ss << " no fetch pack. ";
JLOG(m_journal.debug())
<< "fetchForHistory no fetch pack for " << missing;
}
}
else
{
ss << " found failed acquire. ";
JLOG(m_journal.debug())
<< "fetchForHistory found failed acquire";
}
}
ss << " ledger: " << ledger;
if (ledger)
{
auto seq = ledger->info().seq;
assert(seq == missing);
JLOG(m_journal.trace()) << "fetchForHistory acquired " << seq;
ss << " acquired: " << seq;
JLOG(m_journal.debug()) << "fetchForHistory acquired " << seq;
if (reason == InboundLedger::Reason::SHARD)
{
ledger->setFull();
Expand All @@ -2013,6 +2069,7 @@ LedgerMaster::fetchForHistory(
}
else
{
ss << "setFullLedger now. all should be well. ";
setFullLedger(ledger, false, false);
int fillInProgress;
{
Expand All @@ -2039,6 +2096,7 @@ LedgerMaster::fetchForHistory(
}
else
{
ss << " have not acquired. ";
std::uint32_t fetchSz;
if (reason == InboundLedger::Reason::SHARD)
// Do not fetch ledger sequences lower
Expand Down Expand Up @@ -2070,6 +2128,7 @@ LedgerMaster::fetchForHistory(
<< "Threw while prefetching: " << ex.what();
}
}
JLOG(m_journal.debug()) << ss.str();
}
else
{
Expand All @@ -2091,20 +2150,37 @@ LedgerMaster::fetchForHistory(
void
LedgerMaster::doAdvance(std::unique_lock<std::recursive_mutex>& sl)
{
JLOG(m_journal.debug()) << "doAdvance aka tryAdvance. mAdvanceWork: "
<< mAdvanceWork;
do
{
mAdvanceWork = false; // If there's work to do, we'll make progress
bool progress = false;

auto const pubLedgers = findNewLedgersToPublish(sl);
JLOG(m_journal.debug()) << "doAdvance aka tryAdvance number of "
<< "ledgers to publish " << pubLedgers.size();
std::stringstream ss;
ss << "doAdvance aka tryAdvance ";
if (pubLedgers.empty())
{
ss << "no ledgers to publish. Figuring out why. ";
ss << "standalone_: " << standalone_
<< " isLoadedLocal: " << app_.getFeeTrack().isLoadedLocal()
<< " getJobCount(jtPUBOLDLEDGER): " << app_.getJobQueue().getJobCount(jtPUBOLDLEDGER)
<< " mValidLedgerSeq: " << mValidLedgerSeq
<< " mPubLedgerSeq: " << mPubLedgerSeq
<< " age: " << getValidatedLedgerAge().count()
<< " MAX_LEDGER_AGE_ACQUIRE: " << MAX_LEDGER_AGE_ACQUIRE.count()
<< " getWriteLoad(): " << app_.getNodeStore().getWriteLoad()
<< " MAX_WRITE_LOAD_ACQUIRE: " << MAX_WRITE_LOAD_ACQUIRE;
if (!standalone_ && !app_.getFeeTrack().isLoadedLocal() &&
(app_.getJobQueue().getJobCount(jtPUBOLDLEDGER) < 10) &&
(mValidLedgerSeq == mPubLedgerSeq) &&
(getValidatedLedgerAge() < MAX_LEDGER_AGE_ACQUIRE) &&
(app_.getNodeStore().getWriteLoad() < MAX_WRITE_LOAD_ACQUIRE))
{
ss << " We are in sync, so can acquire. ";
// We are in sync, so can acquire
InboundLedger::Reason reason = InboundLedger::Reason::HISTORY;
std::optional<std::uint32_t> missing;
Expand All @@ -2115,10 +2191,11 @@ LedgerMaster::doAdvance(std::unique_lock<std::recursive_mutex>& sl)
mPubLedger->info().seq,
app_.getNodeStore().earliestLedgerSeq());
}
ss << " missing? " << (missing.has_value() ? std::to_string(*missing) : "none");
if (missing)
{
JLOG(m_journal.trace())
<< "tryAdvance discovered missing " << *missing;
JLOG(m_journal.debug())
<< "doAdvance aka tryAdvance discovered missing " << *missing;
if ((mFillInProgress == 0 || *missing > mFillInProgress) &&
shouldAcquire(
mValidLedgerSeq,
Expand All @@ -2127,8 +2204,8 @@ LedgerMaster::doAdvance(std::unique_lock<std::recursive_mutex>& sl)
*missing,
m_journal))
{
JLOG(m_journal.trace())
<< "advanceThread should acquire";
JLOG(m_journal.debug())
<< "doAdvance aka advanceThread should acquire";
}
else
missing = std::nullopt;
Expand All @@ -2142,34 +2219,40 @@ LedgerMaster::doAdvance(std::unique_lock<std::recursive_mutex>& sl)
reason = InboundLedger::Reason::SHARD;
}
}
ss << " missing again: " << (missing.has_value() ? std::to_string(*missing) : "none");
if (missing)
{
ss << " fetchForHistory: " << *missing;
fetchForHistory(*missing, progress, reason, sl);
ss << " mValidLedgerSeq: " << mValidLedgerSeq
<< " mPubLedgerSeq: " << mPubLedgerSeq;
if (mValidLedgerSeq != mPubLedgerSeq)
{
JLOG(m_journal.debug())
<< "tryAdvance found last valid changed";
<< "doAdvance aka tryAdvance found last valid changed";
progress = true;
}
}
}
else
{
ss << " That big check evaluated false. ";
mHistLedger.reset();
mShardLedger.reset();
JLOG(m_journal.trace()) << "tryAdvance not fetching history";
JLOG(m_journal.debug()) << "doAdvance aka tryAdvance not fetching history";
}
}
else
{
JLOG(m_journal.trace()) << "tryAdvance found " << pubLedgers.size()
ss << " found ledgers to publish so things are probably good. ";
JLOG(m_journal.debug()) << "doAdvance aka tryAdvance found " << pubLedgers.size()
<< " ledgers to publish";
for (auto const& ledger : pubLedgers)
{
{
ScopedUnlock sul{sl};
JLOG(m_journal.debug())
<< "tryAdvance publishing seq " << ledger->info().seq;
<< "doAdvance aka tryAdvance publishing seq " << ledger->info().seq;
setFullLedger(ledger, true, true);
}

Expand All @@ -2184,6 +2267,7 @@ LedgerMaster::doAdvance(std::unique_lock<std::recursive_mutex>& sl)
app_.getOPs().clearNeedNetworkLedger();
progress = newPFWork("pf:newLedger", sl);
}
JLOG(m_journal.debug()) << ss.str();
if (progress)
mAdvanceWork = true;
} while (mAdvanceWork);
Expand Down Expand Up @@ -2400,6 +2484,7 @@ LedgerMaster::makeFetchPack(
<< "Built fetch pack with " << reply.objects().size() << " nodes ("
<< msg->getBufferSize() << " bytes)";

JLOG(m_journal.debug()) << "debugrelay send() 3";
peer->send(msg);
}
catch (std::exception const& ex)
Expand Down
3 changes: 2 additions & 1 deletion src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,8 @@ class ApplicationImp : public Application, public BasicApp

, hashRouter_(std::make_unique<HashRouter>(
stopwatch(),
HashRouter::getDefaultHoldTime()))
HashRouter::getDefaultHoldTime(),
logs_->journal("HashRouter")))

, mValidations(
ValidationParms(),
Expand Down
Loading

0 comments on commit d27906b

Please sign in to comment.