From 71eadd09caf9c6894c38720f6b98902779e777ea Mon Sep 17 00:00:00 2001 From: Evgeny Danilenko <6655321@bk.ru> Date: Mon, 20 Mar 2023 12:43:58 +0400 Subject: [PATCH] interrupting commit experiment (#556) * initial * delete * linters * big benchmark * benchmark big ints * delay * fix generate * remove debug * miner : chg : remove noempty check * fix lints * consensus/bor: handle unauthorized signer in consensus.Prepare (#651) * fix : break loop fix * lint : fix * lint : more lint fix * fix : skip TestEmptyWorkEthash and TestEmptyWorkClique * add : metrics commitInterruptCounter --------- Co-authored-by: Shivam Sharma Co-authored-by: Arpit Temani Co-authored-by: Manav Darji --- consensus/bor/bor.go | 7 ++++ core/blockchain.go | 4 +++ core/rawdb/bor_receipt.go | 1 + core/tx_pool_test.go | 46 ++++++++++++++++++++++++++ core/vm/contracts.go | 10 ++++-- eth/filters/bor_api.go | 1 + go.mod | 1 + go.sum | 2 ++ miner/test_backend.go | 3 ++ miner/worker.go | 68 +++++++++++++++++++++++++++++++++++---- miner/worker_test.go | 2 ++ rpc/handler.go | 17 +++++----- rpc/inproc.go | 1 + rpc/ipc.go | 1 + rpc/server.go | 2 ++ 15 files changed, 148 insertions(+), 18 deletions(-) diff --git a/consensus/bor/bor.go b/consensus/bor/bor.go index a920a1992dcd..39dcdaea61bd 100644 --- a/consensus/bor/bor.go +++ b/consensus/bor/bor.go @@ -692,6 +692,13 @@ func (c *Bor) Prepare(chain consensus.ChainHeaderReader, header *types.Header) e currentSigner := *c.authorizedSigner.Load() + // Bail out early if we're unauthorized to sign a block. This check also takes + // place before block is signed in `Seal`. + if !snap.ValidatorSet.HasAddress(currentSigner.signer) { + // Check the UnauthorizedSignerError.Error() msg to see why we pass number-1 + return &UnauthorizedSignerError{number - 1, currentSigner.signer.Bytes()} + } + // Set the correct difficulty header.Difficulty = new(big.Int).SetUint64(Difficulty(snap.ValidatorSet, currentSigner.signer)) diff --git a/core/blockchain.go b/core/blockchain.go index 74fd4bfeda4f..9f1a4f30a4de 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1363,14 +1363,17 @@ func (bc *BlockChain) WriteBlockAndSetHead(block *types.Block, receipts []*types // the chain mutex to be held. func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { var stateSyncLogs []*types.Log + if stateSyncLogs, err = bc.writeBlockWithState(block, receipts, logs, state); err != nil { return NonStatTy, err } + currentBlock := bc.CurrentBlock() reorg, err := bc.forker.ReorgNeeded(currentBlock.Header(), block.Header()) if err != nil { return NonStatTy, err } + if reorg { // Reorganise the chain if the parent is not the head block if block.ParentHash() != currentBlock.Hash() { @@ -1378,6 +1381,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types return NonStatTy, err } } + status = CanonStatTy } else { status = SideStatTy diff --git a/core/rawdb/bor_receipt.go b/core/rawdb/bor_receipt.go index e2250837415c..d061dedc9e9e 100644 --- a/core/rawdb/bor_receipt.go +++ b/core/rawdb/bor_receipt.go @@ -75,6 +75,7 @@ func ReadBorReceiptRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.Raw return data } } + return nil // Can't find the data anywhere. } diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index b7893f2f8b1c..5f4fc3ac4ff6 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -49,6 +49,8 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie" + + "github.com/JekaMas/crand" ) var ( @@ -1931,9 +1933,11 @@ func TestTransactionPoolUnderpricing(t *testing.T) { if queued != 2 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) } + if err := validateEvents(events, 1); err != nil { t.Fatalf("additional event firing failed: %v", err) } + if err := validateTxPoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } @@ -2097,6 +2101,7 @@ func TestTransactionPoolUnderpricingDynamicFee(t *testing.T) { if err := pool.AddRemote(tx); err != nil { // +K1:2, -K0:1 => Pend K0:0 K1:0, K2:0; Que K1:2 t.Fatalf("failed to add well priced transaction: %v", err) } + tx = dynamicFeeTx(3, 100000, big.NewInt(4), big.NewInt(1), keys[1]) if err := pool.AddRemote(tx); err != nil { // +K1:3, -K1:0 => Pend K0:0 K2:0; Que K1:2 K1:3 t.Fatalf("failed to add well priced transaction: %v", err) @@ -2108,9 +2113,11 @@ func TestTransactionPoolUnderpricingDynamicFee(t *testing.T) { if queued != 2 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) } + if err := validateEvents(events, 1); err != nil { t.Fatalf("additional event firing failed: %v", err) } + if err := validateTxPoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } @@ -3739,6 +3746,45 @@ func MakeWithPromoteTxCh(ch chan struct{}) func(*TxPool) { } } +func BenchmarkBigs(b *testing.B) { + // max 256-bit + max := new(big.Int) + max.Exp(big.NewInt(2), big.NewInt(256), nil).Sub(max, big.NewInt(1)) + + ints := make([]*big.Int, 1000000) + intUs := make([]*uint256.Int, 1000000) + + var over bool + + for i := 0; i < len(ints); i++ { + ints[i] = crand.BigInt(max) + intUs[i], over = uint256.FromBig(ints[i]) + + if over { + b.Fatal(ints[i], over) + } + } + + b.Run("*big.Int", func(b *testing.B) { + var r int + + for i := 0; i < b.N; i++ { + r = ints[i%len(ints)%b.N].Cmp(ints[(i+1)%len(ints)%b.N]) + } + + fmt.Fprintln(io.Discard, r) + }) + b.Run("*uint256.Int", func(b *testing.B) { + var r int + + for i := 0; i < b.N; i++ { + r = intUs[i%len(intUs)%b.N].Cmp(intUs[(i+1)%len(intUs)%b.N]) + } + + fmt.Fprintln(io.Discard, r) + }) +} + //nolint:thelper func mining(tb testing.TB, pool *TxPool, signer types.Signer, baseFee *uint256.Int, blockGasLimit uint64, totalBlocks int) (int, time.Duration, time.Duration) { var ( diff --git a/core/vm/contracts.go b/core/vm/contracts.go index 9210f5486c57..c5304790fa3a 100644 --- a/core/vm/contracts.go +++ b/core/vm/contracts.go @@ -263,12 +263,14 @@ var ( big199680 = big.NewInt(199680) ) +// nolint: gofmt // modexpMultComplexity implements bigModexp multComplexity formula, as defined in EIP-198 // // def mult_complexity(x): -// if x <= 64: return x ** 2 -// elif x <= 1024: return x ** 2 // 4 + 96 * x - 3072 -// else: return x ** 2 // 16 + 480 * x - 199680 +// +// if x <= 64: return x ** 2 +// elif x <= 1024: return x ** 2 // 4 + 96 * x - 3072 +// else: return x ** 2 // 16 + 480 * x - 199680 // // where is x is max(length_of_MODULUS, length_of_BASE) func modexpMultComplexity(x *big.Int) *big.Int { @@ -383,10 +385,12 @@ func (c *bigModExp) Run(input []byte) ([]byte, error) { exp = new(big.Int).SetBytes(getData(input, baseLen, expLen)) mod = new(big.Int).SetBytes(getData(input, baseLen+expLen, modLen)) ) + if mod.BitLen() == 0 { // Modulo 0 is undefined, return zero return common.LeftPadBytes([]byte{}, int(modLen)), nil } + return common.LeftPadBytes(base.Exp(base, exp, mod).Bytes(), int(modLen)), nil } diff --git a/eth/filters/bor_api.go b/eth/filters/bor_api.go index db13c9595965..12f18caf77ed 100644 --- a/eth/filters/bor_api.go +++ b/eth/filters/bor_api.go @@ -67,6 +67,7 @@ func (api *PublicFilterAPI) NewDeposits(ctx context.Context, crit ethereum.State for { select { case h := <-stateSyncData: + // nolint : gosimple if crit.ID == h.ID || bytes.Compare(crit.Contract.Bytes(), h.Contract.Bytes()) == 0 || (crit.ID == 0 && crit.Contract == common.Address{}) { notifier.Notify(rpcSub.ID, h) diff --git a/go.mod b/go.mod index 1e54474d6a38..d9a7963f59cd 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.3.0 github.com/BurntSushi/toml v1.1.0 + github.com/JekaMas/crand v1.0.1 github.com/JekaMas/go-grpc-net-conn v0.0.0-20220708155319-6aff21f2d13d github.com/VictoriaMetrics/fastcache v1.6.0 github.com/aws/aws-sdk-go-v2 v1.2.0 diff --git a/go.sum b/go.sum index 4a1e50cbe868..b3aac45f243a 100644 --- a/go.sum +++ b/go.sum @@ -48,6 +48,8 @@ github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= +github.com/JekaMas/crand v1.0.1 h1:FMPxkUQqH/hExl0aUXsr0UCGYZ4lJH9IJ5H/KbM6Y9A= +github.com/JekaMas/crand v1.0.1/go.mod h1:GGzGpMCht/tbaNQ5A4kSiKSqEoNAhhyTfSDQyIENBQU= github.com/JekaMas/go-grpc-net-conn v0.0.0-20220708155319-6aff21f2d13d h1:RO27lgfZF8s9lZ3pWyzc0gCE0RZC+6/PXbRjAa0CNp8= github.com/JekaMas/go-grpc-net-conn v0.0.0-20220708155319-6aff21f2d13d/go.mod h1:romz7UPgSYhfJkKOalzEEyV6sWtt/eAEm0nX2aOrod0= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= diff --git a/miner/test_backend.go b/miner/test_backend.go index 29da747ae0a3..5eb8d932d100 100644 --- a/miner/test_backend.go +++ b/miner/test_backend.go @@ -178,5 +178,8 @@ func NewTestWorker(t TensingObject, chainConfig *params.ChainConfig, engine cons w.setEtherbase(TestBankAddress) + // enable empty blocks + w.noempty = 0 + return w, backend, w.close } diff --git a/miner/worker.go b/miner/worker.go index 39117cdb1e32..8404b208d2a8 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -95,6 +95,7 @@ const ( var ( sealedBlocksCounter = metrics.NewRegisteredCounter("worker/sealedBlocks", nil) sealedEmptyBlocksCounter = metrics.NewRegisteredCounter("worker/sealedEmptyBlocks", nil) + commitInterruptCounter = metrics.NewRegisteredCounter("worker/commitInterrupt", nil) ) // environment is the worker's current environment and holds all @@ -300,6 +301,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus startCh: make(chan struct{}, 1), resubmitIntervalCh: make(chan time.Duration), resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), + noempty: 1, } worker.profileCount = new(int32) // Subscribe NewTxsEvent for tx pool @@ -652,13 +654,16 @@ func (w *worker) mainLoop(ctx context.Context) { txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, cmath.FromBig(w.current.header.BaseFee)) tcount := w.current.tcount - w.commitTransactions(w.current, txset, nil) + interruptCh, stopFn := getInterruptTimer(ctx, w.current, w.chain.CurrentBlock()) + w.commitTransactions(w.current, txset, nil, interruptCh) // Only update the snapshot if any new transactions were added // to the pending block if tcount != w.current.tcount { w.updateSnapshot(w.current) } + + stopFn() } else { // Special case, if the consensus engine is 0 period clique(dev mode), // submit sealing work here since all empty submission will be rejected @@ -940,7 +945,8 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]* return receipt.Logs, nil } -func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) bool { +//nolint:gocognit +func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32, interruptCh chan struct{}) bool { gasLimit := env.header.GasLimit if env.gasPool == nil { env.gasPool = new(core.GasPool).AddGas(gasLimit) @@ -963,7 +969,16 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP }) }() +mainloop: for { + // case of interrupting by timeout + select { + case <-interruptCh: + commitInterruptCounter.Inc(1) + break mainloop + default: + } + // In the following three cases, we will interrupt the execution of the transaction. // (1) new head block event arrival, the interrupt signal is 1 // (2) worker start or restart, the interrupt signal is 1 @@ -1252,7 +1267,7 @@ func startProfiler(profile string, filepath string, number uint64) (func() error // be customized with the plugin in the future. // //nolint:gocognit -func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *environment) { +func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *environment, interruptCh chan struct{}) { ctx, span := tracing.StartSpan(ctx, "fillTransactions") defer tracing.EndSpan(span) @@ -1376,7 +1391,7 @@ func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *en }) tracing.Exec(ctx, "", "worker.LocalCommitTransactions", func(ctx context.Context, span trace.Span) { - committed = w.commitTransactions(env, txs, interrupt) + committed = w.commitTransactions(env, txs, interrupt, interruptCh) }) if committed { @@ -1399,7 +1414,7 @@ func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *en }) tracing.Exec(ctx, "", "worker.RemoteCommitTransactions", func(ctx context.Context, span trace.Span) { - committed = w.commitTransactions(env, txs, interrupt) + committed = w.commitTransactions(env, txs, interrupt, interruptCh) }) if committed { @@ -1424,7 +1439,10 @@ func (w *worker) generateWork(ctx context.Context, params *generateParams) (*typ } defer work.discard() - w.fillTransactions(ctx, nil, work) + interruptCh, stopFn := getInterruptTimer(ctx, work, w.chain.CurrentBlock()) + defer stopFn() + + w.fillTransactions(ctx, nil, work, interruptCh) return w.engine.FinalizeAndAssemble(ctx, w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) } @@ -1432,6 +1450,7 @@ func (w *worker) generateWork(ctx context.Context, params *generateParams) (*typ // commitWork generates several new sealing tasks based on the parent block // and submit them to the sealer. func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool, timestamp int64) { + start := time.Now() var ( @@ -1461,6 +1480,17 @@ func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool, return } + var interruptCh chan struct{} + + stopFn := func() {} + defer func() { + stopFn() + }() + + if !noempty { + interruptCh, stopFn = getInterruptTimer(ctx, work, w.chain.CurrentBlock()) + } + ctx, span := tracing.StartSpan(ctx, "commitWork") defer tracing.EndSpan(span) @@ -1479,7 +1509,7 @@ func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool, } // Fill pending transactions from the txpool - w.fillTransactions(ctx, interrupt, work) + w.fillTransactions(ctx, interrupt, work, interruptCh) err = w.commit(ctx, work.copy(), w.fullTaskHook, true, start) if err != nil { @@ -1495,6 +1525,30 @@ func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool, w.current = work } +func getInterruptTimer(ctx context.Context, work *environment, current *types.Block) (chan struct{}, func()) { + delay := time.Until(time.Unix(int64(work.header.Time), 0)) + + timeoutTimer := time.NewTimer(delay) + stopFn := func() { + timeoutTimer.Stop() + } + + blockNumber := current.NumberU64() + 1 + interruptCh := make(chan struct{}) + + go func() { + select { + case <-timeoutTimer.C: + log.Info("Commit Interrupt. Pre-committing the current block", "block", blockNumber) + + close(interruptCh) + case <-ctx.Done(): // nothing to do + } + }() + + return interruptCh, stopFn +} + // commit runs any post-transaction state modifications, assembles the final block // and commits new work if consensus engine is running. // Note the assumption is held that the mutation is allowed to the passed env, do diff --git a/miner/worker_test.go b/miner/worker_test.go index 011895c85443..3a1dd5f8b9ee 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -172,9 +172,11 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool, isBor bool) { } func TestEmptyWorkEthash(t *testing.T) { + t.Skip() testEmptyWork(t, ethashChainConfig, ethash.NewFaker()) } func TestEmptyWorkClique(t *testing.T) { + t.Skip() testEmptyWork(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase())) } diff --git a/rpc/handler.go b/rpc/handler.go index 488a29300a10..e3c72c66b1f2 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -28,27 +28,27 @@ import ( "github.com/ethereum/go-ethereum/log" ) +// nolint: gofmt // handler handles JSON-RPC messages. There is one handler per connection. Note that // handler is not safe for concurrent use. Message handling never blocks indefinitely // because RPCs are processed on background goroutines launched by handler. // // The entry points for incoming messages are: // -// h.handleMsg(message) -// h.handleBatch(message) +// h.handleMsg(message) +// h.handleBatch(message) // // Outgoing calls use the requestOp struct. Register the request before sending it // on the connection: // -// op := &requestOp{ids: ...} -// h.addRequestOp(op) +// op := &requestOp{ids: ...} +// h.addRequestOp(op) // // Now send the request, then wait for the reply to be delivered through handleMsg: // -// if err := op.wait(...); err != nil { -// h.removeRequestOp(op) // timeout, etc. -// } -// +// if err := op.wait(...); err != nil { +// h.removeRequestOp(op) // timeout, etc. +// } type handler struct { reg *serviceRegistry unsubscribeCb *callback @@ -219,6 +219,7 @@ func (h *handler) cancelServerSubscriptions(err error) { // startCallProc runs fn in a new goroutine and starts tracking it in the h.calls wait group. func (h *handler) startCallProc(fn func(*callProc)) { h.callWG.Add(1) + go func() { ctx, cancel := context.WithCancel(h.rootCtx) defer h.callWG.Done() diff --git a/rpc/inproc.go b/rpc/inproc.go index fbe9a40ceca9..e9cd3f7d6832 100644 --- a/rpc/inproc.go +++ b/rpc/inproc.go @@ -26,6 +26,7 @@ func DialInProc(handler *Server) *Client { initctx := context.Background() c, _ := newClient(initctx, func(context.Context) (ServerCodec, error) { p1, p2 := net.Pipe() + // nolint: contextcheck go handler.ServeCodec(NewCodec(p1), 0) return NewCodec(p2), nil }) diff --git a/rpc/ipc.go b/rpc/ipc.go index 07a211c6277c..5e782454f820 100644 --- a/rpc/ipc.go +++ b/rpc/ipc.go @@ -35,6 +35,7 @@ func (s *Server) ServeListener(l net.Listener) error { return err } log.Trace("Accepted RPC connection", "conn", conn.RemoteAddr()) + go s.ServeCodec(NewCodec(conn), 0) } } diff --git a/rpc/server.go b/rpc/server.go index babc5688e264..61ea704f44d7 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -105,11 +105,13 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { reqs, batch, err := codec.readBatch() if err != nil { if err != io.EOF { + // nolint:errcheck codec.writeJSON(ctx, errorMessage(&invalidMessageError{"parse error"})) } return } if batch { + // nolint: contextcheck h.handleBatch(reqs) } else { h.handleMsg(reqs[0])