Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core, eth, ethstats: simplify chain head events #30601

Merged
merged 3 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 9 additions & 15 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ type BlockChain struct {
hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
Expand Down Expand Up @@ -571,15 +570,14 @@ func (bc *BlockChain) SetHead(head uint64) error {
}
// Send chain head event to update the transaction pool
header := bc.CurrentBlock()
block := bc.GetBlock(header.Hash(), header.Number.Uint64())
if block == nil {
if block := bc.GetBlock(header.Hash(), header.Number.Uint64()); block == nil {
// This should never happen. In practice, previously currentBlock
// contained the entire block whereas now only a "marker", so there
// is an ever so slight chance for a race we should handle.
log.Error("Current block not found in database", "block", header.Number, "hash", header.Hash())
return fmt.Errorf("current block missing: #%d [%x..]", header.Number, header.Hash().Bytes()[:4])
}
bc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
bc.chainHeadFeed.Send(ChainHeadEvent{Header: header})
return nil
}

Expand All @@ -593,15 +591,14 @@ func (bc *BlockChain) SetHeadWithTimestamp(timestamp uint64) error {
}
// Send chain head event to update the transaction pool
header := bc.CurrentBlock()
block := bc.GetBlock(header.Hash(), header.Number.Uint64())
if block == nil {
if block := bc.GetBlock(header.Hash(), header.Number.Uint64()); block == nil {
// This should never happen. In practice, previously currentBlock
// contained the entire block whereas now only a "marker", so there
// is an ever so slight chance for a race we should handle.
log.Error("Current block not found in database", "block", header.Number, "hash", header.Hash())
return fmt.Errorf("current block missing: #%d [%x..]", header.Number, header.Hash().Bytes()[:4])
}
bc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
bc.chainHeadFeed.Send(ChainHeadEvent{Header: header})
return nil
}

Expand Down Expand Up @@ -1552,7 +1549,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
// Set new head.
bc.writeHeadBlock(block)

bc.chainFeed.Send(ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
bc.chainFeed.Send(ChainEvent{Header: block.Header()})
if len(logs) > 0 {
bc.logsFeed.Send(logs)
}
Expand All @@ -1562,7 +1559,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
// we will fire an accumulated ChainHeadEvent and disable fire
// event here.
if emitHeadEvent {
bc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
bc.chainHeadFeed.Send(ChainHeadEvent{Header: block.Header()})
}
return CanonStatTy, nil
}
Expand Down Expand Up @@ -1627,7 +1624,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
// Fire a single chain head event if we've progressed the chain
defer func() {
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
bc.chainHeadFeed.Send(ChainHeadEvent{lastCanon})
bc.chainHeadFeed.Send(ChainHeadEvent{Header: lastCanon.Header()})
}
}()
// Start the parallel header verifier
Expand Down Expand Up @@ -2328,9 +2325,6 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
// Deleted logs + blocks:
var deletedLogs []*types.Log
for i := len(oldChain) - 1; i >= 0; i-- {
// Also send event for blocks removed from the canon chain.
bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]})

// Collect deleted logs for notification
if logs := bc.collectLogs(oldChain[i], true); len(logs) > 0 {
deletedLogs = append(deletedLogs, logs...)
Expand Down Expand Up @@ -2403,11 +2397,11 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {

// Emit events
logs := bc.collectLogs(head, false)
bc.chainFeed.Send(ChainEvent{Block: head, Hash: head.Hash(), Logs: logs})
bc.chainFeed.Send(ChainEvent{Header: head.Header()})
if len(logs) > 0 {
bc.logsFeed.Send(logs)
}
bc.chainHeadFeed.Send(ChainHeadEvent{Block: head})
bc.chainHeadFeed.Send(ChainHeadEvent{Header: head.Header()})

context := []interface{}{
"number", head.Number(),
Expand Down
5 changes: 0 additions & 5 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,11 +430,6 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su
return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch))
}

// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
return bc.scope.Track(bc.chainSideFeed.Subscribe(ch))
}

// SubscribeLogsEvent registers a subscription of []*types.Log.
func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return bc.scope.Track(bc.logsFeed.Subscribe(ch))
Expand Down
79 changes: 0 additions & 79 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1332,85 +1332,6 @@ func checkLogEvents(t *testing.T, logsCh <-chan []*types.Log, rmLogsCh <-chan Re
}
}

func TestReorgSideEvent(t *testing.T) {
testReorgSideEvent(t, rawdb.HashScheme)
testReorgSideEvent(t, rawdb.PathScheme)
}

func testReorgSideEvent(t *testing.T, scheme string) {
var (
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
gspec = &Genesis{
Config: params.TestChainConfig,
Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000000)}},
}
signer = types.LatestSigner(gspec.Config)
)
blockchain, _ := NewBlockChain(rawdb.NewMemoryDatabase(), DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
defer blockchain.Stop()

_, chain, _ := GenerateChainWithGenesis(gspec, ethash.NewFaker(), 3, func(i int, gen *BlockGen) {})
if _, err := blockchain.InsertChain(chain); err != nil {
t.Fatalf("failed to insert chain: %v", err)
}

_, replacementBlocks, _ := GenerateChainWithGenesis(gspec, ethash.NewFaker(), 4, func(i int, gen *BlockGen) {
tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, gen.header.BaseFee, nil), signer, key1)
if i == 2 {
gen.OffsetTime(-9)
}
if err != nil {
t.Fatalf("failed to create tx: %v", err)
}
gen.AddTx(tx)
})
chainSideCh := make(chan ChainSideEvent, 64)
blockchain.SubscribeChainSideEvent(chainSideCh)
if _, err := blockchain.InsertChain(replacementBlocks); err != nil {
t.Fatalf("failed to insert chain: %v", err)
}

expectedSideHashes := map[common.Hash]bool{
chain[0].Hash(): true,
chain[1].Hash(): true,
chain[2].Hash(): true,
}

i := 0

const timeoutDura = 10 * time.Second
timeout := time.NewTimer(timeoutDura)
done:
for {
select {
case ev := <-chainSideCh:
block := ev.Block
if _, ok := expectedSideHashes[block.Hash()]; !ok {
t.Errorf("%d: didn't expect %x to be in side chain", i, block.Hash())
}
i++

if i == len(expectedSideHashes) {
timeout.Stop()

break done
}
timeout.Reset(timeoutDura)

case <-timeout.C:
t.Fatalf("Timeout. Possibly not all blocks were triggered for sideevent: %v", i)
}
}

// make sure no more events are fired
select {
case e := <-chainSideCh:
t.Errorf("unexpected event fired: %v", e)
case <-time.After(250 * time.Millisecond):
}
}

// Tests if the canonical block can be fetched from the database during chain insertion.
func TestCanonicalBlockRetrieval(t *testing.T) {
testCanonicalBlockRetrieval(t, rawdb.HashScheme)
Expand Down
9 changes: 4 additions & 5 deletions core/chain_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,20 +222,19 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainH
errc <- nil
return
}
header := ev.Block.Header()
if header.ParentHash != prevHash {
if ev.Header.ParentHash != prevHash {
// Reorg to the common ancestor if needed (might not exist in light sync mode, skip reorg then)
// TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly?

if rawdb.ReadCanonicalHash(c.chainDb, prevHeader.Number.Uint64()) != prevHash {
if h := rawdb.FindCommonAncestor(c.chainDb, prevHeader, header); h != nil {
if h := rawdb.FindCommonAncestor(c.chainDb, prevHeader, ev.Header); h != nil {
c.newHead(h.Number.Uint64(), true)
}
}
}
c.newHead(header.Number.Uint64(), false)
c.newHead(ev.Header.Number.Uint64(), false)

prevHeader, prevHash = header, header.Hash()
prevHeader, prevHash = ev.Header, ev.Header.Hash()
}
}
}
Expand Down
14 changes: 3 additions & 11 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,19 @@
package core

import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)

// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction }

// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct{ Block *types.Block }

// RemovedLogsEvent is posted when a reorg happens
type RemovedLogsEvent struct{ Logs []*types.Log }

type ChainEvent struct {
Block *types.Block
Hash common.Hash
Logs []*types.Log
Header *types.Header
}

type ChainSideEvent struct {
Block *types.Block
type ChainHeadEvent struct {
Header *types.Header
}

type ChainHeadEvent struct{ Block *types.Block }
4 changes: 2 additions & 2 deletions core/txindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
if done == nil {
stop = make(chan struct{})
done = make(chan struct{})
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Block.NumberU64(), stop, done)
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Header.Number.Uint64(), stop, done)
}
lastHead = head.Block.NumberU64()
lastHead = head.Header.Number.Uint64()
case <-done:
stop = nil
done = nil
Expand Down
2 changes: 1 addition & 1 deletion core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) {
select {
case event := <-newHeadCh:
// Chain moved forward, store the head for later consumption
newHead = event.Block.Header()
newHead = event.Header

case head := <-resetDone:
// Previous reset finished, update the old head and allow a new reset
Expand Down
4 changes: 0 additions & 4 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,6 @@ func (b *EthAPIBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) e
return b.eth.BlockChain().SubscribeChainHeadEvent(ch)
}

func (b *EthAPIBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
return b.eth.BlockChain().SubscribeChainSideEvent(ch)
}

func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return b.eth.BlockChain().SubscribeLogsEvent(ch)
}
Expand Down
17 changes: 9 additions & 8 deletions eth/catalyst/simulated_beacon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,16 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) {
timer := time.NewTimer(12 * time.Second)
for {
select {
case evt := <-chainHeadCh:
for _, includedTx := range evt.Block.Transactions() {
case ev := <-chainHeadCh:
block := ethService.BlockChain().GetBlock(ev.Header.Hash(), ev.Header.Number.Uint64())
for _, includedTx := range block.Transactions() {
includedTxs[includedTx.Hash()] = struct{}{}
}
for _, includedWithdrawal := range evt.Block.Withdrawals() {
for _, includedWithdrawal := range block.Withdrawals() {
includedWithdrawals = append(includedWithdrawals, includedWithdrawal.Index)
}

// ensure all withdrawals/txs included. this will take two blocks b/c number of withdrawals > 10
if len(includedTxs) == len(txs) && len(includedWithdrawals) == len(withdrawals) && evt.Block.Number().Cmp(big.NewInt(2)) == 0 {
if len(includedTxs) == len(txs) && len(includedWithdrawals) == len(withdrawals) && ev.Header.Number.Cmp(big.NewInt(2)) == 0 {
return
}
case <-timer.C:
Expand Down Expand Up @@ -186,11 +186,12 @@ func TestOnDemandSpam(t *testing.T) {
)
for {
select {
case evt := <-chainHeadCh:
for _, itx := range evt.Block.Transactions() {
case ev := <-chainHeadCh:
block := eth.BlockChain().GetBlock(ev.Header.Hash(), ev.Header.Number.Uint64())
for _, itx := range block.Transactions() {
includedTxs[itx.Hash()] = struct{}{}
}
for _, iwx := range evt.Block.Withdrawals() {
for _, iwx := range block.Withdrawals() {
includedWxs = append(includedWxs, iwx.Index)
}
// ensure all withdrawals/txs included. this will take two blocks b/c number of withdrawals > 10
Expand Down
2 changes: 1 addition & 1 deletion eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent)

func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) {
for _, f := range filters[BlocksSubscription] {
f.headers <- ev.Block.Header()
f.headers <- ev.Header
}
}

Expand Down
10 changes: 5 additions & 5 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func TestBlockSubscription(t *testing.T) {
)

for _, blk := range chain {
chainEvents = append(chainEvents, core.ChainEvent{Hash: blk.Hash(), Block: blk})
chainEvents = append(chainEvents, core.ChainEvent{Header: blk.Header()})
}

chan0 := make(chan *types.Header)
Expand All @@ -213,13 +213,13 @@ func TestBlockSubscription(t *testing.T) {
for i1 != len(chainEvents) || i2 != len(chainEvents) {
select {
case header := <-chan0:
if chainEvents[i1].Hash != header.Hash() {
t.Errorf("sub0 received invalid hash on index %d, want %x, got %x", i1, chainEvents[i1].Hash, header.Hash())
if chainEvents[i1].Header.Hash() != header.Hash() {
t.Errorf("sub0 received invalid hash on index %d, want %x, got %x", i1, chainEvents[i1].Header.Hash(), header.Hash())
}
i1++
case header := <-chan1:
if chainEvents[i2].Hash != header.Hash() {
t.Errorf("sub1 received invalid hash on index %d, want %x, got %x", i2, chainEvents[i2].Hash, header.Hash())
if chainEvents[i2].Header.Hash() != header.Hash() {
t.Errorf("sub1 received invalid hash on index %d, want %x, got %x", i2, chainEvents[i2].Header.Hash(), header.Hash())
}
i2++
}
Expand Down
4 changes: 2 additions & 2 deletions eth/gasprice/gasprice.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ func NewOracle(backend OracleBackend, params Config, startPrice *big.Int) *Oracl
go func() {
var lastHead common.Hash
for ev := range headEvent {
if ev.Block.ParentHash() != lastHead {
if ev.Header.ParentHash != lastHead {
cache.Purge()
}
lastHead = ev.Block.Hash()
lastHead = ev.Header.Hash()
}
}()

Expand Down
Loading