Skip to content

Commit

Permalink
tikvclient: fix a bug that double close channels. (#10991)
Browse files Browse the repository at this point in the history
  • Loading branch information
hicqu committed Jul 5, 2019
1 parent 8493e45 commit bf5bc66
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 60 deletions.
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,13 @@ require (
github.com/myesui/uuid v1.0.0 // indirect
github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef
github.com/onsi/ginkgo v1.7.0 // indirect
github.com/onsi/gomega v1.4.3 // indirect
github.com/opentracing/basictracer-go v1.0.0
github.com/opentracing/opentracing-go v1.0.2
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8
github.com/pingcap/errors v0.11.4
github.com/pingcap/failpoint v0.0.0-20190422094118-d8535965f59b
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190619024611-a4759dfe3753
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190613045206-37cc370a20a4
github.com/pingcap/pd v0.0.0-20190424024702-bd1e2496a669
Expand Down
13 changes: 7 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f h1:5ZfJxyXo8KyX8DgGXC5B7ILL8y51fci/qYz2B4j8iLY=
github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down Expand Up @@ -70,6 +71,7 @@ github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb
github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
Expand Down Expand Up @@ -139,12 +141,10 @@ github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8=
github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opentracing/basictracer-go v1.0.0 h1:YyUAhaEfjoWXclZVJ9sGoNct7j4TVk7lZWlQw5UXuoo=
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
Expand All @@ -165,8 +165,8 @@ github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3/go.mod h1:DazNTg0PT
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190327032727-3d8cb3a30d5d/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190619024611-a4759dfe3753 h1:92t0y430CJF0tN1lvUhP5fhnYTFmssATJqwxQtvixYU=
github.com/pingcap/kvproto v0.0.0-20190619024611-a4759dfe3753/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 h1:8xk2HobDwClB5E3Hv9TEPiS7K7bv3ykWHLyZzuUYywI=
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
Expand Down Expand Up @@ -301,6 +301,7 @@ google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/alecthomas/gometalinter.v2 v2.0.12/go.mod h1:NDRytsqEZyolNuAgTzJkZMkSQM7FIKyzVzGhjB/qfYo=
gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20180810215634-df19058c872c/go.mod h1:3HH7i1SgMqlzxCcBmUHW657sD4Kvv9sC3HpL3YukzwA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
4 changes: 2 additions & 2 deletions metrics/tikvclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,13 @@ var (
})

// TiKVPendingBatchRequests indicates the number of requests pending in the batch channel.
TiKVPendingBatchRequests = prometheus.NewGauge(
TiKVPendingBatchRequests = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "tikvclient",
Name: "pending_batch_requests",
Help: "Pending batch requests",
})
}, []string{"store"})

TiKVBatchWaitDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Expand Down
125 changes: 78 additions & 47 deletions store/tikv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/debugpb"
"github.com/pingcap/kvproto/pkg/tikvpb"
Expand All @@ -33,16 +34,13 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)

// MaxSendMsgSize set max gRPC request message size sent to server. If any request message size is larger than
// current value, an error will be reported from gRPC.
var MaxSendMsgSize = 10 * 1024 * 1024

// MaxRecvMsgSize set max gRPC receive message size received from server. If any message size is larger than
// current value, an error will be reported from gRPC.
var MaxRecvMsgSize = math.MaxInt64
Expand Down Expand Up @@ -82,11 +80,14 @@ type connArray struct {
batchCommandsCh chan *batchCommandsEntry
batchCommandsClients []*batchCommandsClient
tikvTransportLayerLoad uint64
closed chan struct{}

// Notify rpcClient to check the idle flag
idleNotify *uint32
idle bool
idleDetect *time.Timer

pendingRequests prometheus.Gauge
}

type batchCommandsClient struct {
Expand All @@ -109,17 +110,70 @@ func (c *batchCommandsClient) isStopped() bool {
return atomic.LoadInt32(&c.closed) != 0
}

func (c *batchCommandsClient) send(request *tikvpb.BatchCommandsRequest, entries []*batchCommandsEntry) {
// Use the lock to protect the stream client won't be replaced by RecvLoop,
// and new added request won't be removed by `failPendingRequests`.
c.clientLock.Lock()
defer c.clientLock.Unlock()
for i, requestID := range request.RequestIds {
c.batched.Store(requestID, entries[i])
}
if err := c.client.Send(request); err != nil {
logutil.Logger(context.Background()).Error(
"batch commands send error",
zap.String("target", c.target),
zap.Error(err),
)
c.failPendingRequests(err)
}
}

func (c *batchCommandsClient) recv() (*tikvpb.BatchCommandsResponse, error) {
failpoint.Inject("gotErrorInRecvLoop", func(_ failpoint.Value) (*tikvpb.BatchCommandsResponse, error) {
return nil, errors.New("injected error in batchRecvLoop")
})
// When `conn.Close()` is called, `client.Recv()` will return an error.
return c.client.Recv()
}

// `failPendingRequests` must be called in locked contexts in order to avoid double closing channels.
func (c *batchCommandsClient) failPendingRequests(err error) {
failpoint.Inject("panicInFailPendingRequests", nil)
c.batched.Range(func(key, value interface{}) bool {
id, _ := key.(uint64)
entry, _ := value.(*batchCommandsEntry)
entry.err = err
close(entry.res)
c.batched.Delete(id)
close(entry.res)
return true
})
}

func (c *batchCommandsClient) reCreateStreamingClient(err error) bool {
// Hold the lock to forbid batchSendLoop using the old client.
c.clientLock.Lock()
defer c.clientLock.Unlock()
c.failPendingRequests(err) // fail all pending requests.

// Re-establish a application layer stream. TCP layer is handled by gRPC.
tikvClient := tikvpb.NewTikvClient(c.conn)
streamClient, err := tikvClient.BatchCommands(context.TODO())
if err == nil {
logutil.Logger(context.Background()).Info(
"batchRecvLoop re-create streaming success",
zap.String("target", c.target),
)
c.client = streamClient
return true
}
logutil.Logger(context.Background()).Error(
"batchRecvLoop re-create streaming fail",
zap.String("target", c.target),
zap.Error(err),
)
return false
}

func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
defer func() {
if r := recover(); r != nil {
Expand All @@ -133,8 +187,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
}()

for {
// When `conn.Close()` is called, `client.Recv()` will return an error.
resp, err := c.client.Recv()
resp, err := c.recv()
if err != nil {
now := time.Now()
for { // try to re-create the streaming in the loop.
Expand All @@ -147,28 +200,10 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
zap.Error(err),
)

// Hold the lock to forbid batchSendLoop using the old client.
c.clientLock.Lock()
c.failPendingRequests(err) // fail all pending requests.

// Re-establish a application layer stream. TCP layer is handled by gRPC.
tikvClient := tikvpb.NewTikvClient(c.conn)
streamClient, err := tikvClient.BatchCommands(context.TODO())
c.clientLock.Unlock()

if err == nil {
logutil.Logger(context.Background()).Info(
"batchRecvLoop re-create streaming success",
zap.String("target", c.target),
)
c.client = streamClient
if c.reCreateStreamingClient(err) {
break
}
logutil.Logger(context.Background()).Error(
"batchRecvLoop re-create streaming fail",
zap.String("target", c.target),
zap.Error(err),
)

// TODO: Use a more smart backoff strategy.
time.Sleep(time.Second)
}
Expand Down Expand Up @@ -212,6 +247,7 @@ func newConnArray(maxSize uint, addr string, security config.Security, idleNotif
batchCommandsCh: make(chan *batchCommandsEntry, cfg.TiKVClient.MaxBatchSize),
batchCommandsClients: make([]*batchCommandsClient, 0, maxSize),
tikvTransportLayerLoad: 0,
closed: make(chan struct{}),

idleNotify: idleNotify,
idleDetect: time.NewTimer(idleTimeout),
Expand All @@ -224,6 +260,7 @@ func newConnArray(maxSize uint, addr string, security config.Security, idleNotif

func (a *connArray) Init(addr string, security config.Security) error {
a.target = addr
a.pendingRequests = metrics.TiKVPendingBatchRequests.WithLabelValues(a.target)

opt := grpc.WithInsecure()
if len(security.ClusterSSLCA) != 0 {
Expand Down Expand Up @@ -258,7 +295,6 @@ func (a *connArray) Init(addr string, security config.Security) error {
grpc.WithUnaryInterceptor(unaryInterceptor),
grpc.WithStreamInterceptor(streamInterceptor),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(MaxRecvMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(MaxSendMsgSize)),
grpc.WithBackoffMaxDelay(time.Second*3),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(keepAlive) * time.Second,
Expand Down Expand Up @@ -314,7 +350,10 @@ func (a *connArray) Close() {
// After connections are closed, `batchRecvLoop`s will check the flag.
atomic.StoreInt32(&c.closed, 1)
}
close(a.batchCommandsCh)
// Don't close(batchCommandsCh) because when Close() is called, someone maybe
// calling SendRequest and writing batchCommandsCh, if we close it here the
// writing goroutine will panic.
close(a.closed)

for i, c := range a.v {
if c != nil {
Expand Down Expand Up @@ -361,6 +400,8 @@ func (a *connArray) fetchAllPendingRequests(
atomic.CompareAndSwapUint32(a.idleNotify, 0, 1)
// This connArray to be recycled
return
case <-a.closed:
return
}
if headEntry == nil {
return
Expand Down Expand Up @@ -455,7 +496,7 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) {
requests = requests[:0]
requestIDs = requestIDs[:0]

metrics.TiKVPendingBatchRequests.Set(float64(len(a.batchCommandsCh)))
a.pendingRequests.Set(float64(len(a.batchCommandsCh)))
a.fetchAllPendingRequests(int(cfg.MaxBatchSize), &entries, &requests)

if len(entries) < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 {
Expand Down Expand Up @@ -489,27 +530,12 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) {
requestIDs = append(requestIDs, requestID)
}

request := &tikvpb.BatchCommandsRequest{
req := &tikvpb.BatchCommandsRequest{
Requests: requests,
RequestIds: requestIDs,
}

// Use the lock to protect the stream client won't be replaced by RecvLoop,
// and new added request won't be removed by `failPendingRequests`.
batchCommandsClient.clientLock.Lock()
for i, requestID := range request.RequestIds {
batchCommandsClient.batched.Store(requestID, entries[i])
}
err := batchCommandsClient.client.Send(request)
batchCommandsClient.clientLock.Unlock()
if err != nil {
logutil.Logger(context.Background()).Error(
"batch commands send error",
zap.String("target", a.target),
zap.Error(err),
)
batchCommandsClient.failPendingRequests(err)
}
batchCommandsClient.send(req, entries)
}
}

Expand Down Expand Up @@ -552,6 +578,11 @@ func newRPCClient(security config.Security) *rpcClient {
}
}

// NewTestRPCClient is for some external tests.
func NewTestRPCClient() Client {
return newRPCClient(config.Security{})
}

func (c *rpcClient) getConnArray(addr string) (*connArray, error) {
c.RLock()
if c.isClosed {
Expand Down
60 changes: 60 additions & 0 deletions store/tikv/client_fail_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv

import (
"context"
"fmt"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
)

func setGrpcConnectionCount(count uint) {
config.GetGlobalConfig().TiKVClient.GrpcConnectionCount = count
}

func (s *testClientSuite) TestPanicInRecvLoop(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/panicInFailPendingRequests", `panic`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gotErrorInRecvLoop", `return("0")`), IsNil)

server, port := startMockTikvService()
c.Assert(port > 0, IsTrue)

grpcConnectionCount := config.GetGlobalConfig().TiKVClient.GrpcConnectionCount
setGrpcConnectionCount(1)
addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
rpcClient := newRPCClient(config.Security{})

// Start batchRecvLoop, and it should panic in `failPendingRequests`.
_, err := rpcClient.getConnArray(addr)
c.Assert(err, IsNil)

time.Sleep(time.Second)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/panicInFailPendingRequests"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gotErrorInRecvLoop"), IsNil)

req := &tikvrpc.Request{
Type: tikvrpc.CmdEmpty,
Empty: &tikvpb.BatchCommandsEmptyRequest{},
}
_, err = rpcClient.SendRequest(context.Background(), addr, req, time.Second)
c.Assert(err, IsNil)
server.Stop()
setGrpcConnectionCount(grpcConnectionCount)
}
Loading

0 comments on commit bf5bc66

Please sign in to comment.