diff --git a/cmd/devp2p/internal/ethtest/types.go b/cmd/devp2p/internal/ethtest/types.go index 2c5cb94c699f..fd5251d161f3 100644 --- a/cmd/devp2p/internal/ethtest/types.go +++ b/cmd/devp2p/internal/ethtest/types.go @@ -127,7 +127,7 @@ func (msg NewBlock) Code() int { return 23 } func (msg NewBlock) ReqID() uint64 { return 0 } // NewPooledTransactionHashes is the network packet for the tx hash propagation message. -type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket +type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket66 func (msg NewPooledTransactionHashes) Code() int { return 24 } func (msg NewPooledTransactionHashes) ReqID() uint64 { return 0 } diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 12e91ec7f534..4ed6335769cf 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -67,9 +67,12 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { case *eth.NewBlockPacket: return h.handleBlockBroadcast(peer, packet.Block, packet.TD) - case *eth.NewPooledTransactionHashesPacket: + case *eth.NewPooledTransactionHashesPacket66: return h.txFetcher.Notify(peer.ID(), *packet) + case *eth.NewPooledTransactionHashesPacket68: + return h.txFetcher.Notify(peer.ID(), packet.Hashes) + case *eth.TransactionsPacket: return h.txFetcher.Enqueue(peer.ID(), *packet, false) diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 9f0c36f950c5..885c2a971505 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -61,10 +61,14 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error { h.blockBroadcasts.Send(packet.Block) return nil - case *eth.NewPooledTransactionHashesPacket: + case *eth.NewPooledTransactionHashesPacket66: h.txAnnounces.Send(([]common.Hash)(*packet)) return nil + case *eth.NewPooledTransactionHashesPacket68: + h.txAnnounces.Send(packet.Hashes) + return nil + case *eth.TransactionsPacket: h.txBroadcasts.Send(([]*types.Transaction)(*packet)) return nil @@ -81,6 +85,8 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error { // Tests that peers are correctly accepted (or rejected) based on the advertised // fork IDs in the protocol handshake. func TestForkIDSplit66(t *testing.T) { testForkIDSplit(t, eth.ETH66) } +func TestForkIDSplit67(t *testing.T) { testForkIDSplit(t, eth.ETH67) } +func TestForkIDSplit68(t *testing.T) { testForkIDSplit(t, eth.ETH68) } func testForkIDSplit(t *testing.T, protocol uint) { t.Parallel() @@ -235,6 +241,8 @@ func testForkIDSplit(t *testing.T, protocol uint) { // Tests that received transactions are added to the local pool. func TestRecvTransactions66(t *testing.T) { testRecvTransactions(t, eth.ETH66) } +func TestRecvTransactions67(t *testing.T) { testRecvTransactions(t, eth.ETH67) } +func TestRecvTransactions68(t *testing.T) { testRecvTransactions(t, eth.ETH68) } func testRecvTransactions(t *testing.T, protocol uint) { t.Parallel() @@ -292,6 +300,8 @@ func testRecvTransactions(t *testing.T, protocol uint) { // This test checks that pending transactions are sent. func TestSendTransactions66(t *testing.T) { testSendTransactions(t, eth.ETH66) } +func TestSendTransactions67(t *testing.T) { testSendTransactions(t, eth.ETH67) } +func TestSendTransactions68(t *testing.T) { testSendTransactions(t, eth.ETH68) } func testSendTransactions(t *testing.T, protocol uint) { t.Parallel() @@ -350,7 +360,7 @@ func testSendTransactions(t *testing.T, protocol uint) { seen := make(map[common.Hash]struct{}) for len(seen) < len(insert) { switch protocol { - case 66: + case 66, 67, 68: select { case hashes := <-anns: for _, hash := range hashes { @@ -377,6 +387,8 @@ func testSendTransactions(t *testing.T, protocol uint) { // Tests that transactions get propagated to all attached peers, either via direct // broadcasts or via announcements/retrievals. func TestTransactionPropagation66(t *testing.T) { testTransactionPropagation(t, eth.ETH66) } +func TestTransactionPropagation67(t *testing.T) { testTransactionPropagation(t, eth.ETH67) } +func TestTransactionPropagation68(t *testing.T) { testTransactionPropagation(t, eth.ETH68) } func testTransactionPropagation(t *testing.T, protocol uint) { t.Parallel() @@ -678,6 +690,8 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) { // Tests that a propagated malformed block (uncles or transactions don't match // with the hashes in the header) gets discarded and not broadcast forward. func TestBroadcastMalformedBlock66(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH66) } +func TestBroadcastMalformedBlock67(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH67) } +func TestBroadcastMalformedBlock68(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH68) } func testBroadcastMalformedBlock(t *testing.T, protocol uint) { t.Parallel() diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index 0afe01b1ce15..3045303f222e 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -142,13 +142,17 @@ func (p *Peer) announceTransactions() { if done == nil && len(queue) > 0 { // Pile transaction hashes until we reach our allowed network limit var ( - count int - pending []common.Hash - size common.StorageSize + count int + pending []common.Hash + pendingTypes []byte + pendingSizes []uint32 + size common.StorageSize ) for count = 0; count < len(queue) && size < maxTxPacketSize; count++ { - if p.txpool.Get(queue[count]) != nil { + if tx := p.txpool.Get(queue[count]); tx != nil { pending = append(pending, queue[count]) + pendingTypes = append(pendingTypes, tx.Type()) + pendingSizes = append(pendingSizes, uint32(tx.Size())) size += common.HashLength } } @@ -159,9 +163,16 @@ func (p *Peer) announceTransactions() { if len(pending) > 0 { done = make(chan struct{}) go func() { - if err := p.sendPooledTransactionHashes(pending); err != nil { - fail <- err - return + if p.version >= ETH68 { + if err := p.sendPooledTransactionHashes68(pending, pendingTypes, pendingSizes); err != nil { + fail <- err + return + } + } else { + if err := p.sendPooledTransactionHashes66(pending); err != nil { + fail <- err + return + } } close(done) p.Log().Trace("Sent transaction announcements", "count", len(pending)) diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 87b1f20a2dc2..60654b803051 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -168,7 +168,7 @@ var eth66 = map[uint64]msgHandler{ NewBlockHashesMsg: handleNewBlockhashes, NewBlockMsg: handleNewBlock, TransactionsMsg: handleTransactions, - NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes, + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes66, GetBlockHeadersMsg: handleGetBlockHeaders66, BlockHeadersMsg: handleBlockHeaders66, GetBlockBodiesMsg: handleGetBlockBodies66, @@ -185,7 +185,22 @@ var eth67 = map[uint64]msgHandler{ NewBlockHashesMsg: handleNewBlockhashes, NewBlockMsg: handleNewBlock, TransactionsMsg: handleTransactions, - NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes, + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes66, + GetBlockHeadersMsg: handleGetBlockHeaders66, + BlockHeadersMsg: handleBlockHeaders66, + GetBlockBodiesMsg: handleGetBlockBodies66, + BlockBodiesMsg: handleBlockBodies66, + GetReceiptsMsg: handleGetReceipts66, + ReceiptsMsg: handleReceipts66, + GetPooledTransactionsMsg: handleGetPooledTransactions66, + PooledTransactionsMsg: handlePooledTransactions66, +} + +var eth68 = map[uint64]msgHandler{ + NewBlockHashesMsg: handleNewBlockhashes, + NewBlockMsg: handleNewBlock, + TransactionsMsg: handleTransactions, + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes68, GetBlockHeadersMsg: handleGetBlockHeaders66, BlockHeadersMsg: handleBlockHeaders66, GetBlockBodiesMsg: handleGetBlockBodies66, @@ -210,9 +225,12 @@ func handleMessage(backend Backend, peer *Peer) error { defer msg.Discard() var handlers = eth66 - if peer.Version() >= ETH67 { + if peer.Version() == ETH67 { handlers = eth67 } + if peer.Version() >= ETH68 { + handlers = eth68 + } // Track the amount of time it takes to serve the request and run the handler if metrics.Enabled { diff --git a/eth/protocols/eth/handler_test.go b/eth/protocols/eth/handler_test.go index faea081be539..5c3d1be0a123 100644 --- a/eth/protocols/eth/handler_test.go +++ b/eth/protocols/eth/handler_test.go @@ -112,6 +112,8 @@ func (b *testBackend) Handle(*Peer, Packet) error { // Tests that block headers can be retrieved from a remote chain based on user queries. func TestGetBlockHeaders66(t *testing.T) { testGetBlockHeaders(t, ETH66) } +func TestGetBlockHeaders67(t *testing.T) { testGetBlockHeaders(t, ETH67) } +func TestGetBlockHeaders68(t *testing.T) { testGetBlockHeaders(t, ETH68) } func testGetBlockHeaders(t *testing.T, protocol uint) { t.Parallel() @@ -297,6 +299,8 @@ func testGetBlockHeaders(t *testing.T, protocol uint) { // Tests that block contents can be retrieved from a remote chain based on their hashes. func TestGetBlockBodies66(t *testing.T) { testGetBlockBodies(t, ETH66) } +func TestGetBlockBodies67(t *testing.T) { testGetBlockBodies(t, ETH67) } +func TestGetBlockBodies68(t *testing.T) { testGetBlockBodies(t, ETH68) } func testGetBlockBodies(t *testing.T, protocol uint) { t.Parallel() @@ -379,9 +383,11 @@ func testGetBlockBodies(t *testing.T, protocol uint) { } // Tests that the state trie nodes can be retrieved based on hashes. -func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66) } +func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66, false) } +func TestGetNodeData67(t *testing.T) { testGetNodeData(t, ETH67, true) } +func TestGetNodeData68(t *testing.T) { testGetNodeData(t, ETH68, true) } -func testGetNodeData(t *testing.T, protocol uint) { +func testGetNodeData(t *testing.T, protocol uint, drop bool) { t.Parallel() // Define three accounts to simulate transactions with @@ -442,8 +448,15 @@ func testGetNodeData(t *testing.T, protocol uint) { GetNodeDataPacket: hashes, }) msg, err := peer.app.ReadMsg() - if err != nil { - t.Fatalf("failed to read node data response: %v", err) + if !drop { + if err != nil { + t.Fatalf("failed to read node data response: %v", err) + } + } else { + if err != nil { + return + } + t.Fatalf("succeeded to read node data response on non-supporting protocol: %v", msg) } if msg.Code != NodeDataMsg { t.Fatalf("response packet code mismatch: have %x, want %x", msg.Code, NodeDataMsg) @@ -489,6 +502,8 @@ func testGetNodeData(t *testing.T, protocol uint) { // Tests that the transaction receipts can be retrieved based on hashes. func TestGetBlockReceipts66(t *testing.T) { testGetBlockReceipts(t, ETH66) } +func TestGetBlockReceipts67(t *testing.T) { testGetBlockReceipts(t, ETH67) } +func TestGetBlockReceipts68(t *testing.T) { testGetBlockReceipts(t, ETH68) } func testGetBlockReceipts(t *testing.T, protocol uint) { t.Parallel() diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index d454b3407f3c..85a59969ebf8 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -430,13 +430,13 @@ func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error { }, metadata) } -func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error { +func handleNewPooledTransactionHashes66(backend Backend, msg Decoder, peer *Peer) error { // New transaction announcement arrived, make sure we have // a valid and fresh chain to handle them if !backend.AcceptTxs() { return nil } - ann := new(NewPooledTransactionHashesPacket) + ann := new(NewPooledTransactionHashesPacket66) if err := msg.Decode(ann); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } @@ -447,6 +447,26 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) return backend.Handle(peer, ann) } +func handleNewPooledTransactionHashes68(backend Backend, msg Decoder, peer *Peer) error { + // New transaction announcement arrived, make sure we have + // a valid and fresh chain to handle them + if !backend.AcceptTxs() { + return nil + } + ann := new(NewPooledTransactionHashesPacket68) + if err := msg.Decode(ann); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + if len(ann.Hashes) != len(ann.Types) || len(ann.Hashes) != len(ann.Sizes) { + return fmt.Errorf("%w: message %v: invalid len of fields: %v %v %v", errDecode, msg, len(ann.Hashes), len(ann.Types), len(ann.Sizes)) + } + // Schedule all the unknown hashes for retrieval + for _, hash := range ann.Hashes { + peer.markTransaction(hash) + } + return backend.Handle(peer, ann) +} + func handleGetPooledTransactions66(backend Backend, msg Decoder, peer *Peer) error { // Decode the pooled transactions retrieval message var query GetPooledTransactionsPacket66 diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 0a3b7bd56e1b..070ce0825f9a 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -210,16 +210,29 @@ func (p *Peer) AsyncSendTransactions(hashes []common.Hash) { } } -// sendPooledTransactionHashes sends transaction hashes to the peer and includes +// sendPooledTransactionHashes66 sends transaction hashes to the peer and includes // them in its transaction hash set for future reference. // // This method is a helper used by the async transaction announcer. Don't call it // directly as the queueing (memory) and transmission (bandwidth) costs should // not be managed directly. -func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error { +func (p *Peer) sendPooledTransactionHashes66(hashes []common.Hash) error { // Mark all the transactions as known, but ensure we don't overflow our limits p.knownTxs.Add(hashes...) - return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket(hashes)) + return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket66(hashes)) +} + +// sendPooledTransactionHashes68 sends transaction hashes (tagged with their type +// and size) to the peer and includes them in its transaction hash set for future +// reference. +// +// This method is a helper used by the async transaction announcer. Don't call it +// directly as the queueing (memory) and transmission (bandwidth) costs should +// not be managed directly. +func (p *Peer) sendPooledTransactionHashes68(hashes []common.Hash, types []byte, sizes []uint32) error { + // Mark all the transactions as known, but ensure we don't overflow our limits + p.knownTxs.Add(hashes...) + return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket68{Types: types, Sizes: sizes, Hashes: hashes}) } // AsyncSendPooledTransactionHashes queues a list of transactions hashes to eventually diff --git a/eth/protocols/eth/peer_test.go b/eth/protocols/eth/peer_test.go index 0916ebee5d45..efbbbc6fff88 100644 --- a/eth/protocols/eth/peer_test.go +++ b/eth/protocols/eth/peer_test.go @@ -48,6 +48,8 @@ func newTestPeer(name string, version uint, backend Backend) (*testPeer, <-chan peer := NewPeer(version, p2p.NewPeer(id, name, nil), net, backend.TxPool()) errc := make(chan error, 1) go func() { + defer app.Close() + errc <- backend.RunPeer(peer, func(peer *Peer) error { return Handle(backend, peer) }) diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index f6fac4278080..6c59fcae655a 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -32,6 +32,7 @@ import ( const ( ETH66 = 66 ETH67 = 67 + ETH68 = 68 ) // ProtocolName is the official short name of the `eth` protocol used during @@ -40,11 +41,11 @@ const ProtocolName = "eth" // ProtocolVersions are the supported versions of the `eth` protocol (first // is primary). -var ProtocolVersions = []uint{ETH67, ETH66} +var ProtocolVersions = []uint{ETH68, ETH67, ETH66} // protocolLengths are the number of implemented message corresponding to // different protocol versions. -var protocolLengths = map[uint]uint64{ETH67: 17, ETH66: 17} +var protocolLengths = map[uint]uint64{ETH68: 17, ETH67: 17, ETH66: 17} // maxMessageSize is the maximum cap on the size of a protocol message. const maxMessageSize = 10 * 1024 * 1024 @@ -298,8 +299,15 @@ type ReceiptsRLPPacket66 struct { ReceiptsRLPPacket } -// NewPooledTransactionHashesPacket represents a transaction announcement packet. -type NewPooledTransactionHashesPacket []common.Hash +// NewPooledTransactionHashesPacket66 represents a transaction announcement packet on eth/66 and eth/67. +type NewPooledTransactionHashesPacket66 []common.Hash + +// NewPooledTransactionHashesPacket68 represents a transaction announcement packet on eth/68 and newer. +type NewPooledTransactionHashesPacket68 struct { + Types []byte + Sizes []uint32 + Hashes []common.Hash +} // GetPooledTransactionsPacket represents a transaction query. type GetPooledTransactionsPacket []common.Hash @@ -364,8 +372,11 @@ func (*GetReceiptsPacket) Kind() byte { return GetReceiptsMsg } func (*ReceiptsPacket) Name() string { return "Receipts" } func (*ReceiptsPacket) Kind() byte { return ReceiptsMsg } -func (*NewPooledTransactionHashesPacket) Name() string { return "NewPooledTransactionHashes" } -func (*NewPooledTransactionHashesPacket) Kind() byte { return NewPooledTransactionHashesMsg } +func (*NewPooledTransactionHashesPacket66) Name() string { return "NewPooledTransactionHashes" } +func (*NewPooledTransactionHashesPacket66) Kind() byte { return NewPooledTransactionHashesMsg } + +func (*NewPooledTransactionHashesPacket68) Name() string { return "NewPooledTransactionHashes" } +func (*NewPooledTransactionHashesPacket68) Kind() byte { return NewPooledTransactionHashesMsg } func (*GetPooledTransactionsPacket) Name() string { return "GetPooledTransactions" } func (*GetPooledTransactionsPacket) Kind() byte { return GetPooledTransactionsMsg }