Skip to content

Commit

Permalink
[release-2.9.x] Pin chunk and index format to schema version. (#…
Browse files Browse the repository at this point in the history
…10333)

Backport bbfb13c from #10213

---

We pin all three `Chunk`, `HeadBlock` and `TSDB` Version to `schema`
version in period config.


The mapping is as follows (discussed with @owen-d and
@sandeepsukhani):

`v12` (current existing schema) - ChunkFormatV3 (UnorderedHeadBlock) +
TSDBv2
`v13` (introducing new schema) - ChunkFormatV4
(UnorderedWithNonIndexedLabelsHeadBlockFmt) + TSDBv3

Note the new schema `v13` supports the latest chunk and index format.

**NOTES for Reviewer**

1. The general approach is that we removed the idea of `index.LiveFormat`,
`chunkenc.DefaultChunkFormat` and `chunkenc.DefaultHeadBlockFmt` and
made the following two changes. These variables were previously used to tie
chunk and tsdb formats to specific Loki versions. This PR removes that
coupling and pins these formats to the `schema` version instead.

1. These variables were replaced with explicit chunk and index formats
within those packages (and their tests).
2. If these variables were used outside their own packages, say by
ingester, compactor, etc., then we extract the correct chunk and index
versions from the `schema` config.

2. Add two methods to `periodConfig`. (1) `ChunkFormat()` returning
chunk and head format tied to schema (2) `TSDBFormat()` returning tsdb
format tied to schema.

3. Another idea I considered but didn't pursue was making
`ChunkFormat` and `IndexFormat` separate types (rather than `byte` and
`int` as they are currently; similar to the `HeadBlockFmt` type). I
decided against it to keep the PR small and avoid complicating it with
lots of changes.

4. Moved a couple of test cases from `chunkenc` to the `config` package,
because the test cases were actually testing methods on `schemaconfig`
and were creating cyclic dependencies.

Co-authored-by: Kaviraj Kanagaraj <kavirajkanagaraj@gmail.com>
  • Loading branch information
grafanabot and kavirajk authored Aug 24, 2023
1 parent 9b7a634 commit 70bfc98
Show file tree
Hide file tree
Showing 44 changed files with 1,907 additions and 1,395 deletions.
84 changes: 49 additions & 35 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@ import (

const (
_ byte = iota
chunkFormatV1
chunkFormatV2
chunkFormatV3
chunkFormatV4

DefaultChunkFormat = chunkFormatV4 // the currently used chunk format
ChunkFormatV1
ChunkFormatV2
ChunkFormatV3
ChunkFormatV4

blocksPerChunk = 10
maxLineLength = 1024 * 1024 * 1024
Expand Down Expand Up @@ -84,10 +82,22 @@ const (
OrderedHeadBlockFmt
UnorderedHeadBlockFmt
UnorderedWithNonIndexedLabelsHeadBlockFmt

DefaultHeadBlockFmt = UnorderedWithNonIndexedLabelsHeadBlockFmt
)

// ChunkHeadFormatFor returns the head block format matching the given `chunkfmt`.
func ChunkHeadFormatFor(chunkfmt byte) HeadBlockFmt {
	switch {
	case chunkfmt < ChunkFormatV3:
		return OrderedHeadBlockFmt
	case chunkfmt == ChunkFormatV3:
		return UnorderedHeadBlockFmt
	default:
		// Every chunk format newer than v3 uses the latest head block format.
		return UnorderedWithNonIndexedLabelsHeadBlockFmt
	}
}

// magicNumber identifies a serialized MemChunk; it is written as the first
// four bytes of every chunk header and verified when a chunk is read back.
var magicNumber = uint32(0x12EE56A)

// The table gets initialized with sync.Once but may still cause a race
Expand Down Expand Up @@ -293,7 +303,7 @@ func (hb *headBlock) LoadBytes(b []byte) error {
return errors.Wrap(db.err(), "verifying headblock header")
}
switch version {
case chunkFormatV1, chunkFormatV2, chunkFormatV3, chunkFormatV4:
case ChunkFormatV1, ChunkFormatV2, ChunkFormatV3, ChunkFormatV4:
default:
return errors.Errorf("incompatible headBlock version (%v), only V1,V2,V3 is currently supported", version)
}
Expand Down Expand Up @@ -344,15 +354,16 @@ type entry struct {
}

// NewMemChunk returns a new in-mem chunk.
func NewMemChunk(enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
return newMemChunkWithFormat(DefaultChunkFormat, enc, head, blockSize, targetSize)
func NewMemChunk(chunkFormat byte, enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
return newMemChunkWithFormat(chunkFormat, enc, head, blockSize, targetSize)
}

func panicIfInvalidFormat(chunkFmt byte, head HeadBlockFmt) {
if chunkFmt == chunkFormatV2 && head != OrderedHeadBlockFmt {
if chunkFmt == ChunkFormatV2 && head != OrderedHeadBlockFmt {
panic("only OrderedHeadBlockFmt is supported for V2 chunks")
}
if chunkFmt == chunkFormatV4 && head != UnorderedWithNonIndexedLabelsHeadBlockFmt {
if chunkFmt == ChunkFormatV4 && head != UnorderedWithNonIndexedLabelsHeadBlockFmt {
fmt.Println("received head fmt", head.String())
panic("only UnorderedWithNonIndexedLabelsHeadBlockFmt is supported for V4 chunks")
}
}
Expand Down Expand Up @@ -401,9 +412,9 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
}
bc.format = version
switch version {
case chunkFormatV1:
case ChunkFormatV1:
bc.encoding = EncGZIP
case chunkFormatV2, chunkFormatV3, chunkFormatV4:
case ChunkFormatV2, ChunkFormatV3, ChunkFormatV4:
// format v2+ has a byte for block encoding.
enc := Encoding(db.byte())
if db.err() != nil {
Expand All @@ -414,6 +425,9 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
return nil, errors.Errorf("invalid version %d", version)
}

// Set the correct headblock format based on chunk format
bc.headFmt = ChunkHeadFormatFor(version)

// readSectionLenAndOffset reads len and offset for different sections within the chunk.
// Starting from chunk version 4, we have started writing offset and length of various sections within the chunk.
// These len and offset pairs would be stored together at the end of the chunk.
Expand All @@ -427,7 +441,7 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me

metasOffset := uint64(0)
metasLen := uint64(0)
if version >= chunkFormatV4 {
if version >= ChunkFormatV4 {
// version >= 4 starts writing length of sections after their offsets
metasLen, metasOffset = readSectionLenAndOffset(chunkMetasSectionIdx)
} else {
Expand Down Expand Up @@ -458,7 +472,7 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me

// Read offset and length.
blk.offset = db.uvarint()
if version >= chunkFormatV3 {
if version >= ChunkFormatV3 {
blk.uncompressedSize = db.uvarint()
}
l := db.uvarint()
Expand All @@ -481,7 +495,7 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
}
}

if version >= chunkFormatV4 {
if version >= ChunkFormatV4 {
nonIndexedLabelsLen, nonIndexedLabelsOffset := readSectionLenAndOffset(chunkNonIndexedLabelsSectionIdx)
lb := b[nonIndexedLabelsOffset : nonIndexedLabelsOffset+nonIndexedLabelsLen] // non-indexed labels Offset + checksum
db = decbuf{b: lb}
Expand Down Expand Up @@ -526,7 +540,7 @@ func (c *MemChunk) Bytes() ([]byte, error) {
func (c *MemChunk) BytesSize() int {
size := 4 // magic number
size++ // format
if c.format > chunkFormatV1 {
if c.format > ChunkFormatV1 {
size++ // chunk format v2+ has a byte for encoding.
}

Expand All @@ -538,7 +552,7 @@ func (c *MemChunk) BytesSize() int {
size += binary.MaxVarintLen64 // mint
size += binary.MaxVarintLen64 // maxt
size += binary.MaxVarintLen32 // offset
if c.format >= chunkFormatV3 {
if c.format >= ChunkFormatV3 {
size += binary.MaxVarintLen32 // uncompressed size
}
size += binary.MaxVarintLen32 // len(b)
Expand All @@ -550,7 +564,7 @@ func (c *MemChunk) BytesSize() int {
size += crc32.Size // metablock crc
size += 8 // metaoffset

if c.format >= chunkFormatV4 {
if c.format >= ChunkFormatV4 {
size += 8 // metablock length

size += c.symbolizer.CheckpointSize() // non-indexed labels block
Expand Down Expand Up @@ -586,7 +600,7 @@ func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) {
// Write the header (magicNum + version).
eb.putBE32(magicNumber)
eb.putByte(c.format)
if c.format > chunkFormatV1 {
if c.format > ChunkFormatV1 {
// chunk format v2+ has a byte for encoding.
eb.putByte(byte(c.encoding))
}
Expand All @@ -599,7 +613,7 @@ func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) {
nonIndexedLabelsOffset := offset
nonIndexedLabelsLen := 0

if c.format >= chunkFormatV4 {
if c.format >= ChunkFormatV4 {
var (
n int
crcHash []byte
Expand Down Expand Up @@ -655,7 +669,7 @@ func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) {
eb.putVarint64(b.mint)
eb.putVarint64(b.maxt)
eb.putUvarint(b.offset)
if c.format >= chunkFormatV3 {
if c.format >= ChunkFormatV3 {
eb.putUvarint(b.uncompressedSize)
}
eb.putUvarint(len(b.b))
Expand All @@ -669,7 +683,7 @@ func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) {
}
offset += int64(n)

if c.format >= chunkFormatV4 {
if c.format >= ChunkFormatV4 {
// Write non-indexed labels offset and length
eb.reset()
eb.putBE64int(nonIndexedLabelsLen)
Expand All @@ -683,7 +697,7 @@ func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) {

// Write the metasOffset.
eb.reset()
if c.format >= chunkFormatV4 {
if c.format >= ChunkFormatV4 {
eb.putBE64int(metasLen)
}
eb.putBE64int(int(metasOffset))
Expand Down Expand Up @@ -763,7 +777,7 @@ func (c *MemChunk) SpaceFor(e *logproto.Entry) bool {
// a great check, but it will guarantee we are always under the target size
newHBSize := c.head.UncompressedSize() + len(e.Line)
nonIndexedLabelsSize := 0
if c.format >= chunkFormatV4 {
if c.format >= ChunkFormatV4 {
newHBSize += metaLabelsLen(logproto.FromLabelAdaptersToLabels(e.NonIndexedLabels))
// non-indexed labels are compressed while serializing the chunk so we don't know what their size would be after compression.
// As adoption increases, their overall size can be non-trivial so we can't ignore them while calculating chunk size.
Expand All @@ -786,7 +800,7 @@ func (c *MemChunk) UncompressedSize() int {
size += b.uncompressedSize
}

if c.format >= chunkFormatV4 {
if c.format >= ChunkFormatV4 {
size += c.symbolizer.UncompressedSize()
}

Expand All @@ -802,7 +816,7 @@ func (c *MemChunk) CompressedSize() int {
size := 0
// Better to account for any uncompressed data than ignore it even though this isn't accurate.
size += c.head.UncompressedSize()
if c.format >= chunkFormatV4 {
if c.format >= ChunkFormatV4 {
size += c.symbolizer.UncompressedSize() // length of each symbol
}

Expand All @@ -829,7 +843,7 @@ func (c *MemChunk) Append(entry *logproto.Entry) error {
return ErrOutOfOrder
}

if c.format < chunkFormatV4 {
if c.format < ChunkFormatV4 {
entry.NonIndexedLabels = nil
}
if err := c.head.Append(entryTimestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.NonIndexedLabels)); err != nil {
Expand Down Expand Up @@ -940,7 +954,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
mint, maxt := mintT.UnixNano(), maxtT.UnixNano()
blockItrs := make([]iter.EntryIterator, 0, len(c.blocks)+1)

if c.format >= chunkFormatV4 {
if c.format >= ChunkFormatV4 {
stats := stats.FromContext(ctx)
stats.AddCompressedBytes(int64(c.symbolizer.CompressedSize()))
decompressedSize := int64(c.symbolizer.DecompressedSize())
Expand Down Expand Up @@ -1025,7 +1039,7 @@ func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time,
mint, maxt := from.UnixNano(), through.UnixNano()
its := make([]iter.SampleIterator, 0, len(c.blocks)+1)

if c.format >= chunkFormatV4 {
if c.format >= ChunkFormatV4 {
stats := stats.FromContext(ctx)
stats.AddCompressedBytes(int64(c.symbolizer.CompressedSize()))
decompressedSize := int64(c.symbolizer.DecompressedSize())
Expand Down Expand Up @@ -1095,12 +1109,12 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, err
// as close as possible, respect the block/target sizes specified. However,
// if the blockSize is not set, use reasonable defaults.
if c.blockSize > 0 {
newChunk = NewMemChunk(c.Encoding(), DefaultHeadBlockFmt, c.blockSize, c.targetSize)
newChunk = NewMemChunk(c.format, c.Encoding(), c.headFmt, c.blockSize, c.targetSize)
} else {
// Using defaultBlockSize for target block size.
// The alternative here could be going over all the blocks and using the size of the largest block as target block size but I(Sandeep) feel that it is not worth the complexity.
// For target chunk size I am using compressed size of original chunk since the newChunk should anyways be lower in size than that.
newChunk = NewMemChunk(c.Encoding(), DefaultHeadBlockFmt, defaultBlockSize, c.CompressedSize())
newChunk = NewMemChunk(c.format, c.Encoding(), c.headFmt, defaultBlockSize, c.CompressedSize())
}

for itr.Next() {
Expand Down Expand Up @@ -1423,7 +1437,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {

decompressedBytes += int64(lineSize)

if si.format < chunkFormatV4 {
if si.format < ChunkFormatV4 {
si.stats.AddDecompressedBytes(decompressedBytes)
si.stats.AddDecompressedLines(1)
return ts, si.buf[:lineSize], nil, true
Expand Down
Loading

0 comments on commit 70bfc98

Please sign in to comment.