Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make logs subscription channel size configurable #9810

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
rootCmd.PersistentFlags().IntVar(&cfg.MaxGetProofRewindBlockCount, utils.RpcMaxGetProofRewindBlockCount.Name, utils.RpcMaxGetProofRewindBlockCount.Value, utils.RpcMaxGetProofRewindBlockCount.Usage)
rootCmd.PersistentFlags().Uint64Var(&cfg.OtsMaxPageSize, utils.OtsSearchMaxCapFlag.Name, utils.OtsSearchMaxCapFlag.Value, utils.OtsSearchMaxCapFlag.Usage)
rootCmd.PersistentFlags().DurationVar(&cfg.RPCSlowLogThreshold, utils.RPCSlowFlag.Name, utils.RPCSlowFlag.Value, utils.RPCSlowFlag.Usage)
rootCmd.PersistentFlags().IntVar(&cfg.WebsocketSubscribeLogsChannelSize, utils.WSSubscribeLogsChannelSize.Name, utils.WSSubscribeLogsChannelSize.Value, utils.WSSubscribeLogsChannelSize.Usage)

if err := rootCmd.MarkPersistentFlagFilename("rpc.accessList", "json"); err != nil {
panic(err)
Expand Down
31 changes: 16 additions & 15 deletions cmd/rpcdaemon/cli/httpcfg/http_cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,22 @@ type HttpCfg struct {
AuthRpcPort int
PrivateApiAddr string

API []string
Gascap uint64
MaxTraces uint64
WebsocketPort int
WebsocketEnabled bool
WebsocketCompression bool
RpcAllowListFilePath string
RpcBatchConcurrency uint
RpcStreamingDisable bool
DBReadConcurrency int
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
TxPoolApiAddr string
StateCache kvcache.CoherentConfig
Snap ethconfig.BlocksFreezing
Sync ethconfig.Sync
API []string
Gascap uint64
MaxTraces uint64
WebsocketPort int
WebsocketEnabled bool
WebsocketCompression bool
WebsocketSubscribeLogsChannelSize int
RpcAllowListFilePath string
RpcBatchConcurrency uint
RpcStreamingDisable bool
DBReadConcurrency int
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
TxPoolApiAddr string
StateCache kvcache.CoherentConfig
Snap ethconfig.BlocksFreezing
Sync ethconfig.Sync

// GRPC server
GRPCServerEnabled bool
Expand Down
5 changes: 5 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,11 @@ var (
Usage: "HTTP path prefix on which JSON-RPC is served. Use '/' to serve on all paths.",
Value: "",
}
WSSubscribeLogsChannelSize = cli.IntFlag{
Name: "ws.api.subscribelogs.channelsize",
Usage: "Size of the channel used for websocket logs subscriptions",
Value: 8192,
}
ExecFlag = cli.StringFlag{
Name: "exec",
Usage: "Execute JavaScript statement",
Expand Down
33 changes: 17 additions & 16 deletions turbo/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,22 +476,23 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config, logger log.Logg
WriteTimeout: ctx.Duration(AuthRpcWriteTimeoutFlag.Name),
IdleTimeout: ctx.Duration(HTTPIdleTimeoutFlag.Name),
},
EvmCallTimeout: ctx.Duration(EvmCallTimeoutFlag.Name),
OverlayGetLogsTimeout: ctx.Duration(OverlayGetLogsFlag.Name),
OverlayReplayBlockTimeout: ctx.Duration(OverlayReplayBlockFlag.Name),
WebsocketPort: ctx.Int(utils.WSPortFlag.Name),
WebsocketEnabled: ctx.IsSet(utils.WSEnabledFlag.Name),
RpcBatchConcurrency: ctx.Uint(utils.RpcBatchConcurrencyFlag.Name),
RpcStreamingDisable: ctx.Bool(utils.RpcStreamingDisableFlag.Name),
DBReadConcurrency: ctx.Int(utils.DBReadConcurrencyFlag.Name),
RpcAllowListFilePath: ctx.String(utils.RpcAccessListFlag.Name),
Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name),
MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name),
TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name),
BatchLimit: ctx.Int(utils.RpcBatchLimit.Name),
ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name),
AllowUnprotectedTxs: ctx.Bool(utils.AllowUnprotectedTxs.Name),
MaxGetProofRewindBlockCount: ctx.Int(utils.RpcMaxGetProofRewindBlockCount.Name),
EvmCallTimeout: ctx.Duration(EvmCallTimeoutFlag.Name),
OverlayGetLogsTimeout: ctx.Duration(OverlayGetLogsFlag.Name),
OverlayReplayBlockTimeout: ctx.Duration(OverlayReplayBlockFlag.Name),
WebsocketPort: ctx.Int(utils.WSPortFlag.Name),
WebsocketEnabled: ctx.IsSet(utils.WSEnabledFlag.Name),
WebsocketSubscribeLogsChannelSize: ctx.Int(utils.WSSubscribeLogsChannelSize.Name),
RpcBatchConcurrency: ctx.Uint(utils.RpcBatchConcurrencyFlag.Name),
RpcStreamingDisable: ctx.Bool(utils.RpcStreamingDisableFlag.Name),
DBReadConcurrency: ctx.Int(utils.DBReadConcurrencyFlag.Name),
RpcAllowListFilePath: ctx.String(utils.RpcAccessListFlag.Name),
Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name),
MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name),
TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name),
BatchLimit: ctx.Int(utils.RpcBatchLimit.Name),
ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name),
AllowUnprotectedTxs: ctx.Bool(utils.AllowUnprotectedTxs.Name),
MaxGetProofRewindBlockCount: ctx.Int(utils.RpcMaxGetProofRewindBlockCount.Name),

OtsMaxPageSize: ctx.Uint64(utils.OtsSearchMaxCapFlag.Name),

Expand Down
2 changes: 1 addition & 1 deletion turbo/engineapi/engine_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (e *EngineServer) Start(
) {
base := jsonrpc.NewBaseApi(filters, stateCache, blockReader, agg, httpConfig.WithDatadir, httpConfig.EvmCallTimeout, engineReader, httpConfig.Dirs)

ethImpl := jsonrpc.NewEthAPI(base, db, eth, txPool, mining, httpConfig.Gascap, httpConfig.ReturnDataLimit, httpConfig.AllowUnprotectedTxs, httpConfig.MaxGetProofRewindBlockCount, e.logger)
ethImpl := jsonrpc.NewEthAPI(base, db, eth, txPool, mining, httpConfig.Gascap, httpConfig.ReturnDataLimit, httpConfig.AllowUnprotectedTxs, httpConfig.MaxGetProofRewindBlockCount, httpConfig.WebsocketSubscribeLogsChannelSize, e.logger)

// engineImpl := NewEngineAPI(base, db, engineBackend)
// e.startEngineMessageHandler()
Expand Down
2 changes: 1 addition & 1 deletion turbo/jsonrpc/corner_cases_support_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestNotFoundMustReturnNil(t *testing.T) {
require := require.New(t)
m, _, _ := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m),
m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
ctx := context.Background()

a, err := api.GetTransactionByBlockNumberAndIndex(ctx, 10_000, 1)
Expand Down
2 changes: 1 addition & 1 deletion turbo/jsonrpc/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func APIList(db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, m
logger log.Logger,
) (list []rpc.API) {
base := NewBaseApi(filters, stateCache, blockReader, agg, cfg.WithDatadir, cfg.EvmCallTimeout, engine, cfg.Dirs)
ethImpl := NewEthAPI(base, db, eth, txPool, mining, cfg.Gascap, cfg.ReturnDataLimit, cfg.AllowUnprotectedTxs, cfg.MaxGetProofRewindBlockCount, logger)
ethImpl := NewEthAPI(base, db, eth, txPool, mining, cfg.Gascap, cfg.ReturnDataLimit, cfg.AllowUnprotectedTxs, cfg.MaxGetProofRewindBlockCount, cfg.WebsocketSubscribeLogsChannelSize, logger)
erigonImpl := NewErigonAPI(base, db, eth)
txpoolImpl := NewTxPoolAPI(base, db, txPool)
netImpl := NewNetAPIImpl(eth)
Expand Down
4 changes: 2 additions & 2 deletions turbo/jsonrpc/debug_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestTraceBlockByNumber(t *testing.T) {
agg := m.HistoryV3Components()
stateCache := kvcache.New(kvcache.DefaultCoherentConfig)
baseApi := NewBaseApi(nil, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs)
ethApi := NewEthAPI(baseApi, m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
ethApi := NewEthAPI(baseApi, m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
api := NewPrivateDebugAPI(baseApi, m.DB, 0)
for _, tt := range debugTraceTransactionTests {
var buf bytes.Buffer
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestTraceBlockByNumber(t *testing.T) {

func TestTraceBlockByHash(t *testing.T) {
m, _, _ := rpcdaemontest.CreateTestSentry(t)
ethApi := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
ethApi := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
api := NewPrivateDebugAPI(newBaseApiForTest(m), m.DB, 0)
for _, tt := range debugTraceTransactionTests {
var buf bytes.Buffer
Expand Down
2 changes: 1 addition & 1 deletion turbo/jsonrpc/erigon_receipts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestGetLogs(t *testing.T) {
assert := assert.New(t)
m, _, _ := rpcdaemontest.CreateTestSentry(t)
{
ethApi := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
ethApi := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())

logs, err := ethApi.GetLogs(context.Background(), filters.FilterCriteria{FromBlock: big.NewInt(0), ToBlock: big.NewInt(10)})
assert.NoError(err)
Expand Down
4 changes: 3 additions & 1 deletion turbo/jsonrpc/eth_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,11 +328,12 @@ type APIImpl struct {
ReturnDataLimit int
AllowUnprotectedTxs bool
MaxGetProofRewindBlockCount int
SubscribeLogsChannelSize int
logger log.Logger
}

// NewEthAPI returns APIImpl instance
func NewEthAPI(base *BaseAPI, db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, gascap uint64, returnDataLimit int, allowUnprotectedTxs bool, maxGetProofRewindBlockCount int, logger log.Logger) *APIImpl {
func NewEthAPI(base *BaseAPI, db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, gascap uint64, returnDataLimit int, allowUnprotectedTxs bool, maxGetProofRewindBlockCount int, subscribeLogsChannelSize int, logger log.Logger) *APIImpl {
if gascap == 0 {
gascap = uint64(math.MaxUint64 / 2)
}
Expand All @@ -348,6 +349,7 @@ func NewEthAPI(base *BaseAPI, db kv.RoDB, eth rpchelper.ApiBackend, txPool txpoo
AllowUnprotectedTxs: allowUnprotectedTxs,
ReturnDataLimit: returnDataLimit,
MaxGetProofRewindBlockCount: maxGetProofRewindBlockCount,
SubscribeLogsChannelSize: subscribeLogsChannelSize,
logger: logger,
}
}
Expand Down
22 changes: 11 additions & 11 deletions turbo/jsonrpc/eth_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestGetTransactionReceipt(t *testing.T) {
db := m.DB
agg := m.HistoryV3Components()
stateCache := kvcache.New(kvcache.DefaultCoherentConfig)
api := NewEthAPI(NewBaseApi(nil, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), db, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(NewBaseApi(nil, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), db, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
// Call GetTransactionReceipt for transaction which is not in the database
if _, err := api.GetTransactionReceipt(context.Background(), common.Hash{}); err != nil {
t.Errorf("calling GetTransactionReceipt with empty hash: %v", err)
Expand All @@ -64,7 +64,7 @@ func TestGetTransactionReceipt(t *testing.T) {

func TestGetTransactionReceiptUnprotected(t *testing.T) {
m, _, _ := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
// Call GetTransactionReceipt for un-protected transaction
if _, err := api.GetTransactionReceipt(context.Background(), common.HexToHash("0x3f3cb8a0e13ed2481f97f53f7095b9cbc78b6ffb779f2d3e565146371a8830ea")); err != nil {
t.Errorf("calling GetTransactionReceipt for unprotected tx: %v", err)
Expand All @@ -76,7 +76,7 @@ func TestGetTransactionReceiptUnprotected(t *testing.T) {
func TestGetStorageAt_ByBlockNumber_WithRequireCanonicalDefault(t *testing.T) {
assert := assert.New(t)
m, _, _ := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7")

result, err := api.GetStorageAt(context.Background(), addr, "0x0", rpc.BlockNumberOrHashWithNumber(0))
Expand All @@ -90,7 +90,7 @@ func TestGetStorageAt_ByBlockNumber_WithRequireCanonicalDefault(t *testing.T) {
func TestGetStorageAt_ByBlockHash_WithRequireCanonicalDefault(t *testing.T) {
assert := assert.New(t)
m, _, _ := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7")

result, err := api.GetStorageAt(context.Background(), addr, "0x0", rpc.BlockNumberOrHashWithHash(m.Genesis.Hash(), false))
Expand All @@ -104,7 +104,7 @@ func TestGetStorageAt_ByBlockHash_WithRequireCanonicalDefault(t *testing.T) {
func TestGetStorageAt_ByBlockHash_WithRequireCanonicalTrue(t *testing.T) {
assert := assert.New(t)
m, _, _ := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7")

result, err := api.GetStorageAt(context.Background(), addr, "0x0", rpc.BlockNumberOrHashWithHash(m.Genesis.Hash(), true))
Expand All @@ -117,7 +117,7 @@ func TestGetStorageAt_ByBlockHash_WithRequireCanonicalTrue(t *testing.T) {

func TestGetStorageAt_ByBlockHash_WithRequireCanonicalDefault_BlockNotFoundError(t *testing.T) {
m, _, _ := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7")

offChain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 1, func(i int, block *core.BlockGen) {
Expand All @@ -138,7 +138,7 @@ func TestGetStorageAt_ByBlockHash_WithRequireCanonicalDefault_BlockNotFoundError

func TestGetStorageAt_ByBlockHash_WithRequireCanonicalTrue_BlockNotFoundError(t *testing.T) {
m, _, _ := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7")

offChain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 1, func(i int, block *core.BlockGen) {
Expand All @@ -160,7 +160,7 @@ func TestGetStorageAt_ByBlockHash_WithRequireCanonicalTrue_BlockNotFoundError(t
func TestGetStorageAt_ByBlockHash_WithRequireCanonicalDefault_NonCanonicalBlock(t *testing.T) {
assert := assert.New(t)
m, _, orphanedChain := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7")

orphanedBlock := orphanedChain[0].Blocks[0]
Expand All @@ -179,7 +179,7 @@ func TestGetStorageAt_ByBlockHash_WithRequireCanonicalDefault_NonCanonicalBlock(

func TestGetStorageAt_ByBlockHash_WithRequireCanonicalTrue_NonCanonicalBlock(t *testing.T) {
m, _, orphanedChain := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7")

orphanedBlock := orphanedChain[0].Blocks[0]
Expand All @@ -195,7 +195,7 @@ func TestGetStorageAt_ByBlockHash_WithRequireCanonicalTrue_NonCanonicalBlock(t *

func TestCall_ByBlockHash_WithRequireCanonicalDefault_NonCanonicalBlock(t *testing.T) {
m, _, orphanedChain := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
from := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7")
to := common.HexToAddress("0x0d3ab14bbad3d99f4203bd7a11acb94882050e7e")

Expand All @@ -218,7 +218,7 @@ func TestCall_ByBlockHash_WithRequireCanonicalDefault_NonCanonicalBlock(t *testi

func TestCall_ByBlockHash_WithRequireCanonicalTrue_NonCanonicalBlock(t *testing.T) {
m, _, orphanedChain := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
from := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7")
to := common.HexToAddress("0x0d3ab14bbad3d99f4203bd7a11acb94882050e7e")

Expand Down
Loading
Loading