Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: move tx indexer to its own file #28857

Merged
merged 3 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 7 additions & 171 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,17 +192,6 @@ type txLookup struct {
transaction *types.Transaction
}

// TxIndexProgress is the struct describing the progress for transaction indexing.
type TxIndexProgress struct {
Indexed uint64 // number of blocks whose transactions are indexed
Remaining uint64 // number of blocks whose transactions are not indexed yet
}

// Done returns an indicator if the transaction indexing is finished.
func (prog TxIndexProgress) Done() bool {
return prog.Remaining == 0
}

// BlockChain represents the canonical chain given a database with a genesis
// block. The Blockchain manages chain imports, reverts, chain reorganisations.
//
Expand All @@ -229,13 +218,7 @@ type BlockChain struct {
flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state
triedb *trie.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache)

// txLookupLimit is the maximum number of blocks from head whose tx indices
// are reserved:
// * 0: means no limit and regenerate any missing indexes
// * N: means N block limit [HEAD-N+1, HEAD] and delete extra indexes
// * nil: disable tx reindexer/deleter, but still index new blocks
txLookupLimit uint64
txIndexer *txIndexer // Transaction indexer, might be nil if not enabled

hc *HeaderChain
rmLogsFeed event.Feed
Expand Down Expand Up @@ -270,9 +253,6 @@ type BlockChain struct {
stopping atomic.Bool // false if chain is running, true when stopped
procInterrupt atomic.Bool // interrupt signaler for block processing

txIndexRunning bool // flag if the background tx indexer is activated
txIndexProgCh chan chan TxIndexProgress // chan for querying the progress of transaction indexing

engine consensus.Engine
validator Validator // Block and state validator interface
prefetcher Prefetcher
Expand Down Expand Up @@ -320,7 +300,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit),
futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
txIndexProgCh: make(chan chan TxIndexProgress),
engine: engine,
vmConfig: vmConfig,
}
Expand Down Expand Up @@ -485,13 +464,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
}
rawdb.WriteChainConfig(db, genesisHash, chainConfig)
}
// Start tx indexer/unindexer if required.
// Start tx indexer if it's enabled.
if txLookupLimit != nil {
bc.txLookupLimit = *txLookupLimit
bc.txIndexRunning = true

bc.wg.Add(1)
go bc.maintainTxIndex()
bc.txIndexer = newTxIndexer(*txLookupLimit, bc)
}
return bc, nil
}
Expand Down Expand Up @@ -981,7 +956,10 @@ func (bc *BlockChain) stopWithoutSaving() {
if !bc.stopping.CompareAndSwap(false, true) {
return
}

// Signal shutdown tx indexer.
if bc.txIndexer != nil {
bc.txIndexer.close()
}
// Unsubscribe all subscriptions registered from blockchain.
bc.scope.Close()

Expand Down Expand Up @@ -2403,148 +2381,6 @@ func (bc *BlockChain) skipBlock(err error, it *insertIterator) bool {
return false
}

// indexBlocks reindexes or unindexes transactions depending on user configuration
func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{}) {
defer func() { close(done) }()

// If head is 0, it means the chain is just initialized and no blocks are
// inserted, so don't need to index anything.
if head == 0 {
return
}
// The tail flag is not existent, it means the node is just initialized
// and all blocks in the chain (part of them may from ancient store) are
// not indexed yet, index the chain according to the configuration then.
if tail == nil {
from := uint64(0)
if bc.txLookupLimit != 0 && head >= bc.txLookupLimit {
from = head - bc.txLookupLimit + 1
}
rawdb.IndexTransactions(bc.db, from, head+1, bc.quit, true)
return
}
// The tail flag is existent (which means indexes in [tail, head] should be
// present), while the whole chain are requested for indexing.
if bc.txLookupLimit == 0 || head < bc.txLookupLimit {
if *tail > 0 {
// It can happen when chain is rewound to a historical point which
// is even lower than the indexes tail, recap the indexing target
// to new head to avoid reading non-existent block bodies.
end := *tail
if end > head+1 {
end = head + 1
}
rawdb.IndexTransactions(bc.db, 0, end, bc.quit, true)
}
return
}
// The tail flag is existent, adjust the index range according to configuration
// and latest head.
if head-bc.txLookupLimit+1 < *tail {
// Reindex a part of missing indices and rewind index tail to HEAD-limit
rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit, true)
} else {
// Unindex a part of stale indices and forward index tail to HEAD-limit
rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit, false)
}
}

// reportTxIndexProgress returns the tx indexing progress.
func (bc *BlockChain) reportTxIndexProgress(head uint64) TxIndexProgress {
var (
remaining uint64
tail = rawdb.ReadTxIndexTail(bc.db)
)
total := bc.txLookupLimit
if bc.txLookupLimit == 0 {
total = head + 1 // genesis included
}
var indexed uint64
if tail != nil {
indexed = head - *tail + 1
}
// The value of indexed might be larger than total if some blocks need
// to be unindexed, avoiding a negative remaining.
if indexed < total {
remaining = total - indexed
}
return TxIndexProgress{
Indexed: indexed,
Remaining: remaining,
}
}

// TxIndexProgress retrieves the tx indexing progress, or an error if the
// background tx indexer is not activated or already stopped.
func (bc *BlockChain) TxIndexProgress() (TxIndexProgress, error) {
if !bc.txIndexRunning {
return TxIndexProgress{}, errors.New("tx indexer is not activated")
}
ch := make(chan TxIndexProgress, 1)
select {
case bc.txIndexProgCh <- ch:
return <-ch, nil
case <-bc.quit:
return TxIndexProgress{}, errors.New("blockchain is closed")
}
}

// maintainTxIndex is responsible for the construction and deletion of the
// transaction index.
//
// User can use flag `txlookuplimit` to specify a "recentness" block, below
// which ancient tx indices get deleted. If `txlookuplimit` is 0, it means
// all tx indices will be reserved.
//
// The user can adjust the txlookuplimit value for each launch after sync,
// Geth will automatically construct the missing indices or delete the extra
// indices.
func (bc *BlockChain) maintainTxIndex() {
defer bc.wg.Done()

// Listening to chain events and manipulate the transaction indexes.
var (
done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created)
headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed
)
sub := bc.SubscribeChainHeadEvent(headCh)
if sub == nil {
return
}
defer sub.Unsubscribe()
log.Info("Initialized transaction indexer", "limit", bc.TxLookupLimit())

// Launch the initial processing if chain is not empty (head != genesis).
// This step is useful in these scenarios that chain has no progress and
// indexer is never triggered.
if head := rawdb.ReadHeadBlock(bc.db); head != nil && head.Number().Uint64() != 0 {
done = make(chan struct{})
lastHead = head.Number().Uint64()
go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.NumberU64(), done)
}
for {
select {
case head := <-headCh:
if done == nil {
done = make(chan struct{})
go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done)
}
lastHead = head.Block.NumberU64()
case <-done:
done = nil
case ch := <-bc.txIndexProgCh:
ch <- bc.reportTxIndexProgress(lastHead)
case <-bc.quit:
if done != nil {
log.Info("Waiting background transaction indexer to exit")
<-done
}
return
}
}
}

// reportBlock logs a bad block error.
func (bc *BlockChain) reportBlock(block *types.Block, receipts types.Receipts, err error) {
rawdb.WriteBadBlock(bc.db, block)
Expand Down
16 changes: 6 additions & 10 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,16 +397,12 @@ func (bc *BlockChain) GetVMConfig() *vm.Config {
return &bc.vmConfig
}

// SetTxLookupLimit is responsible for updating the txlookup limit to the
// original one stored in db if the new mismatches with the old one.
func (bc *BlockChain) SetTxLookupLimit(limit uint64) {
bc.txLookupLimit = limit
}

// TxLookupLimit retrieves the txlookup limit used by blockchain to prune
// stale transaction indices.
func (bc *BlockChain) TxLookupLimit() uint64 {
return bc.txLookupLimit
// TxIndexProgress returns the transaction indexing progress.
func (bc *BlockChain) TxIndexProgress() (TxIndexProgress, error) {
if bc.txIndexer == nil {
return TxIndexProgress{}, errors.New("tx indexer is not enabled")
}
return bc.txIndexer.txIndexProgress()
}

// TrieDB retrieves the low level trie database used for data storage.
Expand Down
Loading
Loading