tikvclient: fix a bug that double close channels. #10991

Merged
merged 31 commits, Jul 5, 2019
Changes from 13 commits

Commits (31)
6f1f360
tikvclient: fix a bug that double close channels.
hicqu Jun 30, 2019
5981368
Merge branch 'master' into fix-tikvclient-double-close
hicqu Jul 3, 2019
9b9c9df
address comments
hicqu Jul 3, 2019
9de3d8d
add test case
hicqu Jul 3, 2019
8b1d72b
address comments
hicqu Jul 3, 2019
4aa7247
address comments
hicqu Jul 3, 2019
1adf066
Merge branch 'master' into fix-tikvclient-double-close
hicqu Jul 3, 2019
5424849
Merge branch 'master' into fix-tikvclient-double-close
hicqu Jul 3, 2019
c4c4d02
export `NewTestRPCClient` for schrodinger tests.
hicqu Jul 3, 2019
bd93882
Merge branch 'master' into fix-tikvclient-double-close
hicqu Jul 3, 2019
4f276a5
Merge branch 'master' into fix-tikvclient-double-close
hicqu Jul 4, 2019
2a68090
Merge branch 'master' into fix-tikvclient-double-close
hicqu Jul 4, 2019
d277516
Merge branch 'master' into fix-tikvclient-double-close
hicqu Jul 4, 2019
e4c9654
address comments
hicqu Jul 4, 2019
c3fa876
Merge branch 'fix-tikvclient-double-close' of github.com:hicqu/tidb i…
hicqu Jul 4, 2019
fffd60d
Merge branch 'master' into fix-tikvclient-double-close
hicqu Jul 4, 2019
a8c2f5f
address comments
hicqu Jul 4, 2019
e1c5fb3
Merge branch 'fix-tikvclient-double-close' of github.com:hicqu/tidb i…
hicqu Jul 4, 2019
a555da6
Merge branch 'master' into fix-tikvclient-double-close
hicqu Jul 4, 2019
a350b68
Merge branch 'master' into fix-tikvclient-double-close
hicqu Jul 4, 2019
390be9d
Merge branch 'master' into fix-tikvclient-double-close
hicqu Jul 4, 2019
3d974d3
Merge branch 'master' into fix-tikvclient-double-close
hicqu Jul 4, 2019
2d2eeb7
fix test case
hicqu Jul 4, 2019
305afee
Merge branch 'fix-tikvclient-double-close' of github.com:hicqu/tidb i…
hicqu Jul 4, 2019
f8d481b
fix test
hicqu Jul 4, 2019
bae8d04
Merge branch 'master' into fix-tikvclient-double-close
hicqu Jul 4, 2019
1b9447c
Merge branch 'master' into fix-tikvclient-double-close
tiancaiamao Jul 5, 2019
837f069
Merge branch 'master' into fix-tikvclient-double-close
hicqu Jul 5, 2019
a007bb8
Merge branch 'master' into fix-tikvclient-double-close
hicqu Jul 5, 2019
ac73559
fix panic when sendBatchRequest on conn.Array and close rpcClient sim…
tiancaiamao Jul 5, 2019
f6c50f6
Merge branch 'fix-tikvclient-double-close' of github.com:hicqu/tidb i…
tiancaiamao Jul 5, 2019
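What the PR fixes: when the batch-commands stream failed, both batchSendLoop (on a failed Send) and batchRecvLoop (on a failed Recv) could call failPendingRequests, which closes the response channel of every pending entry, so a second concurrent call could close an already-closed channel and panic. The fix routes both call sites through clientLock (see the new `send` and `reCreateStreamingClient` methods in store/tikv/client.go below). The following is a minimal standalone sketch of the failure mode and the guard; the `entry` and `client` types and the `res` field are illustrative stand-ins, not the PR's code.

package main

import "sync"

// entry mimics batchCommandsEntry: it owns a response channel that must be closed exactly once.
type entry struct {
	res chan struct{}
}

type client struct {
	mu      sync.Mutex // plays the role of clientLock in the PR
	pending []*entry
}

// failPendingRequests closes every pending channel exactly once. Without mutual
// exclusion, the send loop and the recv loop could both reach close(e.res) for
// the same entry, and the second close would panic with "close of closed channel".
func (c *client) failPendingRequests() {
	c.mu.Lock() // the PR instead requires every caller to hold clientLock
	defer c.mu.Unlock()
	for _, e := range c.pending {
		close(e.res)
	}
	c.pending = nil // drop closed entries so a later call is a no-op
}

func main() {
	c := &client{pending: []*entry{{res: make(chan struct{})}}}
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ { // simulate the send loop and the recv loop failing at the same time
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.failPendingRequests()
		}()
	}
	wg.Wait()
}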
4 changes: 2 additions & 2 deletions metrics/tikvclient.go
@@ -180,13 +180,13 @@ var (
})

// TiKVPendingBatchRequests indicates the number of requests pending in the batch channel.
TiKVPendingBatchRequests = prometheus.NewGauge(
TiKVPendingBatchRequests = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "tikvclient",
Name: "pending_batch_requests",
Help: "Pending batch requests",
})
}, []string{"store"})

TiKVBatchWaitDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
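The metric above switches from a single Gauge to a GaugeVec labeled by store address, so the batch-channel backlog of each TiKV store becomes its own time series. A small sketch of registering and updating such a vector with the standard Prometheus Go client; the MustRegister call and the address value are illustrative, not taken from this diff.

package main

import "github.com/prometheus/client_golang/prometheus"

var pendingBatchRequests = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "tidb",
		Subsystem: "tikvclient",
		Name:      "pending_batch_requests",
		Help:      "Pending batch requests",
	}, []string{"store"})

func main() {
	prometheus.MustRegister(pendingBatchRequests)
	// Each connArray looks up its per-store gauge once (WithLabelValues) and then
	// updates that cached Gauge cheaply on every batchSendLoop iteration.
	gauge := pendingBatchRequests.WithLabelValues("127.0.0.1:20160")
	gauge.Set(42) // e.g. len(batchCommandsCh)
}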
110 changes: 70 additions & 40 deletions store/tikv/client.go
@@ -25,6 +25,7 @@ import (

"github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/debugpb"
"github.com/pingcap/kvproto/pkg/tikvpb"
@@ -33,6 +34,7 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@@ -83,6 +85,8 @@ type connArray struct {
idleNotify *uint32
idle bool
idleDetect *time.Timer

pendingRequests prometheus.Gauge
}

type batchCommandsClient struct {
@@ -105,7 +109,27 @@ func (c *batchCommandsClient) isStopped() bool {
return atomic.LoadInt32(&c.closed) != 0
}

func (c *batchCommandsClient) send(request *tikvpb.BatchCommandsRequest, entries []*batchCommandsEntry) {
// Hold the lock so that the stream client won't be replaced by batchRecvLoop,
// and newly added requests won't be removed by `failPendingRequests`.
c.clientLock.Lock()
defer c.clientLock.Unlock()
for i, requestID := range request.RequestIds {
c.batched.Store(requestID, entries[i])
}
if err := c.client.Send(request); err != nil {
logutil.BgLogger().Error(
"batch commands send error",
zap.String("target", c.target),
zap.Error(err),
)
c.failPendingRequests(err)
}
}

// `failPendingRequests` must be called in locked contexts in order to avoid double closing channels.
func (c *batchCommandsClient) failPendingRequests(err error) {
failpoint.Inject("panicInFailPendingRequests", nil)
c.batched.Range(func(key, value interface{}) bool {
id, _ := key.(uint64)
entry, _ := value.(*batchCommandsEntry)
@@ -116,6 +140,31 @@ func (c *batchCommandsClient) failPendingRequests(err error) {
})
}

func (c *batchCommandsClient) reCreateStreamingClient(err error) bool {
// Hold the lock to forbid batchSendLoop using the old client.
c.clientLock.Lock()
defer c.clientLock.Unlock()
c.failPendingRequests(err) // fail all pending requests.

// Re-establish an application-layer stream. The TCP layer is handled by gRPC.
tikvClient := tikvpb.NewTikvClient(c.conn)
streamClient, err := tikvClient.BatchCommands(context.TODO())
if err == nil {
logutil.BgLogger().Info(
"batchRecvLoop re-create streaming success",
zap.String("target", c.target),
)
c.client = streamClient
return true
}
logutil.BgLogger().Error(
"batchRecvLoop re-create streaming fail",
zap.String("target", c.target),
zap.Error(err),
)
return false
}

func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
defer func() {
if r := recover(); r != nil {
@@ -129,8 +178,16 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
}()

for {
// When `conn.Close()` is called, `client.Recv()` will return an error.
resp, err := c.client.Recv()
var err error = nil
var resp *tikvpb.BatchCommandsResponse = nil
failpoint.Inject("gotErrorInRecvLoop", func(_ failpoint.Value) {
err = errors.New("injected error in batchRecvLoop")
})
if err == nil {
// When `conn.Close()` is called, `client.Recv()` will return an error.
resp, err = c.client.Recv()
}

if err != nil {
[Review comment] @lysu (Contributor), Jul 4, 2019: A question: we need to refine this error check in a follow-up, since for an error like "over max limit" we probably should not re-create the stream.

[Reply] @hicqu (Author): I'm not sure, we can test it later.

now := time.Now()
for { // try to re-create the streaming in the loop.
Expand All @@ -143,28 +200,10 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
zap.Error(err),
)

// Hold the lock to forbid batchSendLoop using the old client.
c.clientLock.Lock()
c.failPendingRequests(err) // fail all pending requests.

// Re-establish a application layer stream. TCP layer is handled by gRPC.
tikvClient := tikvpb.NewTikvClient(c.conn)
streamClient, err := tikvClient.BatchCommands(context.TODO())
c.clientLock.Unlock()

if err == nil {
logutil.BgLogger().Info(
"batchRecvLoop re-create streaming success",
zap.String("target", c.target),
)
c.client = streamClient
if c.reCreateStreamingClient(err) {
break
}
logutil.BgLogger().Error(
"batchRecvLoop re-create streaming fail",
zap.String("target", c.target),
zap.Error(err),
)

// TODO: Use a more smart backoff strategy.
time.Sleep(time.Second)
}
@@ -220,6 +259,7 @@ func newConnArray(maxSize uint, addr string, security config.Security, idleNotif

func (a *connArray) Init(addr string, security config.Security) error {
a.target = addr
a.pendingRequests = metrics.TiKVPendingBatchRequests.WithLabelValues(a.target)

opt := grpc.WithInsecure()
if len(security.ClusterSSLCA) != 0 {
@@ -450,7 +490,7 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) {
requests = requests[:0]
requestIDs = requestIDs[:0]

metrics.TiKVPendingBatchRequests.Set(float64(len(a.batchCommandsCh)))
a.pendingRequests.Set(float64(len(a.batchCommandsCh)))
a.fetchAllPendingRequests(int(cfg.MaxBatchSize), &entries, &requests)

if len(entries) < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 {
Expand Down Expand Up @@ -484,27 +524,12 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) {
requestIDs = append(requestIDs, requestID)
}

request := &tikvpb.BatchCommandsRequest{
req := &tikvpb.BatchCommandsRequest{
Requests: requests,
RequestIds: requestIDs,
}

// Use the lock to protect the stream client won't be replaced by RecvLoop,
// and new added request won't be removed by `failPendingRequests`.
batchCommandsClient.clientLock.Lock()
for i, requestID := range request.RequestIds {
batchCommandsClient.batched.Store(requestID, entries[i])
}
err := batchCommandsClient.client.Send(request)
batchCommandsClient.clientLock.Unlock()
if err != nil {
logutil.BgLogger().Error(
"batch commands send error",
zap.String("target", a.target),
zap.Error(err),
)
batchCommandsClient.failPendingRequests(err)
}
batchCommandsClient.send(req, entries)
}
}

@@ -547,6 +572,11 @@ func newRPCClient(security config.Security) *rpcClient {
}
}

// NewTestRPCClient is for some external tests.
func NewTestRPCClient() Client {
return newRPCClient(config.Security{})
}

func (c *rpcClient) getConnArray(addr string) (*connArray, error) {
c.RLock()
if c.isClosed {
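Taken together, client.go now funnels every touch of the stream and of the pending-request map through clientLock: `send` registers entries and calls Send under the lock, and `reCreateStreamingClient` fails the pending entries and installs a fresh stream under the same lock, so the send and recv loops can no longer both close an entry's channel. Below is a condensed sketch of that protocol with heavily simplified types (a plain map instead of sync.Map, a fake stream); it is illustrative only, not the PR's code.

package main

import (
	"errors"
	"sync"
)

// stream is a stand-in for the gRPC BatchCommands stream client.
type stream struct{ broken bool }

func (s *stream) Send() error {
	if s.broken {
		return errors.New("stream broken")
	}
	return nil
}

type batchClient struct {
	clientLock sync.Mutex          // guards client and batched, like clientLock in the PR
	client     *stream
	batched    map[uint64]chan int // stand-in for the sync.Map of pending entries
}

// send registers an entry and sends under the lock, mirroring batchCommandsClient.send.
func (c *batchClient) send(id uint64, res chan int) {
	c.clientLock.Lock()
	defer c.clientLock.Unlock()
	c.batched[id] = res
	if err := c.client.Send(); err != nil {
		c.failPendingRequests() // safe: the lock is already held
	}
}

// failPendingRequests must only run with clientLock held, so each channel closes once.
func (c *batchClient) failPendingRequests() {
	for id, ch := range c.batched {
		close(ch)
		delete(c.batched, id)
	}
}

// reCreateStreamingClient fails the old entries and swaps in a fresh stream atomically.
func (c *batchClient) reCreateStreamingClient() {
	c.clientLock.Lock()
	defer c.clientLock.Unlock()
	c.failPendingRequests()
	c.client = &stream{}
}

func main() {
	c := &batchClient{client: &stream{broken: true}, batched: map[uint64]chan int{}}
	c.send(1, make(chan int, 1)) // Send fails, the entry is failed under the lock
	c.reCreateStreamingClient()  // no pending entries remain, so nothing is closed twice
}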
53 changes: 53 additions & 0 deletions store/tikv/client_fail_test.go
@@ -0,0 +1,53 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv

import (
"context"
"fmt"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
)

func (s *testClientSuite) TestPanicInRecvLoop(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/panicInFailPendingRequests", `panic`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gotErrorInRecvLoop", `return("0")`), IsNil)

port := startMockTikvService()
c.Assert(port > 0, IsTrue)

config.GetGlobalConfig().TiKVClient.GrpcConnectionCount = 1
[Review comment] Contributor: Better to reset the previous value after this test method finishes (see the sketch after this file's diff).
addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
rpcClient := newRPCClient(config.Security{})

// Start batchRecvLoop, and it should panic in `failPendingRequests`.
_, err := rpcClient.getConnArray(addr)
c.Assert(err, IsNil)

time.Sleep(time.Second)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/panicInFailPendingRequests"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gotErrorInRecvLoop"), IsNil)

req := &tikvrpc.Request{
Type: tikvrpc.CmdGet,
Get: &kvrpcpb.GetRequest{},
}
_, err = rpcClient.SendRequest(context.Background(), addr, req, time.Second)
c.Assert(err, IsNil)
}
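On the reviewer's suggestion to reset GrpcConnectionCount after the test: one possible shape for that is a small save-and-restore helper in the same test package. This is a sketch only; `withOneGrpcConn` is a made-up name and is not part of the PR.

package tikv

import "github.com/pingcap/tidb/config"

// withOneGrpcConn lowers the global GrpcConnectionCount to 1 and returns a restore
// function, so a test can write `defer withOneGrpcConn()()` and leave the global
// configuration exactly as it found it.
func withOneGrpcConn() (restore func()) {
	old := config.GetGlobalConfig().TiKVClient.GrpcConnectionCount
	config.GetGlobalConfig().TiKVClient.GrpcConnectionCount = 1
	return func() {
		config.GetGlobalConfig().TiKVClient.GrpcConnectionCount = old
	}
}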
71 changes: 71 additions & 0 deletions store/tikv/mock_tikv_service.go
@@ -0,0 +1,71 @@
package tikv

import (
"fmt"
"net"
"time"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
"google.golang.org/grpc"
)

type server struct {
tikvpb.TikvServer
}

func (s *server) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error {
for {
req, err := ss.Recv()
if err != nil {
logutil.BgLogger().Error("batch commands receive fail", zap.Error(err))
return err
}

responses := make([]*tikvpb.BatchCommandsResponse_Response, 0, len(req.GetRequestIds()))
for i := 0; i < len(req.GetRequestIds()); i++ {
responses = append(responses, &tikvpb.BatchCommandsResponse_Response{
Cmd: &tikvpb.BatchCommandsResponse_Response_Get{
Get: &kvrpcpb.GetResponse{
Value: []byte{'a', 'b', 'c'},
},
},
})
}

err = ss.Send(&tikvpb.BatchCommandsResponse{
Responses: responses,
RequestIds: req.GetRequestIds(),
})
if err != nil {
logutil.BgLogger().Error("batch commands send fail", zap.Error(err))
return err
}
}
}

// Try to start a gRPC server and return the bound port.
func startMockTikvService() int {
[Review comment] Contributor: Maybe we need to shut down the mock TiKV service after the test (a possible variant is sketched after this file's diff).
for port := 40000; port < 50000; port++ {
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", port))
if err != nil {
logutil.BgLogger().Error("can't listen", zap.Error(err))
continue
}
s := grpc.NewServer(grpc.ConnectionTimeout(time.Minute))
tikvpb.RegisterTikvServer(s, &server{})
go func() {
if err = s.Serve(lis); err != nil {
logutil.BgLogger().Error(
"can't serve gRPC requests",
zap.Error(err),
)
}
}()
return port
}
logutil.BgLogger().Error("can't start mock tikv service because no available ports")
return -1
}
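On the reviewer's note about shutting the mock service down: one possible variant, sketched here and not part of the PR, returns the *grpc.Server together with the port and listens on port 0 so the OS picks a free port, letting a test `defer srv.Stop()`. The sketch assumes it sits next to startMockTikvService in the same file and reuses its imports and the server type.

// startMockTikvServiceWithServer is a hypothetical variant that also hands the
// server back to the caller so the test can stop it during cleanup.
func startMockTikvServiceWithServer() (int, *grpc.Server) {
	lis, err := net.Listen("tcp", "127.0.0.1:0") // port 0: let the OS choose a free port
	if err != nil {
		logutil.BgLogger().Error("can't listen", zap.Error(err))
		return -1, nil
	}
	srv := grpc.NewServer(grpc.ConnectionTimeout(time.Minute))
	tikvpb.RegisterTikvServer(srv, &server{})
	go func() {
		if err := srv.Serve(lis); err != nil {
			logutil.BgLogger().Error("can't serve gRPC requests", zap.Error(err))
		}
	}()
	return lis.Addr().(*net.TCPAddr).Port, srv
}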