Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: series file index compaction #23916

Merged
merged 3 commits into from
Jun 1, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 74 additions & 58 deletions tsdb/series_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"encoding/binary"
"errors"
"fmt"
"math"
"os"
"path/filepath"
"sync"

"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
errors2 "github.com/influxdata/influxdb/pkg/errors"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/influxdata/influxdb/pkg/rhh"
"go.uber.org/zap"
Expand Down Expand Up @@ -86,7 +88,7 @@ func (p *SeriesPartition) Open() error {
p.index = NewSeriesIndex(p.IndexPath())
if err := p.index.Open(); err != nil {
return err
} else if p.index.Recover(p.segments); err != nil {
} else if err = p.index.Recover(p.segments); err != nil {
return err
}

Expand Down Expand Up @@ -573,94 +575,108 @@ func (c *SeriesPartitionCompactor) Compact(p *SeriesPartition) error {
return nil
}

func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) error {
hdr := NewSeriesIndexHeader()
hdr.Count = seriesN
hdr.Capacity = pow2((int64(hdr.Count) * 100) / SeriesIndexLoadFactor)

// Allocate space for maps.
keyIDMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))
idOffsetMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))
var errDone error = errors.New("done")

// Reindex all partitions.
var entryN int
for _, segment := range segments {
errDone := errors.New("done")
func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) (err error) {

if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
hdr := NewSeriesIndexHeader()
var keyIDMap []byte
var idOffsetMap []byte

hdr.Count = math.MaxUint64
// seriesN is the current size of the index. Because it may contain tombstones
// for deleted series, we recalculate that number (as seriesCount) without the
// deleted series as we rebuild the index. If the count of existing series does
// not equal the seriesN passed in (meaning there were tombstones), we rebuild
// the index a second time with the correct size.
seriesCount := seriesN
for {
seriesN = seriesCount
Comment on lines +592 to +594
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused by what's going on here. Part of this may be due to changing the value of the input parameter seriesN. This changes the meaning of seriesN in the function call vs the function body. Copying to a different variable before using might help. A comment explaining what seriesN and seriesCount represent as the function runs would also help.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That helps. Weird whitespace, but not going to worry about that.

seriesCount = uint64(0)
// This only loops if there are deleted entries, which shrinks the size
hdr.Capacity = pow2((int64(seriesN) * 100) / SeriesIndexLoadFactor)
// Allocate space for maps, guaranteeing slices are initialized to zero
keyIDMap = make([]byte, hdr.Capacity*SeriesIndexElemSize)
idOffsetMap = make([]byte, hdr.Capacity*SeriesIndexElemSize)

// Reindex all partitions.
var entryN int
for _, segment := range segments {

if err = segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {

// Make sure we don't go past the offset where the compaction began.
if offset > index.maxOffset {
return errDone
}

// Make sure we don't go past the offset where the compaction began.
if offset > index.maxOffset {
return errDone
}
// Check for cancellation periodically.
if entryN++; entryN%1000 == 0 {
select {
case <-c.cancel:
return ErrSeriesPartitionCompactionCancelled
default:
}
}

// Check for cancellation periodically.
if entryN++; entryN%1000 == 0 {
select {
case <-c.cancel:
return ErrSeriesPartitionCompactionCancelled
// Only process insert entries.
switch flag {
case SeriesEntryInsertFlag:
// does not fallthrough
case SeriesEntryTombstoneFlag:
return nil
default:
return fmt.Errorf("unexpected series partition log entry flag: %d", flag)
}
}

// Only process insert entries.
switch flag {
case SeriesEntryInsertFlag: // fallthrough
case SeriesEntryTombstoneFlag:
return nil
default:
return fmt.Errorf("unexpected series partition log entry flag: %d", flag)
}

// Save max series identifier processed.
hdr.MaxSeriesID, hdr.MaxOffset = id, offset
// Save max series identifier processed.
hdr.MaxSeriesID, hdr.MaxOffset = id, offset

// Ignore entry if tombstoned.
if index.IsDeleted(id) {
return nil
// Ignore entry if tombstoned.
if index.IsDeleted(id) {
return nil
}
seriesCount++
// Insert into maps.
c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset)
return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)
}); err == errDone {
break
} else if err != nil {
return err
}

// Insert into maps.
c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset)
return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)
}); err == errDone {
}
hdr.Count = seriesCount
if seriesN != seriesCount {
continue
} else {
break
} else if err != nil {
return err
}
}

// Open file handler.
f, err := os.Create(path)
if err != nil {
return err
}
defer f.Close()

defer errors2.Capture(&err, f.Close)()
// Calculate map positions.
hdr.KeyIDMap.Offset, hdr.KeyIDMap.Size = SeriesIndexHeaderSize, int64(len(keyIDMap))
hdr.IDOffsetMap.Offset, hdr.IDOffsetMap.Size = hdr.KeyIDMap.Offset+hdr.KeyIDMap.Size, int64(len(idOffsetMap))

// Write header.
if _, err := hdr.WriteTo(f); err != nil {
if _, err = hdr.WriteTo(f); err != nil {
return err
}

// Write maps.
if _, err := f.Write(keyIDMap); err != nil {
if _, err = f.Write(keyIDMap); err != nil {
return err
} else if _, err := f.Write(idOffsetMap); err != nil {
return err
}

// Sync & close.
if err := f.Sync(); err != nil {
return err
} else if err := f.Close(); err != nil {
return err
}

return nil
// Sync, then deferred close
return f.Sync()
}

func (c *SeriesPartitionCompactor) insertKeyIDMap(dst []byte, capacity int64, segments []*SeriesSegment, key []byte, offset int64, id uint64) error {
Expand Down