Skip to content

Commit

Permalink
add sub protocol to disable tx broadcast
Browse files Browse the repository at this point in the history
  • Loading branch information
yutianwu committed Sep 7, 2021
1 parent bca9678 commit 4a9ede4
Show file tree
Hide file tree
Showing 11 changed files with 104 additions and 21 deletions.
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
Checkpoint: checkpoint,
Whitelist: config.Whitelist,
DirectBroadcast: config.DirectBroadcast,
SubVersion: config.SubVersion,
}); err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions eth/downloader/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.headerThroughput
}
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
return ps.idlePeers(eth.ETH65, eth.ETH67, idle, throughput)
}

// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
Expand All @@ -471,7 +471,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.blockThroughput
}
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
return ps.idlePeers(eth.ETH65, eth.ETH67, idle, throughput)
}

// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
Expand All @@ -485,7 +485,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.receiptThroughput
}
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
return ps.idlePeers(eth.ETH65, eth.ETH67, idle, throughput)
}

// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
Expand All @@ -499,7 +499,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.stateThroughput
}
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
return ps.idlePeers(eth.ETH65, eth.ETH67, idle, throughput)
}

// idlePeers retrieves a flat list of all currently idle peers satisfying the
Expand Down
5 changes: 3 additions & 2 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ type Config struct {
Genesis *core.Genesis `toml:",omitempty"`

// Protocol options
NetworkId uint64 // Network ID to use for selecting peers to connect to
SyncMode downloader.SyncMode
NetworkId uint64 // Network ID to use for selecting peers to connect to
SyncMode downloader.SyncMode
SubVersion uint64

// This can be set to list of enrtree:// URLs which will be queried for
// for nodes to connect to.
Expand Down
6 changes: 6 additions & 0 deletions eth/ethconfig/gen_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,13 @@ type handlerConfig struct {
Checkpoint *params.TrustedCheckpoint // Hard coded checkpoint for sync challenges
Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged
DirectBroadcast bool
SubVersion uint64
}

type handler struct {
networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
subVersion uint64

fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
snapSync uint32 // Flag whether fast sync should operate on top of the snap protocol
Expand Down Expand Up @@ -136,6 +138,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
h := &handler{
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
subVersion: config.SubVersion,
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
Expand Down Expand Up @@ -262,7 +265,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
td = h.chain.GetTd(hash, number)
)
forkID := forkid.NewID(h.chain.Config(), h.chain.Genesis().Hash(), h.chain.CurrentHeader().Number.Uint64())
if err := peer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter); err != nil {
if err := peer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter, h.subVersion); err != nil {
peer.Log().Debug("Ethereum handshake failed", "err", err)
return err
}
Expand Down
10 changes: 5 additions & 5 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func testRecvTransactions(t *testing.T, protocol uint) {
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.NumberU64())
)
if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil {
if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), 0); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// Send the transaction to the sink and verify that it's added to the tx pool
Expand Down Expand Up @@ -333,7 +333,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.NumberU64())
)
if err := sink.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil {
if err := sink.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), 0); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// After the handshake completes, the source handler should stream the sink
Expand Down Expand Up @@ -532,7 +532,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.NumberU64())
)
if err := remote.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil {
if err := remote.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), 0); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// Connect a new peer and check that we receive the checkpoint challenge
Expand Down Expand Up @@ -616,7 +616,7 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
go source.handler.runEthPeer(sourcePeer, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(source.handler), peer)
})
if err := sinkPeer.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain)); err != nil {
if err := sinkPeer.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), 0); err != nil {
t.Fatalf("failed to run protocol handshake")
}
go eth.Handle(sink, sinkPeer)
Expand Down Expand Up @@ -689,7 +689,7 @@ func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
genesis = source.chain.Genesis()
td = source.chain.GetTd(genesis.Hash(), genesis.NumberU64())
)
if err := sink.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain)); err != nil {
if err := sink.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), 0); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// After the handshake completes, the source handler should stream the sink
Expand Down
52 changes: 51 additions & 1 deletion eth/protocols/eth/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (

// Handshake executes the eth protocol handshake, negotiating version number,
// network IDs, difficulties, head and genesis blocks.
func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter) error {
func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter, subVersion uint64) error {
// Send out own handshake in a new thread
errc := make(chan error, 2)

Expand Down Expand Up @@ -68,6 +68,37 @@ func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis
}
p.td, p.head = status.TD, status.Head

if p.version >= ETH67 {
var upgradeStatus UpgradeStatusPacket // safe to read after two values have been received from errc

gopool.Submit(func() {
errc <- p2p.Send(p.rw, StatusMsg, &UpgradeStatusPacket{
SubProtocolVersion: SubProtocolVersion(subVersion),
})
})
gopool.Submit(func() {
errc <- p.readUpgradeStatus(&upgradeStatus)
})
timeout := time.NewTimer(handshakeTimeout)
defer timeout.Stop()
for i := 0; i < 2; i++ {
select {
case err := <-errc:
if err != nil {
return err
}
case <-timeout.C:
return p2p.DiscReadTimeout
}
}
p.subVersion = upgradeStatus.SubProtocolVersion

if !p.subVersion.NeedTxsBroadcastedFromPeers() {
p.Log().Debug("peer does not need broadcast txs, closing broadcast routines")
p.Close()
}
}

// TD at mainnet block #7753254 is 76 bits. If it becomes 100 million times
// larger, it will still fit within 100 bits
if tdlen := p.td.BitLen(); tdlen > 100 {
Expand Down Expand Up @@ -106,3 +137,22 @@ func (p *Peer) readStatus(network uint64, status *StatusPacket, genesis common.H
}
return nil
}

func (p *Peer) readUpgradeStatus(status *UpgradeStatusPacket) error {
msg, err := p.rw.ReadMsg()
if err != nil {
return err
}
if msg.Code != StatusMsg {
return fmt.Errorf("%w: first msg has code %x (!= %x)", errNoStatusMsg, msg.Code, StatusMsg)
}
if msg.Size > maxMessageSize {
return fmt.Errorf("%w: %v > %v", errMsgTooLarge, msg.Size, maxMessageSize)
}
// Decode the upgrade status packet, ignore the decode error for the future upgrade
// leave the checks in the handshake function
msg.Decode(&status)

p.Log().Info("receive upgrade status packet", "sub", status.SubProtocolVersion)
return nil
}
6 changes: 5 additions & 1 deletion eth/protocols/eth/handshake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,15 @@ func testHandshake(t *testing.T, protocol uint) {
// Send the junk test with one peer, check the handshake failure
go p2p.Send(app, test.code, test.data)

err := peer.Handshake(1, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain))
err := peer.Handshake(1, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain), 0)
if err == nil {
t.Errorf("test %d: protocol returned nil error, want %q", i, test.want)
} else if !errors.Is(err, test.want) {
t.Errorf("test %d: wrong error: got %q, want %q", i, err, test.want)
}
}
}

func TestVersion(t *testing.T) {
println(0 & 0x1)
}
15 changes: 11 additions & 4 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"

mapset "github.com/deckarep/golang-set"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/p2p"
Expand Down Expand Up @@ -68,9 +69,10 @@ func max(a, b int) int {
type Peer struct {
id string // Unique ID for the peer, cached

*p2p.Peer // The embedded P2P package peer
rw p2p.MsgReadWriter // Input/output streams for snap
version uint // Protocol version negotiated
*p2p.Peer // The embedded P2P package peer
rw p2p.MsgReadWriter // Input/output streams for snap
version uint // Protocol version negotiated
subVersion SubProtocolVersion

head common.Hash // Latest advertised head block hash
td *big.Int // Latest advertised head block total difficulty
Expand Down Expand Up @@ -118,7 +120,12 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe
// you created the peer yourself via NewPeer. Otherwise let whoever created it
// clean it up!
func (p *Peer) Close() {
close(p.term)
p.Log().Info("closing broadcast routines")
select {
case <-p.term:
default:
close(p.term)
}
}

// ID retrieves the peer's unique identifier.
Expand Down
15 changes: 13 additions & 2 deletions eth/protocols/eth/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
const (
ETH65 = 65
ETH66 = 66
ETH67 = 67
)

// ProtocolName is the official short name of the `eth` protocol used during
Expand All @@ -40,11 +41,11 @@ const ProtocolName = "eth"

// ProtocolVersions are the supported versions of the `eth` protocol (first
// is primary).
var ProtocolVersions = []uint{ETH66, ETH65}
var ProtocolVersions = []uint{ETH67, ETH66, ETH65}

// protocolLengths are the number of implemented message corresponding to
// different protocol versions.
var protocolLengths = map[uint]uint64{ETH66: 17, ETH65: 17}
var protocolLengths = map[uint]uint64{ETH67: 17, ETH66: 17, ETH65: 17}

// maxMessageSize is the maximum cap on the size of a protocol message.
const maxMessageSize = 10 * 1024 * 1024
Expand Down Expand Up @@ -97,6 +98,16 @@ type StatusPacket struct {
ForkID forkid.ID
}

type UpgradeStatusPacket struct {
SubProtocolVersion SubProtocolVersion
}

type SubProtocolVersion uint64

func (v SubProtocolVersion) NeedTxsBroadcastedFromPeers() bool {
return v&0x1 == 0x0
}

// NewBlockHashesPacket is the network packet for the block announcements.
type NewBlockHashesPacket []struct {
Hash common.Hash // Hash of one particular block being announced
Expand Down
2 changes: 1 addition & 1 deletion eth/tracers/tracers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func BenchmarkTransactionTrace(b *testing.B) {
//DisableReturnData: true,
})
evm := vm.NewEVM(context, txContext, statedb, params.AllEthashProtocolChanges, vm.Config{Debug: true, Tracer: tracer})
msg, err := tx.AsMessage(signer, nil)
msg, err := tx.AsMessage(signer)
if err != nil {
b.Fatalf("failed to prepare transaction for tracing: %v", err)
}
Expand Down

0 comments on commit 4a9ede4

Please sign in to comment.