From faf2fc966d54e514efaa19731930ea524f483150 Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Thu, 14 Sep 2023 00:04:07 +0800
Subject: [PATCH 1/4] add metrics

Signed-off-by: crazycs520
---
 internal/client/client_batch.go | 11 +++++++++--
 metrics/metrics.go              |  6 +++---
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go
index b7159847b..a4075ebf4 100644
--- a/internal/client/client_batch.go
+++ b/internal/client/client_batch.go
@@ -70,6 +70,8 @@ type batchCommandsEntry struct {
 	// canceled indicated the request is canceled or not.
 	canceled int32
 	err      error
+	start    time.Time
+	target   string
 }
 
 func (b *batchCommandsEntry) isCanceled() bool {
@@ -100,6 +102,7 @@ func (b *batchCommandsBuilder) len() int {
 
 func (b *batchCommandsBuilder) push(entry *batchCommandsEntry) {
 	b.entries = append(b.entries, entry)
+	metrics.TiKVBatchWaitDuration.WithLabelValues("wait-fetch", entry.target).Observe(float64(time.Since(entry.start)))
 }
 
 // build builds BatchCommandsRequests and calls collect() for each valid entry.
@@ -109,6 +112,7 @@ func (b *batchCommandsBuilder) build(
 	collect func(id uint64, e *batchCommandsEntry),
 ) (*tikvpb.BatchCommandsRequest, map[string]*tikvpb.BatchCommandsRequest) {
 	for _, e := range b.entries {
+		metrics.TiKVBatchWaitDuration.WithLabelValues("wait-build-req", e.target).Observe(float64(time.Since(e.start)))
 		if e.isCanceled() {
 			continue
 		}
@@ -649,6 +653,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
 				trace.Log(entry.ctx, "rpc", "received")
 			}
 			logutil.Eventf(entry.ctx, "receive %T response with other %d batched requests from %s", responses[i].GetCmd(), len(responses), c.target)
+			metrics.TiKVBatchWaitDuration.WithLabelValues("wait-resp", entry.target).Observe(float64(time.Since(entry.start)))
 			if atomic.LoadInt32(&entry.canceled) == 0 {
 				// Put the response only if the request is not canceled.
 				entry.res <- responses[i]
@@ -773,6 +778,7 @@ func sendBatchRequest(
 	req *tikvpb.BatchCommandsRequest_Request,
 	timeout time.Duration,
 ) (*tikvrpc.Response, error) {
+	start := time.Now()
 	entry := &batchCommandsEntry{
 		ctx:           ctx,
 		req:           req,
@@ -780,11 +786,12 @@ func sendBatchRequest(
 		forwardedHost: forwardedHost,
 		canceled:      0,
 		err:           nil,
+		start:         start,
+		target:        addr,
 	}
 	timer := time.NewTimer(timeout)
 	defer timer.Stop()
 
-	start := time.Now()
 	select {
 	case batchConn.batchCommandsCh <- entry:
 	case <-ctx.Done():
@@ -795,7 +802,7 @@ func sendBatchRequest(
 		return nil, errors.WithMessage(context.DeadlineExceeded, "wait sendLoop")
 	}
 	waitDuration := time.Since(start)
-	metrics.TiKVBatchWaitDuration.Observe(float64(waitDuration))
+	metrics.TiKVBatchWaitDuration.WithLabelValues("wait-send", entry.target).Observe(float64(waitDuration))
 
 	select {
 	case res, ok := <-entry.res:
diff --git a/metrics/metrics.go b/metrics/metrics.go
index 5c72f05b2..912b9b5bd 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -62,7 +62,7 @@ var (
 	TiKVLocalLatchWaitTimeHistogram prometheus.Histogram
 	TiKVStatusDuration              *prometheus.HistogramVec
 	TiKVStatusCounter               *prometheus.CounterVec
-	TiKVBatchWaitDuration           prometheus.Histogram
+	TiKVBatchWaitDuration           *prometheus.HistogramVec
 	TiKVBatchSendLatency            prometheus.Histogram
 	TiKVBatchWaitOverLoad           prometheus.Counter
 	TiKVBatchPendingRequests        *prometheus.HistogramVec
@@ -333,7 +333,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
 			ConstLabels: constLabels,
 		}, []string{LblResult})
 
-	TiKVBatchWaitDuration = prometheus.NewHistogram(
+	TiKVBatchWaitDuration = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace:   namespace,
 			Subsystem:   subsystem,
 			Name:        "batch_wait_duration",
 			Buckets:     prometheus.ExponentialBuckets(1, 2, 34), // 1ns ~ 8s
 			Help:        "batch wait duration",
 			ConstLabels: constLabels,
-		})
+		}, []string{LblType, LblStore})
 
 	TiKVBatchSendLatency = prometheus.NewHistogram(
 		prometheus.HistogramOpts{

From 7e8947553ca6c7ec5ed95e316d5f389012ea8903 Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Thu, 14 Sep 2023 00:58:14 +0800
Subject: [PATCH 2/4] update metrics

Signed-off-by: crazycs520
---
 metrics/metrics.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/metrics/metrics.go b/metrics/metrics.go
index 912b9b5bd..579148031 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -338,7 +338,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
 			Namespace:   namespace,
 			Subsystem:   subsystem,
 			Name:        "batch_wait_duration",
-			Buckets:     prometheus.ExponentialBuckets(1, 2, 34), // 1ns ~ 8s
+			Buckets:     prometheus.ExponentialBuckets(16, 2, 36), // 16ns ~ 549s
 			Help:        "batch wait duration",
 			ConstLabels: constLabels,
 		}, []string{LblType, LblStore})

From 50583eeddce4fe000732c93675bd6da8c7298cfc Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Thu, 14 Sep 2023 23:22:26 +0800
Subject: [PATCH 3/4] update metrics

Signed-off-by: crazycs520
---
 internal/client/client_batch.go | 21 +++++++++++++++------
 metrics/metrics.go              | 22 +++++++++++++++++-----
 2 files changed, 32 insertions(+), 11 deletions(-)

diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go
index a4075ebf4..f2a225b6f 100644
--- a/internal/client/client_batch.go
+++ b/internal/client/client_batch.go
@@ -71,7 +71,6 @@ type batchCommandsEntry struct {
 	canceled int32
 	err      error
 	start    time.Time
-	target   string
 }
 
 func (b *batchCommandsEntry) isCanceled() bool {
@@ -102,7 +101,6 @@ func (b *batchCommandsBuilder) len() int {
 
 func (b *batchCommandsBuilder) push(entry *batchCommandsEntry) {
 	b.entries = append(b.entries, entry)
-	metrics.TiKVBatchWaitDuration.WithLabelValues("wait-fetch", entry.target).Observe(float64(time.Since(entry.start)))
 }
 
 // build builds BatchCommandsRequests and calls collect() for each valid entry.
@@ -112,7 +110,6 @@ func (b *batchCommandsBuilder) build(
 	collect func(id uint64, e *batchCommandsEntry),
 ) (*tikvpb.BatchCommandsRequest, map[string]*tikvpb.BatchCommandsRequest) {
 	for _, e := range b.entries {
-		metrics.TiKVBatchWaitDuration.WithLabelValues("wait-build-req", e.target).Observe(float64(time.Since(e.start)))
 		if e.isCanceled() {
 			continue
 		}
@@ -382,11 +379,14 @@ func (a *batchConn) getClientAndSend() {
 	}
 	defer cli.unlockForSend()
 
+	now := time.Now()
+	batchCmdWaitToSendDuration := metrics.TiKVBatchCmdDuration.WithLabelValues("wait-to-send", target)
 	req, forwardingReqs := a.reqBuilder.build(func(id uint64, e *batchCommandsEntry) {
 		cli.batched.Store(id, e)
 		if trace.IsEnabled() {
 			trace.Log(e.ctx, "rpc", "send")
 		}
+		batchCmdWaitToSendDuration.Observe(float64(now.Sub(e.start)))
 	})
 	if req != nil {
 		cli.send("", req)
@@ -511,6 +511,14 @@ func (c *batchCommandsClient) isStopped() bool {
 }
 
 func (c *batchCommandsClient) send(forwardedHost string, req *tikvpb.BatchCommandsRequest) {
+	start := time.Now()
+	defer func() {
+		if forwardedHost == "" {
+			metrics.TiKVBatchConnSendDuration.WithLabelValues(c.target).Observe(time.Since(start).Seconds())
+		} else {
+			metrics.TiKVBatchConnSendDuration.WithLabelValues(forwardedHost).Observe(time.Since(start).Seconds())
+		}
+	}()
 	err := c.initBatchClient(forwardedHost)
 	if err != nil {
 		logutil.BgLogger().Warn(
@@ -616,6 +624,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
 		}
 	}()
 
+	batchCmdGotRespDuration := metrics.TiKVBatchCmdDuration.WithLabelValues("got-resp", c.target)
 	epoch := atomic.LoadUint64(&c.epoch)
 	for {
 		resp, err := streamClient.recv()
@@ -639,6 +648,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
 		}
 
 		responses := resp.GetResponses()
+		now := time.Now()
 		for i, requestID := range resp.GetRequestIds() {
 			value, ok := c.batched.Load(requestID)
 			if !ok {
@@ -653,7 +663,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
 				trace.Log(entry.ctx, "rpc", "received")
 			}
 			logutil.Eventf(entry.ctx, "receive %T response with other %d batched requests from %s", responses[i].GetCmd(), len(responses), c.target)
-			metrics.TiKVBatchWaitDuration.WithLabelValues("wait-resp", entry.target).Observe(float64(time.Since(entry.start)))
+			batchCmdGotRespDuration.Observe(float64(now.Sub(entry.start)))
 			if atomic.LoadInt32(&entry.canceled) == 0 {
 				// Put the response only if the request is not canceled.
 				entry.res <- responses[i]
@@ -787,7 +797,6 @@ func sendBatchRequest(
 		canceled:      0,
 		err:           nil,
 		start:         start,
-		target:        addr,
 	}
 	timer := time.NewTimer(timeout)
 	defer timer.Stop()
@@ -802,7 +811,7 @@ func sendBatchRequest(
 		return nil, errors.WithMessage(context.DeadlineExceeded, "wait sendLoop")
 	}
 	waitDuration := time.Since(start)
-	metrics.TiKVBatchWaitDuration.WithLabelValues("wait-send", entry.target).Observe(float64(waitDuration))
+	metrics.TiKVBatchCmdDuration.WithLabelValues("send-to-chan", addr).Observe(float64(waitDuration))
 
 	select {
 	case res, ok := <-entry.res:
diff --git a/metrics/metrics.go b/metrics/metrics.go
index 579148031..9328ebcd8 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -62,7 +62,8 @@ var (
 	TiKVLocalLatchWaitTimeHistogram prometheus.Histogram
 	TiKVStatusDuration              *prometheus.HistogramVec
 	TiKVStatusCounter               *prometheus.CounterVec
-	TiKVBatchWaitDuration           *prometheus.HistogramVec
+	TiKVBatchConnSendDuration       *prometheus.HistogramVec
+	TiKVBatchCmdDuration            *prometheus.HistogramVec
 	TiKVBatchSendLatency            prometheus.Histogram
 	TiKVBatchWaitOverLoad           prometheus.Counter
 	TiKVBatchPendingRequests        *prometheus.HistogramVec
@@ -333,13 +334,23 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
 			ConstLabels: constLabels,
 		}, []string{LblResult})
 
-	TiKVBatchWaitDuration = prometheus.NewHistogramVec(
+	TiKVBatchConnSendDuration = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace:   namespace,
 			Subsystem:   subsystem,
-			Name:        "batch_wait_duration",
+			Name:        "batch_conn_send_seconds",
+			Buckets:     prometheus.ExponentialBuckets(0.0005, 2, 22), // 0.5ms ~ 1048s
+			Help:        "batch conn send duration",
+			ConstLabels: constLabels,
+		}, []string{LblStore})
+
+	TiKVBatchCmdDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace:   namespace,
+			Subsystem:   subsystem,
+			Name:        "batch_cmd_duration",
 			Buckets:     prometheus.ExponentialBuckets(16, 2, 36), // 16ns ~ 549s
-			Help:        "batch wait duration",
+			Help:        "batch cmd duration, unit is nanosecond",
 			ConstLabels: constLabels,
 		}, []string{LblType, LblStore})
 
@@ -767,7 +778,8 @@ func RegisterMetrics() {
 	prometheus.MustRegister(TiKVLocalLatchWaitTimeHistogram)
 	prometheus.MustRegister(TiKVStatusDuration)
 	prometheus.MustRegister(TiKVStatusCounter)
-	prometheus.MustRegister(TiKVBatchWaitDuration)
+	prometheus.MustRegister(TiKVBatchConnSendDuration)
+	prometheus.MustRegister(TiKVBatchCmdDuration)
 	prometheus.MustRegister(TiKVBatchSendLatency)
 	prometheus.MustRegister(TiKVBatchRecvLatency)
 	prometheus.MustRegister(TiKVBatchWaitOverLoad)

From 49e4a52adc03639ef59bfaef3f14298da23212fc Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Fri, 15 Sep 2023 00:45:17 +0800
Subject: [PATCH 4/4] not use batch client when it is busy

Signed-off-by: crazycs520
---
 internal/client/client.go       |  2 +-
 internal/client/client_batch.go | 22 +++++++++++++++++++++-
 2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/internal/client/client.go b/internal/client/client.go
index 98d847a90..3bf9b0ebf 100644
--- a/internal/client/client.go
+++ b/internal/client/client.go
@@ -620,7 +620,7 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R
 
 	// TiDB RPC server supports batch RPC, but batch connection will send heart beat, It's not necessary since
 	// request to TiDB is not high frequency.
-	if config.GetGlobalConfig().TiKVClient.MaxBatchSize > 0 && enableBatch {
+	if config.GetGlobalConfig().TiKVClient.MaxBatchSize > 0 && enableBatch && !connArray.batchConn.isBusy(start.UnixNano()) {
 		if batchReq := req.ToBatchCommandsRequest(); batchReq != nil {
 			defer trace.StartRegion(ctx, req.Type.String()).End()
 			return sendBatchRequest(ctx, addr, req.ForwardedHost, connArray.batchConn, batchReq, timeout)
diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go
index f2a225b6f..eb58cb033 100644
--- a/internal/client/client_batch.go
+++ b/internal/client/client_batch.go
@@ -198,9 +198,16 @@ type batchConn struct {
 	pendingRequests prometheus.Observer
 	batchSize       prometheus.Observer
 
-	index uint32
+	index             uint32
+	state             atomic.Int32
+	startHandlingTime atomic.Int64
 }
 
+var (
+	batchConnIdle     = int32(0)
+	batchConnHandling = int32(1)
+)
+
 func newBatchConn(connCount, maxBatchSize uint, idleNotify *uint32) *batchConn {
 	return &batchConn{
 		batchCommandsCh:        make(chan *batchCommandsEntry, maxBatchSize),
@@ -217,6 +224,16 @@ func (a *batchConn) isIdle() bool {
 	return atomic.LoadUint32(&a.idle) != 0
 }
 
+func (a *batchConn) isBusy(now int64) bool {
+	if len(a.batchCommandsCh) == cap(a.batchCommandsCh) {
+		return true
+	}
+	if a.state.Load() == batchConnHandling && (now-a.startHandlingTime.Load()) > int64(time.Second) {
+		return true
+	}
+	return false
+}
+
 // fetchAllPendingRequests fetches all pending requests from the channel.
 func (a *batchConn) fetchAllPendingRequests(
 	maxBatchSize int,
@@ -312,6 +329,7 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
 
 	bestBatchWaitSize := cfg.BatchWaitSize
 	for {
+		a.state.Store(batchConnIdle)
 		a.reqBuilder.reset()
 
 		start := a.fetchAllPendingRequests(int(cfg.MaxBatchSize))
@@ -323,6 +341,8 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
 			}
 		}
 
+		a.state.Store(batchConnHandling)
+		a.startHandlingTime.Store(start.UnixNano())
 		if a.reqBuilder.len() < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 {
 			// If the target TiKV is overload, wait a while to collect more requests.
 			if atomic.LoadUint64(&a.tikvTransportLayerLoad) >= uint64(cfg.OverloadThreshold) {