Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mvcc: txns and r/w views #7105

Merged
merged 6 commits into from
Mar 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions etcdctl/ctlv3/command/snapshot_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,21 +375,20 @@ func makeDB(snapdir, dbfile string, commit int) {
be := backend.NewDefaultBackend(dbpath)
// a lessor never timeouts leases
lessor := lease.NewLessor(be, math.MaxInt64)

s := mvcc.NewStore(be, lessor, (*initIndex)(&commit))
id := s.TxnBegin()
txn := s.Write()
btx := be.BatchTx()
del := func(k, v []byte) error {
_, _, err := s.TxnDeleteRange(id, k, nil)
return err
txn.DeleteRange(k, nil)
return nil
}

// delete stored members from old cluster since using new members
btx.UnsafeForEach([]byte("members"), del)
// todo: add back new members when we start to deprecate old snap file.
btx.UnsafeForEach([]byte("members_removed"), del)
// trigger write-out of new consistent index
s.TxnEnd(id)
txn.End()
s.Commit()
s.Close()
}
Expand Down
192 changes: 71 additions & 121 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package etcdserver

import (
"bytes"
"fmt"
"sort"
"time"

Expand All @@ -30,11 +29,6 @@ import (
)

const (
// noTxn is an invalid txn ID.
// To apply with independent Range, Put, Delete, you can pass noTxn
// to apply functions instead of a valid txn ID.
noTxn = -1

warnApplyDuration = 100 * time.Millisecond
)

Expand All @@ -51,9 +45,9 @@ type applyResult struct {
type applierV3 interface {
Apply(r *pb.InternalRaftRequest) *applyResult

Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error)
Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error)
DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error)
Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error)

Expand Down Expand Up @@ -99,11 +93,11 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
switch {
case r.Range != nil:
ar.resp, ar.err = a.s.applyV3.Range(noTxn, r.Range)
ar.resp, ar.err = a.s.applyV3.Range(nil, r.Range)
case r.Put != nil:
ar.resp, ar.err = a.s.applyV3.Put(noTxn, r.Put)
ar.resp, ar.err = a.s.applyV3.Put(nil, r.Put)
case r.DeleteRange != nil:
ar.resp, ar.err = a.s.applyV3.DeleteRange(noTxn, r.DeleteRange)
ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
case r.Txn != nil:
ar.resp, ar.err = a.s.applyV3.Txn(r.Txn)
case r.Compaction != nil:
Expand Down Expand Up @@ -152,122 +146,87 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
return ar
}

func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) {
resp := &pb.PutResponse{}
func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) {
resp = &pb.PutResponse{}
resp.Header = &pb.ResponseHeader{}
var (
rev int64
err error
)

var rr *mvcc.RangeResult
if p.PrevKv || p.IgnoreValue || p.IgnoreLease {
if txnID != noTxn {
rr, err = a.s.KV().TxnRange(txnID, p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
} else {
rr, err = a.s.KV().Range(p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, err
val, leaseID := p.Value, lease.LeaseID(p.Lease)
if txn == nil {
if leaseID != lease.NoLease {
if l := a.s.lessor.Lookup(leaseID); l == nil {
return nil, lease.ErrLeaseNotFound
}
}
txn = a.s.KV().Write()
defer txn.End()
}

if p.IgnoreValue {
if rr == nil || len(rr.KVs) == 0 {
// ignore_value flag expects previous key-value pair
return nil, ErrKeyNotFound
var rr *mvcc.RangeResult
if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
p.Value = rr.KVs[0].Value
}

if p.IgnoreLease {
if p.IgnoreValue || p.IgnoreLease {
if rr == nil || len(rr.KVs) == 0 {
// ignore_lease flag expects previous key-value pair
// ignore_{lease,value} flag expects previous key-value pair
return nil, ErrKeyNotFound
}
p.Lease = rr.KVs[0].Lease
}

if txnID != noTxn {
rev, err = a.s.KV().TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease))
if err != nil {
return nil, err
}
} else {
leaseID := lease.LeaseID(p.Lease)
if leaseID != lease.NoLease {
if l := a.s.lessor.Lookup(leaseID); l == nil {
return nil, lease.ErrLeaseNotFound
}
}
rev = a.s.KV().Put(p.Key, p.Value, leaseID)
if p.IgnoreValue {
val = rr.KVs[0].Value
}
resp.Header.Revision = rev
if p.PrevKv && rr != nil && len(rr.KVs) != 0 {
resp.PrevKv = &rr.KVs[0]
if p.IgnoreLease {
leaseID = lease.LeaseID(rr.KVs[0].Lease)
}
if p.PrevKv {
if rr != nil && len(rr.KVs) != 0 {
resp.PrevKv = &rr.KVs[0]
}
}

resp.Header.Revision = txn.Put(p.Key, val, leaseID)
return resp, nil
}

func (a *applierV3backend) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
resp := &pb.DeleteRangeResponse{}
resp.Header = &pb.ResponseHeader{}

var (
n int64
rev int64
err error
)
if txn == nil {
txn = a.s.kv.Write()
defer txn.End()
}

if isGteRange(dr.RangeEnd) {
dr.RangeEnd = []byte{}
}

var rr *mvcc.RangeResult
if dr.PrevKv {
if txnID != noTxn {
rr, err = a.s.KV().TxnRange(txnID, dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
} else {
rr, err = a.s.KV().Range(dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
}
}

if txnID != noTxn {
n, rev, err = a.s.KV().TxnDeleteRange(txnID, dr.Key, dr.RangeEnd)
rr, err := txn.Range(dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
} else {
n, rev = a.s.KV().DeleteRange(dr.Key, dr.RangeEnd)
}

resp.Deleted = n
if rr != nil {
for i := range rr.KVs {
resp.PrevKvs = append(resp.PrevKvs, &rr.KVs[i])
if rr != nil {
for i := range rr.KVs {
resp.PrevKvs = append(resp.PrevKvs, &rr.KVs[i])
}
}
}
resp.Header.Revision = rev

resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, dr.RangeEnd)
return resp, nil
}

func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error) {
func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{}

var (
rr *mvcc.RangeResult
err error
)
if txn == nil {
txn = a.s.kv.Read()
defer txn.End()
}

if isGteRange(r.RangeEnd) {
r.RangeEnd = []byte{}
Expand All @@ -291,16 +250,9 @@ func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResp
Count: r.CountOnly,
}

if txnID != noTxn {
rr, err = a.s.KV().TxnRange(txnID, r.Key, r.RangeEnd, ro)
if err != nil {
return nil, err
}
} else {
rr, err = a.s.KV().Range(r.Key, r.RangeEnd, ro)
if err != nil {
return nil, err
}
rr, err := txn.Range(r.Key, r.RangeEnd, ro)
if err != nil {
return nil, err
}

if r.MaxModRevision != 0 {
Expand Down Expand Up @@ -387,23 +339,24 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
return nil, err
}

// When executing the operations of txn, we need to hold the txn lock.
// So the reader will not see any intermediate results.
txnID := a.s.KV().TxnBegin()

resps := make([]*pb.ResponseOp, len(reqs))

// When executing the operations of txn, etcd must hold the txn lock so
// readers do not see any intermediate results.
// TODO: use Read txn if only Ranges
txn := a.s.KV().Write()
for i := range reqs {
resps[i] = a.applyUnion(txnID, reqs[i])
resps[i] = a.applyUnion(txn, reqs[i])
}

err := a.s.KV().TxnEnd(txnID)
if err != nil {
panic(fmt.Sprint("unexpected error when closing txn", txnID))
rev := txn.Rev()
if len(txn.Changes()) != 0 {
rev++
}
txn.End()

txnResp := &pb.TxnResponse{}
txnResp.Header = &pb.ResponseHeader{}
txnResp.Header.Revision = a.s.KV().Rev()
txnResp.Header.Revision = rev
txnResp.Responses = resps
txnResp.Succeeded = ok
return txnResp, nil
Expand All @@ -417,9 +370,6 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
rev := rr.Rev

if err != nil {
if err == mvcc.ErrTxnIDMismatch {
panic("unexpected txn ID mismatch error")
}
return rev, false
}
var ckv mvccpb.KeyValue
Expand Down Expand Up @@ -483,27 +433,27 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
return rev, true
}

func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestOp) *pb.ResponseOp {
func (a *applierV3backend) applyUnion(txn mvcc.TxnWrite, union *pb.RequestOp) *pb.ResponseOp {
switch tv := union.Request.(type) {
case *pb.RequestOp_RequestRange:
if tv.RequestRange != nil {
resp, err := a.Range(txnID, tv.RequestRange)
resp, err := a.Range(txn, tv.RequestRange)
if err != nil {
plog.Panicf("unexpected error during txn: %v", err)
}
return &pb.ResponseOp{Response: &pb.ResponseOp_ResponseRange{ResponseRange: resp}}
}
case *pb.RequestOp_RequestPut:
if tv.RequestPut != nil {
resp, err := a.Put(txnID, tv.RequestPut)
resp, err := a.Put(txn, tv.RequestPut)
if err != nil {
plog.Panicf("unexpected error during txn: %v", err)
}
return &pb.ResponseOp{Response: &pb.ResponseOp_ResponsePut{ResponsePut: resp}}
}
case *pb.RequestOp_RequestDeleteRange:
if tv.RequestDeleteRange != nil {
resp, err := a.DeleteRange(txnID, tv.RequestDeleteRange)
resp, err := a.DeleteRange(txn, tv.RequestDeleteRange)
if err != nil {
plog.Panicf("unexpected error during txn: %v", err)
}
Expand Down Expand Up @@ -605,7 +555,7 @@ type applierV3Capped struct {
// with Puts so that the number of keys in the store is capped.
func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }

func (a *applierV3Capped) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) {
func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
return nil, ErrNoSpace
}

Expand Down Expand Up @@ -699,9 +649,9 @@ func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 {
return &quotaApplierV3{app, NewBackendQuota(s)}
}

func (a *quotaApplierV3) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) {
func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
ok := a.q.Available(p)
resp, err := a.applierV3.Put(txnID, p)
resp, err := a.applierV3.Put(txn, p)
if err == nil && !ok {
err = ErrNoSpace
}
Expand Down
Loading