Skip to content

Commit

Permalink
Merge pull request #7399 from influxdata/jw-backport
Browse files Browse the repository at this point in the history
Backport 1.0.2 Fixes
  • Loading branch information
jwilder authored Oct 3, 2016
2 parents 601d440 + ac4ae1a commit 2c0d5b1
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 23 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

- [#7150](https://github.com/influxdata/influxdb/issues/7150): Do not automatically reset the shard duration when using ALTER RETENTION POLICY
- [#5878](https://github.com/influxdata/influxdb/issues/5878): Ensure correct shard groups created when retention policy has been altered.

- [#7391](https://github.com/influxdata/influxdb/issues/7391): Fix RLE integer decoding producing negative numbers
- [#7335](https://github.com/influxdata/influxdb/pull/7335): Avoid stat syscall when planning compactions

## v1.0.1 [2016-09-26]

Expand Down
3 changes: 1 addition & 2 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,9 +458,8 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
// findGenerations groups all the TSM files by they generation based
// on their filename then returns the generations in descending order (newest first)
func (c *DefaultPlanner) findGenerations() tsmGenerations {
generations := map[int]*tsmGeneration{}

tsmStats := c.FileStore.Stats()
generations := make(map[int]*tsmGeneration, len(tsmStats))
for _, f := range tsmStats {
gen, _, _ := ParseTSMFileName(f.Path)

Expand Down
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/int.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (d *IntegerDecoder) Error() error {
func (d *IntegerDecoder) Read() int64 {
switch d.encoding {
case intCompressedRLE:
return ZigZagDecode(d.rleFirst + uint64(d.i)*d.rleDelta)
return ZigZagDecode(d.rleFirst) + int64(d.i)*ZigZagDecode(d.rleDelta)
default:
v := ZigZagDecode(d.values[d.i])
// v is the delta encoded value, we need to add the prior value to get the original
Expand Down
88 changes: 88 additions & 0 deletions tsdb/engine/tsm1/int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,94 @@ func Test_IntegerEncoder_CounterRLE(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}

if b[0]>>4 != intCompressedRLE {
t.Fatalf("unexpected encoding format: expected RLE, got %v", b[0]>>4)
}

// Should use 1 header byte, 8 byte first value, 1 var-byte for delta and 1 var-byte for
// count of deltas in this particular RLE.
if exp := 11; len(b) != exp {
t.Fatalf("encoded length mismatch: got %v, exp %v", len(b), exp)
}

var dec IntegerDecoder
dec.SetBytes(b)
i := 0
for dec.Next() {
if i > len(values) {
t.Fatalf("read too many values: got %v, exp %v", i, len(values))
}

if values[i] != dec.Read() {
t.Fatalf("read value %d mismatch: got %v, exp %v", i, dec.Read(), values[i])
}
i += 1
}

if i != len(values) {
t.Fatalf("failed to read enough values: got %v, exp %v", i, len(values))
}
}

func Test_IntegerEncoder_Descending(t *testing.T) {
enc := NewIntegerEncoder()
values := []int64{
7094, 4472, 1850,
}

for _, v := range values {
enc.Write(v)
}

b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if b[0]>>4 != intCompressedRLE {
t.Fatalf("unexpected encoding format: expected simple, got %v", b[0]>>4)
}

// Should use 1 header byte, 8 byte first value, 1 var-byte for delta and 1 var-byte for
// count of deltas in this particular RLE.
if exp := 12; len(b) != exp {
t.Fatalf("encoded length mismatch: got %v, exp %v", len(b), exp)
}

var dec IntegerDecoder
dec.SetBytes(b)
i := 0
for dec.Next() {
if i > len(values) {
t.Fatalf("read too many values: got %v, exp %v", i, len(values))
}

if values[i] != dec.Read() {
t.Fatalf("read value %d mismatch: got %v, exp %v", i, dec.Read(), values[i])
}
i += 1
}

if i != len(values) {
t.Fatalf("failed to read enough values: got %v, exp %v", i, len(values))
}
}

func Test_IntegerEncoder_Flat(t *testing.T) {
enc := NewIntegerEncoder()
values := []int64{
1, 1, 1, 1,
}

for _, v := range values {
enc.Write(v)
}

b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if b[0]>>4 != intCompressedRLE {
t.Fatalf("unexpected encoding format: expected simple, got %v", b[0]>>4)
}
Expand Down
6 changes: 3 additions & 3 deletions tsdb/engine/tsm1/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,9 +1167,9 @@ func (m *mmapAccessor) readAll(key string) ([]Value, error) {

func (m *mmapAccessor) path() string {
m.mu.RLock()
defer m.mu.RUnlock()

return m.f.Name()
path := m.f.Name()
m.mu.RUnlock()
return path
}

func (m *mmapAccessor) close() error {
Expand Down
54 changes: 38 additions & 16 deletions tsdb/engine/tsm1/tombstone.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ const (
)

type Tombstoner struct {
mu sync.Mutex
mu sync.RWMutex

// Path is the location of the file to record tombstone. This should be the
// full path to a TSM file.
Path string

// cache of the stats for this tombstone
fileStats []FileStat
// indicates that the stats may be out of sync with what is on disk and they
// should be refreshed.
statsLoaded bool
}

type Tombstone struct {
Expand Down Expand Up @@ -53,6 +59,8 @@ func (t *Tombstoner) AddRange(keys []string, min, max int64) error {
return nil
}

t.statsLoaded = false

tombstones, err := t.readTombstone()
if err != nil {
return nil
Expand All @@ -79,34 +87,48 @@ func (t *Tombstoner) Delete() error {
if err := os.RemoveAll(t.tombstonePath()); err != nil {
return err
}
t.statsLoaded = false
return nil
}

// HasTombstones return true if there are any tombstone entries recorded.
func (t *Tombstoner) HasTombstones() bool {
stat, err := os.Stat(t.tombstonePath())
if err != nil {
return false
}

return stat.Size() > 0
files := t.TombstoneFiles()
return len(files) > 0 && files[0].Size > 0
}

// TombstoneFiles returns any tombstone files associated with this TSM file.
func (t *Tombstoner) TombstoneFiles() []FileStat {
stat, err := os.Stat(t.tombstonePath())
if err != nil {
return nil
t.mu.RLock()
if t.statsLoaded {
stats := t.fileStats
t.mu.RUnlock()
return stats
}
t.mu.RUnlock()

if stat.Size() > 0 {
return []FileStat{FileStat{
Path: t.tombstonePath(),
LastModified: stat.ModTime().UnixNano(),
Size: uint32(stat.Size())}}
stat, err := os.Stat(t.tombstonePath())
if os.IsNotExist(err) || err != nil {
t.mu.Lock()
// The file doesn't exist so record that we tried to load it so
// we don't continue to keep trying. This is the common case.
t.statsLoaded = os.IsNotExist(err)
t.fileStats = t.fileStats[:0]
t.mu.Unlock()
return nil
}

return nil
t.mu.Lock()
t.fileStats = append(t.fileStats[:0], FileStat{
Path: t.tombstonePath(),
LastModified: stat.ModTime().UnixNano(),
Size: uint32(stat.Size()),
})
t.statsLoaded = true
stats := t.fileStats
t.mu.Unlock()

return stats
}

func (t *Tombstoner) Walk(fn func(t Tombstone) error) error {
Expand Down
33 changes: 33 additions & 0 deletions tsdb/engine/tsm1/tombstone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,35 @@ func TestTombstoner_Add(t *testing.T) {
t.Fatalf("length mismatch: got %v, exp %v", got, exp)
}

stats := ts.TombstoneFiles()
if got, exp := len(stats), 0; got != exp {
t.Fatalf("stat length mismatch: got %v, exp %v", got, exp)
}

ts.Add([]string{"foo"})

entries, err = ts.ReadAll()
if err != nil {
fatal(t, "ReadAll", err)
}

stats = ts.TombstoneFiles()
if got, exp := len(stats), 1; got != exp {
t.Fatalf("stat length mismatch: got %v, exp %v", got, exp)
}

if stats[0].Size == 0 {
t.Fatalf("got size %v, exp > 0", stats[0].Size)
}

if stats[0].LastModified == 0 {
t.Fatalf("got lastModified %v, exp > 0", stats[0].LastModified)
}

if stats[0].Path == "" {
t.Fatalf("got path %v, exp != ''", stats[0].Path)
}

if got, exp := len(entries), 1; got != exp {
t.Fatalf("length mismatch: got %v, exp %v", got, exp)
}
Expand Down Expand Up @@ -83,6 +105,12 @@ func TestTombstoner_Add_Empty(t *testing.T) {
if got, exp := len(entries), 0; got != exp {
t.Fatalf("length mismatch: got %v, exp %v", got, exp)
}

stats := ts.TombstoneFiles()
if got, exp := len(stats), 0; got != exp {
t.Fatalf("stat length mismatch: got %v, exp %v", got, exp)
}

}

func TestTombstoner_Delete(t *testing.T) {
Expand Down Expand Up @@ -113,6 +141,11 @@ func TestTombstoner_Delete(t *testing.T) {
fatal(t, "delete tombstone", err)
}

stats := ts.TombstoneFiles()
if got, exp := len(stats), 0; got != exp {
t.Fatalf("stat length mismatch: got %v, exp %v", got, exp)
}

ts = &tsm1.Tombstoner{Path: f.Name()}
entries, err = ts.ReadAll()
if err != nil {
Expand Down

0 comments on commit 2c0d5b1

Please sign in to comment.