Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p receipts #11010

Merged
merged 33 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion erigon-lib/txpool/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl
time.Sleep(3 * time.Second)
continue
}
f.logger.Debug("[txpool.fetch] Handling incoming message", "msg", req.Id.String(), "err", err)
f.logger.Debug("[txpool.fetch] Handling incoming message", "msg", string(req.Data), "reqID", req.Id.String(), "err", err)
}
if f.wg != nil {
f.wg.Done()
Expand Down
61 changes: 49 additions & 12 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,15 @@ package eth
import (
"context"
"fmt"

"github.com/ledgerwatch/erigon-lib/chain"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/log/v3"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/state"
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/turbo/transactions"

"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/core/rawdb"
Expand Down Expand Up @@ -160,12 +165,13 @@ func AnswerGetBlockBodiesQuery(db kv.Tx, query GetBlockBodiesPacket, blockReader
return bodies
}

func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam
func AnswerGetReceiptsQuery(ctx context.Context, br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket, chainConfing *chain.Config, engine consensus.Engine) ([]rlp.RawValue, error) { //nolint:unparam
// Gather state data until the fetch or network limits is reached
var (
bytes int
receipts []rlp.RawValue
)

for lookups, hash := range query {
if bytes >= softResponseLimit || len(receipts) >= maxReceiptsServe ||
lookups >= 2*maxReceiptsServe {
Expand All @@ -176,23 +182,19 @@ func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetRece
return nil, nil
}
// Retrieve the requested block's receipts
b, s, err := br.BlockWithSenders(context.Background(), db, hash, *number)
b, _, err := br.BlockWithSenders(context.Background(), db, hash, *number)
if err != nil {
return nil, err
}
if b == nil {
return nil, nil
}
results := rawdb.ReadReceipts(db, b, s)
if results == nil {
header, err := rawdb.ReadHeaderByHash(db, hash)
if err != nil {
return nil, err
}
if header == nil || header.ReceiptHash != types.EmptyRootHash {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems this optimization lost. maybe move it inside generator

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done but not in generator because it's query response optimization, not receipt

continue
}

results, err := MakeReceipts(ctx, db, b, br, chainConfing, engine)
if err != nil {
return nil, err
}

// If known, encode and queue for response packet
if encoded, err := rlp.EncodeToBytes(results); err != nil {
return nil, fmt.Errorf("failed to encode receipt: %w", err)
Expand All @@ -203,3 +205,38 @@ func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetRece
}
return receipts, nil
}

func MakeReceipts(ctx context.Context, db kv.Tx, block *types.Block,
br services.FullBlockReader, chainConfig *chain.Config, engine consensus.Engine) (receipts types.Receipts, err error) {
_, _, _, ibs, _, err := transactions.ComputeTxEnv(ctx, engine, block, chainConfig, br, db, 0)
if err != nil {
return nil, err
}
txs := block.Transactions()
getHeader := func(hash libcommon.Hash, number uint64) *types.Header {
h, e := br.Header(ctx, db, hash, number)
if e != nil {
log.Error("getHeader error", "number", number, "hash", hash, "err", e)
}
return h
}
header := block.Header()
usedGas := new(uint64)
usedBlobGas := new(uint64)
gp := new(core.GasPool).AddGas(block.GasLimit()).AddBlobGas(chainConfig.GetMaxBlobGasPerBlock())

noopWriter := state.NewNoopWriter()

receipts = make(types.Receipts, len(block.Transactions()))

for i := range txs {
ibs.SetTxContext(txs[i].Hash(), block.Hash(), i)
receipt, _, err := core.ApplyTransaction(chainConfig, core.GetHashFn(header, getHeader), engine, nil, gp, ibs, noopWriter, header, txs[i], usedGas, usedBlobGas, vm.Config{})
if err != nil {
return nil, err
}
receipt.BlockHash = block.Hash()
receipts[i] = receipt
}
return receipts, nil
}
86 changes: 47 additions & 39 deletions p2p/sentry/sentry_multi_client/sentry_multi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"golang.org/x/sync/semaphore"
"math/rand"
"sort"
"sync"
Expand Down Expand Up @@ -291,7 +292,8 @@ type MultiClient struct {
// decouple sentry multi client from header and body downloading logic is done
disableBlockDownload bool

logger log.Logger
logger log.Logger
onlyOneGoroutineController *semaphore.Weighted
}

func NewMultiClient(
Expand Down Expand Up @@ -356,6 +358,7 @@ func NewMultiClient(
maxBlockBroadcastPeers: maxBlockBroadcastPeers,
disableBlockDownload: disableBlockDownload,
logger: logger,
onlyOneGoroutineController: semaphore.NewWeighted(1),
}

return cs, nil
Expand Down Expand Up @@ -697,44 +700,49 @@ func (cs *MultiClient) getBlockBodies66(ctx context.Context, inreq *proto_sentry
}

func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
return nil //TODO: https://github.com/ledgerwatch/erigon/issues/10320
//var query eth.GetReceiptsPacket66
//if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
// return fmt.Errorf("decoding getReceipts66: %w, data: %x", err, inreq.Data)
//}
//tx, err := cs.db.BeginRo(ctx)
//if err != nil {
// return err
//}
//defer tx.Rollback()
//receipts, err := eth.AnswerGetReceiptsQuery(cs.blockReader, tx, query.GetReceiptsPacket)
//if err != nil {
// return err
//}
//tx.Rollback()
//b, err := rlp.EncodeToBytes(&eth.ReceiptsRLPPacket66{
// RequestId: query.RequestId,
// ReceiptsRLPPacket: receipts,
//})
//if err != nil {
// return fmt.Errorf("encode header response: %w", err)
//}
//outreq := proto_sentry.SendMessageByIdRequest{
// PeerId: inreq.PeerId,
// Data: &proto_sentry.OutboundMessageData{
// Id: proto_sentry.MessageId_RECEIPTS_66,
// Data: b,
// },
//}
//_, err = sentry.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{})
//if err != nil {
// if isPeerNotFoundErr(err) {
// return nil
// }
// return fmt.Errorf("send bodies response: %w", err)
//}
////cs.logger.Info(fmt.Sprintf("[%s] GetReceipts responseLen %d", ConvertH512ToPeerID(inreq.PeerId), len(b)))
//return nil
err := cs.onlyOneGoroutineController.Acquire(ctx, 1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onlyOneGoroutineController - it's also meaningless name:

  • onlyOneGoroutine unclear what kind of goroutines (probably there are many of them in MultiClient class). main feature of semaphore - ability to configure limit in future. if semaphore limit is always 1 - then can use mutex. but I think we need ability to increase limit in future.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if err != nil {
return err
}
defer cs.onlyOneGoroutineController.Release(1)
var query eth.GetReceiptsPacket66
if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
return fmt.Errorf("decoding getReceipts66: %w, data: %x", err, inreq.Data)
}

tx, err := cs.db.BeginRo(ctx)
if err != nil {
return err
}
defer tx.Rollback()
receipts, err := eth.AnswerGetReceiptsQuery(ctx, cs.blockReader, tx, query.GetReceiptsPacket, cs.ChainConfig, cs.Engine)
if err != nil {
return err
}
tx.Rollback()
b, err := rlp.EncodeToBytes(&eth.ReceiptsRLPPacket66{
RequestId: query.RequestId,
ReceiptsRLPPacket: receipts,
})
if err != nil {
return fmt.Errorf("encode header response: %w", err)
}
outreq := proto_sentry.SendMessageByIdRequest{
PeerId: inreq.PeerId,
Data: &proto_sentry.OutboundMessageData{
Id: proto_sentry.MessageId_RECEIPTS_66,
Data: b,
},
}
_, err = sentry.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{})
if err != nil {
if isPeerNotFoundErr(err) {
return nil
}
return fmt.Errorf("send bodies response: %w", err)
}
//cs.logger.Info(fmt.Sprintf("[%s] GetReceipts responseLen %d", ConvertH512ToPeerID(inreq.PeerId), len(b)))
return nil
}

func MakeInboundMessage() *proto_sentry.InboundMessage {
Expand Down
30 changes: 25 additions & 5 deletions turbo/stages/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import (
"encoding/binary"
"errors"
"fmt"
proto_sentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentryproto"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enable imports auto-sort in IDE pls

"github.com/ledgerwatch/erigon/eth/protocols/eth"
"github.com/ledgerwatch/erigon/rlp"
"math"
"math/big"
"testing"

"github.com/ledgerwatch/erigon-lib/common/hexutil"
"github.com/ledgerwatch/erigon-lib/config3"
"github.com/ledgerwatch/erigon-lib/log/v3"

"github.com/holiman/uint256"
Expand Down Expand Up @@ -319,10 +321,6 @@ func testReorgShort(t *testing.T) {
}

func testReorg(t *testing.T, first, second []int64, td int64) {
if config3.EnableHistoryV4InTest {
t.Skip("TODO: [e4] implement me")
}

require := require.New(t)
// Create a pristine chain and database
m := newCanonical(t, 0)
Expand Down Expand Up @@ -359,7 +357,11 @@ func testReorg(t *testing.T, first, second []int64, td int64) {
if err != nil {
t.Fatal(err)
}

hashPacket := make([]libcommon.Hash, 0)

for block.NumberU64() != 0 {
hashPacket = append(hashPacket, block.Hash())
if prev.ParentHash() != block.Hash() {
t.Errorf("parent block hash mismatch: have %x, want %x", prev.ParentHash(), block.Hash())
}
Expand All @@ -369,6 +371,24 @@ func testReorg(t *testing.T, first, second []int64, td int64) {
t.Fatal(err)
}
}

b, err := rlp.EncodeToBytes(&eth.GetReceiptsPacket66{
RequestId: 1,
GetReceiptsPacket: hashPacket,
})
if err != nil {
t.Fatal(err)
}

m.ReceiveWg.Add(1)
for _, err = range m.Send(&proto_sentry.InboundMessage{Id: proto_sentry.MessageId_GET_RECEIPTS_66, Data: b, PeerId: m.PeerId}) {
if err != nil {
t.Fatal(err)
}
}

m.ReceiveWg.Wait()

// Make sure the chain total difficulty is the correct one
want := new(big.Int).Add(m.Genesis.Difficulty(), big.NewInt(td))
have, err := rawdb.ReadTdByHash(tx, rawdb.ReadCurrentHeader(tx).Hash())
Expand Down
Loading