Skip to content

Commit

Permalink
fix : go-routine leak in commitInterrupt channel (ethereum#851)
Browse files Browse the repository at this point in the history
* fix : go-routine leak in commitInterrupt channel

* fix : fix lint

* rm : remove t.Parallel() from TestGenerateBlockAndImport tests

* fix : minor optimisations in test

* fix : BenchmarkBorMining
  • Loading branch information
0xsharma committed May 8, 2023
1 parent 51cbcea commit 1995521
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 59 deletions.
2 changes: 1 addition & 1 deletion miner/fake_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ var (
// Test accounts
testBankKey, _ = crypto.GenerateKey()
TestBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey)
testBankFunds = big.NewInt(1000000000000000000)
testBankFunds = big.NewInt(9000000000000000000)

testUserKey, _ = crypto.GenerateKey()
testUserAddress = crypto.PubkeyToAddress(testUserKey.PublicKey)
Expand Down
35 changes: 19 additions & 16 deletions miner/test_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,16 +358,14 @@ func (w *worker) mainLoopWithDelay(ctx context.Context, delay uint) {
txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, cmath.FromBig(w.current.header.BaseFee))
tcount := w.current.tcount

interruptCh, stopFn := getInterruptTimer(ctx, w.current, w.chain.CurrentBlock())
w.commitTransactionsWithDelay(w.current, txset, nil, interruptCh, delay)
//nolint:contextcheck
w.commitTransactions(w.current, txset, nil, context.Background())

// Only update the snapshot if any new transactions were added
// to the pending block
if tcount != w.current.tcount {
w.updateSnapshot(w.current)
}

stopFn()
} else {
// Special case, if the consensus engine is 0 period clique(dev mode),
// submit sealing work here since all empty submission will be rejected
Expand All @@ -393,7 +391,7 @@ func (w *worker) mainLoopWithDelay(ctx context.Context, delay uint) {
}

// nolint:gocognit
func (w *worker) commitTransactionsWithDelay(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32, interruptCh chan struct{}, delay uint) bool {
func (w *worker) commitTransactionsWithDelay(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32, interruptCtx context.Context, delay uint) bool {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
Expand All @@ -420,11 +418,15 @@ func (w *worker) commitTransactionsWithDelay(env *environment, txs *types.Transa
mainloop:
for {
// case of interrupting by timeout
select {
case <-interruptCh:
commitInterruptCounter.Inc(1)
break mainloop
default:
if interruptCtx != nil {
// case of interrupting by timeout
select {
case <-interruptCtx.Done():
commitInterruptCounter.Inc(1)
log.Warn("Tx Level Interrupt")
break mainloop
default:
}
}
// In the following three cases, we will interrupt the execution of the transaction.
// (1) new head block event arrival, the interrupt signal is 1
Expand Down Expand Up @@ -581,15 +583,16 @@ func (w *worker) commitWorkWithDelay(ctx context.Context, interrupt *int32, noem
return
}

var interruptCh chan struct{}
//nolint:contextcheck
var interruptCtx = context.Background()

stopFn := func() {}
defer func() {
stopFn()
}()

if !noempty {
interruptCh, stopFn = getInterruptTimer(ctx, work, w.chain.CurrentBlock())
interruptCtx, stopFn = getInterruptTimer(ctx, work, w.chain.CurrentBlock())
}

ctx, span := tracing.StartSpan(ctx, "commitWork")
Expand All @@ -610,7 +613,7 @@ func (w *worker) commitWorkWithDelay(ctx context.Context, interrupt *int32, noem
}

// Fill pending transactions from the txpool
w.fillTransactionsWithDelay(ctx, interrupt, work, interruptCh, delay)
w.fillTransactionsWithDelay(ctx, interrupt, work, interruptCtx, delay)

err = w.commit(ctx, work.copy(), w.fullTaskHook, true, start)
if err != nil {
Expand All @@ -627,7 +630,7 @@ func (w *worker) commitWorkWithDelay(ctx context.Context, interrupt *int32, noem
}

// nolint:gocognit
func (w *worker) fillTransactionsWithDelay(ctx context.Context, interrupt *int32, env *environment, interruptCh chan struct{}, delay uint) {
func (w *worker) fillTransactionsWithDelay(ctx context.Context, interrupt *int32, env *environment, interruptCtx context.Context, delay uint) {
ctx, span := tracing.StartSpan(ctx, "fillTransactions")
defer tracing.EndSpan(span)

Expand Down Expand Up @@ -751,7 +754,7 @@ func (w *worker) fillTransactionsWithDelay(ctx context.Context, interrupt *int32
})

tracing.Exec(ctx, "", "worker.LocalCommitTransactions", func(ctx context.Context, span trace.Span) {
committed = w.commitTransactionsWithDelay(env, txs, interrupt, interruptCh, delay)
committed = w.commitTransactionsWithDelay(env, txs, interrupt, interruptCtx, delay)
})

if committed {
Expand All @@ -774,7 +777,7 @@ func (w *worker) fillTransactionsWithDelay(ctx context.Context, interrupt *int32
})

tracing.Exec(ctx, "", "worker.RemoteCommitTransactions", func(ctx context.Context, span trace.Span) {
committed = w.commitTransactionsWithDelay(env, txs, interrupt, interruptCh, delay)
committed = w.commitTransactionsWithDelay(env, txs, interrupt, interruptCtx, delay)
})

if committed {
Expand Down
57 changes: 29 additions & 28 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,9 +654,8 @@ func (w *worker) mainLoop(ctx context.Context) {
txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, cmath.FromBig(w.current.header.BaseFee))
tcount := w.current.tcount

var interruptCh chan struct{}

w.commitTransactions(w.current, txset, nil, interruptCh)
//nolint:contextcheck
w.commitTransactions(w.current, txset, nil, context.Background())

// Only update the snapshot if any new transactions were added
// to the pending block
Expand Down Expand Up @@ -945,7 +944,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]*
}

//nolint:gocognit
func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32, interruptCh chan struct{}) bool {
func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32, interruptCtx context.Context) bool {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
Expand All @@ -971,11 +970,15 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
mainloop:
for {
// case of interrupting by timeout
select {
case <-interruptCh:
commitInterruptCounter.Inc(1)
break mainloop
default:
if interruptCtx != nil {
// case of interrupting by timeout
select {
case <-interruptCtx.Done():
commitInterruptCounter.Inc(1)
log.Warn("Tx Level Interrupt")
break mainloop
default:
}
}

// In the following three cases, we will interrupt the execution of the transaction.
Expand Down Expand Up @@ -1266,7 +1269,7 @@ func startProfiler(profile string, filepath string, number uint64) (func() error
// be customized with the plugin in the future.
//
//nolint:gocognit
func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *environment, interruptCh chan struct{}) {
func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *environment, interruptCtx context.Context) {
ctx, span := tracing.StartSpan(ctx, "fillTransactions")
defer tracing.EndSpan(span)

Expand Down Expand Up @@ -1390,7 +1393,7 @@ func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *en
})

tracing.Exec(ctx, "", "worker.LocalCommitTransactions", func(ctx context.Context, span trace.Span) {
committed = w.commitTransactions(env, txs, interrupt, interruptCh)
committed = w.commitTransactions(env, txs, interrupt, interruptCtx)
})

if committed {
Expand All @@ -1413,7 +1416,7 @@ func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *en
})

tracing.Exec(ctx, "", "worker.RemoteCommitTransactions", func(ctx context.Context, span trace.Span) {
committed = w.commitTransactions(env, txs, interrupt, interruptCh)
committed = w.commitTransactions(env, txs, interrupt, interruptCtx)
})

if committed {
Expand All @@ -1438,10 +1441,10 @@ func (w *worker) generateWork(ctx context.Context, params *generateParams) (*typ
}
defer work.discard()

interruptCh, stopFn := getInterruptTimer(ctx, work, w.chain.CurrentBlock())
interruptCtx, stopFn := getInterruptTimer(ctx, work, w.chain.CurrentBlock())
defer stopFn()

w.fillTransactions(ctx, nil, work, interruptCh)
w.fillTransactions(ctx, nil, work, interruptCtx)

return w.engine.FinalizeAndAssemble(ctx, w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
}
Expand Down Expand Up @@ -1479,15 +1482,16 @@ func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool,
return
}

var interruptCh chan struct{}
//nolint:contextcheck
var interruptCtx = context.Background()

stopFn := func() {}
defer func() {
stopFn()
}()

if !noempty {
interruptCh, stopFn = getInterruptTimer(ctx, work, w.chain.CurrentBlock())
interruptCtx, stopFn = getInterruptTimer(ctx, work, w.chain.CurrentBlock())
}

ctx, span := tracing.StartSpan(ctx, "commitWork")
Expand All @@ -1508,7 +1512,7 @@ func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool,
}

// Fill pending transactions from the txpool
w.fillTransactions(ctx, interrupt, work, interruptCh)
w.fillTransactions(ctx, interrupt, work, interruptCtx)

err = w.commit(ctx, work.copy(), w.fullTaskHook, true, start)
if err != nil {
Expand All @@ -1524,28 +1528,25 @@ func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool,
w.current = work
}

func getInterruptTimer(ctx context.Context, work *environment, current *types.Block) (chan struct{}, func()) {
func getInterruptTimer(ctx context.Context, work *environment, current *types.Block) (context.Context, func()) {
delay := time.Until(time.Unix(int64(work.header.Time), 0))

timeoutTimer := time.NewTimer(delay)
stopFn := func() {
timeoutTimer.Stop()
}
interruptCtx, cancel := context.WithTimeout(context.Background(), delay)

blockNumber := current.NumberU64() + 1
interruptCh := make(chan struct{})

go func() {
select {
case <-timeoutTimer.C:
log.Info("Commit Interrupt. Pre-committing the current block", "block", blockNumber)

close(interruptCh)
case <-interruptCtx.Done():
if interruptCtx.Err() != context.Canceled {
log.Info("Commit Interrupt. Pre-committing the current block", "block", blockNumber)
cancel()
}
case <-ctx.Done(): // nothing to do
}
}()

return interruptCh, stopFn
return interruptCtx, cancel
}

// commit runs any post-transaction state modifications, assembles the final block
Expand Down
20 changes: 6 additions & 14 deletions miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,18 @@ import (
"github.com/ethereum/go-ethereum/tests/bor/mocks"
)

// nolint : paralleltest
func TestGenerateBlockAndImportEthash(t *testing.T) {
t.Parallel()

testGenerateBlockAndImport(t, false, false)
}

// nolint : paralleltest
func TestGenerateBlockAndImportClique(t *testing.T) {
t.Parallel()

testGenerateBlockAndImport(t, true, false)
}

// nolint : paralleltest
func TestGenerateBlockAndImportBor(t *testing.T) {
t.Parallel()

testGenerateBlockAndImport(t, false, true)
}

Expand Down Expand Up @@ -627,18 +624,18 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co
}
}

// nolint:paralleltest
func TestCommitInterruptExperimentBor(t *testing.T) {
t.Parallel()
// with 1 sec block time and 200 millisec tx delay we should get 5 txs per block
testCommitInterruptExperimentBor(t, 200, 5)

time.Sleep(3 * time.Second)
// with 1 sec block time and 100 millisec tx delay we should get 10 txs per block
testCommitInterruptExperimentBor(t, 100, 10)
}

// nolint:thelper
func testCommitInterruptExperimentBor(t *testing.T, delay uint, txCount int) {
t.Helper()

var (
engine consensus.Engine
chainConfig *params.ChainConfig
Expand Down Expand Up @@ -726,11 +723,6 @@ func BenchmarkBorMining(b *testing.B) {
chain, _ := core.NewBlockChain(db2, nil, back.chain.Config(), engine, vm.Config{}, nil, nil, nil)
defer chain.Stop()

// Ignore empty commit here for less noise.
w.skipSealHook = func(task *task) bool {
return len(task.receipts) == 0
}

// fulfill tx pool
const (
totalGas = testGas + params.TxGas
Expand Down

0 comments on commit 1995521

Please sign in to comment.