Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p receipts #11010

Merged
merged 33 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,24 +322,24 @@ func EmbeddedServices(ctx context.Context,
func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger, rootCancel context.CancelFunc) (
db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient,
stateCache kvcache.Cache, blockReader services.FullBlockReader, engine consensus.EngineReader,
ff *rpchelper.Filters, agg *libstate.Aggregator, err error) {
ff *rpchelper.Filters, err error) {
if !cfg.WithDatadir && cfg.PrivateApiAddr == "" {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("either remote db or local db must be specified")
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("either remote db or local db must be specified")
}
creds, err := grpcutil.TLS(cfg.TLSCACert, cfg.TLSCertfile, cfg.TLSKeyFile)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("open tls cert: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("open tls cert: %w", err)
}
conn, err := grpcutil.Connect(creds, cfg.PrivateApiAddr)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to execution service privateApi: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("could not connect to execution service privateApi: %w", err)
}

remoteBackendClient := remote.NewETHBACKENDClient(conn)
remoteKvClient := remote.NewKVClient(conn)
remoteKv, err := remotedb.NewRemote(gointerfaces.VersionFromProto(remotedbserver.KvServiceAPIVersion), logger, remoteKvClient).Open()
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to remoteKv: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("could not connect to remoteKv: %w", err)
}

// Configure DB first
Expand All @@ -364,10 +364,10 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
limiter := semaphore.NewWeighted(int64(cfg.DBReadConcurrency))
rwKv, err = kv2.NewMDBX(logger).RoTxsLimiter(limiter).Path(cfg.Dirs.Chaindata).Accede().Open(ctx)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
return nil, nil, nil, nil, nil, nil, nil, ff, err
}
if compatErr := checkDbCompatibility(ctx, rwKv); compatErr != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, compatErr
return nil, nil, nil, nil, nil, nil, nil, ff, compatErr
}
db = rwKv

Expand All @@ -386,10 +386,10 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
}
return nil
}); err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
return nil, nil, nil, nil, nil, nil, nil, ff, err
}
if cc == nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("chain config not found in db. Need start erigon at least once on this db")
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("chain config not found in db. Need start erigon at least once on this db")
}
cfg.Snap.Enabled = cfg.Snap.Enabled || cfg.Sync.UseSnapshots
if !cfg.Snap.Enabled {
Expand All @@ -407,8 +407,9 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
allBorSnapshots.LogStat("bor:remote")

cr := rawdb.NewCanonicalReader()
if agg, err = libstate.NewAggregator(ctx, cfg.Dirs, config3.HistoryV3AggregationStep, db, cr, logger); err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err)
agg, err := libstate.NewAggregator(ctx, cfg.Dirs, config3.HistoryV3AggregationStep, db, cr, logger)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("create aggregator: %w", err)
}
_ = agg.OpenFolder() //TODO: must use analog of `OptimisticReopenWithDB`

Expand Down Expand Up @@ -460,7 +461,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger

db, err = temporal.New(rwKv, agg)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, nil, nil, nil, err
}
stateCache = kvcache.NewDummy()
}
Expand All @@ -484,7 +485,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
if cfg.TxPoolApiAddr != cfg.PrivateApiAddr {
txpoolConn, err = grpcutil.Connect(creds, cfg.TxPoolApiAddr)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to txpool api: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("could not connect to txpool api: %w", err)
}
}

Expand Down Expand Up @@ -515,7 +516,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
logger.Warn("[rpc] Opening Bor db", "path", borDbPath)
borKv, err = kv2.NewMDBX(logger).Path(borDbPath).Label(kv.ConsensusDB).Accede().Open(ctx)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
return nil, nil, nil, nil, nil, nil, nil, ff, err
}
// Skip the compatibility check, until we have a schema in erigon-lib

Expand Down Expand Up @@ -558,7 +559,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
}()

ff = rpchelper.New(ctx, cfg.RpcFiltersConfig, eth, txPool, mining, onNewSnapshot, logger)
return db, eth, txPool, mining, stateCache, blockReader, engine, ff, agg, err
return db, eth, txPool, mining, stateCache, blockReader, engine, ff, err
}

func StartRpcServer(ctx context.Context, cfg *httpcfg.HttpCfg, rpcAPI []rpc.API, logger log.Logger) error {
Expand Down
4 changes: 2 additions & 2 deletions cmd/rpcdaemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func main() {
cmd.RunE = func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
logger := debug.SetupCobra(cmd, "sentry")
db, backend, txPool, mining, stateCache, blockReader, engine, ff, agg, err := cli.RemoteServices(ctx, cfg, logger, rootCancel)
db, backend, txPool, mining, stateCache, blockReader, engine, ff, err := cli.RemoteServices(ctx, cfg, logger, rootCancel)
if err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error("Could not connect to DB", "err", err)
Expand All @@ -49,7 +49,7 @@ func main() {
defer db.Close()
defer engine.Close()

apiList := jsonrpc.APIList(db, backend, txPool, mining, ff, stateCache, blockReader, agg, cfg, engine, logger)
apiList := jsonrpc.APIList(db, backend, txPool, mining, ff, stateCache, blockReader, cfg, engine, logger)
rpc.PreAllocateRPCMetricLabels(apiList)
if err := cli.StartRpcServer(ctx, cfg, apiList, logger); err != nil {
logger.Error(err.Error())
Expand Down
6 changes: 6 additions & 0 deletions core/types/receipt.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,3 +476,9 @@ func (rl Receipts) DeriveFieldsV3ForSingleReceipt(i int, blockHash libcommon.Has
}
return r, nil
}

// TODO: maybe make it more prettier (only for debug purposes)
func (r *Receipt) String() string {
str := fmt.Sprintf("Receipt of tx %+v", *r)
return str
}
9 changes: 6 additions & 3 deletions erigon-lib/txpool/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) {
f.logger.Warn("[txpool.recvMessage] sentry not ready yet", "err", err)
continue
}

if err := f.receiveMessage(f.ctx, sentryClient); err != nil {
if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) {
time.Sleep(3 * time.Second)
Expand Down Expand Up @@ -175,10 +174,11 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl
}
return err
}

println("receive message")
var req *sentry.InboundMessage
for req, err = stream.Recv(); ; req, err = stream.Recv() {
if err != nil {
f.logger.Error("[txpool.receiveMessage]", "err", err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

users using our logs for understanding - is erigon working or not.
Error log level - means node can't work without user's actions.
This error is self-recoverable and can be at Debug level

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set to debug lvl

select {
case <-f.ctx.Done():
return ctx.Err()
Expand All @@ -187,18 +187,21 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl
return err
}
if req == nil {
f.logger.Warn("[txpool.receiveMessage]", "req nil")
return nil
}
f.logger.Info("[txpool.receiveMessage]", "req", req)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

requests can be very big. and binary data can be printed useful way - only by fmt.Sprintf("%x")

Screenshot 2024-07-14 at 16 37 54

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed because it's useless for majority of situations

if err := f.handleInboundMessage(streamCtx, req, sentryClient); err != nil {
if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) {
time.Sleep(3 * time.Second)
continue
}
f.logger.Debug("[txpool.fetch] Handling incoming message", "msg", req.Id.String(), "err", err)
f.logger.Debug("[txpool.fetch] Handling incoming message", "msg", string(req.Data), "reqID", req.Id.String(), "err", err)
}
if f.wg != nil {
f.wg.Done()
}
f.logger.Info("[txpool.fetch] Handling incoming message", "msg", string(req.Data), "reqID", req.Id.String(), "err", err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

erigon handling tons of concurrent requests - from RPC and from P2P. no much reason to log them all.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also this change is not related to current PR

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

}
}

Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,7 @@ func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config, chainConfig
}
}

s.apiList = jsonrpc.APIList(chainKv, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, blockReader, s.agg, &httpRpcCfg, s.engine, s.logger)
s.apiList = jsonrpc.APIList(chainKv, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, blockReader, &httpRpcCfg, s.engine, s.logger)

if config.SilkwormRpcDaemon && httpRpcCfg.Enabled {
interface_log_settings := silkworm.RpcInterfaceLogSettings{
Expand Down
30 changes: 18 additions & 12 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ package eth
import (
"context"
"fmt"

"github.com/ledgerwatch/erigon-lib/chain"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/log/v3"

"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
Expand Down Expand Up @@ -160,12 +159,17 @@ func AnswerGetBlockBodiesQuery(db kv.Tx, query GetBlockBodiesPacket, blockReader
return bodies
}

func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam
type ReceiptsGetter interface {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI:

  • we also have turbo/services/interfaces.go. But define interfaces on consumer side is also good.
  • can use next trick:
var _ evmtypes.IntraBlockState = new(IntraBlockState) // compile-time interface-check

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in an another file

GetReceipts(ctx context.Context, cfg *chain.Config, tx kv.Tx, block *types.Block, senders []libcommon.Address) (types.Receipts, error)
}

func AnswerGetReceiptsQuery(ctx context.Context, cfg *chain.Config, receiptsGetter ReceiptsGetter, br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam
// Gather state data until the fetch or network limits is reached
var (
bytes int
receipts []rlp.RawValue
)

for lookups, hash := range query {
if bytes >= softResponseLimit || len(receipts) >= maxReceiptsServe ||
lookups >= 2*maxReceiptsServe {
Expand All @@ -183,16 +187,18 @@ func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetRece
if b == nil {
return nil, nil
}
results := rawdb.ReadReceipts(db, b, s)
if results == nil {
header, err := rawdb.ReadHeaderByHash(db, hash)
if err != nil {
return nil, err
}
if header == nil || header.ReceiptHash != types.EmptyRootHash {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems this optimization lost. maybe move it inside generator

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done but not in generator because it's query response optimization, not receipt

continue
}

results, err := receiptsGetter.GetReceipts(ctx, cfg, db, b, s)
if err != nil {
return nil, err
}

// For debug
//println("receipts:")
//for _, result := range results {
// println(result.String())
//}

// If known, encode and queue for response packet
if encoded, err := rlp.EncodeToBytes(results); err != nil {
return nil, fmt.Errorf("failed to encode receipt: %w", err)
Expand Down
106 changes: 65 additions & 41 deletions p2p/sentry/sentry_multi_client/sentry_multi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"encoding/hex"
"errors"
"fmt"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon/turbo/jsonrpc/receipts"
"golang.org/x/sync/semaphore"
"math/rand"
"sort"
"sync"
Expand Down Expand Up @@ -291,7 +295,13 @@ type MultiClient struct {
// decouple sentry multi client from header and body downloading logic is done
disableBlockDownload bool

logger log.Logger
logger log.Logger
getReceiptsActiveGoroutineNumber *semaphore.Weighted
ethApiWrapper EthAPI
}

type EthAPI interface {
GetReceipts(ctx context.Context, cfg *chain.Config, tx kv.Tx, block *types.Block, senders []common.Address) (types.Receipts, error)
}

func NewMultiClient(
Expand Down Expand Up @@ -342,6 +352,14 @@ func NewMultiClient(
bd = &bodydownload.BodyDownload{}
}

receiptsCacheLimit := 32
receiptsCache, err := lru.New[common.Hash, []*types.Receipt](receiptsCacheLimit)
if err != nil {
return nil, err
}

receiptsGenerator := receipts.NewGenerator(receiptsCache, blockReader, engine)

cs := &MultiClient{
Hd: hd,
Bd: bd,
Expand All @@ -356,6 +374,8 @@ func NewMultiClient(
maxBlockBroadcastPeers: maxBlockBroadcastPeers,
disableBlockDownload: disableBlockDownload,
logger: logger,
getReceiptsActiveGoroutineNumber: semaphore.NewWeighted(1),
ethApiWrapper: receiptsGenerator,
}

return cs, nil
Expand Down Expand Up @@ -696,45 +716,50 @@ func (cs *MultiClient) getBlockBodies66(ctx context.Context, inreq *proto_sentry
return nil
}

func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
return nil //TODO: https://github.com/ledgerwatch/erigon/issues/10320
//var query eth.GetReceiptsPacket66
//if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
// return fmt.Errorf("decoding getReceipts66: %w, data: %x", err, inreq.Data)
//}
//tx, err := cs.db.BeginRo(ctx)
//if err != nil {
// return err
//}
//defer tx.Rollback()
//receipts, err := eth.AnswerGetReceiptsQuery(cs.blockReader, tx, query.GetReceiptsPacket)
//if err != nil {
// return err
//}
//tx.Rollback()
//b, err := rlp.EncodeToBytes(&eth.ReceiptsRLPPacket66{
// RequestId: query.RequestId,
// ReceiptsRLPPacket: receipts,
//})
//if err != nil {
// return fmt.Errorf("encode header response: %w", err)
//}
//outreq := proto_sentry.SendMessageByIdRequest{
// PeerId: inreq.PeerId,
// Data: &proto_sentry.OutboundMessageData{
// Id: proto_sentry.MessageId_RECEIPTS_66,
// Data: b,
// },
//}
//_, err = sentry.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{})
//if err != nil {
// if isPeerNotFoundErr(err) {
// return nil
// }
// return fmt.Errorf("send bodies response: %w", err)
//}
////cs.logger.Info(fmt.Sprintf("[%s] GetReceipts responseLen %d", ConvertH512ToPeerID(inreq.PeerId), len(b)))
//return nil
func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentryClient direct.SentryClient) error {
err := cs.getReceiptsActiveGoroutineNumber.Acquire(ctx, 1)
if err != nil {
return err
}
defer cs.getReceiptsActiveGoroutineNumber.Release(1)
var query eth.GetReceiptsPacket66
if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
return fmt.Errorf("decoding getReceipts66: %w, data: %x", err, inreq.Data)
}

tx, err := cs.db.BeginRo(ctx)
if err != nil {
return err
}
defer tx.Rollback()

receiptsList, err := eth.AnswerGetReceiptsQuery(ctx, cs.ChainConfig, cs.ethApiWrapper, cs.blockReader, tx, query.GetReceiptsPacket)
if err != nil {
return err
}
b, err := rlp.EncodeToBytes(&eth.ReceiptsRLPPacket66{
RequestId: query.RequestId,
ReceiptsRLPPacket: receiptsList,
})
if err != nil {
return fmt.Errorf("encode header response: %w", err)
}
outreq := proto_sentry.SendMessageByIdRequest{
PeerId: inreq.PeerId,
Data: &proto_sentry.OutboundMessageData{
Id: proto_sentry.MessageId_RECEIPTS_66,
Data: b,
},
}
_, err = sentryClient.SendMessageById(ctx, &outreq, &grpc.OnFinishCallOption{})
if err != nil {
if isPeerNotFoundErr(err) {
return nil
}
return fmt.Errorf("send receipts response: %w", err)
}
//println(fmt.Sprintf("[%s] GetReceipts responseLen %d", sentry.ConvertH512ToPeerID(inreq.PeerId), len(b)))
return nil
}

func MakeInboundMessage() *proto_sentry.InboundMessage {
Expand All @@ -747,7 +772,6 @@ func (cs *MultiClient) HandleInboundMessage(ctx context.Context, message *proto_
err = fmt.Errorf("%+v, msgID=%s, trace: %s", rec, message.Id.String(), dbg.Stack())
}
}() // avoid crash because Erigon's core does many things

err = cs.handleInboundMessage(ctx, message, sentry)

if (err != nil) && rlp.IsInvalidRLPError(err) {
Expand Down
Loading
Loading