diff --git a/server/storage/backend/batch_tx_test.go b/server/storage/backend/batch_tx_test.go index cc099d1f904..9f83bc4fcf4 100644 --- a/server/storage/backend/batch_tx_test.go +++ b/server/storage/backend/batch_tx_test.go @@ -15,6 +15,8 @@ package backend_test import ( + "fmt" + "math/rand" "reflect" "testing" "time" @@ -241,24 +243,118 @@ func TestRangeAfterDeleteMatch(t *testing.T) { tx.Unlock() tx.Commit() - checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0) + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), nil, 0) checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar")}) tx.Lock() tx.UnsafeDelete(schema.Test, []byte("foo")) tx.Unlock() - checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0) + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), nil, 0) checkForEach(t, b.BatchTx(), b.ReadTx(), nil, nil) } -func checkRangeResponseMatch(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, key, endKey []byte, limit int64) { +func TestRangeAfterUnorderedKeyWriteMatch(t *testing.T) { + b, _ := betesting.NewTmpBackend(t, time.Hour, 10000) + defer betesting.Close(t, b) + + tx := b.BatchTx() + + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafePut(schema.Test, []byte("foo5"), []byte("bar5")) + tx.UnsafePut(schema.Test, []byte("foo2"), []byte("bar2")) + tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar1")) + tx.UnsafePut(schema.Test, []byte("foo3"), []byte("bar3")) + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar")) + tx.UnsafePut(schema.Test, []byte("foo4"), []byte("bar4")) + tx.Unlock() + + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), nil, 1) +} + +func TestRangeAfterAlternatingBucketWriteMatch(t *testing.T) { + b, _ := betesting.NewTmpBackend(t, time.Hour, 10000) + defer betesting.Close(t, b) + + tx := b.BatchTx() + + tx.Lock() + tx.UnsafeCreateBucket(schema.Key) + 
tx.UnsafeCreateBucket(schema.Test) + tx.UnsafeSeqPut(schema.Key, []byte("key1"), []byte("val1")) + tx.Unlock() + tx.Lock() - ks1, vs1 := tx.UnsafeRange(schema.Test, key, endKey, limit) + tx.UnsafeSeqPut(schema.Key, []byte("key2"), []byte("val2")) + tx.Unlock() + tx.Commit() + // only in the 2nd commit the schema.Key key is removed from the readBuffer.buckets. + // This makes sure to test the case when an empty writeBuffer.bucket + // is used to replace the read buffer bucket. + tx.Commit() + + tx.Lock() + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar")) + tx.Unlock() + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Key, []byte("key"), []byte("key5"), 100) + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), []byte("foo3"), 1) +} + +func TestRangeAfterOverwriteMatch(t *testing.T) { + b, _ := betesting.NewTmpBackend(t, time.Hour, 10000) + defer betesting.Close(t, b) + + tx := b.BatchTx() + + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar2")) + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar0")) + tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar10")) + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar1")) + tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar11")) + tx.Unlock() + + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), []byte("foo3"), 1) + checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo"), []byte("foo1")}, [][]byte{[]byte("bar1"), []byte("bar11")}) +} + +func TestRangeAfterOverwriteAndDeleteMatch(t *testing.T) { + b, _ := betesting.NewTmpBackend(t, time.Hour, 10000) + defer betesting.Close(t, b) + + tx := b.BatchTx() + + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar2")) + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar0")) + tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar10")) + tx.UnsafePut(schema.Test, []byte("foo"), 
[]byte("bar1")) + tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar11")) + tx.Unlock() + + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), nil, 0) + checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo"), []byte("foo1")}, [][]byte{[]byte("bar1"), []byte("bar11")}) + + tx.Lock() + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar3")) + tx.UnsafeDelete(schema.Test, []byte("foo1")) + tx.Unlock() + + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), nil, 0) + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo1"), nil, 0) + checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar3")}) +} + +func checkRangeResponseMatch(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, bucket backend.Bucket, key, endKey []byte, limit int64) { + tx.Lock() + ks1, vs1 := tx.UnsafeRange(bucket, key, endKey, limit) tx.Unlock() rtx.RLock() - ks2, vs2 := rtx.UnsafeRange(schema.Test, key, endKey, limit) + ks2, vs2 := rtx.UnsafeRange(bucket, key, endKey, limit) rtx.RUnlock() if diff := cmp.Diff(ks1, ks2); diff != "" { @@ -294,3 +390,87 @@ func checkUnsafeForEach(t *testing.T, tx backend.UnsafeReader, expectedKeys, exp t.Errorf("values on transaction doesn't match expected, diff: %s", diff) } } + +// runWriteback is used to test the txWriteBuffer.writeback function, which is called inside tx.Unlock(). 
+// The parameters are chosen based on defaultBatchLimit = 10000 +func runWriteback(t testing.TB, kss, vss [][]string, isSeq bool) { + b, _ := betesting.NewTmpBackend(t, time.Hour, 10000) + defer betesting.Close(t, b) + + tx := b.BatchTx() + + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafeCreateBucket(schema.Key) + tx.Unlock() + + for i, ks := range kss { + vs := vss[i] + tx.Lock() + for j := 0; j < len(ks); j++ { + if isSeq { + tx.UnsafeSeqPut(schema.Key, []byte(ks[j]), []byte(vs[j])) + } else { + tx.UnsafePut(schema.Test, []byte(ks[j]), []byte(vs[j])) + } + } + tx.Unlock() + } +} + +func BenchmarkWritebackSeqBatches1BatchSize10000(b *testing.B) { benchmarkWriteback(b, 1, 10000, true) } + +func BenchmarkWritebackSeqBatches10BatchSize1000(b *testing.B) { benchmarkWriteback(b, 10, 1000, true) } + +func BenchmarkWritebackSeqBatches100BatchSize100(b *testing.B) { benchmarkWriteback(b, 100, 100, true) } + +func BenchmarkWritebackSeqBatches1000BatchSize10(b *testing.B) { benchmarkWriteback(b, 1000, 10, true) } + +func BenchmarkWritebackNonSeqBatches1000BatchSize1(b *testing.B) { + // for non sequential writes, the batch size is usually small, 1 or the order of cluster size. 
+ benchmarkWriteback(b, 1000, 1, false) +} + +func BenchmarkWritebackNonSeqBatches10000BatchSize1(b *testing.B) { + benchmarkWriteback(b, 10000, 1, false) +} + +func BenchmarkWritebackNonSeqBatches100BatchSize10(b *testing.B) { + benchmarkWriteback(b, 100, 10, false) +} + +func BenchmarkWritebackNonSeqBatches1000BatchSize10(b *testing.B) { + benchmarkWriteback(b, 1000, 10, false) +} + +func benchmarkWriteback(b *testing.B, batches, batchSize int, isSeq bool) { + // kss and vss are key and value arrays to write with size batches*batchSize + var kss, vss [][]string + for i := 0; i < batches; i++ { + var ks, vs []string + for j := i * batchSize; j < (i+1)*batchSize; j++ { + k := fmt.Sprintf("key%d", j) + v := fmt.Sprintf("val%d", j) + ks = append(ks, k) + vs = append(vs, v) + } + if !isSeq { + // make sure each batch is shuffled differently but the same for different test runs. + shuffleList(ks, i*batchSize) + } + kss = append(kss, ks) + vss = append(vss, vs) + } + b.ResetTimer() + // run the body exactly b.N times; starting at n := 1 would skip one + // iteration and do no work at all on the b.N == 1 probe run. + for n := 0; n < b.N; n++ { + runWriteback(b, kss, vss, isSeq) + } +} + +func shuffleList(l []string, seed int) { + r := rand.New(rand.NewSource(int64(seed))) + for i := 0; i < len(l); i++ { + j := r.Intn(i + 1) + l[i], l[j] = l[j], l[i] + } +} diff --git a/server/storage/backend/tx_buffer.go b/server/storage/backend/tx_buffer.go index c6907e7e6b5..821b300bfef 100644 --- a/server/storage/backend/tx_buffer.go +++ b/server/storage/backend/tx_buffer.go @@ -97,6 +97,9 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) { rb, ok := txr.buckets[k] if !ok { delete(txw.buckets, k) + if seq, ok := txw.bucket2seq[k]; ok && !seq { + wb.dedupe() + } txr.buckets[k] = wb continue } @@ -218,10 +221,15 @@ func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) { if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 { return } + bb.dedupe() +} +// dedupe removes duplicates, using only newest update +func (bb *bucketBuffer) dedupe() { + if bb.used <= 1 { + return + } sort.Stable(bb) - - 
// remove duplicates, using only newest update widx := 0 for ridx := 1; ridx < bb.used; ridx++ { if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) { diff --git a/server/storage/backend/tx_buffer_test.go b/server/storage/backend/tx_buffer_test.go index 745b2a7ec9d..3bcabde256d 100644 --- a/server/storage/backend/tx_buffer_test.go +++ b/server/storage/backend/tx_buffer_test.go @@ -90,3 +90,53 @@ func Test_bucketBuffer_CopyUsed(t *testing.T) { }) } } + +func TestDedupe(t *testing.T) { + tests := []struct { + name string + keys, vals, expectedKeys, expectedVals []string + }{ + { + name: "empty", + keys: []string{}, + vals: []string{}, + expectedKeys: []string{}, + expectedVals: []string{}, + }, + { + name: "single kv", + keys: []string{"key1"}, + vals: []string{"val1"}, + expectedKeys: []string{"key1"}, + expectedVals: []string{"val1"}, + }, + { + name: "duplicate key", + keys: []string{"key1", "key1"}, + vals: []string{"val1", "val2"}, + expectedKeys: []string{"key1"}, + expectedVals: []string{"val2"}, + }, + { + name: "unordered keys", + keys: []string{"key3", "key1", "key4", "key2", "key1", "key4"}, + vals: []string{"val1", "val5", "val3", "val4", "val2", "val6"}, + expectedKeys: []string{"key1", "key2", "key3", "key4"}, + expectedVals: []string{"val2", "val4", "val1", "val6"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + bb := &bucketBuffer{buf: make([]kv, 10), used: 0} + for i := 0; i < len(tt.keys); i++ { + bb.add([]byte(tt.keys[i]), []byte(tt.vals[i])) + } + bb.dedupe() + // assert.Equal takes (t, expected, actual) — expected value first. + assert.Equal(t, len(tt.expectedKeys), bb.used) + for i := 0; i < bb.used; i++ { + assert.Equal(t, []byte(tt.expectedKeys[i]), bb.buf[i].key) + assert.Equal(t, []byte(tt.expectedVals[i]), bb.buf[i].val) + } + }) + } +}