M-map full chunks of Head from disk #6679

Merged

merged 44 commits into master from mmap-chunks on May 6, 2020

Commits (44)
e5e4ca6
HeadReadWriter for writing and mmapping Head chunks
codesome Dec 27, 2019
e9e3ec9
Mmap chunks in Head
codesome Jan 2, 2020
c812d79
Use pool for chunks
codesome Jan 4, 2020
46a5ef1
Replay the chunks on disk
codesome Jan 17, 2020
527c0d3
Merge remote-tracking branch 'upstream/master' into mmap-chunks
codesome Jan 21, 2020
e8225bf
Make HeadReadWriter local to Head and fix tests
codesome Jan 21, 2020
5766f3e
Cleanup, fixes and unit tests
codesome Jan 27, 2020
77ba7f0
Fix review comments
codesome Jan 30, 2020
96b0823
Merge remote-tracking branch 'upstream/master' into mmap-chunks
codesome Jan 31, 2020
386ef22
Fix review comments
codesome Feb 3, 2020
9bdd4c7
Add repair for corrupt segment files
codesome Feb 5, 2020
5790095
Get rid of WAL interface and noopWAL; take chunks dir in NewHead instead
codesome Feb 5, 2020
df0c67a
Merge remote-tracking branch 'upstream/master' into mmap-chunks
codesome Feb 5, 2020
e4444b5
Fix wrong deleted chunks counting
codesome Feb 6, 2020
b5f9bb5
Test for HeadReadWriter repair
codesome Feb 6, 2020
e42ee47
Merge remote-tracking branch 'upstream/master' into mmap-chunks
codesome Feb 12, 2020
0c577d6
Fix review comments
codesome Feb 17, 2020
99d8892
Merge remote-tracking branch 'upstream/master' into mmap-chunks
codesome Mar 20, 2020
5709867
Merge remote-tracking branch 'upstream/master' into mmap-chunks
codesome Mar 20, 2020
c8f767b
Fixes after merging master
codesome Mar 20, 2020
8741e7b
Attempt fixing tests for windows
codesome Mar 20, 2020
88fd825
Replay num samples of chunks with mmap chunks
codesome Mar 21, 2020
5781812
Attempt fixing tests for windows
codesome Mar 21, 2020
b7389b5
Some renaming and added numSamples in IterateAllChunks
codesome Mar 23, 2020
fd4521d
Merge remote-tracking branch 'upstream/master' into mmap-chunks
codesome Mar 23, 2020
a9bfcc1
Fix review comments
codesome Mar 29, 2020
6cd8d40
Merge remote-tracking branch 'upstream/master' into mmap-chunks
codesome Apr 14, 2020
502d69d
Fix review comments
codesome Apr 24, 2020
f7dc9ed
Merge remote-tracking branch 'upstream/master' into mmap-chunks
codesome Apr 24, 2020
b99415c
Fix failing tests
codesome Apr 27, 2020
75d0fd7
Address review comments
codesome Apr 27, 2020
37a987f
Add test for PostWALRepairRecovery
codesome Apr 28, 2020
f90f530
Address feedback
codesome Apr 29, 2020
4b5fc07
Fix chunks metric when replaying mmapped chunks
codesome Apr 29, 2020
427f79f
Register metrics
codesome Apr 30, 2020
179fb9c
Add out of bound metric and add test for it
codesome Apr 30, 2020
5733830
Replay WAL only once by keeping map of mmapped chunks
codesome May 3, 2020
cbb3508
Fix Head mint maxt updates on setting mmap chunks. Fix metrics.
codesome May 4, 2020
e9417b0
Include mmapped chunks in TestHead_WALMultiRef
codesome May 5, 2020
2e2068e
Rename Repair to DeleteCorrupted
codesome May 5, 2020
9f2974d
Fix review comments
codesome May 5, 2020
eb2e6c1
Fix FlushWAL test in windows
codesome May 6, 2020
971f450
Merge remote-tracking branch 'upstream/master' into mmap-chunks
codesome May 6, 2020
6ec3f23
Add a TODO for the panic in getting mmapped chunks
codesome May 6, 2020
14 changes: 13 additions & 1 deletion cmd/prometheus/query_log_test.go
@@ -246,7 +246,19 @@ func (p *queryLogTest) run(t *testing.T) {
p.setQueryLog(t, "")
}

params := append([]string{"-test.main", "--config.file=" + p.configFile.Name(), "--web.enable-lifecycle", fmt.Sprintf("--web.listen-address=%s:%d", p.host, p.port)}, p.params()...)
dir, err := ioutil.TempDir("", "query_log_test")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()

params := append([]string{
"-test.main",
"--config.file=" + p.configFile.Name(),
"--web.enable-lifecycle",
fmt.Sprintf("--web.listen-address=%s:%d", p.host, p.port),
"--storage.tsdb.path=" + dir,
}, p.params()...)

prom := exec.Command(promPath, params...)

12 changes: 8 additions & 4 deletions tsdb/block_test.go
@@ -305,7 +305,12 @@ func TestReadIndexFormatV1(t *testing.T) {

// createBlock creates a block with given set of series and returns its dir.
func createBlock(tb testing.TB, dir string, series []storage.Series) string {
return createBlockFromHead(tb, dir, createHead(tb, series))
chunkDir, err := ioutil.TempDir("", "chunk_dir")
testutil.Ok(tb, err)
defer func() { testutil.Ok(tb, os.RemoveAll(chunkDir)) }()
head := createHead(tb, series, chunkDir)
defer func() { testutil.Ok(tb, head.Close()) }()
return createBlockFromHead(tb, dir, head)
}

func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
@@ -321,10 +326,9 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
return filepath.Join(dir, ulid.String())
}

func createHead(tb testing.TB, series []storage.Series) *Head {
head, err := NewHead(nil, nil, nil, 2*60*60*1000, DefaultStripeSize)
func createHead(tb testing.TB, series []storage.Series, chunkDir string) *Head {
head, err := NewHead(nil, nil, nil, 2*60*60*1000, chunkDir, nil, DefaultStripeSize)
testutil.Ok(tb, err)
defer head.Close()

app := head.Appender()
for _, s := range series {
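For reference, the new `NewHead` signature threads through a directory for the m-mapped chunks and an optional chunk pool, and closing the Head is now the caller's responsibility (the `defer head.Close()` inside `createHead` is gone for that reason). A minimal sketch, using a hypothetical `newTestHead` helper that is not part of this PR:

```go
package tsdb

import (
	"io/ioutil"
	"os"
	"testing"

	"github.com/prometheus/prometheus/util/testutil"
)

// newTestHead is a hypothetical helper showing the new NewHead signature:
// it now takes the directory for m-mapped Head chunks and an optional
// chunk pool (nil lets the Head allocate its own).
func newTestHead(t testing.TB) (*Head, func()) {
	chunkDir, err := ioutil.TempDir("", "chunk_dir")
	testutil.Ok(t, err)

	head, err := NewHead(nil, nil, nil, 2*60*60*1000, chunkDir, nil, DefaultStripeSize)
	testutil.Ok(t, err)

	// Closing is now the caller's job: the Head must release its m-mapped
	// chunk files before the directory can be removed.
	return head, func() {
		testutil.Ok(t, head.Close())
		testutil.Ok(t, os.RemoveAll(chunkDir))
	}
}
```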
27 changes: 15 additions & 12 deletions tsdb/chunks/head_chunks.go
@@ -72,13 +72,13 @@
)

// CorruptionErr is an error that's returned when corruption is encountered.
type corruptionErr struct {
type CorruptionErr struct {
Dir string
FileIndex int
Err error
}

func (e *corruptionErr) Error() string {
func (e *CorruptionErr) Error() string {
return errors.Wrapf(e.Err, "corruption in head chunk file %s", segmentFile(e.Dir, e.FileIndex)).Error()
}

@@ -512,7 +512,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref uint64) (chunkenc.Chunk, error) {
// and runs the provided function on each chunk. It returns on the first error encountered.
// NOTE: This method needs to be called at least once after creating ChunkDiskMapper
// to set the maxt of all the files.
func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64, mint, maxt int64) error) (err error) {
func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error) (err error) {
cdm.writePathMtx.Lock()
defer cdm.writePathMtx.Unlock()

@@ -550,7 +550,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64,
if allZeros {
break
}
return &corruptionErr{
return &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: segID,
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID),
@@ -577,12 +577,15 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64,

idx += ChunkEncodingSize // Skip encoding.
dataLen, n := binary.Uvarint(mmapFile.byteSlice.Range(idx, idx+MaxChunkLengthFieldSize))
idx += n + int(dataLen) // Skip the data.
idx += n

numSamples := binary.BigEndian.Uint16(mmapFile.byteSlice.Range(idx, idx+2))
idx += int(dataLen) // Skip the data.

// In the beginning we only checked for the chunk meta size.
// Now that we have added the chunk data length, we check for sufficient bytes again.
if idx+CRCSize > fileEnd {
return &corruptionErr{
return &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: segID,
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+CRCSize, fileEnd, segID),
@@ -595,7 +598,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64,
return err
}
if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
return &corruptionErr{
return &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: segID,
Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
@@ -607,14 +610,14 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64,
mmapFile.maxt = maxt
}

if err := f(seriesRef, chunkRef, mint, maxt); err != nil {
if err := f(seriesRef, chunkRef, mint, maxt, numSamples); err != nil {
return err
}
}

if idx > fileEnd {
// It should be equal to the slice length.
return &corruptionErr{
return &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: segID,
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d", idx, fileEnd, segID),
@@ -678,11 +681,11 @@ func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error {
return nil
}

// Repair deletes all the head chunk files after the one which had the corruption
// DeleteCorrupted deletes all the head chunk files after the one which had the corruption
// (including the corrupt file).
func (cdm *ChunkDiskMapper) Repair(originalErr error) error {
func (cdm *ChunkDiskMapper) DeleteCorrupted(originalErr error) error {
err := errors.Cause(originalErr) // So that we can pick up errors even if wrapped.
cerr, ok := err.(*corruptionErr)
cerr, ok := err.(*CorruptionErr)
if !ok {
return errors.Wrap(originalErr, "cannot handle error")
}
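Taken together, the exported `CorruptionErr` and the renamed `DeleteCorrupted` let callers outside the `chunks` package react to a corrupt chunk file found during iteration. A sketch of that flow, with `replayChunks` as a hypothetical caller (the real Head replay does more work per chunk than logging):

```go
package chunks

import (
	"log"

	"github.com/pkg/errors"
)

// replayChunks is a hypothetical caller sketching the new API surface:
// IterateAllChunks now also yields each chunk's sample count, and a
// CorruptionErr can be handled by deleting the corrupt file and all
// files after it via DeleteCorrupted.
func replayChunks(cdm *ChunkDiskMapper) error {
	err := cdm.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error {
		// A real caller (the Head replay) rebuilds in-memory series
		// state from this metadata instead of just logging it.
		log.Printf("series=%d chunk=%d mint=%d maxt=%d samples=%d",
			seriesRef, chunkRef, mint, maxt, numSamples)
		return nil
	})
	if err == nil {
		return nil
	}
	if _, ok := errors.Cause(err).(*CorruptionErr); ok {
		// Drop the corrupt file and everything after it; the caller can
		// then retry iteration over the surviving files.
		if derr := cdm.DeleteCorrupted(err); derr != nil {
			return derr
		}
	}
	return err
}
```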
19 changes: 11 additions & 8 deletions tsdb/chunks/head_chunks_test.go
@@ -39,6 +39,7 @@ func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) {
type expectedDataType struct {
seriesRef, chunkRef uint64
mint, maxt int64
numSamples uint16
chunk chunkenc.Chunk
}
expectedData := []expectedDataType{}
@@ -51,11 +52,12 @@
seriesRef, chkRef, mint, maxt, chunk := createChunk(t, totalChunks, hrw)
totalChunks++
expectedData = append(expectedData, expectedDataType{
seriesRef: seriesRef,
mint: mint,
maxt: maxt,
chunkRef: chkRef,
chunk: chunk,
seriesRef: seriesRef,
mint: mint,
maxt: maxt,
chunkRef: chkRef,
chunk: chunk,
numSamples: uint16(chunk.NumSamples()),
})

if hrw.curFileSequence != 1 {
@@ -128,14 +130,15 @@
testutil.Ok(t, err)

idx := 0
err = hrw.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64) error {
err = hrw.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error {
t.Helper()

expData := expectedData[idx]
testutil.Equals(t, expData.seriesRef, seriesRef)
testutil.Equals(t, expData.chunkRef, chunkRef)
testutil.Equals(t, expData.mint, mint)
testutil.Equals(t, expData.maxt, maxt)
testutil.Equals(t, expData.numSamples, numSamples)

actChunk, err := hrw.Chunk(expData.chunkRef)
testutil.Ok(t, err)
@@ -157,7 +160,7 @@ func TestHeadReadWriter_Truncate(t *testing.T) {
}()

testutil.Assert(t, !hrw.fileMaxtSet, "")
testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64) error { return nil }))
testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
testutil.Assert(t, hrw.fileMaxtSet, "")

timeRange := 0
@@ -227,7 +230,7 @@ func TestHeadReadWriter_Truncate(t *testing.T) {
testutil.Ok(t, err)

testutil.Assert(t, !hrw.fileMaxtSet, "")
testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64) error { return nil }))
testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
testutil.Assert(t, hrw.fileMaxtSet, "")

// Truncating files after restart.
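The no-op `IterateAllChunks` calls in this test exercise an initialization contract: iteration must run at least once after the ChunkDiskMapper is created so that every file's maxt is known before truncation. A sketch of that contract, assuming a constructor of the form `NewChunkDiskMapper(dir, pool)` and a `Truncate(mint)` method, which are inferred from these tests rather than shown in this diff:

```go
package chunks

import "github.com/prometheus/prometheus/tsdb/chunkenc"

// truncateBefore is a hypothetical helper; the constructor and Truncate
// names are assumed from the tests above, not taken verbatim from this diff.
func truncateBefore(dir string, mint int64) error {
	hrw, err := NewChunkDiskMapper(dir, chunkenc.NewPool())
	if err != nil {
		return err
	}
	defer hrw.Close()

	// IterateAllChunks must run at least once after construction -- even
	// as a no-op -- so each file's maxt is set before truncating.
	if err := hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }); err != nil {
		return err
	}
	return hrw.Truncate(mint)
}
```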
7 changes: 6 additions & 1 deletion tsdb/compact_test.go
@@ -870,7 +870,12 @@ func BenchmarkCompactionFromHead(b *testing.B) {
for labelNames := 1; labelNames < totalSeries; labelNames *= 10 {
labelValues := totalSeries / labelNames
b.Run(fmt.Sprintf("labelnames=%d,labelvalues=%d", labelNames, labelValues), func(b *testing.B) {
h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
chunkDir, err := ioutil.TempDir("", "chunk_dir")
testutil.Ok(b, err)
defer func() {
testutil.Ok(b, os.RemoveAll(chunkDir))
}()
h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize)
testutil.Ok(b, err)
for ln := 0; ln < labelNames; ln++ {
app := h.Appender()
26 changes: 16 additions & 10 deletions tsdb/db.go
@@ -309,7 +309,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
if err != nil {
return err
}
head, err := NewHead(nil, db.logger, w, 1, DefaultStripeSize)
head, err := NewHead(nil, db.logger, w, 1, db.dir, nil, DefaultStripeSize)
if err != nil {
return err
}
@@ -368,7 +368,7 @@ func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Qu
blocks[i] = b
}

head, err := NewHead(nil, db.logger, nil, 1, DefaultStripeSize)
head, err := NewHead(nil, db.logger, nil, 1, db.dir, nil, DefaultStripeSize)
if err != nil {
return nil, err
}
@@ -379,11 +379,14 @@ func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Qu

// Also add the WAL if the current blocks don't cover the requests time range.
if maxBlockTime <= maxt {
if err := head.Close(); err != nil {
return nil, err
}
w, err := wal.Open(db.logger, filepath.Join(db.dir, "wal"))
if err != nil {
return nil, err
}
head, err = NewHead(nil, db.logger, w, 1, DefaultStripeSize)
head, err = NewHead(nil, db.logger, w, 1, db.dir, nil, DefaultStripeSize)
if err != nil {
return nil, err
}
@@ -395,10 +398,10 @@ func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Qu
// Set the wal to nil to disable all wal operations.
// This is mainly to avoid blocking when closing the head.
head.wal = nil

db.closers = append(db.closers, head)
}

db.closers = append(db.closers, head)

// TODO: Refactor so that it is possible to obtain a Querier without initializing a writable DB instance.
// Option 1: refactor DB to have the Querier implementation using the DBReadOnly.Querier implementation not the opposite.
// Option 2: refactor Querier to use another independent func which
@@ -583,19 +586,21 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs

var wlog *wal.WAL
segmentSize := wal.DefaultSegmentSize
walDir := filepath.Join(dir, "wal")
// Wal is enabled.
if opts.WALSegmentSize >= 0 {
// Wal is set to a custom size.
if opts.WALSegmentSize > 0 {
segmentSize = opts.WALSegmentSize
}
wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression)
wlog, err = wal.NewSize(l, r, walDir, segmentSize, opts.WALCompression)
if err != nil {
return nil, err
}
}

db.head, err = NewHead(r, l, wlog, rngs[0], opts.StripeSize)
db.head, err = NewHead(r, l, wlog, rngs[0], dir, db.chunkPool, opts.StripeSize)

if err != nil {
return nil, err
}
@@ -1018,9 +1023,10 @@ func (db *DB) beyondSizeRetention(blocks []*Block) (deletable map[ulid.ULID]*Blo
deletable = make(map[ulid.ULID]*Block)

walSize, _ := db.Head().wal.Size()
// Initializing size counter with WAL size,
// as that is part of the retention strategy.
blocksSize := walSize
headChunksSize := db.Head().chunkDiskMapper.Size()
// Initializing size counter with WAL size and Head chunks
// written to disk, as that is part of the retention strategy.
blocksSize := walSize + headChunksSize
for i, block := range blocks {
blocksSize += block.Size()
if blocksSize > int64(db.opts.MaxBytes) {
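The `beyondSizeRetention` change means the Head chunks already written to disk now count against `--storage.tsdb.retention.size` alongside the WAL, before any block does. A minimal sketch of that accounting, with `exceedsRetention` as a hypothetical stand-in for the loop above:

```go
// exceedsRetention is a hypothetical stand-in for the accounting in
// beyondSizeRetention: the WAL size and the m-mapped Head chunks on disk
// are charged against the size limit before any block is counted.
func exceedsRetention(walSize, headChunksSize int64, blockSizes []int64, maxBytes int64) bool {
	total := walSize + headChunksSize
	for _, size := range blockSizes {
		total += size
		if total > maxBytes {
			return true // Blocks from this point on become deletable.
		}
	}
	return false
}
```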
29 changes: 23 additions & 6 deletions tsdb/db_test.go
@@ -849,8 +849,15 @@ func TestWALSegmentSizeOptions(t *testing.T) {
tests := map[int]func(dbdir string, segmentSize int){
// Default Wal Size.
0: func(dbDir string, segmentSize int) {
files, err := ioutil.ReadDir(filepath.Join(dbDir, "wal"))
filesAndDir, err := ioutil.ReadDir(filepath.Join(dbDir, "wal"))
testutil.Ok(t, err)
files := []os.FileInfo{}
for _, f := range filesAndDir {
if !f.IsDir() {
files = append(files, f)
}
}
// All the full segment files (all but the last) should match the segment size option.
for _, f := range files[:len(files)-1] {
testutil.Equals(t, int64(DefaultOptions().WALSegmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name())
}
@@ -859,9 +866,16 @@
},
// Custom Wal Size.
2 * 32 * 1024: func(dbDir string, segmentSize int) {
files, err := ioutil.ReadDir(filepath.Join(dbDir, "wal"))
testutil.Assert(t, len(files) > 1, "current WALSegmentSize should result in more than a single WAL file.")
filesAndDir, err := ioutil.ReadDir(filepath.Join(dbDir, "wal"))
testutil.Ok(t, err)
files := []os.FileInfo{}
for _, f := range filesAndDir {
if !f.IsDir() {
files = append(files, f)
}
}
testutil.Assert(t, len(files) > 1, "current WALSegmentSize should result in more than a single WAL file.")
// All the full segment files (all but the last) should match the segment size option.
for _, f := range files[:len(files)-1] {
testutil.Equals(t, int64(segmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name())
}
@@ -870,9 +884,12 @@
},
// Wal disabled.
-1: func(dbDir string, segmentSize int) {
if _, err := os.Stat(filepath.Join(dbDir, "wal")); !os.IsNotExist(err) {
t.Fatal("wal directory is present when the wal is disabled")
}
// Check that WAL dir is not there.
_, err := os.Stat(filepath.Join(dbDir, "wal"))
testutil.NotOk(t, err)
// Check that there is chunks dir.
_, err = os.Stat(mmappedChunksDir(dbDir))
testutil.Ok(t, err)
},
}
for segmentSize, testFunc := range tests {
2 changes: 1 addition & 1 deletion tsdb/docs/format/head_chunks.md
@@ -33,4 +33,4 @@ Unlike chunks in the on-disk blocks, here we additionally store series reference
┌─────────────────────┬───────────────────────┬───────────────────────┬───────────────────┬───────────────┬──────────────┬────────────────┐
| series ref <8 byte> | mint <8 byte, uint64> | maxt <8 byte, uint64> | encoding <1 byte> | len <uvarint> | data <bytes> │ CRC32 <4 byte> │
└─────────────────────┴───────────────────────┴───────────────────────┴───────────────────┴───────────────┴──────────────┴────────────────┘
```
```
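As a rough illustration of this record layout, here is a hedged decoder sketch. It is not the reader Prometheus ships; it assumes big-endian encoding for the fixed-width fields (matching head_chunks.go) and that the first two data bytes carry the chunk's sample count, which is what `IterateAllChunks` now surfaces:

```go
package chunks

import (
	"encoding/binary"
	"fmt"
)

// headChunkRecord is an illustrative decoded form of one record in the
// layout above; this sketch is not the code path Prometheus itself uses.
type headChunkRecord struct {
	seriesRef  uint64
	mint, maxt int64
	encoding   byte
	numSamples uint16 // First two bytes of the chunk data.
	data       []byte
	crc        uint32
}

// parseHeadChunkRecord assumes big-endian fixed-width fields and that b
// starts exactly at a record boundary. n is the full record length,
// including the trailing CRC32.
func parseHeadChunkRecord(b []byte) (rec headChunkRecord, n int, err error) {
	const fixed = 8 + 8 + 8 + 1 // series ref + mint + maxt + encoding.
	if len(b) < fixed+1 {
		return rec, 0, fmt.Errorf("record too short")
	}
	rec.seriesRef = binary.BigEndian.Uint64(b[0:8])
	rec.mint = int64(binary.BigEndian.Uint64(b[8:16]))
	rec.maxt = int64(binary.BigEndian.Uint64(b[16:24]))
	rec.encoding = b[24]
	dataLen, m := binary.Uvarint(b[fixed:])
	if m <= 0 {
		return rec, 0, fmt.Errorf("bad chunk length")
	}
	start, end := fixed+m, fixed+m+int(dataLen)
	if end+4 > len(b) { // 4 trailing CRC32 bytes.
		return rec, 0, fmt.Errorf("record truncated")
	}
	rec.data = b[start:end]
	if len(rec.data) >= 2 {
		rec.numSamples = binary.BigEndian.Uint16(rec.data[:2])
	}
	rec.crc = binary.BigEndian.Uint32(b[end : end+4])
	return rec, end + 4, nil
}
```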