*: allow fully concurrent large read
xiang90 committed May 24, 2018
1 parent 20cf7f4 commit cdf23e3
Showing 7 changed files with 195 additions and 15 deletions.
2 changes: 0 additions & 2 deletions etcdserver/v3_server.go
@@ -86,8 +86,6 @@ type Authenticator interface {
}

func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
defer warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), time.Now(), r)

if !r.Serializable {
err := s.linearizableReadNotify(ctx)
if err != nil {
34 changes: 34 additions & 0 deletions mvcc/backend/backend.go
@@ -49,6 +49,11 @@ var (
)

type Backend interface {
// CommittedReadTx returns a non-blocking read tx suitable for large reads.
// The CommittedReadTx call itself will not return until the current BatchTx
// has been committed, which ensures consistency.
CommittedReadTx() ReadTx

ReadTx() ReadTx
BatchTx() BatchTx

@@ -97,6 +102,8 @@ type backend struct {

readTx *readTx

concurrentReadTxCh chan chan ReadTx

stopc chan struct{}
donec chan struct{}

@@ -165,6 +172,8 @@ func newBackend(bcfg BackendConfig) *backend {
buckets: make(map[string]*bolt.Bucket),
},

concurrentReadTxCh: make(chan chan ReadTx),

stopc: make(chan struct{}),
donec: make(chan struct{}),

@@ -184,6 +193,12 @@ func (b *backend) BatchTx() BatchTx {

func (b *backend) ReadTx() ReadTx { return b.readTx }

func (b *backend) CommittedReadTx() ReadTx {
rch := make(chan ReadTx)
b.concurrentReadTxCh <- rch
return <-rch
}

// ForceCommit forces the current batching tx to commit.
func (b *backend) ForceCommit() {
b.batchTx.Commit()
@@ -301,6 +316,25 @@ func (b *backend) run() {
b.batchTx.Commit()
}
t.Reset(b.batchInterval)
b.createConcurrentReadTxs()
}
}

func (b *backend) createConcurrentReadTxs() {
// do not allow too many concurrent read txs.
// TODO: improve this by having a global pending counter?
for i := 0; i < 100; i++ {
select {
case rch := <-b.concurrentReadTxCh:
rtx, err := b.db.Begin(false)
if err != nil {
plog.Fatalf("cannot begin read tx (%s)", err)
}
rch <- &concurrentReadTx{tx: rtx}
default:
// no more to create.
return
}
}
}

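For readers skimming the diff, the handoff works like this: CommittedReadTx parks a reply channel on concurrentReadTxCh and blocks; the run loop, right after committing the batch tx, drains up to 100 pending requests and answers each with a freshly opened read tx. Below is a minimal, self-contained sketch of that channel-of-channels pattern — the miniBackend/fakeTx names are illustrative stand-ins, not etcd code.

```go
// Sketch of the rendezvous behind CommittedReadTx: callers park a reply
// channel and block; the committer loop answers only after the batch commit.
package main

import (
	"fmt"
	"time"
)

type fakeTx struct{ committedRev int } // stand-in for a bolt read-only tx

type miniBackend struct {
	readReqs chan chan fakeTx
	rev      int // stand-in for the data made durable by batchTx.Commit()
}

// CommittedReadTx blocks until the next commit, then returns a read tx.
func (b *miniBackend) CommittedReadTx() fakeTx {
	rch := make(chan fakeTx)
	b.readReqs <- rch
	return <-rch
}

func (b *miniBackend) run(interval time.Duration) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for range t.C {
		b.rev++ // commit the batch
		b.serveReads()
	}
}

// serveReads hands a post-commit read tx to every caller waiting in
// CommittedReadTx, up to a small bound per commit.
func (b *miniBackend) serveReads() {
	for i := 0; i < 100; i++ {
		select {
		case rch := <-b.readReqs:
			rch <- fakeTx{committedRev: b.rev}
		default:
			return // no more pending requests
		}
	}
}

func main() {
	b := &miniBackend{readReqs: make(chan chan fakeTx)}
	go b.run(100 * time.Millisecond)
	tx := b.CommittedReadTx() // blocks until the next commit
	fmt.Println("read tx sees commit", tx.committedRev)
}
```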
75 changes: 75 additions & 0 deletions mvcc/backend/backend_test.go
@@ -300,6 +300,81 @@ func TestBackendWritebackForEach(t *testing.T) {
}
}

// TestBackendConcurrentReadTx checks if the concurrent tx is created correctly.
func TestBackendConcurrentReadTx(t *testing.T) {
b, tmpPath := NewTmpBackend(2*time.Second, 10000)
defer cleanup(b, tmpPath)

var rtx0 ReadTx
done := make(chan struct{})
go func() {
rtx0 = b.CommittedReadTx()
close(done)
}()

tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("key"))
for i := 0; i < 5; i++ {
k := []byte(fmt.Sprintf("%04d", i))
tx.UnsafePut([]byte("key"), k, []byte("bar"))
}
tx.Unlock()

select {
case <-done:
t.Fatal("concurrent read tx should block on the last batch tx!")
case <-time.After(time.Second):
}

select {
case <-done:
case <-time.After(4 * time.Second):
t.Fatal("commit the last batched tx should unblock concurrent tx!")
}

rtx0.Lock()
defer rtx0.Unlock()
ks, _ := rtx0.UnsafeRange([]byte("key"), []byte(fmt.Sprintf("%04d", 0)), []byte(fmt.Sprintf("%04d", 5)), 0)
if len(ks) != 5 {
t.Errorf("got %d keys, expect %d", len(ks), 5)
}

// test if we can create concurrent read while the previous read tx is still open
var rtx1 ReadTx
done = make(chan struct{})
go func() {
rtx1 = b.CommittedReadTx()
rtx1.Lock()
rtx1.UnsafeForEach([]byte(""), nil)
rtx1.Unlock()
close(done)
}()
select {
case <-done:
case <-time.After(4 * time.Second):
t.Fatal("cannot create concurrent read")
}

done = make(chan struct{})
// test if we can create concurrent write while the previous read tx is still open
go func() {
tx := b.BatchTx()
tx.Lock()
for i := 0; i < 5; i++ {
k := []byte(fmt.Sprintf("%04d", i))
tx.UnsafePut([]byte("key"), k, []byte("bar"))
}
tx.Unlock()
close(done)
}()
select {
case <-done:
case <-time.After(4 * time.Second):
t.Fatal("cannot create concurrent write")
}
}

func cleanup(b Backend, path string) {
b.Close()
os.Remove(path)
38 changes: 38 additions & 0 deletions mvcc/backend/concurrent_read_tx.go
@@ -0,0 +1,38 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backend

import (
bolt "github.com/coreos/bbolt"
)

type concurrentReadTx struct {
tx *bolt.Tx
}

func (rt *concurrentReadTx) Lock() {}
func (rt *concurrentReadTx) Unlock() { rt.tx.Rollback() }

func (rt *concurrentReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
bucket := rt.tx.Bucket(bucketName)
if bucket == nil {
plog.Fatalf("bucket %s does not exist", bucketName)
}
return unsafeRange(bucket.Cursor(), key, endKey, limit)
}

func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
return unsafeForEach(rt.tx, bucketName, visitor)
}
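concurrentReadTx wraps a dedicated bolt read-only transaction: Lock is a no-op because no shared mutex is needed, and Unlock rolls the transaction back, which is how bolt releases read-only transactions. Here is a small standalone sketch of that bolt usage, assuming the coreos/bbolt API vendored here; the file path and key names are illustrative.

```go
// Open a bolt read-only transaction directly and release it with Rollback,
// mirroring what concurrentReadTx does around UnsafeRange/UnsafeForEach.
package main

import (
	"fmt"
	"log"

	bolt "github.com/coreos/bbolt"
)

func main() {
	db, err := bolt.Open("/tmp/demo.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Seed one bucket/key with a write transaction.
	if err := db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte("key"))
		if err != nil {
			return err
		}
		return b.Put([]byte("0000"), []byte("bar"))
	}); err != nil {
		log.Fatal(err)
	}

	// A read-only transaction sees the state as of the last commit and can
	// run alongside the write transaction.
	rtx, err := db.Begin(false)
	if err != nil {
		log.Fatal(err)
	}
	defer rtx.Rollback() // read-only txs are released by rolling back

	c := rtx.Bucket([]byte("key")).Cursor()
	k, v := c.Seek([]byte("0000"))
	fmt.Printf("%s -> %s\n", k, v)
}
```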
7 changes: 4 additions & 3 deletions mvcc/index.go
@@ -91,10 +91,11 @@ func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex)) {
keyi, endi := &keyIndex{key: key}, &keyIndex{key: end}

ti.RLock()
defer ti.RUnlock()
ti.Lock()
clone := ti.tree.Clone()
ti.Unlock()

ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool {
clone.AscendGreaterOrEqual(keyi, func(item btree.Item) bool {
if len(endi.key) > 0 && !item.Less(endi) {
return false
}
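The visit change swaps a long read lock for a snapshot: the mutex is held only while cloning the btree (a cheap copy-on-write operation in the google/btree package that backs treeIndex), and the potentially long ascent then runs against the clone without blocking writers. A hedged sketch of the same pattern outside etcd:

```go
// Clone-then-iterate: hold the lock only for the O(1) Clone, walk the clone.
package main

import (
	"fmt"
	"sync"

	"github.com/google/btree"
)

type index struct {
	sync.Mutex
	tree *btree.BTree
}

// visit walks keys >= pivot against a snapshot of the tree, so concurrent
// writers to ti.tree are never blocked by a long traversal.
func (ti *index) visit(pivot btree.Int, f func(btree.Int)) {
	ti.Lock()
	clone := ti.tree.Clone() // lazy copy-on-write snapshot
	ti.Unlock()

	clone.AscendGreaterOrEqual(pivot, func(item btree.Item) bool {
		f(item.(btree.Int))
		return true
	})
}

func main() {
	ti := &index{tree: btree.New(32)}
	for i := 0; i < 10; i++ {
		ti.tree.ReplaceOrInsert(btree.Int(i))
	}

	// Writers may keep mutating ti.tree while this walk is in progress.
	ti.visit(btree.Int(5), func(v btree.Int) { fmt.Println(int(v)) })
}
```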
3 changes: 2 additions & 1 deletion mvcc/kvstore_test.go
@@ -640,7 +640,7 @@ func TestTxnBlockBackendForceCommit(t *testing.T) {
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
defer os.Remove(tmpPath)

txn := s.Read()
txn := s.Write()

done := make(chan struct{})
go func() {
@@ -742,6 +742,7 @@ type fakeBackend struct {

func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx }
func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx }
func (b *fakeBackend) CommittedReadTx() backend.ReadTx { return b.tx }
func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil }
func (b *fakeBackend) Size() int64 { return 0 }
func (b *fakeBackend) SizeInUse() int64 { return 0 }
51 changes: 42 additions & 9 deletions mvcc/kvstore_txn.go
@@ -21,9 +21,21 @@ import (
"go.uber.org/zap"
)

const (
expensiveReadLimit = 1000
readonly = true
readwrite = false
)

type storeTxnRead struct {
s *store
tx backend.ReadTx
s *store
tx backend.ReadTx
txlocked bool

// for creating concurrent read tx when the read is expensive.
b backend.Backend
// is the transaction read-only?
ro bool

firstRev int64
rev int64
@@ -33,10 +45,15 @@ func (s *store) Read() TxnRead {
s.mu.RLock()
tx := s.b.ReadTx()
s.revMu.RLock()
tx.Lock()
firstRev, rev := s.compactMainRev, s.currentRev
s.revMu.RUnlock()
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
return newMetricsTxnRead(&storeTxnRead{
s: s,
tx: tx,
b: s.b,
ro: readonly,
firstRev: firstRev,
rev: rev})
}

func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
@@ -47,7 +64,9 @@ func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult,
}

func (tr *storeTxnRead) End() {
tr.tx.Unlock()
if tr.txlocked {
tr.tx.Unlock()
}
tr.s.mu.RUnlock()
}

@@ -64,10 +83,15 @@ func (s *store) Write() TxnWrite {
tx := s.b.BatchTx()
tx.Lock()
tw := &storeTxnWrite{
storeTxnRead: storeTxnRead{s, tx, 0, 0},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
storeTxnRead: storeTxnRead{
s: s,
txlocked: true,
tx: tx,
ro: readwrite,
},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
}
return newMetricsTxnWrite(tw)
}
@@ -134,6 +158,15 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
limit = len(revpairs)
}

if limit > expensiveReadLimit && !tr.txlocked && tr.ro { // first expensive read in a read-only transaction
// too many keys to range. upgrade the read transaction to a concurrent read tx.
tr.tx = tr.b.CommittedReadTx()
}
if !tr.txlocked {
tr.tx.Lock()
tr.txlocked = true
}

kvs := make([]mvccpb.KeyValue, limit)
revBytes := newRevBytes()
for i, revpair := range revpairs[:len(kvs)] {
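Putting the kvstore_txn.go pieces together: the shared buffered read tx is now locked lazily (txlocked), and the first range in a read-only transaction that would touch more than expensiveReadLimit keys upgrades to the backend's committed read tx, so the large scan never holds the shared read lock or stalls the write path. A condensed sketch of that decision with hypothetical types (readTxn, noopTx — not etcd code):

```go
// Lazy lock plus upgrade-on-expensive-read, distilled from rangeKeys/End.
package main

const expensiveReadLimit = 1000 // same threshold the commit introduces

type readTx interface {
	Lock()
	Unlock()
}

type readTxn struct {
	tx       readTx // shared buffered read tx by default
	txlocked bool
	readonly bool
	// committedReadTx is a stand-in for backend.CommittedReadTx.
	committedReadTx func() readTx
}

// prepareRange picks and locks the transaction to use for a range over n keys.
func (tr *readTxn) prepareRange(n int) {
	if n > expensiveReadLimit && !tr.txlocked && tr.readonly {
		// Too many keys: switch to a fully concurrent committed read tx so the
		// scan does not hold the shared read lock.
		tr.tx = tr.committedReadTx()
	}
	if !tr.txlocked {
		tr.tx.Lock()
		tr.txlocked = true
	}
}

// End releases whichever transaction prepareRange ended up locking.
func (tr *readTxn) End() {
	if tr.txlocked {
		tr.tx.Unlock()
	}
}

type noopTx struct{}

func (noopTx) Lock()   {}
func (noopTx) Unlock() {}

func main() {
	tr := &readTxn{
		tx:              noopTx{},
		readonly:        true,
		committedReadTx: func() readTx { return noopTx{} },
	}
	tr.prepareRange(5000) // expensive: upgrades to the committed read tx
	tr.End()
}
```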
