.kvi support in tooling #13293

Merged (5 commits, Jan 1, 2025)
2 changes: 1 addition & 1 deletion erigon-lib/downloader/snaptype/files.go
@@ -232,7 +232,7 @@ func SeedableV3Extensions() []string {
}

func AllV3Extensions() []string {
return []string{".kv", ".v", ".ef", ".kvei", ".vi", ".efi", ".bt"}
return []string{".kv", ".v", ".ef", ".kvei", ".vi", ".efi", ".bt", ".kvi"}
}

func IsSeedableExtension(name string) bool {
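The only functional change in this file is the new `.kvi` entry in `AllV3Extensions`. As a rough, self-contained sketch of how such an extension list is typically consumed — the `listV3Files` helper and the hard-coded slice below are illustrative, not Erigon's API:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// allV3Extensions mirrors the list touched by this PR, including the new ".kvi".
var allV3Extensions = []string{".kv", ".v", ".ef", ".kvei", ".vi", ".efi", ".bt", ".kvi"}

// isV3Extension reports whether name ends in one of the known V3 snapshot extensions.
func isV3Extension(name string) bool {
	ext := filepath.Ext(name)
	for _, e := range allV3Extensions {
		if ext == e {
			return true
		}
	}
	return false
}

// listV3Files returns the entries in dir whose extension is a V3 snapshot extension.
func listV3Files(dir string) ([]string, error) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return nil, err
	}
	var out []string
	for _, e := range entries {
		if !e.IsDir() && isV3Extension(e.Name()) {
			out = append(out, e.Name())
		}
	}
	return out, nil
}

func main() {
	files, err := listV3Files(".")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(files)
}
```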
4 changes: 2 additions & 2 deletions erigon-lib/seg/seg_auto_rw.go
@@ -175,7 +175,7 @@ func DetectCompressType(getter *Getter) (compressed FileCompression) {
getter.Reset(0)
for i := 0; i < 100; i++ {
if getter.HasNext() {
_, _ = getter.NextUncompressed()
_, _ = getter.SkipUncompressed()
}
if getter.HasNext() {
_, _ = getter.Skip()
@@ -196,7 +196,7 @@ func DetectCompressType(getter *Getter) (compressed FileCompression) {
_, _ = getter.Skip()
}
if getter.HasNext() {
_, _ = getter.NextUncompressed()
_, _ = getter.SkipUncompressed()
}
}
return compressed
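Swapping `NextUncompressed` for `SkipUncompressed` keeps compression detection from materializing word values it never reads. A minimal, self-contained sketch of that "skip instead of read" idea, using a toy `wordStream` rather than the real `seg.Getter` API:

```go
package main

import "fmt"

// wordStream is a toy stand-in for a sequential word reader such as seg.Getter.
type wordStream struct {
	words [][]byte
	pos   int
}

func (s *wordStream) HasNext() bool { return s.pos < len(s.words) }

// Next returns a copy of the next word — the allocating path.
func (s *wordStream) Next() []byte {
	w := append([]byte(nil), s.words[s.pos]...)
	s.pos++
	return w
}

// Skip advances past the next word without copying it — enough when probing.
func (s *wordStream) Skip() int {
	n := len(s.words[s.pos])
	s.pos++
	return n
}

// probe walks the first n words; it only needs to advance, so it uses Skip.
func probe(s *wordStream, n int) (total int) {
	for i := 0; i < n && s.HasNext(); i++ {
		total += s.Skip()
	}
	return total
}

func main() {
	s := &wordStream{words: [][]byte{[]byte("key"), []byte("value"), []byte("key2")}}
	fmt.Println("bytes probed:", probe(s, 2))
}
```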
12 changes: 6 additions & 6 deletions erigon-lib/state/aggregator.go
@@ -131,7 +131,7 @@ var dbgCommBtIndex = dbg.EnvBool("AGG_COMMITMENT_BT", false)
func init() {
if dbgCommBtIndex {
cfg := Schema[kv.CommitmentDomain]
cfg.indexList = withBTree | withExistence
cfg.IndexList = AccessorBTree | AccessorExistence
Schema[kv.CommitmentDomain] = cfg
}
}
@@ -140,7 +140,7 @@ var Schema = map[kv.Domain]domainCfg{
kv.AccountsDomain: {
name: kv.AccountsDomain, valuesTable: kv.TblAccountVals,

indexList: withBTree | withExistence,
IndexList: AccessorBTree | AccessorExistence,
crossDomainIntegrity: domainIntegrityCheck,
compression: seg.CompressNone,
compressCfg: DomainCompressCfg,
@@ -162,7 +162,7 @@ var Schema = map[kv.Domain]domainCfg{
kv.StorageDomain: {
name: kv.StorageDomain, valuesTable: kv.TblStorageVals,

indexList: withBTree | withExistence,
IndexList: AccessorBTree | AccessorExistence,
compression: seg.CompressKeys,
compressCfg: DomainCompressCfg,

@@ -183,7 +183,7 @@ var Schema = map[kv.Domain]domainCfg{
kv.CodeDomain: {
name: kv.CodeDomain, valuesTable: kv.TblCodeVals,

indexList: withBTree | withExistence,
IndexList: AccessorBTree | AccessorExistence,
compression: seg.CompressVals, // compress Code with keys doesn't show any profit. compress of values show 4x ratio on eth-mainnet and 2.5x ratio on bor-mainnet
compressCfg: DomainCompressCfg,
largeValues: true,
@@ -205,7 +205,7 @@ var Schema = map[kv.Domain]domainCfg{
kv.CommitmentDomain: {
name: kv.CommitmentDomain, valuesTable: kv.TblCommitmentVals,

indexList: withHashMap,
IndexList: AccessorHashMap,
compression: seg.CompressKeys,
compressCfg: DomainCompressCfg,

@@ -227,7 +227,7 @@ var Schema = map[kv.Domain]domainCfg{
kv.ReceiptDomain: {
name: kv.ReceiptDomain, valuesTable: kv.TblReceiptVals,

indexList: withBTree | withExistence,
IndexList: AccessorBTree | AccessorExistence,
compression: seg.CompressNone, //seg.CompressKeys | seg.CompressVals,
compressCfg: DomainCompressCfg,

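These Schema entries now spell out which accessors each domain builds by OR-ing flags into `IndexList`. A self-contained sketch of that configuration pattern — the type, constants, and domain names below are illustrative stand-ins, not the real Schema:

```go
package main

import "fmt"

// Accessors is a bit set of index types built for a domain.
type Accessors int

const (
	AccessorBTree     Accessors = 0b1
	AccessorHashMap   Accessors = 0b10
	AccessorExistence Accessors = 0b100
)

type domainCfg struct {
	name      string
	IndexList Accessors
}

// schema mirrors the shape of the Schema map in aggregator.go: most domains get a
// B-tree plus an existence filter, while commitment gets a hash-map accessor.
var schema = map[string]domainCfg{
	"accounts":   {name: "accounts", IndexList: AccessorBTree | AccessorExistence},
	"commitment": {name: "commitment", IndexList: AccessorHashMap},
}

func main() {
	for name, cfg := range schema {
		fmt.Printf("%s: btree=%v hashmap=%v existence=%v\n",
			name,
			cfg.IndexList&AccessorBTree != 0,
			cfg.IndexList&AccessorHashMap != 0,
			cfg.IndexList&AccessorExistence != 0)
	}
}
```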
2 changes: 1 addition & 1 deletion erigon-lib/state/aggregator_test.go
@@ -1274,7 +1274,7 @@ func TestAggregator_RebuildCommitmentBasedOnFiles(t *testing.T) {
fnames := []string{}
for _, f := range ac.d[kv.CommitmentDomain].files {
var k, stateVal []byte
if ac.d[kv.CommitmentDomain].d.indexList&withHashMap != 0 {
if ac.d[kv.CommitmentDomain].d.IndexList&AccessorHashMap != 0 {
idx := f.src.index.GetReaderFromPool()
r := seg.NewReader(f.src.decompressor.MakeGetter(), compression)

34 changes: 17 additions & 17 deletions erigon-lib/state/domain.go
@@ -97,8 +97,8 @@ type domainCfg struct {
name kv.Domain
compression seg.FileCompression
compressCfg seg.Cfg
indexList idxList // list of indexes for given domain
valuesTable string // bucket to store domain values; key -> inverted_step + values (Dupsort)
IndexList Accessors // list of indexes for given domain
valuesTable string // bucket to store domain values; key -> inverted_step + values (Dupsort)
largeValues bool

crossDomainIntegrity rangeDomainIntegrityChecker
@@ -358,7 +358,7 @@ func (d *Domain) openDirtyFiles() (err error) {
}
}

if item.index == nil && d.indexList&withHashMap != 0 {
if item.index == nil && d.IndexList&AccessorHashMap != 0 {
fPath := d.kvAccessorFilePath(fromStep, toStep)
exists, err := dir.FileExist(fPath)
if err != nil {
@@ -373,7 +373,7 @@ func (d *Domain) openDirtyFiles() (err error) {
}
}
}
if item.bindex == nil && d.indexList&withBTree != 0 {
if item.bindex == nil && d.IndexList&AccessorBTree != 0 {
fPath := d.kvBtFilePath(fromStep, toStep)
exists, err := dir.FileExist(fPath)
if err != nil {
@@ -392,7 +392,7 @@ func (d *Domain) openDirtyFiles() (err error) {
}
}
}
if item.existence == nil && d.indexList&withExistence != 0 {
if item.existence == nil && d.IndexList&AccessorExistence != 0 {
fPath := d.kvExistenceIdxFilePath(fromStep, toStep)
exists, err := dir.FileExist(fPath)
if err != nil {
@@ -443,7 +443,7 @@ func (d *Domain) closeWhatNotInList(fNames []string) {
}

func (d *Domain) reCalcVisibleFiles(toTxNum uint64) {
d._visible = newDomainVisible(d.name, calcVisibleFiles(d.dirtyFiles, d.indexList, false, toTxNum))
d._visible = newDomainVisible(d.name, calcVisibleFiles(d.dirtyFiles, d.IndexList, false, toTxNum))
d.History.reCalcVisibleFiles(toTxNum)
}

@@ -674,15 +674,15 @@ func (dt *DomainRoTx) getLatestFromFile(i int, filekey []byte) (v []byte, ok boo
}

g := dt.statelessGetter(i)
if dt.d.indexList&withBTree != 0 {
if dt.d.IndexList&AccessorBTree != 0 {
_, v, offset, ok, err = dt.statelessBtree(i).Get(filekey, g)
if err != nil || !ok {
return nil, false, 0, err
}
//fmt.Printf("getLatestFromBtreeColdFiles key %x shard %d %x\n", filekey, exactColdShard, v)
return v, true, offset, nil
}
if dt.d.indexList&withHashMap != 0 {
if dt.d.IndexList&AccessorHashMap != 0 {
reader := dt.statelessIdxReader(i)
if reader.Empty() {
return nil, false, 0, nil
@@ -1051,7 +1051,7 @@ func (d *Domain) buildFileRange(ctx context.Context, stepFrom, stepTo uint64, co
return StaticFiles{}, fmt.Errorf("open %s values decompressor: %w", d.filenameBase, err)
}

if d.indexList&withHashMap != 0 {
if d.IndexList&AccessorHashMap != 0 {
if err = d.buildAccessor(ctx, stepFrom, stepTo, valuesDecomp, ps); err != nil {
return StaticFiles{}, fmt.Errorf("build %s values idx: %w", d.filenameBase, err)
}
@@ -1061,7 +1061,7 @@ func (d *Domain) buildFileRange(ctx context.Context, stepFrom, stepTo uint64, co
}
}

if d.indexList&withBTree != 0 {
if d.IndexList&AccessorBTree != 0 {
btPath := d.kvBtFilePath(stepFrom, stepTo)
btM := DefaultBtreeM
if stepFrom == 0 && d.filenameBase == "commitment" {
@@ -1073,7 +1073,7 @@ func (d *Domain) buildFileRange(ctx context.Context, stepFrom, stepTo uint64, co
return StaticFiles{}, fmt.Errorf("build %s .bt idx: %w", d.filenameBase, err)
}
}
if d.indexList&withExistence != 0 {
if d.IndexList&AccessorExistence != 0 {
fPath := d.kvExistenceIdxFilePath(stepFrom, stepTo)
exists, err := dir.FileExist(fPath)
if err != nil {
@@ -1154,7 +1154,7 @@ func (d *Domain) buildFiles(ctx context.Context, step uint64, collation Collatio
return StaticFiles{}, fmt.Errorf("open %s values decompressor: %w", d.filenameBase, err)
}

if d.indexList&withHashMap != 0 {
if d.IndexList&AccessorHashMap != 0 {
if err = d.buildAccessor(ctx, step, step+1, valuesDecomp, ps); err != nil {
return StaticFiles{}, fmt.Errorf("build %s values idx: %w", d.filenameBase, err)
}
@@ -1164,7 +1164,7 @@ func (d *Domain) buildFiles(ctx context.Context, step uint64, collation Collatio
}
}

if d.indexList&withBTree != 0 {
if d.IndexList&AccessorBTree != 0 {
btPath := d.kvBtFilePath(step, step+1)
btM := DefaultBtreeM
if step == 0 && d.filenameBase == "commitment" {
@@ -1175,7 +1175,7 @@ func (d *Domain) buildFiles(ctx context.Context, step uint64, collation Collatio
return StaticFiles{}, fmt.Errorf("build %s .bt idx: %w", d.filenameBase, err)
}
}
if d.indexList&withExistence != 0 {
if d.IndexList&AccessorExistence != 0 {
fPath := d.kvExistenceIdxFilePath(step, step+1)
exists, err := dir.FileExist(fPath)
if err != nil {
@@ -1263,7 +1263,7 @@ func (d *Domain) missedAccessors() (l []*filesItem) {
func (d *Domain) BuildMissedAccessors(ctx context.Context, g *errgroup.Group, ps *background.ProgressSet) {
d.History.BuildMissedAccessors(ctx, g, ps)
for _, item := range d.missedBtreeAccessors() {
if d.indexList&withBTree == 0 {
if d.IndexList&AccessorBTree == 0 {
continue
}
if item.decompressor == nil {
@@ -1281,7 +1281,7 @@ func (d *Domain) BuildMissedAccessors(ctx context.Context, g *errgroup.Group, ps
})
}
for _, item := range d.missedAccessors() {
if d.indexList&withHashMap == 0 {
if d.IndexList&AccessorHashMap == 0 {
continue
}
if item.decompressor == nil {
@@ -1451,7 +1451,7 @@ func (dt *DomainRoTx) getFromFiles(filekey []byte, maxTxNum uint64) (v []byte, f
if maxTxNum == 0 {
maxTxNum = math.MaxUint64
}
useExistenceFilter := dt.d.indexList&withExistence != 0
useExistenceFilter := dt.d.IndexList&AccessorExistence != 0
useCache := dt.name != kv.CommitmentDomain && maxTxNum == math.MaxUint64

hi, _ := dt.ht.iit.hashKey(filekey)
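The `getLatestFromFile` hunk above keeps the same lookup order under the new names: use the B-tree accessor if the domain has one, otherwise fall back to the hash-map index. A toy sketch of that branch order — plain maps stand in for the real accessor structures:

```go
package main

import "fmt"

type Accessors int

const (
	AccessorBTree   Accessors = 0b1
	AccessorHashMap Accessors = 0b10
)

// lookup mimics the branch order in getLatestFromFile: prefer the B-tree
// accessor when configured, otherwise fall back to the hash-map index.
func lookup(idx Accessors, key string, btree, hashmap map[string]string) (string, bool) {
	if idx&AccessorBTree != 0 {
		v, ok := btree[key]
		return v, ok
	}
	if idx&AccessorHashMap != 0 {
		v, ok := hashmap[key]
		return v, ok
	}
	return "", false
}

func main() {
	bt := map[string]string{"acc1": "balance=10"}
	hm := map[string]string{"acc1": "balance=10"}
	fmt.Println(lookup(AccessorBTree, "acc1", bt, hm))
	fmt.Println(lookup(AccessorHashMap, "acc1", bt, hm))
}
```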
4 changes: 2 additions & 2 deletions erigon-lib/state/domain_committed.go
@@ -110,7 +110,7 @@ func (dt *DomainRoTx) findShortenedKey(fullKey []byte, itemGetter *seg.Reader, i
// }
//}

if dt.d.indexList&withHashMap != 0 {
if dt.d.IndexList&AccessorHashMap != 0 {
reader := recsplit.NewIndexReader(item.index)
defer reader.Close()

@@ -135,7 +135,7 @@ func (dt *DomainRoTx) findShortenedKey(fullKey []byte, itemGetter *seg.Reader, i
}
return encodeShorterKey(nil, offset), true
}
if dt.d.indexList&withBTree != 0 {
if dt.d.IndexList&AccessorBTree != 0 {
if item.bindex == nil {
dt.d.logger.Warn("[agg] commitment branch key replacement: file doesn't have index", "name", item.decompressor.FileName())
}
6 changes: 3 additions & 3 deletions erigon-lib/state/domain_shared.go
@@ -818,8 +818,8 @@ func (sd *SharedDomains) IterateStoragePrefix(prefix []byte, it func(k []byte, v
}
}
case FILE_CURSOR:
indexList := sd.aggTx.d[kv.StorageDomain].d.indexList
if indexList&withBTree != 0 {
indexList := sd.aggTx.d[kv.StorageDomain].d.IndexList
if indexList&AccessorBTree != 0 {
if ci1.btCursor.Next() {
ci1.key = ci1.btCursor.Key()
if ci1.key != nil && bytes.HasPrefix(ci1.key, prefix) {
@@ -830,7 +830,7 @@ func (sd *SharedDomains) IterateStoragePrefix(prefix []byte, it func(k []byte, v
ci1.btCursor.Close()
}
}
if indexList&withHashMap != 0 {
if indexList&AccessorHashMap != 0 {
ci1.dg.Reset(ci1.latestOffset)
if !ci1.dg.HasNext() {
break
8 changes: 4 additions & 4 deletions erigon-lib/state/files_item.go
@@ -243,7 +243,7 @@ type visibleFile struct {
func (i *visibleFile) isSubSetOf(j *visibleFile) bool { return i.src.isSubsetOf(j.src) } //nolint
func (i *visibleFile) isSubsetOf(j *visibleFile) bool { return i.src.isSubsetOf(j.src) } //nolint

func calcVisibleFiles(files *btree2.BTreeG[*filesItem], l idxList, trace bool, toTxNum uint64) (roItems []visibleFile) {
func calcVisibleFiles(files *btree2.BTreeG[*filesItem], l Accessors, trace bool, toTxNum uint64) (roItems []visibleFile) {
newVisibleFiles := make([]visibleFile, 0, files.Len())
// trace = true
if trace {
@@ -271,21 +271,21 @@
}
continue
}
if (l&withBTree != 0) && item.bindex == nil {
if (l&AccessorBTree != 0) && item.bindex == nil {
if trace {
log.Warn("[dbg] calcVisibleFiles: BTindex not opened", "f", item.decompressor.FileName())
}
//panic(fmt.Errorf("btindex nil: %s", item.decompressor.FileName()))
continue
}
if (l&withHashMap != 0) && item.index == nil {
if (l&AccessorHashMap != 0) && item.index == nil {
if trace {
log.Warn("[dbg] calcVisibleFiles: RecSplit not opened", "f", item.decompressor.FileName())
}
//panic(fmt.Errorf("index nil: %s", item.decompressor.FileName()))
continue
}
if (l&withExistence != 0) && item.existence == nil {
if (l&AccessorExistence != 0) && item.existence == nil {
if trace {
log.Warn("[dbg] calcVisibleFiles: Existence not opened", "f", item.decompressor.FileName())
}
4 changes: 2 additions & 2 deletions erigon-lib/state/history.go
@@ -96,7 +96,7 @@ type histCfg struct {
snapshotsDisabled bool // don't produce .v and .ef files, keep in db table. old data will be pruned anyway.
historyDisabled bool // skip all write operations to this History (even in DB)

indexList idxList
indexList Accessors
compressorCfg seg.Cfg // compression settings for history files
compression seg.FileCompression // defines type of compression for history files

@@ -108,7 +108,7 @@ func NewHistory(cfg histCfg, logger log.Logger) (*History, error) {
//if cfg.compressorCfg.MaxDictPatterns == 0 && cfg.compressorCfg.MaxPatternLen == 0 {
cfg.compressorCfg = seg.DefaultCfg
if cfg.indexList == 0 {
cfg.indexList = withHashMap
cfg.indexList = AccessorHashMap
}
if cfg.iiCfg.filenameBase == "" {
cfg.iiCfg.filenameBase = cfg.filenameBase
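`NewHistory` here (like `NewInvertedIndex` below) falls back to the hash-map accessor when no accessor list is configured. A minimal sketch of that zero-value defaulting, with illustrative names:

```go
package main

import "fmt"

type Accessors int

const AccessorHashMap Accessors = 0b10

type histCfg struct {
	indexList Accessors
}

// newHistory applies the same defaulting rule as the constructors in this diff:
// a zero Accessors value means "no explicit choice", so fall back to the hash map.
func newHistory(cfg histCfg) histCfg {
	if cfg.indexList == 0 {
		cfg.indexList = AccessorHashMap
	}
	return cfg
}

func main() {
	fmt.Println(newHistory(histCfg{}).indexList == AccessorHashMap) // true
}
```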
16 changes: 9 additions & 7 deletions erigon-lib/state/inverted_index.go
@@ -89,7 +89,7 @@ type iiCfg struct {

// external checker for integrity of inverted index ranges
integrity rangeIntegrityChecker
indexList idxList
indexList Accessors
}

type iiVisible struct {
@@ -111,7 +111,7 @@ func NewInvertedIndex(cfg iiCfg, logger log.Logger) (*InvertedIndex, error) {
//if cfg.compressorCfg.MaxDictPatterns == 0 && cfg.compressorCfg.MaxPatternLen == 0 {
cfg.compressorCfg = seg.DefaultCfg
if cfg.indexList == 0 {
cfg.indexList = withHashMap
cfg.indexList = AccessorHashMap
}

ii := InvertedIndex{
@@ -200,12 +200,14 @@ func (ii *InvertedIndex) scanDirtyFiles(fileNames []string) {
}
}

type idxList int
type Accessors int

var (
withBTree idxList = 0b1
withHashMap idxList = 0b10
withExistence idxList = 0b100
func (l Accessors) Has(target Accessors) bool { return l&target != 0 }

const (
AccessorBTree Accessors = 0b1
AccessorHashMap Accessors = 0b10
AccessorExistence Accessors = 0b100
)

func (ii *InvertedIndex) reCalcVisibleFiles(toTxNum uint64) {
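Besides the renames, this file introduces the exported `Accessors` type and a `Has` helper. A self-contained sketch showing that `Has` is equivalent to the raw bit test used at the call sites in this diff; exporting the helper presumably lets later code test for an accessor without spelling out the bit mask:

```go
package main

import "fmt"

// Accessors is a bit set of index types, mirroring the type added in this PR.
type Accessors int

// Has reports whether the set contains the given accessor bit.
func (l Accessors) Has(target Accessors) bool { return l&target != 0 }

const (
	AccessorBTree     Accessors = 0b1
	AccessorHashMap   Accessors = 0b10
	AccessorExistence Accessors = 0b100
)

func main() {
	idx := AccessorBTree | AccessorExistence

	// The raw bit test used throughout this diff ...
	fmt.Println(idx&AccessorBTree != 0) // true

	// ... and the equivalent helper introduced alongside the type.
	fmt.Println(idx.Has(AccessorBTree))     // true
	fmt.Println(idx.Has(AccessorHashMap))   // false
	fmt.Println(idx.Has(AccessorExistence)) // true
}
```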
6 changes: 3 additions & 3 deletions erigon-lib/state/merge.go
@@ -515,7 +515,7 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h
return nil, nil, nil, fmt.Errorf("merge %s decompressor [%d-%d]: %w", dt.d.filenameBase, r.values.from, r.values.to, err)
}

if dt.d.indexList&withBTree != 0 {
if dt.d.IndexList&AccessorBTree != 0 {
btPath := dt.d.kvBtFilePath(fromStep, toStep)
btM := DefaultBtreeM
if toStep == 0 && dt.d.filenameBase == "commitment" {
@@ -526,7 +526,7 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h
return nil, nil, nil, fmt.Errorf("merge %s btindex [%d-%d]: %w", dt.d.filenameBase, r.values.from, r.values.to, err)
}
}
if dt.d.indexList&withHashMap != 0 {
if dt.d.IndexList&AccessorHashMap != 0 {
if err = dt.d.buildAccessor(ctx, fromStep, toStep, valuesIn.decompressor, ps); err != nil {
return nil, nil, nil, fmt.Errorf("merge %s buildAccessor [%d-%d]: %w", dt.d.filenameBase, r.values.from, r.values.to, err)
}
@@ -535,7 +535,7 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h
}
}

if dt.d.indexList&withExistence != 0 {
if dt.d.IndexList&AccessorExistence != 0 {
bloomIndexPath := dt.d.kvExistenceIdxFilePath(fromStep, toStep)
exists, err := dir.FileExist(bloomIndexPath)
if err != nil {