diff --git a/pkg/utils/tsoutil/tso_request.go b/pkg/utils/tsoutil/tso_request.go
index 0ccaba91872d..92a0d3de222d 100644
--- a/pkg/utils/tsoutil/tso_request.go
+++ b/pkg/utils/tsoutil/tso_request.go
@@ -23,8 +23,8 @@ import (
 	"google.golang.org/grpc"
 )
 
-// Request is an interface wrapping tsopb.TsoRequest and pdpb.TsoRequest so
-// they can be generally handled by the TSO dispatcher
+// Request is an interface wrapping tsopb.TsoRequest and pdpb.TsoRequest
+// so that they can be generally handled by the TSO dispatcher
 type Request interface {
 	// getForwardedHost returns the forwarded host
 	getForwardedHost() string
@@ -107,7 +107,10 @@ func (r *TSOProtoRequest) sendResponseAsync(countSum, physical, firstLogical int
 			SuffixBits: suffixBits,
 		},
 	}
-	// Asynchronously send response back to the client. No blocking.
+	// Asynchronously send the response back to the client without blocking. Although responseCh
+	// is a buffered channel with size 1, the TSO streaming goroutine calls stream.Recv() followed
+	// by stream.Send() in a loop and strictly preserves this order, so responseCh is always empty
+	// and sending the response to the channel never blocks.
 	select {
 	case <-r.grpcSvrStreamCtx.Done():
 	case r.responseCh <- response:
diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go
index b0a099023c0f..01194f1ae325 100644
--- a/tests/integrations/mcs/tso/server_test.go
+++ b/tests/integrations/mcs/tso/server_test.go
@@ -656,7 +656,7 @@ func TestTSOProxy(t *testing.T) {
 	}
 
 	// Create multiple TSO client streams and each stream uses a different gRPC connection to simulate multiple clients.
-	clientCount := 1000
+	clientCount := 100
 	grpcClientConns := make([]*grpc.ClientConn, 0, clientCount)
 	streams := make([]pdpb.PD_TsoClient, 0, clientCount)
 	for i := 0; i < clientCount; i++ {
@@ -681,3 +681,61 @@ func TestTSOProxy(t *testing.T) {
 		conn.Close()
 	}
 }
+
+// BenchmarkTSOProxy benchmarks TSO proxy performance.
+func BenchmarkTSOProxy(b *testing.B) {
+	re := require.New(b)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// Create an API cluster with 1 server
+	apiCluster, err := tests.NewTestAPICluster(ctx, 1)
+	re.NoError(err)
+	defer apiCluster.Destroy()
+	err = apiCluster.RunInitialServers()
+	re.NoError(err)
+	leaderName := apiCluster.WaitLeader()
+	pdLeader := apiCluster.GetServer(leaderName)
+	backendEndpoints := pdLeader.GetAddr()
+
+	// Create a TSO cluster with 2 servers
+	tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, backendEndpoints)
+	re.NoError(err)
+	defer tsoCluster.Destroy()
+	tsoCluster.WaitForDefaultPrimaryServing(re)
+	tsoReq := &pdpb.TsoRequest{
+		Header: &pdpb.RequestHeader{ClusterId: pdLeader.GetClusterID()},
+		Count:  1,
+	}
+
+	// Create multiple TSO client streams and each stream uses a different gRPC connection to simulate multiple clients.
+	clientCount := 100
+	grpcClientConns := make([]*grpc.ClientConn, 0, clientCount)
+	streams := make([]pdpb.PD_TsoClient, 0, clientCount)
+	for i := 0; i < clientCount; i++ {
+		conn, err := grpc.Dial(strings.TrimPrefix(backendEndpoints, "http://"), grpc.WithInsecure())
+		re.NoError(err)
+		grpcClientConns = append(grpcClientConns, conn)
+		grpcPDClient := pdpb.NewPDClient(conn)
+		cctx, cancel := context.WithCancel(ctx)
+		defer cancel()
+		stream, err := grpcPDClient.Tso(cctx)
+		re.NoError(err)
+		streams = append(streams, stream)
+	}
+
+	// Benchmark TSO proxy
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		err := TSOProxy(streams, tsoReq)
+		re.NoError(err)
+	}
+	b.StopTimer()
+
+	for _, stream := range streams {
+		stream.CloseSend()
+	}
+	for _, conn := range grpcClientConns {
+		conn.Close()
+	}
+}
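
To make the invariant behind the new comment in tso_request.go concrete, here is a minimal, self-contained Go sketch of the pattern it describes. The response type, channel, and loop below are hypothetical stand-ins for the proto response, responseCh, and the TSO streaming goroutine, not the actual PD implementation: because the consumer drains the size-1 buffered channel after every send, the send case of the select can never block.

package main

import (
	"context"
	"fmt"
)

// response is a hypothetical stand-in for the proto response types;
// it exists only to illustrate the channel pattern.
type response struct{ id int }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Mirrors responseCh: a buffered channel with capacity 1.
	responseCh := make(chan *response, 1)

	for i := 0; i < 3; i++ {
		// Producer side (analogous to sendResponseAsync): the channel is
		// empty at this point, so the send case is chosen immediately and
		// the select never blocks.
		select {
		case <-ctx.Done():
			return
		case responseCh <- &response{id: i}:
		}

		// Consumer side (analogous to the stream.Recv()/stream.Send() loop):
		// the response is drained before the next send, freeing the buffer
		// slot and preserving the "responseCh is always empty" invariant.
		resp := <-responseCh
		fmt.Println("forwarded response", resp.id)
	}
}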