Skip to content

Commit

Permalink
.kvi support in tooling (#13293)
Browse files Browse the repository at this point in the history
- `torrent_create --all`
- seedable files list
  • Loading branch information
AskAlexSharov authored Jan 1, 2025
1 parent 6e1bab7 commit 68694ca
Show file tree
Hide file tree
Showing 14 changed files with 115 additions and 88 deletions.
2 changes: 1 addition & 1 deletion erigon-lib/downloader/snaptype/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func SeedableV3Extensions() []string {
}

func AllV3Extensions() []string {
return []string{".kv", ".v", ".ef", ".kvei", ".vi", ".efi", ".bt"}
return []string{".kv", ".v", ".ef", ".kvei", ".vi", ".efi", ".bt", ".kvi"}
}

func IsSeedableExtension(name string) bool {
Expand Down
4 changes: 2 additions & 2 deletions erigon-lib/seg/seg_auto_rw.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func DetectCompressType(getter *Getter) (compressed FileCompression) {
getter.Reset(0)
for i := 0; i < 100; i++ {
if getter.HasNext() {
_, _ = getter.NextUncompressed()
_, _ = getter.SkipUncompressed()
}
if getter.HasNext() {
_, _ = getter.Skip()
Expand All @@ -196,7 +196,7 @@ func DetectCompressType(getter *Getter) (compressed FileCompression) {
_, _ = getter.Skip()
}
if getter.HasNext() {
_, _ = getter.NextUncompressed()
_, _ = getter.SkipUncompressed()
}
}
return compressed
Expand Down
12 changes: 6 additions & 6 deletions erigon-lib/state/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ var dbgCommBtIndex = dbg.EnvBool("AGG_COMMITMENT_BT", false)
func init() {
if dbgCommBtIndex {
cfg := Schema[kv.CommitmentDomain]
cfg.indexList = withBTree | withExistence
cfg.IndexList = AccessorBTree | AccessorExistence
Schema[kv.CommitmentDomain] = cfg
}
}
Expand All @@ -140,7 +140,7 @@ var Schema = map[kv.Domain]domainCfg{
kv.AccountsDomain: {
name: kv.AccountsDomain, valuesTable: kv.TblAccountVals,

indexList: withBTree | withExistence,
IndexList: AccessorBTree | AccessorExistence,
crossDomainIntegrity: domainIntegrityCheck,
compression: seg.CompressNone,
compressCfg: DomainCompressCfg,
Expand All @@ -162,7 +162,7 @@ var Schema = map[kv.Domain]domainCfg{
kv.StorageDomain: {
name: kv.StorageDomain, valuesTable: kv.TblStorageVals,

indexList: withBTree | withExistence,
IndexList: AccessorBTree | AccessorExistence,
compression: seg.CompressKeys,
compressCfg: DomainCompressCfg,

Expand All @@ -183,7 +183,7 @@ var Schema = map[kv.Domain]domainCfg{
kv.CodeDomain: {
name: kv.CodeDomain, valuesTable: kv.TblCodeVals,

indexList: withBTree | withExistence,
IndexList: AccessorBTree | AccessorExistence,
compression: seg.CompressVals, // compress Code with keys doesn't show any profit. compress of values show 4x ratio on eth-mainnet and 2.5x ratio on bor-mainnet
compressCfg: DomainCompressCfg,
largeValues: true,
Expand All @@ -205,7 +205,7 @@ var Schema = map[kv.Domain]domainCfg{
kv.CommitmentDomain: {
name: kv.CommitmentDomain, valuesTable: kv.TblCommitmentVals,

indexList: withHashMap,
IndexList: AccessorHashMap,
compression: seg.CompressKeys,
compressCfg: DomainCompressCfg,

Expand All @@ -227,7 +227,7 @@ var Schema = map[kv.Domain]domainCfg{
kv.ReceiptDomain: {
name: kv.ReceiptDomain, valuesTable: kv.TblReceiptVals,

indexList: withBTree | withExistence,
IndexList: AccessorBTree | AccessorExistence,
compression: seg.CompressNone, //seg.CompressKeys | seg.CompressVals,
compressCfg: DomainCompressCfg,

Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/state/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,7 +1274,7 @@ func TestAggregator_RebuildCommitmentBasedOnFiles(t *testing.T) {
fnames := []string{}
for _, f := range ac.d[kv.CommitmentDomain].files {
var k, stateVal []byte
if ac.d[kv.CommitmentDomain].d.indexList&withHashMap != 0 {
if ac.d[kv.CommitmentDomain].d.IndexList&AccessorHashMap != 0 {
idx := f.src.index.GetReaderFromPool()
r := seg.NewReader(f.src.decompressor.MakeGetter(), compression)

Expand Down
34 changes: 17 additions & 17 deletions erigon-lib/state/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ type domainCfg struct {
name kv.Domain
compression seg.FileCompression
compressCfg seg.Cfg
indexList idxList // list of indexes for given domain
valuesTable string // bucket to store domain values; key -> inverted_step + values (Dupsort)
IndexList Accessors // list of indexes for given domain
valuesTable string // bucket to store domain values; key -> inverted_step + values (Dupsort)
largeValues bool

crossDomainIntegrity rangeDomainIntegrityChecker
Expand Down Expand Up @@ -358,7 +358,7 @@ func (d *Domain) openDirtyFiles() (err error) {
}
}

if item.index == nil && d.indexList&withHashMap != 0 {
if item.index == nil && d.IndexList&AccessorHashMap != 0 {
fPath := d.kvAccessorFilePath(fromStep, toStep)
exists, err := dir.FileExist(fPath)
if err != nil {
Expand All @@ -373,7 +373,7 @@ func (d *Domain) openDirtyFiles() (err error) {
}
}
}
if item.bindex == nil && d.indexList&withBTree != 0 {
if item.bindex == nil && d.IndexList&AccessorBTree != 0 {
fPath := d.kvBtFilePath(fromStep, toStep)
exists, err := dir.FileExist(fPath)
if err != nil {
Expand All @@ -392,7 +392,7 @@ func (d *Domain) openDirtyFiles() (err error) {
}
}
}
if item.existence == nil && d.indexList&withExistence != 0 {
if item.existence == nil && d.IndexList&AccessorExistence != 0 {
fPath := d.kvExistenceIdxFilePath(fromStep, toStep)
exists, err := dir.FileExist(fPath)
if err != nil {
Expand Down Expand Up @@ -443,7 +443,7 @@ func (d *Domain) closeWhatNotInList(fNames []string) {
}

func (d *Domain) reCalcVisibleFiles(toTxNum uint64) {
d._visible = newDomainVisible(d.name, calcVisibleFiles(d.dirtyFiles, d.indexList, false, toTxNum))
d._visible = newDomainVisible(d.name, calcVisibleFiles(d.dirtyFiles, d.IndexList, false, toTxNum))
d.History.reCalcVisibleFiles(toTxNum)
}

Expand Down Expand Up @@ -674,15 +674,15 @@ func (dt *DomainRoTx) getLatestFromFile(i int, filekey []byte) (v []byte, ok boo
}

g := dt.statelessGetter(i)
if dt.d.indexList&withBTree != 0 {
if dt.d.IndexList&AccessorBTree != 0 {
_, v, offset, ok, err = dt.statelessBtree(i).Get(filekey, g)
if err != nil || !ok {
return nil, false, 0, err
}
//fmt.Printf("getLatestFromBtreeColdFiles key %x shard %d %x\n", filekey, exactColdShard, v)
return v, true, offset, nil
}
if dt.d.indexList&withHashMap != 0 {
if dt.d.IndexList&AccessorHashMap != 0 {
reader := dt.statelessIdxReader(i)
if reader.Empty() {
return nil, false, 0, nil
Expand Down Expand Up @@ -1051,7 +1051,7 @@ func (d *Domain) buildFileRange(ctx context.Context, stepFrom, stepTo uint64, co
return StaticFiles{}, fmt.Errorf("open %s values decompressor: %w", d.filenameBase, err)
}

if d.indexList&withHashMap != 0 {
if d.IndexList&AccessorHashMap != 0 {
if err = d.buildAccessor(ctx, stepFrom, stepTo, valuesDecomp, ps); err != nil {
return StaticFiles{}, fmt.Errorf("build %s values idx: %w", d.filenameBase, err)
}
Expand All @@ -1061,7 +1061,7 @@ func (d *Domain) buildFileRange(ctx context.Context, stepFrom, stepTo uint64, co
}
}

if d.indexList&withBTree != 0 {
if d.IndexList&AccessorBTree != 0 {
btPath := d.kvBtFilePath(stepFrom, stepTo)
btM := DefaultBtreeM
if stepFrom == 0 && d.filenameBase == "commitment" {
Expand All @@ -1073,7 +1073,7 @@ func (d *Domain) buildFileRange(ctx context.Context, stepFrom, stepTo uint64, co
return StaticFiles{}, fmt.Errorf("build %s .bt idx: %w", d.filenameBase, err)
}
}
if d.indexList&withExistence != 0 {
if d.IndexList&AccessorExistence != 0 {
fPath := d.kvExistenceIdxFilePath(stepFrom, stepTo)
exists, err := dir.FileExist(fPath)
if err != nil {
Expand Down Expand Up @@ -1154,7 +1154,7 @@ func (d *Domain) buildFiles(ctx context.Context, step uint64, collation Collatio
return StaticFiles{}, fmt.Errorf("open %s values decompressor: %w", d.filenameBase, err)
}

if d.indexList&withHashMap != 0 {
if d.IndexList&AccessorHashMap != 0 {
if err = d.buildAccessor(ctx, step, step+1, valuesDecomp, ps); err != nil {
return StaticFiles{}, fmt.Errorf("build %s values idx: %w", d.filenameBase, err)
}
Expand All @@ -1164,7 +1164,7 @@ func (d *Domain) buildFiles(ctx context.Context, step uint64, collation Collatio
}
}

if d.indexList&withBTree != 0 {
if d.IndexList&AccessorBTree != 0 {
btPath := d.kvBtFilePath(step, step+1)
btM := DefaultBtreeM
if step == 0 && d.filenameBase == "commitment" {
Expand All @@ -1175,7 +1175,7 @@ func (d *Domain) buildFiles(ctx context.Context, step uint64, collation Collatio
return StaticFiles{}, fmt.Errorf("build %s .bt idx: %w", d.filenameBase, err)
}
}
if d.indexList&withExistence != 0 {
if d.IndexList&AccessorExistence != 0 {
fPath := d.kvExistenceIdxFilePath(step, step+1)
exists, err := dir.FileExist(fPath)
if err != nil {
Expand Down Expand Up @@ -1263,7 +1263,7 @@ func (d *Domain) missedAccessors() (l []*filesItem) {
func (d *Domain) BuildMissedAccessors(ctx context.Context, g *errgroup.Group, ps *background.ProgressSet) {
d.History.BuildMissedAccessors(ctx, g, ps)
for _, item := range d.missedBtreeAccessors() {
if d.indexList&withBTree == 0 {
if d.IndexList&AccessorBTree == 0 {
continue
}
if item.decompressor == nil {
Expand All @@ -1281,7 +1281,7 @@ func (d *Domain) BuildMissedAccessors(ctx context.Context, g *errgroup.Group, ps
})
}
for _, item := range d.missedAccessors() {
if d.indexList&withHashMap == 0 {
if d.IndexList&AccessorHashMap == 0 {
continue
}
if item.decompressor == nil {
Expand Down Expand Up @@ -1451,7 +1451,7 @@ func (dt *DomainRoTx) getFromFiles(filekey []byte, maxTxNum uint64) (v []byte, f
if maxTxNum == 0 {
maxTxNum = math.MaxUint64
}
useExistenceFilter := dt.d.indexList&withExistence != 0
useExistenceFilter := dt.d.IndexList&AccessorExistence != 0
useCache := dt.name != kv.CommitmentDomain && maxTxNum == math.MaxUint64

hi, _ := dt.ht.iit.hashKey(filekey)
Expand Down
4 changes: 2 additions & 2 deletions erigon-lib/state/domain_committed.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (dt *DomainRoTx) findShortenedKey(fullKey []byte, itemGetter *seg.Reader, i
// }
//}

if dt.d.indexList&withHashMap != 0 {
if dt.d.IndexList&AccessorHashMap != 0 {
reader := recsplit.NewIndexReader(item.index)
defer reader.Close()

Expand All @@ -135,7 +135,7 @@ func (dt *DomainRoTx) findShortenedKey(fullKey []byte, itemGetter *seg.Reader, i
}
return encodeShorterKey(nil, offset), true
}
if dt.d.indexList&withBTree != 0 {
if dt.d.IndexList&AccessorBTree != 0 {
if item.bindex == nil {
dt.d.logger.Warn("[agg] commitment branch key replacement: file doesn't have index", "name", item.decompressor.FileName())
}
Expand Down
6 changes: 3 additions & 3 deletions erigon-lib/state/domain_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,8 +818,8 @@ func (sd *SharedDomains) IterateStoragePrefix(prefix []byte, it func(k []byte, v
}
}
case FILE_CURSOR:
indexList := sd.aggTx.d[kv.StorageDomain].d.indexList
if indexList&withBTree != 0 {
indexList := sd.aggTx.d[kv.StorageDomain].d.IndexList
if indexList&AccessorBTree != 0 {
if ci1.btCursor.Next() {
ci1.key = ci1.btCursor.Key()
if ci1.key != nil && bytes.HasPrefix(ci1.key, prefix) {
Expand All @@ -830,7 +830,7 @@ func (sd *SharedDomains) IterateStoragePrefix(prefix []byte, it func(k []byte, v
ci1.btCursor.Close()
}
}
if indexList&withHashMap != 0 {
if indexList&AccessorHashMap != 0 {
ci1.dg.Reset(ci1.latestOffset)
if !ci1.dg.HasNext() {
break
Expand Down
8 changes: 4 additions & 4 deletions erigon-lib/state/files_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ type visibleFile struct {
func (i *visibleFile) isSubSetOf(j *visibleFile) bool { return i.src.isSubsetOf(j.src) } //nolint
func (i *visibleFile) isSubsetOf(j *visibleFile) bool { return i.src.isSubsetOf(j.src) } //nolint

func calcVisibleFiles(files *btree2.BTreeG[*filesItem], l idxList, trace bool, toTxNum uint64) (roItems []visibleFile) {
func calcVisibleFiles(files *btree2.BTreeG[*filesItem], l Accessors, trace bool, toTxNum uint64) (roItems []visibleFile) {
newVisibleFiles := make([]visibleFile, 0, files.Len())
// trace = true
if trace {
Expand Down Expand Up @@ -271,21 +271,21 @@ func calcVisibleFiles(files *btree2.BTreeG[*filesItem], l idxList, trace bool, t
}
continue
}
if (l&withBTree != 0) && item.bindex == nil {
if (l&AccessorBTree != 0) && item.bindex == nil {
if trace {
log.Warn("[dbg] calcVisibleFiles: BTindex not opened", "f", item.decompressor.FileName())
}
//panic(fmt.Errorf("btindex nil: %s", item.decompressor.FileName()))
continue
}
if (l&withHashMap != 0) && item.index == nil {
if (l&AccessorHashMap != 0) && item.index == nil {
if trace {
log.Warn("[dbg] calcVisibleFiles: RecSplit not opened", "f", item.decompressor.FileName())
}
//panic(fmt.Errorf("index nil: %s", item.decompressor.FileName()))
continue
}
if (l&withExistence != 0) && item.existence == nil {
if (l&AccessorExistence != 0) && item.existence == nil {
if trace {
log.Warn("[dbg] calcVisibleFiles: Existence not opened", "f", item.decompressor.FileName())
}
Expand Down
4 changes: 2 additions & 2 deletions erigon-lib/state/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type histCfg struct {
snapshotsDisabled bool // don't produce .v and .ef files, keep in db table. old data will be pruned anyway.
historyDisabled bool // skip all write operations to this History (even in DB)

indexList idxList
indexList Accessors
compressorCfg seg.Cfg // compression settings for history files
compression seg.FileCompression // defines type of compression for history files

Expand All @@ -108,7 +108,7 @@ func NewHistory(cfg histCfg, logger log.Logger) (*History, error) {
//if cfg.compressorCfg.MaxDictPatterns == 0 && cfg.compressorCfg.MaxPatternLen == 0 {
cfg.compressorCfg = seg.DefaultCfg
if cfg.indexList == 0 {
cfg.indexList = withHashMap
cfg.indexList = AccessorHashMap
}
if cfg.iiCfg.filenameBase == "" {
cfg.iiCfg.filenameBase = cfg.filenameBase
Expand Down
16 changes: 9 additions & 7 deletions erigon-lib/state/inverted_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type iiCfg struct {

// external checker for integrity of inverted index ranges
integrity rangeIntegrityChecker
indexList idxList
indexList Accessors
}

type iiVisible struct {
Expand All @@ -111,7 +111,7 @@ func NewInvertedIndex(cfg iiCfg, logger log.Logger) (*InvertedIndex, error) {
//if cfg.compressorCfg.MaxDictPatterns == 0 && cfg.compressorCfg.MaxPatternLen == 0 {
cfg.compressorCfg = seg.DefaultCfg
if cfg.indexList == 0 {
cfg.indexList = withHashMap
cfg.indexList = AccessorHashMap
}

ii := InvertedIndex{
Expand Down Expand Up @@ -200,12 +200,14 @@ func (ii *InvertedIndex) scanDirtyFiles(fileNames []string) {
}
}

type idxList int
type Accessors int

var (
withBTree idxList = 0b1
withHashMap idxList = 0b10
withExistence idxList = 0b100
func (l Accessors) Has(target Accessors) bool { return l&target != 0 }

const (
AccessorBTree Accessors = 0b1
AccessorHashMap Accessors = 0b10
AccessorExistence Accessors = 0b100
)

func (ii *InvertedIndex) reCalcVisibleFiles(toTxNum uint64) {
Expand Down
6 changes: 3 additions & 3 deletions erigon-lib/state/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h
return nil, nil, nil, fmt.Errorf("merge %s decompressor [%d-%d]: %w", dt.d.filenameBase, r.values.from, r.values.to, err)
}

if dt.d.indexList&withBTree != 0 {
if dt.d.IndexList&AccessorBTree != 0 {
btPath := dt.d.kvBtFilePath(fromStep, toStep)
btM := DefaultBtreeM
if toStep == 0 && dt.d.filenameBase == "commitment" {
Expand All @@ -526,7 +526,7 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h
return nil, nil, nil, fmt.Errorf("merge %s btindex [%d-%d]: %w", dt.d.filenameBase, r.values.from, r.values.to, err)
}
}
if dt.d.indexList&withHashMap != 0 {
if dt.d.IndexList&AccessorHashMap != 0 {
if err = dt.d.buildAccessor(ctx, fromStep, toStep, valuesIn.decompressor, ps); err != nil {
return nil, nil, nil, fmt.Errorf("merge %s buildAccessor [%d-%d]: %w", dt.d.filenameBase, r.values.from, r.values.to, err)
}
Expand All @@ -535,7 +535,7 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h
}
}

if dt.d.indexList&withExistence != 0 {
if dt.d.IndexList&AccessorExistence != 0 {
bloomIndexPath := dt.d.kvExistenceIdxFilePath(fromStep, toStep)
exists, err := dir.FileExist(bloomIndexPath)
if err != nil {
Expand Down
Loading

0 comments on commit 68694ca

Please sign in to comment.