Skip to content

Commit

Permalink
fix(tsm1): fix data race and validation in cache ring (#20802)
Browse files Browse the repository at this point in the history
Co-authored-by: Yun Zhao <zhaoyun2316@gmail.com>

Co-authored-by: Yun Zhao <zhaoyun2316@gmail.com>
  • Loading branch information
danxmoran and StoneYunZhao authored Feb 24, 2021
1 parent a8b2129 commit e85a102
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 59 deletions.
24 changes: 15 additions & 9 deletions tsdb/engine/tsm1/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ const partitions = 16
// ring is implemented as a crude hash ring, in so much that you can have
// variable numbers of members in the ring, and the appropriate member for a
// given series key can always consistently be found. Unlike a true hash ring
// though, this ring is not resizeable—there must be at most 256 members in the
// though, this ring is not resizeable—there must be at most 16 members in the
// ring, and the number of members must always be a power of 2.
//
// ring works as follows: Each member of the ring contains a single store, which
// contains a map of series keys to entries. A ring always has 256 partitions,
// contains a map of series keys to entries. A ring always has 16 partitions,
// and a member takes up one or more of these partitions (depending on how many
// members are specified to be in the ring)
//
Expand All @@ -47,11 +47,14 @@ type ring struct {
// power of 2, and for performance reasons should be larger than the number of
// cores on the host. The supported set of values for n is:
//
// {1, 2, 4, 8, 16, 32, 64, 128, 256}.
// {1, 2, 4, 8, 16}.
//
func newring(n int) (*ring, error) {
if n <= 0 || n > partitions {
return nil, fmt.Errorf("invalid number of paritions: %d", n)
return nil, fmt.Errorf("invalid number of partitions: %d", n)
}
if n&(n-1) != 0 {
return nil, fmt.Errorf("partitions %d is not a power of two", n)
}

r := ring{
Expand All @@ -78,7 +81,7 @@ func (r *ring) reset() {
for _, partition := range r.partitions {
partition.reset()
}
r.keysHint = 0
atomic.StoreInt64(&r.keysHint, 0)
}

// getPartition retrieves the hash ring partition associated with the provided
Expand Down Expand Up @@ -110,8 +113,11 @@ func (r *ring) add(key []byte, entry *entry) {
// remove is safe for use by multiple goroutines.
func (r *ring) remove(key []byte) {
r.getPartition(key).remove(key)
if r.keysHint > 0 {
atomic.AddInt64(&r.keysHint, -1)
for {
prev := atomic.LoadInt64(&r.keysHint)
if prev <= 0 || atomic.CompareAndSwapInt64(&r.keysHint, prev, prev-1) {
break
}
}
}

Expand All @@ -129,6 +135,8 @@ func (r *ring) keys(sorted bool) [][]byte {
return keys
}

// count returns the number of values in the ring
// count is not accurate since it doesn't use read lock when iterating over partitions
func (r *ring) count() int {
var n int
for _, p := range r.partitions {
Expand Down Expand Up @@ -202,7 +210,6 @@ func (r *ring) applySerial(f func([]byte, *entry) error) error {
}

func (r *ring) split(n int) []storer {
var keys int
storers := make([]storer, n)
for i := 0; i < n; i++ {
storers[i], _ = newring(len(r.partitions))
Expand All @@ -211,7 +218,6 @@ func (r *ring) split(n int) []storer {
for i, p := range r.partitions {
r := storers[i%n].(*ring)
r.partitions[i] = p
keys += len(p.store)
}
return storers
}
Expand Down
95 changes: 45 additions & 50 deletions tsdb/engine/tsm1/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,44 @@ import (
"runtime"
"sync"
"testing"

"github.com/stretchr/testify/require"
)

func TestRing_newRing(t *testing.T) {
examples := []struct {
n int
returnErr bool
}{
{n: 1}, {n: 2}, {n: 4}, {n: 8}, {n: 16}, {n: 32, returnErr: true},
{n: 0, returnErr: true}, {n: 3, returnErr: true},
{n: 1},
{n: 2},
{n: 4},
{n: 8},
{n: 16},
{n: 32, returnErr: true},
{n: 0, returnErr: true},
{n: 3, returnErr: true},
}

for i, example := range examples {
r, err := newring(example.n)
if err != nil {
for _, example := range examples {
t.Run(fmt.Sprintf("ring n %d", example.n), func(t *testing.T) {
r, err := newring(example.n)
if example.returnErr {
continue // expecting an error.
require.Error(t, err)
return
}
t.Fatal(err)
}

if got, exp := len(r.partitions), example.n; got != exp {
t.Fatalf("[Example %d] got %v, expected %v", i, got, exp)
}

// Check partitions distributed correctly
partitions := make([]*partition, 0)
for i, partition := range r.partitions {
if i == 0 || partition != partitions[len(partitions)-1] {
partitions = append(partitions, partition)
require.NoError(t, err)
require.Equal(t, example.n, len(r.partitions))

// Check partitions distributed correctly
partitions := make([]*partition, 0)
for i, partition := range r.partitions {
if i == 0 || partition != partitions[len(partitions)-1] {
partitions = append(partitions, partition)
}
}
}

if got, exp := len(partitions), example.n; got != exp {
t.Fatalf("[Example %d] got %v, expected %v", i, got, exp)
}
require.Equal(t, example.n, len(partitions))
})
}
}

Expand All @@ -48,7 +51,7 @@ var strSliceRes [][]byte
func benchmarkRingkeys(b *testing.B, r *ring, keys int) {
// Add some keys
for i := 0; i < keys; i++ {
r.add([]byte(fmt.Sprintf("cpu,host=server-%d value=1", i)), nil)
r.add([]byte(fmt.Sprintf("cpu,host=server-%d value=1", i)), &entry{})
}

b.ReportAllocs()
Expand All @@ -58,10 +61,10 @@ func benchmarkRingkeys(b *testing.B, r *ring, keys int) {
}
}

func BenchmarkRing_keys_100(b *testing.B) { benchmarkRingkeys(b, MustNewRing(256), 100) }
func BenchmarkRing_keys_1000(b *testing.B) { benchmarkRingkeys(b, MustNewRing(256), 1000) }
func BenchmarkRing_keys_10000(b *testing.B) { benchmarkRingkeys(b, MustNewRing(256), 10000) }
func BenchmarkRing_keys_100000(b *testing.B) { benchmarkRingkeys(b, MustNewRing(256), 100000) }
func BenchmarkRing_keys_100(b *testing.B) { benchmarkRingkeys(b, MustNewRing(16), 100) }
func BenchmarkRing_keys_1000(b *testing.B) { benchmarkRingkeys(b, MustNewRing(16), 1000) }
func BenchmarkRing_keys_10000(b *testing.B) { benchmarkRingkeys(b, MustNewRing(16), 10000) }
func BenchmarkRing_keys_100000(b *testing.B) { benchmarkRingkeys(b, MustNewRing(16), 100000) }

func benchmarkRingGetPartition(b *testing.B, r *ring, keys int) {
vals := make([][]byte, keys)
Expand All @@ -80,10 +83,10 @@ func benchmarkRingGetPartition(b *testing.B, r *ring, keys int) {
}

func BenchmarkRing_getPartition_100(b *testing.B) {
benchmarkRingGetPartition(b, MustNewRing(256), 100)
benchmarkRingGetPartition(b, MustNewRing(16), 100)
}
func BenchmarkRing_getPartition_1000(b *testing.B) {
benchmarkRingGetPartition(b, MustNewRing(256), 1000)
benchmarkRingGetPartition(b, MustNewRing(16), 1000)
}

func benchmarkRingWrite(b *testing.B, r *ring, n int) {
Expand Down Expand Up @@ -116,26 +119,18 @@ func benchmarkRingWrite(b *testing.B, r *ring, n int) {
}
}

func BenchmarkRing_write_1_100(b *testing.B) { benchmarkRingWrite(b, MustNewRing(1), 100) }
func BenchmarkRing_write_1_1000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(1), 1000) }
func BenchmarkRing_write_1_10000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(1), 10000) }
func BenchmarkRing_write_1_100000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(1), 100000) }
func BenchmarkRing_write_4_100(b *testing.B) { benchmarkRingWrite(b, MustNewRing(4), 100) }
func BenchmarkRing_write_4_1000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(4), 1000) }
func BenchmarkRing_write_4_10000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(4), 10000) }
func BenchmarkRing_write_4_100000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(4), 100000) }
func BenchmarkRing_write_32_100(b *testing.B) { benchmarkRingWrite(b, MustNewRing(32), 100) }
func BenchmarkRing_write_32_1000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(32), 1000) }
func BenchmarkRing_write_32_10000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(32), 10000) }
func BenchmarkRing_write_32_100000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(32), 100000) }
func BenchmarkRing_write_128_100(b *testing.B) { benchmarkRingWrite(b, MustNewRing(128), 100) }
func BenchmarkRing_write_128_1000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(128), 1000) }
func BenchmarkRing_write_128_10000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(128), 10000) }
func BenchmarkRing_write_128_100000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(256), 100000) }
func BenchmarkRing_write_256_100(b *testing.B) { benchmarkRingWrite(b, MustNewRing(256), 100) }
func BenchmarkRing_write_256_1000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(256), 1000) }
func BenchmarkRing_write_256_10000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(256), 10000) }
func BenchmarkRing_write_256_100000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(256), 100000) }
func BenchmarkRing_write_1_100(b *testing.B) { benchmarkRingWrite(b, MustNewRing(1), 100) }
func BenchmarkRing_write_1_1000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(1), 1000) }
func BenchmarkRing_write_1_10000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(1), 10000) }
func BenchmarkRing_write_1_100000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(1), 100000) }
func BenchmarkRing_write_4_100(b *testing.B) { benchmarkRingWrite(b, MustNewRing(4), 100) }
func BenchmarkRing_write_4_1000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(4), 1000) }
func BenchmarkRing_write_4_10000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(4), 10000) }
func BenchmarkRing_write_4_100000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(4), 100000) }
func BenchmarkRing_write_16_100(b *testing.B) { benchmarkRingWrite(b, MustNewRing(16), 100) }
func BenchmarkRing_write_16_1000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(16), 1000) }
func BenchmarkRing_write_16_10000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(16), 10000) }
func BenchmarkRing_write_16_100000(b *testing.B) { benchmarkRingWrite(b, MustNewRing(16), 100000) }

func MustNewRing(n int) *ring {
r, err := newring(n)
Expand Down

0 comments on commit e85a102

Please sign in to comment.