diff --git a/eth/stagedsync/stage_finish.go b/eth/stagedsync/stage_finish.go index a90be80bc37..212baa2aedf 100644 --- a/eth/stagedsync/stage_finish.go +++ b/eth/stagedsync/stage_finish.go @@ -143,8 +143,8 @@ func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, finishS heightSpan = 1024 } notifyFrom = finishStageAfterSync - heightSpan + notifyFrom++ } - notifyFrom++ var notifyTo = notifyFrom var notifyToHash libcommon.Hash diff --git a/turbo/jsonrpc/eth_subscribe_test.go b/turbo/jsonrpc/eth_subscribe_test.go index 1a595fb2718..eee0edb7a44 100644 --- a/turbo/jsonrpc/eth_subscribe_test.go +++ b/turbo/jsonrpc/eth_subscribe_test.go @@ -2,7 +2,7 @@ package jsonrpc import ( "context" - "fmt" + "math/big" "testing" "github.com/stretchr/testify/require" @@ -25,14 +25,22 @@ import ( "github.com/ledgerwatch/erigon/turbo/stages/mock" ) -func TestEthSubscribe(t *testing.T) { - m, require := mock.Mock(t), require.New(t) - chain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 7, func(i int, b *core.BlockGen) { - b.SetCoinbase(libcommon.Address{1}) +func sendBlock(t *testing.T, require *require.Assertions, m *mock.MockSentry, chain *core.ChainPack) { + // Send NewBlock message + b, err := rlp.EncodeToBytes(ð.NewBlockPacket{ + Block: chain.TopBlock, + TD: big.NewInt(1), // This is ignored anyway }) - require.NoError(err) + if err != nil { + t.Fatal(err) + } + m.ReceiveWg.Add(1) + for _, err = range m.Send(&sentry.InboundMessage{Id: sentry.MessageId_NEW_BLOCK_66, Data: b, PeerId: m.PeerId}) { + require.NoError(err) + } - b, err := rlp.EncodeToBytes(ð.BlockHeadersPacket66{ + // Send all the headers + b, err = rlp.EncodeToBytes(ð.BlockHeadersPacket66{ RequestId: 1, BlockHeadersPacket: chain.Headers, }) @@ -43,6 +51,16 @@ func TestEthSubscribe(t *testing.T) { require.NoError(err) } m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed +} + +func TestEthSubscribe(t *testing.T) { + m, require := mock.Mock(t), require.New(t) + chain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 7, func(i int, b *core.BlockGen) { + b.SetCoinbase(libcommon.Address{1}) + }) + require.NoError(err) + + sendBlock(t, require, m, chain) ctx := context.Background() logger := log.New() @@ -64,7 +82,34 @@ func TestEthSubscribe(t *testing.T) { for i := uint64(1); i <= highestSeenHeader; i++ { header := <-newHeads - fmt.Printf("Got header %d\n", header.Number.Uint64()) require.Equal(i, header.Number.Uint64()) + require.Equal(chain.Blocks[i-1].Hash(), header.Hash()) + } + + // create reorg chain starting with common ancestor of 3, 4 will be first block with different coinbase + m2 := mock.Mock(t) + chain, err = core.GenerateChain(m2.ChainConfig, m2.Genesis, m2.Engine, m2.DB, 9, func(i int, b *core.BlockGen) { + // i starts from 0, so this means everything under block 4 will have coinbase 1, and 4 and above will have coinbase 2 + if i < 3 { + b.SetCoinbase(libcommon.Address{1}) + } else { + b.SetCoinbase(libcommon.Address{2}) + } + }) + require.NoError(err) + + sendBlock(t, require, m, chain) + + if err := stages.StageLoopIteration(m.Ctx, m.DB, wrap.TxContainer{}, m.Sync, initialCycle, logger, m.BlockReader, hook, false); err != nil { + t.Fatal(err) + } + + highestSeenHeader = chain.TopBlock.NumberU64() + + // since common ancestor of reorg is 3 the first new header we will see should be 3 + for i := uint64(3); i <= highestSeenHeader; i++ { + header := <-newHeads + require.Equal(i, header.Number.Uint64()) + require.Equal(chain.Blocks[i-1].Hash(), header.Hash()) } }