Skip to content

Commit

Permalink
Fix new_heads Events Emission on Block Forks (#10072)
Browse files Browse the repository at this point in the history
TL;DR: on a reorg, the common ancestor block is not being published to
subscribers of newHeads

#### Expected behavior

if the reorg's common ancestor is 2, I expect 2 to be republished

1, 2, **2**, **3**, **4**

#### Actual behavior

2 is not republished, and 3's parentHash points to a 2 header that was
never received

1, 2, **3**, **4**

This PR is the same thing as
#9738 except with a test.

Note... the test passes, but **this does not actually work in
production** (for Ethereum mainnet with prysm as external CL).

Why? Because in production, `h.sync.PrevUnwindPoint()` is always nil:
https://github.com/ledgerwatch/erigon/blob/a5270bccf5e69a6beaaab9a0663bdad80e989505/turbo/stages/stageloop.go#L291
which means the initial "if block" is never entered, and thus we have
**no control** of increment/decrement `notifyFrom` during reorgs
https://github.com/ledgerwatch/erigon/blob/a5270bccf5e69a6beaaab9a0663bdad80e989505/eth/stagedsync/stage_finish.go#L137-L146

I don't know why `h.sync.PrevUnwindPoint()` is seemingly always nil, or
how the test can pass if it fails in prod. I'm hoping to pass the baton
to someone who might. Thank you @indanielo for original fix.

If we can figure this bug out, it closes #8848 and closes #9568 and
closes #10056

---------

Co-authored-by: Daniel Gimenez <25278291+indanielo@users.noreply.github.com>
  • Loading branch information
jotto and indanielo authored Apr 26, 2024
1 parent aee77ab commit 4e56433
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 9 deletions.
2 changes: 1 addition & 1 deletion eth/stagedsync/stage_finish.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, finishS
heightSpan = 1024
}
notifyFrom = finishStageAfterSync - heightSpan
notifyFrom++
}
notifyFrom++

var notifyTo = notifyFrom
var notifyToHash libcommon.Hash
Expand Down
61 changes: 53 additions & 8 deletions turbo/jsonrpc/eth_subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package jsonrpc

import (
"context"
"fmt"
"math/big"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -25,14 +25,22 @@ import (
"github.com/ledgerwatch/erigon/turbo/stages/mock"
)

func TestEthSubscribe(t *testing.T) {
m, require := mock.Mock(t), require.New(t)
chain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 7, func(i int, b *core.BlockGen) {
b.SetCoinbase(libcommon.Address{1})
func sendBlock(t *testing.T, require *require.Assertions, m *mock.MockSentry, chain *core.ChainPack) {
// Send NewBlock message
b, err := rlp.EncodeToBytes(&eth.NewBlockPacket{
Block: chain.TopBlock,
TD: big.NewInt(1), // This is ignored anyway
})
require.NoError(err)
if err != nil {
t.Fatal(err)
}
m.ReceiveWg.Add(1)
for _, err = range m.Send(&sentry.InboundMessage{Id: sentry.MessageId_NEW_BLOCK_66, Data: b, PeerId: m.PeerId}) {
require.NoError(err)
}

b, err := rlp.EncodeToBytes(&eth.BlockHeadersPacket66{
// Send all the headers
b, err = rlp.EncodeToBytes(&eth.BlockHeadersPacket66{
RequestId: 1,
BlockHeadersPacket: chain.Headers,
})
Expand All @@ -43,6 +51,16 @@ func TestEthSubscribe(t *testing.T) {
require.NoError(err)
}
m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed
}

func TestEthSubscribe(t *testing.T) {
m, require := mock.Mock(t), require.New(t)
chain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 7, func(i int, b *core.BlockGen) {
b.SetCoinbase(libcommon.Address{1})
})
require.NoError(err)

sendBlock(t, require, m, chain)

ctx := context.Background()
logger := log.New()
Expand All @@ -64,7 +82,34 @@ func TestEthSubscribe(t *testing.T) {

for i := uint64(1); i <= highestSeenHeader; i++ {
header := <-newHeads
fmt.Printf("Got header %d\n", header.Number.Uint64())
require.Equal(i, header.Number.Uint64())
require.Equal(chain.Blocks[i-1].Hash(), header.Hash())
}

// create reorg chain starting with common ancestor of 3, 4 will be first block with different coinbase
m2 := mock.Mock(t)
chain, err = core.GenerateChain(m2.ChainConfig, m2.Genesis, m2.Engine, m2.DB, 9, func(i int, b *core.BlockGen) {
// i starts from 0, so this means everything under block 4 will have coinbase 1, and 4 and above will have coinbase 2
if i < 3 {
b.SetCoinbase(libcommon.Address{1})
} else {
b.SetCoinbase(libcommon.Address{2})
}
})
require.NoError(err)

sendBlock(t, require, m, chain)

if err := stages.StageLoopIteration(m.Ctx, m.DB, wrap.TxContainer{}, m.Sync, initialCycle, logger, m.BlockReader, hook, false); err != nil {
t.Fatal(err)
}

highestSeenHeader = chain.TopBlock.NumberU64()

// since common ancestor of reorg is 3 the first new header we will see should be 3
for i := uint64(3); i <= highestSeenHeader; i++ {
header := <-newHeads
require.Equal(i, header.Number.Uint64())
require.Equal(chain.Blocks[i-1].Hash(), header.Hash())
}
}

0 comments on commit 4e56433

Please sign in to comment.