Skip to content

Commit

Permalink
Merge pull request #8626 from gyuho/kc
Browse files Browse the repository at this point in the history
*: add watch with client keepalive test
  • Loading branch information
gyuho authored Sep 28, 2017
2 parents 55b7289 + 65ffb52 commit 2cfe0d6
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 31 deletions.
5 changes: 5 additions & 0 deletions .words
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
RPC
RPCs
blackholed
cancelable
cancelation
cluster_proxy
defragment
defragmenting
etcd
Expand All @@ -20,5 +22,8 @@ prefetching
protobuf
serializable
teardown
too_many_pings
uncontended
unprefixed
unlisting

92 changes: 92 additions & 0 deletions clientv3/integration/watch_keepalive_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !cluster_proxy

package integration

import (
"context"
"testing"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
)

// TestWatchKeepAlive tests when watch discovers it cannot talk to
// blackholed endpoint, client balancer switches to healthy one.
// TODO: test server-to-client keepalive ping
func TestWatchKeepAlive(t *testing.T) {
defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{
Size: 3,
GRPCKeepAliveMinTime: time.Millisecond, // avoid too_many_pings
})
defer clus.Terminate(t)

ccfg := clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCAddr()},
DialTimeout: 3 * time.Second,
DialKeepAliveTime: 2 * time.Second,
DialKeepAliveTimeout: 2 * time.Second,
}
cli, err := clientv3.New(ccfg)
if err != nil {
t.Fatal(err)
}
defer cli.Close()

wch := cli.Watch(context.Background(), "foo", clientv3.WithCreatedNotify())
if _, ok := <-wch; !ok {
t.Fatalf("watch failed on creation")
}

clus.Members[0].Blackhole()

// expects endpoint switch to ep[1]
cli.SetEndpoints(clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr())

// ep[0] keepalive time-out after DialKeepAliveTime + DialKeepAliveTimeout
// wait extra for processing network error for endpoint switching
timeout := ccfg.DialKeepAliveTime + ccfg.DialKeepAliveTimeout + ccfg.DialTimeout
time.Sleep(timeout)

if _, err = clus.Client(1).Put(context.TODO(), "foo", "bar"); err != nil {
t.Fatal(err)
}
select {
case <-wch:
case <-time.After(5 * time.Second):
t.Fatal("took too long to receive events")
}

clus.Members[0].Unblackhole()
clus.Members[1].Blackhole()
defer clus.Members[1].Unblackhole()

// wait for ep[0] recover, ep[1] fail
time.Sleep(timeout)

if _, err = clus.Client(0).Put(context.TODO(), "foo", "bar"); err != nil {
t.Fatal(err)
}
select {
case <-wch:
case <-time.After(5 * time.Second):
t.Fatal("took too long to receive events")
}
}
67 changes: 57 additions & 10 deletions integration/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package integration
import (
"fmt"
"io"
"io/ioutil"
"net"
"sync"

Expand All @@ -31,21 +32,23 @@ type bridge struct {
l net.Listener
conns map[*bridgeConn]struct{}

stopc chan struct{}
pausec chan struct{}
wg sync.WaitGroup
stopc chan struct{}
pausec chan struct{}
blackholec chan struct{}
wg sync.WaitGroup

mu sync.Mutex
}

func newBridge(addr string) (*bridge, error) {
b := &bridge{
// bridge "port" is ("%05d%05d0", port, pid) since go1.8 expects the port to be a number
inaddr: addr + "0",
outaddr: addr,
conns: make(map[*bridgeConn]struct{}),
stopc: make(chan struct{}),
pausec: make(chan struct{}),
inaddr: addr + "0",
outaddr: addr,
conns: make(map[*bridgeConn]struct{}),
stopc: make(chan struct{}),
pausec: make(chan struct{}),
blackholec: make(chan struct{}),
}
close(b.pausec)

Expand Down Expand Up @@ -152,12 +155,12 @@ func (b *bridge) serveConn(bc *bridgeConn) {
var wg sync.WaitGroup
wg.Add(2)
go func() {
io.Copy(bc.out, bc.in)
b.ioCopy(bc, bc.out, bc.in)
bc.close()
wg.Done()
}()
go func() {
io.Copy(bc.in, bc.out)
b.ioCopy(bc, bc.in, bc.out)
bc.close()
wg.Done()
}()
Expand All @@ -179,3 +182,47 @@ func (bc *bridgeConn) close() {
bc.in.Close()
bc.out.Close()
}

func (b *bridge) Blackhole() {
b.mu.Lock()
close(b.blackholec)
b.mu.Unlock()
}

func (b *bridge) Unblackhole() {
b.mu.Lock()
for bc := range b.conns {
bc.Close()
}
b.conns = make(map[*bridgeConn]struct{})
b.blackholec = make(chan struct{})
b.mu.Unlock()
}

// ref. https://github.com/golang/go/blob/master/src/io/io.go copyBuffer
func (b *bridge) ioCopy(bc *bridgeConn, dst io.Writer, src io.Reader) (err error) {
buf := make([]byte, 32*1024)
for {
select {
case <-b.blackholec:
io.Copy(ioutil.Discard, src)
return nil
default:
}
nr, er := src.Read(buf)
if nr > 0 {
nw, ew := dst.Write(buf[0:nr])
if ew != nil {
return ew
}
if nr != nw {
return io.ErrShortWrite
}
}
if er != nil {
err = er
break
}
}
return
}
70 changes: 49 additions & 21 deletions integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/coreos/pkg/capnslog"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

const (
Expand Down Expand Up @@ -90,14 +91,17 @@ var (
)

type ClusterConfig struct {
Size int
PeerTLS *transport.TLSInfo
ClientTLS *transport.TLSInfo
DiscoveryURL string
UseGRPC bool
QuotaBackendBytes int64
MaxTxnOps uint
MaxRequestBytes uint
Size int
PeerTLS *transport.TLSInfo
ClientTLS *transport.TLSInfo
DiscoveryURL string
UseGRPC bool
QuotaBackendBytes int64
MaxTxnOps uint
MaxRequestBytes uint
GRPCKeepAliveMinTime time.Duration
GRPCKeepAliveInterval time.Duration
GRPCKeepAliveTimeout time.Duration
}

type cluster struct {
Expand Down Expand Up @@ -225,12 +229,15 @@ func (c *cluster) HTTPMembers() []client.Member {
func (c *cluster) mustNewMember(t *testing.T) *member {
m := mustNewMember(t,
memberConfig{
name: c.name(rand.Int()),
peerTLS: c.cfg.PeerTLS,
clientTLS: c.cfg.ClientTLS,
quotaBackendBytes: c.cfg.QuotaBackendBytes,
maxTxnOps: c.cfg.MaxTxnOps,
maxRequestBytes: c.cfg.MaxRequestBytes,
name: c.name(rand.Int()),
peerTLS: c.cfg.PeerTLS,
clientTLS: c.cfg.ClientTLS,
quotaBackendBytes: c.cfg.QuotaBackendBytes,
maxTxnOps: c.cfg.MaxTxnOps,
maxRequestBytes: c.cfg.MaxRequestBytes,
grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime,
grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval,
grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout,
})
m.DiscoveryURL = c.cfg.DiscoveryURL
if c.cfg.UseGRPC {
Expand Down Expand Up @@ -482,6 +489,7 @@ type member struct {
s *etcdserver.EtcdServer
serverClosers []func()

grpcServerOpts []grpc.ServerOption
grpcServer *grpc.Server
grpcServerPeer *grpc.Server
grpcAddr string
Expand All @@ -496,12 +504,15 @@ type member struct {
func (m *member) GRPCAddr() string { return m.grpcAddr }

type memberConfig struct {
name string
peerTLS *transport.TLSInfo
clientTLS *transport.TLSInfo
quotaBackendBytes int64
maxTxnOps uint
maxRequestBytes uint
name string
peerTLS *transport.TLSInfo
clientTLS *transport.TLSInfo
quotaBackendBytes int64
maxTxnOps uint
maxRequestBytes uint
grpcKeepAliveMinTime time.Duration
grpcKeepAliveInterval time.Duration
grpcKeepAliveTimeout time.Duration
}

// mustNewMember return an inited member with the given name. If peerTLS is
Expand Down Expand Up @@ -558,6 +569,21 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member {
m.MaxRequestBytes = embed.DefaultMaxRequestBytes
}
m.AuthToken = "simple" // for the purpose of integration testing, simple token is enough

m.grpcServerOpts = []grpc.ServerOption{}
if mcfg.grpcKeepAliveMinTime > time.Duration(0) {
m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: mcfg.grpcKeepAliveMinTime,
PermitWithoutStream: false,
}))
}
if mcfg.grpcKeepAliveInterval > time.Duration(0) &&
mcfg.grpcKeepAliveTimeout > time.Duration(0) {
m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveParams(keepalive.ServerParameters{
Time: mcfg.grpcKeepAliveInterval,
Timeout: mcfg.grpcKeepAliveTimeout,
}))
}
return m
}

Expand Down Expand Up @@ -588,6 +614,8 @@ func (m *member) ID() types.ID { return m.s.ID() }
func (m *member) DropConnections() { m.grpcBridge.Reset() }
func (m *member) PauseConnections() { m.grpcBridge.Pause() }
func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
func (m *member) Blackhole() { m.grpcBridge.Blackhole() }
func (m *member) Unblackhole() { m.grpcBridge.Unblackhole() }

// NewClientV3 creates a new grpc client connection to the member
func NewClientV3(m *member) (*clientv3.Client, error) {
Expand Down Expand Up @@ -670,7 +698,7 @@ func (m *member) Launch() error {
return err
}
}
m.grpcServer = v3rpc.Server(m.s, tlscfg)
m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...)
m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg)
m.serverClient = v3client.New(m.s)
lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient))
Expand Down

0 comments on commit 2cfe0d6

Please sign in to comment.