Skip to content

Commit

Permalink
2.0.0-rcX-debugrelay-7: atomically count all peer message queue
Browse files Browse the repository at this point in the history
modifications.
  • Loading branch information
mtrippled committed Feb 7, 2024
1 parent 81760bc commit 2298b25
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/ripple/app/ledger/impl/LedgerMaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -896,8 +896,8 @@ LedgerMaster::getFetchPack(LedgerIndex missing, InboundLedger::Reason reason)
tmBH.set_ledgerhash(haveHash->begin(), 32);
auto packet = std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS);

target->send(packet);
JLOG(m_journal.debug()) << "debugrelay send() 2";
target->send(packet);
JLOG(m_journal.trace()) << "Requested fetch pack for " << missing;
}
else
Expand Down
4 changes: 3 additions & 1 deletion src/ripple/overlay/impl/OverlayImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1277,8 +1277,10 @@ OverlayImpl::relay(
std::make_shared<Message>(m, protocol::mtVALIDATION, validator);
for_each([&](std::shared_ptr<PeerImp>&& p) {
if (toSkip->find(p->id()) == toSkip->end())
{
JLOG(journal_.debug()) << "debugrelay send() 11";
p->send(sm);
p->send(sm);
}
});
return *toSkip;
}
Expand Down
15 changes: 12 additions & 3 deletions src/ripple/overlay/impl/PeerImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,18 +279,23 @@ PeerImp::send(std::shared_ptr<Message> const& m)
}

send_queue_.push(m);
++send_queue_size_;

if (sendq_size != 0)
{
JLOG(journal_.debug()) << "send not sending type: "
<< protocolMessageName(m->getType(&m->getBuffer(Compressed::Off)[0]))
<< ". Prior to push, q size was " << sendq_size << ". now it's "
<< send_queue_.size();
<< send_queue_.size() << " and tracking counter should be equal: "
<< send_queue_size_
<< " peer id " << id();
return;
}

JLOG(journal_.debug()) << "sending message type " << protocolMessageName(m->getType(&m->getBuffer(Compressed::Off)[0]))
<< ". just pushed q size " << send_queue_.size();
<< ". just pushed q size " << send_queue_.size()
<< " and tracking counter should be equal: " << send_queue_size_
<< " peer id " << id();
boost::asio::async_write(
stream_,
boost::asio::buffer(
Expand Down Expand Up @@ -984,8 +989,12 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
metrics_.sent.add_message(bytes_transferred);

assert(!send_queue_.empty());
if (!send_queue_.empty())
--send_queue_size_;
send_queue_.pop();
JLOG(journal_.debug()) << "onWriteMessage just popped q size " << send_queue_.size();
JLOG(journal_.debug()) << "onWriteMessage just popped q size " << send_queue_.size()
<< ". tracking counter should be equal: " << send_queue_size_
<< " peerid " << id();
if (!send_queue_.empty())
{
// Timeout on writes only
Expand Down
2 changes: 2 additions & 0 deletions src/ripple/overlay/impl/PeerImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include <boost/circular_buffer.hpp>
#include <boost/endian/conversion.hpp>
#include <boost/thread/shared_mutex.hpp>
#include <atomic>
#include <cstdint>
#include <optional>
#include <queue>
Expand Down Expand Up @@ -156,6 +157,7 @@ class PeerImp : public Peer,
http_response_type response_;
boost::beast::http::fields const& headers_;
std::queue<std::shared_ptr<Message>> send_queue_;
std::atomic<std::uint64_t> send_queue_size_ {0};
bool gracefulClose_ = false;
int large_sendq_ = 0;
std::unique_ptr<LoadEvent> load_event_;
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/protocol/impl/BuildInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace BuildInfo {
// and follow the format described at http://semver.org/
//------------------------------------------------------------------------------
// clang-format off
char const* const versionString = "2.0.0-rcX-debugrelay-6"
char const* const versionString = "2.0.0-rcX-debugrelay-7"
// clang-format on

#if defined(DEBUG) || defined(SANITIZER)
Expand Down

0 comments on commit 2298b25

Please sign in to comment.