From 8d438c2939468a8539fdd8928e73f3d92584ce34 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 5 Jan 2017 02:07:50 -0800 Subject: [PATCH 1/6] backend: readtx ReadTxs are designed for read-only accesses to the backend using a read-only boltDB transaction. Since BatchTx's are long-running transactions, all writes to BatchTx will writeback to ReadTx, overlaying the base read-only transaction. --- mvcc/backend/backend.go | 27 +++++- mvcc/backend/backend_test.go | 75 +++++++++++++++ mvcc/backend/batch_tx.go | 144 ++++++++++++++++++++-------- mvcc/backend/read_tx.go | 92 ++++++++++++++++++ mvcc/backend/tx_buffer.go | 181 +++++++++++++++++++++++++++++++++++ 5 files changed, 477 insertions(+), 42 deletions(-) create mode 100644 mvcc/backend/read_tx.go create mode 100644 mvcc/backend/tx_buffer.go diff --git a/mvcc/backend/backend.go b/mvcc/backend/backend.go index 1be8b6697a0..f65e6c963eb 100644 --- a/mvcc/backend/backend.go +++ b/mvcc/backend/backend.go @@ -53,7 +53,9 @@ const ( ) type Backend interface { + ReadTx() ReadTx BatchTx() BatchTx + Snapshot() Snapshot Hash(ignores map[IgnoreKey]struct{}) (uint32, error) // Size returns the current size of the backend. @@ -86,7 +88,9 @@ type backend struct { batchInterval time.Duration batchLimit int - batchTx *batchTx + batchTx *batchTxBuffered + + readTx *readTx stopc chan struct{} donec chan struct{} @@ -106,16 +110,22 @@ func newBackend(path string, d time.Duration, limit int) *backend { plog.Panicf("cannot open database at %s (%v)", path, err) } + // In future, may want to make buffering optional for low-concurrency systems + // or dynamically swap between buffered/non-buffered depending on workload. b := &backend{ db: db, batchInterval: d, batchLimit: limit, + readTx: &readTx{buf: txReadBuffer{ + txBuffer: txBuffer{make(map[string]*bucketBuffer)}}, + }, + stopc: make(chan struct{}), donec: make(chan struct{}), } - b.batchTx = newBatchTx(b) + b.batchTx = newBatchTxBuffered(b) go b.run() return b } @@ -127,6 +137,8 @@ func (b *backend) BatchTx() BatchTx { return b.batchTx } +func (b *backend) ReadTx() ReadTx { return b.readTx } + // ForceCommit forces the current batching tx to commit. func (b *backend) ForceCommit() { b.batchTx.Commit() @@ -328,6 +340,17 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error { return tmptx.Commit() } +func (b *backend) begin(write bool) *bolt.Tx { + b.mu.RLock() + tx, err := b.db.Begin(write) + if err != nil { + plog.Fatalf("cannot begin tx (%s)", err) + } + b.mu.RUnlock() + atomic.StoreInt64(&b.size, tx.Size()) + return tx +} + // NewTmpBackend creates a backend implementation for testing. func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) { dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test") diff --git a/mvcc/backend/backend_test.go b/mvcc/backend/backend_test.go index 06d908db523..68d0b19599e 100644 --- a/mvcc/backend/backend_test.go +++ b/mvcc/backend/backend_test.go @@ -18,6 +18,7 @@ import ( "fmt" "io/ioutil" "os" + "reflect" "testing" "time" @@ -173,6 +174,80 @@ func TestBackendDefrag(t *testing.T) { b.ForceCommit() } +// TestBackendWriteback ensures writes are stored to the read txn on write txn unlock. 
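Before the test body, a quick sketch of the caller-facing flow it verifies. This is illustrative only; `readKeys` is a hypothetical helper, while `ReadTx`, `UnsafeRange`, and the `"key"` bucket come from this patch:

```go
package example

import "github.com/coreos/etcd/mvcc/backend"

// readKeys ranges over the "key" bucket through the new ReadTx. Because a
// batch tx writes back into the read tx's buffer on Unlock, this also sees
// writes that boltdb has not committed yet.
func readKeys(b backend.Backend) (keys [][]byte, vals [][]byte) {
	rtx := b.ReadTx()
	rtx.Lock() // read-locks the buffer; concurrent readers do not block each other
	defer rtx.Unlock()
	// limit=0 means unlimited; "key" is the bucket this patch treats as safe
	// for multi-key ranges (its keys are never overwritten in place).
	return rtx.UnsafeRange([]byte("key"), []byte("a"), []byte("\xff"), 0)
}
```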
+func TestBackendWriteback(t *testing.T) { + b, tmpPath := NewDefaultTmpBackend() + defer cleanup(b, tmpPath) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket([]byte("key")) + tx.UnsafePut([]byte("key"), []byte("abc"), []byte("bar")) + tx.UnsafePut([]byte("key"), []byte("def"), []byte("baz")) + tx.UnsafePut([]byte("key"), []byte("overwrite"), []byte("1")) + tx.Unlock() + + // overwrites should be propagated too + tx.Lock() + tx.UnsafePut([]byte("key"), []byte("overwrite"), []byte("2")) + tx.Unlock() + + keys := []struct { + key []byte + end []byte + limit int64 + + wkey [][]byte + wval [][]byte + }{ + { + key: []byte("abc"), + end: nil, + + wkey: [][]byte{[]byte("abc")}, + wval: [][]byte{[]byte("bar")}, + }, + { + key: []byte("abc"), + end: []byte("def"), + + wkey: [][]byte{[]byte("abc")}, + wval: [][]byte{[]byte("bar")}, + }, + { + key: []byte("abc"), + end: []byte("deg"), + + wkey: [][]byte{[]byte("abc"), []byte("def")}, + wval: [][]byte{[]byte("bar"), []byte("baz")}, + }, + { + key: []byte("abc"), + end: []byte("\xff"), + limit: 1, + + wkey: [][]byte{[]byte("abc")}, + wval: [][]byte{[]byte("bar")}, + }, + { + key: []byte("abc"), + end: []byte("\xff"), + + wkey: [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")}, + wval: [][]byte{[]byte("bar"), []byte("baz"), []byte("2")}, + }, + } + rtx := b.ReadTx() + for i, tt := range keys { + rtx.Lock() + k, v := rtx.UnsafeRange([]byte("key"), tt.key, tt.end, tt.limit) + rtx.Unlock() + if !reflect.DeepEqual(tt.wkey, k) || !reflect.DeepEqual(tt.wval, v) { + t.Errorf("#%d: want k=%+v, v=%+v; got k=%+v, v=%+v", i, tt.wkey, tt.wval, k, v) + } + } +} + func cleanup(b Backend, path string) { b.Close() os.Remove(path) diff --git a/mvcc/backend/batch_tx.go b/mvcc/backend/batch_tx.go index 04fea1e9477..1c248de5997 100644 --- a/mvcc/backend/batch_tx.go +++ b/mvcc/backend/batch_tx.go @@ -16,6 +16,8 @@ package backend import ( "bytes" + "fmt" + "math" "sync" "sync/atomic" "time" @@ -24,15 +26,14 @@ import ( ) type BatchTx interface { - Lock() - Unlock() + ReadTx UnsafeCreateBucket(name []byte) UnsafePut(bucketName []byte, key []byte, value []byte) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) - UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) UnsafeDelete(bucketName []byte, key []byte) - UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error + // Commit commits a previous tx and begins a new writable one. Commit() + // CommitAndStop commits the previous tx and does not create a new one. CommitAndStop() } @@ -40,13 +41,8 @@ type batchTx struct { sync.Mutex tx *bolt.Tx backend *backend - pending int -} -func newBatchTx(backend *backend) *batchTx { - tx := &batchTx{backend: backend} - tx.Commit() - return tx + pending int } func (t *batchTx) UnsafeCreateBucket(name []byte) { @@ -84,30 +80,37 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo } // UnsafeRange must be called holding the lock on the tx. 
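To make the lock contract above concrete, a minimal sketch (assumes a backend `b` whose `"key"` bucket already exists; since `BatchTx` now embeds `ReadTx`, the same range call also works under the write lock):

```go
func pointGet(b backend.Backend) ([][]byte, [][]byte) {
	tx := b.BatchTx()
	tx.Lock() // UnsafeRange below is only legal while this lock is held
	defer tx.Unlock()
	// a nil endKey is a point lookup: at most one key/value pair comes back
	return tx.UnsafeRange([]byte("key"), []byte("abc"), nil, 0)
}
```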
-func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) { - bucket := t.tx.Bucket(bucketName) - if bucket == nil { - plog.Fatalf("bucket %s does not exist", bucketName) +func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) { + k, v, err := unsafeRange(t.tx, bucketName, key, endKey, limit) + if err != nil { + plog.Fatal(err) } + return k, v +} +func unsafeRange(tx *bolt.Tx, bucketName, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte, err error) { + bucket := tx.Bucket(bucketName) + if bucket == nil { + return nil, nil, fmt.Errorf("bucket %s does not exist", bucketName) + } if len(endKey) == 0 { - if v := bucket.Get(key); v == nil { - return keys, vs - } else { - return append(keys, key), append(vs, v) + if v := bucket.Get(key); v != nil { + return append(keys, key), append(vs, v), nil } + return nil, nil, nil + } + if limit <= 0 { + limit = math.MaxInt64 } - c := bucket.Cursor() for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() { vs = append(vs, cv) keys = append(keys, ck) - if limit > 0 && limit == int64(len(keys)) { + if limit == int64(len(keys)) { break } } - - return keys, vs + return keys, vs, nil } // UnsafeDelete must be called holding the lock on the tx. @@ -125,12 +128,14 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { // UnsafeForEach must be called holding the lock on the tx. func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error { - b := t.tx.Bucket(bucketName) - if b == nil { - // bucket does not exist - return nil + return unsafeForEach(t.tx, bucketName, visitor) +} + +func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error) error { + if b := tx.Bucket(bucket); b != nil { + return b.ForEach(visitor) } - return b.ForEach(visitor) + return nil } // Commit commits a previous tx and begins a new writable one. @@ -140,7 +145,7 @@ func (t *batchTx) Commit() { t.commit(false) } -// CommitAndStop commits the previous tx and do not create a new one. +// CommitAndStop commits the previous tx and does not create a new one. 
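For orientation before the commit paths below, the batching contract in one hedged sketch (`writeDurably` is hypothetical; `ForceCommit` is the backend method shown earlier):

```go
func writeDurably(b backend.Backend) {
	tx := b.BatchTx()
	tx.Lock()
	tx.UnsafeCreateBucket([]byte("key")) // tolerates an already-existing bucket
	tx.UnsafePut([]byte("key"), []byte("k1"), []byte("v1"))
	tx.Unlock() // commits automatically once pending >= backend.batchLimit
	// otherwise the backend's run() loop commits every batchInterval;
	// force the commit when durability is needed right away:
	b.ForceCommit()
}
```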
func (t *batchTx) CommitAndStop() { t.Lock() defer t.Unlock() @@ -150,13 +155,11 @@ func (t *batchTx) CommitAndStop() { func (t *batchTx) Unlock() { if t.pending >= t.backend.batchLimit { t.commit(false) - t.pending = 0 } t.Mutex.Unlock() } func (t *batchTx) commit(stop bool) { - var err error // commit the last tx if t.tx != nil { if t.pending == 0 && !stop { @@ -178,9 +181,10 @@ func (t *batchTx) commit(stop bool) { } return } + start := time.Now() // gofail: var beforeCommit struct{} - err = t.tx.Commit() + err := t.tx.Commit() // gofail: var afterCommit struct{} commitDurations.Observe(time.Since(start).Seconds()) atomic.AddInt64(&t.backend.commits, 1) @@ -190,17 +194,77 @@ func (t *batchTx) commit(stop bool) { plog.Fatalf("cannot commit tx (%s)", err) } } + if !stop { + t.tx = t.backend.begin(true) + } +} + +type batchTxBuffered struct { + batchTx + buf txWriteBuffer +} - if stop { - return +func newBatchTxBuffered(backend *backend) *batchTxBuffered { + tx := &batchTxBuffered{ + batchTx: batchTx{backend: backend}, + buf: txWriteBuffer{ + txBuffer: txBuffer{make(map[string]*bucketBuffer)}, + seq: true, + }, } + tx.Commit() + return tx +} - t.backend.mu.RLock() - defer t.backend.mu.RUnlock() - // begin a new tx - t.tx, err = t.backend.db.Begin(true) - if err != nil { - plog.Fatalf("cannot begin tx (%s)", err) +func (t *batchTxBuffered) Unlock() { + if t.pending != 0 { + t.backend.readTx.mu.Lock() + t.buf.writeback(&t.backend.readTx.buf) + t.backend.readTx.mu.Unlock() + if t.pending >= t.backend.batchLimit { + t.commit(false) + } + } + t.batchTx.Unlock() +} + +func (t *batchTxBuffered) Commit() { + t.Lock() + defer t.Unlock() + t.commit(false) +} + +func (t *batchTxBuffered) CommitAndStop() { + t.Lock() + defer t.Unlock() + t.commit(true) +} + +func (t *batchTxBuffered) commit(stop bool) { + // all read txs must be closed to acquire boltdb commit rwlock + t.backend.readTx.mu.Lock() + defer t.backend.readTx.mu.Unlock() + if t.backend.readTx.tx != nil { + if err := t.backend.readTx.tx.Rollback(); err != nil { + plog.Fatalf("cannot rollback tx (%s)", err) + } + t.backend.readTx.buf.reset() + t.backend.readTx.tx = nil + } + + t.batchTx.commit(stop) + + if !stop { + t.backend.readTx.tx = t.backend.begin(false) } - atomic.StoreInt64(&t.backend.size, t.tx.Size()) +} + +func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) { + t.batchTx.UnsafePut(bucketName, key, value) + t.buf.put(bucketName, key, value) +} + +func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) { + t.batchTx.UnsafeSeqPut(bucketName, key, value) + t.buf.putSeq(bucketName, key, value) } diff --git a/mvcc/backend/read_tx.go b/mvcc/backend/read_tx.go new file mode 100644 index 00000000000..51596ffdf27 --- /dev/null +++ b/mvcc/backend/read_tx.go @@ -0,0 +1,92 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
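The new file below is the read side of this design. The property it provides, sketched in the style of TestBackendWriteback from earlier in this patch (`b` is assumed to be a fresh backend):

```go
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("key"))
tx.UnsafePut([]byte("key"), []byte("k"), []byte("v1"))
tx.Unlock() // write-back into readTx.buf happens here

rtx := b.ReadTx()
rtx.Lock()
_, vals := rtx.UnsafeRange([]byte("key"), []byte("k"), nil, 0)
rtx.Unlock() // vals[0] == []byte("v1"), with no boltdb commit in between
```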
+ +package backend + +import ( + "bytes" + "math" + "sync" + + "github.com/boltdb/bolt" +) + +// safeRangeBucket is a hack to avoid inadvertently reading duplicate keys; +// overwrites on a bucket should only fetch with limit=1, but safeRangeBucket +// is known to never overwrite any key so range is safe. +var safeRangeBucket = []byte("key") + +type ReadTx interface { + Lock() + Unlock() + + UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) + UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error +} + +type readTx struct { + // mu protects accesses to the txReadBuffer + mu sync.RWMutex + buf txReadBuffer + + // txmu protects accesses to the Tx on Range requests + txmu sync.Mutex + tx *bolt.Tx +} + +func (rt *readTx) Lock() { rt.mu.RLock() } +func (rt *readTx) Unlock() { rt.mu.RUnlock() } + +func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) { + if endKey == nil { + // forbid duplicates for single keys + limit = 1 + } + if limit <= 0 { + limit = math.MaxInt64 + } + if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) { + panic("do not use unsafeRange on non-keys bucket") + } + keys, vals := rt.buf.Range(bucketName, key, endKey, limit) + if int64(len(keys)) == limit { + return keys, vals + } + rt.txmu.Lock() + // ignore error since bucket may have been created in this batch + k2, v2, _ := unsafeRange(rt.tx, bucketName, key, endKey, limit-int64(len(keys))) + rt.txmu.Unlock() + return append(k2, keys...), append(v2, vals...) +} + +func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error { + dups := make(map[string]struct{}) + f1 := func(k, v []byte) error { + dups[string(k)] = struct{}{} + return visitor(k, v) + } + f2 := func(k, v []byte) error { + if _, ok := dups[string(k)]; ok { + return nil + } + return visitor(k, v) + } + if err := rt.buf.ForEach(bucketName, f1); err != nil { + return err + } + rt.txmu.Lock() + err := unsafeForEach(rt.tx, bucketName, f2) + rt.txmu.Unlock() + return err +} diff --git a/mvcc/backend/tx_buffer.go b/mvcc/backend/tx_buffer.go new file mode 100644 index 00000000000..56e885dbfbc --- /dev/null +++ b/mvcc/backend/tx_buffer.go @@ -0,0 +1,181 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package backend + +import ( + "bytes" + "sort" +) + +// txBuffer handles functionality shared between txWriteBuffer and txReadBuffer. +type txBuffer struct { + buckets map[string]*bucketBuffer +} + +func (txb *txBuffer) reset() { + for k, v := range txb.buckets { + if v.used == 0 { + // demote + delete(txb.buckets, k) + } + v.used = 0 + } +} + +// txWriteBuffer buffers writes of pending updates that have not yet committed. 
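A note on the `seq` flag used below, with hypothetical values: `UnsafeSeqPut` promises keys arrive in ascending order, so the buffer stays sorted for free; a single out-of-order `put` clears the flag and writeback pays for a sort.

```go
buf := txWriteBuffer{
	txBuffer: txBuffer{make(map[string]*bucketBuffer)},
	seq:      true,
}
buf.putSeq([]byte("key"), []byte("a"), []byte("1")) // appended in key order
buf.putSeq([]byte("key"), []byte("b"), []byte("2")) // still sorted
buf.put([]byte("key"), []byte("0"), []byte("3"))    // out of order: seq is now false
// during writeback, seq == false with more than one entry triggers sort.Sort
```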
+type txWriteBuffer struct { + txBuffer + seq bool +} + +func (txw *txWriteBuffer) put(bucket, k, v []byte) { + txw.seq = false + txw.putSeq(bucket, k, v) +} + +func (txw *txWriteBuffer) putSeq(bucket, k, v []byte) { + b, ok := txw.buckets[string(bucket)] + if !ok { + b = newBucketBuffer() + txw.buckets[string(bucket)] = b + } + b.add(k, v) +} + +func (txw *txWriteBuffer) writeback(txr *txReadBuffer) { + for k, wb := range txw.buckets { + rb, ok := txr.buckets[k] + if !ok { + delete(txw.buckets, k) + txr.buckets[k] = wb + continue + } + if !txw.seq && wb.used > 1 { + // assume no duplicate keys + sort.Sort(wb) + } + rb.merge(wb) + } + txw.reset() +} + +// txReadBuffer accesses buffered updates. +type txReadBuffer struct{ txBuffer } + +func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) { + if b := txr.buckets[string(bucketName)]; b != nil { + return b.Range(key, endKey, limit) + } + return nil, nil +} + +func (txr *txReadBuffer) ForEach(bucketName []byte, visitor func(k, v []byte) error) error { + if b := txr.buckets[string(bucketName)]; b != nil { + return b.ForEach(visitor) + } + return nil +} + +type kv struct { + key []byte + val []byte +} + +// bucketBuffer buffers key-value pairs that are pending commit. +type bucketBuffer struct { + buf []kv + // used tracks number of elements in use so buf can be reused without reallocation. + used int +} + +func newBucketBuffer() *bucketBuffer { + return &bucketBuffer{buf: make([]kv, 512), used: 0} +} + +func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) { + f := func(i int) bool { return bytes.Compare(bb.buf[i].key, key) >= 0 } + idx := sort.Search(bb.used, f) + if idx < 0 { + return nil, nil + } + if len(endKey) == 0 { + if bytes.Equal(key, bb.buf[idx].key) { + keys = append(keys, bb.buf[idx].key) + vals = append(vals, bb.buf[idx].val) + } + return keys, vals + } + if bytes.Compare(endKey, bb.buf[idx].key) <= 0 { + return nil, nil + } + for i := idx; i < bb.used && int64(len(keys)) < limit; i++ { + if bytes.Compare(endKey, bb.buf[i].key) <= 0 { + break + } + keys = append(keys, bb.buf[i].key) + vals = append(vals, bb.buf[i].val) + } + return keys, vals +} + +func (bb *bucketBuffer) ForEach(visitor func(k, v []byte) error) error { + for i := 0; i < bb.used; i++ { + if err := visitor(bb.buf[i].key, bb.buf[i].val); err != nil { + return err + } + } + return nil +} + +func (bb *bucketBuffer) add(k, v []byte) { + bb.buf[bb.used].key, bb.buf[bb.used].val = k, v + bb.used++ + if bb.used == len(bb.buf) { + buf := make([]kv, (3*len(bb.buf))/2) + copy(buf, bb.buf) + bb.buf = buf + } +} + +// merge merges data from bb into bbsrc. 
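Reading tip: the direction is the reverse of the comment's wording; `merge` folds `bbsrc`'s entries into the receiver `bb`, and the newest update wins. A worked example with hypothetical contents:

```go
// bb holds {a:1, c:3} from earlier writebacks; bbsrc holds a fresh
// overwrite {a:9} from the latest write txn.
bb.merge(bbsrc)
// bbsrc's entries are appended after bb's, so after sort.Stable the newer
// a:9 follows a:1; the de-dup pass keeps only the last duplicate:
// bb is now {a:9, c:3} with bb.used == 2.
```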
+func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) { + for i := 0; i < bbsrc.used; i++ { + bb.add(bbsrc.buf[i].key, bbsrc.buf[i].val) + } + if bb.used == bbsrc.used { + return + } + if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 { + return + } + + sort.Stable(bb) + + // remove duplicates, using only newest update + widx := 0 + for ridx := 1; ridx < bb.used; ridx++ { + if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) { + widx++ + } + bb.buf[widx] = bb.buf[ridx] + } + bb.used = widx + 1 +} + +func (bb *bucketBuffer) Len() int { return bb.used } +func (bb *bucketBuffer) Less(i, j int) bool { + return bytes.Compare(bb.buf[i].key, bb.buf[j].key) < 0 +} +func (bb *bucketBuffer) Swap(i, j int) { bb.buf[i], bb.buf[j] = bb.buf[j], bb.buf[i] } From 33acbb694b81f0d4a254936195d706a0b29ce158 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 4 Jan 2017 17:01:31 -0800 Subject: [PATCH 2/6] mvcc: txns and r/w views Clean-up of the mvcc interfaces to use txn interfaces instead of an id. Adds support for concurrent read-only mvcc transactions. Fixes #7083 --- mvcc/kv.go | 78 ++++-- mvcc/kv_test.go | 95 +++---- mvcc/kv_view.go | 53 ++++ mvcc/kvstore.go | 390 ++++------------------------- mvcc/kvstore_bench_test.go | 16 +- mvcc/kvstore_test.go | 86 +++---- mvcc/kvstore_txn.go | 254 +++++++++++++++++++ mvcc/metrics_txn.go | 67 +++++ mvcc/watchable_store.go | 150 +++-------- mvcc/watchable_store_bench_test.go | 8 +- mvcc/watchable_store_test.go | 4 +- mvcc/watchable_store_txn.go | 53 ++++ 12 files changed, 643 insertions(+), 611 deletions(-) create mode 100644 mvcc/kv_view.go create mode 100644 mvcc/kvstore_txn.go create mode 100644 mvcc/metrics_txn.go create mode 100644 mvcc/watchable_store_txn.go diff --git a/mvcc/kv.go b/mvcc/kv.go index c851c8725e8..e13cd647948 100644 --- a/mvcc/kv.go +++ b/mvcc/kv.go @@ -32,15 +32,15 @@ type RangeResult struct { Count int } -type KV interface { - // Rev returns the current revision of the KV. - Rev() int64 - - // FirstRev returns the first revision of the KV. +type ReadView interface { + // FirstRev returns the first KV revision at the time of opening the txn. // After a compaction, the first revision increases to the compaction // revision. FirstRev() int64 + // Rev returns the revision of the KV at the time of opening the txn. + Rev() int64 + // Range gets the keys in the range at rangeRev. // The returned rev is the current revision of the KV when the operation is executed. // If rangeRev <=0, range gets the keys at currentRev. @@ -50,14 +50,17 @@ type KV interface { // Limit limits the number of keys returned. // If the required rev is compacted, ErrCompacted will be returned. Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) +} - // Put puts the given key, value into the store. Put also takes additional argument lease to - // attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease - // id. - // A put also increases the rev of the store, and generates one event in the event history. - // The returned rev is the current revision of the KV when the operation is executed. - Put(key, value []byte, lease lease.LeaseID) (rev int64) +// TxnRead represents a read-only transaction with operations that will not +// block other read transactions. +type TxnRead interface { + ReadView + // End marks the transaction is complete and ready to commit. + End() +} +type WriteView interface { // DeleteRange deletes the given range from the store. 
// A deleteRange increases the rev of the store if any key in the range exists. // The number of key deleted will be returned. @@ -67,26 +70,49 @@ type KV interface { // if the `end` is not nil, deleteRange deletes the keys in range [key, range_end). DeleteRange(key, end []byte) (n, rev int64) - // TxnBegin begins a txn. Only Txn prefixed operation can be executed, others will be blocked - // until txn ends. Only one on-going txn is allowed. - // TxnBegin returns an int64 txn ID. - // All txn prefixed operations with same txn ID will be done with the same rev. - TxnBegin() int64 - // TxnEnd ends the on-going txn with txn ID. If the on-going txn ID is not matched, error is returned. - TxnEnd(txnID int64) error - // TxnRange returns the current revision of the KV when the operation is executed. - TxnRange(txnID int64, key, end []byte, ro RangeOptions) (r *RangeResult, err error) - TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) - TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) + // Put puts the given key, value into the store. Put also takes additional argument lease to + // attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease + // id. + // A put also increases the rev of the store, and generates one event in the event history. + // The returned rev is the current revision of the KV when the operation is executed. + Put(key, value []byte, lease lease.LeaseID) (rev int64) +} - // Compact frees all superseded keys with revisions less than rev. - Compact(rev int64) (<-chan struct{}, error) +// TxnWrite represents a transaction that can modify the store. +type TxnWrite interface { + TxnRead + WriteView + // Changes gets the changes made since opening the write txn. + Changes() []mvccpb.KeyValue +} + +// txnReadWrite coerces a read txn to a write, panicking on any write operation. +type txnReadWrite struct{ TxnRead } + +func (trw *txnReadWrite) DeleteRange(key, end []byte) (n, rev int64) { panic("unexpected DeleteRange") } +func (trw *txnReadWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) { + panic("unexpected Put") +} +func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { panic("unexpected Changes") } + +type KV interface { + ReadView + WriteView + + // Read creates a read transaction. + Read() TxnRead + + // Write creates a write transaction. + Write() TxnWrite // Hash retrieves the hash of KV state and revision. - // This method is designed for consistency checking purpose. + // This method is designed for consistency checking purposes. Hash() (hash uint32, revision int64, err error) - // Commit commits txns into the underlying backend. + // Compact frees all superseded keys with revisions less than rev. + Compact(rev int64) (<-chan struct{}, error) + + // Commit commits outstanding txns into the underlying backend. Commit() // Restore restores the KV store from a backend. 
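The net effect of the interface reshuffle above, as a usage sketch (`putThenRange` is a hypothetical helper; compare the txnPutFunc/txnRangeFunc rewrites in the test diff below):

```go
package example

import (
	"github.com/coreos/etcd/lease"
	"github.com/coreos/etcd/mvcc"
)

// putThenRange shows the id-less replacement for TxnBegin/TxnEnd.
func putThenRange(kv mvcc.KV) (*mvcc.RangeResult, error) {
	txn := kv.Write()
	txn.Put([]byte("foo"), []byte("bar"), lease.NoLease)
	txn.End()

	rtxn := kv.Read() // read txns no longer block one another
	defer rtxn.End()
	return rtxn.Range([]byte("foo"), nil, mvcc.RangeOptions{})
}
```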
diff --git a/mvcc/kv_test.go b/mvcc/kv_test.go index 721d70a57f9..162dac40b5c 100644 --- a/mvcc/kv_test.go +++ b/mvcc/kv_test.go @@ -43,35 +43,27 @@ var ( return kv.Range(key, end, ro) } txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) { - id := kv.TxnBegin() - defer kv.TxnEnd(id) - return kv.TxnRange(id, key, end, ro) + txn := kv.Read() + defer txn.End() + return txn.Range(key, end, ro) } normalPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 { return kv.Put(key, value, lease) } txnPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 { - id := kv.TxnBegin() - defer kv.TxnEnd(id) - rev, err := kv.TxnPut(id, key, value, lease) - if err != nil { - panic("txn put error") - } - return rev + txn := kv.Write() + defer txn.End() + return txn.Put(key, value, lease) } normalDeleteRangeFunc = func(kv KV, key, end []byte) (n, rev int64) { return kv.DeleteRange(key, end) } txnDeleteRangeFunc = func(kv KV, key, end []byte) (n, rev int64) { - id := kv.TxnBegin() - defer kv.TxnEnd(id) - n, rev, err := kv.TxnDeleteRange(id, key, end) - if err != nil { - panic("txn delete error") - } - return n, rev + txn := kv.Write() + defer txn.End() + return txn.DeleteRange(key, end) } ) @@ -142,7 +134,7 @@ func testKVRange(t *testing.T, f rangeFunc) { } func TestKVRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) } -func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) } +func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, txnRangeFunc) } func testKVRangeRev(t *testing.T, f rangeFunc) { b, tmpPath := backend.NewDefaultTmpBackend() @@ -178,7 +170,7 @@ func testKVRangeRev(t *testing.T, f rangeFunc) { } func TestKVRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc) } -func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc) } +func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, txnRangeFunc) } func testKVRangeBadRev(t *testing.T, f rangeFunc) { b, tmpPath := backend.NewDefaultTmpBackend() @@ -404,17 +396,16 @@ func TestKVOperationInSequence(t *testing.T) { } } -func TestKVTxnBlockNonTxnOperations(t *testing.T) { +func TestKVTxnBlockWriteOperations(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() s := NewStore(b, &lease.FakeLessor{}, nil) tests := []func(){ - func() { s.Range([]byte("foo"), nil, RangeOptions{}) }, func() { s.Put([]byte("foo"), nil, lease.NoLease) }, func() { s.DeleteRange([]byte("foo"), nil) }, } for i, tt := range tests { - id := s.TxnBegin() + txn := s.Write() done := make(chan struct{}, 1) go func() { tt() @@ -426,7 +417,7 @@ func TestKVTxnBlockNonTxnOperations(t *testing.T) { case <-time.After(10 * time.Millisecond): } - s.TxnEnd(id) + txn.End() select { case <-done: case <-time.After(10 * time.Second): @@ -438,39 +429,23 @@ func TestKVTxnBlockNonTxnOperations(t *testing.T) { cleanup(s, b, tmpPath) } -func TestKVTxnWrongID(t *testing.T) { +func TestKVTxnNonBlockRange(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() s := NewStore(b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) - id := s.TxnBegin() - wrongid := id + 1 - - tests := []func() error{ - func() error { - _, err := s.TxnRange(wrongid, []byte("foo"), nil, RangeOptions{}) - return err - }, - func() error { - _, err := s.TxnPut(wrongid, []byte("foo"), nil, lease.NoLease) - return err - }, - func() error { - _, _, err := s.TxnDeleteRange(wrongid, []byte("foo"), nil) - return err - }, - func() error { return s.TxnEnd(wrongid) }, - } - for i, tt := range tests { - err 
:= tt() - if err != ErrTxnIDMismatch { - t.Fatalf("#%d: err = %+v, want %+v", i, err, ErrTxnIDMismatch) - } - } + txn := s.Write() + defer txn.End() - err := s.TxnEnd(id) - if err != nil { - t.Fatalf("end err = %+v, want %+v", err, nil) + donec := make(chan struct{}) + go func() { + defer close(donec) + s.Range([]byte("foo"), nil, RangeOptions{}) + }() + select { + case <-donec: + case <-time.After(100 * time.Millisecond): + t.Fatalf("range operation blocked on write txn") } } @@ -481,19 +456,16 @@ func TestKVTxnOperationInSequence(t *testing.T) { defer cleanup(s, b, tmpPath) for i := 0; i < 10; i++ { - id := s.TxnBegin() + txn := s.Write() base := int64(i + 1) // put foo - rev, err := s.TxnPut(id, []byte("foo"), []byte("bar"), lease.NoLease) - if err != nil { - t.Fatal(err) - } + rev := txn.Put([]byte("foo"), []byte("bar"), lease.NoLease) if rev != base+1 { t.Errorf("#%d: put rev = %d, want %d", i, rev, base+1) } - r, err := s.TxnRange(id, []byte("foo"), nil, RangeOptions{Rev: base + 1}) + r, err := txn.Range([]byte("foo"), nil, RangeOptions{Rev: base + 1}) if err != nil { t.Fatal(err) } @@ -508,15 +480,12 @@ func TestKVTxnOperationInSequence(t *testing.T) { } // delete foo - n, rev, err := s.TxnDeleteRange(id, []byte("foo"), nil) - if err != nil { - t.Fatal(err) - } + n, rev := txn.DeleteRange([]byte("foo"), nil) if n != 1 || rev != base+1 { t.Errorf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, 1, base+1) } - r, err = s.TxnRange(id, []byte("foo"), nil, RangeOptions{Rev: base + 1}) + r, err = txn.Range([]byte("foo"), nil, RangeOptions{Rev: base + 1}) if err != nil { t.Errorf("#%d: range error (%v)", i, err) } @@ -527,7 +496,7 @@ func TestKVTxnOperationInSequence(t *testing.T) { t.Errorf("#%d: range rev = %d, want %d", i, r.Rev, base+1) } - s.TxnEnd(id) + txn.End() } } diff --git a/mvcc/kv_view.go b/mvcc/kv_view.go new file mode 100644 index 00000000000..f40ba8edc22 --- /dev/null +++ b/mvcc/kv_view.go @@ -0,0 +1,53 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
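The adapters in this new file are what let `store` keep its old non-txn method set: `store` embeds `ReadView`/`WriteView`, so a bare `s.Put` becomes sugar for a one-shot write txn, roughly (sketch; `s`, `key`, `value`, and `leaseID` are hypothetical):

```go
tw := s.Write()
rev := tw.Put(key, value, leaseID)
tw.End()
```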
+ +package mvcc + +import ( + "github.com/coreos/etcd/lease" +) + +type readView struct{ kv KV } + +func (rv *readView) FirstRev() int64 { + tr := rv.kv.Read() + defer tr.End() + return tr.FirstRev() +} + +func (rv *readView) Rev() int64 { + tr := rv.kv.Read() + defer tr.End() + return tr.Rev() +} + +func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) { + tr := rv.kv.Read() + defer tr.End() + return tr.Range(key, end, ro) +} + +type writeView struct{ kv KV } + +func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) { + tw := wv.kv.Write() + defer tw.End() + return tw.DeleteRange(key, end) +} + +func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) { + tw := wv.kv.Write() + defer tw.End() + return tw.Put(key, value, lease) +} diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index f9f78c15547..6a0e4167dd0 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -18,7 +18,6 @@ import ( "encoding/binary" "errors" "math" - "math/rand" "sync" "time" @@ -45,10 +44,10 @@ var ( scheduledCompactKeyName = []byte("scheduledCompactRev") finishedCompactKeyName = []byte("finishedCompactRev") - ErrTxnIDMismatch = errors.New("mvcc: txn id mismatch") - ErrCompacted = errors.New("mvcc: required revision has been compacted") - ErrFutureRev = errors.New("mvcc: required revision is a future revision") - ErrCanceled = errors.New("mvcc: watcher is canceled") + ErrCompacted = errors.New("mvcc: required revision has been compacted") + ErrFutureRev = errors.New("mvcc: required revision is a future revision") + ErrCanceled = errors.New("mvcc: watcher is canceled") + ErrClosed = errors.New("mvcc: closed") plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc") ) @@ -61,7 +60,11 @@ type ConsistentIndexGetter interface { } type store struct { - mu sync.Mutex // guards the following + ReadView + WriteView + + // mu read locks for txns and write locks for non-txn store changes. + mu sync.RWMutex ig ConsistentIndexGetter @@ -70,19 +73,19 @@ type store struct { le lease.Lessor - currentRev revision - // the main revision of the last compaction + // revMuLock protects currentRev and compactMainRev. + // Locked at end of write txn and released after write txn unlock lock. + // Locked before locking read txn and released after locking. + revMu sync.RWMutex + // currentRev is the revision of the last completed transaction. + currentRev int64 + // compactMainRev is the main revision of the last compaction. compactMainRev int64 - tx backend.BatchTx - txnID int64 // tracks the current txnID to verify txn operations - txnModify bool - // bytesBuf8 is a byte slice of length 8 // to avoid a repetitive allocation in saveIndex. 
bytesBuf8 []byte - changes []mvccpb.KeyValue fifoSched schedule.Scheduler stopc chan struct{} @@ -98,7 +101,7 @@ func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *sto le: le, - currentRev: revision{main: 1}, + currentRev: 1, compactMainRev: -1, bytesBuf8: make([]byte, 8), @@ -106,9 +109,10 @@ func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *sto stopc: make(chan struct{}), } - + s.ReadView = &readView{s} + s.WriteView = &writeView{s} if s.le != nil { - s.le.SetRangeDeleter(s) + s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() }) } tx := s.b.BatchTx() @@ -126,140 +130,6 @@ func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *sto return s } -func (s *store) Rev() int64 { - s.mu.Lock() - defer s.mu.Unlock() - - return s.currentRev.main -} - -func (s *store) FirstRev() int64 { - s.mu.Lock() - defer s.mu.Unlock() - - return s.compactMainRev -} - -func (s *store) Put(key, value []byte, lease lease.LeaseID) int64 { - id := s.TxnBegin() - s.put(key, value, lease) - s.txnEnd(id) - - putCounter.Inc() - - return int64(s.currentRev.main) -} - -func (s *store) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) { - id := s.TxnBegin() - kvs, count, rev, err := s.rangeKeys(key, end, ro.Limit, ro.Rev, ro.Count) - s.txnEnd(id) - - rangeCounter.Inc() - - r = &RangeResult{ - KVs: kvs, - Count: count, - Rev: rev, - } - - return r, err -} - -func (s *store) DeleteRange(key, end []byte) (n, rev int64) { - id := s.TxnBegin() - n = s.deleteRange(key, end) - s.txnEnd(id) - - deleteCounter.Inc() - - return n, int64(s.currentRev.main) -} - -func (s *store) TxnBegin() int64 { - s.mu.Lock() - s.currentRev.sub = 0 - s.tx = s.b.BatchTx() - s.tx.Lock() - - s.txnID = rand.Int63() - return s.txnID -} - -func (s *store) TxnEnd(txnID int64) error { - err := s.txnEnd(txnID) - if err != nil { - return err - } - - txnCounter.Inc() - return nil -} - -// txnEnd is used for unlocking an internal txn. It does -// not increase the txnCounter. -func (s *store) txnEnd(txnID int64) error { - if txnID != s.txnID { - return ErrTxnIDMismatch - } - - // only update index if the txn modifies the mvcc state. - // read only txn might execute with one write txn concurrently, - // it should not write its index to mvcc. 
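The rule in the deleted comment above is not lost; it returns txn-scoped in kvstore_txn.go later in this patch, where only a write txn holding pending changes saves the index. Condensed excerpt:

```go
func (tw *storeTxnWrite) End() {
	if len(tw.changes) != 0 { // read-only txns never touch the index
		tw.s.saveIndex(tw.tx)
		tw.s.revMu.Lock()
		tw.s.currentRev++
	}
	// ... unlock the batch tx, then release revMu ...
}
```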
- if s.txnModify { - s.saveIndex() - } - s.txnModify = false - - s.tx.Unlock() - if s.currentRev.sub != 0 { - s.currentRev.main += 1 - } - s.currentRev.sub = 0 - - dbTotalSize.Set(float64(s.b.Size())) - s.mu.Unlock() - return nil -} - -func (s *store) TxnRange(txnID int64, key, end []byte, ro RangeOptions) (r *RangeResult, err error) { - if txnID != s.txnID { - return nil, ErrTxnIDMismatch - } - - kvs, count, rev, err := s.rangeKeys(key, end, ro.Limit, ro.Rev, ro.Count) - - r = &RangeResult{ - KVs: kvs, - Count: count, - Rev: rev, - } - return r, err -} - -func (s *store) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) { - if txnID != s.txnID { - return 0, ErrTxnIDMismatch - } - - s.put(key, value, lease) - return int64(s.currentRev.main + 1), nil -} - -func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) { - if txnID != s.txnID { - return 0, 0, ErrTxnIDMismatch - } - - n = s.deleteRange(key, end) - if n != 0 || s.currentRev.sub != 0 { - rev = int64(s.currentRev.main + 1) - } else { - rev = int64(s.currentRev.main) - } - return n, rev, nil -} - func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) { if ctx == nil || ctx.Err() != nil { s.mu.Lock() @@ -275,16 +145,32 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) { close(ch) } +func (s *store) Hash() (hash uint32, revision int64, err error) { + // TODO: nothing should be able to call into backend when closed + select { + case <-s.stopc: + return 0, 0, ErrClosed + default: + } + + s.b.ForceCommit() + h, err := s.b.Hash(DefaultIgnores) + return h, s.currentRev, err +} + func (s *store) Compact(rev int64) (<-chan struct{}, error) { s.mu.Lock() defer s.mu.Unlock() + s.revMu.Lock() + defer s.revMu.Unlock() + if rev <= s.compactMainRev { ch := make(chan struct{}) f := func(ctx context.Context) { s.compactBarrier(ctx, ch) } s.fifoSched.Schedule(f) return ch, ErrCompacted } - if rev > s.currentRev.main { + if rev > s.currentRev { return nil, ErrFutureRev } @@ -333,24 +219,14 @@ func init() { } } -func (s *store) Hash() (uint32, int64, error) { - s.mu.Lock() - defer s.mu.Unlock() - s.b.ForceCommit() - - h, err := s.b.Hash(DefaultIgnores) - rev := s.currentRev.main - return h, rev, err -} - func (s *store) Commit() { s.mu.Lock() defer s.mu.Unlock() - s.tx = s.b.BatchTx() - s.tx.Lock() - s.saveIndex() - s.tx.Unlock() + tx := s.b.BatchTx() + tx.Lock() + s.saveIndex(tx) + tx.Unlock() s.b.ForceCommit() } @@ -363,10 +239,8 @@ func (s *store) Restore(b backend.Backend) error { s.b = b s.kvindex = newTreeIndex() - s.currentRev = revision{main: 1} + s.currentRev = 1 s.compactMainRev = -1 - s.tx = b.BatchTx() - s.txnID = -1 s.fifoSched = schedule.NewFIFOScheduler() s.stopc = make(chan struct{}) @@ -403,6 +277,7 @@ func (s *store) restore() error { } rev := bytesToRev(key[:revBytesLen]) + s.currentRev = rev.main // restore index switch { @@ -428,9 +303,6 @@ func (s *store) restore() error { delete(keyToLease, string(kv.Key)) } } - - // update revision - s.currentRev = rev } // restore the tree index from the unordered index. @@ -441,8 +313,8 @@ func (s *store) restore() error { // keys in the range [compacted revision -N, compaction] might all be deleted due to compaction. // the correct revision should be set to compaction revision in the case, not the largest revision // we have seen. 
- if s.currentRev.main < s.compactMainRev { - s.currentRev.main = s.compactMainRev + if s.currentRev < s.compactMainRev { + s.currentRev = s.compactMainRev } for key, lid := range keyToLease { @@ -490,180 +362,10 @@ func (a *store) Equal(b *store) bool { return a.kvindex.Equal(b.kvindex) } -// range is a keyword in Go, add Keys suffix. -func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64, countOnly bool) (kvs []mvccpb.KeyValue, count int, curRev int64, err error) { - curRev = int64(s.currentRev.main) - if s.currentRev.sub > 0 { - curRev += 1 - } - - if rangeRev > curRev { - return nil, -1, s.currentRev.main, ErrFutureRev - } - var rev int64 - if rangeRev <= 0 { - rev = curRev - } else { - rev = rangeRev - } - if rev < s.compactMainRev { - return nil, -1, 0, ErrCompacted - } - - _, revpairs := s.kvindex.Range(key, end, int64(rev)) - if len(revpairs) == 0 { - return nil, 0, curRev, nil - } - if countOnly { - return nil, len(revpairs), curRev, nil - } - - for _, revpair := range revpairs { - start, end := revBytesRange(revpair) - - _, vs := s.tx.UnsafeRange(keyBucketName, start, end, 0) - if len(vs) != 1 { - plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub) - } - - var kv mvccpb.KeyValue - if err := kv.Unmarshal(vs[0]); err != nil { - plog.Fatalf("cannot unmarshal event: %v", err) - } - kvs = append(kvs, kv) - if limit > 0 && len(kvs) >= int(limit) { - break - } - } - return kvs, len(revpairs), curRev, nil -} - -func (s *store) put(key, value []byte, leaseID lease.LeaseID) { - s.txnModify = true - - rev := s.currentRev.main + 1 - c := rev - oldLease := lease.NoLease - - // if the key exists before, use its previous created and - // get its previous leaseID - _, created, ver, err := s.kvindex.Get(key, rev) - if err == nil { - c = created.main - oldLease = s.le.GetLease(lease.LeaseItem{Key: string(key)}) - } - - ibytes := newRevBytes() - revToBytes(revision{main: rev, sub: s.currentRev.sub}, ibytes) - - ver = ver + 1 - kv := mvccpb.KeyValue{ - Key: key, - Value: value, - CreateRevision: c, - ModRevision: rev, - Version: ver, - Lease: int64(leaseID), - } - - d, err := kv.Marshal() - if err != nil { - plog.Fatalf("cannot marshal event: %v", err) - } - - s.tx.UnsafeSeqPut(keyBucketName, ibytes, d) - s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub}) - s.changes = append(s.changes, kv) - s.currentRev.sub += 1 - - if oldLease != lease.NoLease { - if s.le == nil { - panic("no lessor to detach lease") - } - - err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}}) - if err != nil { - plog.Errorf("unexpected error from lease detach: %v", err) - } - } - - if leaseID != lease.NoLease { - if s.le == nil { - panic("no lessor to attach lease") - } - - err = s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}}) - if err != nil { - panic("unexpected error from lease Attach") - } - } -} - -func (s *store) deleteRange(key, end []byte) int64 { - s.txnModify = true - - rrev := s.currentRev.main - if s.currentRev.sub > 0 { - rrev += 1 - } - keys, revs := s.kvindex.Range(key, end, rrev) - - if len(keys) == 0 { - return 0 - } - - for i, key := range keys { - s.delete(key, revs[i]) - } - return int64(len(keys)) -} - -func (s *store) delete(key []byte, rev revision) { - mainrev := s.currentRev.main + 1 - - ibytes := newRevBytes() - revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes) - ibytes = appendMarkTombstone(ibytes) - - kv := mvccpb.KeyValue{ - Key: key, - } - - d, err := kv.Marshal() - if err != nil { - plog.Fatalf("cannot marshal 
event: %v", err) - } - - s.tx.UnsafeSeqPut(keyBucketName, ibytes, d) - err = s.kvindex.Tombstone(key, revision{main: mainrev, sub: s.currentRev.sub}) - if err != nil { - plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err) - } - s.changes = append(s.changes, kv) - s.currentRev.sub += 1 - - item := lease.LeaseItem{Key: string(key)} - leaseID := s.le.GetLease(item) - - if leaseID != lease.NoLease { - err = s.le.Detach(leaseID, []lease.LeaseItem{item}) - if err != nil { - plog.Errorf("cannot detach %v", err) - } - } -} - -func (s *store) getChanges() []mvccpb.KeyValue { - changes := s.changes - s.changes = make([]mvccpb.KeyValue, 0, 4) - return changes -} - -func (s *store) saveIndex() { +func (s *store) saveIndex(tx backend.BatchTx) { if s.ig == nil { return } - tx := s.tx bs := s.bytesBuf8 binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex()) // put the index into the underlying backend diff --git a/mvcc/kvstore_bench_test.go b/mvcc/kvstore_bench_test.go index 41a42420bc0..821fb675537 100644 --- a/mvcc/kvstore_bench_test.go +++ b/mvcc/kvstore_bench_test.go @@ -78,11 +78,9 @@ func BenchmarkStoreTxnPut(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - id := s.TxnBegin() - if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil { - plog.Fatalf("txn put error: %v", err) - } - s.TxnEnd(id) + txn := s.Write() + txn.Put(keys[i], vals[i], lease.NoLease) + txn.End() } } @@ -100,11 +98,9 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) { for i := 0; i < b.N; i++ { for j := 0; j < revsPerKey; j++ { - id := s.TxnBegin() - if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil { - plog.Fatalf("txn put error: %v", err) - } - s.TxnEnd(id) + txn := s.Write() + txn.Put(keys[i], vals[i], lease.NoLease) + txn.End() } } b.ResetTimer() diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index 816a1b66d60..8f1972426cd 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -72,7 +72,7 @@ func TestStorePut(t *testing.T) { indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound}, nil, - revision{1, 1}, + revision{2, 0}, newTestKeyBytes(revision{2, 0}, false), mvccpb.KeyValue{ Key: []byte("foo"), @@ -89,8 +89,8 @@ func TestStorePut(t *testing.T) { indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil}, &rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}}, - revision{1, 2}, - newTestKeyBytes(revision{2, 1}, false), + revision{2, 0}, + newTestKeyBytes(revision{2, 0}, false), mvccpb.KeyValue{ Key: []byte("foo"), Value: []byte("bar"), @@ -99,14 +99,14 @@ func TestStorePut(t *testing.T) { Version: 2, Lease: 2, }, - revision{2, 1}, + revision{2, 0}, }, { revision{2, 0}, indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil}, &rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}}, - revision{2, 1}, + revision{3, 0}, newTestKeyBytes(revision{3, 0}, false), mvccpb.KeyValue{ Key: []byte("foo"), @@ -124,14 +124,13 @@ func TestStorePut(t *testing.T) { b := s.b.(*fakeBackend) fi := s.kvindex.(*fakeIndex) - s.currentRev = tt.rev - s.tx = b.BatchTx() + s.currentRev = tt.rev.main fi.indexGetRespc <- tt.r if tt.rr != nil { b.tx.rangeRespc <- *tt.rr } - s.put([]byte("foo"), []byte("bar"), lease.LeaseID(i+1)) + s.Put([]byte("foo"), []byte("bar"), lease.LeaseID(i+1)) data, err := tt.wkv.Marshal() if err != nil { @@ -158,7 +157,7 @@ func TestStorePut(t *testing.T) { if g := fi.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) } - if s.currentRev != tt.wrev { 
+ if s.currentRev != tt.wrev.main { t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev) } @@ -179,7 +178,6 @@ func TestStoreRange(t *testing.T) { if err != nil { t.Fatal(err) } - currev := revision{1, 1} wrev := int64(2) tests := []struct { @@ -195,25 +193,26 @@ func TestStoreRange(t *testing.T) { rangeResp{[][]byte{key}, [][]byte{kvb}}, }, } + + ro := RangeOptions{Limit: 1, Rev: 0, Count: false} for i, tt := range tests { s := newFakeStore() b := s.b.(*fakeBackend) fi := s.kvindex.(*fakeIndex) - s.currentRev = currev - s.tx = b.BatchTx() + s.currentRev = 2 b.tx.rangeRespc <- tt.r fi.indexRangeRespc <- tt.idxr - kvs, _, rev, err := s.rangeKeys([]byte("foo"), []byte("goo"), 1, 0, false) + ret, err := s.Range([]byte("foo"), []byte("goo"), ro) if err != nil { t.Errorf("#%d: err = %v, want nil", i, err) } - if w := []mvccpb.KeyValue{kv}; !reflect.DeepEqual(kvs, w) { - t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, w) + if w := []mvccpb.KeyValue{kv}; !reflect.DeepEqual(ret.KVs, w) { + t.Errorf("#%d: kvs = %+v, want %+v", i, ret.KVs, w) } - if rev != wrev { - t.Errorf("#%d: rev = %d, want %d", i, rev, wrev) + if ret.Rev != wrev { + t.Errorf("#%d: rev = %d, want %d", i, ret.Rev, wrev) } wstart, wend := revBytesRange(tt.idxr.revs[0]) @@ -229,8 +228,8 @@ func TestStoreRange(t *testing.T) { if g := fi.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) } - if s.currentRev != currev { - t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, currev) + if s.currentRev != 2 { + t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, 2) } s.Close() @@ -267,32 +266,21 @@ func TestStoreDeleteRange(t *testing.T) { rangeResp{[][]byte{key}, [][]byte{kvb}}, newTestKeyBytes(revision{3, 0}, true), - revision{2, 1}, + revision{3, 0}, 2, revision{3, 0}, }, - { - revision{2, 1}, - indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}}, - rangeResp{[][]byte{key}, [][]byte{kvb}}, - - newTestKeyBytes(revision{3, 1}, true), - revision{2, 2}, - 3, - revision{3, 1}, - }, } for i, tt := range tests { s := newFakeStore() b := s.b.(*fakeBackend) fi := s.kvindex.(*fakeIndex) - s.currentRev = tt.rev - s.tx = b.BatchTx() + s.currentRev = tt.rev.main fi.indexRangeRespc <- tt.r b.tx.rangeRespc <- tt.rr - n := s.deleteRange([]byte("foo"), []byte("goo")) + n, _ := s.DeleteRange([]byte("foo"), []byte("goo")) if n != 1 { t.Errorf("#%d: n = %d, want 1", i, n) } @@ -316,7 +304,7 @@ func TestStoreDeleteRange(t *testing.T) { if g := fi.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) } - if s.currentRev != tt.wrev { + if s.currentRev != tt.wrev.main { t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev) } } @@ -328,7 +316,7 @@ func TestStoreCompact(t *testing.T) { b := s.b.(*fakeBackend) fi := s.kvindex.(*fakeIndex) - s.currentRev = revision{3, 0} + s.currentRev = 3 fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}} key1 := newTestKeyBytes(revision{1, 0}, false) key2 := newTestKeyBytes(revision{2, 0}, false) @@ -393,9 +381,8 @@ func TestStoreRestore(t *testing.T) { if s.compactMainRev != 3 { t.Errorf("compact rev = %d, want 5", s.compactMainRev) } - wrev := revision{5, 0} - if !reflect.DeepEqual(s.currentRev, wrev) { - t.Errorf("current rev = %v, want %v", s.currentRev, wrev) + if s.currentRev != 5 { + t.Errorf("current rev = %v, want 5", s.currentRev) } wact := []testutil.Action{ {"range", []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}}, @@ -479,18 +466,12 @@ func 
TestTxnPut(t *testing.T) { defer cleanup(s, b, tmpPath) for i := 0; i < sliceN; i++ { - id := s.TxnBegin() + txn := s.Write() base := int64(i + 2) - - rev, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease) - if err != nil { - t.Error("txn put error") - } - if rev != base { + if rev := txn.Put(keys[i], vals[i], lease.NoLease); rev != base { t.Errorf("#%d: rev = %d, want %d", i, rev, base) } - - s.TxnEnd(id) + txn.End() } } @@ -499,7 +480,7 @@ func TestTxnBlockBackendForceCommit(t *testing.T) { s := NewStore(b, &lease.FakeLessor{}, nil) defer os.Remove(tmpPath) - id := s.TxnBegin() + txn := s.Read() done := make(chan struct{}) go func() { @@ -512,7 +493,7 @@ func TestTxnBlockBackendForceCommit(t *testing.T) { case <-time.After(100 * time.Millisecond): } - s.TxnEnd(id) + txn.End() select { case <-done: case <-time.After(5 * time.Second): // wait 5 seconds for CI with slow IO @@ -562,15 +543,17 @@ func newFakeStore() *store { indexRangeEventsRespc: make(chan indexRangeEventsResp, 1), indexCompactRespc: make(chan map[revision]struct{}, 1), } - return &store{ + s := &store{ b: b, le: &lease.FakeLessor{}, kvindex: fi, - currentRev: revision{}, + currentRev: 0, compactMainRev: -1, fifoSched: schedule.NewFIFOScheduler(), stopc: make(chan struct{}), } + s.ReadView, s.WriteView = &readView{s}, &writeView{s} + return s } type rangeResp struct { @@ -611,6 +594,7 @@ type fakeBackend struct { } func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx } +func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx } func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil } func (b *fakeBackend) Size() int64 { return 0 } func (b *fakeBackend) Snapshot() backend.Snapshot { return nil } diff --git a/mvcc/kvstore_txn.go b/mvcc/kvstore_txn.go new file mode 100644 index 00000000000..1e614755190 --- /dev/null +++ b/mvcc/kvstore_txn.go @@ -0,0 +1,254 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mvcc + +import ( + "github.com/coreos/etcd/lease" + "github.com/coreos/etcd/mvcc/backend" + "github.com/coreos/etcd/mvcc/mvccpb" +) + +type storeTxnRead struct { + s *store + tx backend.ReadTx + + firstRev int64 + rev int64 +} + +func (s *store) Read() TxnRead { + s.mu.RLock() + tx := s.b.ReadTx() + s.revMu.RLock() + tx.Lock() + firstRev, rev := s.compactMainRev, s.currentRev + s.revMu.RUnlock() + return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev}) +} + +func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev } +func (tr *storeTxnRead) Rev() int64 { return tr.rev } + +func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) { + return tr.rangeKeys(key, end, tr.Rev(), ro) +} + +func (tr *storeTxnRead) End() { + tr.tx.Unlock() + tr.s.mu.RUnlock() +} + +type storeTxnWrite struct { + *storeTxnRead + tx backend.BatchTx + // beginRev is the revision where the txn begins; it will write to the next revision. 
+ beginRev int64 + changes []mvccpb.KeyValue +} + +func (s *store) Write() TxnWrite { + s.mu.RLock() + tx := s.b.BatchTx() + tx.Lock() + tw := &storeTxnWrite{ + storeTxnRead: &storeTxnRead{s, tx, 0, 0}, + tx: tx, + beginRev: s.currentRev, + changes: make([]mvccpb.KeyValue, 0, 4), + } + return newMetricsTxnWrite(tw) +} + +func (tw *storeTxnWrite) Rev() int64 { return tw.beginRev } + +func (tw *storeTxnWrite) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) { + rev := tw.beginRev + if len(tw.changes) > 0 { + rev++ + } + return tw.rangeKeys(key, end, rev, ro) +} + +func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) { + if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 { + return n, int64(tw.beginRev + 1) + } + return 0, int64(tw.beginRev) +} + +func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 { + tw.put(key, value, lease) + return int64(tw.beginRev + 1) +} + +func (tw *storeTxnWrite) End() { + // only update index if the txn modifies the mvcc state. + if len(tw.changes) != 0 { + tw.s.saveIndex(tw.tx) + // hold revMu lock to prevent new read txns from opening until writeback. + tw.s.revMu.Lock() + tw.s.currentRev++ + } + tw.tx.Unlock() + if len(tw.changes) != 0 { + tw.s.revMu.Unlock() + } + dbTotalSize.Set(float64(tw.s.b.Size())) + tw.s.mu.RUnlock() +} + +func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) { + rev := ro.Rev + if rev > curRev { + return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev + } + if rev <= 0 { + rev = curRev + } + if rev < tr.s.compactMainRev { + return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted + } + + _, revpairs := tr.s.kvindex.Range(key, end, int64(rev)) + if len(revpairs) == 0 { + return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil + } + if ro.Count { + return &RangeResult{KVs: nil, Count: len(revpairs), Rev: curRev}, nil + } + + var kvs []mvccpb.KeyValue + for _, revpair := range revpairs { + start, end := revBytesRange(revpair) + _, vs := tr.tx.UnsafeRange(keyBucketName, start, end, 0) + if len(vs) != 1 { + plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub) + } + + var kv mvccpb.KeyValue + if err := kv.Unmarshal(vs[0]); err != nil { + plog.Fatalf("cannot unmarshal event: %v", err) + } + kvs = append(kvs, kv) + if ro.Limit > 0 && len(kvs) >= int(ro.Limit) { + break + } + } + return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil +} + +func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { + rev := tw.beginRev + 1 + c := rev + oldLease := lease.NoLease + + // if the key exists before, use its previous created and + // get its previous leaseID + _, created, ver, err := tw.s.kvindex.Get(key, rev) + if err == nil { + c = created.main + oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)}) + } + + ibytes := newRevBytes() + idxRev := revision{main: rev, sub: int64(len(tw.changes))} + revToBytes(idxRev, ibytes) + + ver = ver + 1 + kv := mvccpb.KeyValue{ + Key: key, + Value: value, + CreateRevision: c, + ModRevision: rev, + Version: ver, + Lease: int64(leaseID), + } + + d, err := kv.Marshal() + if err != nil { + plog.Fatalf("cannot marshal event: %v", err) + } + + tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) + tw.s.kvindex.Put(key, idxRev) + tw.changes = append(tw.changes, kv) + + if oldLease != lease.NoLease { + if tw.s.le == nil { + panic("no lessor to detach lease") + } + err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}}) + if 
err != nil { + plog.Errorf("unexpected error from lease detach: %v", err) + } + } + if leaseID != lease.NoLease { + if tw.s.le == nil { + panic("no lessor to attach lease") + } + err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}}) + if err != nil { + panic("unexpected error from lease Attach") + } + } +} + +func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 { + rrev := tw.beginRev + if len(tw.changes) > 0 { + rrev += 1 + } + keys, revs := tw.s.kvindex.Range(key, end, rrev) + if len(keys) == 0 { + return 0 + } + for i, key := range keys { + tw.delete(key, revs[i]) + } + return int64(len(keys)) +} + +func (tw *storeTxnWrite) delete(key []byte, rev revision) { + ibytes := newRevBytes() + idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))} + revToBytes(idxRev, ibytes) + ibytes = appendMarkTombstone(ibytes) + + kv := mvccpb.KeyValue{Key: key} + + d, err := kv.Marshal() + if err != nil { + plog.Fatalf("cannot marshal event: %v", err) + } + + tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) + err = tw.s.kvindex.Tombstone(key, idxRev) + if err != nil { + plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err) + } + tw.changes = append(tw.changes, kv) + + item := lease.LeaseItem{Key: string(key)} + leaseID := tw.s.le.GetLease(item) + + if leaseID != lease.NoLease { + err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item}) + if err != nil { + plog.Errorf("cannot detach %v", err) + } + } +} + +func (tw *storeTxnWrite) Changes() []mvccpb.KeyValue { return tw.changes } diff --git a/mvcc/metrics_txn.go b/mvcc/metrics_txn.go new file mode 100644 index 00000000000..fd2144279ae --- /dev/null +++ b/mvcc/metrics_txn.go @@ -0,0 +1,67 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
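The wrapper in this new file classifies a finished txn for the existing Prometheus counters. The rule, sketched (`store.Read`/`store.Write` already return wrapped txns, so callers get this for free):

```go
txn := s.Write()
txn.Put([]byte("a"), []byte("1"), lease.NoLease)
txn.End() // exactly one operation: putCounter.Inc()

txn = s.Write()
txn.Put([]byte("a"), []byte("2"), lease.NoLease)
txn.DeleteRange([]byte("b"), nil)
txn.End() // compound txn: txnCounter.Inc(), per-op counters untouched
```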
+
+package mvcc
+
+import (
+    "github.com/coreos/etcd/lease"
+)
+
+type metricsTxnWrite struct {
+    TxnWrite
+    ranges  uint
+    puts    uint
+    deletes uint
+}
+
+func newMetricsTxnRead(tr TxnRead) TxnRead {
+    return &metricsTxnWrite{&txnReadWrite{tr}, 0, 0, 0}
+}
+
+func newMetricsTxnWrite(tw TxnWrite) TxnWrite {
+    return &metricsTxnWrite{tw, 0, 0, 0}
+}
+
+func (tw *metricsTxnWrite) Range(key, end []byte, ro RangeOptions) (*RangeResult, error) {
+    tw.ranges++
+    return tw.TxnWrite.Range(key, end, ro)
+}
+
+func (tw *metricsTxnWrite) DeleteRange(key, end []byte) (n, rev int64) {
+    tw.deletes++
+    return tw.TxnWrite.DeleteRange(key, end)
+}
+
+func (tw *metricsTxnWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
+    tw.puts++
+    return tw.TxnWrite.Put(key, value, lease)
+}
+
+func (tw *metricsTxnWrite) End() {
+    defer tw.TxnWrite.End()
+    if sum := tw.ranges + tw.puts + tw.deletes; sum != 1 {
+        if sum > 1 {
+            txnCounter.Inc()
+        }
+        return
+    }
+    switch {
+    case tw.ranges == 1:
+        rangeCounter.Inc()
+    case tw.puts == 1:
+        putCounter.Inc()
+    case tw.deletes == 1:
+        deleteCounter.Inc()
+    }
+}
diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go
index dbb79bcb693..ce852fddea2 100644
--- a/mvcc/watchable_store.go
+++ b/mvcc/watchable_store.go
@@ -41,10 +41,12 @@ type watchable interface {
 }
 
 type watchableStore struct {
-    mu sync.Mutex
-
     *store
 
+    // mu protects watcher groups and batches. It should never be locked
+    // before locking store.mu to avoid deadlock.
+    mu sync.RWMutex
+
     // victims are watcher batches that were blocked on the watch channel
     victims []watcherBatch
     victimc chan struct{}
@@ -76,9 +78,11 @@ func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGet
         synced:   newWatcherGroup(),
         stopc:    make(chan struct{}),
     }
+    s.store.ReadView = &readView{s}
+    s.store.WriteView = &writeView{s}
     if s.le != nil {
         // use this store as the deleter so revokes trigger watch events
-        s.le.SetRangeDeleter(s)
+        s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
     }
     s.wg.Add(2)
     go s.syncWatchersLoop()
@@ -86,89 +90,6 @@ func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGet
     return s
 }
 
-func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
-    s.mu.Lock()
-    defer s.mu.Unlock()
-
-    rev = s.store.Put(key, value, lease)
-    changes := s.store.getChanges()
-    if len(changes) != 1 {
-        plog.Panicf("unexpected len(changes) != 1 after put")
-    }
-
-    ev := mvccpb.Event{
-        Type: mvccpb.PUT,
-        Kv:   &changes[0],
-    }
-    s.notify(rev, []mvccpb.Event{ev})
-    return rev
-}
-
-func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
-    s.mu.Lock()
-    defer s.mu.Unlock()
-
-    n, rev = s.store.DeleteRange(key, end)
-    changes := s.store.getChanges()
-
-    if len(changes) != int(n) {
-        plog.Panicf("unexpected len(changes) != n after deleteRange")
-    }
-
-    if n == 0 {
-        return n, rev
-    }
-
-    evs := make([]mvccpb.Event, n)
-    for i := range changes {
-        evs[i] = mvccpb.Event{
-            Type: mvccpb.DELETE,
-            Kv:   &changes[i]}
-        evs[i].Kv.ModRevision = rev
-    }
-    s.notify(rev, evs)
-    return n, rev
-}
-
-func (s *watchableStore) TxnBegin() int64 {
-    s.mu.Lock()
-    return s.store.TxnBegin()
-}
-
-func (s *watchableStore) TxnEnd(txnID int64) error {
-    err := s.store.TxnEnd(txnID)
-    if err != nil {
-        return err
-    }
-
-    changes := s.getChanges()
-    if len(changes) == 0 {
-        s.mu.Unlock()
-        return nil
-    }
-
-    rev := s.store.Rev()
-    evs := make([]mvccpb.Event, len(changes))
-    for i, change := range changes {
-        switch change.CreateRevision {
-        case 0:
-            evs[i] = mvccpb.Event{
-                Type: mvccpb.DELETE,
-                Kv:   &changes[i]}
-            evs[i].Kv.ModRevision = rev
-        default:
-            evs[i] = mvccpb.Event{
-                Type: mvccpb.PUT,
-                Kv:   &changes[i]}
-        }
-    }
-
-    s.notify(rev, evs)
-    s.mu.Unlock()
-
-    return nil
-}
-
 func (s *watchableStore) Close() error {
     close(s.stopc)
     s.wg.Wait()
@@ -186,9 +107,6 @@ func (s *watchableStore) NewWatchStream() WatchStream {
 }
 
 func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
-    s.mu.Lock()
-    defer s.mu.Unlock()
-
     wa := &watcher{
         key:    key,
         end:    end,
@@ -198,21 +116,24 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c
         fcs:    fcs,
     }
 
-    s.store.mu.Lock()
-    synced := startRev > s.store.currentRev.main || startRev == 0
+    s.mu.Lock()
+    s.revMu.RLock()
+    synced := startRev > s.store.currentRev || startRev == 0
     if synced {
-        wa.minRev = s.store.currentRev.main + 1
+        wa.minRev = s.store.currentRev + 1
         if startRev > wa.minRev {
             wa.minRev = startRev
         }
     }
-    s.store.mu.Unlock()
     if synced {
         s.synced.add(wa)
     } else {
         slowWatcherGauge.Inc()
         s.unsynced.add(wa)
     }
+    s.revMu.RUnlock()
+    s.mu.Unlock()
+
     watcherGauge.Inc()
 
     return wa, func() { s.cancelWatcher(wa) }
@@ -263,12 +184,15 @@ func (s *watchableStore) syncWatchersLoop() {
     defer s.wg.Done()
 
     for {
-        s.mu.Lock()
+        s.mu.RLock()
         st := time.Now()
         lastUnsyncedWatchers := s.unsynced.size()
-        s.syncWatchers()
-        unsyncedWatchers := s.unsynced.size()
-        s.mu.Unlock()
+        s.mu.RUnlock()
+
+        unsyncedWatchers := 0
+        if lastUnsyncedWatchers > 0 {
+            unsyncedWatchers = s.syncWatchers()
+        }
         syncDuration := time.Since(st)
 
         waitDuration := 100 * time.Millisecond
@@ -295,9 +219,9 @@ func (s *watchableStore) syncVictimsLoop() {
         for s.moveVictims() != 0 {
             // try to update all victim watchers
         }
-        s.mu.Lock()
+        s.mu.RLock()
         isEmpty := len(s.victims) == 0
-        s.mu.Unlock()
+        s.mu.RUnlock()
 
         var tickc <-chan time.Time
         if !isEmpty {
@@ -340,8 +264,8 @@ func (s *watchableStore) moveVictims() (moved int) {
 
         // assign completed victim watchers to unsync/sync
         s.mu.Lock()
-        s.store.mu.Lock()
-        curRev := s.store.currentRev.main
+        s.store.revMu.RLock()
+        curRev := s.store.currentRev
         for w, eb := range wb {
             if newVictim != nil && newVictim[w] != nil {
                 // couldn't send watch response; stays victim
@@ -358,7 +282,7 @@ func (s *watchableStore) moveVictims() (moved int) {
                 s.synced.add(w)
             }
         }
-        s.store.mu.Unlock()
+        s.store.revMu.RUnlock()
         s.mu.Unlock()
     }
 
@@ -376,19 +300,23 @@ func (s *watchableStore) moveVictims() (moved int) {
 // 2. iterate over the set to get the minimum revision and remove compacted watchers
 // 3. use minimum revision to get all key-value pairs and send those events to watchers
 // 4. remove synced watchers in set from unsynced group and move to synced group
-func (s *watchableStore) syncWatchers() {
+func (s *watchableStore) syncWatchers() int {
+    s.mu.Lock()
+    defer s.mu.Unlock()
+
     if s.unsynced.size() == 0 {
-        return
+        return 0
     }
 
-    s.store.mu.Lock()
-    defer s.store.mu.Unlock()
+    s.store.revMu.RLock()
+    defer s.store.revMu.RUnlock()
 
     // in order to find key-value pairs from unsynced watchers, we need to
     // find min revision index, and these revisions can be used to
     // query the backend store of key-value pairs
-    curRev := s.store.currentRev.main
+    curRev := s.store.currentRev
     compactionRev := s.store.compactMainRev
+
     wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
     minBytes, maxBytes := newRevBytes(), newRevBytes()
     revToBytes(revision{main: minRev}, minBytes)
@@ -396,7 +324,7 @@ func (s *watchableStore) syncWatchers() {
 
     // UnsafeRange returns keys and values. And in boltdb, keys are revisions.
     // values are actual key-value pairs in backend.
-    tx := s.store.b.BatchTx()
+    tx := s.store.b.ReadTx()
     tx.Lock()
     revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
     evs := kvsToEvents(wg, revs, vs)
@@ -446,6 +374,8 @@ func (s *watchableStore) syncWatchers() {
         vsz += len(v)
     }
     slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
+
+    return s.unsynced.size()
 }
 
 // kvsToEvents gets all events for the watchers from all key-value pairs
@@ -511,8 +441,8 @@ func (s *watchableStore) addVictim(victim watcherBatch) {
 func (s *watchableStore) rev() int64 { return s.store.Rev() }
 
 func (s *watchableStore) progress(w *watcher) {
-    s.mu.Lock()
-    defer s.mu.Unlock()
+    s.mu.RLock()
+    defer s.mu.RUnlock()
 
     if _, ok := s.synced.watchers[w]; ok {
         w.send(WatchResponse{WatchID: w.id, Revision: s.rev()})
diff --git a/mvcc/watchable_store_bench_test.go b/mvcc/watchable_store_bench_test.go
index c6eedfe2040..769d1bc38a8 100644
--- a/mvcc/watchable_store_bench_test.go
+++ b/mvcc/watchable_store_bench_test.go
@@ -57,11 +57,9 @@ func BenchmarkWatchableStoreTxnPut(b *testing.B) {
     b.ResetTimer()
     b.ReportAllocs()
     for i := 0; i < b.N; i++ {
-        id := s.TxnBegin()
-        if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil {
-            plog.Fatalf("txn put error: %v", err)
-        }
-        s.TxnEnd(id)
+        txn := s.Write()
+        txn.Put(keys[i], vals[i], lease.NoLease)
+        txn.End()
     }
 }
 
diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go
index 42f69ee7561..37bd01d8878 100644
--- a/mvcc/watchable_store_test.go
+++ b/mvcc/watchable_store_test.go
@@ -321,8 +321,8 @@ func TestWatchBatchUnsynced(t *testing.T) {
         }
     }
 
-    s.store.mu.Lock()
-    defer s.store.mu.Unlock()
+    s.store.revMu.Lock()
+    defer s.store.revMu.Unlock()
     if size := s.synced.size(); size != 1 {
         t.Errorf("synced size = %d, want 1", size)
     }
diff --git a/mvcc/watchable_store_txn.go b/mvcc/watchable_store_txn.go
new file mode 100644
index 00000000000..5c5bfda1341
--- /dev/null
+++ b/mvcc/watchable_store_txn.go
@@ -0,0 +1,53 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
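+
+// watchableStoreTxnWrite wraps a store write txn so that, on End, the txn's
+// accumulated changes are converted to events and posted to watchers.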
+
+package mvcc
+
+import (
+    "github.com/coreos/etcd/mvcc/mvccpb"
+)
+
+func (tw *watchableStoreTxnWrite) End() {
+    changes := tw.Changes()
+    if len(changes) == 0 {
+        tw.TxnWrite.End()
+        return
+    }
+
+    rev := tw.Rev() + 1
+    evs := make([]mvccpb.Event, len(changes))
+    for i, change := range changes {
+        evs[i].Kv = &changes[i]
+        if change.CreateRevision == 0 {
+            evs[i].Type = mvccpb.DELETE
+            evs[i].Kv.ModRevision = rev
+        } else {
+            evs[i].Type = mvccpb.PUT
+        }
+    }
+
+    // end write txn under watchable store lock so the updates are visible
+    // when asynchronous event posting checks the current store revision
+    tw.s.mu.Lock()
+    tw.s.notify(rev, evs)
+    tw.TxnWrite.End()
+    tw.s.mu.Unlock()
+}
+
+type watchableStoreTxnWrite struct {
+    TxnWrite
+    s *watchableStore
+}
+
+func (s *watchableStore) Write() TxnWrite { return &watchableStoreTxnWrite{s.store.Write(), s} }

From f0c184b3a23af4b3cb40522e42fdfeb978dfb991 Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Wed, 4 Jan 2017 22:30:03 -0800
Subject: [PATCH 3/6] lease: support mvcc txn

---
 lease/lessor.go      | 45 +++++++++++++++++---------------------------
 lease/lessor_test.go | 22 ++++++----------------
 2 files changed, 23 insertions(+), 44 deletions(-)

diff --git a/lease/lessor.go b/lease/lessor.go
index 385bd76d73c..5120d1cfcdd 100644
--- a/lease/lessor.go
+++ b/lease/lessor.go
@@ -43,28 +43,24 @@ var (
     ErrLeaseExists = errors.New("lease already exists")
 )
 
-type LeaseID int64
-
-// RangeDeleter defines an interface with Txn and DeleteRange method.
-// We define this interface only for lessor to limit the number
-// of methods of mvcc.KV to what lessor actually needs.
-//
-// Having a minimum interface makes testing easy.
-type RangeDeleter interface {
-    // TxnBegin see comments on mvcc.KV
-    TxnBegin() int64
-    // TxnEnd see comments on mvcc.KV
-    TxnEnd(txnID int64) error
-    // TxnDeleteRange see comments on mvcc.KV
-    TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
+// TxnDelete is a TxnWrite that only permits deletes. Defined here
+// to avoid circular dependency with mvcc.
+type TxnDelete interface {
+    DeleteRange(key, end []byte) (n, rev int64)
+    End()
 }
 
+// RangeDeleter is a TxnDelete constructor.
+type RangeDeleter func() TxnDelete
+
+type LeaseID int64
+
 // Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
 type Lessor interface {
-    // SetRangeDeleter sets the RangeDeleter to the Lessor.
-    // Lessor deletes the items in the revoked or expired lease from the
-    // the set RangeDeleter.
-    SetRangeDeleter(dr RangeDeleter)
+    // SetRangeDeleter lets the lessor create TxnDeletes to the store.
+    // Lessor deletes the items in the revoked or expired lease by creating
+    // new TxnDeletes.
+    SetRangeDeleter(rd RangeDeleter)
 
     // Grant grants a lease that expires at least after TTL seconds.
     Grant(id LeaseID, ttl int64) (*Lease, error)
@@ -248,17 +244,14 @@ func (le *lessor) Revoke(id LeaseID) error {
         return nil
     }
 
-    tid := le.rd.TxnBegin()
+    txn := le.rd()
 
     // sort keys so deletes are in same order among all members,
     // otherwise the backened hashes will be different
     keys := l.Keys()
     sort.StringSlice(keys).Sort()
     for _, key := range keys {
-        _, _, err := le.rd.TxnDeleteRange(tid, []byte(key), nil)
-        if err != nil {
-            panic(err)
-        }
+        txn.DeleteRange([]byte(key), nil)
     }
 
     le.mu.Lock()
@@ -269,11 +262,7 @@ func (le *lessor) Revoke(id LeaseID) error {
     // deleting the keys if etcdserver fails in between.
     le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
 
-    err := le.rd.TxnEnd(tid)
-    if err != nil {
-        panic(err)
-    }
-
+    txn.End()
     return nil
 }
 
diff --git a/lease/lessor_test.go b/lease/lessor_test.go
index 97793f6942d..c90b64aba01 100644
--- a/lease/lessor_test.go
+++ b/lease/lessor_test.go
@@ -86,10 +86,8 @@ func TestLeaseConcurrentKeys(t *testing.T) {
     defer os.RemoveAll(dir)
     defer be.Close()
 
-    fd := &fakeDeleter{}
-
     le := newLessor(be, minLeaseTTL)
-    le.SetRangeDeleter(fd)
+    le.SetRangeDeleter(func() TxnDelete { return &fakeDeleter{} })
 
     // grant a lease with long term (100 seconds) to
     // avoid early termination during the test.
@@ -138,7 +136,7 @@ func TestLessorRevoke(t *testing.T) {
     fd := &fakeDeleter{}
 
     le := newLessor(be, minLeaseTTL)
-    le.SetRangeDeleter(fd)
+    le.SetRangeDeleter(func() TxnDelete { return fd })
 
     // grant a lease with long term (100 seconds) to
     // avoid early termination during the test.
@@ -215,10 +213,8 @@ func TestLessorDetach(t *testing.T) {
     defer os.RemoveAll(dir)
     defer be.Close()
 
-    fd := &fakeDeleter{}
-
     le := newLessor(be, minLeaseTTL)
-    le.SetRangeDeleter(fd)
+    le.SetRangeDeleter(func() TxnDelete { return &fakeDeleter{} })
 
     // grant a lease with long term (100 seconds) to
     // avoid early termination during the test.
@@ -382,17 +378,11 @@ type fakeDeleter struct {
     deleted []string
 }
 
-func (fd *fakeDeleter) TxnBegin() int64 {
-    return 0
-}
-
-func (fd *fakeDeleter) TxnEnd(txnID int64) error {
-    return nil
-}
+func (fd *fakeDeleter) End() {}
 
-func (fd *fakeDeleter) TxnDeleteRange(tid int64, key, end []byte) (int64, int64, error) {
+func (fd *fakeDeleter) DeleteRange(key, end []byte) (int64, int64) {
     fd.deleted = append(fd.deleted, string(key)+"_"+string(end))
-    return 0, 0, nil
+    return 0, 0
 }
 
 func NewTestBackend(t *testing.T) (string, backend.Backend) {

From 58da8b17ee68fb93d919c7dd396f69c1d8bc6d7f Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Thu, 5 Jan 2017 01:13:47 -0800
Subject: [PATCH 4/6] etcdserver: support mvcc txn

---
 etcdserver/apply.go      | 192 +++++++++++++++------------------------
 etcdserver/apply_auth.go |  13 +--
 etcdserver/server.go     |   2 +-
 etcdserver/v3_server.go  |   4 +-
 4 files changed, 81 insertions(+), 130 deletions(-)

diff --git a/etcdserver/apply.go b/etcdserver/apply.go
index 0b27b3f5bf6..b90e0a0992d 100644
--- a/etcdserver/apply.go
+++ b/etcdserver/apply.go
@@ -16,7 +16,6 @@ package etcdserver
 
 import (
     "bytes"
-    "fmt"
     "sort"
     "time"
 
@@ -30,11 +29,6 @@ import (
 )
 
 const (
-    // noTxn is an invalid txn ID.
-    // To apply with independent Range, Put, Delete, you can pass noTxn
-    // to apply functions instead of a valid txn ID.
-    noTxn = -1
-
     warnApplyDuration = 100 * time.Millisecond
 )
 
@@ -51,9 +45,9 @@ type applyResult struct {
 type applierV3 interface {
     Apply(r *pb.InternalRaftRequest) *applyResult
 
-    Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error)
-    Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error)
-    DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
+    Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error)
+    Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
+    DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
     Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
     Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error)
 
@@ -99,11 +93,11 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
     // call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
     switch {
     case r.Range != nil:
-        ar.resp, ar.err = a.s.applyV3.Range(noTxn, r.Range)
+        ar.resp, ar.err = a.s.applyV3.Range(nil, r.Range)
     case r.Put != nil:
-        ar.resp, ar.err = a.s.applyV3.Put(noTxn, r.Put)
+        ar.resp, ar.err = a.s.applyV3.Put(nil, r.Put)
     case r.DeleteRange != nil:
-        ar.resp, ar.err = a.s.applyV3.DeleteRange(noTxn, r.DeleteRange)
+        ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
     case r.Txn != nil:
         ar.resp, ar.err = a.s.applyV3.Txn(r.Txn)
     case r.Compaction != nil:
@@ -152,122 +146,87 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
     return ar
 }
 
-func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) {
-    resp := &pb.PutResponse{}
+func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) {
+    resp = &pb.PutResponse{}
     resp.Header = &pb.ResponseHeader{}
 
-    var (
-        rev int64
-        err error
-    )
-
-    var rr *mvcc.RangeResult
-    if p.PrevKv || p.IgnoreValue || p.IgnoreLease {
-        if txnID != noTxn {
-            rr, err = a.s.KV().TxnRange(txnID, p.Key, nil, mvcc.RangeOptions{})
-            if err != nil {
-                return nil, err
-            }
-        } else {
-            rr, err = a.s.KV().Range(p.Key, nil, mvcc.RangeOptions{})
-            if err != nil {
-                return nil, err
+    val, leaseID := p.Value, lease.LeaseID(p.Lease)
+    if txn == nil {
+        if leaseID != lease.NoLease {
+            if l := a.s.lessor.Lookup(leaseID); l == nil {
+                return nil, lease.ErrLeaseNotFound
             }
         }
+        txn = a.s.KV().Write()
+        defer txn.End()
     }
 
-    if p.IgnoreValue {
-        if rr == nil || len(rr.KVs) == 0 {
-            // ignore_value flag expects previous key-value pair
-            return nil, ErrKeyNotFound
+    var rr *mvcc.RangeResult
+    if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
+        rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
+        if err != nil {
+            return nil, err
         }
-        p.Value = rr.KVs[0].Value
     }
-
-    if p.IgnoreLease {
+    if p.IgnoreValue || p.IgnoreLease {
         if rr == nil || len(rr.KVs) == 0 {
-            // ignore_lease flag expects previous key-value pair
+            // ignore_{lease,value} flag expects previous key-value pair
             return nil, ErrKeyNotFound
         }
-        p.Lease = rr.KVs[0].Lease
     }
-
-    if txnID != noTxn {
-        rev, err = a.s.KV().TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease))
-        if err != nil {
-            return nil, err
-        }
-    } else {
-        leaseID := lease.LeaseID(p.Lease)
-        if leaseID != lease.NoLease {
-            if l := a.s.lessor.Lookup(leaseID); l == nil {
-                return nil, lease.ErrLeaseNotFound
-            }
-        }
-        rev = a.s.KV().Put(p.Key, p.Value, leaseID)
+    if p.IgnoreValue {
+        val = rr.KVs[0].Value
     }
-    resp.Header.Revision = rev
-    if p.PrevKv && rr != nil && len(rr.KVs) != 0 {
-        resp.PrevKv = &rr.KVs[0]
+    if p.IgnoreLease {
+        leaseID = lease.LeaseID(rr.KVs[0].Lease)
+    }
+    if p.PrevKv {
+        if rr != nil && len(rr.KVs) != 0 {
+            resp.PrevKv = &rr.KVs[0]
+        }
     }
+
+    resp.Header.Revision = txn.Put(p.Key, val, leaseID)
     return resp, nil
 }
 
-func (a *applierV3backend) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
+func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
     resp := &pb.DeleteRangeResponse{}
     resp.Header = &pb.ResponseHeader{}
 
-    var (
-        n   int64
-        rev int64
-        err error
-    )
+    if txn == nil {
+        txn = a.s.kv.Write()
+        defer txn.End()
+    }
 
     if isGteRange(dr.RangeEnd) {
         dr.RangeEnd = []byte{}
     }
 
-    var rr *mvcc.RangeResult
     if dr.PrevKv {
-        if txnID != noTxn {
-            rr, err = a.s.KV().TxnRange(txnID, dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
-            if err != nil {
-                return nil, err
-            }
-        } else {
-            rr, err = a.s.KV().Range(dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
-            if err != nil {
-                return nil, err
-            }
-        }
-    }
-
-    if txnID != noTxn {
-        n, rev, err = a.s.KV().TxnDeleteRange(txnID, dr.Key, dr.RangeEnd)
+        rr, err := txn.Range(dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
         if err != nil {
             return nil, err
         }
-    } else {
-        n, rev = a.s.KV().DeleteRange(dr.Key, dr.RangeEnd)
-    }
-
-    resp.Deleted = n
-    if rr != nil {
-        for i := range rr.KVs {
-            resp.PrevKvs = append(resp.PrevKvs, &rr.KVs[i])
+        if rr != nil {
+            for i := range rr.KVs {
+                resp.PrevKvs = append(resp.PrevKvs, &rr.KVs[i])
+            }
         }
     }
-    resp.Header.Revision = rev
+
+    resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, dr.RangeEnd)
     return resp, nil
 }
 
-func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error) {
+func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
     resp := &pb.RangeResponse{}
     resp.Header = &pb.ResponseHeader{}
 
-    var (
-        rr  *mvcc.RangeResult
-        err error
-    )
+    if txn == nil {
+        txn = a.s.kv.Read()
+        defer txn.End()
+    }
 
     if isGteRange(r.RangeEnd) {
         r.RangeEnd = []byte{}
@@ -291,16 +250,9 @@ func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResp
         Count: r.CountOnly,
     }
 
-    if txnID != noTxn {
-        rr, err = a.s.KV().TxnRange(txnID, r.Key, r.RangeEnd, ro)
-        if err != nil {
-            return nil, err
-        }
-    } else {
-        rr, err = a.s.KV().Range(r.Key, r.RangeEnd, ro)
-        if err != nil {
-            return nil, err
-        }
+    rr, err := txn.Range(r.Key, r.RangeEnd, ro)
+    if err != nil {
+        return nil, err
     }
 
     if r.MaxModRevision != 0 {
@@ -387,23 +339,24 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
         return nil, err
     }
 
-    // When executing the operations of txn, we need to hold the txn lock.
-    // So the reader will not see any intermediate results.
-    txnID := a.s.KV().TxnBegin()
-
     resps := make([]*pb.ResponseOp, len(reqs))
+
+    // When executing the operations of txn, etcd must hold the txn lock so
+    // readers do not see any intermediate results.
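+    // (The write txn from KV().Write() keeps the backend batch txn locked
+    // until End, so the operations below apply and become visible atomically.)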
+    // TODO: use Read txn if only Ranges
+    txn := a.s.KV().Write()
     for i := range reqs {
-        resps[i] = a.applyUnion(txnID, reqs[i])
+        resps[i] = a.applyUnion(txn, reqs[i])
     }
-
-    err := a.s.KV().TxnEnd(txnID)
-    if err != nil {
-        panic(fmt.Sprint("unexpected error when closing txn", txnID))
+    rev := txn.Rev()
+    if len(txn.Changes()) != 0 {
+        rev++
     }
+    txn.End()
 
     txnResp := &pb.TxnResponse{}
     txnResp.Header = &pb.ResponseHeader{}
-    txnResp.Header.Revision = a.s.KV().Rev()
+    txnResp.Header.Revision = rev
     txnResp.Responses = resps
     txnResp.Succeeded = ok
     return txnResp, nil
@@ -417,9 +370,6 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
     rev := rr.Rev
 
     if err != nil {
-        if err == mvcc.ErrTxnIDMismatch {
-            panic("unexpected txn ID mismatch error")
-        }
         return rev, false
     }
     var ckv mvccpb.KeyValue
@@ -483,11 +433,11 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
     return rev, true
 }
 
-func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestOp) *pb.ResponseOp {
+func (a *applierV3backend) applyUnion(txn mvcc.TxnWrite, union *pb.RequestOp) *pb.ResponseOp {
     switch tv := union.Request.(type) {
     case *pb.RequestOp_RequestRange:
         if tv.RequestRange != nil {
-            resp, err := a.Range(txnID, tv.RequestRange)
+            resp, err := a.Range(txn, tv.RequestRange)
             if err != nil {
                 plog.Panicf("unexpected error during txn: %v", err)
             }
@@ -495,7 +445,7 @@ func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestOp) *pb.Resp
         }
     case *pb.RequestOp_RequestPut:
         if tv.RequestPut != nil {
-            resp, err := a.Put(txnID, tv.RequestPut)
+            resp, err := a.Put(txn, tv.RequestPut)
             if err != nil {
                 plog.Panicf("unexpected error during txn: %v", err)
             }
@@ -503,7 +453,7 @@ func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestOp) *pb.Resp
         }
     case *pb.RequestOp_RequestDeleteRange:
         if tv.RequestDeleteRange != nil {
-            resp, err := a.DeleteRange(txnID, tv.RequestDeleteRange)
+            resp, err := a.DeleteRange(txn, tv.RequestDeleteRange)
             if err != nil {
                 plog.Panicf("unexpected error during txn: %v", err)
             }
@@ -605,7 +555,7 @@ type applierV3Capped struct {
 // with Puts so that the number of keys in the store is capped.
 func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }
 
-func (a *applierV3Capped) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) {
+func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
     return nil, ErrNoSpace
 }
 
@@ -699,9 +649,9 @@ func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 {
     return &quotaApplierV3{app, NewBackendQuota(s)}
 }
 
-func (a *quotaApplierV3) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) {
+func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
     ok := a.q.Available(p)
-    resp, err := a.applierV3.Put(txnID, p)
+    resp, err := a.applierV3.Put(txn, p)
     if err == nil && !ok {
         err = ErrNoSpace
     }
diff --git a/etcdserver/apply_auth.go b/etcdserver/apply_auth.go
index 4868e855ca1..7da4ae45df5 100644
--- a/etcdserver/apply_auth.go
+++ b/etcdserver/apply_auth.go
@@ -19,6 +19,7 @@ import (
 
     "github.com/coreos/etcd/auth"
     pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+    "github.com/coreos/etcd/mvcc"
 )
 
 type authApplierV3 struct {
@@ -58,7 +59,7 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult {
     return ret
 }
 
-func (aa *authApplierV3) Put(txnID int64, r *pb.PutRequest) (*pb.PutResponse, error) {
+func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, error) {
     if err := aa.as.IsPutPermitted(&aa.authInfo, r.Key); err != nil {
         return nil, err
     }
@@ -68,17 +69,17 @@ func (aa *authApplierV3) Put(txnID int64, r *pb.PutRequest) (*pb.PutResponse, er
             return nil, err
         }
     }
-    return aa.applierV3.Put(txnID, r)
+    return aa.applierV3.Put(txn, r)
 }
 
-func (aa *authApplierV3) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error) {
+func (aa *authApplierV3) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
     if err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil {
         return nil, err
     }
-    return aa.applierV3.Range(txnID, r)
+    return aa.applierV3.Range(txn, r)
 }
 
-func (aa *authApplierV3) DeleteRange(txnID int64, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
+func (aa *authApplierV3) DeleteRange(txn mvcc.TxnWrite, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
     if err := aa.as.IsDeleteRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil {
         return nil, err
     }
@@ -89,7 +90,7 @@ func (aa *authApplierV3) DeleteRange(txnID int64, r *pb.DeleteRangeRequest) (*pb
         }
     }
 
-    return aa.applierV3.DeleteRange(txnID, r)
+    return aa.applierV3.DeleteRange(txn, r)
 }
 
 func checkTxnReqsPermission(as auth.AuthStore, ai *auth.AuthInfo, reqs []*pb.RequestOp) error {
diff --git a/etcdserver/server.go b/etcdserver/server.go
index 759200a4862..dc662985231 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -807,7 +807,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
     // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
     if s.lessor != nil {
         plog.Info("recovering lessor...")
-        s.lessor.Recover(newbe, s.kv)
+        s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write() })
         plog.Info("finished recovering lessor")
     }
 
diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go
index 1472ac34baf..c53c175397f 100644
--- a/etcdserver/v3_server.go
+++ b/etcdserver/v3_server.go
@@ -107,7 +107,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
     chk := func(ai *auth.AuthInfo) error {
         return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
     }
-    get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
+    get := func() { resp, err = s.applyV3Base.Range(nil, r) }
     if serr := s.doSerialize(ctx, chk, get); serr != nil {
         return nil, serr
     }
@@ -122,7 +122,7 @@ func (s *EtcdServer) legacyRange(ctx context.Context, r *pb.RangeRequest) (*pb.R
     chk := func(ai *auth.AuthInfo) error {
         return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
     }
-    get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
+    get := func() { resp, err = s.applyV3Base.Range(nil, r) }
     if serr := s.doSerialize(ctx, chk, get); serr != nil {
         return nil, serr
     }

From 0ed3c83e49f876c2adfa28b049804e36f7e5cb98 Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Thu, 5 Jan 2017 01:16:15 -0800
Subject: [PATCH 5/6] benchmark: support mvcc txn

---
 tools/benchmark/cmd/mvcc-put.go | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/tools/benchmark/cmd/mvcc-put.go b/tools/benchmark/cmd/mvcc-put.go
index 992ff2331d0..bebb13d8c27 100644
--- a/tools/benchmark/cmd/mvcc-put.go
+++ b/tools/benchmark/cmd/mvcc-put.go
@@ -109,12 +109,9 @@ func mvccPutFunc(cmd *cobra.Command, args []string) {
     for i := 0; i < totalNrKeys; i++ {
         st := time.Now()
         if txn {
-            id := s.TxnBegin()
-            if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil {
-                fmt.Fprintln(os.Stderr, "txn put error:", err)
-                os.Exit(1)
-            }
-            s.TxnEnd(id)
+            tw := s.Write()
+            tw.Put(keys[i], vals[i], lease.NoLease)
+            tw.End()
         } else {
             s.Put(keys[i], vals[i], lease.NoLease)
         }

From d1dcc828c814b7dda7cfa74b74e57d7cbc17119a Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Mon, 13 Feb 2017 09:38:54 -0800
Subject: [PATCH 6/6] etcdctl: support mvcc txn

---
 etcdctl/ctlv3/command/snapshot_command.go | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/etcdctl/ctlv3/command/snapshot_command.go b/etcdctl/ctlv3/command/snapshot_command.go
index 28f2d8bc638..bcfd8145fde 100644
--- a/etcdctl/ctlv3/command/snapshot_command.go
+++ b/etcdctl/ctlv3/command/snapshot_command.go
@@ -375,13 +375,12 @@ func makeDB(snapdir, dbfile string, commit int) {
     be := backend.NewDefaultBackend(dbpath)
     // a lessor never timeouts leases
     lessor := lease.NewLessor(be, math.MaxInt64)
-
     s := mvcc.NewStore(be, lessor, (*initIndex)(&commit))
-    id := s.TxnBegin()
+    txn := s.Write()
     btx := be.BatchTx()
     del := func(k, v []byte) error {
-        _, _, err := s.TxnDeleteRange(id, k, nil)
-        return err
+        txn.DeleteRange(k, nil)
+        return nil
     }
 
     // delete stored members from old cluster since using new members
     // todo: add back new members when we start to deprecate old snap file.
     btx.UnsafeForEach([]byte("members_removed"), del)
     // trigger write-out of new consistent index
-    s.TxnEnd(id)
+    txn.End()
     s.Commit()
     s.Close()
 }
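
The series replaces the ID-based TxnBegin/TxnPut/TxnDeleteRange/TxnEnd calls with first-class txn handles obtained from the store's read and write views. A minimal caller-side sketch of the resulting flow (assuming a store s built with mvcc.NewStore as above; error handling elided, for illustration only):

    // read-only view; Range serves from a read txn overlaid with any
    // buffered writeback from committed batch txns
    rtx := s.Read()
    rr, err := rtx.Range([]byte("foo"), nil, mvcc.RangeOptions{})
    rtx.End()
    if err == nil && len(rr.KVs) > 0 {
        // rr.KVs[0].Value holds the latest value at revision rr.Rev
    }

    // read-write txn: holds the store's write lock until End; on a
    // watchable store, End also notifies watchers of the changes
    wtx := s.Write()
    wtx.Put([]byte("foo"), []byte("bar"), lease.NoLease)
    wtx.DeleteRange([]byte("old"), nil)
    wtx.End()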