Skip to content

Commit

Permalink
Merge pull request #9084 from influxdata/jw-delete-time
Browse files Browse the repository at this point in the history
Handle high cardinality deletes in TSM engine
  • Loading branch information
jwilder authored Nov 13, 2017
2 parents ae0b28a + a8646b6 commit 48e21e6
Show file tree
Hide file tree
Showing 31 changed files with 1,522 additions and 453 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

* The default logging format has been changed. See [#9055](https://github.com/influxdata/influxdb/pull/9055) for details.

### Features

- [#9088](https://github.com/influxdata/influxdb/pull/9084): Handle high cardinality deletes in TSM engine

### Bugfixes

- [#8538](https://github.com/influxdata/influxdb/pull/8538): Fix panic: runtime error: slice bounds out of range
Expand Down
5 changes: 5 additions & 0 deletions cmd/influxd/run/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ func TestCommand_PIDFile(t *testing.T) {

pidFile := filepath.Join(tmpdir, "influxdb.pid")

// Override the default data/wal dir so it doesn't look in ~/.influxdb which
// might have junk not related to this test.
os.Setenv("INFLUXDB_DATA_DIR", tmpdir)
os.Setenv("INFLUXDB_DATA_WAL_DIR", tmpdir)

cmd := run.NewCommand()
cmd.Getenv = func(key string) string {
switch key {
Expand Down
9 changes: 7 additions & 2 deletions models/points.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,11 @@ func ParsePointsString(buf string) ([]Point, error) {
// NOTE: to minimize heap allocations, the returned Tags will refer to subslices of buf.
// This can have the unintended effect preventing buf from being garbage collected.
func ParseKey(buf []byte) (string, Tags) {
meas, tags := ParseKeyBytes(buf)
return string(meas), tags
}

func ParseKeyBytes(buf []byte) ([]byte, Tags) {
// Ignore the error because scanMeasurement returns "missing fields" which we ignore
// when just parsing a key
state, i, _ := scanMeasurement(buf, 0)
Expand All @@ -271,9 +276,9 @@ func ParseKey(buf []byte) (string, Tags) {
if state == tagKeyState {
tags = parseTags(buf)
// scanMeasurement returns the location of the comma if there are tags, strip that off
return string(buf[:i-1]), tags
return buf[:i-1], tags
}
return string(buf[:i]), tags
return buf[:i], tags
}

func ParseTags(buf []byte) (Tags, error) {
Expand Down
41 changes: 41 additions & 0 deletions pkg/bytesutil/bytesutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,47 @@ func CloneSlice(a [][]byte) [][]byte {
return other
}

// Pack converts a sparse array to a dense one. It removes sections of a containing
// runs of val of length width. The returned value is a subslice of a.
func Pack(a []byte, width int, val byte) []byte {
var i, j, iStart, jStart, end int

fill := make([]byte, width)
for i := 0; i < len(fill); i++ {
fill[i] = val
}

// Skip the first run that won't move
for ; i < len(a) && a[i] != val; i += width {
}
end = i

for i < len(a) {
// Find the next gap to remove
iStart = i
for i < len(a) && a[i] == val {
i += width
}

// Find the next non-gap to keep
jStart = i
for j = i; j < len(a) && a[j] != val; j += width {
}

if jStart == len(a) {
break
}

// Move the non-gap over the section to remove.
copy(a[end:], a[jStart:j])
i = iStart + len(a[jStart:j])
end += j - jStart
i = j
}

return a[:end]
}

type byteSlices [][]byte

func (a byteSlices) Len() int { return len(a) }
Expand Down
97 changes: 97 additions & 0 deletions pkg/bytesutil/bytesutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,100 @@ func TestSearchBytesFixed(t *testing.T) {
t.Fatalf("index mismatch: exp %v, got %v", exp, got)
}
}

func TestPack_WidthOne_One(t *testing.T) {
a := make([]byte, 8)

a[4] = 1

a = bytesutil.Pack(a, 1, 0)
if got, exp := len(a), 1; got != exp {
t.Fatalf("len mismatch: got %v, exp %v", got, exp)
}

for i, v := range []byte{1} {
if got, exp := a[i], v; got != exp {
t.Fatalf("value mismatch: a[%d] = %v, exp %v", i, got, exp)
}
}
}

func TestPack_WidthOne_Two(t *testing.T) {
a := make([]byte, 8)

a[4] = 1
a[6] = 2

a = bytesutil.Pack(a, 1, 0)
if got, exp := len(a), 2; got != exp {
t.Fatalf("len mismatch: got %v, exp %v", got, exp)
}

for i, v := range []byte{1, 2} {
if got, exp := a[i], v; got != exp {
t.Fatalf("value mismatch: a[%d] = %v, exp %v", i, got, exp)
}
}
}

func TestPack_WidthTwo_Two(t *testing.T) {
a := make([]byte, 8)

a[2] = 1
a[3] = 1
a[6] = 2
a[7] = 2

a = bytesutil.Pack(a, 2, 0)
if got, exp := len(a), 4; got != exp {
t.Fatalf("len mismatch: got %v, exp %v", got, exp)
}

for i, v := range []byte{1, 1, 2, 2} {
if got, exp := a[i], v; got != exp {
t.Fatalf("value mismatch: a[%d] = %v, exp %v", i, got, exp)
}
}
}

func TestPack_WidthOne_Last(t *testing.T) {
a := make([]byte, 8)

a[6] = 2
a[7] = 2

a = bytesutil.Pack(a, 2, 255)
if got, exp := len(a), 8; got != exp {
t.Fatalf("len mismatch: got %v, exp %v", got, exp)
}

for i, v := range []byte{0, 0, 0, 0, 0, 0, 2, 2} {
if got, exp := a[i], v; got != exp {
t.Fatalf("value mismatch: a[%d] = %v, exp %v", i, got, exp)
}
}
}

func TestPack_WidthOne_LastFill(t *testing.T) {
a := make([]byte, 8)

a[0] = 255
a[1] = 255
a[2] = 2
a[3] = 2
a[4] = 2
a[5] = 2
a[6] = 2
a[7] = 2

a = bytesutil.Pack(a, 2, 255)
if got, exp := len(a), 6; got != exp {
t.Fatalf("len mismatch: got %v, exp %v", got, exp)
}

for i, v := range []byte{2, 2, 2, 2, 2, 2} {
if got, exp := a[i], v; got != exp {
t.Fatalf("value mismatch: a[%d] = %v, exp %v", i, got, exp)
}
}
}
2 changes: 1 addition & 1 deletion test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ function build_docker_image {
local imagename=$2

echo "Building docker image $imagename"
exit_if_fail docker build -f "$dockerfile" -t "$imagename" .
exit_if_fail docker build --rm=$DOCKER_RM -f "$dockerfile" -t "$imagename" .
}


Expand Down
3 changes: 2 additions & 1 deletion tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type Engine interface {

CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DeleteSeriesRange(keys [][]byte, min, max int64) error
DeleteSeriesRange(itr SeriesIterator, min, max int64) error

SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
Expand All @@ -72,6 +72,7 @@ type Engine interface {
TagKeyCardinality(name, key []byte) int

// InfluxQL iterators
MeasurementSeriesKeysByExprIterator(name []byte, expr influxql.Expr) (SeriesIterator, error)
MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error)
SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error)

Expand Down
12 changes: 12 additions & 0 deletions tsdb/engine/tsm1/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,10 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) {
}
ts.AddRange([][]byte{[]byte("cpu,host=A#!~#value")}, math.MinInt64, math.MaxInt64)

if err := ts.Flush(); err != nil {
t.Fatalf("unexpected error flushing tombstone: %v", err)
}

a3 := tsm1.NewValue(3, 1.3)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{a3},
Expand Down Expand Up @@ -563,6 +567,10 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) {
// a1 should remain after compaction
ts.AddRange([][]byte{[]byte("cpu,host=A#!~#value")}, 2, math.MaxInt64)

if err := ts.Flush(); err != nil {
t.Fatalf("unexpected error flushing tombstone: %v", err)
}

a3 := tsm1.NewValue(3, 1.3)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{a3},
Expand Down Expand Up @@ -670,6 +678,10 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) {
ts.AddRange([][]byte{[]byte("cpu,host=A#!~#value")}, 2, 2)
ts.AddRange([][]byte{[]byte("cpu,host=A#!~#value")}, 4, 4)

if err := ts.Flush(); err != nil {
t.Fatalf("unexpected error flushing tombstone: %v", err)
}

a5 := tsm1.NewValue(5, 1.5)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{a5},
Expand Down
Loading

0 comments on commit 48e21e6

Please sign in to comment.