Skip to content

Commit

Permalink
eliminate business logic from IndexRange (#13459)
Browse files Browse the repository at this point in the history
- responds to a comment on #13444
- benchmark results (PR
[here](https://github.com/erigontech/erigon/pull/13602/files)):
```
main
BenchmarkAggregator_BeginFilesRo/begin_files_ro-16 1490890 850.0 ns/op 3280 B/op 22 allocs/op


aggr12 branch
BenchmarkAggregator_BeginFilesRo/begin_files_ro-16 1688707 712.3 ns/op 3216 B/op 21 allocs/op
```
  • Loading branch information
sudeepdino008 authored Jan 28, 2025
1 parent 84da60f commit c8d89ee
Showing 9 changed files with 60 additions and 60 deletions.
26 changes: 10 additions & 16 deletions erigon-lib/state/aggregator.go
Original file line number Diff line number Diff line change
@@ -1557,24 +1557,18 @@ func (ac *AggregatorRoTx) HistoryStartFrom(domainName kv.Domain) uint64 {
}

func (ac *AggregatorRoTx) IndexRange(name kv.InvertedIdx, k []byte, fromTs, toTs int, asc order.By, limit int, tx kv.Tx) (timestamps stream.U64, err error) {
switch name {
case kv.AccountsHistoryIdx:
return ac.d[kv.AccountsDomain].ht.IdxRange(k, fromTs, toTs, asc, limit, tx)
case kv.StorageHistoryIdx:
return ac.d[kv.StorageDomain].ht.IdxRange(k, fromTs, toTs, asc, limit, tx)
case kv.CodeHistoryIdx:
return ac.d[kv.CodeDomain].ht.IdxRange(k, fromTs, toTs, asc, limit, tx)
case kv.CommitmentHistoryIdx:
return ac.d[kv.StorageDomain].ht.IdxRange(k, fromTs, toTs, asc, limit, tx)
case kv.ReceiptHistoryIdx:
return ac.d[kv.ReceiptDomain].ht.IdxRange(k, fromTs, toTs, asc, limit, tx)
default:
// check the ii
if ii := ac.searchII(name); ii != nil {
return ii.IdxRange(k, fromTs, toTs, asc, limit, tx)
// check domain iis
for _, d := range ac.d {
if d.d.historyIdx == name {
return d.ht.IdxRange(k, fromTs, toTs, asc, limit, tx)
}
return nil, fmt.Errorf("unexpected history name: %s", name)
}

// check the ii
if ii := ac.searchII(name); ii != nil {
return ii.IdxRange(k, fromTs, toTs, asc, limit, tx)
}
return nil, fmt.Errorf("unexpected history name: %s", name)
}

// -- range end
31 changes: 18 additions & 13 deletions erigon-lib/state/aggregator2.go
Original file line number Diff line number Diff line change
@@ -60,7 +60,7 @@ var dbgCommBtIndex = dbg.EnvBool("AGG_COMMITMENT_BT", false)
func init() {
if dbgCommBtIndex {
cfg := Schema[kv.CommitmentDomain]
cfg.IndexList = AccessorBTree | AccessorExistence
cfg.AccessorList = AccessorBTree | AccessorExistence
Schema[kv.CommitmentDomain] = cfg
}
}
@@ -69,7 +69,7 @@ var Schema = map[kv.Domain]domainCfg{
kv.AccountsDomain: {
name: kv.AccountsDomain, valuesTable: kv.TblAccountVals,

IndexList: AccessorBTree | AccessorExistence,
AccessorList: AccessorBTree | AccessorExistence,
crossDomainIntegrity: domainIntegrityCheck,
Compression: seg.CompressNone,
CompressCfg: DomainCompressCfg,
@@ -80,6 +80,7 @@ var Schema = map[kv.Domain]domainCfg{

historyLargeValues: false,
filenameBase: kv.AccountsDomain.String(), //TODO: looks redundant
historyIdx: kv.AccountsHistoryIdx,

iiCfg: iiCfg{
keysTable: kv.TblAccountHistoryKeys, valuesTable: kv.TblAccountIdx,
@@ -91,16 +92,17 @@ var Schema = map[kv.Domain]domainCfg{
kv.StorageDomain: {
name: kv.StorageDomain, valuesTable: kv.TblStorageVals,

IndexList: AccessorBTree | AccessorExistence,
Compression: seg.CompressKeys,
CompressCfg: DomainCompressCfg,
AccessorList: AccessorBTree | AccessorExistence,
Compression: seg.CompressKeys,
CompressCfg: DomainCompressCfg,

hist: histCfg{
valuesTable: kv.TblStorageHistoryVals,
compression: seg.CompressNone,

historyLargeValues: false,
filenameBase: kv.StorageDomain.String(),
historyIdx: kv.StorageHistoryIdx,

iiCfg: iiCfg{
keysTable: kv.TblStorageHistoryKeys, valuesTable: kv.TblStorageIdx,
@@ -112,17 +114,18 @@ var Schema = map[kv.Domain]domainCfg{
kv.CodeDomain: {
name: kv.CodeDomain, valuesTable: kv.TblCodeVals,

IndexList: AccessorBTree | AccessorExistence,
Compression: seg.CompressVals, // compress Code with keys doesn't show any profit. compress of values show 4x ratio on eth-mainnet and 2.5x ratio on bor-mainnet
CompressCfg: DomainCompressCfg,
largeValues: true,
AccessorList: AccessorBTree | AccessorExistence,
Compression: seg.CompressVals, // compress Code with keys doesn't show any profit. compress of values show 4x ratio on eth-mainnet and 2.5x ratio on bor-mainnet
CompressCfg: DomainCompressCfg,
largeValues: true,

hist: histCfg{
valuesTable: kv.TblCodeHistoryVals,
compression: seg.CompressKeys | seg.CompressVals,

historyLargeValues: true,
filenameBase: kv.CodeDomain.String(),
historyIdx: kv.CodeHistoryIdx,

iiCfg: iiCfg{
withExistence: false, compressorCfg: seg.DefaultCfg,
@@ -134,7 +137,7 @@ var Schema = map[kv.Domain]domainCfg{
kv.CommitmentDomain: {
name: kv.CommitmentDomain, valuesTable: kv.TblCommitmentVals,

IndexList: AccessorHashMap,
AccessorList: AccessorHashMap,
Compression: seg.CompressKeys,
CompressCfg: DomainCompressCfg,
replaceKeysInValues: AggregatorSqueezeCommitmentValues,
@@ -146,6 +149,7 @@ var Schema = map[kv.Domain]domainCfg{
snapshotsDisabled: true,
historyLargeValues: false,
filenameBase: kv.CommitmentDomain.String(),
historyIdx: kv.CommitmentHistoryIdx,

iiCfg: iiCfg{
keysTable: kv.TblCommitmentHistoryKeys, valuesTable: kv.TblCommitmentIdx,
@@ -157,16 +161,17 @@ var Schema = map[kv.Domain]domainCfg{
kv.ReceiptDomain: {
name: kv.ReceiptDomain, valuesTable: kv.TblReceiptVals,

IndexList: AccessorBTree | AccessorExistence,
Compression: seg.CompressNone, //seg.CompressKeys | seg.CompressVals,
CompressCfg: DomainCompressCfg,
AccessorList: AccessorBTree | AccessorExistence,
Compression: seg.CompressNone, //seg.CompressKeys | seg.CompressVals,
CompressCfg: DomainCompressCfg,

hist: histCfg{
valuesTable: kv.TblReceiptHistoryVals,
compression: seg.CompressNone,

historyLargeValues: false,
filenameBase: kv.ReceiptDomain.String(),
historyIdx: kv.ReceiptHistoryIdx,

iiCfg: iiCfg{
keysTable: kv.TblReceiptHistoryKeys, valuesTable: kv.TblReceiptIdx,
2 changes: 1 addition & 1 deletion erigon-lib/state/aggregator_test.go
Original file line number Diff line number Diff line change
@@ -1274,7 +1274,7 @@ func TestAggregator_RebuildCommitmentBasedOnFiles(t *testing.T) {
fnames := []string{}
for _, f := range ac.d[kv.CommitmentDomain].files {
var k, stateVal []byte
if ac.d[kv.CommitmentDomain].d.IndexList&AccessorHashMap != 0 {
if ac.d[kv.CommitmentDomain].d.AccessorList&AccessorHashMap != 0 {
idx := f.src.index.GetReaderFromPool()
r := seg.NewReader(f.src.decompressor.MakeGetter(), compression)

42 changes: 21 additions & 21 deletions erigon-lib/state/domain.go
Original file line number Diff line number Diff line change
@@ -94,12 +94,12 @@ type Domain struct {
type domainCfg struct {
hist histCfg

name kv.Domain
Compression seg.FileCompression
CompressCfg seg.Cfg
IndexList Accessors // list of indexes for given domain
valuesTable string // bucket to store domain values; key -> inverted_step + values (Dupsort)
largeValues bool
name kv.Domain
Compression seg.FileCompression
CompressCfg seg.Cfg
AccessorList Accessors // list of indexes for given domain
valuesTable string // bucket to store domain values; key -> inverted_step + values (Dupsort)
largeValues bool

crossDomainIntegrity rangeDomainIntegrityChecker

@@ -358,7 +358,7 @@ func (d *Domain) openDirtyFiles() (err error) {
}
}

if item.index == nil && d.IndexList&AccessorHashMap != 0 {
if item.index == nil && d.AccessorList&AccessorHashMap != 0 {
fPath := d.kvAccessorFilePath(fromStep, toStep)
exists, err := dir.FileExist(fPath)
if err != nil {
@@ -373,7 +373,7 @@ func (d *Domain) openDirtyFiles() (err error) {
}
}
}
if item.bindex == nil && d.IndexList&AccessorBTree != 0 {
if item.bindex == nil && d.AccessorList&AccessorBTree != 0 {
fPath := d.kvBtFilePath(fromStep, toStep)
exists, err := dir.FileExist(fPath)
if err != nil {
@@ -392,7 +392,7 @@ func (d *Domain) openDirtyFiles() (err error) {
}
}
}
if item.existence == nil && d.IndexList&AccessorExistence != 0 {
if item.existence == nil && d.AccessorList&AccessorExistence != 0 {
fPath := d.kvExistenceIdxFilePath(fromStep, toStep)
exists, err := dir.FileExist(fPath)
if err != nil {
@@ -443,7 +443,7 @@ func (d *Domain) closeWhatNotInList(fNames []string) {
}

func (d *Domain) reCalcVisibleFiles(toTxNum uint64) {
d._visible = newDomainVisible(d.name, calcVisibleFiles(d.dirtyFiles, d.IndexList, false, toTxNum))
d._visible = newDomainVisible(d.name, calcVisibleFiles(d.dirtyFiles, d.AccessorList, false, toTxNum))
d.History.reCalcVisibleFiles(toTxNum)
}

@@ -674,15 +674,15 @@ func (dt *DomainRoTx) getLatestFromFile(i int, filekey []byte) (v []byte, ok boo
}

g := dt.statelessGetter(i)
if dt.d.IndexList&AccessorBTree != 0 {
if dt.d.AccessorList&AccessorBTree != 0 {
_, v, offset, ok, err = dt.statelessBtree(i).Get(filekey, g)
if err != nil || !ok {
return nil, false, 0, err
}
//fmt.Printf("getLatestFromBtreeColdFiles key %x shard %d %x\n", filekey, exactColdShard, v)
return v, true, offset, nil
}
if dt.d.IndexList&AccessorHashMap != 0 {
if dt.d.AccessorList&AccessorHashMap != 0 {
reader := dt.statelessIdxReader(i)
if reader.Empty() {
return nil, false, 0, nil
@@ -1051,7 +1051,7 @@ func (d *Domain) buildFileRange(ctx context.Context, stepFrom, stepTo uint64, co
return StaticFiles{}, fmt.Errorf("open %s values decompressor: %w", d.filenameBase, err)
}

if d.IndexList&AccessorHashMap != 0 {
if d.AccessorList&AccessorHashMap != 0 {
if err = d.buildAccessor(ctx, stepFrom, stepTo, valuesDecomp, ps); err != nil {
return StaticFiles{}, fmt.Errorf("build %s values idx: %w", d.filenameBase, err)
}
@@ -1061,7 +1061,7 @@ func (d *Domain) buildFileRange(ctx context.Context, stepFrom, stepTo uint64, co
}
}

if d.IndexList&AccessorBTree != 0 {
if d.AccessorList&AccessorBTree != 0 {
btPath := d.kvBtFilePath(stepFrom, stepTo)
btM := DefaultBtreeM
if stepFrom == 0 && d.filenameBase == "commitment" {
@@ -1073,7 +1073,7 @@ func (d *Domain) buildFileRange(ctx context.Context, stepFrom, stepTo uint64, co
return StaticFiles{}, fmt.Errorf("build %s .bt idx: %w", d.filenameBase, err)
}
}
if d.IndexList&AccessorExistence != 0 {
if d.AccessorList&AccessorExistence != 0 {
fPath := d.kvExistenceIdxFilePath(stepFrom, stepTo)
exists, err := dir.FileExist(fPath)
if err != nil {
@@ -1154,7 +1154,7 @@ func (d *Domain) buildFiles(ctx context.Context, step uint64, collation Collatio
return StaticFiles{}, fmt.Errorf("open %s values decompressor: %w", d.filenameBase, err)
}

if d.IndexList&AccessorHashMap != 0 {
if d.AccessorList&AccessorHashMap != 0 {
if err = d.buildAccessor(ctx, step, step+1, valuesDecomp, ps); err != nil {
return StaticFiles{}, fmt.Errorf("build %s values idx: %w", d.filenameBase, err)
}
@@ -1164,7 +1164,7 @@ func (d *Domain) buildFiles(ctx context.Context, step uint64, collation Collatio
}
}

if d.IndexList&AccessorBTree != 0 {
if d.AccessorList&AccessorBTree != 0 {
btPath := d.kvBtFilePath(step, step+1)
btM := DefaultBtreeM
if step == 0 && d.filenameBase == "commitment" {
@@ -1175,7 +1175,7 @@ func (d *Domain) buildFiles(ctx context.Context, step uint64, collation Collatio
return StaticFiles{}, fmt.Errorf("build %s .bt idx: %w", d.filenameBase, err)
}
}
if d.IndexList&AccessorExistence != 0 {
if d.AccessorList&AccessorExistence != 0 {
fPath := d.kvExistenceIdxFilePath(step, step+1)
exists, err := dir.FileExist(fPath)
if err != nil {
@@ -1215,7 +1215,7 @@ func (d *Domain) buildAccessor(ctx context.Context, fromStep, toStep uint64, dat
}

func (d *Domain) missedBtreeAccessors() (l []*filesItem) {
if !d.IndexList.Has(AccessorBTree) {
if !d.AccessorList.Has(AccessorBTree) {
return nil
}
return fileItemsWithMissingAccessors(d.dirtyFiles, d.aggregationStep, func(fromStep uint64, toStep uint64) []string {
@@ -1224,7 +1224,7 @@ func (d *Domain) missedBtreeAccessors() (l []*filesItem) {
}

func (d *Domain) missedMapAccessors() (l []*filesItem) {
if !d.IndexList.Has(AccessorHashMap) {
if !d.AccessorList.Has(AccessorHashMap) {
return nil
}
return fileItemsWithMissingAccessors(d.dirtyFiles, d.aggregationStep, func(fromStep uint64, toStep uint64) []string {
@@ -1418,7 +1418,7 @@ func (dt *DomainRoTx) getFromFiles(filekey []byte, maxTxNum uint64) (v []byte, f
if maxTxNum == 0 {
maxTxNum = math.MaxUint64
}
useExistenceFilter := dt.d.IndexList&AccessorExistence != 0
useExistenceFilter := dt.d.AccessorList&AccessorExistence != 0
useCache := dt.name != kv.CommitmentDomain && maxTxNum == math.MaxUint64

hi, _ := dt.ht.iit.hashKey(filekey)
4 changes: 2 additions & 2 deletions erigon-lib/state/domain_committed.go
Original file line number Diff line number Diff line change
@@ -110,7 +110,7 @@ func (dt *DomainRoTx) findShortenedKey(fullKey []byte, itemGetter *seg.Reader, i
// }
//}

if dt.d.IndexList&AccessorHashMap != 0 {
if dt.d.AccessorList&AccessorHashMap != 0 {
reader := recsplit.NewIndexReader(item.index)
defer reader.Close()

@@ -135,7 +135,7 @@ func (dt *DomainRoTx) findShortenedKey(fullKey []byte, itemGetter *seg.Reader, i
}
return encodeShorterKey(nil, offset), true
}
if dt.d.IndexList&AccessorBTree != 0 {
if dt.d.AccessorList&AccessorBTree != 0 {
if item.bindex == nil {
dt.d.logger.Warn("[agg] commitment branch key replacement: file doesn't have index", "name", item.decompressor.FileName())
}
2 changes: 1 addition & 1 deletion erigon-lib/state/domain_shared.go
Original file line number Diff line number Diff line change
@@ -817,7 +817,7 @@ func (sd *SharedDomains) IterateStoragePrefix(prefix []byte, it func(k []byte, v
}
}
case FILE_CURSOR:
indexList := sd.aggTx.d[kv.StorageDomain].d.IndexList
indexList := sd.aggTx.d[kv.StorageDomain].d.AccessorList
if indexList&AccessorBTree != 0 {
if ci1.btCursor.Next() {
ci1.key = ci1.btCursor.Key()
1 change: 1 addition & 0 deletions erigon-lib/state/history.go
Original file line number Diff line number Diff line change
@@ -99,6 +99,7 @@ type histCfg struct {
indexList Accessors
compressorCfg seg.Cfg // compression settings for history files
compression seg.FileCompression // defines type of compression for history files
historyIdx kv.InvertedIdx

//TODO: re-visit this check - maybe we don't need it. It's about kill in the middle of merge
integrity rangeIntegrityChecker
6 changes: 3 additions & 3 deletions erigon-lib/state/merge.go
Original file line number Diff line number Diff line change
@@ -515,7 +515,7 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h
return nil, nil, nil, fmt.Errorf("merge %s decompressor [%d-%d]: %w", dt.d.filenameBase, r.values.from, r.values.to, err)
}

if dt.d.IndexList&AccessorBTree != 0 {
if dt.d.AccessorList&AccessorBTree != 0 {
btPath := dt.d.kvBtFilePath(fromStep, toStep)
btM := DefaultBtreeM
if toStep == 0 && dt.d.filenameBase == "commitment" {
@@ -526,7 +526,7 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h
return nil, nil, nil, fmt.Errorf("merge %s btindex [%d-%d]: %w", dt.d.filenameBase, r.values.from, r.values.to, err)
}
}
if dt.d.IndexList&AccessorHashMap != 0 {
if dt.d.AccessorList&AccessorHashMap != 0 {
if err = dt.d.buildAccessor(ctx, fromStep, toStep, valuesIn.decompressor, ps); err != nil {
return nil, nil, nil, fmt.Errorf("merge %s buildAccessor [%d-%d]: %w", dt.d.filenameBase, r.values.from, r.values.to, err)
}
@@ -535,7 +535,7 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h
}
}

if dt.d.IndexList&AccessorExistence != 0 {
if dt.d.AccessorList&AccessorExistence != 0 {
bloomIndexPath := dt.d.kvExistenceIdxFilePath(fromStep, toStep)
exists, err := dir.FileExist(bloomIndexPath)
if err != nil {
6 changes: 3 additions & 3 deletions turbo/app/snapshots_cmd.go
Original file line number Diff line number Diff line change
@@ -683,7 +683,7 @@ func checkIfStateSnapshotsPublishable(dirs datadir.Dirs) error {
return fmt.Errorf("missing file %s at path %s", expectedFileName, filepath.Join(dirs.SnapDomain, expectedFileName))
}
// check that the index file exist
if libstate.Schema[snapType].IndexList.Has(libstate.AccessorBTree) {
if libstate.Schema[snapType].AccessorList.Has(libstate.AccessorBTree) {
fileName := strings.Replace(expectedFileName, ".kv", ".bt", 1)
exists, err := dir.FileExist(filepath.Join(dirs.SnapDomain, fileName))
if err != nil {
@@ -693,7 +693,7 @@ func checkIfStateSnapshotsPublishable(dirs datadir.Dirs) error {
return fmt.Errorf("missing file %s", fileName)
}
}
if libstate.Schema[snapType].IndexList.Has(libstate.AccessorExistence) {
if libstate.Schema[snapType].AccessorList.Has(libstate.AccessorExistence) {
fileName := strings.Replace(expectedFileName, ".kv", ".kvei", 1)
exists, err := dir.FileExist(filepath.Join(dirs.SnapDomain, fileName))
if err != nil {
@@ -703,7 +703,7 @@ func checkIfStateSnapshotsPublishable(dirs datadir.Dirs) error {
return fmt.Errorf("missing file %s", fileName)
}
}
if libstate.Schema[snapType].IndexList.Has(libstate.AccessorHashMap) {
if libstate.Schema[snapType].AccessorList.Has(libstate.AccessorHashMap) {
fileName := strings.Replace(expectedFileName, ".kv", ".kvi", 1)
exists, err := dir.FileExist(filepath.Join(dirs.SnapDomain, fileName))
if err != nil {

0 comments on commit c8d89ee

Please sign in to comment.