Skip to content

Commit

Permalink
Merge bitcoin#25720: p2p: Reduce bandwidth during initial headers syn…
Browse files Browse the repository at this point in the history
…c when a block is found

f6a9166 Add functional test for block announcements during initial headers sync (Suhas Daftuar)
05f7f31 Reduce bandwidth during initial headers sync when a block is found (Suhas Daftuar)

Pull request description:

  On startup, if our headers chain is more than a day behind current time, we'll pick one peer to sync headers with until our best headers chain is caught up (at that point, we'll try to sync headers with all peers).

  However, if an INV for a block is received before our headers chain is caught up, we'll then start to sync headers from each peer announcing the block.  This can result in doing a big headers sync with many (if not all) of our peers simultaneously, which wastes bandwidth.

  This PR would reduce that overhead by picking (at most) one new peer to try syncing headers with whenever a new block is announced, prior to our headers chain being caught up.

ACKs for top commit:
  LarryRuane:
    ACK f6a9166
  ajtowns:
    ACK f6a9166
  mzumsande:
    ACK f6a9166
  dergoegge:
    Code review ACK f6a9166
  achow101:
    ACK f6a9166

Tree-SHA512: 0662000bd68db146f55981de4adc2e2b07cbfda222b1176569d61c22055e5556752ffd648426f69687ed1cc203105515e7304c12b915d6270df8e41a4a0e1eaa
  • Loading branch information
achow101 committed Aug 15, 2022
2 parents 6d4889a + f6a9166 commit 22d96d7
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 6 deletions.
39 changes: 33 additions & 6 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,9 @@ struct Peer {
/** Set of txids to reconsider once their parent transactions have been accepted **/
std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans);

/** Whether we've sent this peer a getheaders in response to an inv prior to initial-headers-sync completing */
bool m_inv_triggered_getheaders_before_sync{false};

/** Protects m_getdata_requests **/
Mutex m_getdata_requests_mutex;
/** Work queue of items requested by this peer **/
Expand Down Expand Up @@ -682,6 +685,9 @@ class PeerManagerImpl final : public PeerManager
/** Number of nodes with fSyncStarted. */
int nSyncStarted GUARDED_BY(cs_main) = 0;

/** Hash of the last block we received via INV */
uint256 m_last_block_inv_triggering_headers_sync{};

/**
* Sources of received blocks, saved to be able punish them when processing
* happens afterwards.
Expand Down Expand Up @@ -3240,8 +3246,9 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
UpdateBlockAvailability(pfrom.GetId(), inv.hash);
if (!fAlreadyHave && !fImporting && !fReindex && !IsBlockRequested(inv.hash)) {
// Headers-first is the primary method of announcement on
// the network. If a node fell back to sending blocks by inv,
// it's probably for a re-org. The final block hash
// the network. If a node fell back to sending blocks by
// inv, it may be for a re-org, or because we haven't
// completed initial headers sync. The final block hash
// provided should be the highest, so send a getheaders and
// then fetch the blocks we need to catch up.
best_block = &inv.hash;
Expand All @@ -3266,10 +3273,30 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
}

if (best_block != nullptr) {
if (MaybeSendGetHeaders(pfrom, m_chainman.ActiveChain().GetLocator(m_chainman.m_best_header), *peer)) {
LogPrint(BCLog::NET, "getheaders (%d) %s to peer=%d\n",
m_chainman.m_best_header->nHeight, best_block->ToString(),
pfrom.GetId());
// If we haven't started initial headers-sync with this peer, then
// consider sending a getheaders now. On initial startup, there's a
// reliability vs bandwidth tradeoff, where we are only trying to do
// initial headers sync with one peer at a time, with a long
// timeout (at which point, if the sync hasn't completed, we will
// disconnect the peer and then choose another). In the meantime,
// as new blocks are found, we are willing to add one new peer per
// block to sync with as well, to sync quicker in the case where
// our initial peer is unresponsive (but less bandwidth than we'd
// use if we turned on sync with all peers).
CNodeState& state{*Assert(State(pfrom.GetId()))};
if (state.fSyncStarted || (!peer->m_inv_triggered_getheaders_before_sync && *best_block != m_last_block_inv_triggering_headers_sync)) {
if (MaybeSendGetHeaders(pfrom, m_chainman.ActiveChain().GetLocator(m_chainman.m_best_header), *peer)) {
LogPrint(BCLog::NET, "getheaders (%d) %s to peer=%d\n",
m_chainman.m_best_header->nHeight, best_block->ToString(),
pfrom.GetId());
}
if (!state.fSyncStarted) {
peer->m_inv_triggered_getheaders_before_sync = true;
// Update the last block hash that triggered a new headers
// sync, so that we don't turn on headers sync with more
// than 1 new peer every new block.
m_last_block_inv_triggering_headers_sync = *best_block;
}
}
}

Expand Down
105 changes: 105 additions & 0 deletions test/functional/p2p_initial_headers_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#!/usr/bin/env python3
# Copyright (c) 2022 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test initial headers download
Test that we only try to initially sync headers from one peer (until our chain
is close to caught up), and that each block announcement results in only one
additional peer receiving a getheaders message.
"""

from test_framework.test_framework import BitcoinTestFramework
from test_framework.messages import (
CInv,
MSG_BLOCK,
msg_headers,
msg_inv,
)
from test_framework.p2p import (
p2p_lock,
P2PInterface,
)
from test_framework.util import (
assert_equal,
)
import random

class HeadersSyncTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 1

def announce_random_block(self, peers):
new_block_announcement = msg_inv(inv=[CInv(MSG_BLOCK, random.randrange(1<<256))])
for p in peers:
p.send_and_ping(new_block_announcement)

def run_test(self):
self.log.info("Adding a peer to node0")
peer1 = self.nodes[0].add_p2p_connection(P2PInterface())

# Wait for peer1 to receive a getheaders
peer1.wait_for_getheaders()
# An empty reply will clear the outstanding getheaders request,
# allowing additional getheaders requests to be sent to this peer in
# the future.
peer1.send_message(msg_headers())

self.log.info("Connecting two more peers to node0")
# Connect 2 more peers; they should not receive a getheaders yet
peer2 = self.nodes[0].add_p2p_connection(P2PInterface())
peer3 = self.nodes[0].add_p2p_connection(P2PInterface())

all_peers = [peer1, peer2, peer3]

self.log.info("Verify that peer2 and peer3 don't receive a getheaders after connecting")
for p in all_peers:
p.sync_with_ping()
with p2p_lock:
assert "getheaders" not in peer2.last_message
assert "getheaders" not in peer3.last_message

with p2p_lock:
peer1.last_message.pop("getheaders", None)

self.log.info("Have all peers announce a new block")
self.announce_random_block(all_peers)

self.log.info("Check that peer1 receives a getheaders in response")
peer1.wait_for_getheaders()
peer1.send_message(msg_headers()) # Send empty response, see above
with p2p_lock:
peer1.last_message.pop("getheaders", None)

self.log.info("Check that exactly 1 of {peer2, peer3} received a getheaders in response")
count = 0
peer_receiving_getheaders = None
for p in [peer2, peer3]:
with p2p_lock:
if "getheaders" in p.last_message:
count += 1
peer_receiving_getheaders = p
p.last_message.pop("getheaders", None)
p.send_message(msg_headers()) # Send empty response, see above

assert_equal(count, 1)

self.log.info("Announce another new block, from all peers")
self.announce_random_block(all_peers)

self.log.info("Check that peer1 receives a getheaders in response")
peer1.wait_for_getheaders()

self.log.info("Check that the remaining peer received a getheaders as well")
expected_peer = peer2
if peer2 == peer_receiving_getheaders:
expected_peer = peer3

expected_peer.wait_for_getheaders()

self.log.info("Success!")

if __name__ == '__main__':
HeadersSyncTest().main()

1 change: 1 addition & 0 deletions test/functional/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@
'rpc_generate.py',
'wallet_balance.py --legacy-wallet',
'wallet_balance.py --descriptors',
'p2p_initial_headers_sync.py',
'feature_nulldummy.py',
'mempool_accept.py',
'mempool_expiry.py',
Expand Down

0 comments on commit 22d96d7

Please sign in to comment.