Skip to content

Commit

Permalink
Make logs subscription channel size configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
adytzu2007 committed Mar 26, 2024
1 parent 010b230 commit 187827a
Show file tree
Hide file tree
Showing 19 changed files with 79 additions and 69 deletions.
1 change: 1 addition & 0 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
rootCmd.PersistentFlags().IntVar(&cfg.MaxGetProofRewindBlockCount, utils.RpcMaxGetProofRewindBlockCount.Name, utils.RpcMaxGetProofRewindBlockCount.Value, utils.RpcMaxGetProofRewindBlockCount.Usage)
rootCmd.PersistentFlags().Uint64Var(&cfg.OtsMaxPageSize, utils.OtsSearchMaxCapFlag.Name, utils.OtsSearchMaxCapFlag.Value, utils.OtsSearchMaxCapFlag.Usage)
rootCmd.PersistentFlags().DurationVar(&cfg.RPCSlowLogThreshold, utils.RPCSlowFlag.Name, utils.RPCSlowFlag.Value, utils.RPCSlowFlag.Usage)
rootCmd.PersistentFlags().IntVar(&cfg.WebsocketSubscribeLogsChannelSize, utils.WSSubscribeLogsChannelSize.Name, utils.WSSubscribeLogsChannelSize.Value, utils.WSSubscribeLogsChannelSize.Usage)

if err := rootCmd.MarkPersistentFlagFilename("rpc.accessList", "json"); err != nil {
panic(err)
Expand Down
31 changes: 16 additions & 15 deletions cmd/rpcdaemon/cli/httpcfg/http_cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,22 @@ type HttpCfg struct {
AuthRpcPort int
PrivateApiAddr string

API []string
Gascap uint64
MaxTraces uint64
WebsocketPort int
WebsocketEnabled bool
WebsocketCompression bool
RpcAllowListFilePath string
RpcBatchConcurrency uint
RpcStreamingDisable bool
DBReadConcurrency int
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
TxPoolApiAddr string
StateCache kvcache.CoherentConfig
Snap ethconfig.BlocksFreezing
Sync ethconfig.Sync
API []string
Gascap uint64
MaxTraces uint64
WebsocketPort int
WebsocketEnabled bool
WebsocketCompression bool
WebsocketSubscribeLogsChannelSize int
RpcAllowListFilePath string
RpcBatchConcurrency uint
RpcStreamingDisable bool
DBReadConcurrency int
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
TxPoolApiAddr string
StateCache kvcache.CoherentConfig
Snap ethconfig.BlocksFreezing
Sync ethconfig.Sync

// GRPC server
GRPCServerEnabled bool
Expand Down
5 changes: 5 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,11 @@ var (
Usage: "HTTP path prefix on which JSON-RPC is served. Use '/' to serve on all paths.",
Value: "",
}
WSSubscribeLogsChannelSize = cli.IntFlag{
Name: "ws.api.subscribelogs.channelsize",
Usage: "Size of the channel used for websocket logs subscriptions",
Value: 128,
}
ExecFlag = cli.StringFlag{
Name: "exec",
Usage: "Execute JavaScript statement",
Expand Down
29 changes: 15 additions & 14 deletions turbo/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,20 +464,21 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config, logger log.Logg
WriteTimeout: ctx.Duration(AuthRpcWriteTimeoutFlag.Name),
IdleTimeout: ctx.Duration(HTTPIdleTimeoutFlag.Name),
},
EvmCallTimeout: ctx.Duration(EvmCallTimeoutFlag.Name),
WebsocketPort: ctx.Int(utils.WSPortFlag.Name),
WebsocketEnabled: ctx.IsSet(utils.WSEnabledFlag.Name),
RpcBatchConcurrency: ctx.Uint(utils.RpcBatchConcurrencyFlag.Name),
RpcStreamingDisable: ctx.Bool(utils.RpcStreamingDisableFlag.Name),
DBReadConcurrency: ctx.Int(utils.DBReadConcurrencyFlag.Name),
RpcAllowListFilePath: ctx.String(utils.RpcAccessListFlag.Name),
Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name),
MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name),
TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name),
BatchLimit: ctx.Int(utils.RpcBatchLimit.Name),
ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name),
AllowUnprotectedTxs: ctx.Bool(utils.AllowUnprotectedTxs.Name),
MaxGetProofRewindBlockCount: ctx.Int(utils.RpcMaxGetProofRewindBlockCount.Name),
EvmCallTimeout: ctx.Duration(EvmCallTimeoutFlag.Name),
WebsocketPort: ctx.Int(utils.WSPortFlag.Name),
WebsocketEnabled: ctx.IsSet(utils.WSEnabledFlag.Name),
WebsocketSubscribeLogsChannelSize: ctx.Int(utils.WSSubscribeLogsChannelSize.Name),
RpcBatchConcurrency: ctx.Uint(utils.RpcBatchConcurrencyFlag.Name),
RpcStreamingDisable: ctx.Bool(utils.RpcStreamingDisableFlag.Name),
DBReadConcurrency: ctx.Int(utils.DBReadConcurrencyFlag.Name),
RpcAllowListFilePath: ctx.String(utils.RpcAccessListFlag.Name),
Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name),
MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name),
TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name),
BatchLimit: ctx.Int(utils.RpcBatchLimit.Name),
ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name),
AllowUnprotectedTxs: ctx.Bool(utils.AllowUnprotectedTxs.Name),
MaxGetProofRewindBlockCount: ctx.Int(utils.RpcMaxGetProofRewindBlockCount.Name),

OtsMaxPageSize: ctx.Uint64(utils.OtsSearchMaxCapFlag.Name),

Expand Down
2 changes: 1 addition & 1 deletion turbo/engineapi/engine_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (e *EngineServer) Start(
) {
base := jsonrpc.NewBaseApi(filters, stateCache, blockReader, agg, httpConfig.WithDatadir, httpConfig.EvmCallTimeout, engineReader, httpConfig.Dirs)

ethImpl := jsonrpc.NewEthAPI(base, db, eth, txPool, mining, httpConfig.Gascap, httpConfig.ReturnDataLimit, httpConfig.AllowUnprotectedTxs, httpConfig.MaxGetProofRewindBlockCount, e.logger)
ethImpl := jsonrpc.NewEthAPI(base, db, eth, txPool, mining, httpConfig.Gascap, httpConfig.ReturnDataLimit, httpConfig.AllowUnprotectedTxs, httpConfig.MaxGetProofRewindBlockCount, httpConfig.WebsocketSubscribeLogsChannelSize, e.logger)

// engineImpl := NewEngineAPI(base, db, engineBackend)
// e.startEngineMessageHandler()
Expand Down
2 changes: 1 addition & 1 deletion turbo/jsonrpc/corner_cases_support_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestNotFoundMustReturnNil(t *testing.T) {
require := require.New(t)
m, _, _ := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m),
m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
ctx := context.Background()

a, err := api.GetTransactionByBlockNumberAndIndex(ctx, 10_000, 1)
Expand Down
2 changes: 1 addition & 1 deletion turbo/jsonrpc/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func APIList(db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, m
logger log.Logger,
) (list []rpc.API) {
base := NewBaseApi(filters, stateCache, blockReader, agg, cfg.WithDatadir, cfg.EvmCallTimeout, engine, cfg.Dirs)
ethImpl := NewEthAPI(base, db, eth, txPool, mining, cfg.Gascap, cfg.ReturnDataLimit, cfg.AllowUnprotectedTxs, cfg.MaxGetProofRewindBlockCount, logger)
ethImpl := NewEthAPI(base, db, eth, txPool, mining, cfg.Gascap, cfg.ReturnDataLimit, cfg.AllowUnprotectedTxs, cfg.MaxGetProofRewindBlockCount, cfg.WebsocketSubscribeLogsChannelSize, logger)
erigonImpl := NewErigonAPI(base, db, eth)
txpoolImpl := NewTxPoolAPI(base, db, txPool)
netImpl := NewNetAPIImpl(eth)
Expand Down
4 changes: 2 additions & 2 deletions turbo/jsonrpc/debug_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestTraceBlockByNumber(t *testing.T) {
agg := m.HistoryV3Components()
stateCache := kvcache.New(kvcache.DefaultCoherentConfig)
baseApi := NewBaseApi(nil, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs)
ethApi := NewEthAPI(baseApi, m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
ethApi := NewEthAPI(baseApi, m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
api := NewPrivateDebugAPI(baseApi, m.DB, 0)
for _, tt := range debugTraceTransactionTests {
var buf bytes.Buffer
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestTraceBlockByNumber(t *testing.T) {

func TestTraceBlockByHash(t *testing.T) {
m, _, _ := rpcdaemontest.CreateTestSentry(t)
ethApi := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
ethApi := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
api := NewPrivateDebugAPI(newBaseApiForTest(m), m.DB, 0)
for _, tt := range debugTraceTransactionTests {
var buf bytes.Buffer
Expand Down
2 changes: 1 addition & 1 deletion turbo/jsonrpc/erigon_receipts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestGetLogs(t *testing.T) {
assert := assert.New(t)
m, _, _ := rpcdaemontest.CreateTestSentry(t)
{
ethApi := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
ethApi := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())

logs, err := ethApi.GetLogs(context.Background(), filters.FilterCriteria{FromBlock: big.NewInt(0), ToBlock: big.NewInt(10)})
assert.NoError(err)
Expand Down
4 changes: 3 additions & 1 deletion turbo/jsonrpc/eth_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,11 +328,12 @@ type APIImpl struct {
ReturnDataLimit int
AllowUnprotectedTxs bool
MaxGetProofRewindBlockCount int
SubscribeLogsChannelSize int
logger log.Logger
}

// NewEthAPI returns APIImpl instance
func NewEthAPI(base *BaseAPI, db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, gascap uint64, returnDataLimit int, allowUnprotectedTxs bool, maxGetProofRewindBlockCount int, logger log.Logger) *APIImpl {
func NewEthAPI(base *BaseAPI, db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, gascap uint64, returnDataLimit int, allowUnprotectedTxs bool, maxGetProofRewindBlockCount int, subscribeLogsChannelSize int, logger log.Logger) *APIImpl {
if gascap == 0 {
gascap = uint64(math.MaxUint64 / 2)
}
Expand All @@ -348,6 +349,7 @@ func NewEthAPI(base *BaseAPI, db kv.RoDB, eth rpchelper.ApiBackend, txPool txpoo
AllowUnprotectedTxs: allowUnprotectedTxs,
ReturnDataLimit: returnDataLimit,
MaxGetProofRewindBlockCount: maxGetProofRewindBlockCount,
SubscribeLogsChannelSize: subscribeLogsChannelSize,
logger: logger,
}
}
Expand Down
22 changes: 11 additions & 11 deletions turbo/jsonrpc/eth_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestGetTransactionReceipt(t *testing.T) {
db := m.DB
agg := m.HistoryV3Components()
stateCache := kvcache.New(kvcache.DefaultCoherentConfig)
api := NewEthAPI(NewBaseApi(nil, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), db, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(NewBaseApi(nil, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), db, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
// Call GetTransactionReceipt for transaction which is not in the database
if _, err := api.GetTransactionReceipt(context.Background(), common.Hash{}); err != nil {
t.Errorf("calling GetTransactionReceipt with empty hash: %v", err)
Expand All @@ -64,7 +64,7 @@ func TestGetTransactionReceipt(t *testing.T) {

func TestGetTransactionReceiptUnprotected(t *testing.T) {
m, _, _ := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
// Call GetTransactionReceipt for un-protected transaction
if _, err := api.GetTransactionReceipt(context.Background(), common.HexToHash("0x3f3cb8a0e13ed2481f97f53f7095b9cbc78b6ffb779f2d3e565146371a8830ea")); err != nil {
t.Errorf("calling GetTransactionReceipt for unprotected tx: %v", err)
Expand All @@ -76,7 +76,7 @@ func TestGetTransactionReceiptUnprotected(t *testing.T) {
func TestGetStorageAt_ByBlockNumber_WithRequireCanonicalDefault(t *testing.T) {
assert := assert.New(t)
m, _, _ := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7")

result, err := api.GetStorageAt(context.Background(), addr, "0x0", rpc.BlockNumberOrHashWithNumber(0))
Expand All @@ -90,7 +90,7 @@ func TestGetStorageAt_ByBlockNumber_WithRequireCanonicalDefault(t *testing.T) {
func TestGetStorageAt_ByBlockHash_WithRequireCanonicalDefault(t *testing.T) {
assert := assert.New(t)
m, _, _ := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7")

result, err := api.GetStorageAt(context.Background(), addr, "0x0", rpc.BlockNumberOrHashWithHash(m.Genesis.Hash(), false))
Expand All @@ -104,7 +104,7 @@ func TestGetStorageAt_ByBlockHash_WithRequireCanonicalDefault(t *testing.T) {
func TestGetStorageAt_ByBlockHash_WithRequireCanonicalTrue(t *testing.T) {
assert := assert.New(t)
m, _, _ := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7")

result, err := api.GetStorageAt(context.Background(), addr, "0x0", rpc.BlockNumberOrHashWithHash(m.Genesis.Hash(), true))
Expand All @@ -117,7 +117,7 @@ func TestGetStorageAt_ByBlockHash_WithRequireCanonicalTrue(t *testing.T) {

func TestGetStorageAt_ByBlockHash_WithRequireCanonicalDefault_BlockNotFoundError(t *testing.T) {
m, _, _ := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7")

offChain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 1, func(i int, block *core.BlockGen) {
Expand All @@ -138,7 +138,7 @@ func TestGetStorageAt_ByBlockHash_WithRequireCanonicalDefault_BlockNotFoundError

func TestGetStorageAt_ByBlockHash_WithRequireCanonicalTrue_BlockNotFoundError(t *testing.T) {
m, _, _ := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7")

offChain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 1, func(i int, block *core.BlockGen) {
Expand All @@ -160,7 +160,7 @@ func TestGetStorageAt_ByBlockHash_WithRequireCanonicalTrue_BlockNotFoundError(t
func TestGetStorageAt_ByBlockHash_WithRequireCanonicalDefault_NonCanonicalBlock(t *testing.T) {
assert := assert.New(t)
m, _, orphanedChain := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7")

orphanedBlock := orphanedChain[0].Blocks[0]
Expand All @@ -179,7 +179,7 @@ func TestGetStorageAt_ByBlockHash_WithRequireCanonicalDefault_NonCanonicalBlock(

func TestGetStorageAt_ByBlockHash_WithRequireCanonicalTrue_NonCanonicalBlock(t *testing.T) {
m, _, orphanedChain := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7")

orphanedBlock := orphanedChain[0].Blocks[0]
Expand All @@ -195,7 +195,7 @@ func TestGetStorageAt_ByBlockHash_WithRequireCanonicalTrue_NonCanonicalBlock(t *

func TestCall_ByBlockHash_WithRequireCanonicalDefault_NonCanonicalBlock(t *testing.T) {
m, _, orphanedChain := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
from := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7")
to := common.HexToAddress("0x0d3ab14bbad3d99f4203bd7a11acb94882050e7e")

Expand All @@ -218,7 +218,7 @@ func TestCall_ByBlockHash_WithRequireCanonicalDefault_NonCanonicalBlock(t *testi

func TestCall_ByBlockHash_WithRequireCanonicalTrue_NonCanonicalBlock(t *testing.T) {
m, _, orphanedChain := rpcdaemontest.CreateTestSentry(t)
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
api := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
from := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7")
to := common.HexToAddress("0x0d3ab14bbad3d99f4203bd7a11acb94882050e7e")

Expand Down
Loading

0 comments on commit 187827a

Please sign in to comment.