Skip to content

Commit

Permalink
e3: parallel miss idx (#728)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Nov 7, 2022
1 parent 4b0ff0a commit bfd3996
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 74 deletions.
2 changes: 0 additions & 2 deletions state/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,8 +796,6 @@ func (d *Domain) BuildMissedIndices(ctx context.Context, sem *semaphore.Weighted
}

func buildIndex(ctx context.Context, d *compress.Decompressor, idxPath, tmpdir string, count int, values bool) (*recsplit.Index, error) {
_, fName := filepath.Split(idxPath)
log.Debug("[snapshots] build idx", "file", fName)
var rs *recsplit.RecSplit
var err error
if rs, err = recsplit.NewRecSplit(recsplit.RecSplitArgs{
Expand Down
77 changes: 37 additions & 40 deletions state/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,52 +271,49 @@ func (h *History) BuildMissedIndices(ctx context.Context, sem *semaphore.Weighte
}

missedFiles := h.missedIdxFiles()
//errs := make(chan error, len(missedFiles))
//wg := sync.WaitGroup{}
errs := make(chan error, len(missedFiles))
wg := sync.WaitGroup{}

for _, item := range missedFiles {
//if err := sem.Acquire(ctx, 1); err != nil {
// errs <- err
// break
//}
//wg.Add(1)
//go func(item *filesItem) {
// defer sem.Release(1)
// defer wg.Done()

search := &filesItem{startTxNum: item.startTxNum, endTxNum: item.endTxNum}
iiItem, ok := h.InvertedIndex.files.Get(search)
if !ok {
return nil
if err := sem.Acquire(ctx, 1); err != nil {
errs <- err
break
}
wg.Add(1)
go func(item *filesItem) {
defer sem.Release(1)
defer wg.Done()

fromStep, toStep := item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep
fName := fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, fromStep, toStep)
idxPath := filepath.Join(h.dir, fName)
log.Info("[snapshots] build idx", "file", fName)
count, err := iterateForVi(item, iiItem, h.compressVals, func(v []byte) error { return nil })
search := &filesItem{startTxNum: item.startTxNum, endTxNum: item.endTxNum}
iiItem, ok := h.InvertedIndex.files.Get(search)
if !ok {
return
}

fromStep, toStep := item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep
fName := fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, fromStep, toStep)
idxPath := filepath.Join(h.dir, fName)
log.Info("[snapshots] build idx", "file", fName)
count, err := iterateForVi(item, iiItem, h.compressVals, func(v []byte) error { return nil })
if err != nil {
errs <- err
}
errs <- buildVi(item, iiItem, idxPath, h.tmpdir, count, false /* values */, h.compressVals)
}(item)
}
go func() {
wg.Wait()
close(errs)
}()
var lastError error
for err := range errs {
if err != nil {
return err
lastError = err
}
if err := buildVi(item, iiItem, idxPath, h.tmpdir, count, false /* values */, h.compressVals); err != nil {
return err
}
//errs <- buildVi(item, iiItem, idxPath, h.tmpdir, count, false /* values */, h.compressVals)
//}(item)
}
//go func() {
// wg.Wait()
// close(errs)
//}()
//var lastError error
//for err := range errs {
// if err != nil {
// lastError = err
// }
//}
//if lastError != nil {
// return lastError
//}
}
if lastError != nil {
return lastError
}

return h.openFiles()
}
Expand Down
63 changes: 31 additions & 32 deletions state/inverted_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,41 +196,40 @@ func (ii *InvertedIndex) missedIdxFiles() (l []*filesItem) {
// BuildMissedIndices - produce .efi/.vi/.kvi from .ef/.v/.kv; this InvertedIndex method builds the missing .efi files and then calls openFiles
func (ii *InvertedIndex) BuildMissedIndices(ctx context.Context, sem *semaphore.Weighted) (err error) {
missedFiles := ii.missedIdxFiles()
//errs := make(chan error, len(missedFiles))
//wg := sync.WaitGroup{}
errs := make(chan error, len(missedFiles))
wg := sync.WaitGroup{}
for _, item := range missedFiles {
//if err := sem.Acquire(ctx, 1); err != nil {
// errs <- err
// break
//}
//wg.Add(1)
//go func(item *filesItem) {
// defer sem.Release(1)
// defer wg.Done()
fromStep, toStep := item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep
fName := fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep)
idxPath := filepath.Join(ii.dir, fName)
log.Info("[snapshots] build idx", "file", fName)
_, err := buildIndex(ctx, item.decompressor, idxPath, ii.tmpdir, item.decompressor.Count()/2, false)
if err := sem.Acquire(ctx, 1); err != nil {
errs <- err
break
}
wg.Add(1)
go func(item *filesItem) {
defer sem.Release(1)
defer wg.Done()
fromStep, toStep := item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep
fName := fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep)
idxPath := filepath.Join(ii.dir, fName)
log.Info("[snapshots] build idx", "file", fName)
_, err := buildIndex(ctx, item.decompressor, idxPath, ii.tmpdir, item.decompressor.Count()/2, false)
if err != nil {
errs <- err
}
}(item)
}
go func() {
wg.Wait()
close(errs)
}()
var lastError error
for err := range errs {
if err != nil {
return err
lastError = err
}
//errs <- err
//}(item)
}
//go func() {
// wg.Wait()
// close(errs)
//}()
//var lastError error
//for err := range errs {
// if err != nil {
// lastError = err
// }
//}
//if lastError != nil {
// return lastError
//}
}
if lastError != nil {
return lastError
}
return ii.openFiles()
}

Expand Down

0 comments on commit bfd3996

Please sign in to comment.