Skip to content

Commit

Permalink
partial bitcoin#27981: Fix potential network stalling bug
Browse files Browse the repository at this point in the history
To allow for the removal of a node from `vReceivableNodes`, the
collections of node pointers have been changed from `std::vector` to `std::set`.

Marked as partial because it should be revisited once bitcoin#24356 is
backported.
  • Loading branch information
kwvg committed Sep 4, 2024
1 parent 13f6dc1 commit 8c986d6
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 25 deletions.
58 changes: 34 additions & 24 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,7 @@ void V1TransportSerializer::prepareForTransport(CSerializedNetMsg& msg, std::vec
CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr};
}

size_t CConnman::SocketSendData(CNode& node)
std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
{
auto it = node.vSendMsg.begin();
size_t nSentSize = 0;
Expand Down Expand Up @@ -994,7 +994,7 @@ size_t CConnman::SocketSendData(CNode& node)
}
node.vSendMsg.erase(node.vSendMsg.begin(), it);
node.nSendMsgSize = node.vSendMsg.size();
return nSentSize;
return {nSentSize, !node.vSendMsg.empty()};
}

static bool ReverseCompareNodeMinPingTime(const NodeEvictionCandidate& a, const NodeEvictionCandidate& b)
Expand Down Expand Up @@ -1711,8 +1711,7 @@ bool CConnman::GenerateSelectSet(const std::vector<CNode*>& nodes,
recv_set.insert(hListenSocket.sock->Get());
}

for (CNode* pnode : nodes)
{
for (CNode* pnode : nodes) {
bool select_recv = !pnode->fHasRecvData;
bool select_send = !pnode->fCanSendData;

Expand Down Expand Up @@ -2027,9 +2026,9 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,

if (interruptNet) return;

std::vector<CNode*> vErrorNodes;
std::vector<CNode*> vReceivableNodes;
std::vector<CNode*> vSendableNodes;
std::set<CNode*> vErrorNodes;
std::set<CNode*> vReceivableNodes;
std::set<CNode*> vSendableNodes;
{
LOCK(cs_mapSocketToNode);
for (auto hSocket : error_set) {
Expand All @@ -2038,7 +2037,7 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
continue;
}
it->second->AddRef();
vErrorNodes.emplace_back(it->second);
vErrorNodes.emplace(it->second);
}
for (auto hSocket : recv_set) {
if (error_set.count(hSocket)) {
Expand Down Expand Up @@ -2073,7 +2072,6 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
{
LOCK(cs_sendable_receivable_nodes);

vReceivableNodes.reserve(mapReceivableNodes.size());
for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) {
if (!it->second->fHasRecvData) {
it = mapReceivableNodes.erase(it);
Expand All @@ -2088,7 +2086,7 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
// receiving data (which should succeed as the socket signalled as receivable).
if (!it->second->fPauseRecv && it->second->nSendMsgSize == 0 && !it->second->fDisconnect) {
it->second->AddRef();
vReceivableNodes.emplace_back(it->second);
vReceivableNodes.emplace(it->second);
}
++it;
}
Expand All @@ -2099,7 +2097,6 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
// also clean up mapNodesWithDataToSend from nodes that had messages to send in the last iteration
// but don't have any in this iteration
LOCK(cs_mapNodesWithDataToSend);
vSendableNodes.reserve(mapNodesWithDataToSend.size());
for (auto it = mapNodesWithDataToSend.begin(); it != mapNodesWithDataToSend.end(); ) {
if (it->second->nSendMsgSize == 0) {
// See comment in PushMessage
Expand All @@ -2108,13 +2105,36 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
} else {
if (it->second->fCanSendData) {
it->second->AddRef();
vSendableNodes.emplace_back(it->second);
vSendableNodes.emplace(it->second);
}
++it;
}
}
}

for (CNode* pnode : vSendableNodes) {
if (interruptNet) {
break;
}

// Send data
auto [bytes_sent, data_left] = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode));
if (bytes_sent) {
RecordBytesSent(bytes_sent);

// If both receiving and (non-optimistic) sending were possible, we first attempt
// sending. If that succeeds, but does not fully drain the send queue, do not
// attempt to receive. This avoids needlessly queueing data if the remote peer
// is slow at receiving data, by means of TCP flow control. We only do this when
// sending actually succeeded to make sure progress is always made; otherwise a
// deadlock would be possible when both sides have data to send, but neither is
// receiving.
if (data_left && vReceivableNodes.erase(pnode)) {
pnode->Release();
}
}
}

for (CNode* pnode : vErrorNodes)
{
if (interruptNet) {
Expand All @@ -2136,16 +2156,6 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
SocketRecvData(pnode);
}

for (CNode* pnode : vSendableNodes) {
if (interruptNet) {
break;
}

// Send data
size_t bytes_sent = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode));
if (bytes_sent) RecordBytesSent(bytes_sent);
}

for (auto& node : vErrorNodes) {
node->Release();
}
Expand Down Expand Up @@ -4183,7 +4193,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)

{
LOCK(pnode->cs_vSend);
bool hasPendingData = !pnode->vSendMsg.empty();
bool optimisticSend(pnode->vSendMsg.empty());

//log total amount of bytes per message type
pnode->mapSendBytesPerMsgType[msg.m_type] += nTotalSize;
Expand All @@ -4206,7 +4216,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
}

// wake up select() call in case there was no pending data before (so it was not selecting this socket for sending)
if (!hasPendingData && (m_wakeup_pipe && m_wakeup_pipe->m_need_wakeup.load()))
if (optimisticSend && (m_wakeup_pipe && m_wakeup_pipe->m_need_wakeup.load()))
m_wakeup_pipe->Write();
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -1387,8 +1387,11 @@ friend class CNode;

NodeId GetNewNodeId();

size_t SocketSendData(CNode& node) EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend);
/** (Try to) send data from node's vSendMsg. Returns (bytes_sent, data_left). */
std::pair<size_t, bool> SocketSendData(CNode& node) const EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend);

size_t SocketRecvData(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);

void DumpAddresses();

// Network stats
Expand Down

0 comments on commit 8c986d6

Please sign in to comment.