Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tikvclient: fix a bug that double close channels. (#10991) #11101

Merged
merged 4 commits into from
Jul 9, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,13 @@ require (
github.com/myesui/uuid v1.0.0 // indirect
github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef
github.com/onsi/ginkgo v1.7.0 // indirect
github.com/onsi/gomega v1.4.3 // indirect
github.com/opentracing/basictracer-go v1.0.0
github.com/opentracing/opentracing-go v1.0.2
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8
github.com/pingcap/errors v0.11.4
github.com/pingcap/failpoint v0.0.0-20190422094118-d8535965f59b
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190619024611-a4759dfe3753
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190613045206-37cc370a20a4
github.com/pingcap/pd v0.0.0-20190424024702-bd1e2496a669
Expand Down
13 changes: 7 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f h1:5ZfJxyXo8KyX8DgGXC5B7ILL8y51fci/qYz2B4j8iLY=
github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down Expand Up @@ -70,6 +71,7 @@ github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb
github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
Expand Down Expand Up @@ -139,12 +141,10 @@ github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8=
github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opentracing/basictracer-go v1.0.0 h1:YyUAhaEfjoWXclZVJ9sGoNct7j4TVk7lZWlQw5UXuoo=
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
Expand All @@ -165,8 +165,8 @@ github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3/go.mod h1:DazNTg0PT
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190327032727-3d8cb3a30d5d/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190619024611-a4759dfe3753 h1:92t0y430CJF0tN1lvUhP5fhnYTFmssATJqwxQtvixYU=
github.com/pingcap/kvproto v0.0.0-20190619024611-a4759dfe3753/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 h1:8xk2HobDwClB5E3Hv9TEPiS7K7bv3ykWHLyZzuUYywI=
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
Expand Down Expand Up @@ -301,6 +301,7 @@ google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/alecthomas/gometalinter.v2 v2.0.12/go.mod h1:NDRytsqEZyolNuAgTzJkZMkSQM7FIKyzVzGhjB/qfYo=
gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20180810215634-df19058c872c/go.mod h1:3HH7i1SgMqlzxCcBmUHW657sD4Kvv9sC3HpL3YukzwA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
4 changes: 2 additions & 2 deletions metrics/tikvclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,13 @@ var (
})

// TiKVPendingBatchRequests indicates the number of requests pending in the batch channel.
TiKVPendingBatchRequests = prometheus.NewGauge(
TiKVPendingBatchRequests = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "tikvclient",
Name: "pending_batch_requests",
Help: "Pending batch requests",
})
}, []string{"store"})

TiKVBatchWaitDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Expand Down
125 changes: 78 additions & 47 deletions store/tikv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/debugpb"
"github.com/pingcap/kvproto/pkg/tikvpb"
Expand All @@ -33,16 +34,13 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)

// MaxSendMsgSize set max gRPC request message size sent to server. If any request message size is larger than
// current value, an error will be reported from gRPC.
var MaxSendMsgSize = 10 * 1024 * 1024

// MaxRecvMsgSize set max gRPC receive message size received from server. If any message size is larger than
// current value, an error will be reported from gRPC.
var MaxRecvMsgSize = math.MaxInt64
Expand Down Expand Up @@ -82,11 +80,14 @@ type connArray struct {
batchCommandsCh chan *batchCommandsEntry
batchCommandsClients []*batchCommandsClient
tikvTransportLayerLoad uint64
closed chan struct{}

// Notify rpcClient to check the idle flag
idleNotify *uint32
idle bool
idleDetect *time.Timer

pendingRequests prometheus.Gauge
}

type batchCommandsClient struct {
Expand All @@ -109,17 +110,70 @@ func (c *batchCommandsClient) isStopped() bool {
return atomic.LoadInt32(&c.closed) != 0
}

func (c *batchCommandsClient) send(request *tikvpb.BatchCommandsRequest, entries []*batchCommandsEntry) {
// Use the lock to protect the stream client won't be replaced by RecvLoop,
// and new added request won't be removed by `failPendingRequests`.
c.clientLock.Lock()
defer c.clientLock.Unlock()
for i, requestID := range request.RequestIds {
c.batched.Store(requestID, entries[i])
}
if err := c.client.Send(request); err != nil {
logutil.Logger(context.Background()).Error(
"batch commands send error",
zap.String("target", c.target),
zap.Error(err),
)
c.failPendingRequests(err)
}
}

func (c *batchCommandsClient) recv() (*tikvpb.BatchCommandsResponse, error) {
failpoint.Inject("gotErrorInRecvLoop", func(_ failpoint.Value) (*tikvpb.BatchCommandsResponse, error) {
return nil, errors.New("injected error in batchRecvLoop")
})
// When `conn.Close()` is called, `client.Recv()` will return an error.
return c.client.Recv()
}

// `failPendingRequests` must be called in locked contexts in order to avoid double closing channels.
func (c *batchCommandsClient) failPendingRequests(err error) {
failpoint.Inject("panicInFailPendingRequests", nil)
c.batched.Range(func(key, value interface{}) bool {
id, _ := key.(uint64)
entry, _ := value.(*batchCommandsEntry)
entry.err = err
close(entry.res)
c.batched.Delete(id)
close(entry.res)
return true
})
}

func (c *batchCommandsClient) reCreateStreamingClient(err error) bool {
// Hold the lock to forbid batchSendLoop using the old client.
c.clientLock.Lock()
defer c.clientLock.Unlock()
c.failPendingRequests(err) // fail all pending requests.

// Re-establish a application layer stream. TCP layer is handled by gRPC.
tikvClient := tikvpb.NewTikvClient(c.conn)
streamClient, err := tikvClient.BatchCommands(context.TODO())
if err == nil {
logutil.Logger(context.Background()).Info(
"batchRecvLoop re-create streaming success",
zap.String("target", c.target),
)
c.client = streamClient
return true
}
logutil.Logger(context.Background()).Error(
"batchRecvLoop re-create streaming fail",
zap.String("target", c.target),
zap.Error(err),
)
return false
}

func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
defer func() {
if r := recover(); r != nil {
Expand All @@ -133,8 +187,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
}()

for {
// When `conn.Close()` is called, `client.Recv()` will return an error.
resp, err := c.client.Recv()
resp, err := c.recv()
if err != nil {
now := time.Now()
for { // try to re-create the streaming in the loop.
Expand All @@ -147,28 +200,10 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
zap.Error(err),
)

// Hold the lock to forbid batchSendLoop using the old client.
c.clientLock.Lock()
c.failPendingRequests(err) // fail all pending requests.

// Re-establish a application layer stream. TCP layer is handled by gRPC.
tikvClient := tikvpb.NewTikvClient(c.conn)
streamClient, err := tikvClient.BatchCommands(context.TODO())
c.clientLock.Unlock()

if err == nil {
logutil.Logger(context.Background()).Info(
"batchRecvLoop re-create streaming success",
zap.String("target", c.target),
)
c.client = streamClient
if c.reCreateStreamingClient(err) {
break
}
logutil.Logger(context.Background()).Error(
"batchRecvLoop re-create streaming fail",
zap.String("target", c.target),
zap.Error(err),
)

// TODO: Use a more smart backoff strategy.
time.Sleep(time.Second)
}
Expand Down Expand Up @@ -212,6 +247,7 @@ func newConnArray(maxSize uint, addr string, security config.Security, idleNotif
batchCommandsCh: make(chan *batchCommandsEntry, cfg.TiKVClient.MaxBatchSize),
batchCommandsClients: make([]*batchCommandsClient, 0, maxSize),
tikvTransportLayerLoad: 0,
closed: make(chan struct{}),

idleNotify: idleNotify,
idleDetect: time.NewTimer(idleTimeout),
Expand All @@ -224,6 +260,7 @@ func newConnArray(maxSize uint, addr string, security config.Security, idleNotif

func (a *connArray) Init(addr string, security config.Security) error {
a.target = addr
a.pendingRequests = metrics.TiKVPendingBatchRequests.WithLabelValues(a.target)

opt := grpc.WithInsecure()
if len(security.ClusterSSLCA) != 0 {
Expand Down Expand Up @@ -258,7 +295,6 @@ func (a *connArray) Init(addr string, security config.Security) error {
grpc.WithUnaryInterceptor(unaryInterceptor),
grpc.WithStreamInterceptor(streamInterceptor),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(MaxRecvMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(MaxSendMsgSize)),
grpc.WithBackoffMaxDelay(time.Second*3),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(keepAlive) * time.Second,
Expand Down Expand Up @@ -314,7 +350,10 @@ func (a *connArray) Close() {
// After connections are closed, `batchRecvLoop`s will check the flag.
atomic.StoreInt32(&c.closed, 1)
}
close(a.batchCommandsCh)
// Don't close(batchCommandsCh) because when Close() is called, someone maybe
// calling SendRequest and writing batchCommandsCh, if we close it here the
// writing goroutine will panic.
close(a.closed)

for i, c := range a.v {
if c != nil {
Expand Down Expand Up @@ -361,6 +400,8 @@ func (a *connArray) fetchAllPendingRequests(
atomic.CompareAndSwapUint32(a.idleNotify, 0, 1)
// This connArray to be recycled
return
case <-a.closed:
return
}
if headEntry == nil {
return
Expand Down Expand Up @@ -455,7 +496,7 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) {
requests = requests[:0]
requestIDs = requestIDs[:0]

metrics.TiKVPendingBatchRequests.Set(float64(len(a.batchCommandsCh)))
a.pendingRequests.Set(float64(len(a.batchCommandsCh)))
a.fetchAllPendingRequests(int(cfg.MaxBatchSize), &entries, &requests)

if len(entries) < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 {
Expand Down Expand Up @@ -489,27 +530,12 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) {
requestIDs = append(requestIDs, requestID)
}

request := &tikvpb.BatchCommandsRequest{
req := &tikvpb.BatchCommandsRequest{
Requests: requests,
RequestIds: requestIDs,
}

// Use the lock to protect the stream client won't be replaced by RecvLoop,
// and new added request won't be removed by `failPendingRequests`.
batchCommandsClient.clientLock.Lock()
for i, requestID := range request.RequestIds {
batchCommandsClient.batched.Store(requestID, entries[i])
}
err := batchCommandsClient.client.Send(request)
batchCommandsClient.clientLock.Unlock()
if err != nil {
logutil.Logger(context.Background()).Error(
"batch commands send error",
zap.String("target", a.target),
zap.Error(err),
)
batchCommandsClient.failPendingRequests(err)
}
batchCommandsClient.send(req, entries)
}
}

Expand Down Expand Up @@ -552,6 +578,11 @@ func newRPCClient(security config.Security) *rpcClient {
}
}

// NewTestRPCClient is for some external tests.
func NewTestRPCClient() Client {
return newRPCClient(config.Security{})
}

func (c *rpcClient) getConnArray(addr string) (*connArray, error) {
c.RLock()
if c.isClosed {
Expand Down
60 changes: 60 additions & 0 deletions store/tikv/client_fail_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv

import (
"context"
"fmt"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
)

func setGrpcConnectionCount(count uint) {
config.GetGlobalConfig().TiKVClient.GrpcConnectionCount = count
}

func (s *testClientSuite) TestPanicInRecvLoop(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/panicInFailPendingRequests", `panic`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gotErrorInRecvLoop", `return("0")`), IsNil)

server, port := startMockTikvService()
c.Assert(port > 0, IsTrue)

grpcConnectionCount := config.GetGlobalConfig().TiKVClient.GrpcConnectionCount
setGrpcConnectionCount(1)
addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
rpcClient := newRPCClient(config.Security{})

// Start batchRecvLoop, and it should panic in `failPendingRequests`.
_, err := rpcClient.getConnArray(addr)
c.Assert(err, IsNil)

time.Sleep(time.Second)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/panicInFailPendingRequests"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gotErrorInRecvLoop"), IsNil)

req := &tikvrpc.Request{
Type: tikvrpc.CmdEmpty,
Empty: &tikvpb.BatchCommandsEmptyRequest{},
}
_, err = rpcClient.SendRequest(context.Background(), addr, req, time.Second)
c.Assert(err, IsNil)
server.Stop()
setGrpcConnectionCount(grpcConnectionCount)
}
Loading