From eea467a6da9fb7a22e67c83c387ef44db52dfbb2 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Wed, 3 Jul 2024 17:20:12 +0400 Subject: [PATCH 01/29] added logs to receipts --- cmd/state/exec3/trace_worker.go | 2 +- eth/protocols/eth/handlers.go | 27 ++++++- .../sentry_multi_client.go | 79 ++++++++++--------- 3 files changed, 67 insertions(+), 41 deletions(-) diff --git a/cmd/state/exec3/trace_worker.go b/cmd/state/exec3/trace_worker.go index 9a6d918a10a..e28f1aca131 100644 --- a/cmd/state/exec3/trace_worker.go +++ b/cmd/state/exec3/trace_worker.go @@ -47,7 +47,7 @@ type TraceWorker struct { vmConfig *vm.Config } -func NewTraceWorker(tx kv.TemporalTx, cc *chain.Config, engine consensus.EngineReader, br services.HeaderReader, tracer GenericTracer) *TraceWorker { +func NewTraceWorker(tx kv.Tx, cc *chain.Config, engine consensus.EngineReader, br services.HeaderReader, tracer GenericTracer) *TraceWorker { stateReader := state.NewHistoryReaderV3() stateReader.SetTx(tx) diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 49d31a1d5e0..0217e61d3ec 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -19,10 +19,11 @@ package eth import ( "context" "fmt" - libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/rawdbv3" "github.com/ledgerwatch/erigon-lib/log/v3" + "github.com/ledgerwatch/erigon/cmd/state/exec3" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/core/rawdb" @@ -157,7 +158,7 @@ func AnswerGetBlockBodiesQuery(db kv.Tx, query GetBlockBodiesPacket, blockReader return bodies } -func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam +func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket, exec *exec3.TraceWorker) ([]rlp.RawValue, error) { //nolint:unparam // Gather state data until the fetch or network limits is reached var ( bytes int @@ -190,6 +191,11 @@ func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetRece continue } } + err = AddLogsToReceipts(db, results, b, exec) + if err != nil { + return nil, err + } + // If known, encode and queue for response packet if encoded, err := rlp.EncodeToBytes(results); err != nil { return nil, fmt.Errorf("failed to encode receipt: %w", err) @@ -200,3 +206,20 @@ func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetRece } return receipts, nil } + +func AddLogsToReceipts(db kv.Tx, receipts types.Receipts, block *types.Block, exec *exec3.TraceWorker) error { + txs := block.Transactions() + txNum, err := rawdbv3.TxNums.Min(db, block.NumberU64()) + if err != nil { + return err + } + for i := range receipts { + _, err = exec.ExecTxn(txNum, i, txs[i]) + if err != nil { + return err + } + logs := exec.GetLogs(i, txs[i]) + receipts[i].Logs = logs + } + return nil +} diff --git a/p2p/sentry/sentry_multi_client/sentry_multi_client.go b/p2p/sentry/sentry_multi_client/sentry_multi_client.go index 3eceb0dfdef..6d094cbea5c 100644 --- a/p2p/sentry/sentry_multi_client/sentry_multi_client.go +++ b/p2p/sentry/sentry_multi_client/sentry_multi_client.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "errors" "fmt" + "github.com/ledgerwatch/erigon/cmd/state/exec3" "math/rand" "sort" "sync" @@ -681,44 +682,46 @@ func (cs *MultiClient) getBlockBodies66(ctx context.Context, inreq *proto_sentry } func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { - return nil //TODO: https://github.com/ledgerwatch/erigon/issues/10320 - //var query eth.GetReceiptsPacket66 - //if err := rlp.DecodeBytes(inreq.Data, &query); err != nil { - // return fmt.Errorf("decoding getReceipts66: %w, data: %x", err, inreq.Data) - //} - //tx, err := cs.db.BeginRo(ctx) - //if err != nil { - // return err - //} - //defer tx.Rollback() - //receipts, err := eth.AnswerGetReceiptsQuery(cs.blockReader, tx, query.GetReceiptsPacket) - //if err != nil { - // return err - //} - //tx.Rollback() - //b, err := rlp.EncodeToBytes(ð.ReceiptsRLPPacket66{ - // RequestId: query.RequestId, - // ReceiptsRLPPacket: receipts, - //}) - //if err != nil { - // return fmt.Errorf("encode header response: %w", err) - //} - //outreq := proto_sentry.SendMessageByIdRequest{ - // PeerId: inreq.PeerId, - // Data: &proto_sentry.OutboundMessageData{ - // Id: proto_sentry.MessageId_RECEIPTS_66, - // Data: b, - // }, - //} - //_, err = sentry.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{}) - //if err != nil { - // if isPeerNotFoundErr(err) { - // return nil - // } - // return fmt.Errorf("send bodies response: %w", err) - //} - ////cs.logger.Info(fmt.Sprintf("[%s] GetReceipts responseLen %d", ConvertH512ToPeerID(inreq.PeerId), len(b))) - //return nil + //return nil //TODO: https://github.com/ledgerwatch/erigon/issues/10320 + var query eth.GetReceiptsPacket66 + if err := rlp.DecodeBytes(inreq.Data, &query); err != nil { + return fmt.Errorf("decoding getReceipts66: %w, data: %x", err, inreq.Data) + } + + tx, err := cs.db.BeginRo(ctx) + if err != nil { + return err + } + defer tx.Rollback() + exec := exec3.NewTraceWorker(tx, cs.ChainConfig, cs.Engine, cs.blockReader, nil) + receipts, err := eth.AnswerGetReceiptsQuery(cs.blockReader, tx, query.GetReceiptsPacket, exec) + if err != nil { + return err + } + tx.Rollback() + b, err := rlp.EncodeToBytes(ð.ReceiptsRLPPacket66{ + RequestId: query.RequestId, + ReceiptsRLPPacket: receipts, + }) + if err != nil { + return fmt.Errorf("encode header response: %w", err) + } + outreq := proto_sentry.SendMessageByIdRequest{ + PeerId: inreq.PeerId, + Data: &proto_sentry.OutboundMessageData{ + Id: proto_sentry.MessageId_RECEIPTS_66, + Data: b, + }, + } + _, err = sentry.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{}) + if err != nil { + if isPeerNotFoundErr(err) { + return nil + } + return fmt.Errorf("send bodies response: %w", err) + } + //cs.logger.Info(fmt.Sprintf("[%s] GetReceipts responseLen %d", ConvertH512ToPeerID(inreq.PeerId), len(b))) + return nil } func MakeInboundMessage() *proto_sentry.InboundMessage { From 9ff21c9d09b3459aeb5bba3b3116397dc18eff7e Mon Sep 17 00:00:00 2001 From: JkLondon Date: Wed, 3 Jul 2024 17:26:02 +0400 Subject: [PATCH 02/29] casted to temporal --- cmd/state/exec3/trace_worker.go | 2 +- p2p/sentry/sentry_multi_client/sentry_multi_client.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/state/exec3/trace_worker.go b/cmd/state/exec3/trace_worker.go index e28f1aca131..9a6d918a10a 100644 --- a/cmd/state/exec3/trace_worker.go +++ b/cmd/state/exec3/trace_worker.go @@ -47,7 +47,7 @@ type TraceWorker struct { vmConfig *vm.Config } -func NewTraceWorker(tx kv.Tx, cc *chain.Config, engine consensus.EngineReader, br services.HeaderReader, tracer GenericTracer) *TraceWorker { +func NewTraceWorker(tx kv.TemporalTx, cc *chain.Config, engine consensus.EngineReader, br services.HeaderReader, tracer GenericTracer) *TraceWorker { stateReader := state.NewHistoryReaderV3() stateReader.SetTx(tx) diff --git a/p2p/sentry/sentry_multi_client/sentry_multi_client.go b/p2p/sentry/sentry_multi_client/sentry_multi_client.go index 6d094cbea5c..3b7ef44f25c 100644 --- a/p2p/sentry/sentry_multi_client/sentry_multi_client.go +++ b/p2p/sentry/sentry_multi_client/sentry_multi_client.go @@ -693,7 +693,8 @@ func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.In return err } defer tx.Rollback() - exec := exec3.NewTraceWorker(tx, cs.ChainConfig, cs.Engine, cs.blockReader, nil) + ttx := tx.(kv.TemporalTx) + exec := exec3.NewTraceWorker(ttx, cs.ChainConfig, cs.Engine, cs.blockReader, nil) receipts, err := eth.AnswerGetReceiptsQuery(cs.blockReader, tx, query.GetReceiptsPacket, exec) if err != nil { return err From 35a3f1f21fcc21e6eabf416a9f49c10715161207 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Wed, 3 Jul 2024 17:26:45 +0400 Subject: [PATCH 03/29] save --- p2p/sentry/sentry_multi_client/sentry_multi_client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/p2p/sentry/sentry_multi_client/sentry_multi_client.go b/p2p/sentry/sentry_multi_client/sentry_multi_client.go index 3b7ef44f25c..c0e4c1a778b 100644 --- a/p2p/sentry/sentry_multi_client/sentry_multi_client.go +++ b/p2p/sentry/sentry_multi_client/sentry_multi_client.go @@ -682,7 +682,6 @@ func (cs *MultiClient) getBlockBodies66(ctx context.Context, inreq *proto_sentry } func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { - //return nil //TODO: https://github.com/ledgerwatch/erigon/issues/10320 var query eth.GetReceiptsPacket66 if err := rlp.DecodeBytes(inreq.Data, &query); err != nil { return fmt.Errorf("decoding getReceipts66: %w, data: %x", err, inreq.Data) From f05ba605d7ab7adbc5c608ea29dbc4be9384a08d Mon Sep 17 00:00:00 2001 From: JkLondon Date: Wed, 3 Jul 2024 17:46:36 +0400 Subject: [PATCH 04/29] added semaphore --- p2p/sentry/sentry_multi_client/sentry_multi_client.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/p2p/sentry/sentry_multi_client/sentry_multi_client.go b/p2p/sentry/sentry_multi_client/sentry_multi_client.go index c0e4c1a778b..e8c1bbed257 100644 --- a/p2p/sentry/sentry_multi_client/sentry_multi_client.go +++ b/p2p/sentry/sentry_multi_client/sentry_multi_client.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "github.com/ledgerwatch/erigon/cmd/state/exec3" + "golang.org/x/sync/semaphore" "math/rand" "sort" "sync" @@ -276,7 +277,8 @@ type MultiClient struct { // decouple sentry multi client from header and body downloading logic is done disableBlockDownload bool - logger log.Logger + logger log.Logger + semaphore *semaphore.Weighted } func NewMultiClient( @@ -341,6 +343,7 @@ func NewMultiClient( maxBlockBroadcastPeers: maxBlockBroadcastPeers, disableBlockDownload: disableBlockDownload, logger: logger, + semaphore: semaphore.NewWeighted(1), } return cs, nil @@ -682,6 +685,11 @@ func (cs *MultiClient) getBlockBodies66(ctx context.Context, inreq *proto_sentry } func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { + err := cs.semaphore.Acquire(ctx, 1) + if err != nil { + return err + } + defer cs.semaphore.Release(1) var query eth.GetReceiptsPacket66 if err := rlp.DecodeBytes(inreq.Data, &query); err != nil { return fmt.Errorf("decoding getReceipts66: %w, data: %x", err, inreq.Data) From c917eb08e3f2e9d0759c4409641af4dcb797f44c Mon Sep 17 00:00:00 2001 From: JkLondon Date: Thu, 4 Jul 2024 10:42:32 +0400 Subject: [PATCH 05/29] reorg unlocked --- turbo/stages/blockchain_test.go | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/turbo/stages/blockchain_test.go b/turbo/stages/blockchain_test.go index 894a400f2be..8bb9875ec72 100644 --- a/turbo/stages/blockchain_test.go +++ b/turbo/stages/blockchain_test.go @@ -24,12 +24,14 @@ import ( "encoding/binary" "errors" "fmt" + proto_sentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentryproto" + "github.com/ledgerwatch/erigon/eth/protocols/eth" + "github.com/ledgerwatch/erigon/rlp" "math" "math/big" "testing" "github.com/ledgerwatch/erigon-lib/common/hexutil" - "github.com/ledgerwatch/erigon-lib/config3" "github.com/ledgerwatch/erigon-lib/log/v3" "github.com/holiman/uint256" @@ -319,10 +321,6 @@ func testReorgShort(t *testing.T) { } func testReorg(t *testing.T, first, second []int64, td int64) { - if config3.EnableHistoryV4InTest { - t.Skip("TODO: [e4] implement me") - } - require := require.New(t) // Create a pristine chain and database m := newCanonical(t, 0) @@ -359,7 +357,11 @@ func testReorg(t *testing.T, first, second []int64, td int64) { if err != nil { t.Fatal(err) } + + hashPacket := make([]libcommon.Hash, 0) + for block.NumberU64() != 0 { + hashPacket = append(hashPacket, block.Hash()) if prev.ParentHash() != block.Hash() { t.Errorf("parent block hash mismatch: have %x, want %x", prev.ParentHash(), block.Hash()) } @@ -369,6 +371,23 @@ func testReorg(t *testing.T, first, second []int64, td int64) { t.Fatal(err) } } + + b, err := rlp.EncodeToBytes(ð.GetReceiptsPacket66{ + RequestId: 1, + GetReceiptsPacket: hashPacket, + }) + if err != nil { + t.Fatal(err) + } + + m.ReceiveWg.Add(1) + for _, err = range m.Send(&proto_sentry.InboundMessage{Id: proto_sentry.MessageId_GET_RECEIPTS_66, Data: b, PeerId: m.PeerId}) { + if err != nil { + t.Fatal(err) + } + } + m.ReceiveWg.Wait() + // Make sure the chain total difficulty is the correct one want := new(big.Int).Add(m.Genesis.Difficulty(), big.NewInt(td)) have, err := rawdb.ReadTdByHash(tx, rawdb.ReadCurrentHeader(tx).Hash()) From cf7fc0b75cc5f0191beb729b73d15fa1b4772936 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Fri, 5 Jul 2024 13:03:46 +0400 Subject: [PATCH 06/29] fix --- eth/protocols/eth/handlers.go | 1 + 1 file changed, 1 insertion(+) diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 95a65d4a822..326205dd5fe 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -223,6 +223,7 @@ func AddLogsToReceipts(db kv.Tx, receipts types.Receipts, block *types.Block, ex } logs := exec.GetLogs(i, txs[i]) receipts[i].Logs = logs + txNum++ } return nil } From 132d101ea42f06a1867f7d87015a9038578d9ec1 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Fri, 5 Jul 2024 13:57:19 +0400 Subject: [PATCH 07/29] fix --- erigon-lib/txpool/fetch.go | 2 +- p2p/sentry/sentry_multi_client/sentry_multi_client.go | 10 +++++----- turbo/stages/blockchain_test.go | 1 + 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/erigon-lib/txpool/fetch.go b/erigon-lib/txpool/fetch.go index a0022e5b33b..c09473acae8 100644 --- a/erigon-lib/txpool/fetch.go +++ b/erigon-lib/txpool/fetch.go @@ -194,7 +194,7 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl time.Sleep(3 * time.Second) continue } - f.logger.Debug("[txpool.fetch] Handling incoming message", "msg", req.Id.String(), "err", err) + f.logger.Debug("[txpool.fetch] Handling incoming message", "msg", string(req.Data), "reqID", req.Id.String(), "err", err) } if f.wg != nil { f.wg.Done() diff --git a/p2p/sentry/sentry_multi_client/sentry_multi_client.go b/p2p/sentry/sentry_multi_client/sentry_multi_client.go index f33c65ae784..8f41daf0231 100644 --- a/p2p/sentry/sentry_multi_client/sentry_multi_client.go +++ b/p2p/sentry/sentry_multi_client/sentry_multi_client.go @@ -293,8 +293,8 @@ type MultiClient struct { // decouple sentry multi client from header and body downloading logic is done disableBlockDownload bool - logger log.Logger - semaphore *semaphore.Weighted + logger log.Logger + onlyOneGoroutineController *semaphore.Weighted } func NewMultiClient( @@ -359,7 +359,7 @@ func NewMultiClient( maxBlockBroadcastPeers: maxBlockBroadcastPeers, disableBlockDownload: disableBlockDownload, logger: logger, - semaphore: semaphore.NewWeighted(1), + onlyOneGoroutineController: semaphore.NewWeighted(1), } return cs, nil @@ -701,11 +701,11 @@ func (cs *MultiClient) getBlockBodies66(ctx context.Context, inreq *proto_sentry } func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { - err := cs.semaphore.Acquire(ctx, 1) + err := cs.onlyOneGoroutineController.Acquire(ctx, 1) if err != nil { return err } - defer cs.semaphore.Release(1) + defer cs.onlyOneGoroutineController.Release(1) var query eth.GetReceiptsPacket66 if err := rlp.DecodeBytes(inreq.Data, &query); err != nil { return fmt.Errorf("decoding getReceipts66: %w, data: %x", err, inreq.Data) diff --git a/turbo/stages/blockchain_test.go b/turbo/stages/blockchain_test.go index 8bb9875ec72..83fd7b10190 100644 --- a/turbo/stages/blockchain_test.go +++ b/turbo/stages/blockchain_test.go @@ -386,6 +386,7 @@ func testReorg(t *testing.T, first, second []int64, td int64) { t.Fatal(err) } } + m.ReceiveWg.Wait() // Make sure the chain total difficulty is the correct one From ddc0aae5914713aaa288163ed7774835721fc005 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Sat, 6 Jul 2024 19:08:28 +0400 Subject: [PATCH 08/29] new receipt func --- eth/protocols/eth/handlers.go | 65 +++++++++++-------- .../sentry_multi_client.go | 5 +- 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 326205dd5fe..8bd5823a0a7 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -22,11 +22,15 @@ package eth import ( "context" "fmt" + "github.com/ledgerwatch/erigon-lib/chain" libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/kv/rawdbv3" "github.com/ledgerwatch/erigon-lib/log/v3" - "github.com/ledgerwatch/erigon/cmd/state/exec3" + "github.com/ledgerwatch/erigon/consensus" + "github.com/ledgerwatch/erigon/core" + "github.com/ledgerwatch/erigon/core/state" + "github.com/ledgerwatch/erigon/core/vm" + "github.com/ledgerwatch/erigon/turbo/transactions" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/core/rawdb" @@ -161,12 +165,13 @@ func AnswerGetBlockBodiesQuery(db kv.Tx, query GetBlockBodiesPacket, blockReader return bodies } -func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket, exec *exec3.TraceWorker) ([]rlp.RawValue, error) { //nolint:unparam +func AnswerGetReceiptsQuery(ctx context.Context, br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket, chainConfing *chain.Config, engine consensus.Engine) ([]rlp.RawValue, error) { //nolint:unparam // Gather state data until the fetch or network limits is reached var ( bytes int receipts []rlp.RawValue ) + for lookups, hash := range query { if bytes >= softResponseLimit || len(receipts) >= maxReceiptsServe || lookups >= 2*maxReceiptsServe { @@ -177,24 +182,15 @@ func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetRece return nil, nil } // Retrieve the requested block's receipts - b, s, err := br.BlockWithSenders(context.Background(), db, hash, *number) + b, _, err := br.BlockWithSenders(context.Background(), db, hash, *number) if err != nil { return nil, err } if b == nil { return nil, nil } - results := rawdb.ReadReceipts(db, b, s) - if results == nil { - header, err := rawdb.ReadHeaderByHash(db, hash) - if err != nil { - return nil, err - } - if header == nil || header.ReceiptHash != types.EmptyRootHash { - continue - } - } - err = AddLogsToReceipts(db, results, b, exec) + + results, err := MakeReceipts(ctx, db, b, br, chainConfing, engine) if err != nil { return nil, err } @@ -210,20 +206,37 @@ func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetRece return receipts, nil } -func AddLogsToReceipts(db kv.Tx, receipts types.Receipts, block *types.Block, exec *exec3.TraceWorker) error { - txs := block.Transactions() - txNum, err := rawdbv3.TxNums.Min(db, block.NumberU64()) +func MakeReceipts(ctx context.Context, db kv.Tx, block *types.Block, + br services.FullBlockReader, chainConfig *chain.Config, engine consensus.Engine) (receipts types.Receipts, err error) { + _, _, _, ibs, _, err := transactions.ComputeTxEnv(ctx, engine, block, chainConfig, br, db, 0) if err != nil { - return err + return nil, err } - for i := range receipts { - _, err = exec.ExecTxn(txNum, i, txs[i]) + txs := block.Transactions() + getHeader := func(hash libcommon.Hash, number uint64) *types.Header { + h, e := br.Header(ctx, db, hash, number) + if e != nil { + log.Error("getHeader error", "number", number, "hash", hash, "err", e) + } + return h + } + header := block.Header() + usedGas := new(uint64) + usedBlobGas := new(uint64) + gp := new(core.GasPool).AddGas(block.GasLimit()).AddBlobGas(chainConfig.GetMaxBlobGasPerBlock()) + + noopWriter := state.NewNoopWriter() + + receipts = make(types.Receipts, len(block.Transactions())) + + for i := range txs { + ibs.SetTxContext(txs[i].Hash(), block.Hash(), i) + receipt, _, err := core.ApplyTransaction(chainConfig, core.GetHashFn(header, getHeader), engine, nil, gp, ibs, noopWriter, header, txs[i], usedGas, usedBlobGas, vm.Config{}) if err != nil { - return err + return nil, err } - logs := exec.GetLogs(i, txs[i]) - receipts[i].Logs = logs - txNum++ + receipt.BlockHash = block.Hash() + receipts[i] = receipt } - return nil + return receipts, nil } diff --git a/p2p/sentry/sentry_multi_client/sentry_multi_client.go b/p2p/sentry/sentry_multi_client/sentry_multi_client.go index 8f41daf0231..9a6964a09ff 100644 --- a/p2p/sentry/sentry_multi_client/sentry_multi_client.go +++ b/p2p/sentry/sentry_multi_client/sentry_multi_client.go @@ -22,7 +22,6 @@ import ( "encoding/hex" "errors" "fmt" - "github.com/ledgerwatch/erigon/cmd/state/exec3" "golang.org/x/sync/semaphore" "math/rand" "sort" @@ -716,9 +715,7 @@ func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.In return err } defer tx.Rollback() - ttx := tx.(kv.TemporalTx) - exec := exec3.NewTraceWorker(ttx, cs.ChainConfig, cs.Engine, cs.blockReader, nil) - receipts, err := eth.AnswerGetReceiptsQuery(cs.blockReader, tx, query.GetReceiptsPacket, exec) + receipts, err := eth.AnswerGetReceiptsQuery(ctx, cs.blockReader, tx, query.GetReceiptsPacket, cs.ChainConfig, cs.Engine) if err != nil { return err } From 7cf43acb435d56b31bf9ca0e6491ecf8a8c0b21c Mon Sep 17 00:00:00 2001 From: JkLondon Date: Tue, 9 Jul 2024 12:16:22 +0300 Subject: [PATCH 09/29] save --- erigon-lib/txpool/fetch.go | 7 ++- eth/protocols/eth/handlers.go | 49 ++----------------- .../sentry_multi_client.go | 16 +++--- turbo/jsonrpc/eth_receipts.go | 5 ++ 4 files changed, 24 insertions(+), 53 deletions(-) diff --git a/erigon-lib/txpool/fetch.go b/erigon-lib/txpool/fetch.go index c09473acae8..d8a4322a6a9 100644 --- a/erigon-lib/txpool/fetch.go +++ b/erigon-lib/txpool/fetch.go @@ -146,7 +146,6 @@ func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) { f.logger.Warn("[txpool.recvMessage] sentry not ready yet", "err", err) continue } - if err := f.receiveMessage(f.ctx, sentryClient); err != nil { if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) { time.Sleep(3 * time.Second) @@ -175,10 +174,11 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl } return err } - + println("receive message") var req *sentry.InboundMessage for req, err = stream.Recv(); ; req, err = stream.Recv() { if err != nil { + f.logger.Error("[txpool.receiveMessage]", "err", err) select { case <-f.ctx.Done(): return ctx.Err() @@ -187,8 +187,10 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl return err } if req == nil { + f.logger.Warn("[txpool.receiveMessage]", "req nil") return nil } + f.logger.Info("[txpool.receiveMessage]", "req", req) if err := f.handleInboundMessage(streamCtx, req, sentryClient); err != nil { if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) { time.Sleep(3 * time.Second) @@ -199,6 +201,7 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl if f.wg != nil { f.wg.Done() } + f.logger.Info("[txpool.fetch] Handling incoming message", "msg", string(req.Data), "reqID", req.Id.String(), "err", err) } } diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 8bd5823a0a7..fe9681cb4f8 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -22,20 +22,14 @@ package eth import ( "context" "fmt" - "github.com/ledgerwatch/erigon-lib/chain" libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/log/v3" - "github.com/ledgerwatch/erigon/consensus" - "github.com/ledgerwatch/erigon/core" - "github.com/ledgerwatch/erigon/core/state" - "github.com/ledgerwatch/erigon/core/vm" - "github.com/ledgerwatch/erigon/turbo/transactions" - "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/rlp" + "github.com/ledgerwatch/erigon/turbo/jsonrpc" "github.com/ledgerwatch/erigon/turbo/services" ) @@ -165,7 +159,7 @@ func AnswerGetBlockBodiesQuery(db kv.Tx, query GetBlockBodiesPacket, blockReader return bodies } -func AnswerGetReceiptsQuery(ctx context.Context, br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket, chainConfing *chain.Config, engine consensus.Engine) ([]rlp.RawValue, error) { //nolint:unparam +func AnswerGetReceiptsQuery(ctx context.Context, baseApi *jsonrpc.BaseAPI, br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam // Gather state data until the fetch or network limits is reached var ( bytes int @@ -182,7 +176,7 @@ func AnswerGetReceiptsQuery(ctx context.Context, br services.FullBlockReader, db return nil, nil } // Retrieve the requested block's receipts - b, _, err := br.BlockWithSenders(context.Background(), db, hash, *number) + b, s, err := br.BlockWithSenders(context.Background(), db, hash, *number) if err != nil { return nil, err } @@ -190,7 +184,7 @@ func AnswerGetReceiptsQuery(ctx context.Context, br services.FullBlockReader, db return nil, nil } - results, err := MakeReceipts(ctx, db, b, br, chainConfing, engine) + results, err := baseApi.GetReceipts(ctx, db, b, s) if err != nil { return nil, err } @@ -205,38 +199,3 @@ func AnswerGetReceiptsQuery(ctx context.Context, br services.FullBlockReader, db } return receipts, nil } - -func MakeReceipts(ctx context.Context, db kv.Tx, block *types.Block, - br services.FullBlockReader, chainConfig *chain.Config, engine consensus.Engine) (receipts types.Receipts, err error) { - _, _, _, ibs, _, err := transactions.ComputeTxEnv(ctx, engine, block, chainConfig, br, db, 0) - if err != nil { - return nil, err - } - txs := block.Transactions() - getHeader := func(hash libcommon.Hash, number uint64) *types.Header { - h, e := br.Header(ctx, db, hash, number) - if e != nil { - log.Error("getHeader error", "number", number, "hash", hash, "err", e) - } - return h - } - header := block.Header() - usedGas := new(uint64) - usedBlobGas := new(uint64) - gp := new(core.GasPool).AddGas(block.GasLimit()).AddBlobGas(chainConfig.GetMaxBlobGasPerBlock()) - - noopWriter := state.NewNoopWriter() - - receipts = make(types.Receipts, len(block.Transactions())) - - for i := range txs { - ibs.SetTxContext(txs[i].Hash(), block.Hash(), i) - receipt, _, err := core.ApplyTransaction(chainConfig, core.GetHashFn(header, getHeader), engine, nil, gp, ibs, noopWriter, header, txs[i], usedGas, usedBlobGas, vm.Config{}) - if err != nil { - return nil, err - } - receipt.BlockHash = block.Hash() - receipts[i] = receipt - } - return receipts, nil -} diff --git a/p2p/sentry/sentry_multi_client/sentry_multi_client.go b/p2p/sentry/sentry_multi_client/sentry_multi_client.go index 9a6964a09ff..ea24e8d2c52 100644 --- a/p2p/sentry/sentry_multi_client/sentry_multi_client.go +++ b/p2p/sentry/sentry_multi_client/sentry_multi_client.go @@ -22,6 +22,9 @@ import ( "encoding/hex" "errors" "fmt" + "github.com/ledgerwatch/erigon-lib/common/datadir" + "github.com/ledgerwatch/erigon-lib/kv/kvcache" + "github.com/ledgerwatch/erigon/turbo/jsonrpc" "golang.org/x/sync/semaphore" "math/rand" "sort" @@ -699,7 +702,7 @@ func (cs *MultiClient) getBlockBodies66(ctx context.Context, inreq *proto_sentry return nil } -func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { +func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentryClient direct.SentryClient) error { err := cs.onlyOneGoroutineController.Acquire(ctx, 1) if err != nil { return err @@ -715,11 +718,12 @@ func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.In return err } defer tx.Rollback() - receipts, err := eth.AnswerGetReceiptsQuery(ctx, cs.blockReader, tx, query.GetReceiptsPacket, cs.ChainConfig, cs.Engine) + cache := kvcache.NewDummy() + baseApi := jsonrpc.NewBaseApi(nil, cache, cs.blockReader, nil, true, time.Second, cs.Engine, datadir.New("")) + receipts, err := eth.AnswerGetReceiptsQuery(ctx, baseApi, cs.blockReader, tx, query.GetReceiptsPacket) if err != nil { return err } - tx.Rollback() b, err := rlp.EncodeToBytes(ð.ReceiptsRLPPacket66{ RequestId: query.RequestId, ReceiptsRLPPacket: receipts, @@ -734,14 +738,14 @@ func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.In Data: b, }, } - _, err = sentry.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{}) + _, err = sentryClient.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{}) if err != nil { if isPeerNotFoundErr(err) { return nil } - return fmt.Errorf("send bodies response: %w", err) + return fmt.Errorf("send receipts response: %w", err) } - //cs.logger.Info(fmt.Sprintf("[%s] GetReceipts responseLen %d", ConvertH512ToPeerID(inreq.PeerId), len(b))) + cs.logger.Info(fmt.Sprintf("[%s] GetReceipts responseLen %d", sentry.ConvertH512ToPeerID(inreq.PeerId), len(b))) return nil } diff --git a/turbo/jsonrpc/eth_receipts.go b/turbo/jsonrpc/eth_receipts.go index c7ff94ca19d..e2b23d64672 100644 --- a/turbo/jsonrpc/eth_receipts.go +++ b/turbo/jsonrpc/eth_receipts.go @@ -606,3 +606,8 @@ func (i *MapTxNum2BlockNumIter) Next() (txNum, blockNum uint64, txIndex int, isF isFinalTxn = txNum == i.maxTxNumInBlock return } + +// GetReceipts Made getReceipts method public to use in sentry multi client +func (api *BaseAPI) GetReceipts(ctx context.Context, tx kv.Tx, block *types.Block, senders []common.Address) (types.Receipts, error) { + return api.getReceipts(ctx, tx, block, senders) +} From 61ea0761b10664d1dde9b30c049626173484aa59 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Tue, 9 Jul 2024 19:23:06 +0300 Subject: [PATCH 10/29] save --- p2p/sentry/sentry_multi_client/sentry_multi_client.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/p2p/sentry/sentry_multi_client/sentry_multi_client.go b/p2p/sentry/sentry_multi_client/sentry_multi_client.go index ea24e8d2c52..4d42d4e0c7d 100644 --- a/p2p/sentry/sentry_multi_client/sentry_multi_client.go +++ b/p2p/sentry/sentry_multi_client/sentry_multi_client.go @@ -295,8 +295,8 @@ type MultiClient struct { // decouple sentry multi client from header and body downloading logic is done disableBlockDownload bool - logger log.Logger - onlyOneGoroutineController *semaphore.Weighted + logger log.Logger + getReceiptsActiveGoroutineNumberController *semaphore.Weighted } func NewMultiClient( @@ -361,7 +361,7 @@ func NewMultiClient( maxBlockBroadcastPeers: maxBlockBroadcastPeers, disableBlockDownload: disableBlockDownload, logger: logger, - onlyOneGoroutineController: semaphore.NewWeighted(1), + getReceiptsActiveGoroutineNumberController: semaphore.NewWeighted(1), } return cs, nil @@ -703,11 +703,11 @@ func (cs *MultiClient) getBlockBodies66(ctx context.Context, inreq *proto_sentry } func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentryClient direct.SentryClient) error { - err := cs.onlyOneGoroutineController.Acquire(ctx, 1) + err := cs.getReceiptsActiveGoroutineNumberController.Acquire(ctx, 1) if err != nil { return err } - defer cs.onlyOneGoroutineController.Release(1) + defer cs.getReceiptsActiveGoroutineNumberController.Release(1) var query eth.GetReceiptsPacket66 if err := rlp.DecodeBytes(inreq.Data, &query); err != nil { return fmt.Errorf("decoding getReceipts66: %w, data: %x", err, inreq.Data) From c965d3fd9e105bc32ab7bda06d639c65f139f407 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Wed, 10 Jul 2024 16:52:18 +0300 Subject: [PATCH 11/29] save --- cmd/integration/commands/stages.go | 1 + eth/backend.go | 1 + .../sentry_multi_client.go | 21 ++++++++++++------- turbo/stages/blockchain_test.go | 4 ++-- turbo/stages/mock/mock_sentry.go | 7 +++++-- 5 files changed, 22 insertions(+), 12 deletions(-) diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index be94d1222c7..789a35eb80b 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -1461,6 +1461,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig, maxBlockBroadcastPeers, false, /* disableBlockDownload */ logger, + cfg.Dirs, ) if err != nil { panic(err) diff --git a/eth/backend.go b/eth/backend.go index 5bfd5889c0a..cb6987a80e3 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -636,6 +636,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger maxBlockBroadcastPeers, sentryMcDisableBlockDownload, logger, + dirs, ) if err != nil { return nil, err diff --git a/p2p/sentry/sentry_multi_client/sentry_multi_client.go b/p2p/sentry/sentry_multi_client/sentry_multi_client.go index 4d42d4e0c7d..7db0a9f15b7 100644 --- a/p2p/sentry/sentry_multi_client/sentry_multi_client.go +++ b/p2p/sentry/sentry_multi_client/sentry_multi_client.go @@ -295,8 +295,9 @@ type MultiClient struct { // decouple sentry multi client from header and body downloading logic is done disableBlockDownload bool - logger log.Logger - getReceiptsActiveGoroutineNumberController *semaphore.Weighted + logger log.Logger + getReceiptsActiveGoroutineNumber *semaphore.Weighted + baseApi *jsonrpc.BaseAPI } func NewMultiClient( @@ -312,6 +313,7 @@ func NewMultiClient( maxBlockBroadcastPeers func(*types.Header) uint, disableBlockDownload bool, logger log.Logger, + dirs datadir.Dirs, ) (*MultiClient, error) { // header downloader var hd *headerdownload.HeaderDownload @@ -347,6 +349,9 @@ func NewMultiClient( bd = &bodydownload.BodyDownload{} } + cache := kvcache.NewDummy() + baseApi := jsonrpc.NewBaseApi(nil, cache, blockReader, nil, true, time.Second, engine, dirs) + cs := &MultiClient{ Hd: hd, Bd: bd, @@ -361,7 +366,8 @@ func NewMultiClient( maxBlockBroadcastPeers: maxBlockBroadcastPeers, disableBlockDownload: disableBlockDownload, logger: logger, - getReceiptsActiveGoroutineNumberController: semaphore.NewWeighted(1), + getReceiptsActiveGoroutineNumber: semaphore.NewWeighted(1), + baseApi: baseApi, } return cs, nil @@ -703,11 +709,11 @@ func (cs *MultiClient) getBlockBodies66(ctx context.Context, inreq *proto_sentry } func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentryClient direct.SentryClient) error { - err := cs.getReceiptsActiveGoroutineNumberController.Acquire(ctx, 1) + err := cs.getReceiptsActiveGoroutineNumber.Acquire(ctx, 1) if err != nil { return err } - defer cs.getReceiptsActiveGoroutineNumberController.Release(1) + defer cs.getReceiptsActiveGoroutineNumber.Release(1) var query eth.GetReceiptsPacket66 if err := rlp.DecodeBytes(inreq.Data, &query); err != nil { return fmt.Errorf("decoding getReceipts66: %w, data: %x", err, inreq.Data) @@ -718,9 +724,8 @@ func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.In return err } defer tx.Rollback() - cache := kvcache.NewDummy() - baseApi := jsonrpc.NewBaseApi(nil, cache, cs.blockReader, nil, true, time.Second, cs.Engine, datadir.New("")) - receipts, err := eth.AnswerGetReceiptsQuery(ctx, baseApi, cs.blockReader, tx, query.GetReceiptsPacket) + + receipts, err := eth.AnswerGetReceiptsQuery(ctx, cs.baseApi, cs.blockReader, tx, query.GetReceiptsPacket) if err != nil { return err } diff --git a/turbo/stages/blockchain_test.go b/turbo/stages/blockchain_test.go index 3d78e5d4f90..fbebec58cc3 100644 --- a/turbo/stages/blockchain_test.go +++ b/turbo/stages/blockchain_test.go @@ -24,7 +24,7 @@ import ( "encoding/binary" "errors" "fmt" - proto_sentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentryproto" + protosentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentryproto" "github.com/ledgerwatch/erigon/eth/protocols/eth" "github.com/ledgerwatch/erigon/rlp" "math" @@ -381,7 +381,7 @@ func testReorg(t *testing.T, first, second []int64, td int64) { } m.ReceiveWg.Add(1) - for _, err = range m.Send(&proto_sentry.InboundMessage{Id: proto_sentry.MessageId_GET_RECEIPTS_66, Data: b, PeerId: m.PeerId}) { + for _, err = range m.Send(&protosentry.InboundMessage{Id: protosentry.MessageId_GET_RECEIPTS_66, Data: b, PeerId: m.PeerId}) { if err != nil { t.Fatal(err) } diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go index 5932f015952..69a5718ea13 100644 --- a/turbo/stages/mock/mock_sentry.go +++ b/turbo/stages/mock/mock_sentry.go @@ -313,8 +313,10 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK propagateNewBlockHashes := func(context.Context, []headerdownload.Announce) {} penalize := func(context.Context, []headerdownload.PenaltyItem) {} - mock.SentryClient = direct.NewSentryClientDirect(direct.ETH68, mock) - sentries := []direct.SentryClient{mock.SentryClient} + //mock.SentryClient = direct.NewSentryClientDirect(direct.ETH68, mock) + sentry68 := direct.NewSentryClientDirect(direct.ETH68, mock) + mock.SentryClient = direct.NewSentryClientDirect(direct.ETH66, mock) + sentries := []direct.SentryClient{mock.SentryClient, sentry68} sendBodyRequest := func(context.Context, *bodydownload.BodyRequest) ([64]byte, bool) { return [64]byte{}, false } blockPropagator := func(Ctx context.Context, header *types.Header, body *types.RawBody, td *big.Int) {} @@ -410,6 +412,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK maxBlockBroadcastPeers, false, /* disableBlockDownload */ logger, + dirs, ) if err != nil { if tb != nil { From f5740fe526e2cc7359df5bc0e4e570cbfa958130 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Wed, 10 Jul 2024 18:05:44 +0300 Subject: [PATCH 12/29] save --- eth/protocols/eth/handlers.go | 7 ++++--- .../sentry_multi_client.go | 19 +++++++++---------- turbo/jsonrpc/daemon.go | 5 ++--- turbo/jsonrpc/eth_api.go | 8 +------- turbo/stages/mock/mock_sentry.go | 6 ++++-- 5 files changed, 20 insertions(+), 25 deletions(-) diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index fe9681cb4f8..9c0b2d80759 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -29,7 +29,6 @@ import ( "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/rlp" - "github.com/ledgerwatch/erigon/turbo/jsonrpc" "github.com/ledgerwatch/erigon/turbo/services" ) @@ -159,7 +158,9 @@ func AnswerGetBlockBodiesQuery(db kv.Tx, query GetBlockBodiesPacket, blockReader return bodies } -func AnswerGetReceiptsQuery(ctx context.Context, baseApi *jsonrpc.BaseAPI, br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam +type getReceiptsFunc func(context.Context, kv.Tx, *types.Block, []libcommon.Address) (types.Receipts, error) + +func AnswerGetReceiptsQuery(ctx context.Context, getReceipts getReceiptsFunc, br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam // Gather state data until the fetch or network limits is reached var ( bytes int @@ -184,7 +185,7 @@ func AnswerGetReceiptsQuery(ctx context.Context, baseApi *jsonrpc.BaseAPI, br se return nil, nil } - results, err := baseApi.GetReceipts(ctx, db, b, s) + results, err := getReceipts(ctx, db, b, s) if err != nil { return nil, err } diff --git a/p2p/sentry/sentry_multi_client/sentry_multi_client.go b/p2p/sentry/sentry_multi_client/sentry_multi_client.go index 7db0a9f15b7..f5eee8b6966 100644 --- a/p2p/sentry/sentry_multi_client/sentry_multi_client.go +++ b/p2p/sentry/sentry_multi_client/sentry_multi_client.go @@ -22,9 +22,7 @@ import ( "encoding/hex" "errors" "fmt" - "github.com/ledgerwatch/erigon-lib/common/datadir" - "github.com/ledgerwatch/erigon-lib/kv/kvcache" - "github.com/ledgerwatch/erigon/turbo/jsonrpc" + "github.com/ledgerwatch/erigon-lib/common" "golang.org/x/sync/semaphore" "math/rand" "sort" @@ -297,7 +295,11 @@ type MultiClient struct { logger log.Logger getReceiptsActiveGoroutineNumber *semaphore.Weighted - baseApi *jsonrpc.BaseAPI + ethApiWrapper EthAPI +} + +type EthAPI interface { + GetReceipts(ctx context.Context, tx kv.Tx, block *types.Block, senders []common.Address) (types.Receipts, error) } func NewMultiClient( @@ -313,7 +315,7 @@ func NewMultiClient( maxBlockBroadcastPeers func(*types.Header) uint, disableBlockDownload bool, logger log.Logger, - dirs datadir.Dirs, + ethApiWrapper EthAPI, ) (*MultiClient, error) { // header downloader var hd *headerdownload.HeaderDownload @@ -349,9 +351,6 @@ func NewMultiClient( bd = &bodydownload.BodyDownload{} } - cache := kvcache.NewDummy() - baseApi := jsonrpc.NewBaseApi(nil, cache, blockReader, nil, true, time.Second, engine, dirs) - cs := &MultiClient{ Hd: hd, Bd: bd, @@ -367,7 +366,7 @@ func NewMultiClient( disableBlockDownload: disableBlockDownload, logger: logger, getReceiptsActiveGoroutineNumber: semaphore.NewWeighted(1), - baseApi: baseApi, + ethApiWrapper: ethApiWrapper, } return cs, nil @@ -725,7 +724,7 @@ func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.In } defer tx.Rollback() - receipts, err := eth.AnswerGetReceiptsQuery(ctx, cs.baseApi, cs.blockReader, tx, query.GetReceiptsPacket) + receipts, err := eth.AnswerGetReceiptsQuery(ctx, cs.ethApiWrapper.GetReceipts, cs.blockReader, tx, query.GetReceiptsPacket) if err != nil { return err } diff --git a/turbo/jsonrpc/daemon.go b/turbo/jsonrpc/daemon.go index 68d3dc95216..dc2fad727ba 100644 --- a/turbo/jsonrpc/daemon.go +++ b/turbo/jsonrpc/daemon.go @@ -22,7 +22,6 @@ import ( txpool "github.com/ledgerwatch/erigon-lib/gointerfaces/txpoolproto" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/kvcache" - libstate "github.com/ledgerwatch/erigon-lib/state" "github.com/ledgerwatch/erigon/cmd/rpcdaemon/cli/httpcfg" "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/consensus/clique" @@ -35,10 +34,10 @@ import ( // APIList describes the list of available RPC apis func APIList(db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, filters *rpchelper.Filters, stateCache kvcache.Cache, - blockReader services.FullBlockReader, agg *libstate.Aggregator, cfg *httpcfg.HttpCfg, engine consensus.EngineReader, + blockReader services.FullBlockReader, cfg *httpcfg.HttpCfg, engine consensus.EngineReader, logger log.Logger, ) (list []rpc.API) { - base := NewBaseApi(filters, stateCache, blockReader, agg, cfg.WithDatadir, cfg.EvmCallTimeout, engine, cfg.Dirs) + base := NewBaseApi(filters, stateCache, blockReader, cfg.WithDatadir, cfg.EvmCallTimeout, engine) ethImpl := NewEthAPI(base, db, eth, txPool, mining, cfg.Gascap, cfg.Feecap, cfg.ReturnDataLimit, cfg.AllowUnprotectedTxs, cfg.MaxGetProofRewindBlockCount, cfg.WebsocketSubscribeLogsChannelSize, logger) erigonImpl := NewErigonAPI(base, db, eth) txpoolImpl := NewTxPoolAPI(base, db, txPool) diff --git a/turbo/jsonrpc/eth_api.go b/turbo/jsonrpc/eth_api.go index 4d2835755bf..85320fcc815 100644 --- a/turbo/jsonrpc/eth_api.go +++ b/turbo/jsonrpc/eth_api.go @@ -34,12 +34,10 @@ import ( "github.com/ledgerwatch/erigon-lib/chain" "github.com/ledgerwatch/erigon-lib/common" - "github.com/ledgerwatch/erigon-lib/common/datadir" "github.com/ledgerwatch/erigon-lib/common/hexutility" txpool "github.com/ledgerwatch/erigon-lib/gointerfaces/txpoolproto" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/kvcache" - libstate "github.com/ledgerwatch/erigon-lib/state" types2 "github.com/ledgerwatch/erigon-lib/types" "github.com/ledgerwatch/erigon/common/math" @@ -137,14 +135,12 @@ type BaseAPI struct { _blockReader services.FullBlockReader _txnReader services.TxnReader - _agg *libstate.Aggregator _engine consensus.EngineReader evmCallTimeout time.Duration - dirs datadir.Dirs } -func NewBaseApi(f *rpchelper.Filters, stateCache kvcache.Cache, blockReader services.FullBlockReader, agg *libstate.Aggregator, singleNodeMode bool, evmCallTimeout time.Duration, engine consensus.EngineReader, dirs datadir.Dirs) *BaseAPI { +func NewBaseApi(f *rpchelper.Filters, stateCache kvcache.Cache, blockReader services.FullBlockReader, singleNodeMode bool, evmCallTimeout time.Duration, engine consensus.EngineReader) *BaseAPI { var ( blocksLRUSize = 128 // ~32Mb receiptsCacheLimit = 32 @@ -170,10 +166,8 @@ func NewBaseApi(f *rpchelper.Filters, stateCache kvcache.Cache, blockReader serv receiptsCache: receiptsCache, _blockReader: blockReader, _txnReader: blockReader, - _agg: agg, evmCallTimeout: evmCallTimeout, _engine: engine, - dirs: dirs, } } diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go index 69a5718ea13..364186c0f30 100644 --- a/turbo/stages/mock/mock_sentry.go +++ b/turbo/stages/mock/mock_sentry.go @@ -21,6 +21,7 @@ import ( "crypto/ecdsa" "errors" "fmt" + "github.com/ledgerwatch/erigon/turbo/jsonrpc" "math/big" "os" "sync" @@ -398,7 +399,8 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK ) maxBlockBroadcastPeers := func(header *types.Header) uint { return 0 } - + cache := kvcache.NewDummy() + ethApi := jsonrpc.NewBaseApi(nil, cache, mock.BlockReader, true, time.Second, engine) mock.sentriesClient, err = sentry_multi_client.NewMultiClient( mock.DB, mock.ChainConfig, @@ -412,7 +414,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK maxBlockBroadcastPeers, false, /* disableBlockDownload */ logger, - dirs, + ethApi, ) if err != nil { if tb != nil { From 9f1bb35e31b0417591b96d0fadf21696ce4fb9b5 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Thu, 11 Jul 2024 10:28:18 +0300 Subject: [PATCH 13/29] save --- eth/backend.go | 1 - .../sentry_multi_client.go | 13 ++- turbo/jsonrpc/debug_api_test.go | 3 +- turbo/jsonrpc/eth_api.go | 23 +++-- turbo/jsonrpc/eth_api_test.go | 6 +- turbo/jsonrpc/eth_block_test.go | 3 +- turbo/jsonrpc/eth_callMany_test.go | 4 +- turbo/jsonrpc/eth_call_test.go | 6 +- turbo/jsonrpc/eth_filters_test.go | 3 +- turbo/jsonrpc/eth_mining_test.go | 3 +- turbo/jsonrpc/eth_receipts.go | 54 +----------- turbo/jsonrpc/gen_traces_test.go | 6 +- ...an_transaction_by_sender_and_nonce_test.go | 3 +- turbo/jsonrpc/parity_api_test.go | 3 +- turbo/jsonrpc/receipts/receipts_generator.go | 85 +++++++++++++++++++ turbo/jsonrpc/send_transaction_test.go | 3 +- turbo/jsonrpc/txpool_api_test.go | 3 +- turbo/stages/mock/mock_sentry.go | 11 +-- 18 files changed, 130 insertions(+), 103 deletions(-) create mode 100644 turbo/jsonrpc/receipts/receipts_generator.go diff --git a/eth/backend.go b/eth/backend.go index cb6987a80e3..5bfd5889c0a 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -636,7 +636,6 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger maxBlockBroadcastPeers, sentryMcDisableBlockDownload, logger, - dirs, ) if err != nil { return nil, err diff --git a/p2p/sentry/sentry_multi_client/sentry_multi_client.go b/p2p/sentry/sentry_multi_client/sentry_multi_client.go index f5eee8b6966..7d1d9f7ab89 100644 --- a/p2p/sentry/sentry_multi_client/sentry_multi_client.go +++ b/p2p/sentry/sentry_multi_client/sentry_multi_client.go @@ -22,7 +22,9 @@ import ( "encoding/hex" "errors" "fmt" + lru "github.com/hashicorp/golang-lru/v2" "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon/turbo/jsonrpc/receipts" "golang.org/x/sync/semaphore" "math/rand" "sort" @@ -315,7 +317,6 @@ func NewMultiClient( maxBlockBroadcastPeers func(*types.Header) uint, disableBlockDownload bool, logger log.Logger, - ethApiWrapper EthAPI, ) (*MultiClient, error) { // header downloader var hd *headerdownload.HeaderDownload @@ -351,6 +352,14 @@ func NewMultiClient( bd = &bodydownload.BodyDownload{} } + receiptsCacheLimit := 32 + receiptsCache, err := lru.New[common.Hash, []*types.Receipt](receiptsCacheLimit) + if err != nil { + return nil, err + } + + receiptsGenerator := receipts.NewGenerator(receiptsCache, blockReader, engine) + cs := &MultiClient{ Hd: hd, Bd: bd, @@ -366,7 +375,7 @@ func NewMultiClient( disableBlockDownload: disableBlockDownload, logger: logger, getReceiptsActiveGoroutineNumber: semaphore.NewWeighted(1), - ethApiWrapper: ethApiWrapper, + ethApiWrapper: receiptsGenerator, } return cs, nil diff --git a/turbo/jsonrpc/debug_api_test.go b/turbo/jsonrpc/debug_api_test.go index 53604794e42..92490ff3886 100644 --- a/turbo/jsonrpc/debug_api_test.go +++ b/turbo/jsonrpc/debug_api_test.go @@ -68,9 +68,8 @@ var debugTraceTransactionNoRefundTests = []struct { func TestTraceBlockByNumber(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) - agg := m.HistoryV3Components() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) - baseApi := NewBaseApi(nil, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs) + baseApi := NewBaseApi(nil, stateCache, m.BlockReader, false, rpccfg.DefaultEvmCallTimeout, m.Engine) ethApi := NewEthAPI(baseApi, m.DB, nil, nil, nil, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) api := NewPrivateDebugAPI(baseApi, m.DB, 0) for _, tt := range debugTraceTransactionTests { diff --git a/turbo/jsonrpc/eth_api.go b/turbo/jsonrpc/eth_api.go index 85320fcc815..27256dd6cf4 100644 --- a/turbo/jsonrpc/eth_api.go +++ b/turbo/jsonrpc/eth_api.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "fmt" + "github.com/ledgerwatch/erigon/turbo/jsonrpc/receipts" "math/big" "sync" "sync/atomic" @@ -137,7 +138,8 @@ type BaseAPI struct { _txnReader services.TxnReader _engine consensus.EngineReader - evmCallTimeout time.Duration + evmCallTimeout time.Duration + receiptsGenerator *receipts.Generator } func NewBaseApi(f *rpchelper.Filters, stateCache kvcache.Cache, blockReader services.FullBlockReader, singleNodeMode bool, evmCallTimeout time.Duration, engine consensus.EngineReader) *BaseAPI { @@ -159,15 +161,18 @@ func NewBaseApi(f *rpchelper.Filters, stateCache kvcache.Cache, blockReader serv panic(err) } + receiptsGenerator := receipts.NewGenerator(receiptsCache, blockReader, engine) + return &BaseAPI{ - filters: f, - stateCache: stateCache, - blocksLRU: blocksLRU, - receiptsCache: receiptsCache, - _blockReader: blockReader, - _txnReader: blockReader, - evmCallTimeout: evmCallTimeout, - _engine: engine, + filters: f, + stateCache: stateCache, + blocksLRU: blocksLRU, + receiptsCache: receiptsCache, + _blockReader: blockReader, + _txnReader: blockReader, + evmCallTimeout: evmCallTimeout, + _engine: engine, + receiptsGenerator: receiptsGenerator, } } diff --git a/turbo/jsonrpc/eth_api_test.go b/turbo/jsonrpc/eth_api_test.go index 00bd0b34308..fb558773bd2 100644 --- a/turbo/jsonrpc/eth_api_test.go +++ b/turbo/jsonrpc/eth_api_test.go @@ -41,9 +41,8 @@ import ( ) func newBaseApiForTest(m *mock.MockSentry) *BaseAPI { - agg := m.HistoryV3Components() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) - return NewBaseApi(nil, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs) + return NewBaseApi(nil, stateCache, m.BlockReader, false, rpccfg.DefaultEvmCallTimeout, m.Engine) } func TestGetBalanceChangesInBlock(t *testing.T) { @@ -71,9 +70,8 @@ func TestGetBalanceChangesInBlock(t *testing.T) { func TestGetTransactionReceipt(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) db := m.DB - agg := m.HistoryV3Components() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) - api := NewEthAPI(NewBaseApi(nil, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), db, nil, nil, nil, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) + api := NewEthAPI(NewBaseApi(nil, stateCache, m.BlockReader, false, rpccfg.DefaultEvmCallTimeout, m.Engine), db, nil, nil, nil, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) // Call GetTransactionReceipt for transaction which is not in the database if _, err := api.GetTransactionReceipt(context.Background(), common.Hash{}); err != nil { t.Errorf("calling GetTransactionReceipt with empty hash: %v", err) diff --git a/turbo/jsonrpc/eth_block_test.go b/turbo/jsonrpc/eth_block_test.go index 011680729ce..8962d0399fa 100644 --- a/turbo/jsonrpc/eth_block_test.go +++ b/turbo/jsonrpc/eth_block_test.go @@ -85,7 +85,6 @@ func TestGetBlockByNumberWithLatestTag_WithHeadHashInDb(t *testing.T) { func TestGetBlockByNumberWithPendingTag(t *testing.T) { m := mock.MockWithTxPool(t) - agg := m.HistoryV3Components() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) @@ -105,7 +104,7 @@ func TestGetBlockByNumberWithPendingTag(t *testing.T) { RplBlock: rlpBlock, }) - api := NewEthAPI(NewBaseApi(ff, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), m.DB, nil, nil, nil, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) + api := NewEthAPI(NewBaseApi(ff, stateCache, m.BlockReader, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, nil, nil, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) b, err := api.GetBlockByNumber(context.Background(), rpc.PendingBlockNumber, false) if err != nil { t.Errorf("error getting block number with pending tag: %s", err) diff --git a/turbo/jsonrpc/eth_callMany_test.go b/turbo/jsonrpc/eth_callMany_test.go index 22d44a82a9b..c73118370fc 100644 --- a/turbo/jsonrpc/eth_callMany_test.go +++ b/turbo/jsonrpc/eth_callMany_test.go @@ -26,7 +26,6 @@ import ( "github.com/ledgerwatch/erigon-lib/common/hexutil" - "github.com/ledgerwatch/erigon-lib/common/datadir" "github.com/ledgerwatch/erigon-lib/common/hexutility" "github.com/ledgerwatch/erigon-lib/kv/kvcache" @@ -101,8 +100,7 @@ func TestCallMany(t *testing.T) { db := contractBackend.DB() engine := contractBackend.Engine() - api := NewEthAPI(NewBaseApi(nil, stateCache, contractBackend.BlockReader(), contractBackend.Agg(), false, rpccfg.DefaultEvmCallTimeout, engine, - datadir.New(t.TempDir())), db, nil, nil, nil, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) + api := NewEthAPI(NewBaseApi(nil, stateCache, contractBackend.BlockReader(), false, rpccfg.DefaultEvmCallTimeout, engine), db, nil, nil, nil, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) callArgAddr1 := ethapi.CallArgs{From: &address, To: &tokenAddr, Nonce: &nonce, MaxPriorityFeePerGas: (*hexutil.Big)(big.NewInt(1e9)), diff --git a/turbo/jsonrpc/eth_call_test.go b/turbo/jsonrpc/eth_call_test.go index 5fd618fb9d8..405c24b63fa 100644 --- a/turbo/jsonrpc/eth_call_test.go +++ b/turbo/jsonrpc/eth_call_test.go @@ -55,12 +55,11 @@ import ( func TestEstimateGas(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) - agg := m.HistoryV3Components() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, mock.Mock(t)) mining := txpool.NewMiningClient(conn) ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, nil, mining, func() {}, m.Log) - api := NewEthAPI(NewBaseApi(ff, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), m.DB, nil, nil, nil, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) + api := NewEthAPI(NewBaseApi(ff, stateCache, m.BlockReader, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, nil, nil, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) var from = libcommon.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7") var to = libcommon.HexToAddress("0x0d3ab14bbad3d99f4203bd7a11acb94882050e7e") if _, err := api.EstimateGas(context.Background(), ðapi.CallArgs{ @@ -73,9 +72,8 @@ func TestEstimateGas(t *testing.T) { func TestEthCallNonCanonical(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) - agg := m.HistoryV3Components() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) - api := NewEthAPI(NewBaseApi(nil, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), m.DB, nil, nil, nil, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) + api := NewEthAPI(NewBaseApi(nil, stateCache, m.BlockReader, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, nil, nil, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) var from = libcommon.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7") var to = libcommon.HexToAddress("0x0d3ab14bbad3d99f4203bd7a11acb94882050e7e") if _, err := api.Call(context.Background(), ethapi.CallArgs{ diff --git a/turbo/jsonrpc/eth_filters_test.go b/turbo/jsonrpc/eth_filters_test.go index 817b1f5b385..98ce50899ad 100644 --- a/turbo/jsonrpc/eth_filters_test.go +++ b/turbo/jsonrpc/eth_filters_test.go @@ -43,12 +43,11 @@ import ( func TestNewFilters(t *testing.T) { assert := assert.New(t) m, _, _ := rpcdaemontest.CreateTestSentry(t) - agg := m.HistoryV3Components() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, mock.Mock(t)) mining := txpool.NewMiningClient(conn) ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, nil, mining, func() {}, m.Log) - api := NewEthAPI(NewBaseApi(ff, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), m.DB, nil, nil, nil, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) + api := NewEthAPI(NewBaseApi(ff, stateCache, m.BlockReader, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, nil, nil, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) ptf, err := api.NewPendingTransactionFilter(ctx) assert.Nil(err) diff --git a/turbo/jsonrpc/eth_mining_test.go b/turbo/jsonrpc/eth_mining_test.go index af41d4fc55c..4142d0abbd5 100644 --- a/turbo/jsonrpc/eth_mining_test.go +++ b/turbo/jsonrpc/eth_mining_test.go @@ -44,8 +44,7 @@ func TestPendingBlock(t *testing.T) { ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, nil, mining, func() {}, m.Log) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) engine := ethash.NewFaker() - api := NewEthAPI(NewBaseApi(ff, stateCache, m.BlockReader, nil, false, rpccfg.DefaultEvmCallTimeout, engine, - m.Dirs), nil, nil, nil, mining, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) + api := NewEthAPI(NewBaseApi(ff, stateCache, m.BlockReader, false, rpccfg.DefaultEvmCallTimeout, engine), nil, nil, nil, mining, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) expect := uint64(12345) b, err := rlp.EncodeToBytes(types.NewBlockWithHeader(&types.Header{Number: new(big.Int).SetUint64(expect)})) require.NoError(t, err) diff --git a/turbo/jsonrpc/eth_receipts.go b/turbo/jsonrpc/eth_receipts.go index e2b23d64672..2e3db20be19 100644 --- a/turbo/jsonrpc/eth_receipts.go +++ b/turbo/jsonrpc/eth_receipts.go @@ -32,69 +32,24 @@ import ( "github.com/ledgerwatch/erigon-lib/kv/rawdbv3" "github.com/ledgerwatch/erigon/cmd/state/exec3" - "github.com/ledgerwatch/erigon/core" "github.com/ledgerwatch/erigon/core/rawdb" - "github.com/ledgerwatch/erigon/core/state" "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/core/vm" "github.com/ledgerwatch/erigon/eth/ethutils" "github.com/ledgerwatch/erigon/eth/filters" bortypes "github.com/ledgerwatch/erigon/polygon/bor/types" "github.com/ledgerwatch/erigon/rpc" "github.com/ledgerwatch/erigon/turbo/rpchelper" - "github.com/ledgerwatch/erigon/turbo/transactions" ) // getReceipts - checking in-mem cache, or else fallback to db, or else fallback to re-exec of block to re-gen receipts func (api *BaseAPI) getReceipts(ctx context.Context, tx kv.Tx, block *types.Block, senders []common.Address) (types.Receipts, error) { - if receipts, ok := api.receiptsCache.Get(block.Hash()); ok { - return receipts, nil - } - - if receipts := rawdb.ReadReceipts(tx, block, senders); receipts != nil { - api.receiptsCache.Add(block.Hash(), receipts) - return receipts, nil - } - - engine := api.engine() chainConfig, err := api.chainConfig(ctx, tx) if err != nil { return nil, err } + api.receiptsGenerator.SetChainConfig(chainConfig) - _, _, _, ibs, _, err := transactions.ComputeTxEnv(ctx, engine, block, chainConfig, api._blockReader, tx, 0) - if err != nil { - return nil, err - } - - usedGas := new(uint64) - usedBlobGas := new(uint64) - gp := new(core.GasPool).AddGas(block.GasLimit()).AddBlobGas(chainConfig.GetMaxBlobGasPerBlock()) - - noopWriter := state.NewNoopWriter() - - receipts := make(types.Receipts, len(block.Transactions())) - - getHeader := func(hash common.Hash, number uint64) *types.Header { - h, e := api._blockReader.Header(ctx, tx, hash, number) - if e != nil { - log.Error("getHeader error", "number", number, "hash", hash, "err", e) - } - return h - } - header := block.Header() - for i, txn := range block.Transactions() { - ibs.SetTxContext(txn.Hash(), block.Hash(), i) - receipt, _, err := core.ApplyTransaction(chainConfig, core.GetHashFn(header, getHeader), engine, nil, gp, ibs, noopWriter, header, txn, usedGas, usedBlobGas, vm.Config{}) - if err != nil { - return nil, err - } - receipt.BlockHash = block.Hash() - receipts[i] = receipt - } - - api.receiptsCache.Add(block.Hash(), receipts) - return receipts, nil + return api.receiptsGenerator.GetReceipts(ctx, tx, block, senders) } // GetLogs implements eth_getLogs. Returns an array of logs matching a given filter object. @@ -606,8 +561,3 @@ func (i *MapTxNum2BlockNumIter) Next() (txNum, blockNum uint64, txIndex int, isF isFinalTxn = txNum == i.maxTxNumInBlock return } - -// GetReceipts Made getReceipts method public to use in sentry multi client -func (api *BaseAPI) GetReceipts(ctx context.Context, tx kv.Tx, block *types.Block, senders []common.Address) (types.Receipts, error) { - return api.getReceipts(ctx, tx, block, senders) -} diff --git a/turbo/jsonrpc/gen_traces_test.go b/turbo/jsonrpc/gen_traces_test.go index 1fbb8bde450..2ac2c115e56 100644 --- a/turbo/jsonrpc/gen_traces_test.go +++ b/turbo/jsonrpc/gen_traces_test.go @@ -44,9 +44,8 @@ Testing tracing RPC API by generating patters of contracts invoking one another func TestGeneratedDebugApi(t *testing.T) { m := rpcdaemontest.CreateTestSentryForTraces(t) - agg := m.HistoryV3Components() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) - baseApi := NewBaseApi(nil, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs) + baseApi := NewBaseApi(nil, stateCache, m.BlockReader, false, rpccfg.DefaultEvmCallTimeout, m.Engine) api := NewPrivateDebugAPI(baseApi, m.DB, 0) var buf bytes.Buffer stream := jsoniter.NewStream(jsoniter.ConfigDefault, &buf, 4096) @@ -132,9 +131,8 @@ func TestGeneratedDebugApi(t *testing.T) { func TestGeneratedTraceApi(t *testing.T) { m := rpcdaemontest.CreateTestSentryForTraces(t) - agg := m.HistoryV3Components() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) - baseApi := NewBaseApi(nil, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs) + baseApi := NewBaseApi(nil, stateCache, m.BlockReader, false, rpccfg.DefaultEvmCallTimeout, m.Engine) api := NewTraceAPI(baseApi, m.DB, &httpcfg.HttpCfg{}) traces, err := api.Block(context.Background(), rpc.BlockNumber(1), new(bool), nil) if err != nil { diff --git a/turbo/jsonrpc/otterscan_transaction_by_sender_and_nonce_test.go b/turbo/jsonrpc/otterscan_transaction_by_sender_and_nonce_test.go index 6568091cdab..42ee6dfae6c 100644 --- a/turbo/jsonrpc/otterscan_transaction_by_sender_and_nonce_test.go +++ b/turbo/jsonrpc/otterscan_transaction_by_sender_and_nonce_test.go @@ -27,8 +27,7 @@ import ( func TestGetTransactionBySenderAndNonce(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) - agg := m.HistoryV3Components() - api := NewOtterscanAPI(NewBaseApi(nil, nil, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), m.DB, 25) + api := NewOtterscanAPI(NewBaseApi(nil, nil, m.BlockReader, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, 25) addr := common.HexToAddress("0x537e697c7ab75a26f9ecf0ce810e3154dfcaaf44") expectCreator := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7") diff --git a/turbo/jsonrpc/parity_api_test.go b/turbo/jsonrpc/parity_api_test.go index 5a8449ecf4a..b9c5a110e73 100644 --- a/turbo/jsonrpc/parity_api_test.go +++ b/turbo/jsonrpc/parity_api_test.go @@ -37,8 +37,7 @@ var latestBlock = rpc.BlockNumberOrHashWithNumber(rpc.LatestBlockNumber) func TestParityAPIImpl_ListStorageKeys_NoOffset(t *testing.T) { assert := assert.New(t) m, _, _ := rpcdaemontest.CreateTestSentry(t) - agg := m.HistoryV3Components() - baseApi := NewBaseApi(nil, nil, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs) + baseApi := NewBaseApi(nil, nil, m.BlockReader, false, rpccfg.DefaultEvmCallTimeout, m.Engine) api := NewParityAPIImpl(baseApi, m.DB) answers := []string{ "0000000000000000000000000000000000000000000000000000000000000000", diff --git a/turbo/jsonrpc/receipts/receipts_generator.go b/turbo/jsonrpc/receipts/receipts_generator.go new file mode 100644 index 00000000000..6faed9eb13e --- /dev/null +++ b/turbo/jsonrpc/receipts/receipts_generator.go @@ -0,0 +1,85 @@ +package receipts + +import ( + "context" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/ledgerwatch/erigon-lib/chain" + "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/log/v3" + "github.com/ledgerwatch/erigon/consensus" + "github.com/ledgerwatch/erigon/core" + "github.com/ledgerwatch/erigon/core/rawdb" + "github.com/ledgerwatch/erigon/core/state" + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/core/vm" + "github.com/ledgerwatch/erigon/turbo/services" + "github.com/ledgerwatch/erigon/turbo/transactions" +) + +type Generator struct { + receiptsCache *lru.Cache[common.Hash, []*types.Receipt] + blockReader services.FullBlockReader + engine consensus.EngineReader + cfg *chain.Config +} + +func NewGenerator(receiptsCache *lru.Cache[common.Hash, []*types.Receipt], blockReader services.FullBlockReader, + engine consensus.EngineReader) *Generator { + return &Generator{ + receiptsCache: receiptsCache, + blockReader: blockReader, + engine: engine, + } +} + +func (g *Generator) GetReceipts(ctx context.Context, tx kv.Tx, block *types.Block, senders []common.Address) (types.Receipts, error) { + if receipts, ok := g.receiptsCache.Get(block.Hash()); ok { + return receipts, nil + } + + if receipts := rawdb.ReadReceipts(tx, block, senders); receipts != nil { + g.receiptsCache.Add(block.Hash(), receipts) + return receipts, nil + } + + engine := g.engine + + _, _, _, ibs, _, err := transactions.ComputeTxEnv(ctx, engine, block, g.cfg, g.blockReader, tx, 0) + if err != nil { + return nil, err + } + + usedGas := new(uint64) + usedBlobGas := new(uint64) + gp := new(core.GasPool).AddGas(block.GasLimit()).AddBlobGas(g.cfg.GetMaxBlobGasPerBlock()) + + noopWriter := state.NewNoopWriter() + + receipts := make(types.Receipts, len(block.Transactions())) + + getHeader := func(hash common.Hash, number uint64) *types.Header { + h, e := g.blockReader.Header(ctx, tx, hash, number) + if e != nil { + log.Error("getHeader error", "number", number, "hash", hash, "err", e) + } + return h + } + header := block.Header() + for i, txn := range block.Transactions() { + ibs.SetTxContext(txn.Hash(), block.Hash(), i) + receipt, _, err := core.ApplyTransaction(g.cfg, core.GetHashFn(header, getHeader), engine, nil, gp, ibs, noopWriter, header, txn, usedGas, usedBlobGas, vm.Config{}) + if err != nil { + return nil, err + } + receipt.BlockHash = block.Hash() + receipts[i] = receipt + } + + g.receiptsCache.Add(block.Hash(), receipts) + return receipts, nil +} + +func (g *Generator) SetChainConfig(config *chain.Config) { + g.cfg = config +} diff --git a/turbo/jsonrpc/send_transaction_test.go b/turbo/jsonrpc/send_transaction_test.go index ddd05d5f6ee..7e7984feaff 100644 --- a/turbo/jsonrpc/send_transaction_test.go +++ b/turbo/jsonrpc/send_transaction_test.go @@ -53,9 +53,8 @@ import ( ) func newBaseApiForTest(m *mock.MockSentry) *jsonrpc.BaseAPI { - agg := m.HistoryV3Components() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) - return jsonrpc.NewBaseApi(nil, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs) + return jsonrpc.NewBaseApi(nil, stateCache, m.BlockReader, false, rpccfg.DefaultEvmCallTimeout, m.Engine) } // Do 1 step to start txPool diff --git a/turbo/jsonrpc/txpool_api_test.go b/turbo/jsonrpc/txpool_api_test.go index 5abee8d0872..623446e6988 100644 --- a/turbo/jsonrpc/txpool_api_test.go +++ b/turbo/jsonrpc/txpool_api_test.go @@ -55,8 +55,7 @@ func TestTxPoolContent(t *testing.T) { ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) txPool := txpool.NewTxpoolClient(conn) ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, txPool, txpool.NewMiningClient(conn), func() {}, m.Log) - agg := m.HistoryV3Components() - api := NewTxPoolAPI(NewBaseApi(ff, kvcache.New(kvcache.DefaultCoherentConfig), m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), m.DB, txPool) + api := NewTxPoolAPI(NewBaseApi(ff, kvcache.New(kvcache.DefaultCoherentConfig), m.BlockReader, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, txPool) expectValue := uint64(1234) txn, err := types.SignTx(types.NewTransaction(0, libcommon.Address{1}, uint256.NewInt(expectValue), params.TxGas, uint256.NewInt(10*params.GWei), nil), *types.LatestSignerForChainID(m.ChainConfig.ChainID), m.Key) diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go index 364186c0f30..5932f015952 100644 --- a/turbo/stages/mock/mock_sentry.go +++ b/turbo/stages/mock/mock_sentry.go @@ -21,7 +21,6 @@ import ( "crypto/ecdsa" "errors" "fmt" - "github.com/ledgerwatch/erigon/turbo/jsonrpc" "math/big" "os" "sync" @@ -314,10 +313,8 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK propagateNewBlockHashes := func(context.Context, []headerdownload.Announce) {} penalize := func(context.Context, []headerdownload.PenaltyItem) {} - //mock.SentryClient = direct.NewSentryClientDirect(direct.ETH68, mock) - sentry68 := direct.NewSentryClientDirect(direct.ETH68, mock) - mock.SentryClient = direct.NewSentryClientDirect(direct.ETH66, mock) - sentries := []direct.SentryClient{mock.SentryClient, sentry68} + mock.SentryClient = direct.NewSentryClientDirect(direct.ETH68, mock) + sentries := []direct.SentryClient{mock.SentryClient} sendBodyRequest := func(context.Context, *bodydownload.BodyRequest) ([64]byte, bool) { return [64]byte{}, false } blockPropagator := func(Ctx context.Context, header *types.Header, body *types.RawBody, td *big.Int) {} @@ -399,8 +396,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK ) maxBlockBroadcastPeers := func(header *types.Header) uint { return 0 } - cache := kvcache.NewDummy() - ethApi := jsonrpc.NewBaseApi(nil, cache, mock.BlockReader, true, time.Second, engine) + mock.sentriesClient, err = sentry_multi_client.NewMultiClient( mock.DB, mock.ChainConfig, @@ -414,7 +410,6 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK maxBlockBroadcastPeers, false, /* disableBlockDownload */ logger, - ethApi, ) if err != nil { if tb != nil { From e879bd8cc06bf6a5edc141c906b265223e357a9e Mon Sep 17 00:00:00 2001 From: JkLondon Date: Thu, 11 Jul 2024 10:38:31 +0300 Subject: [PATCH 14/29] save --- eth/backend.go | 2 +- turbo/engineapi/engine_server.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index 5bfd5889c0a..c6be5c24b3c 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -1073,7 +1073,7 @@ func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config, chainConfig } } - s.apiList = jsonrpc.APIList(chainKv, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, blockReader, s.agg, &httpRpcCfg, s.engine, s.logger) + s.apiList = jsonrpc.APIList(chainKv, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, blockReader, &httpRpcCfg, s.engine, s.logger) if config.SilkwormRpcDaemon && httpRpcCfg.Enabled { interface_log_settings := silkworm.RpcInterfaceLogSettings{ diff --git a/turbo/engineapi/engine_server.go b/turbo/engineapi/engine_server.go index b25c124057f..ca1575d7fc6 100644 --- a/turbo/engineapi/engine_server.go +++ b/turbo/engineapi/engine_server.go @@ -106,7 +106,7 @@ func (e *EngineServer) Start( txPool txpool.TxpoolClient, mining txpool.MiningClient, ) { - base := jsonrpc.NewBaseApi(filters, stateCache, blockReader, agg, httpConfig.WithDatadir, httpConfig.EvmCallTimeout, engineReader, httpConfig.Dirs) + base := jsonrpc.NewBaseApi(filters, stateCache, blockReader, httpConfig.WithDatadir, httpConfig.EvmCallTimeout, engineReader) ethImpl := jsonrpc.NewEthAPI(base, db, eth, txPool, mining, httpConfig.Gascap, httpConfig.Feecap, httpConfig.ReturnDataLimit, httpConfig.AllowUnprotectedTxs, httpConfig.MaxGetProofRewindBlockCount, httpConfig.WebsocketSubscribeLogsChannelSize, e.logger) From 6072d5491ddc4c2496f47966ccc6d91189ad0a7f Mon Sep 17 00:00:00 2001 From: JkLondon Date: Thu, 11 Jul 2024 10:39:30 +0300 Subject: [PATCH 15/29] save --- cmd/integration/commands/stages.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 789a35eb80b..be94d1222c7 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -1461,7 +1461,6 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig, maxBlockBroadcastPeers, false, /* disableBlockDownload */ logger, - cfg.Dirs, ) if err != nil { panic(err) From 8604911008207341ead4e6f1ec335f4bec2bdbc8 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Thu, 11 Jul 2024 10:45:12 +0300 Subject: [PATCH 16/29] save --- cmd/rpcdaemon/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/rpcdaemon/main.go b/cmd/rpcdaemon/main.go index 3e2b26ced98..133868ff9af 100644 --- a/cmd/rpcdaemon/main.go +++ b/cmd/rpcdaemon/main.go @@ -49,7 +49,7 @@ func main() { defer db.Close() defer engine.Close() - apiList := jsonrpc.APIList(db, backend, txPool, mining, ff, stateCache, blockReader, agg, cfg, engine, logger) + apiList := jsonrpc.APIList(db, backend, txPool, mining, ff, stateCache, blockReader, cfg, engine, logger) rpc.PreAllocateRPCMetricLabels(apiList) if err := cli.StartRpcServer(ctx, cfg, apiList, logger); err != nil { logger.Error(err.Error()) From 490c1a883a9404df075014b825824eb1d461af83 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Thu, 11 Jul 2024 10:46:06 +0300 Subject: [PATCH 17/29] save --- cmd/rpcdaemon/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/rpcdaemon/main.go b/cmd/rpcdaemon/main.go index 133868ff9af..ca9aaa7ad84 100644 --- a/cmd/rpcdaemon/main.go +++ b/cmd/rpcdaemon/main.go @@ -39,7 +39,7 @@ func main() { cmd.RunE = func(cmd *cobra.Command, args []string) error { ctx := cmd.Context() logger := debug.SetupCobra(cmd, "sentry") - db, backend, txPool, mining, stateCache, blockReader, engine, ff, agg, err := cli.RemoteServices(ctx, cfg, logger, rootCancel) + db, backend, txPool, mining, stateCache, blockReader, engine, ff, _, err := cli.RemoteServices(ctx, cfg, logger, rootCancel) if err != nil { if !errors.Is(err, context.Canceled) { logger.Error("Could not connect to DB", "err", err) From 0d23573f4566607965b605a1137d0682652cfb42 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Thu, 11 Jul 2024 10:50:49 +0300 Subject: [PATCH 18/29] save --- cmd/rpcdaemon/cli/config.go | 31 ++++++++++++++++--------------- cmd/rpcdaemon/main.go | 2 +- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 1adb2b5390a..6c6931273ee 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -322,24 +322,24 @@ func EmbeddedServices(ctx context.Context, func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger, rootCancel context.CancelFunc) ( db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, stateCache kvcache.Cache, blockReader services.FullBlockReader, engine consensus.EngineReader, - ff *rpchelper.Filters, agg *libstate.Aggregator, err error) { + ff *rpchelper.Filters, err error) { if !cfg.WithDatadir && cfg.PrivateApiAddr == "" { - return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("either remote db or local db must be specified") + return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("either remote db or local db must be specified") } creds, err := grpcutil.TLS(cfg.TLSCACert, cfg.TLSCertfile, cfg.TLSKeyFile) if err != nil { - return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("open tls cert: %w", err) + return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("open tls cert: %w", err) } conn, err := grpcutil.Connect(creds, cfg.PrivateApiAddr) if err != nil { - return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to execution service privateApi: %w", err) + return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("could not connect to execution service privateApi: %w", err) } remoteBackendClient := remote.NewETHBACKENDClient(conn) remoteKvClient := remote.NewKVClient(conn) remoteKv, err := remotedb.NewRemote(gointerfaces.VersionFromProto(remotedbserver.KvServiceAPIVersion), logger, remoteKvClient).Open() if err != nil { - return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to remoteKv: %w", err) + return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("could not connect to remoteKv: %w", err) } // Configure DB first @@ -364,10 +364,10 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger limiter := semaphore.NewWeighted(int64(cfg.DBReadConcurrency)) rwKv, err = kv2.NewMDBX(logger).RoTxsLimiter(limiter).Path(cfg.Dirs.Chaindata).Accede().Open(ctx) if err != nil { - return nil, nil, nil, nil, nil, nil, nil, ff, nil, err + return nil, nil, nil, nil, nil, nil, nil, ff, err } if compatErr := checkDbCompatibility(ctx, rwKv); compatErr != nil { - return nil, nil, nil, nil, nil, nil, nil, ff, nil, compatErr + return nil, nil, nil, nil, nil, nil, nil, ff, compatErr } db = rwKv @@ -386,10 +386,10 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger } return nil }); err != nil { - return nil, nil, nil, nil, nil, nil, nil, ff, nil, err + return nil, nil, nil, nil, nil, nil, nil, ff, err } if cc == nil { - return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("chain config not found in db. Need start erigon at least once on this db") + return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("chain config not found in db. Need start erigon at least once on this db") } cfg.Snap.Enabled = cfg.Snap.Enabled || cfg.Sync.UseSnapshots if !cfg.Snap.Enabled { @@ -407,8 +407,9 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger allBorSnapshots.LogStat("bor:remote") cr := rawdb.NewCanonicalReader() - if agg, err = libstate.NewAggregator(ctx, cfg.Dirs, config3.HistoryV3AggregationStep, db, cr, logger); err != nil { - return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err) + agg, err := libstate.NewAggregator(ctx, cfg.Dirs, config3.HistoryV3AggregationStep, db, cr, logger) + if err != nil { + return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("create aggregator: %w", err) } _ = agg.OpenFolder() //TODO: must use analog of `OptimisticReopenWithDB` @@ -460,7 +461,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger db, err = temporal.New(rwKv, agg) if err != nil { - return nil, nil, nil, nil, nil, nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, nil, nil, nil, err } stateCache = kvcache.NewDummy() } @@ -484,7 +485,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger if cfg.TxPoolApiAddr != cfg.PrivateApiAddr { txpoolConn, err = grpcutil.Connect(creds, cfg.TxPoolApiAddr) if err != nil { - return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to txpool api: %w", err) + return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("could not connect to txpool api: %w", err) } } @@ -515,7 +516,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger logger.Warn("[rpc] Opening Bor db", "path", borDbPath) borKv, err = kv2.NewMDBX(logger).Path(borDbPath).Label(kv.ConsensusDB).Accede().Open(ctx) if err != nil { - return nil, nil, nil, nil, nil, nil, nil, ff, nil, err + return nil, nil, nil, nil, nil, nil, nil, ff, err } // Skip the compatibility check, until we have a schema in erigon-lib @@ -558,7 +559,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger }() ff = rpchelper.New(ctx, cfg.RpcFiltersConfig, eth, txPool, mining, onNewSnapshot, logger) - return db, eth, txPool, mining, stateCache, blockReader, engine, ff, agg, err + return db, eth, txPool, mining, stateCache, blockReader, engine, ff, err } func StartRpcServer(ctx context.Context, cfg *httpcfg.HttpCfg, rpcAPI []rpc.API, logger log.Logger) error { diff --git a/cmd/rpcdaemon/main.go b/cmd/rpcdaemon/main.go index ca9aaa7ad84..7e691a024ce 100644 --- a/cmd/rpcdaemon/main.go +++ b/cmd/rpcdaemon/main.go @@ -39,7 +39,7 @@ func main() { cmd.RunE = func(cmd *cobra.Command, args []string) error { ctx := cmd.Context() logger := debug.SetupCobra(cmd, "sentry") - db, backend, txPool, mining, stateCache, blockReader, engine, ff, _, err := cli.RemoteServices(ctx, cfg, logger, rootCancel) + db, backend, txPool, mining, stateCache, blockReader, engine, ff, err := cli.RemoteServices(ctx, cfg, logger, rootCancel) if err != nil { if !errors.Is(err, context.Canceled) { logger.Error("Could not connect to DB", "err", err) From 625a62c8358ec7795e2f2dc60760ebbbf86a9b31 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Thu, 11 Jul 2024 11:35:29 +0300 Subject: [PATCH 19/29] fixes + debug logs --- eth/protocols/eth/handlers.go | 6 ++++-- .../sentry_multi_client/sentry_multi_client.go | 11 +++++++---- turbo/jsonrpc/eth_receipts.go | 3 +-- turbo/jsonrpc/receipts/receipts_generator.go | 13 ++++--------- turbo/stages/blockchain_test.go | 2 ++ 5 files changed, 18 insertions(+), 17 deletions(-) diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 9c0b2d80759..6a8d6726363 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -22,12 +22,14 @@ package eth import ( "context" "fmt" + "github.com/ledgerwatch/erigon-lib/chain" libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/log/v3" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/p2p/sentry/sentry_multi_client" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/services" ) @@ -160,7 +162,7 @@ func AnswerGetBlockBodiesQuery(db kv.Tx, query GetBlockBodiesPacket, blockReader type getReceiptsFunc func(context.Context, kv.Tx, *types.Block, []libcommon.Address) (types.Receipts, error) -func AnswerGetReceiptsQuery(ctx context.Context, getReceipts getReceiptsFunc, br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam +func AnswerGetReceiptsQuery(ctx context.Context, cfg *chain.Config, ethApi sentry_multi_client.EthAPI, br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam // Gather state data until the fetch or network limits is reached var ( bytes int @@ -185,7 +187,7 @@ func AnswerGetReceiptsQuery(ctx context.Context, getReceipts getReceiptsFunc, br return nil, nil } - results, err := getReceipts(ctx, db, b, s) + results, err := ethApi.GetReceipts(ctx, cfg, db, b, s) if err != nil { return nil, err } diff --git a/p2p/sentry/sentry_multi_client/sentry_multi_client.go b/p2p/sentry/sentry_multi_client/sentry_multi_client.go index 7d1d9f7ab89..b2ca0780318 100644 --- a/p2p/sentry/sentry_multi_client/sentry_multi_client.go +++ b/p2p/sentry/sentry_multi_client/sentry_multi_client.go @@ -244,6 +244,7 @@ func pumpStreamLoop[TMessage interface{}]( case <-ctx.Done(): return case req := <-reqs: + println("received") if err := handleInboundMessage(ctx, req, sentry); err != nil { logger.Debug("Handling incoming message", "stream", streamName, "err", err) } @@ -301,7 +302,7 @@ type MultiClient struct { } type EthAPI interface { - GetReceipts(ctx context.Context, tx kv.Tx, block *types.Block, senders []common.Address) (types.Receipts, error) + GetReceipts(ctx context.Context, cfg *chain.Config, tx kv.Tx, block *types.Block, senders []common.Address) (types.Receipts, error) } func NewMultiClient( @@ -733,14 +734,15 @@ func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.In } defer tx.Rollback() - receipts, err := eth.AnswerGetReceiptsQuery(ctx, cs.ethApiWrapper.GetReceipts, cs.blockReader, tx, query.GetReceiptsPacket) + receiptsList, err := eth.AnswerGetReceiptsQuery(ctx, cs.ChainConfig, cs.ethApiWrapper, cs.blockReader, tx, query.GetReceiptsPacket) if err != nil { return err } b, err := rlp.EncodeToBytes(ð.ReceiptsRLPPacket66{ RequestId: query.RequestId, - ReceiptsRLPPacket: receipts, + ReceiptsRLPPacket: receiptsList, }) + println("getReceipts66", len(receiptsList)) if err != nil { return fmt.Errorf("encode header response: %w", err) } @@ -772,8 +774,9 @@ func (cs *MultiClient) HandleInboundMessage(ctx context.Context, message *proto_ err = fmt.Errorf("%+v, msgID=%s, trace: %s", rec, message.Id.String(), dbg.Stack()) } }() // avoid crash because Erigon's core does many things - + println("entered handleInboundMessage") err = cs.handleInboundMessage(ctx, message, sentry) + println(message.Id.String()) if (err != nil) && rlp.IsInvalidRLPError(err) { cs.logger.Debug("Kick peer for invalid RLP", "err", err) diff --git a/turbo/jsonrpc/eth_receipts.go b/turbo/jsonrpc/eth_receipts.go index 2e3db20be19..794b89be807 100644 --- a/turbo/jsonrpc/eth_receipts.go +++ b/turbo/jsonrpc/eth_receipts.go @@ -47,9 +47,8 @@ func (api *BaseAPI) getReceipts(ctx context.Context, tx kv.Tx, block *types.Bloc if err != nil { return nil, err } - api.receiptsGenerator.SetChainConfig(chainConfig) - return api.receiptsGenerator.GetReceipts(ctx, tx, block, senders) + return api.receiptsGenerator.GetReceipts(ctx, chainConfig, tx, block, senders) } // GetLogs implements eth_getLogs. Returns an array of logs matching a given filter object. diff --git a/turbo/jsonrpc/receipts/receipts_generator.go b/turbo/jsonrpc/receipts/receipts_generator.go index 6faed9eb13e..e087f67063c 100644 --- a/turbo/jsonrpc/receipts/receipts_generator.go +++ b/turbo/jsonrpc/receipts/receipts_generator.go @@ -21,7 +21,6 @@ type Generator struct { receiptsCache *lru.Cache[common.Hash, []*types.Receipt] blockReader services.FullBlockReader engine consensus.EngineReader - cfg *chain.Config } func NewGenerator(receiptsCache *lru.Cache[common.Hash, []*types.Receipt], blockReader services.FullBlockReader, @@ -33,7 +32,7 @@ func NewGenerator(receiptsCache *lru.Cache[common.Hash, []*types.Receipt], block } } -func (g *Generator) GetReceipts(ctx context.Context, tx kv.Tx, block *types.Block, senders []common.Address) (types.Receipts, error) { +func (g *Generator) GetReceipts(ctx context.Context, cfg *chain.Config, tx kv.Tx, block *types.Block, senders []common.Address) (types.Receipts, error) { if receipts, ok := g.receiptsCache.Get(block.Hash()); ok { return receipts, nil } @@ -45,14 +44,14 @@ func (g *Generator) GetReceipts(ctx context.Context, tx kv.Tx, block *types.Bloc engine := g.engine - _, _, _, ibs, _, err := transactions.ComputeTxEnv(ctx, engine, block, g.cfg, g.blockReader, tx, 0) + _, _, _, ibs, _, err := transactions.ComputeTxEnv(ctx, engine, block, cfg, g.blockReader, tx, 0) if err != nil { return nil, err } usedGas := new(uint64) usedBlobGas := new(uint64) - gp := new(core.GasPool).AddGas(block.GasLimit()).AddBlobGas(g.cfg.GetMaxBlobGasPerBlock()) + gp := new(core.GasPool).AddGas(block.GasLimit()).AddBlobGas(cfg.GetMaxBlobGasPerBlock()) noopWriter := state.NewNoopWriter() @@ -68,7 +67,7 @@ func (g *Generator) GetReceipts(ctx context.Context, tx kv.Tx, block *types.Bloc header := block.Header() for i, txn := range block.Transactions() { ibs.SetTxContext(txn.Hash(), block.Hash(), i) - receipt, _, err := core.ApplyTransaction(g.cfg, core.GetHashFn(header, getHeader), engine, nil, gp, ibs, noopWriter, header, txn, usedGas, usedBlobGas, vm.Config{}) + receipt, _, err := core.ApplyTransaction(cfg, core.GetHashFn(header, getHeader), engine, nil, gp, ibs, noopWriter, header, txn, usedGas, usedBlobGas, vm.Config{}) if err != nil { return nil, err } @@ -79,7 +78,3 @@ func (g *Generator) GetReceipts(ctx context.Context, tx kv.Tx, block *types.Bloc g.receiptsCache.Add(block.Hash(), receipts) return receipts, nil } - -func (g *Generator) SetChainConfig(config *chain.Config) { - g.cfg = config -} diff --git a/turbo/stages/blockchain_test.go b/turbo/stages/blockchain_test.go index fbebec58cc3..7227539cd79 100644 --- a/turbo/stages/blockchain_test.go +++ b/turbo/stages/blockchain_test.go @@ -381,11 +381,13 @@ func testReorg(t *testing.T, first, second []int64, td int64) { } m.ReceiveWg.Add(1) + println("sending") for _, err = range m.Send(&protosentry.InboundMessage{Id: protosentry.MessageId_GET_RECEIPTS_66, Data: b, PeerId: m.PeerId}) { if err != nil { t.Fatal(err) } } + println("sent") m.ReceiveWg.Wait() From d7c5fffc97718751f154f9f51c862ff48186abc1 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Thu, 11 Jul 2024 11:38:45 +0300 Subject: [PATCH 20/29] fixes --- eth/protocols/eth/handlers.go | 7 +++---- p2p/sentry/sentry_multi_client/sentry_multi_client.go | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 6a8d6726363..81e483ff34b 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -29,7 +29,6 @@ import ( "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/p2p/sentry/sentry_multi_client" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/services" ) @@ -160,9 +159,9 @@ func AnswerGetBlockBodiesQuery(db kv.Tx, query GetBlockBodiesPacket, blockReader return bodies } -type getReceiptsFunc func(context.Context, kv.Tx, *types.Block, []libcommon.Address) (types.Receipts, error) +type getReceiptsFunc func(context.Context, *chain.Config, kv.Tx, *types.Block, []libcommon.Address) (types.Receipts, error) -func AnswerGetReceiptsQuery(ctx context.Context, cfg *chain.Config, ethApi sentry_multi_client.EthAPI, br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam +func AnswerGetReceiptsQuery(ctx context.Context, cfg *chain.Config, getReceipts getReceiptsFunc, br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam // Gather state data until the fetch or network limits is reached var ( bytes int @@ -187,7 +186,7 @@ func AnswerGetReceiptsQuery(ctx context.Context, cfg *chain.Config, ethApi sentr return nil, nil } - results, err := ethApi.GetReceipts(ctx, cfg, db, b, s) + results, err := getReceipts(ctx, cfg, db, b, s) if err != nil { return nil, err } diff --git a/p2p/sentry/sentry_multi_client/sentry_multi_client.go b/p2p/sentry/sentry_multi_client/sentry_multi_client.go index b2ca0780318..05a44920ed4 100644 --- a/p2p/sentry/sentry_multi_client/sentry_multi_client.go +++ b/p2p/sentry/sentry_multi_client/sentry_multi_client.go @@ -734,7 +734,7 @@ func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.In } defer tx.Rollback() - receiptsList, err := eth.AnswerGetReceiptsQuery(ctx, cs.ChainConfig, cs.ethApiWrapper, cs.blockReader, tx, query.GetReceiptsPacket) + receiptsList, err := eth.AnswerGetReceiptsQuery(ctx, cs.ChainConfig, cs.ethApiWrapper.GetReceipts, cs.blockReader, tx, query.GetReceiptsPacket) if err != nil { return err } From e25c4ec25d5e0a68e638e50e74a0dd140467f2b1 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Thu, 11 Jul 2024 12:01:28 +0300 Subject: [PATCH 21/29] fixes --- eth/protocols/eth/handlers.go | 8 +++++--- p2p/sentry/sentry_multi_client/sentry_multi_client.go | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 81e483ff34b..48fccc7b03d 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -159,9 +159,11 @@ func AnswerGetBlockBodiesQuery(db kv.Tx, query GetBlockBodiesPacket, blockReader return bodies } -type getReceiptsFunc func(context.Context, *chain.Config, kv.Tx, *types.Block, []libcommon.Address) (types.Receipts, error) +type ReceiptsGetter interface { + GetReceipts(ctx context.Context, cfg *chain.Config, tx kv.Tx, block *types.Block, senders []libcommon.Address) (types.Receipts, error) +} -func AnswerGetReceiptsQuery(ctx context.Context, cfg *chain.Config, getReceipts getReceiptsFunc, br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam +func AnswerGetReceiptsQuery(ctx context.Context, cfg *chain.Config, receiptsGetter ReceiptsGetter, br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam // Gather state data until the fetch or network limits is reached var ( bytes int @@ -186,7 +188,7 @@ func AnswerGetReceiptsQuery(ctx context.Context, cfg *chain.Config, getReceipts return nil, nil } - results, err := getReceipts(ctx, cfg, db, b, s) + results, err := receiptsGetter.GetReceipts(ctx, cfg, db, b, s) if err != nil { return nil, err } diff --git a/p2p/sentry/sentry_multi_client/sentry_multi_client.go b/p2p/sentry/sentry_multi_client/sentry_multi_client.go index 05a44920ed4..b2ca0780318 100644 --- a/p2p/sentry/sentry_multi_client/sentry_multi_client.go +++ b/p2p/sentry/sentry_multi_client/sentry_multi_client.go @@ -734,7 +734,7 @@ func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.In } defer tx.Rollback() - receiptsList, err := eth.AnswerGetReceiptsQuery(ctx, cs.ChainConfig, cs.ethApiWrapper.GetReceipts, cs.blockReader, tx, query.GetReceiptsPacket) + receiptsList, err := eth.AnswerGetReceiptsQuery(ctx, cs.ChainConfig, cs.ethApiWrapper, cs.blockReader, tx, query.GetReceiptsPacket) if err != nil { return err } From ba044f63d3178c0cc16a28a25b88335863f6a536 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Fri, 12 Jul 2024 14:10:45 +0300 Subject: [PATCH 22/29] find out how to receive a message --- core/types/receipt.go | 9 +++++++++ eth/protocols/eth/handlers.go | 5 +++++ p2p/sentry/sentry_multi_client/sentry_multi_client.go | 8 ++------ turbo/stages/blockchain_test.go | 7 +++++-- 4 files changed, 21 insertions(+), 8 deletions(-) diff --git a/core/types/receipt.go b/core/types/receipt.go index 1f770ca7f73..4be6367db31 100644 --- a/core/types/receipt.go +++ b/core/types/receipt.go @@ -476,3 +476,12 @@ func (rl Receipts) DeriveFieldsV3ForSingleReceipt(i int, blockHash libcommon.Has } return r, nil } + +func (r *Receipt) String() string { + firstLog := []byte("none") + if len(r.Logs) > 0 { + firstLog, _ = r.Logs[0].MarshalJSON() + } + str := fmt.Sprintf("Receipt of tx %s status %d gas used %d log 1st data %s", r.TxHash.String(), r.Status, r.GasUsed, string(firstLog)) + return str +} diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 48fccc7b03d..3d7f793800e 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -193,6 +193,11 @@ func AnswerGetReceiptsQuery(ctx context.Context, cfg *chain.Config, receiptsGett return nil, err } + println("receipts:") + for _, result := range results { + println(result.String()) + } + // If known, encode and queue for response packet if encoded, err := rlp.EncodeToBytes(results); err != nil { return nil, fmt.Errorf("failed to encode receipt: %w", err) diff --git a/p2p/sentry/sentry_multi_client/sentry_multi_client.go b/p2p/sentry/sentry_multi_client/sentry_multi_client.go index b2ca0780318..11cf9b0dc68 100644 --- a/p2p/sentry/sentry_multi_client/sentry_multi_client.go +++ b/p2p/sentry/sentry_multi_client/sentry_multi_client.go @@ -244,7 +244,6 @@ func pumpStreamLoop[TMessage interface{}]( case <-ctx.Done(): return case req := <-reqs: - println("received") if err := handleInboundMessage(ctx, req, sentry); err != nil { logger.Debug("Handling incoming message", "stream", streamName, "err", err) } @@ -742,7 +741,6 @@ func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.In RequestId: query.RequestId, ReceiptsRLPPacket: receiptsList, }) - println("getReceipts66", len(receiptsList)) if err != nil { return fmt.Errorf("encode header response: %w", err) } @@ -753,14 +751,14 @@ func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.In Data: b, }, } - _, err = sentryClient.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{}) + _, err = sentryClient.SendMessageById(ctx, &outreq, &grpc.OnFinishCallOption{}) if err != nil { if isPeerNotFoundErr(err) { return nil } return fmt.Errorf("send receipts response: %w", err) } - cs.logger.Info(fmt.Sprintf("[%s] GetReceipts responseLen %d", sentry.ConvertH512ToPeerID(inreq.PeerId), len(b))) + //println(fmt.Sprintf("[%s] GetReceipts responseLen %d", sentry.ConvertH512ToPeerID(inreq.PeerId), len(b))) return nil } @@ -774,9 +772,7 @@ func (cs *MultiClient) HandleInboundMessage(ctx context.Context, message *proto_ err = fmt.Errorf("%+v, msgID=%s, trace: %s", rec, message.Id.String(), dbg.Stack()) } }() // avoid crash because Erigon's core does many things - println("entered handleInboundMessage") err = cs.handleInboundMessage(ctx, message, sentry) - println(message.Id.String()) if (err != nil) && rlp.IsInvalidRLPError(err) { cs.logger.Debug("Kick peer for invalid RLP", "err", err) diff --git a/turbo/stages/blockchain_test.go b/turbo/stages/blockchain_test.go index 7227539cd79..234c5556d05 100644 --- a/turbo/stages/blockchain_test.go +++ b/turbo/stages/blockchain_test.go @@ -381,16 +381,19 @@ func testReorg(t *testing.T, first, second []int64, td int64) { } m.ReceiveWg.Add(1) - println("sending") for _, err = range m.Send(&protosentry.InboundMessage{Id: protosentry.MessageId_GET_RECEIPTS_66, Data: b, PeerId: m.PeerId}) { if err != nil { t.Fatal(err) } } - println("sent") m.ReceiveWg.Wait() + msg := m.SentMessage(0) + + require.Equal(protosentry.MessageId_RECEIPTS_66, msg.Id) + println(string(msg.GetData())) + // Make sure the chain total difficulty is the correct one want := new(big.Int).Add(m.Genesis.Difficulty(), big.NewInt(td)) have, err := rawdb.ReadTdByHash(tx, rawdb.ReadCurrentHeader(tx).Hash()) From 74ab5187f9b780618c9453dff273196be10187b0 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Fri, 12 Jul 2024 14:27:10 +0300 Subject: [PATCH 23/29] added tests for empty blockchain --- turbo/stages/blockchain_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/turbo/stages/blockchain_test.go b/turbo/stages/blockchain_test.go index 234c5556d05..b08db932fe5 100644 --- a/turbo/stages/blockchain_test.go +++ b/turbo/stages/blockchain_test.go @@ -359,9 +359,11 @@ func testReorg(t *testing.T, first, second []int64, td int64) { } hashPacket := make([]libcommon.Hash, 0) + queryNum := 0 for block.NumberU64() != 0 { hashPacket = append(hashPacket, block.Hash()) + queryNum++ if prev.ParentHash() != block.Hash() { t.Errorf("parent block hash mismatch: have %x, want %x", prev.ParentHash(), block.Hash()) } @@ -393,6 +395,19 @@ func testReorg(t *testing.T, first, second []int64, td int64) { require.Equal(protosentry.MessageId_RECEIPTS_66, msg.Id) println(string(msg.GetData())) + encoded, err := rlp.EncodeToBytes(types.Receipts{}) + res := make([]rlp.RawValue, 0, queryNum) + for i := 0; i < queryNum; i++ { + res = append(res, encoded) + } + + require.NoError(err) + b, err = rlp.EncodeToBytes(ð.ReceiptsRLPPacket66{ + RequestId: 1, + ReceiptsRLPPacket: res, + }) + require.NoError(err) + require.Equal(b, msg.GetData()) // Make sure the chain total difficulty is the correct one want := new(big.Int).Add(m.Genesis.Difficulty(), big.NewInt(td)) From 2107f89be1dbe8a9874b0b94717fe45ff746d443 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Fri, 12 Jul 2024 15:09:43 +0300 Subject: [PATCH 24/29] added test for generated blockchain --- core/types/receipt.go | 7 +-- eth/protocols/eth/handlers.go | 9 ++-- turbo/stages/blockchain_test.go | 5 ++- turbo/stages/chain_makers_test.go | 75 +++++++++++++++++++++++++++++++ 4 files changed, 85 insertions(+), 11 deletions(-) diff --git a/core/types/receipt.go b/core/types/receipt.go index 4be6367db31..7be66d08f10 100644 --- a/core/types/receipt.go +++ b/core/types/receipt.go @@ -477,11 +477,8 @@ func (rl Receipts) DeriveFieldsV3ForSingleReceipt(i int, blockHash libcommon.Has return r, nil } +// TODO: maybe make it more prettier (only for debug purposes) func (r *Receipt) String() string { - firstLog := []byte("none") - if len(r.Logs) > 0 { - firstLog, _ = r.Logs[0].MarshalJSON() - } - str := fmt.Sprintf("Receipt of tx %s status %d gas used %d log 1st data %s", r.TxHash.String(), r.Status, r.GasUsed, string(firstLog)) + str := fmt.Sprintf("Receipt of tx %+v", *r) return str } diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 3d7f793800e..f33de1f27c8 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -193,10 +193,11 @@ func AnswerGetReceiptsQuery(ctx context.Context, cfg *chain.Config, receiptsGett return nil, err } - println("receipts:") - for _, result := range results { - println(result.String()) - } + // For debug + //println("receipts:") + //for _, result := range results { + // println(result.String()) + //} // If known, encode and queue for response packet if encoded, err := rlp.EncodeToBytes(results); err != nil { diff --git a/turbo/stages/blockchain_test.go b/turbo/stages/blockchain_test.go index b08db932fe5..9a6584ca34d 100644 --- a/turbo/stages/blockchain_test.go +++ b/turbo/stages/blockchain_test.go @@ -394,14 +394,15 @@ func testReorg(t *testing.T, first, second []int64, td int64) { msg := m.SentMessage(0) require.Equal(protosentry.MessageId_RECEIPTS_66, msg.Id) - println(string(msg.GetData())) + encoded, err := rlp.EncodeToBytes(types.Receipts{}) + require.NoError(err) + res := make([]rlp.RawValue, 0, queryNum) for i := 0; i < queryNum; i++ { res = append(res, encoded) } - require.NoError(err) b, err = rlp.EncodeToBytes(ð.ReceiptsRLPPacket66{ RequestId: 1, ReceiptsRLPPacket: res, diff --git a/turbo/stages/chain_makers_test.go b/turbo/stages/chain_makers_test.go index d550006bdab..ad33c7927f2 100644 --- a/turbo/stages/chain_makers_test.go +++ b/turbo/stages/chain_makers_test.go @@ -21,6 +21,10 @@ package stages_test import ( "fmt" + libcommon "github.com/ledgerwatch/erigon-lib/common" + protosentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentryproto" + "github.com/ledgerwatch/erigon/eth/protocols/eth" + "github.com/ledgerwatch/erigon/rlp" "math/big" "testing" @@ -102,6 +106,7 @@ func TestGenerateChain(t *testing.T) { fmt.Printf("insert error%v\n", err) return } + tx, err := m.DB.BeginRw(m.Ctx) if err != nil { fmt.Printf("beginro error: %v\n", err) @@ -110,6 +115,7 @@ func TestGenerateChain(t *testing.T) { defer tx.Rollback() st := state.New(m.NewStateReader(tx)) + if big.NewInt(5).Cmp(current(m, tx).Number()) != 0 { t.Errorf("wrong block number: %d", current(m, tx).Number()) } @@ -122,4 +128,73 @@ func TestGenerateChain(t *testing.T) { if fmt.Sprintf("%s", st.GetBalance(addr3)) != "19687500000000001000" { //nolint t.Errorf("wrong balance of addr3: %s", st.GetBalance(addr3)) } + + // Test of receipts + hashPacket := make([]libcommon.Hash, 0, len(chain.Blocks)) + for _, block := range chain.Blocks { + hashPacket = append(hashPacket, block.Hash()) + } + + b, err := rlp.EncodeToBytes(ð.GetReceiptsPacket66{ + RequestId: 1, + GetReceiptsPacket: hashPacket, + }) + if err != nil { + t.Fatal(err) + } + + m.ReceiveWg.Add(1) + for _, err = range m.Send(&protosentry.InboundMessage{Id: protosentry.MessageId_GET_RECEIPTS_66, Data: b, PeerId: m.PeerId}) { + if err != nil { + t.Fatal(err) + } + } + + m.ReceiveWg.Wait() + + msg := m.SentMessage(0) + + if protosentry.MessageId_RECEIPTS_66 != msg.Id { + t.Errorf("receipt id %d do not match the expected id %d", msg.Id, protosentry.MessageId_RECEIPTS_66) + } + r1 := types.Receipt{Type: 0, PostState: []byte{}, Status: 1, CumulativeGasUsed: 21000, Bloom: [256]byte{}, Logs: types.Logs{}, TxHash: libcommon.HexToHash("0x9ca7a9e6bf23353fc5ac37f5c5676db1accec4af83477ac64cdcaa37f3a837f9"), ContractAddress: libcommon.HexToAddress("0x0000000000000000000000000000000000000000"), GasUsed: 21000, BlockHash: libcommon.HexToHash("0x5c7909bf8d4d8db71f0f6091aa412129591a8e41ff2230369ddf77a00bf57149"), BlockNumber: big.NewInt(1), TransactionIndex: 0} + r2 := types.Receipt{Type: 0, PostState: []byte{}, Status: 1, CumulativeGasUsed: 21000, Bloom: [256]byte{}, Logs: types.Logs{}, TxHash: libcommon.HexToHash("0xf190eed1578cdcfe69badd05b7ef183397f336dc3de37baa4adbfb4bc657c11e"), ContractAddress: libcommon.HexToAddress("0x0000000000000000000000000000000000000000"), GasUsed: 21000, BlockHash: libcommon.HexToHash("0xe4d4617526870ba7c5b81900e31bd2525c02f27fe06fd6c3caf7bed05f3271f4"), BlockNumber: big.NewInt(2), TransactionIndex: 0} + r3 := types.Receipt{Type: 0, PostState: []byte{}, Status: 1, CumulativeGasUsed: 42000, Bloom: [256]byte{}, Logs: types.Logs{}, TxHash: libcommon.HexToHash("0x309a030e44058e435a2b01302006880953e2c9319009db97013eb130d7a24eab"), ContractAddress: libcommon.HexToAddress("0x0000000000000000000000000000000000000000"), GasUsed: 21000, BlockHash: libcommon.HexToHash("0xe4d4617526870ba7c5b81900e31bd2525c02f27fe06fd6c3caf7bed05f3271f4"), BlockNumber: big.NewInt(2), TransactionIndex: 1} + + encodedEmpty, err := rlp.EncodeToBytes(types.Receipts{}) + if err != nil { + t.Fatal(err) + } + encodedFirst, err := rlp.EncodeToBytes(types.Receipts{ + &r1, + }) + if err != nil { + t.Fatal(err) + } + encodedSecond, err := rlp.EncodeToBytes(types.Receipts{ + &r2, + &r3, + }) + if err != nil { + t.Fatal(err) + } + + res := []rlp.RawValue{ + encodedFirst, + encodedSecond, + encodedEmpty, + encodedEmpty, + encodedEmpty, + } + + b, err = rlp.EncodeToBytes(ð.ReceiptsRLPPacket66{ + RequestId: 1, + ReceiptsRLPPacket: res, + }) + if err != nil { + t.Fatal(err) + } + if string(b) != string(msg.GetData()) { + t.Errorf("receipt data %s do not match the expected msg %s", string(msg.GetData()), string(b)) + } } From 6d3b7da11ac96f6577d61b066ba47cc921d6a6c9 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Sat, 13 Jul 2024 18:13:56 +0300 Subject: [PATCH 25/29] save --- turbo/jsonrpc/receipts/receipts_generator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/turbo/jsonrpc/receipts/receipts_generator.go b/turbo/jsonrpc/receipts/receipts_generator.go index e087f67063c..c27fd5d4fb8 100644 --- a/turbo/jsonrpc/receipts/receipts_generator.go +++ b/turbo/jsonrpc/receipts/receipts_generator.go @@ -64,7 +64,7 @@ func (g *Generator) GetReceipts(ctx context.Context, cfg *chain.Config, tx kv.Tx } return h } - header := block.Header() + header := block.HeaderNoCopy() for i, txn := range block.Transactions() { ibs.SetTxContext(txn.Hash(), block.Hash(), i) receipt, _, err := core.ApplyTransaction(cfg, core.GetHashFn(header, getHeader), engine, nil, gp, ibs, noopWriter, header, txn, usedGas, usedBlobGas, vm.Config{}) From b632521898a74a2682364e8ff5537e0ac7b58d53 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Sat, 13 Jul 2024 18:20:52 +0300 Subject: [PATCH 26/29] save --- p2p/sentry/sentry_multi_client/sentry_multi_client.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/p2p/sentry/sentry_multi_client/sentry_multi_client.go b/p2p/sentry/sentry_multi_client/sentry_multi_client.go index 11cf9b0dc68..0d481f43cbf 100644 --- a/p2p/sentry/sentry_multi_client/sentry_multi_client.go +++ b/p2p/sentry/sentry_multi_client/sentry_multi_client.go @@ -297,12 +297,10 @@ type MultiClient struct { logger log.Logger getReceiptsActiveGoroutineNumber *semaphore.Weighted - ethApiWrapper EthAPI + ethApiWrapper eth.ReceiptsGetter } -type EthAPI interface { - GetReceipts(ctx context.Context, cfg *chain.Config, tx kv.Tx, block *types.Block, senders []common.Address) (types.Receipts, error) -} +var _ eth.ReceiptsGetter = new(receipts.Generator) // compile-time interface-check func NewMultiClient( db kv.RwDB, From 68b09d559dd15d71213dde7983294ad142dee6d2 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Sat, 13 Jul 2024 18:30:35 +0300 Subject: [PATCH 27/29] save --- erigon-lib/txpool/fetch.go | 1 - eth/protocols/eth/handlers.go | 9 +++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/erigon-lib/txpool/fetch.go b/erigon-lib/txpool/fetch.go index d8a4322a6a9..5c50819326b 100644 --- a/erigon-lib/txpool/fetch.go +++ b/erigon-lib/txpool/fetch.go @@ -174,7 +174,6 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl } return err } - println("receive message") var req *sentry.InboundMessage for req, err = stream.Recv(); ; req, err = stream.Recv() { if err != nil { diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index f33de1f27c8..a9293760eb2 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -193,6 +193,15 @@ func AnswerGetReceiptsQuery(ctx context.Context, cfg *chain.Config, receiptsGett return nil, err } + if results == nil { + header, err := rawdb.ReadHeaderByHash(db, hash) + if err != nil { + return nil, err + } + if header == nil || header.ReceiptHash != types.EmptyRootHash { + continue + } + } // For debug //println("receipts:") //for _, result := range results { From 06203273a471f55d361a5a72dd5f05fd3005dbad Mon Sep 17 00:00:00 2001 From: JkLondon Date: Mon, 15 Jul 2024 10:34:52 +0300 Subject: [PATCH 28/29] save --- erigon-lib/txpool/fetch.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/erigon-lib/txpool/fetch.go b/erigon-lib/txpool/fetch.go index 5c50819326b..e21c39d9ce2 100644 --- a/erigon-lib/txpool/fetch.go +++ b/erigon-lib/txpool/fetch.go @@ -177,7 +177,7 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl var req *sentry.InboundMessage for req, err = stream.Recv(); ; req, err = stream.Recv() { if err != nil { - f.logger.Error("[txpool.receiveMessage]", "err", err) + f.logger.Debug("[txpool.receiveMessage]", "err", err) select { case <-f.ctx.Done(): return ctx.Err() @@ -189,7 +189,6 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl f.logger.Warn("[txpool.receiveMessage]", "req nil") return nil } - f.logger.Info("[txpool.receiveMessage]", "req", req) if err := f.handleInboundMessage(streamCtx, req, sentryClient); err != nil { if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) { time.Sleep(3 * time.Second) @@ -200,7 +199,6 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl if f.wg != nil { f.wg.Done() } - f.logger.Info("[txpool.fetch] Handling incoming message", "msg", string(req.Data), "reqID", req.Id.String(), "err", err) } } From 7626b761d63acf272cc36f41198f5f7e544a6640 Mon Sep 17 00:00:00 2001 From: JkLondon Date: Tue, 16 Jul 2024 12:24:31 +0300 Subject: [PATCH 29/29] save --- erigon-lib/txpool/fetch.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/erigon-lib/txpool/fetch.go b/erigon-lib/txpool/fetch.go index e21c39d9ce2..4545dd48485 100644 --- a/erigon-lib/txpool/fetch.go +++ b/erigon-lib/txpool/fetch.go @@ -177,19 +177,14 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl var req *sentry.InboundMessage for req, err = stream.Recv(); ; req, err = stream.Recv() { if err != nil { - f.logger.Debug("[txpool.receiveMessage]", "err", err) select { case <-f.ctx.Done(): return ctx.Err() default: } - return err - } - if req == nil { - f.logger.Warn("[txpool.receiveMessage]", "req nil") - return nil + return fmt.Errorf("txpool.receiveMessage: %w", err) } - if err := f.handleInboundMessage(streamCtx, req, sentryClient); err != nil { + if err = f.handleInboundMessage(streamCtx, req, sentryClient); err != nil { if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) { time.Sleep(3 * time.Second) continue