diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d696d72e95..7473c8a4e51 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -71,6 +71,7 @@ Please see the *Features* section below for full details. - [#3704](https://github.com/influxdb/influxdb/issues/3704): cluster replication issue for measurement name containing backslash - [#3681](https://github.com/influxdb/influxdb/issues/3681): Quoted measurement names fail - [#3681](https://github.com/influxdb/influxdb/issues/3682): Fix inserting string value with backslashes +- [#3735](https://github.com/influxdb/influxdb/issues/3735): Append to small bz1 blocks - [#3736](https://github.com/influxdb/influxdb/pull/3736): Update shard group duration with retention policy changes. Thanks for the report @papylhomme ## v0.9.2 [2015-07-24] diff --git a/tsdb/engine/bz1/bz1.go b/tsdb/engine/bz1/bz1.go index e10a4673f2e..f3fc14e9657 100644 --- a/tsdb/engine/bz1/bz1.go +++ b/tsdb/engine/bz1/bz1.go @@ -36,7 +36,7 @@ func init() { const ( // DefaultBlockSize is the default size of uncompressed points blocks. - DefaultBlockSize = 32 * 1024 // 32KB + DefaultBlockSize = 64 * 1024 // 64KB ) // Ensure Engine implements the interface. @@ -359,12 +359,30 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error { // // This is the optimized fast path. Otherwise we need to merge the points // with existing blocks on disk and rewrite all the blocks for that range. - if k, v := c.Last(); k == nil || int64(btou64(v[0:8])) < tmin { + if k, v := c.Last(); k == nil { bkt.FillPercent = 1.0 if err := e.writeBlocks(bkt, a); err != nil { - return fmt.Errorf("append blocks: %s", err) + return fmt.Errorf("new blocks: %s", err) } return nil + } else { + // Determine uncompressed block size. + sz, err := snappy.DecodedLen(v[8:]) + if err != nil { + return fmt.Errorf("snappy decoded len: %s", err) + } + + // Append new blocks if our time range is past the last on-disk time + // and if our previous block was at least the minimum block size. + if int64(btou64(v[0:8])) < tmin && sz >= e.BlockSize { + bkt.FillPercent = 1.0 + if err := e.writeBlocks(bkt, a); err != nil { + return fmt.Errorf("append blocks: %s", err) + } + return nil + } + + // Otherwise fallthrough to slower insert mode. } // Generate map of inserted keys. diff --git a/tsdb/engine/bz1/bz1_test.go b/tsdb/engine/bz1/bz1_test.go index b4a07cd6bce..d82c9d87a91 100644 --- a/tsdb/engine/bz1/bz1_test.go +++ b/tsdb/engine/bz1/bz1_test.go @@ -257,15 +257,15 @@ func TestEngine_WriteIndex_NoPoints(t *testing.T) { } } -// Ensure the engine ignores writes without points in a key. +// Ensure the engine can accept randomly generated points. func TestEngine_WriteIndex_Quick(t *testing.T) { if testing.Short() { t.Skip("short mode") } - quick.Check(func(sets []Points, blockSize int) bool { + quick.Check(func(sets []Points, blockSize uint) bool { e := OpenDefaultEngine() - e.BlockSize = blockSize % 1024 // 1KB max block size + e.BlockSize = int(blockSize % 1024) // 1KB max block size defer e.Close() // Write points to index in multiple sets. @@ -304,6 +304,53 @@ func TestEngine_WriteIndex_Quick(t *testing.T) { }, nil) } +// Ensure the engine can accept randomly generated append-only points. +func TestEngine_WriteIndex_Quick_Append(t *testing.T) { + if testing.Short() { + t.Skip("short mode") + } + + quick.Check(func(sets appendPointSets, blockSize uint) bool { + e := OpenDefaultEngine() + e.BlockSize = int(blockSize % 1024) // 1KB max block size + defer e.Close() + + // Write points to index in multiple sets. + for _, set := range sets { + if err := e.WriteIndex(map[string][][]byte(set), nil, nil); err != nil { + t.Fatal(err) + } + } + + // Merge all points together. + points := MergePoints([]Points(sets)) + + // Retrieve a sorted list of keys so results are deterministic. + keys := points.Keys() + + // Start transaction to read index. + tx := e.MustBegin(false) + defer tx.Rollback() + + // Iterate over results to ensure they are correct. + for _, key := range keys { + c := tx.Cursor(key) + + // Read list of key/values. + var got [][]byte + for k, v := c.Seek(u64tob(0)); k != nil; k, v = c.Next() { + got = append(got, append(copyBytes(k), v...)) + } + + if !reflect.DeepEqual(got, points[key]) { + t.Fatalf("points: block size=%d, key=%s:\n\ngot=%x\n\nexp=%x\n\n", e.BlockSize, key, got, points[key]) + } + } + + return true + }, nil) +} + func BenchmarkEngine_WriteIndex_512b(b *testing.B) { benchmarkEngine_WriteIndex(b, 512) } func BenchmarkEngine_WriteIndex_1KB(b *testing.B) { benchmarkEngine_WriteIndex(b, 1*1024) } func BenchmarkEngine_WriteIndex_4KB(b *testing.B) { benchmarkEngine_WriteIndex(b, 4*1024) } @@ -452,14 +499,37 @@ func (m Points) Keys() []string { } func (Points) Generate(rand *rand.Rand, size int) reflect.Value { + return reflect.ValueOf(Points(GeneratePoints(rand, size, + rand.Intn(size), + func(_ int) time.Time { return time.Unix(0, 0).Add(time.Duration(rand.Intn(100))) }, + ))) +} + +// appendPointSets represents sets of sequential points. Implements quick.Generator. +type appendPointSets []Points + +func (appendPointSets) Generate(rand *rand.Rand, size int) reflect.Value { + sets := make([]Points, 0) + for i, n := 0, rand.Intn(size); i < n; i++ { + sets = append(sets, GeneratePoints(rand, size, + rand.Intn(size), + func(j int) time.Time { + return time.Unix(0, 0).Add((time.Duration(i) * time.Second) + (time.Duration(j) * time.Nanosecond)) + }, + )) + } + return reflect.ValueOf(appendPointSets(sets)) +} + +func GeneratePoints(rand *rand.Rand, size, seriesN int, timestampFn func(int) time.Time) Points { // Generate series with a random number of points in each. - m := make(map[string][][]byte) - for i, seriesN := 0, rand.Intn(size); i < seriesN; i++ { - key := strconv.Itoa(rand.Intn(20)) + m := make(Points) + for i := 0; i < seriesN; i++ { + key := strconv.Itoa(i) // Generate points for the series. for j, pointN := 0, rand.Intn(size); j < pointN; j++ { - timestamp := time.Unix(0, 0).Add(time.Duration(rand.Intn(100))) + timestamp := timestampFn(j) data, ok := quick.Value(reflect.TypeOf([]byte(nil)), rand) if !ok { panic("cannot generate data") @@ -467,8 +537,7 @@ func (Points) Generate(rand *rand.Rand, size int) reflect.Value { m[key] = append(m[key], bz1.MarshalEntry(timestamp.UnixNano(), data.Interface().([]byte))) } } - - return reflect.ValueOf(Points(m)) + return m } // MergePoints returns a map of all points merged together by key.