Skip to content

Commit

Permalink
enable bft and better protocol version check
Browse files Browse the repository at this point in the history
  • Loading branch information
wanwiset25 committed Jun 28, 2024
1 parent ebb9a63 commit b5c7bd5
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 36 deletions.
8 changes: 4 additions & 4 deletions eth/downloader/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.headerThroughput
}
return ps.idlePeers(62, 101, idle, throughput)
return ps.idlePeers(62, 200, idle, throughput)
}

// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
Expand All @@ -491,7 +491,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.blockThroughput
}
return ps.idlePeers(62, 101, idle, throughput)
return ps.idlePeers(62, 200, idle, throughput)
}

// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
Expand All @@ -505,7 +505,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.receiptThroughput
}
return ps.idlePeers(63, 101, idle, throughput)
return ps.idlePeers(63, 200, idle, throughput)
}

// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
Expand All @@ -519,7 +519,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.stateThroughput
}
return ps.idlePeers(63, 101, idle, throughput)
return ps.idlePeers(63, 200, idle, throughput)
}

// idlePeers retrieves a flat list of all currently idle peers satisfying the
Expand Down
14 changes: 7 additions & 7 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}

case p.version >= eth63 && msg.Code == GetNodeDataMsg:
case supportsEth63(p.version) && msg.Code == GetNodeDataMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
if _, err := msgStream.List(); err != nil {
Expand All @@ -726,7 +726,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
return p.SendNodeData(data)

case p.version >= eth63 && msg.Code == NodeDataMsg:
case supportsEth63(p.version) && msg.Code == NodeDataMsg:
// A batch of node state data arrived to one of our previous requests
var data [][]byte
if err := msg.Decode(&data); err != nil {
Expand All @@ -737,7 +737,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
log.Debug("Failed to deliver node state data", "err", err)
}

case p.version >= eth63 && msg.Code == GetReceiptsMsg:
case supportsEth63(p.version) && msg.Code == GetReceiptsMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
if _, err := msgStream.List(); err != nil {
Expand Down Expand Up @@ -773,7 +773,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
return p.SendReceiptsRLP(receipts)

case p.version >= eth63 && msg.Code == ReceiptsMsg:
case supportsEth63(p.version) && msg.Code == ReceiptsMsg:
// A batch of receipts arrived to one of our previous requests
var receipts [][]*types.Receipt
if err := msg.Decode(&receipts); err != nil {
Expand Down Expand Up @@ -847,7 +847,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}

case msg.Code == NewPooledTransactionHashesMsg && p.version >= eth65:
case msg.Code == NewPooledTransactionHashesMsg && supportsEth65(p.version):
// New transaction announcement arrived, make sure we have
// a valid and fresh chain to handle them
if atomic.LoadUint32(&pm.acceptTxs) == 0 {
Expand All @@ -863,7 +863,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.txFetcher.Notify(p.id, hashes)

case msg.Code == GetPooledTransactionsMsg && p.version >= eth65:
case msg.Code == GetPooledTransactionsMsg && supportsEth65(p.version):
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
if _, err := msgStream.List(); err != nil {
Expand Down Expand Up @@ -899,7 +899,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
return p.SendPooledTransactionsRLP(hashes, txs)

case msg.Code == TransactionMsg || (msg.Code == PooledTransactionsMsg && p.version >= eth65):
case msg.Code == TransactionMsg || (msg.Code == PooledTransactionsMsg && supportsEth65(p.version)):
// Transactions arrived, make sure we have a valid and fresh chain to handle them
if atomic.LoadUint32(&pm.acceptTxs) == 0 {
break
Expand Down
45 changes: 21 additions & 24 deletions eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,23 +783,16 @@ func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis
)
go func() {
switch {
case p.version == xdpos2:
errc <- p2p.Send(p.rw, StatusMsg, &statusData63{
ProtocolVersion: uint32(p.version),
NetworkId: network,
TD: td,
CurrentBlock: head,
GenesisBlock: genesis,
})
case p.version == eth63:
errc <- p2p.Send(p.rw, StatusMsg, &statusData63{
case supportsEth65(p.version):
errc <- p2p.Send(p.rw, StatusMsg, &statusData{
ProtocolVersion: uint32(p.version),
NetworkId: network,
NetworkID: network,
TD: td,
CurrentBlock: head,
GenesisBlock: genesis,
Head: head,
Genesis: genesis,
ForkID: forkID,
})
case p.version >= eth64 || p.version >= xdpos22:
case supportsEth64(p.version):
errc <- p2p.Send(p.rw, StatusMsg, &statusData{
ProtocolVersion: uint32(p.version),
NetworkID: network,
Expand All @@ -808,18 +801,24 @@ func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis
Genesis: genesis,
ForkID: forkID,
})
case supportsEth63(p.version):
errc <- p2p.Send(p.rw, StatusMsg, &statusData63{
ProtocolVersion: uint32(p.version),
NetworkId: network,
TD: td,
CurrentBlock: head,
GenesisBlock: genesis,
})
default:
panic(fmt.Sprintf("unsupported eth protocol version: %d", p.version))
}
}()
go func() {
switch {
case p.version == xdpos2:
errc <- p.readStatusLegacy(network, &status63, genesis)
case p.version == eth63:
errc <- p.readStatusLegacy(network, &status63, genesis)
case p.version >= eth64 || p.version >= xdpos22: //include xdpos22 condition for completeness
case supportsEth64(p.version):
errc <- p.readStatus(network, &status, genesis, forkFilter)
case supportsEth63(p.version):
errc <- p.readStatusLegacy(network, &status63, genesis)
default:
panic(fmt.Sprintf("unsupported eth protocol version: %d", p.version))
}
Expand All @@ -837,12 +836,10 @@ func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis
}
}
switch {
case p.version == xdpos2:
p.td, p.head = status63.TD, status63.CurrentBlock
case p.version == eth63:
p.td, p.head = status63.TD, status63.CurrentBlock
case p.version >= eth64 || p.version >= xdpos22: //include xdpos22 for completeness
case supportsEth64(p.version):
p.td, p.head = status.TD, status.Head
case supportsEth63(p.version):
p.td, p.head = status63.TD, status63.CurrentBlock
default:
panic(fmt.Sprintf("unsupported eth protocol version: %d", p.version))
}
Expand Down
2 changes: 1 addition & 1 deletion eth/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func supportsEth63(version int) bool {
switch {
case version < 63:
return false
case version > 63:
case version >= 63:
return true
default:
return false
Expand Down
2 changes: 2 additions & 0 deletions eth/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,10 @@ func (pm *ProtocolManager) syncer() {
// Start and ensure cleanup of sync mechanisms
pm.blockFetcher.Start()
pm.txFetcher.Start()
pm.bft.Start()
defer pm.blockFetcher.Stop()
defer pm.txFetcher.Stop()
defer pm.bft.Stop()
defer pm.downloader.Terminate()

// Wait for different events to fire synchronisation operations
Expand Down

0 comments on commit b5c7bd5

Please sign in to comment.