Skip to content

Commit

Permalink
Merge pull request #3735 from benbjohnson/append-threshold
Browse files Browse the repository at this point in the history
Append to small bz1 blocks
  • Loading branch information
benbjohnson committed Aug 20, 2015
2 parents 4e7631a + e57d602 commit 8f12cef
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Please see the *Features* section below for full details.
- [#3704](https://github.com/influxdb/influxdb/issues/3704): cluster replication issue for measurement name containing backslash
- [#3681](https://github.com/influxdb/influxdb/issues/3681): Quoted measurement names fail
- [#3681](https://github.com/influxdb/influxdb/issues/3682): Fix inserting string value with backslashes
- [#3735](https://github.com/influxdb/influxdb/issues/3735): Append to small bz1 blocks
- [#3736](https://github.com/influxdb/influxdb/pull/3736): Update shard group duration with retention policy changes. Thanks for the report @papylhomme

## v0.9.2 [2015-07-24]
Expand Down
24 changes: 21 additions & 3 deletions tsdb/engine/bz1/bz1.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func init() {

const (
// DefaultBlockSize is the default size of uncompressed points blocks.
DefaultBlockSize = 32 * 1024 // 32KB
DefaultBlockSize = 64 * 1024 // 64KB
)

// Ensure Engine implements the interface.
Expand Down Expand Up @@ -359,12 +359,30 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error {
//
// This is the optimized fast path. Otherwise we need to merge the points
// with existing blocks on disk and rewrite all the blocks for that range.
if k, v := c.Last(); k == nil || int64(btou64(v[0:8])) < tmin {
if k, v := c.Last(); k == nil {
bkt.FillPercent = 1.0
if err := e.writeBlocks(bkt, a); err != nil {
return fmt.Errorf("append blocks: %s", err)
return fmt.Errorf("new blocks: %s", err)
}
return nil
} else {
// Determine uncompressed block size.
sz, err := snappy.DecodedLen(v[8:])
if err != nil {
return fmt.Errorf("snappy decoded len: %s", err)
}

// Append new blocks if our time range is past the last on-disk time
// and if our previous block was at least the minimum block size.
if int64(btou64(v[0:8])) < tmin && sz >= e.BlockSize {
bkt.FillPercent = 1.0
if err := e.writeBlocks(bkt, a); err != nil {
return fmt.Errorf("append blocks: %s", err)
}
return nil
}

// Otherwise fallthrough to slower insert mode.
}

// Generate map of inserted keys.
Expand Down
87 changes: 78 additions & 9 deletions tsdb/engine/bz1/bz1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,15 @@ func TestEngine_WriteIndex_NoPoints(t *testing.T) {
}
}

// Ensure the engine ignores writes without points in a key.
// Ensure the engine can accept randomly generated points.
func TestEngine_WriteIndex_Quick(t *testing.T) {
if testing.Short() {
t.Skip("short mode")
}

quick.Check(func(sets []Points, blockSize int) bool {
quick.Check(func(sets []Points, blockSize uint) bool {
e := OpenDefaultEngine()
e.BlockSize = blockSize % 1024 // 1KB max block size
e.BlockSize = int(blockSize % 1024) // 1KB max block size
defer e.Close()

// Write points to index in multiple sets.
Expand Down Expand Up @@ -304,6 +304,53 @@ func TestEngine_WriteIndex_Quick(t *testing.T) {
}, nil)
}

// Ensure the engine can accept randomly generated append-only points.
func TestEngine_WriteIndex_Quick_Append(t *testing.T) {
if testing.Short() {
t.Skip("short mode")
}

quick.Check(func(sets appendPointSets, blockSize uint) bool {
e := OpenDefaultEngine()
e.BlockSize = int(blockSize % 1024) // 1KB max block size
defer e.Close()

// Write points to index in multiple sets.
for _, set := range sets {
if err := e.WriteIndex(map[string][][]byte(set), nil, nil); err != nil {
t.Fatal(err)
}
}

// Merge all points together.
points := MergePoints([]Points(sets))

// Retrieve a sorted list of keys so results are deterministic.
keys := points.Keys()

// Start transaction to read index.
tx := e.MustBegin(false)
defer tx.Rollback()

// Iterate over results to ensure they are correct.
for _, key := range keys {
c := tx.Cursor(key)

// Read list of key/values.
var got [][]byte
for k, v := c.Seek(u64tob(0)); k != nil; k, v = c.Next() {
got = append(got, append(copyBytes(k), v...))
}

if !reflect.DeepEqual(got, points[key]) {
t.Fatalf("points: block size=%d, key=%s:\n\ngot=%x\n\nexp=%x\n\n", e.BlockSize, key, got, points[key])
}
}

return true
}, nil)
}

func BenchmarkEngine_WriteIndex_512b(b *testing.B) { benchmarkEngine_WriteIndex(b, 512) }
func BenchmarkEngine_WriteIndex_1KB(b *testing.B) { benchmarkEngine_WriteIndex(b, 1*1024) }
func BenchmarkEngine_WriteIndex_4KB(b *testing.B) { benchmarkEngine_WriteIndex(b, 4*1024) }
Expand Down Expand Up @@ -452,23 +499,45 @@ func (m Points) Keys() []string {
}

func (Points) Generate(rand *rand.Rand, size int) reflect.Value {
return reflect.ValueOf(Points(GeneratePoints(rand, size,
rand.Intn(size),
func(_ int) time.Time { return time.Unix(0, 0).Add(time.Duration(rand.Intn(100))) },
)))
}

// appendPointSets represents sets of sequential points. Implements quick.Generator.
type appendPointSets []Points

func (appendPointSets) Generate(rand *rand.Rand, size int) reflect.Value {
sets := make([]Points, 0)
for i, n := 0, rand.Intn(size); i < n; i++ {
sets = append(sets, GeneratePoints(rand, size,
rand.Intn(size),
func(j int) time.Time {
return time.Unix(0, 0).Add((time.Duration(i) * time.Second) + (time.Duration(j) * time.Nanosecond))
},
))
}
return reflect.ValueOf(appendPointSets(sets))
}

func GeneratePoints(rand *rand.Rand, size, seriesN int, timestampFn func(int) time.Time) Points {
// Generate series with a random number of points in each.
m := make(map[string][][]byte)
for i, seriesN := 0, rand.Intn(size); i < seriesN; i++ {
key := strconv.Itoa(rand.Intn(20))
m := make(Points)
for i := 0; i < seriesN; i++ {
key := strconv.Itoa(i)

// Generate points for the series.
for j, pointN := 0, rand.Intn(size); j < pointN; j++ {
timestamp := time.Unix(0, 0).Add(time.Duration(rand.Intn(100)))
timestamp := timestampFn(j)
data, ok := quick.Value(reflect.TypeOf([]byte(nil)), rand)
if !ok {
panic("cannot generate data")
}
m[key] = append(m[key], bz1.MarshalEntry(timestamp.UnixNano(), data.Interface().([]byte)))
}
}

return reflect.ValueOf(Points(m))
return m
}

// MergePoints returns a map of all points merged together by key.
Expand Down

0 comments on commit 8f12cef

Please sign in to comment.