From c3d2c2689a7c3bf393424cc59017d0da10cfbc89 Mon Sep 17 00:00:00 2001 From: Manav Darji Date: Mon, 20 Feb 2023 18:37:21 +0530 Subject: [PATCH] Merge qa to master (#750) * Added checks to RPC requests and introduced new flags to customise the parameters (#657) * added a check to reject rpc requests with batch size > the one set using a newly added flag (rpcbatchlimit) * added a check to reject rpc requests whose result size > the one set using a newly added flag (rpcreturndatalimit) * updated the config files and docs * chg : trieTimeout from 60 to 10 mins (#692) * chg : trieTimeout from 60 to 10 mins * chg : cache.timout to 10m from 1h in configs * internal/cli/server : fix : added triesInMemory in config (#691) * changed version from 0.3.0 to 0.3.4-beta (#693) * fix nil state-sync issue, increase grpc limit (#695) * Increase grpc message size limit in pprof * consensus/bor/bor.go : stateSyncs init fixed [Fix #686] * eth/filters: handle nil state-sync before notify * eth/filters: update check Co-authored-by: Jerry Co-authored-by: Daniil * core, tests/bor: add more tests for state-sync validation (#710) * core: add get state sync function for tests * tests/bor: add validation for state sync events post consensus * Arpit/temp bor sync (#701) * Increase grpc message size limit in pprof * ReadBorReceipts improvements * use internal function * fix tests * fetch geth upstread for ReadBorReceiptRLP * Only query bor receipt when the query index is equal to # tx in block body This change reduces the frequency of calling ReadBorReceipt and ReadBorTransaction, which are CPU and db intensive. * Revert "fetch geth upstread for ReadBorReceiptRLP" This reverts commit 2e838a6b1313d26674f3a8df4b044e35dcbf35a0. * Restore ReadBorReceiptRLP * fix bor receipts * remove unused * fix lints --------- Co-authored-by: Jerry Co-authored-by: Manav Darji Co-authored-by: Evgeny Danienko <6655321@bk.ru> * Revert "chg : trieTimeout from 60 to 10 mins (#692)" (#720) This reverts commit 241843c7e7bb18e64d2e157fd6fbbd665f6ce9d9. * Arpit/add execution pool 2 (#719) * initial * linters * linters * remove timeout * update pool * change pool size function * check nil * check nil * fix tests * Use execution pool from server in all handlers * simplify things * test fix * add support for cli, config * add to cli and config * merge base branch * debug statements * fix bug * atomic pointer timeout * add apis * update workerpool * fix issues * change params * fix issues * fix ipc issue * remove execution pool from IPC * revert * fix tests * mutex * refactor flag and value names * ordering fix * refactor flag and value names * update default ep size to 40 * fix bor start issues * revert file changes * debug statements * fix bug * update workerpool * atomic pointer timeout * add apis * Merge branch 'add-execution-pool' of github.com:maticnetwork/bor into arpit/add-execution-pool * fix issues * change params * fix issues * fix ipc issue * remove execution pool from IPC * revert * merge base branch * Merge branch 'add-execution-pool' of github.com:maticnetwork/bor into arpit/add-execution-pool * mutex * fix tests * Merge branch 'arpit/add-execution-pool' of github.com:maticnetwork/bor into arpit/add-execution-pool * Change default size of execution pool to 40 * refactor flag and value names * fix merge conflicts * ordering fix * refactor flag and value names * update default ep size to 40 * fix bor start issues * revert file changes * fix linters * fix go.mod * change sec to ms * change default value for ep timeout * fix node api calls * comment setter for ep timeout --------- Co-authored-by: Evgeny Danienko <6655321@bk.ru> Co-authored-by: Jerry Co-authored-by: Manav Darji * version change (#721) * Event based pprof (#732) * feature * Save pprof to /tmp --------- Co-authored-by: Jerry * Cherry-pick changes from develop (#738) * Check if block is nil to prevent panic (#736) * miner: use env for tracing instead of block object (#728) --------- Co-authored-by: Dmitry <46797839+dkeysil@users.noreply.github.com> * add max code init size check in txpool (#739) * Revert "Event based pprof" and update version (#742) * Revert "Event based pprof (#732)" This reverts commit 22fa4033e8fabb51c44e8d2a8c6bb695a6e9285e. * params: update version to 0.3.4-beta3 * packaging/templates: update bor version * params, packaging/templates: update bor version --------- Co-authored-by: SHIVAM SHARMA Co-authored-by: Pratik Patil Co-authored-by: Jerry Co-authored-by: Daniil Co-authored-by: Arpit Temani Co-authored-by: Evgeny Danienko <6655321@bk.ru> Co-authored-by: Dmitry <46797839+dkeysil@users.noreply.github.com> --- builder/files/config.toml | 6 ++ cmd/clef/main.go | 2 +- consensus/bor/bor.go | 2 +- core/blockchain.go | 2 +- core/blockchain_reader.go | 4 + core/bor_blockchain.go | 2 +- core/error.go | 4 + core/rawdb/bor_receipt.go | 64 +++++------- core/tx_pool.go | 6 ++ docs/cli/example_config.toml | 25 +++-- docs/cli/server.md | 12 +++ eth/api_backend.go | 4 + eth/ethconfig/config.go | 14 ++- eth/filters/bor_api.go | 7 +- eth/filters/test_backend.go | 2 +- eth/tracers/api.go | 2 +- eth/tracers/api_test.go | 4 + go.mod | 3 + go.sum | 4 + internal/cli/debug_pprof.go | 3 +- internal/cli/dumpconfig.go | 2 + internal/cli/server/config.go | 67 +++++++++---- internal/cli/server/flags.go | 40 ++++++++ internal/ethapi/api.go | 33 +++++-- internal/ethapi/backend.go | 9 +- internal/web3ext/web3ext.go | 28 ++++++ les/api_backend.go | 4 + miner/worker.go | 6 +- node/api.go | 89 +++++++++++++++++ node/config.go | 9 ++ node/node.go | 31 +++--- node/rpcstack.go | 21 +++- node/rpcstack_test.go | 2 +- .../templates/mainnet-v1/archive/config.toml | 6 ++ .../mainnet-v1/sentry/sentry/bor/config.toml | 6 ++ .../sentry/validator/bor/config.toml | 6 ++ .../mainnet-v1/without-sentry/bor/config.toml | 6 ++ packaging/templates/package_scripts/control | 2 +- .../templates/package_scripts/control.arm64 | 2 +- .../package_scripts/control.profile.amd64 | 2 +- .../package_scripts/control.profile.arm64 | 2 +- .../package_scripts/control.validator | 2 +- .../package_scripts/control.validator.arm64 | 2 +- .../templates/testnet-v4/archive/config.toml | 6 ++ .../testnet-v4/sentry/sentry/bor/config.toml | 6 ++ .../sentry/validator/bor/config.toml | 6 ++ .../testnet-v4/without-sentry/bor/config.toml | 6 ++ params/config.go | 4 + params/protocol_params.go | 3 +- params/version.go | 2 +- rpc/client.go | 2 +- rpc/client_test.go | 7 +- rpc/endpoints.go | 2 +- rpc/execution_pool.go | 99 +++++++++++++++++++ rpc/handler.go | 38 ++++--- rpc/http_test.go | 2 +- rpc/inproc.go | 8 +- rpc/ipc.go | 6 +- rpc/server.go | 56 ++++++++++- rpc/server_test.go | 2 +- rpc/subscription_test.go | 2 +- rpc/testservice_test.go | 2 +- rpc/websocket_test.go | 2 +- tests/bor/bor_test.go | 13 +++ tests/bor/helper.go | 2 +- 65 files changed, 671 insertions(+), 154 deletions(-) create mode 100644 rpc/execution_pool.go diff --git a/builder/files/config.toml b/builder/files/config.toml index 0f2919807ff3..1b8d915b7b4d 100644 --- a/builder/files/config.toml +++ b/builder/files/config.toml @@ -8,6 +8,8 @@ chain = "mainnet" datadir = "/var/lib/bor/data" # ancient = "" # keystore = "/var/lib/bor/keystore" +# "rpc.batchlimit" = 100 +# "rpc.returndatalimit" = 100000 syncmode = "full" # gcmode = "full" # snapshot = true @@ -73,6 +75,8 @@ syncmode = "full" # api = ["eth", "net", "web3", "txpool", "bor"] # vhosts = ["*"] # corsdomain = ["*"] +# ep-size = 40 +# ep-requesttimeout = "0s" # [jsonrpc.ws] # enabled = false # port = 8546 @@ -80,6 +84,8 @@ syncmode = "full" # host = "localhost" # api = ["web3", "net"] # origins = ["*"] +# ep-size = 40 +# ep-requesttimeout = "0s" # [jsonrpc.graphql] # enabled = false # port = 0 diff --git a/cmd/clef/main.go b/cmd/clef/main.go index f7c3adebc44a..1bfb2610e532 100644 --- a/cmd/clef/main.go +++ b/cmd/clef/main.go @@ -656,7 +656,7 @@ func signer(c *cli.Context) error { vhosts := utils.SplitAndTrim(c.GlobalString(utils.HTTPVirtualHostsFlag.Name)) cors := utils.SplitAndTrim(c.GlobalString(utils.HTTPCORSDomainFlag.Name)) - srv := rpc.NewServer() + srv := rpc.NewServer(0, 0) err := node.RegisterApis(rpcAPI, []string{"account"}, srv, false) if err != nil { utils.Fatalf("Could not register API: %w", err) diff --git a/consensus/bor/bor.go b/consensus/bor/bor.go index 1b4ddec45d1e..5b322637624c 100644 --- a/consensus/bor/bor.go +++ b/consensus/bor/bor.go @@ -1161,7 +1161,7 @@ func (c *Bor) CommitStates( processStart := time.Now() totalGas := 0 /// limit on gas for state sync per block chainID := c.chainConfig.ChainID.String() - stateSyncs := make([]*types.StateSyncData, len(eventRecords)) + stateSyncs := make([]*types.StateSyncData, 0, len(eventRecords)) var gasUsed uint64 diff --git a/core/blockchain.go b/core/blockchain.go index 74fd4bfeda4f..cbcf02fef433 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2059,7 +2059,7 @@ func (bc *BlockChain) collectLogs(hash common.Hash, removed bool) []*types.Log { receipts := rawdb.ReadReceipts(bc.db, hash, *number, bc.chainConfig) // Append bor receipt - borReceipt := rawdb.ReadBorReceipt(bc.db, hash, *number) + borReceipt := rawdb.ReadBorReceipt(bc.db, hash, *number, bc.chainConfig) if borReceipt != nil { receipts = append(receipts, borReceipt) } diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index f61f93049669..8405d4a54ccc 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -422,6 +422,10 @@ func (bc *BlockChain) SetStateSync(stateData []*types.StateSyncData) { bc.stateSyncData = stateData } +func (bc *BlockChain) GetStateSync() []*types.StateSyncData { + return bc.stateSyncData +} + // SubscribeStateSyncEvent registers a subscription of StateSyncEvent. func (bc *BlockChain) SubscribeStateSyncEvent(ch chan<- StateSyncEvent) event.Subscription { return bc.scope.Track(bc.stateSyncFeed.Subscribe(ch)) diff --git a/core/bor_blockchain.go b/core/bor_blockchain.go index ae2cdf3c6f65..49973421bdd8 100644 --- a/core/bor_blockchain.go +++ b/core/bor_blockchain.go @@ -19,7 +19,7 @@ func (bc *BlockChain) GetBorReceiptByHash(hash common.Hash) *types.Receipt { } // read bor reciept by hash and number - receipt := rawdb.ReadBorReceipt(bc.db, hash, *number) + receipt := rawdb.ReadBorReceipt(bc.db, hash, *number, bc.chainConfig) if receipt == nil { return nil } diff --git a/core/error.go b/core/error.go index 51ebefc137bc..234620ee4b6e 100644 --- a/core/error.go +++ b/core/error.go @@ -63,6 +63,10 @@ var ( // have enough funds for transfer(topmost call only). ErrInsufficientFundsForTransfer = errors.New("insufficient funds for transfer") + // ErrMaxInitCodeSizeExceeded is returned if creation transaction provides the init code bigger + // than init code size limit. + ErrMaxInitCodeSizeExceeded = errors.New("max initcode size exceeded") + // ErrInsufficientFunds is returned if the total cost of executing a transaction // is higher than the balance of the user's account. ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value") diff --git a/core/rawdb/bor_receipt.go b/core/rawdb/bor_receipt.go index e2250837415c..0739c67a9f6b 100644 --- a/core/rawdb/bor_receipt.go +++ b/core/rawdb/bor_receipt.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" ) @@ -33,49 +34,28 @@ func borTxLookupKey(hash common.Hash) []byte { return append(borTxLookupPrefix, hash.Bytes()...) } -// HasBorReceipt verifies the existence of all block receipt belonging -// to a block. -func HasBorReceipt(db ethdb.Reader, hash common.Hash, number uint64) bool { - if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash { - return true - } - - if has, err := db.Has(borReceiptKey(number, hash)); !has || err != nil { - return false - } +func ReadBorReceiptRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { + var data []byte - return true -} + err := db.ReadAncients(func(reader ethdb.AncientReader) error { + // Check if the data is in ancients + if isCanon(reader, number, hash) { + data, _ = reader.Ancient(freezerBorReceiptTable, number) -// ReadBorReceiptRLP retrieves the block receipt belonging to a block in RLP encoding. -func ReadBorReceiptRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { - // First try to look up the data in ancient database. Extra hash - // comparison is necessary since ancient database only maintains - // the canonical data. - data, _ := db.Ancient(freezerBorReceiptTable, number) - if len(data) > 0 { - h, _ := db.Ancient(freezerHashTable, number) - if common.BytesToHash(h) == hash { - return data - } - } - // Then try to look up the data in leveldb. - data, _ = db.Get(borReceiptKey(number, hash)) - if len(data) > 0 { - return data - } - // In the background freezer is moving data from leveldb to flatten files. - // So during the first check for ancient db, the data is not yet in there, - // but when we reach into leveldb, the data was already moved. That would - // result in a not found error. - data, _ = db.Ancient(freezerBorReceiptTable, number) - if len(data) > 0 { - h, _ := db.Ancient(freezerHashTable, number) - if common.BytesToHash(h) == hash { - return data + return nil } + + // If not, try reading from leveldb + data, _ = db.Get(borReceiptKey(number, hash)) + + return nil + }) + + if err != nil { + log.Warn("during ReadBorReceiptRLP", "number", number, "hash", hash, "err", err) } - return nil // Can't find the data anywhere. + + return data } // ReadRawBorReceipt retrieves the block receipt belonging to a block. @@ -101,7 +81,11 @@ func ReadRawBorReceipt(db ethdb.Reader, hash common.Hash, number uint64) *types. // ReadBorReceipt retrieves all the bor block receipts belonging to a block, including // its correspoinding metadata fields. If it is unable to populate these metadata // fields then nil is returned. -func ReadBorReceipt(db ethdb.Reader, hash common.Hash, number uint64) *types.Receipt { +func ReadBorReceipt(db ethdb.Reader, hash common.Hash, number uint64, config *params.ChainConfig) *types.Receipt { + if config != nil && config.Bor != nil && config.Bor.Sprint != nil && !config.Bor.IsSprintStart(number) { + return nil + } + // We're deriving many fields from the block body, retrieve beside the receipt borReceipt := ReadRawBorReceipt(db, hash, number) if borReceipt == nil { diff --git a/core/tx_pool.go b/core/tx_pool.go index 764866868800..3d3f01eecb41 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -18,6 +18,7 @@ package core import ( "errors" + "fmt" "math" "math/big" "sort" @@ -604,6 +605,11 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { if uint64(tx.Size()) > txMaxSize { return ErrOversizedData } + // Check whether the init code size has been exceeded. + // (TODO): Add a hardfork check here while pulling upstream changes. + if tx.To() == nil && len(tx.Data()) > params.MaxInitCodeSize { + return fmt.Errorf("%w: code size %v limit %v", ErrMaxInitCodeSizeExceeded, len(tx.Data()), params.MaxInitCodeSize) + } // Transactions can't be negative. This may never happen using RLP decoded // transactions but may occur if you create a transaction using the RPC. if tx.Value().Sign() < 0 { diff --git a/docs/cli/example_config.toml b/docs/cli/example_config.toml index 64ef60ae12db..c32c40e2c6c5 100644 --- a/docs/cli/example_config.toml +++ b/docs/cli/example_config.toml @@ -8,6 +8,8 @@ log-level = "INFO" # Set log level for the server datadir = "var/lib/bor" # Path of the data directory to store information ancient = "" # Data directory for ancient chain segments (default = inside chaindata) keystore = "" # Path of the directory where keystores are located +"rpc.batchlimit" = 100 # Maximum number of messages in a batch (default=100, use 0 for no limits) +"rpc.returndatalimit" = 100000 # Maximum size (in bytes) a result of an rpc request could have (default=100000, use 0 for no limits) syncmode = "full" # Blockchain sync mode (only "full" sync supported) gcmode = "full" # Blockchain garbage collection mode ("full", "archive") snapshot = true # Enables the snapshot-database mode @@ -72,18 +74,22 @@ ethstats = "" # Reporting URL of a ethstats service (nodename:sec api = ["eth", "net", "web3", "txpool", "bor"] # API's offered over the HTTP-RPC interface vhosts = ["localhost"] # Comma separated list of virtual hostnames from which to accept requests (server enforced). Accepts '*' wildcard. corsdomain = ["localhost"] # Comma separated list of domains from which to accept cross origin requests (browser enforced) + ep-size = 40 # Maximum size of workers to run in rpc execution pool for HTTP requests (default: 40) + ep-requesttimeout = "0s" # Request Timeout for rpc execution pool for HTTP requests (default: 0s, 0s = disabled) [jsonrpc.ws] - enabled = false # Enable the WS-RPC server - port = 8546 # WS-RPC server listening port - prefix = "" # HTTP path prefix on which JSON-RPC is served. Use '/' to serve on all paths. - host = "localhost" # ws.addr - api = ["net", "web3"] # API's offered over the WS-RPC interface - origins = ["localhost"] # Origins from which to accept websockets requests + enabled = false # Enable the WS-RPC server + port = 8546 # WS-RPC server listening port + prefix = "" # HTTP path prefix on which JSON-RPC is served. Use '/' to serve on all paths. + host = "localhost" # ws.addr + api = ["net", "web3"] # API's offered over the WS-RPC interface + origins = ["localhost"] # Origins from which to accept websockets requests + ep-size = 40 # Maximum size of workers to run in rpc execution pool for WS requests (default: 40) + ep-requesttimeout = "0s" # Request Timeout for rpc execution pool for WS requests (default: 0s, 0s = disabled) [jsonrpc.graphql] enabled = false # Enable GraphQL on the HTTP-RPC server. Note that GraphQL can only be started if an HTTP server is started as well. - port = 0 # - prefix = "" # - host = "" # + port = 0 # + prefix = "" # + host = "" # vhosts = ["localhost"] # Comma separated list of virtual hostnames from which to accept requests (server enforced). Accepts '*' wildcard. corsdomain = ["localhost"] # Comma separated list of domains from which to accept cross origin requests (browser enforced) [jsonrpc.timeouts] @@ -91,6 +97,7 @@ ethstats = "" # Reporting URL of a ethstats service (nodename:sec write = "30s" idle = "2m0s" + [gpo] blocks = 20 # Number of recent blocks to check for gas prices percentile = 60 # Suggested gas price is the given percentile of a set of recent transaction gas prices diff --git a/docs/cli/server.md b/docs/cli/server.md index 5bc0ff1024c0..b91b000eb66f 100644 --- a/docs/cli/server.md +++ b/docs/cli/server.md @@ -16,6 +16,10 @@ The ```bor server``` command runs the Bor client. - ```keystore```: Path of the directory where keystores are located +- ```rpc.batchlimit```: Maximum number of messages in a batch (default=100, use 0 for no limits) (default: 100) + +- ```rpc.returndatalimit```: Maximum size (in bytes) a result of an rpc request could have (default=100000, use 0 for no limits) (default: 100000) + - ```config```: File for the config file - ```syncmode```: Blockchain sync mode (only "full" sync supported) (default: full) @@ -116,6 +120,10 @@ The ```bor server``` command runs the Bor client. - ```http.api```: API's offered over the HTTP-RPC interface (default: eth,net,web3,txpool,bor) +- ```http.ep-size```: Maximum size of workers to run in rpc execution pool for HTTP requests (default: 40) + +- ```http.ep-requesttimeout```: Request Timeout for rpc execution pool for HTTP requests (default: 0s) + - ```ws```: Enable the WS-RPC server (default: false) - ```ws.addr```: WS-RPC server listening interface (default: localhost) @@ -126,6 +134,10 @@ The ```bor server``` command runs the Bor client. - ```ws.api```: API's offered over the WS-RPC interface (default: net,web3) +- ```ws.ep-size```: Maximum size of workers to run in rpc execution pool for WS requests (default: 40) + +- ```ws.ep-requesttimeout```: Request Timeout for rpc execution pool for WS requests (default: 0s) + - ```graphql```: Enable GraphQL on the HTTP-RPC server. Note that GraphQL can only be started if an HTTP server is started as well. (default: false) ### P2P Options diff --git a/eth/api_backend.go b/eth/api_backend.go index c33f3cf6f282..60aea7527e35 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -317,6 +317,10 @@ func (b *EthAPIBackend) RPCGasCap() uint64 { return b.eth.config.RPCGasCap } +func (b *EthAPIBackend) RPCRpcReturnDataLimit() uint64 { + return b.eth.config.RPCReturnDataLimit +} + func (b *EthAPIBackend) RPCEVMTimeout() time.Duration { return b.eth.config.RPCEVMTimeout } diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index c9272758ab4d..68cf733cc68d 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -94,11 +94,12 @@ var Defaults = Config{ GasPrice: big.NewInt(params.GWei), Recommit: 125 * time.Second, }, - TxPool: core.DefaultTxPoolConfig, - RPCGasCap: 50000000, - RPCEVMTimeout: 5 * time.Second, - GPO: FullNodeGPO, - RPCTxFeeCap: 5, // 5 matic + TxPool: core.DefaultTxPoolConfig, + RPCGasCap: 50000000, + RPCReturnDataLimit: 100000, + RPCEVMTimeout: 5 * time.Second, + GPO: FullNodeGPO, + RPCTxFeeCap: 5, // 5 matic } func init() { @@ -199,6 +200,9 @@ type Config struct { // RPCGasCap is the global gas cap for eth-call variants. RPCGasCap uint64 + // Maximum size (in bytes) a result of an rpc request could have + RPCReturnDataLimit uint64 + // RPCEVMTimeout is the global timeout for eth-call. RPCEVMTimeout time.Duration diff --git a/eth/filters/bor_api.go b/eth/filters/bor_api.go index db13c9595965..aeb370d6be41 100644 --- a/eth/filters/bor_api.go +++ b/eth/filters/bor_api.go @@ -1,7 +1,6 @@ package filters import ( - "bytes" "context" "errors" @@ -19,7 +18,7 @@ func (api *PublicFilterAPI) SetChainConfig(chainConfig *params.ChainConfig) { func (api *PublicFilterAPI) GetBorBlockLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) { if api.chainConfig == nil { - return nil, errors.New("No chain config found. Proper PublicFilterAPI initialization required") + return nil, errors.New("no chain config found. Proper PublicFilterAPI initialization required") } // get sprint from bor config @@ -67,8 +66,8 @@ func (api *PublicFilterAPI) NewDeposits(ctx context.Context, crit ethereum.State for { select { case h := <-stateSyncData: - if crit.ID == h.ID || bytes.Compare(crit.Contract.Bytes(), h.Contract.Bytes()) == 0 || - (crit.ID == 0 && crit.Contract == common.Address{}) { + if h != nil && (crit.ID == h.ID || crit.Contract == h.Contract || + (crit.ID == 0 && crit.Contract == common.Address{})) { notifier.Notify(rpcSub.ID, h) } case <-rpcSub.Err(): diff --git a/eth/filters/test_backend.go b/eth/filters/test_backend.go index 979ed3efb6f5..8b2ef4a7f29f 100644 --- a/eth/filters/test_backend.go +++ b/eth/filters/test_backend.go @@ -38,7 +38,7 @@ func (b *TestBackend) GetBorBlockReceipt(ctx context.Context, hash common.Hash) return &types.Receipt{}, nil } - receipt := rawdb.ReadBorReceipt(b.DB, hash, *number) + receipt := rawdb.ReadBorReceipt(b.DB, hash, *number, nil) if receipt == nil { return &types.Receipt{}, nil } diff --git a/eth/tracers/api.go b/eth/tracers/api.go index 3fce91ac9c3c..13f5c627cd82 100644 --- a/eth/tracers/api.go +++ b/eth/tracers/api.go @@ -177,7 +177,7 @@ func (api *API) getAllBlockTransactions(ctx context.Context, block *types.Block) stateSyncPresent := false - borReceipt := rawdb.ReadBorReceipt(api.backend.ChainDb(), block.Hash(), block.NumberU64()) + borReceipt := rawdb.ReadBorReceipt(api.backend.ChainDb(), block.Hash(), block.NumberU64(), api.backend.ChainConfig()) if borReceipt != nil { txHash := types.GetDerivedBorTxHash(types.BorReceiptKey(block.Number().Uint64(), block.Hash())) if txHash != (common.Hash{}) { diff --git a/eth/tracers/api_test.go b/eth/tracers/api_test.go index 6dd94e487089..d394e4fbe35f 100644 --- a/eth/tracers/api_test.go +++ b/eth/tracers/api_test.go @@ -126,6 +126,10 @@ func (b *testBackend) RPCGasCap() uint64 { return 25000000 } +func (b *testBackend) RPCRpcReturnDataLimit() uint64 { + return 100000 +} + func (b *testBackend) ChainConfig() *params.ChainConfig { return b.chainConfig } diff --git a/go.mod b/go.mod index 36595ca3076d..f55b2f9aa751 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.3.0 github.com/BurntSushi/toml v1.1.0 github.com/JekaMas/go-grpc-net-conn v0.0.0-20220708155319-6aff21f2d13d + github.com/JekaMas/workerpool v1.1.5 github.com/VictoriaMetrics/fastcache v1.6.0 github.com/aws/aws-sdk-go-v2 v1.2.0 github.com/aws/aws-sdk-go-v2/config v1.1.1 @@ -84,6 +85,8 @@ require ( pgregory.net/rapid v0.4.8 ) +require github.com/gammazero/deque v0.2.1 // indirect + require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3 // indirect diff --git a/go.sum b/go.sum index 96fa9d3f040a..4b312ccfb181 100644 --- a/go.sum +++ b/go.sum @@ -31,6 +31,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/JekaMas/go-grpc-net-conn v0.0.0-20220708155319-6aff21f2d13d h1:RO27lgfZF8s9lZ3pWyzc0gCE0RZC+6/PXbRjAa0CNp8= github.com/JekaMas/go-grpc-net-conn v0.0.0-20220708155319-6aff21f2d13d/go.mod h1:romz7UPgSYhfJkKOalzEEyV6sWtt/eAEm0nX2aOrod0= +github.com/JekaMas/workerpool v1.1.5 h1:xmrx2Zyft95CEGiEqzDxiawptCIRZQ0zZDhTGDFOCaw= +github.com/JekaMas/workerpool v1.1.5/go.mod h1:IoDWPpwMcA27qbuugZKeBslDrgX09lVmksuh9sjzbhc= github.com/Masterminds/goutils v1.1.0 h1:zukEsf/1JZwCMgHiK3GZftabmxiCw4apj3a28RPBiVg= github.com/Masterminds/goutils v1.1.0/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU= github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= @@ -157,6 +159,8 @@ github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0= +github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff h1:tY80oXqGNY4FhTFhk+o9oFHGINQ/+vhlm8HFzi6znCI= github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff/go.mod h1:x7DCsMOv1taUwEWCzT4cmDeAkigA5/QCwUodaVOe8Ww= github.com/getkin/kin-openapi v0.53.0/go.mod h1:7Yn5whZr5kJi6t+kShccXS8ae1APpYTW6yheSwk8Yi4= diff --git a/internal/cli/debug_pprof.go b/internal/cli/debug_pprof.go index 01698719e57e..4cbe98940881 100644 --- a/internal/cli/debug_pprof.go +++ b/internal/cli/debug_pprof.go @@ -7,6 +7,7 @@ import ( "fmt" "strings" + "google.golang.org/grpc" empty "google.golang.org/protobuf/types/known/emptypb" "github.com/ethereum/go-ethereum/internal/cli/flagset" @@ -103,7 +104,7 @@ func (d *DebugPprofCommand) Run(args []string) int { req.Profile = profile } - stream, err := clt.DebugPprof(ctx, req) + stream, err := clt.DebugPprof(ctx, req, grpc.MaxCallRecvMsgSize(1024*1024*1024)) if err != nil { return err diff --git a/internal/cli/dumpconfig.go b/internal/cli/dumpconfig.go index a748af3357c8..787eab2d13ae 100644 --- a/internal/cli/dumpconfig.go +++ b/internal/cli/dumpconfig.go @@ -55,6 +55,8 @@ func (c *DumpconfigCommand) Run(args []string) int { userConfig.JsonRPC.HttpTimeout.ReadTimeoutRaw = userConfig.JsonRPC.HttpTimeout.ReadTimeout.String() userConfig.JsonRPC.HttpTimeout.WriteTimeoutRaw = userConfig.JsonRPC.HttpTimeout.WriteTimeout.String() userConfig.JsonRPC.HttpTimeout.IdleTimeoutRaw = userConfig.JsonRPC.HttpTimeout.IdleTimeout.String() + userConfig.JsonRPC.Http.ExecutionPoolRequestTimeoutRaw = userConfig.JsonRPC.Http.ExecutionPoolRequestTimeout.String() + userConfig.JsonRPC.Ws.ExecutionPoolRequestTimeoutRaw = userConfig.JsonRPC.Ws.ExecutionPoolRequestTimeout.String() userConfig.TxPool.RejournalRaw = userConfig.TxPool.Rejournal.String() userConfig.TxPool.LifeTimeRaw = userConfig.TxPool.LifeTime.String() userConfig.Sealer.GasPriceRaw = userConfig.Sealer.GasPrice.String() diff --git a/internal/cli/server/config.go b/internal/cli/server/config.go index 35d7e19359d8..ca7a235ace3a 100644 --- a/internal/cli/server/config.go +++ b/internal/cli/server/config.go @@ -60,6 +60,12 @@ type Config struct { // KeyStoreDir is the directory to store keystores KeyStoreDir string `hcl:"keystore,optional" toml:"keystore,optional"` + // Maximum number of messages in a batch (default=100, use 0 for no limits) + RPCBatchLimit uint64 `hcl:"rpc.batchlimit,optional" toml:"rpc.batchlimit,optional"` + + // Maximum size (in bytes) a result of an rpc request could have (default=100000, use 0 for no limits) + RPCReturnDataLimit uint64 `hcl:"rpc.returndatalimit,optional" toml:"rpc.returndatalimit,optional"` + // SyncMode selects the sync protocol SyncMode string `hcl:"syncmode,optional" toml:"syncmode,optional"` @@ -275,6 +281,13 @@ type APIConfig struct { // Origins is the list of endpoints to accept requests from (only consumed for websockets) Origins []string `hcl:"origins,optional" toml:"origins,optional"` + + // ExecutionPoolSize is max size of workers to be used for rpc execution + ExecutionPoolSize uint64 `hcl:"ep-size,optional" toml:"ep-size,optional"` + + // ExecutionPoolRequestTimeout is timeout used by execution pool for rpc execution + ExecutionPoolRequestTimeout time.Duration `hcl:"-,optional" toml:"-"` + ExecutionPoolRequestTimeoutRaw string `hcl:"ep-requesttimeout,optional" toml:"ep-requesttimeout,optional"` } // Used from rpc.HTTPTimeouts @@ -435,12 +448,14 @@ type DeveloperConfig struct { func DefaultConfig() *Config { return &Config{ - Chain: "mainnet", - Identity: Hostname(), - RequiredBlocks: map[string]string{}, - LogLevel: "INFO", - DataDir: DefaultDataDir(), - Ancient: "", + Chain: "mainnet", + Identity: Hostname(), + RequiredBlocks: map[string]string{}, + LogLevel: "INFO", + DataDir: DefaultDataDir(), + Ancient: "", + RPCBatchLimit: 100, + RPCReturnDataLimit: 100000, P2P: &P2PConfig{ MaxPeers: 50, MaxPendPeers: 50, @@ -499,21 +514,25 @@ func DefaultConfig() *Config { GasCap: ethconfig.Defaults.RPCGasCap, TxFeeCap: ethconfig.Defaults.RPCTxFeeCap, Http: &APIConfig{ - Enabled: false, - Port: 8545, - Prefix: "", - Host: "localhost", - API: []string{"eth", "net", "web3", "txpool", "bor"}, - Cors: []string{"localhost"}, - VHost: []string{"localhost"}, + Enabled: false, + Port: 8545, + Prefix: "", + Host: "localhost", + API: []string{"eth", "net", "web3", "txpool", "bor"}, + Cors: []string{"localhost"}, + VHost: []string{"localhost"}, + ExecutionPoolSize: 40, + ExecutionPoolRequestTimeout: 0, }, Ws: &APIConfig{ - Enabled: false, - Port: 8546, - Prefix: "", - Host: "localhost", - API: []string{"net", "web3"}, - Origins: []string{"localhost"}, + Enabled: false, + Port: 8546, + Prefix: "", + Host: "localhost", + API: []string{"net", "web3"}, + Origins: []string{"localhost"}, + ExecutionPoolSize: 40, + ExecutionPoolRequestTimeout: 0, }, Graphql: &APIConfig{ Enabled: false, @@ -620,6 +639,8 @@ func (c *Config) fillTimeDurations() error { {"jsonrpc.timeouts.read", &c.JsonRPC.HttpTimeout.ReadTimeout, &c.JsonRPC.HttpTimeout.ReadTimeoutRaw}, {"jsonrpc.timeouts.write", &c.JsonRPC.HttpTimeout.WriteTimeout, &c.JsonRPC.HttpTimeout.WriteTimeoutRaw}, {"jsonrpc.timeouts.idle", &c.JsonRPC.HttpTimeout.IdleTimeout, &c.JsonRPC.HttpTimeout.IdleTimeoutRaw}, + {"jsonrpc.ws.ep-requesttimeout", &c.JsonRPC.Ws.ExecutionPoolRequestTimeout, &c.JsonRPC.Ws.ExecutionPoolRequestTimeoutRaw}, + {"jsonrpc.http.ep-requesttimeout", &c.JsonRPC.Http.ExecutionPoolRequestTimeout, &c.JsonRPC.Http.ExecutionPoolRequestTimeoutRaw}, {"txpool.lifetime", &c.TxPool.LifeTime, &c.TxPool.LifeTimeRaw}, {"txpool.rejournal", &c.TxPool.Rejournal, &c.TxPool.RejournalRaw}, {"cache.rejournal", &c.Cache.Rejournal, &c.Cache.RejournalRaw}, @@ -883,6 +904,7 @@ func (c *Config) buildEth(stack *node.Node, accountManager *accounts.Manager) (* n.Preimages = c.Cache.Preimages n.TxLookupLimit = c.Cache.TxLookupLimit n.TrieTimeout = c.Cache.TrieTimeout + n.TriesInMemory = c.Cache.TriesInMemory } n.RPCGasCap = c.JsonRPC.GasCap @@ -936,6 +958,8 @@ func (c *Config) buildEth(stack *node.Node, accountManager *accounts.Manager) (* n.BorLogs = c.BorLogs n.DatabaseHandles = dbHandles + n.RPCReturnDataLimit = c.RPCReturnDataLimit + if c.Ancient != "" { n.DatabaseFreezer = c.Ancient } @@ -986,6 +1010,11 @@ func (c *Config) buildNode() (*node.Config, error) { WriteTimeout: c.JsonRPC.HttpTimeout.WriteTimeout, IdleTimeout: c.JsonRPC.HttpTimeout.IdleTimeout, }, + RPCBatchLimit: c.RPCBatchLimit, + WSJsonRPCExecutionPoolSize: c.JsonRPC.Ws.ExecutionPoolSize, + WSJsonRPCExecutionPoolRequestTimeout: c.JsonRPC.Ws.ExecutionPoolRequestTimeout, + HTTPJsonRPCExecutionPoolSize: c.JsonRPC.Http.ExecutionPoolSize, + HTTPJsonRPCExecutionPoolRequestTimeout: c.JsonRPC.Http.ExecutionPoolRequestTimeout, } // dev mode diff --git a/internal/cli/server/flags.go b/internal/cli/server/flags.go index 822bb81aefd4..abf5fa3465e9 100644 --- a/internal/cli/server/flags.go +++ b/internal/cli/server/flags.go @@ -46,6 +46,18 @@ func (c *Command) Flags() *flagset.Flagset { Usage: "Path of the directory where keystores are located", Value: &c.cliConfig.KeyStoreDir, }) + f.Uint64Flag(&flagset.Uint64Flag{ + Name: "rpc.batchlimit", + Usage: "Maximum number of messages in a batch (default=100, use 0 for no limits)", + Value: &c.cliConfig.RPCBatchLimit, + Default: c.cliConfig.RPCBatchLimit, + }) + f.Uint64Flag(&flagset.Uint64Flag{ + Name: "rpc.returndatalimit", + Usage: "Maximum size (in bytes) a result of an rpc request could have (default=100000, use 0 for no limits)", + Value: &c.cliConfig.RPCReturnDataLimit, + Default: c.cliConfig.RPCReturnDataLimit, + }) f.StringFlag(&flagset.StringFlag{ Name: "config", Usage: "File for the config file", @@ -432,6 +444,20 @@ func (c *Command) Flags() *flagset.Flagset { Default: c.cliConfig.JsonRPC.Http.API, Group: "JsonRPC", }) + f.Uint64Flag(&flagset.Uint64Flag{ + Name: "http.ep-size", + Usage: "Maximum size of workers to run in rpc execution pool for HTTP requests", + Value: &c.cliConfig.JsonRPC.Http.ExecutionPoolSize, + Default: c.cliConfig.JsonRPC.Http.ExecutionPoolSize, + Group: "JsonRPC", + }) + f.DurationFlag(&flagset.DurationFlag{ + Name: "http.ep-requesttimeout", + Usage: "Request Timeout for rpc execution pool for HTTP requests", + Value: &c.cliConfig.JsonRPC.Http.ExecutionPoolRequestTimeout, + Default: c.cliConfig.JsonRPC.Http.ExecutionPoolRequestTimeout, + Group: "JsonRPC", + }) // ws options f.BoolFlag(&flagset.BoolFlag{ @@ -469,6 +495,20 @@ func (c *Command) Flags() *flagset.Flagset { Default: c.cliConfig.JsonRPC.Ws.API, Group: "JsonRPC", }) + f.Uint64Flag(&flagset.Uint64Flag{ + Name: "ws.ep-size", + Usage: "Maximum size of workers to run in rpc execution pool for WS requests", + Value: &c.cliConfig.JsonRPC.Ws.ExecutionPoolSize, + Default: c.cliConfig.JsonRPC.Ws.ExecutionPoolSize, + Group: "JsonRPC", + }) + f.DurationFlag(&flagset.DurationFlag{ + Name: "ws.ep-requesttimeout", + Usage: "Request Timeout for rpc execution pool for WS requests", + Value: &c.cliConfig.JsonRPC.Ws.ExecutionPoolRequestTimeout, + Default: c.cliConfig.JsonRPC.Ws.ExecutionPoolRequestTimeout, + Group: "JsonRPC", + }) // graphql options f.BoolFlag(&flagset.BoolFlag{ diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 2fd148c7c666..49b161098734 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -627,6 +627,10 @@ func (s *PublicBlockChainAPI) GetTransactionReceiptsByBlock(ctx context.Context, return nil, err } + if block == nil { + return nil, errors.New("block not found") + } + receipts, err := s.b.GetReceipts(ctx, block.Hash()) if err != nil { return nil, err @@ -636,7 +640,7 @@ func (s *PublicBlockChainAPI) GetTransactionReceiptsByBlock(ctx context.Context, var txHash common.Hash - borReceipt := rawdb.ReadBorReceipt(s.b.ChainDb(), block.Hash(), block.NumberU64()) + borReceipt := rawdb.ReadBorReceipt(s.b.ChainDb(), block.Hash(), block.NumberU64(), s.b.ChainConfig()) if borReceipt != nil { receipts = append(receipts, borReceipt) txHash = types.GetDerivedBorTxHash(types.BorReceiptKey(block.Number().Uint64(), block.Hash())) @@ -1078,6 +1082,11 @@ func (s *PublicBlockChainAPI) Call(ctx context.Context, args TransactionArgs, bl if err != nil { return nil, err } + + if int(s.b.RPCRpcReturnDataLimit()) > 0 && len(result.ReturnData) > int(s.b.RPCRpcReturnDataLimit()) { + return nil, fmt.Errorf("call returned result of length %d exceeding limit %d", len(result.ReturnData), int(s.b.RPCRpcReturnDataLimit())) + } + // If the result contains a revert reason, try to unpack and return it. if len(result.Revert()) > 0 { return nil, newRevertError(result) @@ -1448,15 +1457,23 @@ func newRPCPendingTransaction(tx *types.Transaction, current *types.Header, conf func newRPCTransactionFromBlockIndex(b *types.Block, index uint64, config *params.ChainConfig, db ethdb.Database) *RPCTransaction { txs := b.Transactions() - borReceipt := rawdb.ReadBorReceipt(db, b.Hash(), b.NumberU64()) - if borReceipt != nil { - tx, _, _, _ := rawdb.ReadBorTransaction(db, borReceipt.TxHash) + if index >= uint64(len(txs)+1) { + return nil + } - if tx != nil { - txs = append(txs, tx) + // If the index out of the range of transactions defined in block body, it means that the transaction is a bor state sync transaction, and we need to fetch it from the database + if index == uint64(len(txs)) { + borReceipt := rawdb.ReadBorReceipt(db, b.Hash(), b.NumberU64(), config) + if borReceipt != nil { + tx, _, _, _ := rawdb.ReadBorTransaction(db, borReceipt.TxHash) + + if tx != nil { + txs = append(txs, tx) + } } } + // If the index is still out of the range after checking bor state sync transaction, it means that the transaction index is invalid if index >= uint64(len(txs)) { return nil } @@ -1597,7 +1614,7 @@ func (api *PublicTransactionPoolAPI) getAllBlockTransactions(ctx context.Context stateSyncPresent := false - borReceipt := rawdb.ReadBorReceipt(api.b.ChainDb(), block.Hash(), block.NumberU64()) + borReceipt := rawdb.ReadBorReceipt(api.b.ChainDb(), block.Hash(), block.NumberU64(), api.b.ChainConfig()) if borReceipt != nil { txHash := types.GetDerivedBorTxHash(types.BorReceiptKey(block.Number().Uint64(), block.Hash())) if txHash != (common.Hash{}) { @@ -1767,7 +1784,7 @@ func (s *PublicTransactionPoolAPI) GetTransactionReceipt(ctx context.Context, ha if borTx { // Fetch bor block receipt - receipt = rawdb.ReadBorReceipt(s.b.ChainDb(), blockHash, blockNumber) + receipt = rawdb.ReadBorReceipt(s.b.ChainDb(), blockHash, blockNumber, s.b.ChainConfig()) } else { receipts, err := s.b.GetReceipts(ctx, blockHash) if err != nil { diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 1287640b83b7..14ddbba70ed9 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -48,10 +48,11 @@ type Backend interface { ChainDb() ethdb.Database AccountManager() *accounts.Manager ExtRPCEnabled() bool - RPCGasCap() uint64 // global gas cap for eth_call over rpc: DoS protection - RPCEVMTimeout() time.Duration // global timeout for eth_call over rpc: DoS protection - RPCTxFeeCap() float64 // global tx fee cap for all transaction related APIs - UnprotectedAllowed() bool // allows only for EIP155 transactions. + RPCGasCap() uint64 // global gas cap for eth_call over rpc: DoS protection + RPCRpcReturnDataLimit() uint64 // Maximum size (in bytes) a result of an rpc request could have + RPCEVMTimeout() time.Duration // global timeout for eth_call over rpc: DoS protection + RPCTxFeeCap() float64 // global tx fee cap for all transaction related APIs + UnprotectedAllowed() bool // allows only for EIP155 transactions. // Blockchain API SetHead(number uint64) diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index dcdd5baf234b..c823f096d658 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -192,6 +192,34 @@ web3._extend({ name: 'stopWS', call: 'admin_stopWS' }), + new web3._extend.Method({ + name: 'getExecutionPoolSize', + call: 'admin_getExecutionPoolSize' + }), + new web3._extend.Method({ + name: 'getExecutionPoolRequestTimeout', + call: 'admin_getExecutionPoolRequestTimeout' + }), + // new web3._extend.Method({ + // name: 'setWSExecutionPoolRequestTimeout', + // call: 'admin_setWSExecutionPoolRequestTimeout', + // params: 1 + // }), + // new web3._extend.Method({ + // name: 'setHttpExecutionPoolRequestTimeout', + // call: 'admin_setHttpExecutionPoolRequestTimeout', + // params: 1 + // }), + new web3._extend.Method({ + name: 'setWSExecutionPoolSize', + call: 'admin_setWSExecutionPoolSize', + params: 1 + }), + new web3._extend.Method({ + name: 'setHttpExecutionPoolSize', + call: 'admin_setHttpExecutionPoolSize', + params: 1 + }), ], properties: [ new web3._extend.Property({ diff --git a/les/api_backend.go b/les/api_backend.go index c716a3967f79..786e77ed46c8 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -294,6 +294,10 @@ func (b *LesApiBackend) RPCGasCap() uint64 { return b.eth.config.RPCGasCap } +func (b *LesApiBackend) RPCRpcReturnDataLimit() uint64 { + return b.eth.config.RPCReturnDataLimit +} + func (b *LesApiBackend) RPCEVMTimeout() time.Duration { return b.eth.config.RPCEVMTimeout } diff --git a/miner/worker.go b/miner/worker.go index 797e7ea980b9..30809cd558db 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1314,9 +1314,9 @@ func (w *worker) commit(ctx context.Context, env *environment, interval func(), tracing.SetAttributes( span, - attribute.Int("number", int(block.Number().Uint64())), - attribute.String("hash", block.Hash().String()), - attribute.String("sealhash", w.engine.SealHash(block.Header()).String()), + attribute.Int("number", int(env.header.Number.Uint64())), + attribute.String("hash", env.header.Hash().String()), + attribute.String("sealhash", w.engine.SealHash(env.header).String()), attribute.Int("len of env.txs", len(env.txs)), attribute.Bool("error", err != nil), ) diff --git a/node/api.go b/node/api.go index 1b32399f635c..f8e7f944a6a0 100644 --- a/node/api.go +++ b/node/api.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" @@ -342,3 +343,91 @@ func (s *publicWeb3API) ClientVersion() string { func (s *publicWeb3API) Sha3(input hexutil.Bytes) hexutil.Bytes { return crypto.Keccak256(input) } + +type ExecutionPoolSize struct { + HttpLimit int + WSLimit int +} + +type ExecutionPoolRequestTimeout struct { + HttpLimit time.Duration + WSLimit time.Duration +} + +func (api *privateAdminAPI) GetExecutionPoolSize() *ExecutionPoolSize { + var httpLimit int + if api.node.http.host != "" { + httpLimit = api.node.http.httpHandler.Load().(*rpcHandler).server.GetExecutionPoolSize() + } + + var wsLimit int + if api.node.ws.host != "" { + wsLimit = api.node.ws.wsHandler.Load().(*rpcHandler).server.GetExecutionPoolSize() + } + + executionPoolSize := &ExecutionPoolSize{ + HttpLimit: httpLimit, + WSLimit: wsLimit, + } + + return executionPoolSize +} + +func (api *privateAdminAPI) GetExecutionPoolRequestTimeout() *ExecutionPoolRequestTimeout { + var httpLimit time.Duration + if api.node.http.host != "" { + httpLimit = api.node.http.httpHandler.Load().(*rpcHandler).server.GetExecutionPoolRequestTimeout() + } + + var wsLimit time.Duration + if api.node.ws.host != "" { + wsLimit = api.node.ws.wsHandler.Load().(*rpcHandler).server.GetExecutionPoolRequestTimeout() + } + + executionPoolRequestTimeout := &ExecutionPoolRequestTimeout{ + HttpLimit: httpLimit, + WSLimit: wsLimit, + } + + return executionPoolRequestTimeout +} + +// func (api *privateAdminAPI) SetWSExecutionPoolRequestTimeout(n int) *ExecutionPoolRequestTimeout { +// if api.node.ws.host != "" { +// api.node.ws.wsConfig.executionPoolRequestTimeout = time.Duration(n) * time.Millisecond +// api.node.ws.wsHandler.Load().(*rpcHandler).server.SetExecutionPoolRequestTimeout(time.Duration(n) * time.Millisecond) +// log.Warn("updating ws execution pool request timeout", "timeout", n) +// } + +// return api.GetExecutionPoolRequestTimeout() +// } + +// func (api *privateAdminAPI) SetHttpExecutionPoolRequestTimeout(n int) *ExecutionPoolRequestTimeout { +// if api.node.http.host != "" { +// api.node.http.httpConfig.executionPoolRequestTimeout = time.Duration(n) * time.Millisecond +// api.node.http.httpHandler.Load().(*rpcHandler).server.SetExecutionPoolRequestTimeout(time.Duration(n) * time.Millisecond) +// log.Warn("updating http execution pool request timeout", "timeout", n) +// } + +// return api.GetExecutionPoolRequestTimeout() +// } + +func (api *privateAdminAPI) SetWSExecutionPoolSize(n int) *ExecutionPoolSize { + if api.node.ws.host != "" { + api.node.ws.wsConfig.executionPoolSize = uint64(n) + api.node.ws.wsHandler.Load().(*rpcHandler).server.SetExecutionPoolSize(n) + log.Warn("updating ws execution pool size", "threads", n) + } + + return api.GetExecutionPoolSize() +} + +func (api *privateAdminAPI) SetHttpExecutionPoolSize(n int) *ExecutionPoolSize { + if api.node.http.host != "" { + api.node.http.httpConfig.executionPoolSize = uint64(n) + api.node.http.httpHandler.Load().(*rpcHandler).server.SetExecutionPoolSize(n) + log.Warn("updating http execution pool size", "threads", n) + } + + return api.GetExecutionPoolSize() +} diff --git a/node/config.go b/node/config.go index 853190c95f71..c8f40c10627a 100644 --- a/node/config.go +++ b/node/config.go @@ -25,6 +25,7 @@ import ( "runtime" "strings" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" @@ -204,6 +205,14 @@ type Config struct { // JWTSecret is the hex-encoded jwt secret. JWTSecret string `toml:",omitempty"` + + // Maximum number of messages in a batch + RPCBatchLimit uint64 `toml:",omitempty"` + // Configs for RPC execution pool + WSJsonRPCExecutionPoolSize uint64 `toml:",omitempty"` + WSJsonRPCExecutionPoolRequestTimeout time.Duration `toml:",omitempty"` + HTTPJsonRPCExecutionPoolSize uint64 `toml:",omitempty"` + HTTPJsonRPCExecutionPoolRequestTimeout time.Duration `toml:",omitempty"` } // IPCEndpoint resolves an IPC endpoint based on a configured value, taking into diff --git a/node/node.go b/node/node.go index e12bcf6675f8..5cf233d17afb 100644 --- a/node/node.go +++ b/node/node.go @@ -105,7 +105,7 @@ func New(conf *Config) (*Node, error) { node := &Node{ config: conf, - inprocHandler: rpc.NewServer(), + inprocHandler: rpc.NewServer(0, 0), eventmux: new(event.TypeMux), log: conf.Logger, stop: make(chan struct{}), @@ -113,6 +113,9 @@ func New(conf *Config) (*Node, error) { databases: make(map[*closeTrackingDB]struct{}), } + // set RPC batch limit + node.inprocHandler.SetRPCBatchLimit(conf.RPCBatchLimit) + // Register built-in APIs. node.rpcAPIs = append(node.rpcAPIs, node.apis()...) @@ -153,10 +156,10 @@ func New(conf *Config) (*Node, error) { } // Configure RPC servers. - node.http = newHTTPServer(node.log, conf.HTTPTimeouts) - node.httpAuth = newHTTPServer(node.log, conf.HTTPTimeouts) - node.ws = newHTTPServer(node.log, rpc.DefaultHTTPTimeouts) - node.wsAuth = newHTTPServer(node.log, rpc.DefaultHTTPTimeouts) + node.http = newHTTPServer(node.log, conf.HTTPTimeouts, conf.RPCBatchLimit) + node.httpAuth = newHTTPServer(node.log, conf.HTTPTimeouts, conf.RPCBatchLimit) + node.ws = newHTTPServer(node.log, rpc.DefaultHTTPTimeouts, conf.RPCBatchLimit) + node.wsAuth = newHTTPServer(node.log, rpc.DefaultHTTPTimeouts, conf.RPCBatchLimit) node.ipc = newIPCServer(node.log, conf.IPCEndpoint()) return node, nil @@ -402,10 +405,12 @@ func (n *Node) startRPC() error { return err } if err := server.enableRPC(apis, httpConfig{ - CorsAllowedOrigins: n.config.HTTPCors, - Vhosts: n.config.HTTPVirtualHosts, - Modules: n.config.HTTPModules, - prefix: n.config.HTTPPathPrefix, + CorsAllowedOrigins: n.config.HTTPCors, + Vhosts: n.config.HTTPVirtualHosts, + Modules: n.config.HTTPModules, + prefix: n.config.HTTPPathPrefix, + executionPoolSize: n.config.HTTPJsonRPCExecutionPoolSize, + executionPoolRequestTimeout: n.config.HTTPJsonRPCExecutionPoolRequestTimeout, }); err != nil { return err } @@ -419,9 +424,11 @@ func (n *Node) startRPC() error { return err } if err := server.enableWS(n.rpcAPIs, wsConfig{ - Modules: n.config.WSModules, - Origins: n.config.WSOrigins, - prefix: n.config.WSPathPrefix, + Modules: n.config.WSModules, + Origins: n.config.WSOrigins, + prefix: n.config.WSPathPrefix, + executionPoolSize: n.config.WSJsonRPCExecutionPoolSize, + executionPoolRequestTimeout: n.config.WSJsonRPCExecutionPoolRequestTimeout, }); err != nil { return err } diff --git a/node/rpcstack.go b/node/rpcstack.go index eabf1dcae73c..cba9a22f6f02 100644 --- a/node/rpcstack.go +++ b/node/rpcstack.go @@ -28,6 +28,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/rs/cors" @@ -42,6 +43,10 @@ type httpConfig struct { Vhosts []string prefix string // path prefix on which to mount http handler jwtSecret []byte // optional JWT secret + + // Execution pool config + executionPoolSize uint64 + executionPoolRequestTimeout time.Duration } // wsConfig is the JSON-RPC/Websocket configuration @@ -50,6 +55,10 @@ type wsConfig struct { Modules []string prefix string // path prefix on which to mount ws handler jwtSecret []byte // optional JWT secret + + // Execution pool config + executionPoolSize uint64 + executionPoolRequestTimeout time.Duration } type rpcHandler struct { @@ -81,10 +90,12 @@ type httpServer struct { port int handlerNames map[string]string + + RPCBatchLimit uint64 } -func newHTTPServer(log log.Logger, timeouts rpc.HTTPTimeouts) *httpServer { - h := &httpServer{log: log, timeouts: timeouts, handlerNames: make(map[string]string)} +func newHTTPServer(log log.Logger, timeouts rpc.HTTPTimeouts, rpcBatchLimit uint64) *httpServer { + h := &httpServer{log: log, timeouts: timeouts, handlerNames: make(map[string]string), RPCBatchLimit: rpcBatchLimit} h.httpHandler.Store((*rpcHandler)(nil)) h.wsHandler.Store((*rpcHandler)(nil)) @@ -282,7 +293,8 @@ func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig) error { } // Create RPC server and handler. - srv := rpc.NewServer() + srv := rpc.NewServer(config.executionPoolSize, config.executionPoolRequestTimeout) + srv.SetRPCBatchLimit(h.RPCBatchLimit) if err := RegisterApis(apis, config.Modules, srv, false); err != nil { return err } @@ -313,7 +325,8 @@ func (h *httpServer) enableWS(apis []rpc.API, config wsConfig) error { return fmt.Errorf("JSON-RPC over WebSocket is already enabled") } // Create RPC server and handler. - srv := rpc.NewServer() + srv := rpc.NewServer(config.executionPoolSize, config.executionPoolRequestTimeout) + srv.SetRPCBatchLimit(h.RPCBatchLimit) if err := RegisterApis(apis, config.Modules, srv, false); err != nil { return err } diff --git a/node/rpcstack_test.go b/node/rpcstack_test.go index 60fcab5a9001..49db8435aca0 100644 --- a/node/rpcstack_test.go +++ b/node/rpcstack_test.go @@ -234,7 +234,7 @@ func Test_checkPath(t *testing.T) { func createAndStartServer(t *testing.T, conf *httpConfig, ws bool, wsConf *wsConfig) *httpServer { t.Helper() - srv := newHTTPServer(testlog.Logger(t, log.LvlDebug), rpc.DefaultHTTPTimeouts) + srv := newHTTPServer(testlog.Logger(t, log.LvlDebug), rpc.DefaultHTTPTimeouts, 100) assert.NoError(t, srv.enableRPC(nil, *conf)) if ws { assert.NoError(t, srv.enableWS(nil, *wsConf)) diff --git a/packaging/templates/mainnet-v1/archive/config.toml b/packaging/templates/mainnet-v1/archive/config.toml index 9eaafd3bee6a..5491c784efd9 100644 --- a/packaging/templates/mainnet-v1/archive/config.toml +++ b/packaging/templates/mainnet-v1/archive/config.toml @@ -4,6 +4,8 @@ chain = "mainnet" datadir = "/var/lib/bor/data" # ancient = "" # keystore = "" +# "rpc.batchlimit" = 100 +# "rpc.returndatalimit" = 100000 syncmode = "full" gcmode = "archive" # snapshot = true @@ -65,6 +67,8 @@ gcmode = "archive" vhosts = ["*"] corsdomain = ["*"] # prefix = "" + # ep-size = 40 + # ep-requesttimeout = "0s" [jsonrpc.ws] enabled = true port = 8546 @@ -72,6 +76,8 @@ gcmode = "archive" # host = "localhost" # api = ["web3", "net"] origins = ["*"] + # ep-size = 40 + # ep-requesttimeout = "0s" # [jsonrpc.graphql] # enabled = false # port = 0 diff --git a/packaging/templates/mainnet-v1/sentry/sentry/bor/config.toml b/packaging/templates/mainnet-v1/sentry/sentry/bor/config.toml index 94dd6634f09a..90df84dc0721 100644 --- a/packaging/templates/mainnet-v1/sentry/sentry/bor/config.toml +++ b/packaging/templates/mainnet-v1/sentry/sentry/bor/config.toml @@ -4,6 +4,8 @@ chain = "mainnet" datadir = "/var/lib/bor/data" # ancient = "" # keystore = "" +# "rpc.batchlimit" = 100 +# "rpc.returndatalimit" = 100000 syncmode = "full" # gcmode = "full" # snapshot = true @@ -65,6 +67,8 @@ syncmode = "full" vhosts = ["*"] corsdomain = ["*"] # prefix = "" + # ep-size = 40 +# ep-requesttimeout = "0s" # [jsonrpc.ws] # enabled = false # port = 8546 @@ -72,6 +76,8 @@ syncmode = "full" # host = "localhost" # api = ["web3", "net"] # origins = ["*"] + # ep-size = 40 +# ep-requesttimeout = "0s" # [jsonrpc.graphql] # enabled = false # port = 0 diff --git a/packaging/templates/mainnet-v1/sentry/validator/bor/config.toml b/packaging/templates/mainnet-v1/sentry/validator/bor/config.toml index 9c55683c9694..9e2d80fd2a63 100644 --- a/packaging/templates/mainnet-v1/sentry/validator/bor/config.toml +++ b/packaging/templates/mainnet-v1/sentry/validator/bor/config.toml @@ -6,6 +6,8 @@ chain = "mainnet" datadir = "/var/lib/bor/data" # ancient = "" # keystore = "$BOR_DIR/keystore" +# "rpc.batchlimit" = 100 +# "rpc.returndatalimit" = 100000 syncmode = "full" # gcmode = "full" # snapshot = true @@ -67,6 +69,8 @@ syncmode = "full" vhosts = ["*"] corsdomain = ["*"] # prefix = "" + # ep-size = 40 +# ep-requesttimeout = "0s" # [jsonrpc.ws] # enabled = false # port = 8546 @@ -74,6 +78,8 @@ syncmode = "full" # host = "localhost" # api = ["web3", "net"] # origins = ["*"] + # ep-size = 40 +# ep-requesttimeout = "0s" # [jsonrpc.graphql] # enabled = false # port = 0 diff --git a/packaging/templates/mainnet-v1/without-sentry/bor/config.toml b/packaging/templates/mainnet-v1/without-sentry/bor/config.toml index 573f1f3be895..1e5fd6776245 100644 --- a/packaging/templates/mainnet-v1/without-sentry/bor/config.toml +++ b/packaging/templates/mainnet-v1/without-sentry/bor/config.toml @@ -6,6 +6,8 @@ chain = "mainnet" datadir = "/var/lib/bor/data" # ancient = "" # keystore = "$BOR_DIR/keystore" +# "rpc.batchlimit" = 100 +# "rpc.returndatalimit" = 100000 syncmode = "full" # gcmode = "full" # snapshot = true @@ -67,6 +69,8 @@ syncmode = "full" vhosts = ["*"] corsdomain = ["*"] # prefix = "" + # ep-size = 40 +# ep-requesttimeout = "0s" # [jsonrpc.ws] # enabled = false # port = 8546 @@ -74,6 +78,8 @@ syncmode = "full" # host = "localhost" # api = ["web3", "net"] # origins = ["*"] + # ep-size = 40 +# ep-requesttimeout = "0s" # [jsonrpc.graphql] # enabled = false # port = 0 diff --git a/packaging/templates/package_scripts/control b/packaging/templates/package_scripts/control index cb62165a5e96..c036378b0e6e 100644 --- a/packaging/templates/package_scripts/control +++ b/packaging/templates/package_scripts/control @@ -1,5 +1,5 @@ Source: bor -Version: 0.3.3 +Version: 0.3.4-stable Section: develop Priority: standard Maintainer: Polygon diff --git a/packaging/templates/package_scripts/control.arm64 b/packaging/templates/package_scripts/control.arm64 index 56276cb43ac9..3f45d5564f43 100644 --- a/packaging/templates/package_scripts/control.arm64 +++ b/packaging/templates/package_scripts/control.arm64 @@ -1,5 +1,5 @@ Source: bor -Version: 0.3.3 +Version: 0.3.4-stable Section: develop Priority: standard Maintainer: Polygon diff --git a/packaging/templates/package_scripts/control.profile.amd64 b/packaging/templates/package_scripts/control.profile.amd64 index 4ddd8424ff48..e3261e3f57de 100644 --- a/packaging/templates/package_scripts/control.profile.amd64 +++ b/packaging/templates/package_scripts/control.profile.amd64 @@ -1,5 +1,5 @@ Source: bor-profile -Version: 0.3.3 +Version: 0.3.4-stable Section: develop Priority: standard Maintainer: Polygon diff --git a/packaging/templates/package_scripts/control.profile.arm64 b/packaging/templates/package_scripts/control.profile.arm64 index 9f9301c9258d..93c22067702d 100644 --- a/packaging/templates/package_scripts/control.profile.arm64 +++ b/packaging/templates/package_scripts/control.profile.arm64 @@ -1,5 +1,5 @@ Source: bor-profile -Version: 0.3.3 +Version: 0.3.4-stable Section: develop Priority: standard Maintainer: Polygon diff --git a/packaging/templates/package_scripts/control.validator b/packaging/templates/package_scripts/control.validator index d43250c89131..3c5633d11f4e 100644 --- a/packaging/templates/package_scripts/control.validator +++ b/packaging/templates/package_scripts/control.validator @@ -1,5 +1,5 @@ Source: bor-profile -Version: 0.3.3 +Version: 0.3.4-stable Section: develop Priority: standard Maintainer: Polygon diff --git a/packaging/templates/package_scripts/control.validator.arm64 b/packaging/templates/package_scripts/control.validator.arm64 index 5a50f8cb39c7..e475d91bd35a 100644 --- a/packaging/templates/package_scripts/control.validator.arm64 +++ b/packaging/templates/package_scripts/control.validator.arm64 @@ -1,5 +1,5 @@ Source: bor-profile -Version: 0.3.3 +Version: 0.3.4-stable Section: develop Priority: standard Maintainer: Polygon diff --git a/packaging/templates/testnet-v4/archive/config.toml b/packaging/templates/testnet-v4/archive/config.toml index 1762fdf117b7..fb9ffd0a176e 100644 --- a/packaging/templates/testnet-v4/archive/config.toml +++ b/packaging/templates/testnet-v4/archive/config.toml @@ -4,6 +4,8 @@ chain = "mumbai" datadir = "/var/lib/bor/data" # ancient = "" # keystore = "" +# "rpc.batchlimit" = 100 +# "rpc.returndatalimit" = 100000 syncmode = "full" gcmode = "archive" # snapshot = true @@ -65,6 +67,8 @@ gcmode = "archive" vhosts = ["*"] corsdomain = ["*"] # prefix = "" + # ep-size = 40 + # ep-requesttimeout = "0s" [jsonrpc.ws] enabled = true port = 8546 @@ -72,6 +76,8 @@ gcmode = "archive" # host = "localhost" # api = ["web3", "net"] origins = ["*"] + # ep-size = 40 + # ep-requesttimeout = "0s" # [jsonrpc.graphql] # enabled = false # port = 0 diff --git a/packaging/templates/testnet-v4/sentry/sentry/bor/config.toml b/packaging/templates/testnet-v4/sentry/sentry/bor/config.toml index ae191cec2c7d..9884c0ecccae 100644 --- a/packaging/templates/testnet-v4/sentry/sentry/bor/config.toml +++ b/packaging/templates/testnet-v4/sentry/sentry/bor/config.toml @@ -4,6 +4,8 @@ chain = "mumbai" datadir = "/var/lib/bor/data" # ancient = "" # keystore = "" +# "rpc.batchlimit" = 100 +# "rpc.returndatalimit" = 100000 syncmode = "full" # gcmode = "full" # snapshot = true @@ -65,6 +67,8 @@ syncmode = "full" vhosts = ["*"] corsdomain = ["*"] # prefix = "" + # ep-size = 40 + # ep-requesttimeout = "0s" # [jsonrpc.ws] # enabled = false # port = 8546 @@ -72,6 +76,8 @@ syncmode = "full" # host = "localhost" # api = ["web3", "net"] # origins = ["*"] + # ep-size = 40 + # ep-requesttimeout = "0s" # [jsonrpc.graphql] # enabled = false # port = 0 diff --git a/packaging/templates/testnet-v4/sentry/validator/bor/config.toml b/packaging/templates/testnet-v4/sentry/validator/bor/config.toml index b441cc137d29..49c47fedd4f6 100644 --- a/packaging/templates/testnet-v4/sentry/validator/bor/config.toml +++ b/packaging/templates/testnet-v4/sentry/validator/bor/config.toml @@ -6,6 +6,8 @@ chain = "mumbai" datadir = "/var/lib/bor/data" # ancient = "" # keystore = "$BOR_DIR/keystore" +# "rpc.batchlimit" = 100 +# "rpc.returndatalimit" = 100000 syncmode = "full" # gcmode = "full" # snapshot = true @@ -67,6 +69,8 @@ syncmode = "full" vhosts = ["*"] corsdomain = ["*"] # prefix = "" + # ep-size = 40 +# ep-requesttimeout = "0s" # [jsonrpc.ws] # enabled = false # port = 8546 @@ -74,6 +78,8 @@ syncmode = "full" # host = "localhost" # api = ["web3", "net"] # origins = ["*"] + # ep-size = 40 +# ep-requesttimeout = "0s" # [jsonrpc.graphql] # enabled = false # port = 0 diff --git a/packaging/templates/testnet-v4/without-sentry/bor/config.toml b/packaging/templates/testnet-v4/without-sentry/bor/config.toml index 05a254e18423..2fb83a6ae2e2 100644 --- a/packaging/templates/testnet-v4/without-sentry/bor/config.toml +++ b/packaging/templates/testnet-v4/without-sentry/bor/config.toml @@ -6,6 +6,8 @@ chain = "mumbai" datadir = "/var/lib/bor/data" # ancient = "" # keystore = "$BOR_DIR/keystore" +# "rpc.batchlimit" = 100 +# "rpc.returndatalimit" = 100000 syncmode = "full" # gcmode = "full" # snapshot = true @@ -67,6 +69,8 @@ syncmode = "full" vhosts = ["*"] corsdomain = ["*"] # prefix = "" + # ep-size = 40 + # ep-requesttimeout = "0s" # [jsonrpc.ws] # enabled = false # port = 8546 @@ -74,6 +78,8 @@ syncmode = "full" # host = "localhost" # api = ["web3", "net"] # origins = ["*"] + # ep-size = 40 + # ep-requesttimeout = "0s" # [jsonrpc.graphql] # enabled = false # port = 0 diff --git a/params/config.go b/params/config.go index 94729224bb49..9833c9eac5ca 100644 --- a/params/config.go +++ b/params/config.go @@ -617,6 +617,10 @@ func (c *BorConfig) IsDelhi(number *big.Int) bool { return isForked(c.DelhiBlock, number) } +func (c *BorConfig) IsSprintStart(number uint64) bool { + return number%c.CalculateSprint(number) == 0 +} + func (c *BorConfig) calculateBorConfigHelper(field map[string]uint64, number uint64) uint64 { keys := make([]string, 0, len(field)) for k := range field { diff --git a/params/protocol_params.go b/params/protocol_params.go index d468af5d3c96..103266caff08 100644 --- a/params/protocol_params.go +++ b/params/protocol_params.go @@ -125,7 +125,8 @@ const ( ElasticityMultiplier = 2 // Bounds the maximum gas limit an EIP-1559 block may have. InitialBaseFee = 1000000000 // Initial base fee for EIP-1559 blocks. - MaxCodeSize = 24576 // Maximum bytecode to permit for a contract + MaxCodeSize = 24576 // Maximum bytecode to permit for a contract + MaxInitCodeSize = 2 * MaxCodeSize // Maximum initcode to permit in a creation transaction and create instructions // Precompiled contract gas prices diff --git a/params/version.go b/params/version.go index 199e49095fda..37b67a87e958 100644 --- a/params/version.go +++ b/params/version.go @@ -23,7 +23,7 @@ import ( const ( VersionMajor = 0 // Major version component of the current release VersionMinor = 3 // Minor version component of the current release - VersionPatch = 3 // Patch version component of the current release + VersionPatch = 4 // Patch version component of the current release VersionMeta = "stable" // Version metadata to append to the version string ) diff --git a/rpc/client.go b/rpc/client.go index d3ce0297754c..fc286fe8dc74 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -112,7 +112,7 @@ func (c *Client) newClientConn(conn ServerCodec) *clientConn { ctx := context.Background() ctx = context.WithValue(ctx, clientContextKey{}, c) ctx = context.WithValue(ctx, peerInfoContextKey{}, conn.peerInfo()) - handler := newHandler(ctx, conn, c.idgen, c.services) + handler := newHandler(ctx, conn, c.idgen, c.services, NewExecutionPool(100, 0)) return &clientConn{conn, handler} } diff --git a/rpc/client_test.go b/rpc/client_test.go index fa6010bb199c..1bebd2767774 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -33,12 +33,14 @@ import ( "time" "github.com/davecgh/go-spew/spew" + "github.com/ethereum/go-ethereum/log" ) func TestClientRequest(t *testing.T) { server := newTestServer() defer server.Stop() + client := DialInProc(server) defer client.Close() @@ -46,6 +48,7 @@ func TestClientRequest(t *testing.T) { if err := client.Call(&resp, "test_echo", "hello", 10, &echoArgs{"world"}); err != nil { t.Fatal(err) } + if !reflect.DeepEqual(resp, echoResult{"hello", 10, &echoArgs{"world"}}) { t.Errorf("incorrect result %#v", resp) } @@ -407,7 +410,7 @@ func TestClientSubscriptionUnsubscribeServer(t *testing.T) { t.Parallel() // Create the server. - srv := NewServer() + srv := NewServer(0, 0) srv.RegisterName("nftest", new(notificationTestService)) p1, p2 := net.Pipe() recorder := &unsubscribeRecorder{ServerCodec: NewCodec(p1)} @@ -443,7 +446,7 @@ func TestClientSubscriptionChannelClose(t *testing.T) { t.Parallel() var ( - srv = NewServer() + srv = NewServer(0, 0) httpsrv = httptest.NewServer(srv.WebsocketHandler(nil)) wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") ) diff --git a/rpc/endpoints.go b/rpc/endpoints.go index d78ebe2858bc..2a539d4fc51f 100644 --- a/rpc/endpoints.go +++ b/rpc/endpoints.go @@ -27,7 +27,7 @@ import ( func StartIPCEndpoint(ipcEndpoint string, apis []API) (net.Listener, *Server, error) { // Register all the APIs exposed by the services. var ( - handler = NewServer() + handler = NewServer(0, 0) regMap = make(map[string]struct{}) registered []string ) diff --git a/rpc/execution_pool.go b/rpc/execution_pool.go new file mode 100644 index 000000000000..d0f5ab5daa59 --- /dev/null +++ b/rpc/execution_pool.go @@ -0,0 +1,99 @@ +package rpc + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/JekaMas/workerpool" +) + +type SafePool struct { + executionPool *atomic.Pointer[workerpool.WorkerPool] + + sync.RWMutex + + timeout time.Duration + size int + + // Skip sending task to execution pool + fastPath bool +} + +func NewExecutionPool(initialSize int, timeout time.Duration) *SafePool { + sp := &SafePool{ + size: initialSize, + timeout: timeout, + } + + if initialSize == 0 { + sp.fastPath = true + + return sp + } + + var ptr atomic.Pointer[workerpool.WorkerPool] + + p := workerpool.New(initialSize) + ptr.Store(p) + sp.executionPool = &ptr + + return sp +} + +func (s *SafePool) Submit(ctx context.Context, fn func() error) (<-chan error, bool) { + if s.fastPath { + go func() { + _ = fn() + }() + + return nil, true + } + + if s.executionPool == nil { + return nil, false + } + + pool := s.executionPool.Load() + if pool == nil { + return nil, false + } + + return pool.Submit(ctx, fn, s.Timeout()), true +} + +func (s *SafePool) ChangeSize(n int) { + oldPool := s.executionPool.Swap(workerpool.New(n)) + + if oldPool != nil { + go func() { + oldPool.StopWait() + }() + } + + s.Lock() + s.size = n + s.Unlock() +} + +func (s *SafePool) ChangeTimeout(n time.Duration) { + s.Lock() + defer s.Unlock() + + s.timeout = n +} + +func (s *SafePool) Timeout() time.Duration { + s.RLock() + defer s.RUnlock() + + return s.timeout +} + +func (s *SafePool) Size() int { + s.RLock() + defer s.RUnlock() + + return s.size +} diff --git a/rpc/handler.go b/rpc/handler.go index 488a29300a10..f1fb555c0097 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -34,21 +34,20 @@ import ( // // The entry points for incoming messages are: // -// h.handleMsg(message) -// h.handleBatch(message) +// h.handleMsg(message) +// h.handleBatch(message) // // Outgoing calls use the requestOp struct. Register the request before sending it // on the connection: // -// op := &requestOp{ids: ...} -// h.addRequestOp(op) +// op := &requestOp{ids: ...} +// h.addRequestOp(op) // // Now send the request, then wait for the reply to be delivered through handleMsg: // -// if err := op.wait(...); err != nil { -// h.removeRequestOp(op) // timeout, etc. -// } -// +// if err := op.wait(...); err != nil { +// h.removeRequestOp(op) // timeout, etc. +// } type handler struct { reg *serviceRegistry unsubscribeCb *callback @@ -64,6 +63,8 @@ type handler struct { subLock sync.Mutex serverSubs map[ID]*Subscription + + executionPool *SafePool } type callProc struct { @@ -71,7 +72,7 @@ type callProc struct { notifiers []*Notifier } -func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler { +func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry, pool *SafePool) *handler { rootCtx, cancelRoot := context.WithCancel(connCtx) h := &handler{ reg: reg, @@ -84,11 +85,13 @@ func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg * allowSubscribe: true, serverSubs: make(map[ID]*Subscription), log: log.Root(), + executionPool: pool, } if conn.remoteAddr() != "" { h.log = h.log.New("conn", conn.remoteAddr()) } h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe)) + return h } @@ -219,12 +222,16 @@ func (h *handler) cancelServerSubscriptions(err error) { // startCallProc runs fn in a new goroutine and starts tracking it in the h.calls wait group. func (h *handler) startCallProc(fn func(*callProc)) { h.callWG.Add(1) - go func() { - ctx, cancel := context.WithCancel(h.rootCtx) + + ctx, cancel := context.WithCancel(h.rootCtx) + + h.executionPool.Submit(context.Background(), func() error { defer h.callWG.Done() defer cancel() fn(&callProc{ctx: ctx}) - }() + + return nil + }) } // handleImmediate executes non-call messages. It returns false if the message is a @@ -261,6 +268,7 @@ func (h *handler) handleSubscriptionResult(msg *jsonrpcMessage) { // handleResponse processes method call responses. func (h *handler) handleResponse(msg *jsonrpcMessage) { + op := h.respWait[string(msg.ID)] if op == nil { h.log.Debug("Unsolicited RPC response", "reqid", idForLog{msg.ID}) @@ -281,7 +289,11 @@ func (h *handler) handleResponse(msg *jsonrpcMessage) { return } if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil { - go op.sub.run() + h.executionPool.Submit(context.Background(), func() error { + op.sub.run() + return nil + }) + h.clientSubs[op.sub.subid] = op.sub } } diff --git a/rpc/http_test.go b/rpc/http_test.go index c84d7705f205..9737e64e918c 100644 --- a/rpc/http_test.go +++ b/rpc/http_test.go @@ -103,7 +103,7 @@ func TestHTTPResponseWithEmptyGet(t *testing.T) { func TestHTTPRespBodyUnlimited(t *testing.T) { const respLength = maxRequestContentLength * 3 - s := NewServer() + s := NewServer(0, 0) defer s.Stop() s.RegisterName("test", largeRespService{respLength}) ts := httptest.NewServer(s) diff --git a/rpc/inproc.go b/rpc/inproc.go index fbe9a40ceca9..29af5507b9be 100644 --- a/rpc/inproc.go +++ b/rpc/inproc.go @@ -26,7 +26,13 @@ func DialInProc(handler *Server) *Client { initctx := context.Background() c, _ := newClient(initctx, func(context.Context) (ServerCodec, error) { p1, p2 := net.Pipe() - go handler.ServeCodec(NewCodec(p1), 0) + + //nolint:contextcheck + handler.executionPool.Submit(initctx, func() error { + handler.ServeCodec(NewCodec(p1), 0) + return nil + }) + return NewCodec(p2), nil }) return c diff --git a/rpc/ipc.go b/rpc/ipc.go index 07a211c6277c..76fbd13f92fb 100644 --- a/rpc/ipc.go +++ b/rpc/ipc.go @@ -35,7 +35,11 @@ func (s *Server) ServeListener(l net.Listener) error { return err } log.Trace("Accepted RPC connection", "conn", conn.RemoteAddr()) - go s.ServeCodec(NewCodec(conn), 0) + + s.executionPool.Submit(context.Background(), func() error { + s.ServeCodec(NewCodec(conn), 0) + return nil + }) } } diff --git a/rpc/server.go b/rpc/server.go index babc5688e264..04ee2dc87b71 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -18,10 +18,13 @@ package rpc import ( "context" + "fmt" "io" "sync/atomic" + "time" mapset "github.com/deckarep/golang-set" + "github.com/ethereum/go-ethereum/log" ) @@ -47,11 +50,20 @@ type Server struct { idgen func() ID run int32 codecs mapset.Set + + BatchLimit uint64 + executionPool *SafePool } // NewServer creates a new server instance with no registered handlers. -func NewServer() *Server { - server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1} +func NewServer(executionPoolSize uint64, executionPoolRequesttimeout time.Duration) *Server { + server := &Server{ + idgen: randomIDGenerator(), + codecs: mapset.NewSet(), + run: 1, + executionPool: NewExecutionPool(int(executionPoolSize), executionPoolRequesttimeout), + } + // Register the default service providing meta information about the RPC service such // as the services and methods it offers. rpcService := &RPCService{server} @@ -59,6 +71,26 @@ func NewServer() *Server { return server } +func (s *Server) SetRPCBatchLimit(batchLimit uint64) { + s.BatchLimit = batchLimit +} + +func (s *Server) SetExecutionPoolSize(n int) { + s.executionPool.ChangeSize(n) +} + +func (s *Server) SetExecutionPoolRequestTimeout(n time.Duration) { + s.executionPool.ChangeTimeout(n) +} + +func (s *Server) GetExecutionPoolRequestTimeout() time.Duration { + return s.executionPool.Timeout() +} + +func (s *Server) GetExecutionPoolSize() int { + return s.executionPool.Size() +} + // RegisterName creates a service for the given receiver type under the given name. When no // methods on the given receiver match the criteria to be either a RPC method or a // subscription an error is returned. Otherwise a new service is created and added to the @@ -98,20 +130,34 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { return } - h := newHandler(ctx, codec, s.idgen, &s.services) + h := newHandler(ctx, codec, s.idgen, &s.services, s.executionPool) + h.allowSubscribe = false defer h.close(io.EOF, nil) reqs, batch, err := codec.readBatch() if err != nil { if err != io.EOF { - codec.writeJSON(ctx, errorMessage(&invalidMessageError{"parse error"})) + if err1 := codec.writeJSON(ctx, err); err1 != nil { + log.Warn("WARNING - error in reading batch", "err", err1) + return + } } return } + if batch { - h.handleBatch(reqs) + if s.BatchLimit > 0 && len(reqs) > int(s.BatchLimit) { + if err1 := codec.writeJSON(ctx, errorMessage(fmt.Errorf("batch limit %d exceeded: %d requests given", s.BatchLimit, len(reqs)))); err1 != nil { + log.Warn("WARNING - requests given exceeds the batch limit", "err", err1) + log.Debug("batch limit %d exceeded: %d requests given", s.BatchLimit, len(reqs)) + } + } else { + //nolint:contextcheck + h.handleBatch(reqs) + } } else { + //nolint:contextcheck h.handleMsg(reqs[0]) } } diff --git a/rpc/server_test.go b/rpc/server_test.go index e67893710dc2..166956681bdc 100644 --- a/rpc/server_test.go +++ b/rpc/server_test.go @@ -29,7 +29,7 @@ import ( ) func TestServerRegisterName(t *testing.T) { - server := NewServer() + server := NewServer(0, 0) service := new(testService) if err := server.RegisterName("test", service); err != nil { diff --git a/rpc/subscription_test.go b/rpc/subscription_test.go index 54a053dba805..cfca1b24b92e 100644 --- a/rpc/subscription_test.go +++ b/rpc/subscription_test.go @@ -53,7 +53,7 @@ func TestSubscriptions(t *testing.T) { subCount = len(namespaces) notificationCount = 3 - server = NewServer() + server = NewServer(0, 0) clientConn, serverConn = net.Pipe() out = json.NewEncoder(clientConn) in = json.NewDecoder(clientConn) diff --git a/rpc/testservice_test.go b/rpc/testservice_test.go index 253e26328900..228582177904 100644 --- a/rpc/testservice_test.go +++ b/rpc/testservice_test.go @@ -26,7 +26,7 @@ import ( ) func newTestServer() *Server { - server := NewServer() + server := NewServer(0, 0) server.idgen = sequentialIDGenerator() if err := server.RegisterName("test", new(testService)); err != nil { panic(err) diff --git a/rpc/websocket_test.go b/rpc/websocket_test.go index f74b7fd08bb4..b805ed20234f 100644 --- a/rpc/websocket_test.go +++ b/rpc/websocket_test.go @@ -203,7 +203,7 @@ func TestClientWebsocketPing(t *testing.T) { // This checks that the websocket transport can deal with large messages. func TestClientWebsocketLargeMessage(t *testing.T) { var ( - srv = NewServer() + srv = NewServer(0, 0) httpsrv = httptest.NewServer(srv.WebsocketHandler(nil)) wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") ) diff --git a/tests/bor/bor_test.go b/tests/bor/bor_test.go index d059956e6a4d..2dc20a915ed7 100644 --- a/tests/bor/bor_test.go +++ b/tests/bor/bor_test.go @@ -7,6 +7,7 @@ import ( "context" "crypto/ecdsa" "encoding/hex" + "fmt" "io" "math/big" "os" @@ -458,9 +459,21 @@ func TestFetchStateSyncEvents(t *testing.T) { _bor.SetHeimdallClient(h) block = buildNextBlock(t, _bor, chain, block, nil, init.genesis.Config.Bor, nil, res.Result.ValidatorSet.Validators) + + // Validate the state sync transactions set by consensus + validateStateSyncEvents(t, eventRecords, chain.GetStateSync()) + insertNewBlock(t, chain, block) } +func validateStateSyncEvents(t *testing.T, expected []*clerk.EventRecordWithTime, got []*types.StateSyncData) { + require.Equal(t, len(expected), len(got), "number of state sync events should be equal") + + for i := 0; i < len(expected); i++ { + require.Equal(t, expected[i].ID, got[i].ID, fmt.Sprintf("state sync ids should be equal - index: %d, expected: %d, got: %d", i, expected[i].ID, got[i].ID)) + } +} + func TestFetchStateSyncEvents_2(t *testing.T) { init := buildEthereumInstance(t, rawdb.NewMemoryDatabase()) chain := init.ethereum.BlockChain() diff --git a/tests/bor/helper.go b/tests/bor/helper.go index 64d5c299ac6e..e28076a3b15f 100644 --- a/tests/bor/helper.go +++ b/tests/bor/helper.go @@ -360,7 +360,7 @@ func generateFakeStateSyncEvents(sample *clerk.EventRecordWithTime, count int) [ *events[0] = event for i := 1; i < count; i++ { - event.ID = uint64(i) + event.ID = uint64(i + 1) event.Time = event.Time.Add(1 * time.Second) events[i] = &clerk.EventRecordWithTime{} *events[i] = event