From 0d2afeafb93b5d6eb94634f4f8861ea4b580603c Mon Sep 17 00:00:00 2001 From: Mark Travis Date: Thu, 14 Dec 2023 09:54:11 -0800 Subject: [PATCH] 2.1.0-rc1-debugrelay-8 Squashed previous debugrelay commits and rebased onto 2.1.0-rc1. Diagnostics for relaying transactions. Should show whether or not each transaction was relayed to each peer once committed locally. Log relay better. Log peer messages and charges. Diags for receiving tx. Log outgoing peer messages. Log every caller of Peer::send(). Try to find why ledger_data and validations messages are being spammed. Bump version to 2.0.0-rcX-debugrelay-2 2.0.0-rcX-debugrelay-3: logs for batch apply timer. 2.0.0-rcX-debugrelay-4: log batch size and dispatch state. 2.0.0-rcX-debugrelay-5: log condition for running transactionBatch() 2.0.0-rcX-debugrelay-6: log whether duplicate messages are being relayed and whether peers' send queue is being modified correctly. 2.0.0-rcX-debugrelay-7: atomically count all peer message queue modifications. 
--- src/ripple/app/consensus/RCLConsensus.cpp | 1 + src/ripple/app/ledger/impl/InboundLedger.cpp | 1 + src/ripple/app/ledger/impl/LedgerMaster.cpp | 2 + src/ripple/app/main/Application.cpp | 3 +- src/ripple/app/misc/HashRouter.cpp | 18 ++++- src/ripple/app/misc/HashRouter.h | 6 +- src/ripple/app/misc/NetworkOPs.cpp | 74 ++++++++++++++++--- src/ripple/app/misc/impl/ValidatorList.cpp | 1 + .../nodestore/impl/DatabaseShardImp.cpp | 1 + src/ripple/overlay/Message.h | 1 + src/ripple/overlay/impl/OverlayImpl.cpp | 44 +++++++++-- src/ripple/overlay/impl/PeerImp.cpp | 60 +++++++++++++-- src/ripple/overlay/impl/PeerImp.h | 3 + src/ripple/overlay/impl/PeerSet.cpp | 4 + src/ripple/protocol/impl/BuildInfo.cpp | 2 +- src/ripple/resource/impl/Logic.h | 6 ++ src/test/app/HashRouter_test.cpp | 13 ++-- 17 files changed, 208 insertions(+), 32 deletions(-) diff --git a/src/ripple/app/consensus/RCLConsensus.cpp b/src/ripple/app/consensus/RCLConsensus.cpp index e60c8cf37d3..b8b2417710b 100644 --- a/src/ripple/app/consensus/RCLConsensus.cpp +++ b/src/ripple/app/consensus/RCLConsensus.cpp @@ -740,6 +740,7 @@ RCLConsensus::Adaptor::notify( } s.set_firstseq(uMin); s.set_lastseq(uMax); + JLOG(j_.debug()) << "debugrelay send() 32"; app_.overlay().foreach( send_always(std::make_shared(s, protocol::mtSTATUS_CHANGE))); JLOG(j_.trace()) << "send status change to peer"; diff --git a/src/ripple/app/ledger/impl/InboundLedger.cpp b/src/ripple/app/ledger/impl/InboundLedger.cpp index 53475988cbf..ba1f91a5412 100644 --- a/src/ripple/app/ledger/impl/InboundLedger.cpp +++ b/src/ripple/app/ledger/impl/InboundLedger.cpp @@ -607,6 +607,7 @@ InboundLedger::trigger(std::shared_ptr const& peer, TriggerReason reason) if (auto p = app_.overlay().findPeerByShortID(id)) { mByHash = false; + JLOG(journal_.debug()) << "debugrelay send() 1"; p->send(packet); } }); diff --git a/src/ripple/app/ledger/impl/LedgerMaster.cpp b/src/ripple/app/ledger/impl/LedgerMaster.cpp index 9388a3005ba..81194552219 100644 --- 
a/src/ripple/app/ledger/impl/LedgerMaster.cpp +++ b/src/ripple/app/ledger/impl/LedgerMaster.cpp @@ -893,6 +893,7 @@ LedgerMaster::getFetchPack(LedgerIndex missing, InboundLedger::Reason reason) tmBH.set_ledgerhash(haveHash->begin(), 32); auto packet = std::make_shared(tmBH, protocol::mtGET_OBJECTS); + JLOG(m_journal.debug()) << "debugrelay send() 2"; target->send(packet); JLOG(m_journal.trace()) << "Requested fetch pack for " << missing; } @@ -2400,6 +2401,7 @@ LedgerMaster::makeFetchPack( << "Built fetch pack with " << reply.objects().size() << " nodes (" << msg->getBufferSize() << " bytes)"; + JLOG(m_journal.debug()) << "debugrelay send() 3"; peer->send(msg); } catch (std::exception const& ex) diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 9aad155d876..0c5873198b7 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -457,7 +457,8 @@ class ApplicationImp : public Application, public BasicApp , hashRouter_(std::make_unique( stopwatch(), - HashRouter::getDefaultHoldTime())) + HashRouter::getDefaultHoldTime(), + logs_->journal("HashRouter"))) , mValidations( ValidationParms(), diff --git a/src/ripple/app/misc/HashRouter.cpp b/src/ripple/app/misc/HashRouter.cpp index 8085d6892ab..35932cea4da 100644 --- a/src/ripple/app/misc/HashRouter.cpp +++ b/src/ripple/app/misc/HashRouter.cpp @@ -18,6 +18,8 @@ //============================================================================== #include +#include +#include namespace ripple { @@ -122,10 +124,22 @@ HashRouter::shouldRelay(uint256 const& key) auto& s = emplace(key).first; + std::stringstream ss; + ss << "shouldRelay " << key; + std::optional> ret; if (!s.shouldRelay(suppressionMap_.clock().now(), holdTime_)) - return {}; + { + ss << " not recently relayed."; + } + else + { + ss << " recently relayed."; + ret = s.releasePeerSet(); + } - return s.releasePeerSet(); + ss << " relaying: " << (ret.has_value() ? 
"true" : "false"); + JLOG(j_.debug()) << ss.str(); + return ret; } } // namespace ripple diff --git a/src/ripple/app/misc/HashRouter.h b/src/ripple/app/misc/HashRouter.h index 8c546b2c51d..f3a98cb6374 100644 --- a/src/ripple/app/misc/HashRouter.h +++ b/src/ripple/app/misc/HashRouter.h @@ -25,6 +25,7 @@ #include #include #include +#include #include @@ -143,8 +144,10 @@ class HashRouter return 300s; } - HashRouter(Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds) + HashRouter(Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds, + beast::Journal j) : suppressionMap_(clock), holdTime_(entryHoldTimeInSeconds) + , j_(j) { } @@ -221,6 +224,7 @@ class HashRouter suppressionMap_; std::chrono::seconds const holdTime_; + beast::Journal j_; }; } // namespace ripple diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 239fd6306e0..5d5f0eb650c 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -75,6 +75,7 @@ #include #include #include +#include #include #include #include @@ -611,7 +612,8 @@ class NetworkOPsImp final : public NetworkOPs boost::asio::steady_timer& timer, std::chrono::milliseconds const& expiry_time, std::function onExpire, - std::function onError); + std::function onError, + std::optional label = std::nullopt); void setHeartbeatTimer(); void @@ -938,15 +940,30 @@ NetworkOPsImp::setTimer( boost::asio::steady_timer& timer, const std::chrono::milliseconds& expiry_time, std::function onExpire, - std::function onError) + std::function onError, + std::optional label) { + if (label.has_value()) + { + JLOG(m_journal.debug()) << "setTimer setting " << *label; + } // Only start the timer if waitHandlerCounter_ is not yet joined. 
if (auto optionalCountedHandler = waitHandlerCounter_.wrap( - [this, onExpire, onError](boost::system::error_code const& e) { + [this, label, onExpire, onError](boost::system::error_code const& e) { if ((e.value() == boost::system::errc::success) && (!m_job_queue.isStopped())) { + if (label.has_value()) + { + JLOG(m_journal.debug()) + << "setTimer onExpire() starting " << *label; + } onExpire(); + if (label.has_value()) + { + JLOG(m_journal.debug()) + << "setTimer onExpire() completed " << *label; + } } // Recover as best we can if an unexpected error occurs. if (e.value() != boost::system::errc::success && @@ -956,12 +973,31 @@ NetworkOPsImp::setTimer( JLOG(m_journal.error()) << "Timer got error '" << e.message() << "'. Restarting timer."; + if (label.has_value()) + { + JLOG(m_journal.debug()) + << "setTimer onError() starting " << *label; + } onError(); + if (label.has_value()) + { + JLOG(m_journal.debug()) + << "setTimer onError() completed " << *label; + } } })) { + if (label.has_value()) + { + JLOG(m_journal.debug()) + << "setTimer setting expiration and waiting " << *label; + } timer.expires_from_now(expiry_time); timer.async_wait(std::move(*optionalCountedHandler)); + if (label.has_value()) + { + JLOG(m_journal.debug()) << "setTimer finished waiting " << *label; + } } } @@ -1106,6 +1142,7 @@ NetworkOPsImp::processClusterTimer() node.set_name(to_string(item.address)); node.set_cost(item.balance); } + JLOG(m_journal.debug()) << "debugrelay send() 37"; app_.overlay().foreach(send_if( std::make_shared(cluster, protocol::mtCLUSTER), peer_in_cluster())); @@ -1195,6 +1232,7 @@ NetworkOPsImp::processTransaction( bool bLocal, FailHard failType) { + JLOG(m_journal.debug()) << "processTransaction " << transaction->getID(); auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN"); auto const newFlags = app_.getHashRouter().getFlags(transaction->getID()); @@ -1221,7 +1259,7 @@ NetworkOPsImp::processTransaction( // Not concerned with local checks at this point. 
if (validity == Validity::SigBad) { - JLOG(m_journal.info()) << "Transaction has bad signature: " << reason; + JLOG(m_journal.info()) << "Transaction has bad signature: " << transaction->getID() << " " << reason; transaction->setStatus(INVALID); transaction->setResult(temBAD_SIGNATURE); app_.getHashRouter().setFlags(transaction->getID(), SF_BAD); @@ -1251,6 +1289,7 @@ NetworkOPsImp::doTransactionAsync( mTransactions.push_back( TransactionStatus(transaction, bUnlimited, false, failType)); transaction->setApplying(); + JLOG(m_journal.debug()) << "processTransaction batch size " << transaction->getID() << " " << mTransactions.size(); if (mDispatchState == DispatchState::none) { @@ -1328,6 +1367,7 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) mDispatchState = DispatchState::running; batchLock.unlock(); + JLOG(m_journal.debug()) << "apply batch size " << transactions.size(); { std::unique_lock masterLock{app_.getMasterMutex(), std::defer_lock}; @@ -1348,6 +1388,7 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) if (e.failType == FailHard::yes) flags |= tapFAIL_HARD; + JLOG(m_journal.debug()) << "apply in batch " << e.transaction->getID(); auto const result = app_.getTxQ().apply( app_, view, e.transaction->getSTransaction(), flags, j); e.result = result.first; @@ -1399,7 +1440,7 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) if (e.result == tesSUCCESS) { JLOG(m_journal.debug()) - << "Transaction is now included in open ledger"; + << "Transaction is now included in open ledger " << e.transaction->getID(); e.transaction->setStatus(INCLUDED); auto const& txCur = e.transaction->getSTransaction(); @@ -1416,14 +1457,14 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) else if (e.result == tefPAST_SEQ) { // duplicate or conflict - JLOG(m_journal.info()) << "Transaction is obsolete"; + JLOG(m_journal.info()) << "Transaction is obsolete " << e.transaction->getID(); e.transaction->setStatus(OBSOLETE); } else if (e.result == terQUEUED) { JLOG(m_journal.debug()) 
<< "Transaction is likely to claim a" - << " fee, but is queued until fee drops"; + << " fee, but is queued until fee drops " << e.transaction->getID(); e.transaction->setStatus(HELD); // Add to held transactions, because it could get @@ -1439,7 +1480,7 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) { // transaction should be held JLOG(m_journal.debug()) - << "Transaction should be held: " << e.result; + << "Transaction should be held: " << e.result << " " << e.transaction->getID(); e.transaction->setStatus(HELD); m_ledgerMaster.addHeldTransaction(e.transaction); e.transaction->setKept(); @@ -1448,7 +1489,7 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) else { JLOG(m_journal.debug()) - << "Status other than success " << e.result; + << "Status other than success " << e.result << " " << e.transaction->getID(); e.transaction->setStatus(INVALID); } @@ -1463,6 +1504,15 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) e.transaction->setKept(); } + std::stringstream ss; + ss << "DEBUGRELAY apply tx " << e.transaction->getID() + << ". applied,mMode,failtype != yes,local,result == terQUEUED,enforceFailHard: " + << e.applied << ',' << strOperatingMode() << ',' + << (e.failType != FailHard::yes) << ',' + << e.local << ',' + << (e.result == terQUEUED) << ',' + << enforceFailHard; + if ((e.applied || ((mMode != OperatingMode::FULL) && (e.failType != FailHard::yes) && e.local) || @@ -1471,9 +1521,11 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) { auto const toSkip = app_.getHashRouter().shouldRelay(e.transaction->getID()); + ss << ". Possibly relaying. toSkip: " << toSkip.has_value(); if (toSkip) { + ss << ". 
Attempting to relay."; protocol::TMTransaction tx; Serializer s; @@ -1489,6 +1541,8 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) } } + JLOG(m_journal.debug()) << ss.str(); + if (validatedLedgerIndex) { auto [fee, accountSeq, availableSeq] = @@ -1800,6 +1854,7 @@ NetworkOPsImp::switchLastClosedLedger( newLCL->info().parentHash.begin(), newLCL->info().parentHash.size()); s.set_ledgerhash(newLCL->info().hash.begin(), newLCL->info().hash.size()); + JLOG(m_journal.debug()) << "debugrelay send() 33"; app_.overlay().foreach( send_always(std::make_shared(s, protocol::mtSTATUS_CHANGE))); } @@ -1885,6 +1940,7 @@ NetworkOPsImp::mapComplete(std::shared_ptr const& map, bool fromAcquire) protocol::TMHaveTransactionSet msg; msg.set_hash(map->getHash().as_uint256().begin(), 256 / 8); msg.set_status(protocol::tsHAVE); + JLOG(m_journal.debug()) << "debugrelay send() 34"; app_.overlay().foreach( send_always(std::make_shared(msg, protocol::mtHAVE_SET))); diff --git a/src/ripple/app/misc/impl/ValidatorList.cpp b/src/ripple/app/misc/impl/ValidatorList.cpp index d17b85c4840..b7751ca17e5 100644 --- a/src/ripple/app/misc/impl/ValidatorList.cpp +++ b/src/ripple/app/misc/impl/ValidatorList.cpp @@ -712,6 +712,7 @@ ValidatorList::sendValidatorList( { if (message.message) { + JLOG(j.debug()) << "debugrelay send() 4"; peer.send(message.message); hashRouter.addSuppressionPeer(message.hash, peer.id()); sent = true; diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index 33000b5d24c..2f223a722c3 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -2205,6 +2205,7 @@ DatabaseShardImp::updatePeers(std::lock_guard const& lock) const app_.getOPs().getOperatingMode() != OperatingMode::DISCONNECTED) { auto const message{getShardInfo(lock)->makeMessage(app_)}; + JLOG(j_.debug()) << "debugrelay send() 35"; app_.overlay().foreach(send_always(std::make_shared( message, 
protocol::mtPEER_SHARD_INFO_V2))); } diff --git a/src/ripple/overlay/Message.h b/src/ripple/overlay/Message.h index 0d6479366e8..6a35a596b1e 100644 --- a/src/ripple/overlay/Message.h +++ b/src/ripple/overlay/Message.h @@ -136,6 +136,7 @@ class Message : public std::enable_shared_from_this * @param in Payload header pointer * @return Message type */ +public: int getType(std::uint8_t const* in) const; }; diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index 6ed046f0403..4af8f7a04c1 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -41,6 +41,8 @@ #include #include +#include + namespace ripple { namespace CrawlOptions { @@ -670,8 +672,10 @@ OverlayImpl::onManifests( } if (!relay.list().empty()) - for_each([m2 = std::make_shared(relay, protocol::mtMANIFESTS)]( - std::shared_ptr&& p) { p->send(m2); }); + for_each([&, m2 = std::make_shared(relay, protocol::mtMANIFESTS)]( + std::shared_ptr&& p) { + JLOG(journal_.debug()) << "debugrelay send() 7"; + p->send(m2); }); } void @@ -723,6 +727,7 @@ OverlayImpl::crawlShards(bool includePublicKey, std::uint32_t relays) } // Request peer shard info + JLOG(journal_.debug()) << "debugrelay send() 36"; foreach(send_always(std::make_shared( tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2))); @@ -1224,7 +1229,9 @@ void OverlayImpl::broadcast(protocol::TMProposeSet& m) { auto const sm = std::make_shared(m, protocol::mtPROPOSE_LEDGER); - for_each([&](std::shared_ptr&& p) { p->send(sm); }); + for_each([&](std::shared_ptr&& p) { + JLOG(journal_.debug()) << "debugrelay send() 5"; + p->send(sm); }); } std::set @@ -1239,7 +1246,10 @@ OverlayImpl::relay( std::make_shared(m, protocol::mtPROPOSE_LEDGER, validator); for_each([&](std::shared_ptr&& p) { if (toSkip->find(p->id()) == toSkip->end()) + { + JLOG(journal_.debug()) << "debugrelay send() 9"; p->send(sm); + } }); return *toSkip; } @@ -1250,7 +1260,9 @@ void 
OverlayImpl::broadcast(protocol::TMValidation& m) { auto const sm = std::make_shared(m, protocol::mtVALIDATION); - for_each([sm](std::shared_ptr&& p) { p->send(sm); }); + for_each([&, sm](std::shared_ptr&& p) { + JLOG(journal_.debug()) << "debugrelay send() 6"; + p->send(sm); }); } std::set @@ -1265,7 +1277,10 @@ OverlayImpl::relay( std::make_shared(m, protocol::mtVALIDATION, validator); for_each([&](std::shared_ptr&& p) { if (toSkip->find(p->id()) == toSkip->end()) + { + JLOG(journal_.debug()) << "debugrelay send() 11"; p->send(sm); + } }); return *toSkip; } @@ -1317,13 +1332,20 @@ OverlayImpl::relay( auto peers = getActivePeers(toSkip, total, disabled, enabledInSkip); auto minRelay = app_.config().TX_REDUCE_RELAY_MIN_PEERS + disabled; + std::stringstream ss; + ss << "DEBUGRELAY relaying " << hash << " num peers " << peers.size() << " "; if (!app_.config().TX_REDUCE_RELAY_ENABLE || total <= minRelay) { for (auto const& p : peers) + { + ss << "peer: " << p->id() << ","; + JLOG(journal_.debug()) << "debugrelay send() 8"; p->send(sm); + } if (app_.config().TX_REDUCE_RELAY_ENABLE || app_.config().TX_REDUCE_RELAY_METRICS) txMetrics_.addMetrics(total, toSkip.size(), 0); + JLOG(journal_.debug()) << ss.str(); return; } @@ -1338,7 +1360,9 @@ OverlayImpl::relay( if (enabledTarget > enabledInSkip) std::shuffle(peers.begin(), peers.end(), default_prng()); - JLOG(journal_.trace()) << "relaying tx, total peers " << peers.size() +// std::stringstream ss; + ss << "have reduce relay relaying tx, total peers " << hash + << ',' << peers.size() << " selected " << enabledTarget << " skip " << toSkip.size() << " disabled " << disabled; @@ -1346,21 +1370,29 @@ OverlayImpl::relay( std::uint16_t enabledAndRelayed = enabledInSkip; for (auto const& p : peers) { + ss << "peer " << p->id() << " "; // always relay to a peer with the disabled feature if (!p->txReduceRelayEnabled()) { + ss << "reduceRelay not enabled"; + JLOG(journal_.debug()) << "debugrelay send() 38"; p->send(sm); } else if 
(enabledAndRelayed < enabledTarget) { + ss << "reducerelay enabled but less than target"; enabledAndRelayed++; + JLOG(journal_.debug()) << "debugrelay send() 10"; p->send(sm); } else { + ss << "adding to tx queue"; p->addTxQueue(hash); } + ss << ". "; } + JLOG(journal_.debug()) << ss.str(); } //------------------------------------------------------------------------------ @@ -1463,6 +1495,7 @@ OverlayImpl::unsquelch(PublicKey const& validator, Peer::id_t id) const { // optimize - multiple message with different // validator might be sent to the same peer + JLOG(journal_.debug()) << "debugrelay send() 13"; peer->send(makeSquelchMessage(validator, false, 0)); } } @@ -1476,6 +1509,7 @@ OverlayImpl::squelch( if (auto peer = findPeerByShortID(id); peer && app_.config().VP_REDUCE_RELAY_SQUELCH) { + JLOG(journal_.debug()) << "debugrelay send() 12"; peer->send(makeSquelchMessage(validator, true, squelchDuration)); } } diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 0d58a10abac..35fc50a85e8 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -241,7 +241,10 @@ void PeerImp::send(std::shared_ptr const& m) { if (!strand_.running_in_this_thread()) + { + JLOG(journal_.debug()) << "debugrelay send() 28"; return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m)); + } if (gracefulClose_) return; if (detaching_) @@ -274,10 +277,23 @@ PeerImp::send(std::shared_ptr const& m) } send_queue_.push(m); + ++send_queue_size_; if (sendq_size != 0) + { + JLOG(journal_.debug()) << "send not sending type: " + << protocolMessageName(m->getType(&m->getBuffer(Compressed::Off)[0])) + << ". Prior to push, q size was " << sendq_size << ". now it's " + << send_queue_.size() << " and tracking counter should be equal: " + << send_queue_size_ + << " peer id " << id(); return; + } + JLOG(journal_.debug()) << "sending message type " << protocolMessageName(m->getType(&m->getBuffer(Compressed::Off)[0])) + << ". 
just pushed q size " << send_queue_.size() + << " and tracking counter should be equal: " << send_queue_size_ + << " peer id " << id(); boost::asio::async_write( stream_, boost::asio::buffer( @@ -306,6 +322,7 @@ PeerImp::sendTxQueue() }); JLOG(p_journal_.trace()) << "sendTxQueue " << txQueue_.size(); txQueue_.clear(); + JLOG(journal_.debug()) << "debugrelay send() 30"; send(std::make_shared(ht, protocol::mtHAVE_TRANSACTIONS)); } } @@ -742,6 +759,7 @@ PeerImp::onTimer(error_code const& ec) message.set_type(protocol::TMPing::ptPING); message.set_seq(*lastPingSeq_); + JLOG(journal_.debug()) << "debugrelay send() 26"; send(std::make_shared(message, protocol::mtPING)); setTimer(); @@ -878,11 +896,15 @@ PeerImp::doProtocolStart() } if (auto m = overlay_.getManifestsMessage()) + { + JLOG(journal_.debug()) << "debugrelay send() 40"; send(m); + } // Request shard info from peer protocol::TMGetPeerShardInfoV2 tmGPS; tmGPS.set_relays(0); + JLOG(journal_.debug()) << "debugrelay send() 20"; send(std::make_shared(tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2)); setTimer(); @@ -965,7 +987,12 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) metrics_.sent.add_message(bytes_transferred); assert(!send_queue_.empty()); + if (!send_queue_.empty()) + --send_queue_size_; send_queue_.pop(); + JLOG(journal_.debug()) << "onWriteMessage just popped q size " << send_queue_.size() + << ". 
tracking counter should be equal: " << send_queue_size_ + << " peerid " << id(); if (!send_queue_.empty()) { // Timeout on writes only @@ -1013,8 +1040,9 @@ PeerImp::onMessageBegin( std::size_t uncompressed_size, bool isCompressed) { + auto const type_name = protocolMessageName(type); load_event_ = - app_.getJobQueue().makeLoadEvent(jtPEER, protocolMessageName(type)); + app_.getJobQueue().makeLoadEvent(jtPEER, type_name); fee_ = Resource::feeLightPeer; auto const category = TrafficCount::categorize(*m, type, true); overlay_.reportTraffic(category, true, static_cast(size)); @@ -1035,8 +1063,8 @@ PeerImp::onMessageBegin( overlay_.addTxMetrics( static_cast(type), static_cast(size)); } - JLOG(journal_.trace()) << "onMessageBegin: " << type << " " << size << " " - << uncompressed_size << " " << isCompressed; + JLOG(journal_.debug()) << "onMessageBegin: typename size uncompressed size isCompressed address " << type_name << " " << size << " " + << uncompressed_size << " " << isCompressed << " " << usage_.to_string(); } void @@ -1076,6 +1104,7 @@ PeerImp::onMessage(std::shared_ptr const& m) // We have received a ping request, reply with a pong fee_ = Resource::feeMediumBurdenPeer; m->set_type(protocol::TMPing::ptPONG); + JLOG(journal_.debug()) << "debugrelay send() 24"; send(std::make_shared(*m, protocol::mtPING)); return; } @@ -1237,6 +1266,7 @@ PeerImp::onMessage(std::shared_ptr const& m) auto reply{shardStore->getShardInfo()->makeMessage(app_)}; if (peerChainSz > 0) *(reply.mutable_peerchain()) = m->peerchain(); + JLOG(journal_.debug()) << "debugrelay send() 23"; send(std::make_shared(reply, protocol::mtPEER_SHARD_INFO_V2)); } @@ -1252,6 +1282,7 @@ PeerImp::onMessage(std::shared_ptr const& m) // Relay the request to peers, exclude the peer chain m->set_relays(m->relays() - 1); + JLOG(journal_.debug()) << "debugrelay send() 31"; overlay_.foreach(send_if_not( std::make_shared(*m, protocol::mtGET_PEER_SHARD_INFO_V2), [&](std::shared_ptr const& peer) { @@ -1428,6 +1459,7 @@ 
PeerImp::onMessage(std::shared_ptr const& m) if (auto peer = overlay_.findPeerByPublicKey(peerPubKey)) { m->mutable_peerchain()->RemoveLast(); + JLOG(p_journal_.debug()) << "debugrelay send() 16"; peer->send( std::make_shared(*m, protocol::mtPEER_SHARD_INFO_V2)); JLOG(p_journal_.trace()) @@ -1725,7 +1757,7 @@ PeerImp::onMessage(std::shared_ptr const& m) fee_ = Resource::feeMediumBurdenPeer; std::weak_ptr weak = shared_from_this(); app_.getJobQueue().addJob( - jtREPLAY_REQ, "recvProofPathRequest", [weak, m]() { + jtREPLAY_REQ, "recvProofPathRequest", [&, weak, m]() { if (auto peer = weak.lock()) { auto reply = @@ -1739,6 +1771,7 @@ PeerImp::onMessage(std::shared_ptr const& m) } else { + JLOG(p_journal_.debug()) << "debugrelay send() 18"; peer->send(std::make_shared( reply, protocol::mtPROOF_PATH_RESPONSE)); } @@ -1774,7 +1807,7 @@ PeerImp::onMessage(std::shared_ptr const& m) fee_ = Resource::feeMediumBurdenPeer; std::weak_ptr weak = shared_from_this(); app_.getJobQueue().addJob( - jtREPLAY_REQ, "recvReplayDeltaRequest", [weak, m]() { + jtREPLAY_REQ, "recvReplayDeltaRequest", [&, weak, m]() { if (auto peer = weak.lock()) { auto reply = @@ -1788,6 +1821,7 @@ PeerImp::onMessage(std::shared_ptr const& m) } else { + JLOG(p_journal_.debug()) << "debugrelay send() 15"; peer->send(std::make_shared( reply, protocol::mtREPLAY_DELTA_RESPONSE)); } @@ -1879,6 +1913,7 @@ PeerImp::onMessage(std::shared_ptr const& m) if (auto peer = overlay_.findPeerByShortID(m->requestcookie())) { m->clear_requestcookie(); + JLOG(p_journal_.debug()) << "debugrelay send() 17"; peer->send(std::make_shared(*m, protocol::mtLEDGER_DATA)); } else @@ -2733,6 +2768,7 @@ PeerImp::onMessage(std::shared_ptr const& m) JLOG(p_journal_.trace()) << "GetObj: " << reply.objects_size() << " of " << packet.objects_size(); + JLOG(journal_.debug()) << "debugrelay send() 25"; send(std::make_shared(reply, protocol::mtGET_OBJECTS)); } else @@ -2858,7 +2894,10 @@ PeerImp::handleHaveTransactions( << "transaction request 
object is " << tmBH.objects_size(); if (tmBH.objects_size() > 0) + { + JLOG(journal_.debug()) << "debugrelay send() 22"; send(std::make_shared(tmBH, protocol::mtGET_OBJECTS)); + } } void @@ -3042,7 +3081,10 @@ PeerImp::doTransactions( } if (reply.transactions_size() > 0) + { + JLOG(journal_.debug()) << "debugrelay send() 21"; send(std::make_shared(reply, protocol::mtTRANSACTIONS)); + } } void @@ -3076,7 +3118,7 @@ PeerImp::checkTransaction( { if (!validReason.empty()) { - JLOG(p_journal_.trace()) + JLOG(p_journal_.debug()) << "Exception checking transaction: " << validReason; } @@ -3099,7 +3141,7 @@ PeerImp::checkTransaction( { if (!reason.empty()) { - JLOG(p_journal_.trace()) + JLOG(p_journal_.debug()) << "Exception checking transaction: " << reason; } app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD); @@ -3295,6 +3337,7 @@ PeerImp::sendLedgerBase( auto message{ std::make_shared(ledgerData, protocol::mtLEDGER_DATA)}; + JLOG(journal_.debug()) << "debugrelay send() 29"; send(message); } @@ -3340,6 +3383,7 @@ PeerImp::getLedger(std::shared_ptr const& m) this)) { m->set_requestcookie(id()); + JLOG(p_journal_.debug()) << "debugrelay send() 14"; peer->send(std::make_shared( *m, protocol::mtGET_LEDGER)); JLOG(p_journal_.debug()) @@ -3425,6 +3469,7 @@ PeerImp::getTxSet(std::shared_ptr const& m) const if (auto const peer = getPeerWithTree(overlay_, txSetHash, this)) { m->set_requestcookie(id()); + JLOG(p_journal_.debug()) << "debugrelay send() 39"; peer->send( std::make_shared(*m, protocol::mtGET_LEDGER)); JLOG(p_journal_.debug()) << "getTxSet: Request relayed"; @@ -3613,6 +3658,7 @@ PeerImp::processLedgerRequest(std::shared_ptr const& m) << ledgerData.nodes_size() << " nodes"; } + JLOG(journal_.debug()) << "debugrelay send() 27"; send(std::make_shared(ledgerData, protocol::mtLEDGER_DATA)); } diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 710ab4d74d6..60383dbf105 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ 
b/src/ripple/overlay/impl/PeerImp.h @@ -40,6 +40,7 @@ #include #include #include +#include #include #include #include @@ -156,6 +157,7 @@ class PeerImp : public Peer, http_response_type response_; boost::beast::http::fields const& headers_; std::queue> send_queue_; + std::atomic send_queue_size_ {0}; bool gracefulClose_ = false; int large_sendq_ = 0; std::unique_ptr load_event_; @@ -733,6 +735,7 @@ PeerImp::sendEndpoints(FwdIt first, FwdIt last) } tm.set_version(2); + JLOG(journal_.debug()) << "debugrelay send() 42"; send(std::make_shared(tm, protocol::mtENDPOINTS)); } diff --git a/src/ripple/overlay/impl/PeerSet.cpp b/src/ripple/overlay/impl/PeerSet.cpp index de5c3cd9f93..9f246cc6272 100644 --- a/src/ripple/overlay/impl/PeerSet.cpp +++ b/src/ripple/overlay/impl/PeerSet.cpp @@ -106,6 +106,7 @@ PeerSetImpl::sendRequest( auto packet = std::make_shared(message, type); if (peer) { + JLOG(journal_.debug()) << "debugrelay send() 19"; peer->send(packet); return; } @@ -113,7 +114,10 @@ PeerSetImpl::sendRequest( for (auto id : peers_) { if (auto p = app_.overlay().findPeerByShortID(id)) + { + JLOG(journal_.debug()) << "debugrelay send() 41"; p->send(packet); + } } } diff --git a/src/ripple/protocol/impl/BuildInfo.cpp b/src/ripple/protocol/impl/BuildInfo.cpp index be7404c7d40..fe1d555bae5 100644 --- a/src/ripple/protocol/impl/BuildInfo.cpp +++ b/src/ripple/protocol/impl/BuildInfo.cpp @@ -33,7 +33,7 @@ namespace BuildInfo { // and follow the format described at http://semver.org/ //------------------------------------------------------------------------------ // clang-format off -char const* const versionString = "2.1.0-rc1" +char const* const versionString = "2.1.0-rc1-debugrelay-8" // clang-format on #if defined(DEBUG) || defined(SANITIZER) diff --git a/src/ripple/resource/impl/Logic.h b/src/ripple/resource/impl/Logic.h index 07d89403b6a..0f78b6a3089 100644 --- a/src/ripple/resource/impl/Logic.h +++ b/src/ripple/resource/impl/Logic.h @@ -498,6 +498,12 @@ class Logic 
++m_stats.drop; drop = true; } + else + { + JLOG(m_journal.debug()) << "Consumer entry " << entry + << " not dropped with balance " << balance << + " below drop threshold " << dropThreshold; + } return drop; } diff --git a/src/test/app/HashRouter_test.cpp b/src/test/app/HashRouter_test.cpp index 96d14e824cf..9b06e6c0253 100644 --- a/src/test/app/HashRouter_test.cpp +++ b/src/test/app/HashRouter_test.cpp @@ -20,6 +20,7 @@ #include #include #include +#include namespace ripple { namespace test { @@ -31,7 +32,7 @@ class HashRouter_test : public beast::unit_test::suite { using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, 2s); + HashRouter router(stopwatch, 2s, test::SuiteJournal("TestHashRouter", *this)); uint256 const key1(1); uint256 const key2(2); @@ -68,7 +69,7 @@ class HashRouter_test : public beast::unit_test::suite { using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, 2s); + HashRouter router(stopwatch, 2s, test::SuiteJournal("TestHashRouter", *this)); uint256 const key1(1); uint256 const key2(2); @@ -146,7 +147,7 @@ class HashRouter_test : public beast::unit_test::suite // Normal HashRouter using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, 2s); + HashRouter router(stopwatch, 2s, test::SuiteJournal("TestHashRouter", *this)); uint256 const key1(1); uint256 const key2(2); @@ -174,7 +175,7 @@ class HashRouter_test : public beast::unit_test::suite { using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, 2s); + HashRouter router(stopwatch, 2s, test::SuiteJournal("TestHashRouter", *this)); uint256 const key1(1); BEAST_EXPECT(router.setFlags(key1, 10)); @@ -187,7 +188,7 @@ class HashRouter_test : public beast::unit_test::suite { using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, 1s); + HashRouter router(stopwatch, 1s, test::SuiteJournal("TestHashRouter", *this)); 
uint256 const key1(1); @@ -230,7 +231,7 @@ class HashRouter_test : public beast::unit_test::suite { using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, 5s); + HashRouter router(stopwatch, 5s, test::SuiteJournal("TestHashRouter", *this)); uint256 const key(1); HashRouter::PeerShortID peer = 1; int flags;