Add gRPC connection monitor and fix related bug #665

Merged
merged 5 commits on Jan 31, 2023
Changes from 4 commits
113 changes: 106 additions & 7 deletions internal/client/client.go
@@ -114,30 +114,119 @@ type connArray struct {
target string

index uint32
v []*grpc.ClientConn
v []*monitoredConn
// streamTimeout binds with a background goroutine to process coprocessor streaming timeout.
streamTimeout chan *tikvrpc.Lease
dialTimeout time.Duration
// batchConn is not null when batch is enabled.
*batchConn
done chan struct{}

monitor *connMonitor
}

func newConnArray(maxSize uint, addr string, security config.Security,
idleNotify *uint32, enableBatch bool, dialTimeout time.Duration, opts []grpc.DialOption) (*connArray, error) {
idleNotify *uint32, enableBatch bool, dialTimeout time.Duration, m *connMonitor, opts []grpc.DialOption) (*connArray, error) {
a := &connArray{
index: 0,
v: make([]*grpc.ClientConn, maxSize),
v: make([]*monitoredConn, maxSize),
streamTimeout: make(chan *tikvrpc.Lease, 1024),
done: make(chan struct{}),
dialTimeout: dialTimeout,
monitor: m,
}
if err := a.Init(addr, security, idleNotify, enableBatch, opts...); err != nil {
return nil, err
}
return a, nil
}

type connMonitor struct {
m sync.Map
loopOnce sync.Once
stopOnce sync.Once
stop chan struct{}
}

func (c *connMonitor) AddConn(conn *monitoredConn) {
c.m.Store(conn.Name, conn)
}

func (c *connMonitor) RemoveConn(conn *monitoredConn) {
c.m.Delete(conn.Name)
for state := connectivity.Idle; state <= connectivity.Shutdown; state++ {
metrics.TiKVGrpcConnectionState.WithLabelValues(conn.Name, conn.Target(), state.String()).Set(0)
}
}

func (c *connMonitor) Start() {
c.loopOnce.Do(
func() {
c.stop = make(chan struct{})
go c.start()
},
)
}

func (c *connMonitor) Stop() {
c.stopOnce.Do(
func() {
if c.stop != nil {
close(c.stop)
}
},
)
}

func (c *connMonitor) start() {

ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.m.Range(func(_, value interface{}) bool {
conn := value.(*monitoredConn)
nowState := conn.GetState()
for state := connectivity.Idle; state <= connectivity.Shutdown; state++ {
if state == nowState {
metrics.TiKVGrpcConnectionState.WithLabelValues(conn.Name, conn.Target(), nowState.String()).Set(1)
} else {
metrics.TiKVGrpcConnectionState.WithLabelValues(conn.Name, conn.Target(), state.String()).Set(0)
}
}
return true
})
case <-c.stop:
return
}
}
}

type monitoredConn struct {
*grpc.ClientConn
Name string
}

func (a *connArray) monitoredDial(ctx context.Context, connName, target string, opts ...grpc.DialOption) (conn *monitoredConn, err error) {
conn = &monitoredConn{
Name: connName,
}
conn.ClientConn, err = grpc.DialContext(ctx, target, opts...)
if err != nil {
return nil, err
}
a.monitor.AddConn(conn)
return conn, nil
}

func (c *monitoredConn) Close() error {
if c.ClientConn != nil {
return c.ClientConn.Close()
}
return nil
}

func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32, enableBatch bool, opts ...grpc.DialOption) error {
a.target = addr

@@ -198,11 +287,13 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
}),
}, opts...)

conn, err := grpc.DialContext(
conn, err := a.monitoredDial(
ctx,
fmt.Sprintf("%s-%d", a.target, i),
addr,
opts...,
)

cancel()
if err != nil {
// Cleanup if the initialization fails.
@@ -214,7 +305,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
if allowBatch {
batchClient := &batchCommandsClient{
target: a.target,
conn: conn,
conn: conn.ClientConn,
forwardedClients: make(map[string]*batchCommandsStream),
batched: sync.Map{},
epoch: 0,
@@ -237,7 +328,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint

func (a *connArray) Get() *grpc.ClientConn {
next := atomic.AddUint32(&a.index, 1) % uint32(len(a.v))
return a.v[next]
return a.v[next].ClientConn
}

func (a *connArray) Close() {
@@ -249,6 +340,9 @@ func (a *connArray) Close() {
if c != nil {
err := c.Close()
tikverr.Log(err)
if err == nil {
a.monitor.RemoveConn(c)
}
}
}

@@ -301,6 +395,8 @@ type RPCClient struct {
// Periodically check whether there is any connection that is idle and then close and remove these connections.
// Implement background cleanup.
isClosed bool

connMonitor *connMonitor
}

// NewRPCClient creates a client that manages connections and rpc calls with tikv-servers.
@@ -310,10 +406,12 @@ func NewRPCClient(opts ...Opt) *RPCClient {
option: &option{
dialTimeout: dialTimeout,
},
connMonitor: &connMonitor{},
}
for _, opt := range opts {
opt(cli.option)
}
cli.connMonitor.Start()
return cli
}

@@ -352,14 +450,14 @@ func (c *RPCClient) createConnArray(addr string, enableBatch bool, opts ...func(
for _, opt := range opts {
opt(&client)
}

array, err = newConnArray(
client.GrpcConnectionCount,
addr,
c.option.security,
&c.idleNotify,
enableBatch,
c.option.dialTimeout,
c.connMonitor,
c.option.gRPCDialOptions)

if err != nil {
@@ -663,6 +761,7 @@ func (c *RPCClient) getMPPStreamResponse(ctx context.Context, client tikvpb.Tikv
func (c *RPCClient) Close() error {
// TODO: add a unit test for SendRequest After Closed
c.closeConns()
c.connMonitor.Stop()
return nil
}

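A brief usage note on the client.go change: the connection monitor is owned by the RPCClient, so callers only need to pair NewRPCClient with Close. Every connection dialed through monitoredDial is named "<target>-<index>", registered with the monitor, and polled once per second by the background goroutine. The sketch below is written as if it lived inside package client (like the tests in this PR); the address and the helper name exampleLifecycle are illustrative, not part of the change.

```go
package client

import (
	"context"
	"time"

	"github.com/pingcap/kvproto/pkg/tikvpb"
	"github.com/tikv/client-go/v2/tikvrpc"
)

// exampleLifecycle is a hypothetical helper showing the intended lifecycle:
// NewRPCClient starts the connMonitor goroutine, and Close stops it after
// closing (and deregistering) the pooled connections.
func exampleLifecycle() error {
	c := NewRPCClient()
	defer c.Close() // also stops the connMonitor started by NewRPCClient

	req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
	_, err := c.SendRequest(context.Background(), "127.0.0.1:20160", req, time.Second)
	return err
}
```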
3 changes: 3 additions & 0 deletions internal/client/client_batch.go
@@ -544,6 +544,9 @@ func (c *batchCommandsClient) waitConnReady() (err error) {
if c.conn.GetState() == connectivity.Ready {
return
}
if c.conn.GetState() == connectivity.Idle {
c.conn.Connect()
}
start := time.Now()
defer func() {
metrics.TiKVBatchClientWaitEstablish.Observe(time.Since(start).Seconds())
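The related bug fix is the small addition to waitConnReady above: a grpc.ClientConn that has dropped to Idle does not re-dial on its own, so waiting for Ready could stall until the timeout. Below is a standalone sketch of the same pattern against the public grpc-go API; waitReady is an illustrative helper, not code from this PR.

```go
package example

import (
	"context"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
)

// waitReady blocks until conn is Ready or ctx is done. The key step is
// calling Connect() when the connection is Idle: without it, no dial is
// attempted until some RPC is issued on the conn.
func waitReady(ctx context.Context, conn *grpc.ClientConn) error {
	for {
		state := conn.GetState()
		if state == connectivity.Ready {
			return nil
		}
		if state == connectivity.Idle {
			conn.Connect() // kick the conn out of Idle so it starts dialing
		}
		// Wait for the state to move on, or give up when ctx expires.
		if !conn.WaitForStateChange(ctx, state) {
			return ctx.Err()
		}
	}
}
```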
5 changes: 2 additions & 3 deletions internal/client/client_fail_test.go
@@ -60,6 +60,7 @@ func TestPanicInRecvLoop(t *testing.T) {

addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
rpcClient := NewRPCClient()
defer rpcClient.Close()
rpcClient.option.dialTimeout = time.Second / 3

// Start batchRecvLoop, and it should panic in `failPendingRequests`.
@@ -77,8 +78,6 @@ func TestPanicInRecvLoop(t *testing.T) {
req = tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
_, err = rpcClient.SendRequest(context.Background(), addr, req, time.Second*4)
assert.Nil(t, err)

rpcClient.closeConns()
}

func TestRecvErrorInMultipleRecvLoops(t *testing.T) {
@@ -94,7 +93,7 @@ func TestRecvErrorInMultipleRecvLoops(t *testing.T) {
conf.TiKVClient.GrpcConnectionCount = 1
})()
rpcClient := NewRPCClient()
defer rpcClient.closeConns()
defer rpcClient.Close()

// Create 4 BatchCommands streams.
prewriteReq := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{})
8 changes: 5 additions & 3 deletions internal/client/client_test.go
@@ -64,6 +64,7 @@ func TestConn(t *testing.T) {
})()

client := NewRPCClient()
defer client.Close()

addr := "127.0.0.1:6379"
conn1, err := client.getConnArray(addr, true)
@@ -88,6 +89,7 @@

func TestGetConnAfterClose(t *testing.T) {
client := NewRPCClient()
defer client.Close()

addr := "127.0.0.1:6379"
connArray, err := client.getConnArray(addr, true)
@@ -116,6 +118,7 @@ func TestSendWhenReconnect(t *testing.T) {
require.True(t, port > 0)

rpcClient := NewRPCClient()
defer rpcClient.Close()
addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
conn, err := rpcClient.getConnArray(addr, true)
assert.Nil(t, err)
@@ -128,7 +131,6 @@
req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
_, err = rpcClient.SendRequest(context.Background(), addr, req, 100*time.Second)
assert.True(t, err.Error() == "no available connections")
conn.Close()
server.Stop()
}

@@ -247,7 +249,7 @@ func TestForwardMetadataByUnaryCall(t *testing.T) {
conf.TiKVClient.GrpcConnectionCount = 1
})()
rpcClient := NewRPCClient()
defer rpcClient.closeConns()
defer rpcClient.Close()

var checkCnt uint64
// Check no corresponding metadata if ForwardedHost is empty.
@@ -316,7 +318,7 @@ func TestForwardMetadataByBatchCommands(t *testing.T) {
conf.TiKVClient.GrpcConnectionCount = 1
})()
rpcClient := NewRPCClient()
defer rpcClient.closeConns()
defer rpcClient.Close()

var checkCnt uint64
setCheckHandler := func(forwardedHost string) {
10 changes: 10 additions & 0 deletions metrics/metrics.go
@@ -96,6 +96,7 @@ var (
TiKVReadThroughput prometheus.Histogram
TiKVUnsafeDestroyRangeFailuresCounterVec *prometheus.CounterVec
TiKVPrewriteAssertionUsageCounter *prometheus.CounterVec
TiKVGrpcConnectionState *prometheus.GaugeVec
)

// Label constants.
@@ -589,6 +590,14 @@ func initMetrics(namespace, subsystem string) {
Help: "Counter of assertions used in prewrite requests",
}, []string{LblType})

TiKVGrpcConnectionState = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "grpc_connection_state",
Help: "State of gRPC connection",
}, []string{"connection_id", "store_ip", "grpc_state"})

initShortcuts()
}

@@ -659,6 +668,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVReadThroughput)
prometheus.MustRegister(TiKVUnsafeDestroyRangeFailuresCounterVec)
prometheus.MustRegister(TiKVPrewriteAssertionUsageCounter)
prometheus.MustRegister(TiKVGrpcConnectionState)
}

// readCounter reads the value of a prometheus.Counter.
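For completeness, a hedged sketch of how the new gauge can be inspected once RegisterMetrics has been called: it gathers the default registry and prints every series of the connection-state gauge. The full metric name depends on the namespace and subsystem the metrics package was initialised with, so the sketch matches on the "grpc_connection_state" suffix instead of hard-coding it; dumpConnStateGauge is an illustrative name.

```go
package example

import (
	"fmt"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/tikv/client-go/v2/metrics"
)

// dumpConnStateGauge prints one line per (connection_id, store_ip, grpc_state)
// series; the gauge is 1 for the connection's current state and 0 otherwise.
func dumpConnStateGauge() error {
	metrics.RegisterMetrics() // one-time registration with the default registry

	families, err := prometheus.DefaultGatherer.Gather()
	if err != nil {
		return err
	}
	for _, mf := range families {
		if !strings.HasSuffix(mf.GetName(), "grpc_connection_state") {
			continue
		}
		for _, m := range mf.GetMetric() {
			labels := make([]string, 0, len(m.GetLabel()))
			for _, lp := range m.GetLabel() {
				labels = append(labels, lp.GetName()+"="+lp.GetValue())
			}
			fmt.Printf("%s{%s} = %v\n", mf.GetName(), strings.Join(labels, ","), m.GetGauge().GetValue())
		}
	}
	return nil
}
```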