diff --git a/clientv3/integration/leasing_test.go b/clientv3/integration/leasing_test.go new file mode 100644 index 00000000000..317e690f814 --- /dev/null +++ b/clientv3/integration/leasing_test.go @@ -0,0 +1,1939 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// // http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "context" + "fmt" + "math/rand" + "reflect" + "sync" + "testing" + "time" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" + "github.com/coreos/etcd/clientv3/leasing" + "github.com/coreos/etcd/integration" + "github.com/coreos/etcd/pkg/testutil" +) + +func TestLeasingPutGet(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + lKV1, err := leasing.NewKV(clus.Client(0), "foo/") + lKV2, err := leasing.NewKV(clus.Client(1), "foo/") + lKV3, err := leasing.NewKV(clus.Client(2), "foo/") + + resp, err := lKV1.Get(context.TODO(), "abc") + if err != nil { + t.Fatal(err) + } + if len(resp.Kvs) != 0 { + t.Errorf("expected nil, got %q", resp.Kvs[0].Key) + } + + if _, err = lKV1.Put(context.TODO(), "abc", "def"); err != nil { + t.Fatal(err) + } + if resp, err = lKV2.Get(context.TODO(), "abc"); err != nil { + t.Fatal(err) + } + if string(resp.Kvs[0].Key) != "abc" { + t.Errorf("expected key=%q, got key=%q", "abc", resp.Kvs[0].Key) + } + if string(resp.Kvs[0].Value) != "def" { + t.Errorf("expected value=%q, got value=%q", "bar", resp.Kvs[0].Value) + } + + if _, err = lKV3.Get(context.TODO(), "abc"); err != nil { + t.Fatal(err) + } + if _, err = lKV2.Put(context.TODO(), "abc", "ghi"); err != nil { + t.Fatal(err) + } + + if resp, err = lKV3.Get(context.TODO(), "abc"); err != nil { + t.Fatal(err) + } + if string(resp.Kvs[0].Key) != "abc" { + t.Errorf("expected key=%q, got key=%q", "abc", resp.Kvs[0].Key) + } + + if string(resp.Kvs[0].Value) != "ghi" { + t.Errorf("expected value=%q, got value=%q", "bar", resp.Kvs[0].Value) + } +} + +// TestLeasingInterval checks the leasing KV fetches key intervals. +func TestLeasingInterval(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + + keys := []string{"abc/a", "abc/b", "abc/a/a"} + for _, k := range keys { + if _, err = clus.Client(0).Put(context.TODO(), k, "v"); err != nil { + t.Fatal(err) + } + } + + resp, err := lkv.Get(context.TODO(), "abc/", clientv3.WithPrefix()) + if err != nil { + t.Fatal(err) + } + if len(resp.Kvs) != 3 { + t.Fatalf("expected keys %+v, got response keys %+v", keys, resp.Kvs) + } + + // load into cache + if resp, err = lkv.Get(context.TODO(), "abc/a"); err != nil { + t.Fatal(err) + } + + // get when prefix is also a cached key + if resp, err = lkv.Get(context.TODO(), "abc/a", clientv3.WithPrefix()); err != nil { + t.Fatal(err) + } + if len(resp.Kvs) != 2 { + t.Fatalf("expected keys %+v, got response keys %+v", keys, resp.Kvs) + } +} + +// TestLeasingPutInvalidateNew checks the leasing KV updates its cache on a Put to a new key. +func TestLeasingPutInvalidateNew(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + + if _, err = lkv.Get(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + if _, err = lkv.Put(context.TODO(), "k", "v"); err != nil { + t.Fatal(err) + } + + lkvResp, err := lkv.Get(context.TODO(), "k") + if err != nil { + t.Fatal(err) + } + cResp, cerr := clus.Client(0).Get(context.TODO(), "k") + if cerr != nil { + t.Fatal(cerr) + } + if !reflect.DeepEqual(lkvResp, cResp) { + t.Fatalf(`expected %+v, got response %+v`, cResp, lkvResp) + } +} + +// TestLeasingPutInvalidateExisting checks the leasing KV updates its cache on a Put to an existing key. +func TestLeasingPutInvalidatExisting(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + if _, err := clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil { + t.Fatal(err) + } + + lkv, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + + if _, err = lkv.Get(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + if _, err = lkv.Put(context.TODO(), "k", "v"); err != nil { + t.Fatal(err) + } + + lkvResp, err := lkv.Get(context.TODO(), "k") + if err != nil { + t.Fatal(err) + } + cResp, cerr := clus.Client(0).Get(context.TODO(), "k") + if cerr != nil { + t.Fatal(cerr) + } + if !reflect.DeepEqual(lkvResp, cResp) { + t.Fatalf(`expected %+v, got response %+v`, cResp, lkvResp) + } +} + +// TestLeasingGetSerializable checks the leasing KV can make serialized requests +// when the etcd cluster is partitioned. +func TestLeasingGetSerializable(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + + if _, err = clus.Client(0).Put(context.TODO(), "cached", "abc"); err != nil { + t.Fatal(err) + } + if _, err = lkv.Get(context.TODO(), "cached"); err != nil { + t.Fatal(err) + } + + clus.Members[1].Stop(t) + + // don't necessarily try to acquire leasing key ownership for new key + resp, err := lkv.Get(context.TODO(), "uncached", clientv3.WithSerializable()) + if err != nil { + t.Fatal(err) + } + if len(resp.Kvs) != 0 { + t.Fatalf(`expected no keys, got response %+v`, resp) + } + + clus.Members[0].Stop(t) + + // leasing key ownership should have "cached" locally served + cachedResp, err := lkv.Get(context.TODO(), "cached", clientv3.WithSerializable()) + if err != nil { + t.Fatal(err) + } + if len(cachedResp.Kvs) != 1 || string(cachedResp.Kvs[0].Value) != "abc" { + t.Fatalf(`expected "cached"->"abc", got response %+v`, cachedResp) + } +} + +// TestLeasingPrevKey checks the cache respects the PrevKV flag on puts. +func TestLeasingPrevKey(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil { + t.Fatal(err) + } + // fetch without prevkv to acquire leasing key + if _, err = lkv.Get(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + // fetch prevkv via put + resp, err := lkv.Put(context.TODO(), "k", "def", clientv3.WithPrevKV()) + if err != nil { + t.Fatal(err) + } + if resp.PrevKv == nil || string(resp.PrevKv.Value) != "abc" { + t.Fatalf(`expected PrevKV.Value="abc", got response %+v`, resp) + } +} + +// TestLeasingRevGet checks the cache respects Get by Revision. +func TestLeasingRevGet(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + + putResp, err := clus.Client(0).Put(context.TODO(), "k", "abc") + if err != nil { + t.Fatal(err) + } + if _, err = clus.Client(0).Put(context.TODO(), "k", "def"); err != nil { + t.Fatal(err) + } + + // check historic revision + getResp, gerr := lkv.Get(context.TODO(), "k", clientv3.WithRev(putResp.Header.Revision)) + if gerr != nil { + t.Fatal(gerr) + } + if len(getResp.Kvs) != 1 || string(getResp.Kvs[0].Value) != "abc" { + t.Fatalf(`expeted "k"->"abc" at rev=%d, got response %+v`, putResp.Header.Revision, getResp) + } + // check current revision + getResp, gerr = lkv.Get(context.TODO(), "k") + if gerr != nil { + t.Fatal(gerr) + } + if len(getResp.Kvs) != 1 || string(getResp.Kvs[0].Value) != "def" { + t.Fatalf(`expeted "k"->"abc" at rev=%d, got response %+v`, putResp.Header.Revision, getResp) + } +} + +// TestLeasingGetWithOpts checks options that can be served through the cache do not depend on the server. +func TestLeasingGetWithOpts(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil { + t.Fatal(err) + } + // in cache + if _, err = lkv.Get(context.TODO(), "k", clientv3.WithKeysOnly()); err != nil { + t.Fatal(err) + } + + clus.Members[0].Stop(t) + + opts := []clientv3.OpOption{ + clientv3.WithKeysOnly(), + clientv3.WithLimit(1), + clientv3.WithMinCreateRev(1), + clientv3.WithMinModRev(1), + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), + clientv3.WithSerializable(), + } + for _, opt := range opts { + if _, err := lkv.Get(context.TODO(), "k", opt); err != nil { + t.Fatal(err) + } + } + + getOpts := []clientv3.OpOption{} + for i := 0; i < len(opts); i++ { + getOpts = append(getOpts, opts[rand.Intn(len(opts))]) + } + getOpts = getOpts[:rand.Intn(len(opts))] + if _, err := lkv.Get(context.TODO(), "k", getOpts...); err != nil { + t.Fatal(err) + } +} + +// TestLeasingConcurrentPut ensures that a get after concurrent puts returns +// the recently put data. +func TestLeasingConcurrentPut(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + // force key into leasing key cache + if _, err = lkv.Get(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + + // concurrently put through leasing client + numPuts := 16 + putc := make(chan *clientv3.PutResponse, numPuts) + for i := 0; i < numPuts; i++ { + go func() { + resp, perr := lkv.Put(context.TODO(), "k", "abc") + if perr != nil { + t.Fatal(perr) + } + putc <- resp + }() + } + // record maximum revision from puts + maxRev := int64(0) + for i := 0; i < numPuts; i++ { + if resp := <-putc; resp.Header.Revision > maxRev { + maxRev = resp.Header.Revision + } + } + + // confirm Get gives most recently put revisions + getResp, gerr := lkv.Get(context.TODO(), "k") + if gerr != nil { + t.Fatal(err) + } + if mr := getResp.Kvs[0].ModRevision; mr != maxRev { + t.Errorf("expected ModRevision %d, got %d", maxRev, mr) + } + if ver := getResp.Kvs[0].Version; ver != int64(numPuts) { + t.Errorf("expected Version %d, got %d", numPuts, ver) + } +} + +func TestLeasingDisconnectedGet(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + if _, err = clus.Client(0).Put(context.TODO(), "cached", "abc"); err != nil { + t.Fatal(err) + } + // get key so it's cached + if _, err = lkv.Get(context.TODO(), "cached"); err != nil { + t.Fatal(err) + } + + clus.Members[0].Stop(t) + + // leasing key ownership should have "cached" locally served + cachedResp, err := lkv.Get(context.TODO(), "cached") + if err != nil { + t.Fatal(err) + } + if len(cachedResp.Kvs) != 1 || string(cachedResp.Kvs[0].Value) != "abc" { + t.Fatalf(`expected "cached"->"abc", got response %+v`, cachedResp) + } +} + +func TestLeasingDeleteOwner(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil { + t.Fatal(err) + } + + // get+own / delete / get + if _, err = lkv.Get(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + if _, err = lkv.Delete(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + resp, err := lkv.Get(context.TODO(), "k") + if err != nil { + t.Fatal(err) + } + + if len(resp.Kvs) != 0 { + t.Fatalf(`expected "k" to be deleted, got response %+v`, resp) + } + // try to double delete + if _, err = lkv.Delete(context.TODO(), "k"); err != nil { + t.Fatal(err) + } +} + +func TestLeasingDeleteNonOwner(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv1, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + lkv2, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + + if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil { + t.Fatal(err) + } + // acquire ownership + if _, err = lkv1.Get(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + // delete via non-owner + if _, err = lkv2.Delete(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + + // key should be removed from lkv1 + resp, err := lkv1.Get(context.TODO(), "k") + if err != nil { + t.Fatal(err) + } + if len(resp.Kvs) != 0 { + t.Fatalf(`expected "k" to be deleted, got response %+v`, resp) + } +} + +func TestLeasingOverwriteResponse(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil { + t.Fatal(err) + } + + resp, err := lkv.Get(context.TODO(), "k") + if err != nil { + t.Fatal(err) + } + + resp.Kvs[0].Key[0] = 'z' + resp.Kvs[0].Value[0] = 'z' + + resp, err = lkv.Get(context.TODO(), "k") + if err != nil { + t.Fatal(err) + } + + if string(resp.Kvs[0].Key) != "k" { + t.Errorf(`expected key "k", got %q`, string(resp.Kvs[0].Key)) + } + if string(resp.Kvs[0].Value) != "abc" { + t.Errorf(`expected value "abc", got %q`, string(resp.Kvs[0].Value)) + } +} + +func TestLeasingOwnerPutResponse(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil { + t.Fatal(err) + } + gresp, gerr := lkv.Get(context.TODO(), "k") + if gerr != nil { + t.Fatal(gerr) + } + presp, err := lkv.Put(context.TODO(), "k", "def") + if err != nil { + t.Fatal(err) + } + if presp == nil { + t.Fatal("expected put response, got nil") + } + + clus.Members[0].Stop(t) + + gresp, gerr = lkv.Get(context.TODO(), "k") + if gerr != nil { + t.Fatal(gerr) + } + if gresp.Kvs[0].ModRevision != presp.Header.Revision { + t.Errorf("expected mod revision %d, got %d", presp.Header.Revision, gresp.Kvs[0].ModRevision) + } + if gresp.Kvs[0].Version != 2 { + t.Errorf("expected version 2, got version %d", gresp.Kvs[0].Version) + } +} + +func TestLeasingTxnOwnerGetRange(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + + keyCount := rand.Intn(10) + 1 + for i := 0; i < keyCount; i++ { + k := fmt.Sprintf("k-%d", i) + if _, err := clus.Client(0).Put(context.TODO(), k, k+k); err != nil { + t.Fatal(err) + } + } + if _, err := lkv.Get(context.TODO(), "k-"); err != nil { + t.Fatal(err) + } + + tresp, terr := lkv.Txn(context.TODO()).Then(clientv3.OpGet("k-", clientv3.WithPrefix())).Commit() + if terr != nil { + t.Fatal(terr) + } + if resp := tresp.Responses[0].GetResponseRange(); len(resp.Kvs) != keyCount { + t.Fatalf("expected %d keys, got response %+v", keyCount, resp.Kvs) + } +} + +func TestLeasingTxnOwnerGet(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + + keyCount := rand.Intn(10) + 1 + var ops []clientv3.Op + presps := make([]*clientv3.PutResponse, keyCount) + for i := range presps { + k := fmt.Sprintf("k-%d", i) + presp, err := clus.Client(0).Put(context.TODO(), k, k+k) + if err != nil { + t.Fatal(err) + } + presps[i] = presp + + if _, err = lkv.Get(context.TODO(), k); err != nil { + t.Fatal(err) + } + ops = append(ops, clientv3.OpGet(k)) + } + ops = ops[:rand.Intn(len(ops))] + + // served through cache + clus.Members[0].Stop(t) + + var thenOps, elseOps []clientv3.Op + cmps, useThen := randCmps("k-", presps) + + if useThen { + + thenOps = ops + elseOps = []clientv3.Op{clientv3.OpPut("k", "1")} + } else { + thenOps = []clientv3.Op{clientv3.OpPut("k", "1")} + elseOps = ops + } + + tresp, terr := lkv.Txn(context.TODO()). + If(cmps...). + Then(thenOps...). + Else(elseOps...).Commit() + + if terr != nil { + t.Fatal(terr) + } + if tresp.Succeeded != useThen { + t.Fatalf("expected succeeded=%v, got tresp=%+v", useThen, tresp) + } + if len(tresp.Responses) != len(ops) { + t.Fatalf("expected %d responses, got %d", len(ops), len(tresp.Responses)) + } + wrev := presps[len(presps)-1].Header.Revision + if tresp.Header.Revision < wrev { + t.Fatalf("expected header revision >= %d, got %d", wrev, tresp.Header.Revision) + } + for i := range ops { + k := fmt.Sprintf("k-%d", i) + rr := tresp.Responses[i].GetResponseRange() + if rr == nil { + t.Errorf("expected get response, got %+v", tresp.Responses[i]) + } + if string(rr.Kvs[0].Key) != k || string(rr.Kvs[0].Value) != k+k { + t.Errorf(`expected key for %q, got %+v`, k, rr.Kvs) + } + } +} + +func TestLeasingTxnOwnerIf(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil { + t.Fatal(err) + } + if _, err = lkv.Get(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + + // served through cache + clus.Members[0].Stop(t) + + tests := []struct { + cmps []clientv3.Cmp + wSucceeded bool + wResponses int + }{ + // success + { + cmps: []clientv3.Cmp{clientv3.Compare(clientv3.Value("k"), "=", "abc")}, + wSucceeded: true, + wResponses: 1, + }, + { + cmps: []clientv3.Cmp{clientv3.Compare(clientv3.CreateRevision("k"), "=", 2)}, + wSucceeded: true, + wResponses: 1, + }, + { + cmps: []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision("k"), "=", 2)}, + wSucceeded: true, + wResponses: 1, + }, + { + cmps: []clientv3.Cmp{clientv3.Compare(clientv3.Version("k"), "=", 1)}, + wSucceeded: true, + wResponses: 1, + }, + // failure + { + cmps: []clientv3.Cmp{clientv3.Compare(clientv3.Value("k"), ">", "abc")}, + }, + { + cmps: []clientv3.Cmp{clientv3.Compare(clientv3.CreateRevision("k"), ">", 2)}, + }, + { + cmps: []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision("k"), "=", 2)}, + wSucceeded: true, + wResponses: 1, + }, + { + cmps: []clientv3.Cmp{clientv3.Compare(clientv3.Version("k"), ">", 1)}, + }, + { + cmps: []clientv3.Cmp{clientv3.Compare(clientv3.Value("k"), "<", "abc")}, + }, + { + cmps: []clientv3.Cmp{clientv3.Compare(clientv3.CreateRevision("k"), "<", 2)}, + }, + { + cmps: []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision("k"), "<", 2)}, + }, + { + cmps: []clientv3.Cmp{clientv3.Compare(clientv3.Version("k"), "<", 1)}, + }, + { + cmps: []clientv3.Cmp{ + clientv3.Compare(clientv3.Version("k"), "=", 1), + clientv3.Compare(clientv3.Version("k"), "<", 1), + }, + }, + } + + for i, tt := range tests { + tresp, terr := lkv.Txn(context.TODO()).If(tt.cmps...).Then(clientv3.OpGet("k")).Commit() + if terr != nil { + t.Fatal(terr) + } + if tresp.Succeeded != tt.wSucceeded { + t.Errorf("#%d: expected succeded %v, got %v", i, tt.wSucceeded, tresp.Succeeded) + } + if len(tresp.Responses) != tt.wResponses { + t.Errorf("#%d: expected %d responses, got %d", i, tt.wResponses, len(tresp.Responses)) + } + } +} + +func TestLeasingTxnCancel(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + lkv1, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + lkv2, err := leasing.NewKV(clus.Client(1), "pfx/") + if err != nil { + t.Fatal(err) + } + + // acquire lease but disconnect so no revoke in time + if _, err = lkv1.Get(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + clus.Members[0].Stop(t) + + // wait for leader election, if any + if _, err = clus.Client(1).Get(context.TODO(), "abc"); err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.TODO()) + go func() { + time.Sleep(100 * time.Millisecond) + cancel() + }() + if _, err := lkv2.Txn(ctx).Then(clientv3.OpPut("k", "v")).Commit(); err != context.Canceled { + t.Fatalf("expected %v, got %v", context.Canceled, err) + } +} + +func TestLeasingTxnNonOwnerPut(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + lkv2, err := leasing.NewKV(clus.Client(0), "pfx/") + + if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil { + t.Fatal(err) + } + if _, err = clus.Client(0).Put(context.TODO(), "k2", "123"); err != nil { + t.Fatal(err) + } + // cache in lkv + if _, err = lkv.Get(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + if _, err = lkv.Get(context.TODO(), "k2"); err != nil { + t.Fatal(err) + } + // invalidate via lkv2 txn + opArray := make([]clientv3.Op, 0) + opArray = append(opArray, clientv3.OpPut("k2", "456")) + tresp, terr := lkv2.Txn(context.TODO()).Then( + clientv3.OpTxn(nil, opArray, nil), + clientv3.OpPut("k", "def"), + clientv3.OpPut("k3", "999"), // + a key not in any cache + ).Commit() + if terr != nil { + t.Fatal(terr) + } + if !tresp.Succeeded || len(tresp.Responses) != 3 { + t.Fatalf("expected txn success, got %+v", tresp) + } + // check cache was invalidated + gresp, gerr := lkv.Get(context.TODO(), "k") + if gerr != nil { + t.Fatal(err) + } + if len(gresp.Kvs) != 1 || string(gresp.Kvs[0].Value) != "def" { + t.Errorf(`expected value "def", got %+v`, gresp) + } + gresp, gerr = lkv.Get(context.TODO(), "k2") + if gerr != nil { + t.Fatal(gerr) + } + if len(gresp.Kvs) != 1 || string(gresp.Kvs[0].Value) != "456" { + t.Errorf(`expected value "def", got %+v`, gresp) + } + // check puts were applied and are all in the same revision + w := clus.Client(0).Watch( + clus.Client(0).Ctx(), + "k", + clientv3.WithRev(tresp.Header.Revision), + clientv3.WithPrefix()) + wresp := <-w + c := 0 + evs := []clientv3.Event{} + for _, ev := range wresp.Events { + evs = append(evs, *ev) + if ev.Kv.ModRevision == tresp.Header.Revision { + c++ + } + } + if c != 3 { + t.Fatalf("expected 3 put events, got %+v", evs) + } +} + +// TestLeasingTxnRandIfThen randomly leases keys two separate clients, then +// issues a random If/{Then,Else} transaction on those keys to one client. +func TestLeasingTxnRandIfThenOrElse(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv1, err1 := leasing.NewKV(clus.Client(0), "pfx/") + if err1 != nil { + t.Fatal(err1) + } + lkv2, err2 := leasing.NewKV(clus.Client(0), "pfx/") + if err2 != nil { + t.Fatal(err2) + } + + keyCount := 16 + dat := make([]*clientv3.PutResponse, keyCount) + for i := 0; i < keyCount; i++ { + k, v := fmt.Sprintf("k-%d", i), fmt.Sprintf("%d", i) + dat[i], err1 = clus.Client(0).Put(context.TODO(), k, v) + if err1 != nil { + t.Fatal(err1) + } + } + + // nondeterministically populate leasing caches + var wg sync.WaitGroup + getc := make(chan struct{}, keyCount) + getRandom := func(kv clientv3.KV) { + defer wg.Done() + for i := 0; i < keyCount/2; i++ { + k := fmt.Sprintf("k-%d", rand.Intn(keyCount)) + if _, err := kv.Get(context.TODO(), k); err != nil { + t.Fatal(err) + } + getc <- struct{}{} + } + } + wg.Add(2) + defer wg.Wait() + go getRandom(lkv1) + go getRandom(lkv2) + + // random list of comparisons, all true + cmps, useThen := randCmps("k-", dat) + // random list of puts/gets; unique keys + ops := []clientv3.Op{} + usedIdx := make(map[int]struct{}) + for i := 0; i < keyCount; i++ { + idx := rand.Intn(keyCount) + if _, ok := usedIdx[idx]; ok { + continue + } + usedIdx[idx] = struct{}{} + k := fmt.Sprintf("k-%d", idx) + switch rand.Intn(2) { + case 0: + ops = append(ops, clientv3.OpGet(k)) + case 1: + ops = append(ops, clientv3.OpPut(k, "a")) + // TODO: add delete + } + } + // random lengths + ops = ops[:rand.Intn(len(ops))] + + // wait for some gets to populate the leasing caches before committing + for i := 0; i < keyCount/2; i++ { + <-getc + } + + // randomly choose between then and else blocks + var thenOps, elseOps []clientv3.Op + if useThen { + thenOps = ops + } else { + // force failure + elseOps = ops + } + + tresp, terr := lkv1.Txn(context.TODO()).If(cmps...).Then(thenOps...).Else(elseOps...).Commit() + if terr != nil { + t.Fatal(terr) + } + // cmps always succeed + if tresp.Succeeded != useThen { + t.Fatalf("expected succeeded=%v, got tresp=%+v", useThen, tresp) + } + // get should match what was put + checkPuts := func(s string, kv clientv3.KV) { + for _, op := range ops { + if !op.IsPut() { + continue + } + resp, rerr := kv.Get(context.TODO(), string(op.KeyBytes())) + if rerr != nil { + t.Fatal(rerr) + } + if len(resp.Kvs) != 1 || string(resp.Kvs[0].Value) != "a" { + t.Fatalf(`%s: expected value="a", got %+v`, s, resp.Kvs) + } + } + } + checkPuts("client(0)", clus.Client(0)) + checkPuts("lkv1", lkv1) + checkPuts("lkv2", lkv2) +} + +func TestLeasingOwnerPutError(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + if _, err = lkv.Get(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + + clus.Members[0].Stop(t) + ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond) + defer cancel() + if resp, err := lkv.Put(ctx, "k", "v"); err == nil { + t.Fatalf("expected error, got response %+v", resp) + } +} + +func TestLeasingOwnerDeleteError(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + if _, err = lkv.Get(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + + clus.Members[0].Stop(t) + ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond) + defer cancel() + if resp, err := lkv.Delete(ctx, "k"); err == nil { + t.Fatalf("expected error, got response %+v", resp) + } +} + +func TestLeasingNonOwnerPutError(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + + clus.Members[0].Stop(t) + ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond) + defer cancel() + if resp, err := lkv.Put(ctx, "k", "v"); err == nil { + t.Fatalf("expected error, got response %+v", resp) + } +} + +func TestLeasingOwnerDeletePrefix(t *testing.T) { + testLeasingOwnerDelete(t, clientv3.OpDelete("key/", clientv3.WithPrefix())) +} + +func TestLeasingOwnerDeleteFrom(t *testing.T) { + testLeasingOwnerDelete(t, clientv3.OpDelete("kd", clientv3.WithFromKey())) +} + +func testLeasingOwnerDelete(t *testing.T, del clientv3.Op) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "0/") + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 8; i++ { + if _, err = clus.Client(0).Put(context.TODO(), fmt.Sprintf("key/%d", i), "123"); err != nil { + t.Fatal(err) + } + } + + if _, err = lkv.Get(context.TODO(), "key/1"); err != nil { + t.Fatal(err) + } + + opResp, delErr := lkv.Do(context.TODO(), del) + if delErr != nil { + t.Fatal(delErr) + } + delResp := opResp.Del() + + // confirm keys are invalidated from cache and deleted on etcd + for i := 0; i < 8; i++ { + resp, err := lkv.Get(context.TODO(), fmt.Sprintf("key/%d", i)) + if err != nil { + t.Fatal(err) + } + if len(resp.Kvs) != 0 { + t.Fatalf("expected no keys on key/%d, got %+v", i, resp) + } + } + + // confirm keys were deleted atomically + + w := clus.Client(0).Watch( + clus.Client(0).Ctx(), + "key/", + clientv3.WithRev(delResp.Header.Revision), + clientv3.WithPrefix()) + + if wresp := <-w; len(wresp.Events) != 8 { + t.Fatalf("expected %d delete events,got %d", 8, len(wresp.Events)) + } +} + +func TestLeasingDeleteRangeBounds(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + delkv, err := leasing.NewKV(clus.Client(0), "0/") + if err != nil { + t.Fatal(err) + } + + getkv, err := leasing.NewKV(clus.Client(0), "0/") + if err != nil { + t.Fatal(err) + } + + for _, k := range []string{"j", "m"} { + if _, err = clus.Client(0).Put(context.TODO(), k, "123"); err != nil { + t.Fatal(err) + } + if _, err = getkv.Get(context.TODO(), k); err != nil { + t.Fatal(err) + } + } + + if _, err = delkv.Delete(context.TODO(), "k", clientv3.WithPrefix()); err != nil { + t.Fatal(err) + } + + // leases still on server? + for _, k := range []string{"j", "m"} { + resp, geterr := clus.Client(0).Get(context.TODO(), "0/"+k, clientv3.WithPrefix()) + if geterr != nil { + t.Fatal(geterr) + } + if len(resp.Kvs) != 1 { + t.Fatalf("expected leasing key, got %+v", resp) + } + } + + // j and m should still have leases registered since not under k* + clus.Members[0].Stop(t) + + if _, err = getkv.Get(context.TODO(), "j"); err != nil { + t.Fatal(err) + } + if _, err = getkv.Get(context.TODO(), "m"); err != nil { + t.Fatal(err) + } +} + +func TestLeasingDeleteRangeContendTxn(t *testing.T) { + then := []clientv3.Op{clientv3.OpDelete("key/", clientv3.WithPrefix())} + testLeasingDeleteRangeContend(t, clientv3.OpTxn(nil, then, nil)) +} + +func TestLeaseDeleteRangeContendDel(t *testing.T) { + op := clientv3.OpDelete("key/", clientv3.WithPrefix()) + testLeasingDeleteRangeContend(t, op) +} + +func testLeasingDeleteRangeContend(t *testing.T, op clientv3.Op) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + delkv, err := leasing.NewKV(clus.Client(0), "0/") + if err != nil { + t.Fatal(err) + } + putkv, err := leasing.NewKV(clus.Client(0), "0/") + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 8; i++ { + key := fmt.Sprintf("key/%d", i) + if _, err = clus.Client(0).Put(context.TODO(), key, "123"); err != nil { + t.Fatal(err) + } + if _, err = putkv.Get(context.TODO(), key); err != nil { + t.Fatal(err) + } + } + + ctx, cancel := context.WithCancel(context.TODO()) + donec := make(chan struct{}) + go func() { + defer close(donec) + for i := 0; ctx.Err() == nil; i++ { + key := fmt.Sprintf("key/%d", i%8) + putkv.Put(ctx, key, "123") + putkv.Get(ctx, key) + } + }() + + _, delErr := delkv.Do(context.TODO(), op) + cancel() + <-donec + if delErr != nil { + t.Fatal(delErr) + } + + // confirm keys on non-deleter match etcd + for i := 0; i < 8; i++ { + key := fmt.Sprintf("key/%d", i) + resp, err := putkv.Get(context.TODO(), key) + if err != nil { + t.Fatal(err) + } + servResp, err := clus.Client(0).Get(context.TODO(), key) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(resp.Kvs, servResp.Kvs) { + t.Errorf("#%d: expected %+v, got %+v", i, servResp.Kvs, resp.Kvs) + } + } +} + +func TestLeasingPutGetDeleteConcurrent(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkvs := make([]clientv3.KV, 16) + for i := range lkvs { + lkv, err := leasing.NewKV(clus.Client(0), "pfx/") + if err != nil { + t.Fatal(err) + } + lkvs[i] = lkv + } + + getdel := func(kv clientv3.KV) { + if _, err := kv.Put(context.TODO(), "k", "abc"); err != nil { + t.Fatal(err) + } + time.Sleep(time.Millisecond) + if _, err := kv.Get(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + if _, err := kv.Delete(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + time.Sleep(2 * time.Millisecond) + } + + var wg sync.WaitGroup + wg.Add(16) + for i := 0; i < 16; i++ { + go func() { + defer wg.Done() + for _, kv := range lkvs { + getdel(kv) + } + }() + } + wg.Wait() + + resp, err := lkvs[0].Get(context.TODO(), "k") + if err != nil { + t.Fatal(err) + } + + if len(resp.Kvs) > 0 { + t.Fatalf("expected no kvs, got %+v", resp.Kvs) + } + + resp, err = clus.Client(0).Get(context.TODO(), "k") + if err != nil { + t.Fatal(err) + } + if len(resp.Kvs) > 0 { + t.Fatalf("expected no kvs, got %+v", resp.Kvs) + } +} + +// TestLeasingReconnectRevoke checks that revocation works if +// disconnected when trying to submit revoke txn. +func TestLeasingReconnectOwnerRevoke(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + lkv1, err1 := leasing.NewKV(clus.Client(0), "foo/") + if err1 != nil { + t.Fatal(err1) + } + lkv2, err2 := leasing.NewKV(clus.Client(1), "foo/") + if err2 != nil { + t.Fatal(err2) + } + + if _, err := lkv1.Get(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + + // force leader away from member 0 + clus.Members[0].Stop(t) + clus.WaitLeader(t) + clus.Members[0].Restart(t) + + cctx, cancel := context.WithCancel(context.TODO()) + sdonec, pdonec := make(chan struct{}), make(chan struct{}) + // make lkv1 connection choppy so txns fail + go func() { + defer close(sdonec) + for i := 0; i < 10 && cctx.Err() == nil; i++ { + clus.Members[0].Stop(t) + time.Sleep(100 * time.Millisecond) + clus.Members[0].Restart(t) + } + }() + go func() { + defer close(pdonec) + if _, err := lkv2.Put(cctx, "k", "v"); err != nil { + t.Log(err) + } + resp, err := lkv1.Get(cctx, "k") + if err != nil { + t.Fatal(err) + } + if string(resp.Kvs[0].Value) != "v" { + t.Fatalf(`expected "v" value, got %+v`, resp) + } + }() + select { + case <-pdonec: + cancel() + <-sdonec + case <-time.After(5 * time.Second): + cancel() + <-sdonec + <-pdonec + t.Fatal("took to long to revoke and put") + } +} + +// TestLeasingReconnectRevokeCompaction checks that revocation works if +// disconnected and the watch is compacted. +func TestLeasingReconnectOwnerRevokeCompact(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + lkv1, err1 := leasing.NewKV(clus.Client(0), "foo/") + if err1 != nil { + t.Fatal(err1) + } + lkv2, err2 := leasing.NewKV(clus.Client(1), "foo/") + if err2 != nil { + t.Fatal(err2) + } + + if _, err := lkv1.Get(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + + clus.Members[0].Stop(t) + clus.WaitLeader(t) + + // put some more revisions for compaction + presp, err := clus.Client(1).Put(context.TODO(), "a", "123") + if err != nil { + t.Fatal(err) + } + presp, err = clus.Client(1).Put(context.TODO(), "a", "123") + if err != nil { + t.Fatal(err) + } + // compact while lkv1 is disconnected + rev := presp.Header.Revision + if _, err = clus.Client(1).Compact(context.TODO(), rev); err != nil { + t.Fatal(err) + } + + clus.Members[0].Restart(t) + + cctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + if _, err = lkv2.Put(cctx, "k", "v"); err != nil { + t.Fatal(err) + } + resp, err := lkv1.Get(cctx, "k") + if err != nil { + t.Fatal(err) + } + if string(resp.Kvs[0].Value) != "v" { + t.Fatalf(`expected "v" value, got %+v`, resp) + } +} + +// TestLeasingReconnectOwnerConsistency checks a write error on an owner will +// not cause inconsistency between the server and the client. +func TestLeasingReconnectOwnerConsistency(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "foo/") + if err != nil { + t.Fatal(err) + } + + if _, err = lkv.Put(context.TODO(), "k", "x"); err != nil { + t.Fatal(err) + } + if _, err = lkv.Put(context.TODO(), "kk", "y"); err != nil { + t.Fatal(err) + } + if _, err = lkv.Get(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + + for i := 0; i < 10; i++ { + v := fmt.Sprintf("%d", i) + donec := make(chan struct{}) + clus.Members[0].DropConnections() + go func() { + defer close(donec) + for i := 0; i < 20; i++ { + clus.Members[0].DropConnections() + time.Sleep(time.Millisecond) + } + }() + switch rand.Intn(7) { + case 0: + _, err = lkv.Put(context.TODO(), "k", v) + case 1: + _, err = lkv.Delete(context.TODO(), "k") + case 2: + txn := lkv.Txn(context.TODO()).Then( + clientv3.OpGet("k"), + clientv3.OpDelete("k"), + ) + _, err = txn.Commit() + case 3: + txn := lkv.Txn(context.TODO()).Then( + clientv3.OpGet("k"), + clientv3.OpPut("k", v), + ) + _, err = txn.Commit() + case 4: + _, err = lkv.Do(context.TODO(), clientv3.OpPut("k", v)) + case 5: + _, err = lkv.Do(context.TODO(), clientv3.OpDelete("k")) + case 6: + _, err = lkv.Delete(context.TODO(), "k", clientv3.WithPrefix()) + } + <-donec + if err != nil { + // TODO wrap input client to generate errors + break + } + } + + lresp, lerr := lkv.Get(context.TODO(), "k") + if lerr != nil { + t.Fatal(lerr) + } + cresp, cerr := clus.Client(0).Get(context.TODO(), "k") + if cerr != nil { + t.Fatal(cerr) + } + if !reflect.DeepEqual(lresp.Kvs, cresp.Kvs) { + t.Fatalf("expected %+v, got %+v", cresp, lresp) + } +} + +func TestLeasingTxnAtomicCache(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "foo/") + if err != nil { + t.Fatal(err) + } + + puts, gets := make([]clientv3.Op, 16), make([]clientv3.Op, 16) + for i := range puts { + k := fmt.Sprintf("k-%d", i) + puts[i], gets[i] = clientv3.OpPut(k, k), clientv3.OpGet(k) + } + if _, err = clus.Client(0).Txn(context.TODO()).Then(puts...).Commit(); err != nil { + t.Fatal(err) + } + for i := range gets { + if _, err = lkv.Do(context.TODO(), gets[i]); err != nil { + t.Fatal(err) + } + } + + numPutters, numGetters := 16, 16 + + var wgPutters, wgGetters sync.WaitGroup + wgPutters.Add(numPutters) + wgGetters.Add(numGetters) + + f := func() { + defer wgPutters.Done() + for i := 0; i < 10; i++ { + if _, txnerr := lkv.Txn(context.TODO()).Then(puts...).Commit(); err != nil { + t.Fatal(txnerr) + } + } + } + + donec := make(chan struct{}, numPutters) + g := func() { + defer wgGetters.Done() + for { + select { + case <-donec: + return + default: + } + tresp, err := lkv.Txn(context.TODO()).Then(gets...).Commit() + if err != nil { + t.Fatal(err) + } + revs := make([]int64, len(gets)) + for i, resp := range tresp.Responses { + rr := resp.GetResponseRange() + revs[i] = rr.Kvs[0].ModRevision + } + for i := 1; i < len(revs); i++ { + if revs[i] != revs[i-1] { + t.Fatalf("expected matching revisions, got %+v", revs) + } + } + } + } + + for i := 0; i < numGetters; i++ { + go g() + } + for i := 0; i < numPutters; i++ { + go f() + } + + wgPutters.Wait() + close(donec) + wgGetters.Wait() +} + +// TestLeasingReconnectTxn checks that txns are resilient to disconnects. +func TestLeasingReconnectTxn(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "foo/") + if err != nil { + t.Fatal(err) + } + if _, err = lkv.Get(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + + donec := make(chan struct{}) + go func() { + defer close(donec) + clus.Members[0].DropConnections() + for i := 0; i < 10; i++ { + clus.Members[0].DropConnections() + time.Sleep(time.Millisecond) + } + }() + + _, lerr := lkv.Txn(context.TODO()). + If(clientv3.Compare(clientv3.Version("k"), "=", 0)). + Then(clientv3.OpGet("k")). + Commit() + <-donec + if lerr != nil { + t.Fatal(lerr) + } +} + +// TestLeasingReconnectNonOwnerGet checks a get error on an owner will +// not cause inconsistency between the server and the client. +func TestLeasingReconnectNonOwnerGet(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "foo/") + if err != nil { + t.Fatal(err) + } + + // populate a few keys so some leasing gets have keys + for i := 0; i < 4; i++ { + k := fmt.Sprintf("k-%d", i*2) + if _, err = lkv.Put(context.TODO(), k, k[2:]); err != nil { + t.Fatal(err) + } + } + + n := 0 + for i := 0; i < 10; i++ { + donec := make(chan struct{}) + clus.Members[0].DropConnections() + go func() { + defer close(donec) + for j := 0; j < 10; j++ { + clus.Members[0].DropConnections() + time.Sleep(time.Millisecond) + } + }() + _, err = lkv.Get(context.TODO(), fmt.Sprintf("k-%d", i)) + <-donec + n++ + if err != nil { + break + } + } + for i := 0; i < n; i++ { + k := fmt.Sprintf("k-%d", i) + lresp, lerr := lkv.Get(context.TODO(), k) + if lerr != nil { + t.Fatal(lerr) + } + cresp, cerr := clus.Client(0).Get(context.TODO(), k) + if cerr != nil { + t.Fatal(cerr) + } + if !reflect.DeepEqual(lresp.Kvs, cresp.Kvs) { + t.Fatalf("expected %+v, got %+v", cresp, lresp) + } + } +} + +func TestLeasingTxnRangeCmp(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "foo/") + if err != nil { + t.Fatal(err) + } + + if _, err = clus.Client(0).Put(context.TODO(), "k", "a"); err != nil { + t.Fatal(err) + } + // k2 version = 2 + if _, err = clus.Client(0).Put(context.TODO(), "k2", "a"); err != nil { + t.Fatal(err) + } + if _, err = clus.Client(0).Put(context.TODO(), "k2", "a"); err != nil { + t.Fatal(err) + } + + // cache k + if _, err = lkv.Get(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + + cmp := clientv3.Compare(clientv3.Version("k").WithPrefix(), "=", 1) + tresp, terr := lkv.Txn(context.TODO()).If(cmp).Commit() + if terr != nil { + t.Fatal(err) + } + if tresp.Succeeded { + t.Fatalf("expected Succeeded=false, got %+v", tresp) + } +} + +func TestLeasingDo(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "foo/") + if err != nil { + t.Fatal(err) + } + + ops := []clientv3.Op{ + clientv3.OpTxn(nil, nil, nil), + clientv3.OpGet("a"), + clientv3.OpPut("a/abc", "v"), + clientv3.OpDelete("a", clientv3.WithPrefix()), + clientv3.OpTxn(nil, nil, nil), + } + for i, op := range ops { + resp, resperr := lkv.Do(context.TODO(), op) + if resperr != nil { + t.Errorf("#%d: failed (%v)", i, resperr) + } + switch { + case op.IsGet() && resp.Get() == nil: + t.Errorf("#%d: get but nil get response", i) + case op.IsPut() && resp.Put() == nil: + t.Errorf("#%d: put op but nil get response", i) + case op.IsDelete() && resp.Del() == nil: + t.Errorf("#%d: delete op but nil delete response", i) + case op.IsTxn() && resp.Txn() == nil: + t.Errorf("#%d: txn op but nil txn response", i) + } + } + + gresp, err := clus.Client(0).Get(context.TODO(), "a", clientv3.WithPrefix()) + if err != nil { + t.Fatal(err) + } + if len(gresp.Kvs) != 0 { + t.Fatalf("expected no keys, got %+v", gresp.Kvs) + } +} + +func TestLeasingTxnOwnerPutBranch(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "foo/") + if err != nil { + t.Fatal(err) + } + + n := 0 + treeOp := makePutTreeOp("tree", &n, 4) + for i := 0; i < n; i++ { + k := fmt.Sprintf("tree/%d", i) + if _, err = clus.Client(0).Put(context.TODO(), k, "a"); err != nil { + t.Fatal(err) + } + if _, err = lkv.Get(context.TODO(), k); err != nil { + t.Fatal(err) + } + } + + if _, err = lkv.Do(context.TODO(), treeOp); err != nil { + t.Fatal(err) + } + + // lkv shouldn't need to call out to server for updated leased keys + clus.Members[0].Stop(t) + + for i := 0; i < n; i++ { + k := fmt.Sprintf("tree/%d", i) + lkvResp, err := lkv.Get(context.TODO(), k) + if err != nil { + t.Fatal(err) + } + clusResp, err := clus.Client(1).Get(context.TODO(), k) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(clusResp.Kvs, lkvResp.Kvs) { + t.Fatalf("expected %+v, got %+v", clusResp.Kvs, lkvResp.Kvs) + } + } +} + +func makePutTreeOp(pfx string, v *int, depth int) clientv3.Op { + key := fmt.Sprintf("%s/%d", pfx, *v) + *v = *v + 1 + if depth == 0 { + return clientv3.OpPut(key, "leaf") + } + + t, e := makePutTreeOp(pfx, v, depth-1), makePutTreeOp(pfx, v, depth-1) + tPut, ePut := clientv3.OpPut(key, "then"), clientv3.OpPut(key, "else") + + cmps := make([]clientv3.Cmp, 1) + if rand.Intn(2) == 0 { + // follow then path + cmps[0] = clientv3.Compare(clientv3.Version("nokey"), "=", 0) + } else { + // follow else path + cmps[0] = clientv3.Compare(clientv3.Version("nokey"), ">", 0) + } + + return clientv3.OpTxn(cmps, []clientv3.Op{t, tPut}, []clientv3.Op{e, ePut}) +} + +func randCmps(pfx string, dat []*clientv3.PutResponse) (cmps []clientv3.Cmp, then bool) { + for i := 0; i < len(dat); i++ { + idx := rand.Intn(len(dat)) + k := fmt.Sprintf("%s%d", pfx, idx) + rev := dat[idx].Header.Revision + var cmp clientv3.Cmp + switch rand.Intn(4) { + case 0: + cmp = clientv3.Compare(clientv3.CreateRevision(k), ">", rev-1) + case 1: + cmp = clientv3.Compare(clientv3.Version(k), "=", 1) + case 2: + cmp = clientv3.Compare(clientv3.CreateRevision(k), "=", rev) + case 3: + cmp = clientv3.Compare(clientv3.CreateRevision(k), "!=", rev+1) + + } + cmps = append(cmps, cmp) + } + cmps = cmps[:rand.Intn(len(dat))] + if rand.Intn(2) == 0 { + return cmps, true + } + i := rand.Intn(len(dat)) + cmps = append(cmps, clientv3.Compare(clientv3.Version(fmt.Sprintf("k-%d", i)), "=", 0)) + return cmps, false +} + +func TestLeasingSessionExpire(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + lkv, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1)) + if err != nil { + t.Fatal(err) + } + lkv2, err := leasing.NewKV(clus.Client(0), "foo/") + if err != nil { + t.Fatal(err) + } + + // acquire lease on abc + if _, err = lkv.Get(context.TODO(), "abc"); err != nil { + t.Fatal(err) + } + + // down endpoint lkv uses for keepalives + clus.Members[0].Stop(t) + if err := waitForLeasingExpire(clus.Client(1), "foo/abc"); err != nil { + t.Fatal(err) + } + waitForExpireAck(t, lkv) + clus.Members[0].Restart(t) + + if _, err = lkv2.Put(context.TODO(), "abc", "def"); err != nil { + t.Fatal(err) + } + + resp, err := lkv.Get(context.TODO(), "abc") + if err != nil { + t.Fatal(err) + } + if v := string(resp.Kvs[0].Value); v != "def" { + t.Fatalf("expected %q, got %q", "v", v) + } +} + +func TestLeasingSessionExpireCancel(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + tests := []func(context.Context, clientv3.KV) error{ + func(ctx context.Context, kv clientv3.KV) error { + _, err := kv.Get(ctx, "abc") + return err + }, + func(ctx context.Context, kv clientv3.KV) error { + _, err := kv.Delete(ctx, "abc") + return err + }, + func(ctx context.Context, kv clientv3.KV) error { + _, err := kv.Put(ctx, "abc", "v") + return err + }, + func(ctx context.Context, kv clientv3.KV) error { + _, err := kv.Txn(ctx).Then(clientv3.OpGet("abc")).Commit() + return err + }, + func(ctx context.Context, kv clientv3.KV) error { + _, err := kv.Do(ctx, clientv3.OpPut("abc", "v")) + return err + }, + func(ctx context.Context, kv clientv3.KV) error { + _, err := kv.Do(ctx, clientv3.OpDelete("abc")) + return err + }, + func(ctx context.Context, kv clientv3.KV) error { + _, err := kv.Do(ctx, clientv3.OpGet("abc")) + return err + }, + func(ctx context.Context, kv clientv3.KV) error { + op := clientv3.OpTxn(nil, []clientv3.Op{clientv3.OpGet("abc")}, nil) + _, err := kv.Do(ctx, op) + return err + }, + } + for i := range tests { + lkv, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1)) + if err != nil { + t.Fatal(err) + } + if _, err = lkv.Get(context.TODO(), "abc"); err != nil { + t.Fatal(err) + } + + // down endpoint lkv uses for keepalives + clus.Members[0].Stop(t) + if err := waitForLeasingExpire(clus.Client(1), "foo/abc"); err != nil { + t.Fatal(err) + } + waitForExpireAck(t, lkv) + + ctx, cancel := context.WithCancel(context.TODO()) + errc := make(chan error, 1) + go func() { errc <- tests[i](ctx, lkv) }() + // some delay to get past for ctx.Err() != nil {} loops + time.Sleep(100 * time.Millisecond) + cancel() + + select { + case err := <-errc: + if err != ctx.Err() { + t.Errorf("#%d: expected %v, got %v", i, ctx.Err(), err) + } + case <-time.After(5 * time.Second): + t.Errorf("#%d: timed out waiting for cancel", i) + } + clus.Members[0].Restart(t) + } +} + +func waitForLeasingExpire(kv clientv3.KV, lkey string) error { + for { + time.Sleep(1 * time.Second) + resp, err := kv.Get(context.TODO(), lkey, clientv3.WithPrefix()) + if err != nil { + return err + } + if len(resp.Kvs) == 0 { + // server expired the leasing key + return nil + } + } +} + +func waitForExpireAck(t *testing.T, kv clientv3.KV) { + // wait for leasing client to acknowledge lost lease + for i := 0; i < 10; i++ { + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + _, err := kv.Get(ctx, "abc") + cancel() + if err == ctx.Err() { + return + } + time.Sleep(time.Second) + } + t.Fatalf("waited too long to acknlowedge lease expiration") +} diff --git a/clientv3/kv.go b/clientv3/kv.go index 418f6c32c1c..b0557aa29de 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -74,16 +74,16 @@ func (op OpResponse) Get() *GetResponse { return op.get } func (op OpResponse) Del() *DeleteResponse { return op.del } func (op OpResponse) Txn() *TxnResponse { return op.txn } -func (resp *PutResponse) ToOpResponse() OpResponse { +func (resp *PutResponse) OpResponse() OpResponse { return OpResponse{put: resp} } -func (resp *GetResponse) ToOpResponse() OpResponse { +func (resp *GetResponse) OpResponse() OpResponse { return OpResponse{get: resp} } -func (resp *DeleteResponse) ToOpResponse() OpResponse { +func (resp *DeleteResponse) OpResponse() OpResponse { return OpResponse{del: resp} } -func (resp *TxnResponse) ToOpResponse() OpResponse { +func (resp *TxnResponse) OpResponse() OpResponse { return OpResponse{txn: resp} } diff --git a/clientv3/leasing/cache.go b/clientv3/leasing/cache.go new file mode 100644 index 00000000000..8d2c482772f --- /dev/null +++ b/clientv3/leasing/cache.go @@ -0,0 +1,306 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leasing + +import ( + "strings" + "sync" + "time" + + v3 "github.com/coreos/etcd/clientv3" + v3pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/mvcc/mvccpb" + "golang.org/x/net/context" +) + +const revokeBackoff = 2 * time.Second + +type leaseCache struct { + mu sync.RWMutex + entries map[string]*leaseKey + revokes map[string]time.Time + header *v3pb.ResponseHeader +} + +type leaseKey struct { + response *v3.GetResponse + // rev is the leasing key revision. + rev int64 + waitc chan struct{} +} + +func (lc *leaseCache) Rev(key string) int64 { + lc.mu.RLock() + defer lc.mu.RUnlock() + if li := lc.entries[key]; li != nil { + return li.rev + } + return 0 +} + +func (lc *leaseCache) Lock(key string) (chan<- struct{}, int64) { + lc.mu.Lock() + defer lc.mu.Unlock() + if li := lc.entries[key]; li != nil { + li.waitc = make(chan struct{}) + return li.waitc, li.rev + } + return nil, 0 +} + +func (lc *leaseCache) LockRange(begin, end string) (ret []chan<- struct{}) { + lc.mu.Lock() + defer lc.mu.Unlock() + for k, li := range lc.entries { + if inRange(k, begin, end) { + li.waitc = make(chan struct{}) + ret = append(ret, li.waitc) + } + } + return ret +} + +func inRange(k, begin, end string) bool { + if strings.Compare(k, begin) < 0 { + return false + } + if end != "\x00" && strings.Compare(k, end) >= 0 { + return false + } + return true +} + +func (lc *leaseCache) LockWriteOps(ops []v3.Op) (ret []chan<- struct{}) { + for _, op := range ops { + if op.IsGet() { + continue + } + key := string(op.KeyBytes()) + if end := string(op.RangeBytes()); end == "" { + if wc, _ := lc.Lock(key); wc != nil { + ret = append(ret, wc) + } + } else { + for k := range lc.entries { + if !inRange(k, key, end) { + continue + } + if wc, _ := lc.Lock(k); wc != nil { + ret = append(ret, wc) + } + } + } + } + return ret +} + +func (lc *leaseCache) NotifyOps(ops []v3.Op) (wcs []<-chan struct{}) { + for _, op := range ops { + if op.IsGet() { + if _, wc := lc.notify(string(op.KeyBytes())); wc != nil { + wcs = append(wcs, wc) + } + } + } + return wcs +} + +func (lc *leaseCache) MayAcquire(key string) bool { + lc.mu.RLock() + lr, ok := lc.revokes[key] + lc.mu.RUnlock() + return !ok || time.Since(lr) > revokeBackoff +} + +func (lc *leaseCache) Add(key string, resp *v3.GetResponse, op v3.Op) *v3.GetResponse { + lk := &leaseKey{resp, resp.Header.Revision, closedCh} + lc.mu.Lock() + if lc.header == nil || lc.header.Revision < resp.Header.Revision { + lc.header = resp.Header + } + lc.entries[key] = lk + ret := lk.get(op) + lc.mu.Unlock() + return ret +} + +func (lc *leaseCache) Update(key, val []byte, respHeader *v3pb.ResponseHeader) { + li := lc.entries[string(key)] + if li == nil { + return + } + cacheResp := li.response + if len(cacheResp.Kvs) == 0 { + kv := &mvccpb.KeyValue{ + Key: key, + CreateRevision: respHeader.Revision, + } + cacheResp.Kvs = append(cacheResp.Kvs, kv) + cacheResp.Count = 1 + } + cacheResp.Kvs[0].Version++ + if cacheResp.Kvs[0].ModRevision < respHeader.Revision { + cacheResp.Header = respHeader + cacheResp.Kvs[0].ModRevision = respHeader.Revision + cacheResp.Kvs[0].Value = val + } +} + +func (lc *leaseCache) Delete(key string, hdr *v3pb.ResponseHeader) { + lc.mu.Lock() + defer lc.mu.Unlock() + lc.delete(key, hdr) +} + +func (lc *leaseCache) delete(key string, hdr *v3pb.ResponseHeader) { + if li := lc.entries[key]; li != nil && hdr.Revision >= li.response.Header.Revision { + li.response.Kvs = nil + li.response.Header = copyHeader(hdr) + } +} + +func (lc *leaseCache) Evict(key string) (rev int64) { + lc.mu.Lock() + defer lc.mu.Unlock() + if li := lc.entries[key]; li != nil { + rev = li.rev + delete(lc.entries, key) + lc.revokes[key] = time.Now() + } + return rev +} + +func (lc *leaseCache) EvictRange(key, end string) { + lc.mu.Lock() + defer lc.mu.Unlock() + for k := range lc.entries { + if inRange(k, key, end) { + delete(lc.entries, key) + lc.revokes[key] = time.Now() + } + } +} + +func isBadOp(op v3.Op) bool { return op.Rev() > 0 || len(op.RangeBytes()) > 0 } + +func (lc *leaseCache) Get(ctx context.Context, op v3.Op) (*v3.GetResponse, bool) { + if isBadOp(op) { + return nil, false + } + key := string(op.KeyBytes()) + li, wc := lc.notify(key) + if li == nil { + return nil, true + } + select { + case <-wc: + case <-ctx.Done(): + return nil, true + } + lc.mu.RLock() + lk := *li + ret := lk.get(op) + lc.mu.RUnlock() + return ret, true +} + +func (lk *leaseKey) get(op v3.Op) *v3.GetResponse { + ret := *lk.response + ret.Header = copyHeader(ret.Header) + empty := len(ret.Kvs) == 0 || op.IsCountOnly() + empty = empty || (op.MinModRev() > ret.Kvs[0].ModRevision) + empty = empty || (op.MaxModRev() != 0 && op.MaxModRev() < ret.Kvs[0].ModRevision) + empty = empty || (op.MinCreateRev() > ret.Kvs[0].CreateRevision) + empty = empty || (op.MaxCreateRev() != 0 && op.MaxCreateRev() < ret.Kvs[0].CreateRevision) + if empty { + ret.Kvs = nil + } else { + kv := *ret.Kvs[0] + kv.Key = make([]byte, len(kv.Key)) + copy(kv.Key, ret.Kvs[0].Key) + if !op.IsKeysOnly() { + kv.Value = make([]byte, len(kv.Value)) + copy(kv.Value, ret.Kvs[0].Value) + } + ret.Kvs = []*mvccpb.KeyValue{&kv} + } + return &ret +} + +func (lc *leaseCache) notify(key string) (*leaseKey, <-chan struct{}) { + lc.mu.RLock() + defer lc.mu.RUnlock() + if li := lc.entries[key]; li != nil { + return li, li.waitc + } + return nil, nil +} + +func (lc *leaseCache) clearOldRevokes(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Second): + lc.mu.Lock() + for k, lr := range lc.revokes { + if time.Now().Sub(lr.Add(revokeBackoff)) > 0 { + delete(lc.revokes, k) + } + } + lc.mu.Unlock() + } + } +} + +func (lc *leaseCache) evalCmp(cmps []v3.Cmp) (cmpVal bool, ok bool) { + for _, cmp := range cmps { + if len(cmp.RangeEnd) > 0 { + return false, false + } + lk := lc.entries[string(cmp.Key)] + if lk == nil { + return false, false + } + if !evalCmp(lk.response, cmp) { + return false, true + } + } + return true, true +} + +func (lc *leaseCache) evalOps(ops []v3.Op) ([]*v3pb.ResponseOp, bool) { + resps := make([]*v3pb.ResponseOp, len(ops)) + for i, op := range ops { + if !op.IsGet() || isBadOp(op) { + // TODO: support read-only txns + return nil, false + } + lk := lc.entries[string(op.KeyBytes())] + if lk == nil { + return nil, false + } + resp := lk.get(op) + if resp == nil { + return nil, false + } + resps[i] = &v3pb.ResponseOp{ + Response: &v3pb.ResponseOp_ResponseRange{ + (*v3pb.RangeResponse)(resp), + }, + } + } + return resps, true +} diff --git a/clientv3/leasing/doc.go b/clientv3/leasing/doc.go new file mode 100644 index 00000000000..30c3443dfb6 --- /dev/null +++ b/clientv3/leasing/doc.go @@ -0,0 +1,45 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package leasing is a clientv3 wrapper that provides the client exclusive write access to a key by acquiring a lease and be lineraizably +// served locally. This leasing layer can either directly wrap the etcd client or +// it can be exposed through the etcd grace proxy server, granting multiple clients write access. +// +// First, create a leasing client interface: +// +// leasingCli,error = leasing.NewKV(cli.KV, "leasing-prefix") +// if error != nil { +// //handle error +// } +// +// The first range request acquires the lease by adding the leasing key ("leasing-prefix"/key) on the server and stores the key locally. +// Further linearized read requests using 'cli.leasing' will be served locally as long as the lease exists: +// cli.Put(context.TODO(), "abc", "123") +// +// Lease Acquisition: +// leasingCli.Get(context.TODO(), "abc") +// +// Local reads: +// resp,_ := leasingCli.Get(context.TODO(), "abc") +// fmt.Printf("%s\n", resp.Kvs[0].Value) +// //Output: 123 (served locally) +// +// Lease Revocation: +// If a client writes to the key owned by the leasing client,then the leasing client gives up its lease allowing the client to modify the key. +// cli.Put(context.TODO(), "abc", "456") +// resp, _ = leasingCli.Get("abc") +// fmt.Printf("%s\n", resp.Kvs[0].Value) +// // Output: 456 (fetched from server) +// +package leasing diff --git a/clientv3/leasing/kv.go b/clientv3/leasing/kv.go new file mode 100644 index 00000000000..7da812b98be --- /dev/null +++ b/clientv3/leasing/kv.go @@ -0,0 +1,431 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leasing + +import ( + "strings" + "time" + + v3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" + "github.com/coreos/etcd/mvcc/mvccpb" + + "golang.org/x/net/context" +) + +type leasingKV struct { + cl *v3.Client + kv v3.KV + pfx string + leases leaseCache + ctx context.Context + cancel context.CancelFunc + + sessionOpts []concurrency.SessionOption + session *concurrency.Session + sessionc chan struct{} +} + +var closedCh chan struct{} + +func init() { + closedCh = make(chan struct{}) + close(closedCh) +} + +// NewKV wraps a KV instance so that all requests are wired through a leasing protocol. +func NewKV(cl *v3.Client, pfx string, opts ...concurrency.SessionOption) (v3.KV, error) { + cctx, cancel := context.WithCancel(cl.Ctx()) + lkv := leasingKV{ + cl: cl, + kv: cl.KV, + pfx: pfx, + leases: leaseCache{revokes: make(map[string]time.Time)}, + ctx: cctx, + cancel: cancel, + sessionOpts: opts, + sessionc: make(chan struct{}), + } + go lkv.monitorSession() + go lkv.leases.clearOldRevokes(cctx) + return &lkv, lkv.waitSession(cctx) +} + +func (lkv *leasingKV) Get(ctx context.Context, key string, opts ...v3.OpOption) (*v3.GetResponse, error) { + return lkv.get(ctx, v3.OpGet(key, opts...)) +} + +func (lkv *leasingKV) Put(ctx context.Context, key, val string, opts ...v3.OpOption) (*v3.PutResponse, error) { + return lkv.put(ctx, v3.OpPut(key, val, opts...)) +} + +func (lkv *leasingKV) Delete(ctx context.Context, key string, opts ...v3.OpOption) (*v3.DeleteResponse, error) { + return lkv.delete(ctx, v3.OpDelete(key, opts...)) +} + +func (lkv *leasingKV) Do(ctx context.Context, op v3.Op) (v3.OpResponse, error) { + switch { + case op.IsGet(): + resp, err := lkv.get(ctx, op) + return resp.OpResponse(), err + case op.IsPut(): + resp, err := lkv.put(ctx, op) + return resp.OpResponse(), err + case op.IsDelete(): + resp, err := lkv.delete(ctx, op) + return resp.OpResponse(), err + case op.IsTxn(): + cmps, thenOps, elseOps := op.Txn() + resp, err := lkv.Txn(ctx).If(cmps...).Then(thenOps...).Else(elseOps...).Commit() + return resp.OpResponse(), err + } + return v3.OpResponse{}, nil +} + +func (lkv *leasingKV) Compact(ctx context.Context, rev int64, opts ...v3.CompactOption) (*v3.CompactResponse, error) { + return lkv.kv.Compact(ctx, rev, opts...) +} + +func (lkv *leasingKV) Txn(ctx context.Context) v3.Txn { + return &txnLeasing{Txn: lkv.kv.Txn(ctx), lkv: lkv, ctx: ctx} +} + +func (lkv *leasingKV) monitorSession() { + for lkv.ctx.Err() == nil { + if lkv.session != nil { + select { + case <-lkv.session.Done(): + case <-lkv.ctx.Done(): + return + } + } + lkv.leases.mu.Lock() + select { + case <-lkv.sessionc: + lkv.sessionc = make(chan struct{}) + default: + } + lkv.leases.entries = make(map[string]*leaseKey) + lkv.leases.mu.Unlock() + + s, err := concurrency.NewSession(lkv.cl, lkv.sessionOpts...) + if err != nil { + continue + } + + lkv.leases.mu.Lock() + lkv.session = s + close(lkv.sessionc) + lkv.leases.mu.Unlock() + } +} + +func (lkv *leasingKV) monitorLease(ctx context.Context, key string, rev int64) { + cctx, cancel := context.WithCancel(lkv.ctx) + defer cancel() + for cctx.Err() == nil { + if rev == 0 { + resp, err := lkv.kv.Get(ctx, lkv.pfx+key) + if err != nil { + continue + } + rev = resp.Header.Revision + if len(resp.Kvs) == 0 || string(resp.Kvs[0].Value) == "REVOKE" { + lkv.rescind(cctx, key, rev) + return + } + } + wch := lkv.cl.Watch(cctx, lkv.pfx+key, v3.WithRev(rev+1)) + for resp := range wch { + for _, ev := range resp.Events { + if string(ev.Kv.Value) != "REVOKE" { + continue + } + if v3.LeaseID(ev.Kv.Lease) == lkv.leaseID() { + lkv.rescind(cctx, key, ev.Kv.ModRevision) + } + return + } + } + rev = 0 + } +} + +// rescind releases a lease from this client. +func (lkv *leasingKV) rescind(ctx context.Context, key string, rev int64) { + if lkv.leases.Evict(key) > rev { + return + } + cmp := v3.Compare(v3.CreateRevision(lkv.pfx+key), "<", rev) + op := v3.OpDelete(lkv.pfx + key) + for ctx.Err() == nil { + if _, err := lkv.kv.Txn(ctx).If(cmp).Then(op).Commit(); err == nil { + return + } + } +} + +func (lkv *leasingKV) waitRescind(ctx context.Context, key string, rev int64) error { + cctx, cancel := context.WithCancel(ctx) + defer cancel() + wch := lkv.cl.Watch(cctx, lkv.pfx+key, v3.WithRev(rev+1)) + for resp := range wch { + for _, ev := range resp.Events { + if ev.Type == v3.EventTypeDelete { + return ctx.Err() + } + } + } + return ctx.Err() +} + +func (lkv *leasingKV) tryModifyOp(ctx context.Context, op v3.Op) (*v3.TxnResponse, chan<- struct{}, error) { + key := string(op.KeyBytes()) + wc, rev := lkv.leases.Lock(key) + cmp := v3.Compare(v3.CreateRevision(lkv.pfx+key), "<", rev+1) + resp, err := lkv.kv.Txn(ctx).If(cmp).Then(op).Commit() + switch { + case err != nil: + lkv.leases.Evict(key) + fallthrough + case !resp.Succeeded: + if wc != nil { + close(wc) + } + return nil, nil, err + } + return resp, wc, nil +} + +func (lkv *leasingKV) put(ctx context.Context, op v3.Op) (pr *v3.PutResponse, err error) { + if err := lkv.waitSession(ctx); err != nil { + return nil, err + } + for ctx.Err() == nil { + resp, wc, err := lkv.tryModifyOp(ctx, op) + if err != nil || wc == nil { + resp, err = lkv.revoke(ctx, string(op.KeyBytes()), op) + } + if err != nil { + return nil, err + } + if resp.Succeeded { + lkv.leases.mu.Lock() + lkv.leases.Update(op.KeyBytes(), op.ValueBytes(), resp.Header) + lkv.leases.mu.Unlock() + pr = (*v3.PutResponse)(resp.Responses[0].GetResponsePut()) + pr.Header = resp.Header + } + if wc != nil { + close(wc) + } + if resp.Succeeded { + return pr, nil + } + } + return nil, ctx.Err() +} + +func (lkv *leasingKV) acquire(ctx context.Context, key string, op v3.Op) (*v3.TxnResponse, error) { + if err := lkv.waitSession(ctx); err != nil { + return nil, err + } + return lkv.kv.Txn(ctx).If( + v3.Compare(v3.CreateRevision(lkv.pfx+key), "=", 0)). + Then( + op, + v3.OpPut(lkv.pfx+key, "", v3.WithLease(lkv.leaseID()))). + Else(op). + Commit() +} + +func (lkv *leasingKV) get(ctx context.Context, op v3.Op) (*v3.GetResponse, error) { + do := func() (*v3.GetResponse, error) { + r, err := lkv.kv.Do(ctx, op) + return r.Get(), err + } + if !lkv.readySession() { + return do() + } + + if resp, ok := lkv.leases.Get(ctx, op); resp != nil { + return resp, nil + } else if !ok || op.IsSerializable() { + // must be handled by server or can skip linearization + return do() + } + + key := string(op.KeyBytes()) + if !lkv.leases.MayAcquire(key) { + resp, err := lkv.kv.Do(ctx, op) + return resp.Get(), err + } + + resp, err := lkv.acquire(ctx, key, v3.OpGet(key)) + if err != nil { + return nil, err + } + getResp := (*v3.GetResponse)(resp.Responses[0].GetResponseRange()) + getResp.Header = resp.Header + if resp.Succeeded { + getResp = lkv.leases.Add(key, getResp, op) + go lkv.monitorLease(ctx, key, resp.Header.Revision) + } + return getResp, nil +} + +func (lkv *leasingKV) deleteRangeRPC(ctx context.Context, maxLeaseRev int64, key, end string) (*v3.DeleteResponse, error) { + lkey, lend := lkv.pfx+key, lkv.pfx+end + resp, err := lkv.kv.Txn(ctx).If( + v3.Compare(v3.CreateRevision(lkey).WithRange(lend), "<", maxLeaseRev+1), + ).Then( + v3.OpGet(key, v3.WithRange(end), v3.WithKeysOnly()), + v3.OpDelete(key, v3.WithRange(end)), + ).Commit() + if err != nil { + lkv.leases.EvictRange(key, end) + return nil, err + } + if !resp.Succeeded { + return nil, nil + } + for _, kv := range resp.Responses[0].GetResponseRange().Kvs { + lkv.leases.Delete(string(kv.Key), resp.Header) + } + delResp := (*v3.DeleteResponse)(resp.Responses[1].GetResponseDeleteRange()) + delResp.Header = resp.Header + return delResp, nil +} + +func (lkv *leasingKV) deleteRange(ctx context.Context, op v3.Op) (*v3.DeleteResponse, error) { + key, end := string(op.KeyBytes()), string(op.RangeBytes()) + for ctx.Err() == nil { + maxLeaseRev, err := lkv.revokeRange(ctx, key, end) + if err != nil { + return nil, err + } + wcs := lkv.leases.LockRange(key, end) + delResp, err := lkv.deleteRangeRPC(ctx, maxLeaseRev, key, end) + closeAll(wcs) + if err != nil || delResp != nil { + return delResp, err + } + } + return nil, ctx.Err() +} + +func (lkv *leasingKV) delete(ctx context.Context, op v3.Op) (dr *v3.DeleteResponse, err error) { + if err := lkv.waitSession(ctx); err != nil { + return nil, err + } + if len(op.RangeBytes()) > 0 { + return lkv.deleteRange(ctx, op) + } + key := string(op.KeyBytes()) + for ctx.Err() == nil { + resp, wc, err := lkv.tryModifyOp(ctx, op) + if err != nil || wc == nil { + resp, err = lkv.revoke(ctx, key, op) + } + if err != nil { + // don't know if delete was processed + lkv.leases.Evict(key) + return nil, err + } + if resp.Succeeded { + dr = (*v3.DeleteResponse)(resp.Responses[0].GetResponseDeleteRange()) + dr.Header = resp.Header + lkv.leases.Delete(key, dr.Header) + } + if wc != nil { + close(wc) + } + if resp.Succeeded { + return dr, nil + } + } + return nil, ctx.Err() +} + +func (lkv *leasingKV) revoke(ctx context.Context, key string, op v3.Op) (*v3.TxnResponse, error) { + rev := lkv.leases.Rev(key) + txn := lkv.kv.Txn(ctx).If(v3.Compare(v3.CreateRevision(lkv.pfx+key), "<", rev+1)).Then(op) + resp, err := txn.Else(v3.OpPut(lkv.pfx+key, "REVOKE", v3.WithIgnoreLease())).Commit() + if err != nil || resp.Succeeded { + return resp, err + } + return resp, lkv.waitRescind(ctx, key, resp.Header.Revision) +} + +func (lkv *leasingKV) revokeRange(ctx context.Context, begin, end string) (int64, error) { + lkey, lend := lkv.pfx+begin, "" + if len(end) > 0 { + lend = lkv.pfx + end + } + leaseKeys, err := lkv.kv.Get(ctx, lkey, v3.WithRange(lend)) + if err != nil { + return 0, err + } + return lkv.revokeLeaseKvs(ctx, leaseKeys.Kvs) +} + +func (lkv *leasingKV) revokeLeaseKvs(ctx context.Context, kvs []*mvccpb.KeyValue) (int64, error) { + maxLeaseRev := int64(0) + for _, kv := range kvs { + if rev := kv.CreateRevision; rev > maxLeaseRev { + maxLeaseRev = rev + } + if v3.LeaseID(kv.Lease) == lkv.leaseID() { + // don't revoke own keys + continue + } + key := strings.TrimPrefix(string(kv.Key), lkv.pfx) + if _, err := lkv.revoke(ctx, key, v3.OpGet(key)); err != nil { + return 0, err + } + } + return maxLeaseRev, nil +} + +func (lkv *leasingKV) waitSession(ctx context.Context) error { + select { + case <-lkv.sessionc: + return nil + case <-lkv.ctx.Done(): + return lkv.ctx.Err() + case <-ctx.Done(): + return ctx.Err() + } +} + +func (lkv *leasingKV) readySession() bool { + lkv.leases.mu.RLock() + defer lkv.leases.mu.RUnlock() + if lkv.session == nil { + return false + } + select { + case <-lkv.session.Done(): + default: + return true + } + return false +} + +func (lkv *leasingKV) leaseID() v3.LeaseID { + lkv.leases.mu.RLock() + defer lkv.leases.mu.RUnlock() + return lkv.session.Lease() +} diff --git a/clientv3/leasing/txn.go b/clientv3/leasing/txn.go new file mode 100644 index 00000000000..da5b83a8abb --- /dev/null +++ b/clientv3/leasing/txn.go @@ -0,0 +1,223 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leasing + +import ( + "context" + "strings" + + v3 "github.com/coreos/etcd/clientv3" + v3pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +type txnLeasing struct { + v3.Txn + lkv *leasingKV + ctx context.Context + cs []v3.Cmp + opst []v3.Op + opse []v3.Op +} + +func (txn *txnLeasing) If(cs ...v3.Cmp) v3.Txn { + txn.cs = append(txn.cs, cs...) + txn.Txn = txn.Txn.If(cs...) + return txn +} + +func (txn *txnLeasing) Then(ops ...v3.Op) v3.Txn { + txn.opst = append(txn.opst, ops...) + txn.Txn = txn.Txn.Then(ops...) + return txn +} + +func (txn *txnLeasing) Else(ops ...v3.Op) v3.Txn { + txn.opse = append(txn.opse, ops...) + txn.Txn = txn.Txn.Else(ops...) + return txn +} + +func (txn *txnLeasing) Commit() (*v3.TxnResponse, error) { + if resp, err := txn.eval(); resp != nil || err != nil { + return resp, err + } + return txn.serverTxn() +} + +func (txn *txnLeasing) eval() (*v3.TxnResponse, error) { + // TODO: wait on keys in comparisons + thenOps, elseOps := gatherOps(txn.opst), gatherOps(txn.opse) + ops := make([]v3.Op, 0, len(thenOps)+len(elseOps)) + ops = append(ops, thenOps...) + ops = append(ops, elseOps...) + + for _, ch := range txn.lkv.leases.NotifyOps(ops) { + select { + case <-ch: + case <-txn.ctx.Done(): + return nil, txn.ctx.Err() + } + } + + txn.lkv.leases.mu.RLock() + defer txn.lkv.leases.mu.RUnlock() + succeeded, ok := txn.lkv.leases.evalCmp(txn.cs) + if !ok || txn.lkv.leases.header == nil { + return nil, nil + } + if ops = txn.opst; !succeeded { + ops = txn.opse + } + + resps, ok := txn.lkv.leases.evalOps(ops) + if !ok { + return nil, nil + } + return &v3.TxnResponse{copyHeader(txn.lkv.leases.header), succeeded, resps}, nil +} + +// fallback computes the ops to fetch all possible conflicting +// leasing keys for a list of ops. +func (txn *txnLeasing) fallback(ops []v3.Op) (fbOps []v3.Op) { + for _, op := range ops { + if op.IsGet() { + continue + } + lkey, lend := txn.lkv.pfx+string(op.KeyBytes()), "" + if len(op.RangeBytes()) > 0 { + lend = txn.lkv.pfx + string(op.RangeBytes()) + } + fbOps = append(fbOps, v3.OpGet(lkey, v3.WithRange(lend))) + } + return fbOps +} + +func (txn *txnLeasing) guardKeys(ops []v3.Op) (cmps []v3.Cmp) { + seen := make(map[string]bool) + for _, op := range ops { + key := string(op.KeyBytes()) + if op.IsGet() || len(op.RangeBytes()) != 0 || seen[key] { + continue + } + rev := txn.lkv.leases.Rev(key) + cmps = append(cmps, v3.Compare(v3.CreateRevision(txn.lkv.pfx+key), "<", rev+1)) + seen[key] = true + } + return cmps +} + +func (txn *txnLeasing) guardRanges(ops []v3.Op) (cmps []v3.Cmp, err error) { + for _, op := range ops { + if op.IsGet() || len(op.RangeBytes()) == 0 { + continue + } + + key, end := string(op.KeyBytes()), string(op.RangeBytes()) + maxRevLK, err := txn.lkv.revokeRange(txn.ctx, key, end) + if err != nil { + return nil, err + } + + opts := append(v3.WithLastRev(), v3.WithRange(end)) + getResp, err := txn.lkv.kv.Get(txn.ctx, key, opts...) + if err != nil { + return nil, err + } + maxModRev := int64(0) + if len(getResp.Kvs) > 0 { + maxModRev = getResp.Kvs[0].ModRevision + } + + noKeyUpdate := v3.Compare(v3.ModRevision(key).WithRange(end), "<", maxModRev+1) + noLeaseUpdate := v3.Compare( + v3.CreateRevision(txn.lkv.pfx+key).WithRange(txn.lkv.pfx+end), + "<", + maxRevLK+1) + cmps = append(cmps, noKeyUpdate, noLeaseUpdate) + } + return cmps, nil +} + +func (txn *txnLeasing) guard(ops []v3.Op) ([]v3.Cmp, error) { + cmps := txn.guardKeys(ops) + rangeCmps, err := txn.guardRanges(ops) + return append(cmps, rangeCmps...), err +} + +func (txn *txnLeasing) commitToCache(txnResp *v3pb.TxnResponse, userTxn v3.Op) { + ops := gatherResponseOps(txnResp.Responses, []v3.Op{userTxn}) + txn.lkv.leases.mu.Lock() + for _, op := range ops { + key := string(op.KeyBytes()) + if op.IsDelete() && len(op.RangeBytes()) > 0 { + end := string(op.RangeBytes()) + for k := range txn.lkv.leases.entries { + if inRange(k, key, end) { + txn.lkv.leases.delete(k, txnResp.Header) + } + } + } else if op.IsDelete() { + txn.lkv.leases.delete(key, txnResp.Header) + } + if op.IsPut() { + txn.lkv.leases.Update(op.KeyBytes(), op.ValueBytes(), txnResp.Header) + } + } + txn.lkv.leases.mu.Unlock() +} + +func (txn *txnLeasing) revokeFallback(fbResps []*v3pb.ResponseOp) error { + for _, resp := range fbResps { + _, err := txn.lkv.revokeLeaseKvs(txn.ctx, resp.GetResponseRange().Kvs) + if err != nil { + return err + } + } + return nil +} + +func (txn *txnLeasing) serverTxn() (*v3.TxnResponse, error) { + if err := txn.lkv.waitSession(txn.ctx); err != nil { + return nil, err + } + + userOps := gatherOps(append(txn.opst, txn.opse...)) + userTxn := v3.OpTxn(txn.cs, txn.opst, txn.opse) + fbOps := txn.fallback(userOps) + + defer closeAll(txn.lkv.leases.LockWriteOps(userOps)) + for { + cmps, err := txn.guard(userOps) + if err != nil { + return nil, err + } + resp, err := txn.lkv.kv.Txn(txn.ctx).If(cmps...).Then(userTxn).Else(fbOps...).Commit() + if err != nil { + for _, cmp := range cmps { + txn.lkv.leases.Evict(strings.TrimPrefix(string(cmp.Key), txn.lkv.pfx)) + } + return nil, err + } + if resp.Succeeded { + txn.commitToCache((*v3pb.TxnResponse)(resp), userTxn) + userResp := resp.Responses[0].GetResponseTxn() + userResp.Header = resp.Header + return (*v3.TxnResponse)(userResp), nil + } + if err := txn.revokeFallback(resp.Responses); err != nil { + return nil, err + } + } +} diff --git a/clientv3/leasing/util.go b/clientv3/leasing/util.go new file mode 100644 index 00000000000..61f6e8c33fc --- /dev/null +++ b/clientv3/leasing/util.go @@ -0,0 +1,108 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leasing + +import ( + "bytes" + + v3 "github.com/coreos/etcd/clientv3" + v3pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +func compareInt64(a, b int64) int { + switch { + case a < b: + return -1 + case a > b: + return 1 + default: + return 0 + } +} + +func evalCmp(resp *v3.GetResponse, tcmp v3.Cmp) bool { + var result int + if len(resp.Kvs) != 0 { + kv := resp.Kvs[0] + switch tcmp.Target { + case v3pb.Compare_VALUE: + if tv, _ := tcmp.TargetUnion.(*v3pb.Compare_Value); tv != nil { + result = bytes.Compare(kv.Value, tv.Value) + } + case v3pb.Compare_CREATE: + if tv, _ := tcmp.TargetUnion.(*v3pb.Compare_CreateRevision); tv != nil { + result = compareInt64(kv.CreateRevision, tv.CreateRevision) + } + case v3pb.Compare_MOD: + if tv, _ := tcmp.TargetUnion.(*v3pb.Compare_ModRevision); tv != nil { + result = compareInt64(kv.ModRevision, tv.ModRevision) + } + case v3pb.Compare_VERSION: + if tv, _ := tcmp.TargetUnion.(*v3pb.Compare_Version); tv != nil { + result = compareInt64(kv.Version, tv.Version) + } + } + } + switch tcmp.Result { + case v3pb.Compare_EQUAL: + return result == 0 + case v3pb.Compare_NOT_EQUAL: + return result != 0 + case v3pb.Compare_GREATER: + return result > 0 + case v3pb.Compare_LESS: + return result < 0 + } + return true +} + +func gatherOps(ops []v3.Op) (ret []v3.Op) { + for _, op := range ops { + if !op.IsTxn() { + ret = append(ret, op) + continue + } + _, thenOps, elseOps := op.Txn() + ret = append(ret, gatherOps(append(thenOps, elseOps...))...) + } + return ret +} + +func gatherResponseOps(resp []*v3pb.ResponseOp, ops []v3.Op) (ret []v3.Op) { + for i, op := range ops { + if !op.IsTxn() { + ret = append(ret, op) + continue + } + _, thenOps, elseOps := op.Txn() + if txnResp := resp[i].GetResponseTxn(); txnResp.Succeeded { + ret = append(ret, gatherResponseOps(txnResp.Responses, thenOps)...) + } else { + ret = append(ret, gatherResponseOps(txnResp.Responses, elseOps)...) + } + } + return ret +} + +func copyHeader(hdr *v3pb.ResponseHeader) *v3pb.ResponseHeader { + h := *hdr + return &h +} + +func closeAll(chs []chan<- struct{}) { + for _, ch := range chs { + close(ch) + } +} diff --git a/clientv3/op.go b/clientv3/op.go index ef6f04368a3..8ef043a114b 100644 --- a/clientv3/op.go +++ b/clientv3/op.go @@ -89,6 +89,45 @@ func (op *Op) WithKeyBytes(key []byte) { op.key = key } // RangeBytes returns the byte slice holding with the Op's range end, if any. func (op Op) RangeBytes() []byte { return op.end } +// Rev returns the requested revision, if any. +func (op Op) Rev() int64 { return op.rev } + +// IsPut returns true iff the operation is a Put. +func (op Op) IsPut() bool { return op.t == tPut } + +// IsGet returns true iff the operation is a Get. +func (op Op) IsGet() bool { return op.t == tRange } + +// IsDelete returns true iff the operation is a Delete. +func (op Op) IsDelete() bool { return op.t == tDeleteRange } + +// IsSerializable returns true if the serializable field is true. +func (op Op) IsSerializable() bool { return op.serializable == true } + +// IsKeysOnly returns true if the keysonly field is true. +func (op Op) IsKeysOnly() bool { return op.keysOnly == true } + +// IsCountOnly returns true if the countonly field is true. +func (op Op) IsCountOnly() bool { return op.countOnly == true } + +// MinModRev returns if field is populated. +func (op Op) MinModRev() int64 { return op.minModRev } + +// MaxModRev returns if field is populated. +func (op Op) MaxModRev() int64 { return op.maxModRev } + +// MinCreateRev returns if field is populated. +func (op Op) MinCreateRev() int64 { return op.minCreateRev } + +// MaxCreateRev returns if field is populated. +func (op Op) MaxCreateRev() int64 { return op.maxCreateRev } + +// Limit returns if field is populated. +func (op Op) retLimit() int64 { return op.limit } + +// Sort returns if field is populated. +func (op Op) retSort() bool { return op.sort != nil } + // WithRangeBytes sets the byte slice for the Op's range end. func (op *Op) WithRangeBytes(end []byte) { op.end = end } diff --git a/clientv3/ordering/kv_test.go b/clientv3/ordering/kv_test.go index 7884cb62288..d6ec597b65b 100644 --- a/clientv3/ordering/kv_test.go +++ b/clientv3/ordering/kv_test.go @@ -195,7 +195,7 @@ var rangeTests = []struct { func TestKvOrdering(t *testing.T) { for i, tt := range rangeTests { - mKV := &mockKV{clientv3.NewKVFromKVClient(nil), tt.response.ToOpResponse()} + mKV := &mockKV{clientv3.NewKVFromKVClient(nil), tt.response.OpResponse()} kv := &kvOrdering{ mKV, func(r *clientv3.GetResponse) OrderViolationFunc { @@ -249,7 +249,7 @@ var txnTests = []struct { func TestTxnOrdering(t *testing.T) { for i, tt := range txnTests { - mKV := &mockKV{clientv3.NewKVFromKVClient(nil), tt.response.ToOpResponse()} + mKV := &mockKV{clientv3.NewKVFromKVClient(nil), tt.response.OpResponse()} kv := &kvOrdering{ mKV, func(r *clientv3.TxnResponse) OrderViolationFunc { diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index 969dd83ed51..2e4adc45e96 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -26,6 +26,7 @@ import ( "time" "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/leasing" "github.com/coreos/etcd/clientv3/namespace" "github.com/coreos/etcd/clientv3/ordering" "github.com/coreos/etcd/etcdserver/api/etcdhttp" @@ -70,6 +71,7 @@ var ( grpcProxyResolverTTL int grpcProxyNamespace string + grpcProxyLeasing string grpcProxyEnablePprof bool grpcProxyEnableOrdering bool @@ -124,7 +126,7 @@ func newGRPCProxyStartCommand() *cobra.Command { // experimental flags cmd.Flags().BoolVar(&grpcProxyEnableOrdering, "experimental-serializable-ordering", false, "Ensure serializable reads have monotonically increasing store revisions across endpoints.") - + cmd.Flags().StringVar(&grpcProxyLeasing, "experimental-leasing-prefix", "", "leasing metadata prefix for disconnected linearized reads.") return &cmd } @@ -282,6 +284,10 @@ func newGRPCProxyServer(client *clientv3.Client) *grpc.Server { client.Lease = namespace.NewLease(client.Lease, grpcProxyNamespace) } + if len(grpcProxyLeasing) > 0 { + client.KV, _ = leasing.NewKV(client, grpcProxyLeasing) + } + kvp, _ := grpcproxy.NewKvProxy(client) watchp, _ := grpcproxy.NewWatchProxy(client) if grpcProxyResolverPrefix != "" { diff --git a/test b/test index 8daf74597bf..e0cffe0f44f 100755 --- a/test +++ b/test @@ -212,14 +212,14 @@ function integration_e2e_pass { wait $e2epid wait $intpid go test -timeout 1m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/client/integration - go test -timeout 10m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration + go test -timeout 20m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration go test -timeout 1m -v -cpu 1,2,4 $@ ${REPO_PATH}/contrib/raftexample go test -timeout 1m -v ${RACE} -cpu 1,2,4 -run=Example $@ ${TEST} } function grpcproxy_pass { go test -timeout 20m -v ${RACE} -tags cluster_proxy -cpu 1,2,4 $@ ${REPO_PATH}/integration - go test -timeout 15m -v ${RACE} -tags cluster_proxy -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration + go test -timeout 20m -v ${RACE} -tags cluster_proxy -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration go test -timeout 15m -v -tags cluster_proxy $@ ${REPO_PATH}/e2e }