
Commit

Add benchmark test
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
binshi-bing committed Jun 7, 2023
1 parent bacbd0d commit 3625afb
Showing 2 changed files with 65 additions and 4 deletions.
9 changes: 6 additions & 3 deletions pkg/utils/tsoutil/tso_request.go
@@ -23,8 +23,8 @@ import (
"google.golang.org/grpc"
)

// Request is an interface wrapping tsopb.TsoRequest and pdpb.TsoRequest so
// they can be generally handled by the TSO dispatcher
// Request is an interface wrapping tsopb.TsoRequest and pdpb.TsoRequest
// so that they can be generally handled by the TSO dispatcher
type Request interface {
// getForwardedHost returns the forwarded host
getForwardedHost() string
@@ -107,7 +107,10 @@ func (r *TSOProtoRequest) sendResponseAsync(countSum, physical, firstLogical int
SuffixBits: suffixBits,
},
}
// Asynchronously send response back to the client. No blocking.
// Asynchronously send the response back to the client. Though responseCh is a buffered channel
// with size 1, the TSO streaming process routine calls stream.Recv() followed by stream.Send()
// in a loop and strictly follows this order, so responseCh is always empty and sending the
// response to the channel never blocks.
select {
case <-r.grpcSvrStreamCtx.Done():
case r.responseCh <- response:
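
The added comment relies on the strict Recv-then-Send ordering of the TSO streaming routine to keep the size-1 buffered channel empty, which is what makes the select non-blocking. Below is a minimal, self-contained Go sketch of that hand-off pattern; the tsoResponse type, the function signature, and the standalone main wrapper are illustrative assumptions, and only the shape of the select mirrors the diff.

// A minimal sketch (assumed types and names) of the non-blocking hand-off
// described in the added comment: responseCh is a buffered channel of size 1
// that the streaming loop drains before the next send is attempted, so the
// send case below never blocks on the normal path.
package main

import (
	"context"
	"fmt"
)

type tsoResponse struct {
	physical, logical int64
}

func sendResponseAsync(ctx context.Context, responseCh chan<- *tsoResponse, resp *tsoResponse) {
	select {
	case <-ctx.Done():
		// The gRPC server stream is gone; drop the response.
	case responseCh <- resp:
		// Capacity 1 and always drained first, so this case is taken
		// immediately in the normal path.
	}
}

func main() {
	responseCh := make(chan *tsoResponse, 1)
	sendResponseAsync(context.Background(), responseCh, &tsoResponse{physical: 1, logical: 1})
	fmt.Printf("got response: %+v\n", <-responseCh)
}
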
60 changes: 59 additions & 1 deletion tests/integrations/mcs/tso/server_test.go
@@ -656,7 +656,7 @@ func TestTSOProxy(t *testing.T) {
}

// Create multiple TSO client streams, each using a different gRPC connection, to simulate multiple clients.
clientCount := 1000
clientCount := 100
grpcClientConns := make([]*grpc.ClientConn, 0, clientCount)
streams := make([]pdpb.PD_TsoClient, 0, clientCount)
for i := 0; i < clientCount; i++ {
@@ -681,3 +681,61 @@ func TestTSOProxy(t *testing.T) {
conn.Close()
}
}

// BenchmarkTSOProxy benchmarks TSO proxy performance.
func BenchmarkTSOProxy(b *testing.B) {
re := require.New(b)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create an API cluster with 1 server
apiCluster, err := tests.NewTestAPICluster(ctx, 1)
re.NoError(err)
defer apiCluster.Destroy()
err = apiCluster.RunInitialServers()
re.NoError(err)
leaderName := apiCluster.WaitLeader()
pdLeader := apiCluster.GetServer(leaderName)
backendEndpoints := pdLeader.GetAddr()

// Create a TSO cluster with 2 servers
tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, backendEndpoints)
re.NoError(err)
defer tsoCluster.Destroy()
tsoCluster.WaitForDefaultPrimaryServing(re)
tsoReq := &pdpb.TsoRequest{
Header: &pdpb.RequestHeader{ClusterId: pdLeader.GetClusterID()},
Count: 1,
}

// Create multiple TSO client streams, each using a different gRPC connection, to simulate multiple clients.
clientCount := 100
grpcClientConns := make([]*grpc.ClientConn, 0, clientCount)
streams := make([]pdpb.PD_TsoClient, 0, clientCount)
for i := 0; i < clientCount; i++ {
conn, err := grpc.Dial(strings.TrimPrefix(backendEndpoints, "http://"), grpc.WithInsecure())
re.NoError(err)
grpcClientConns = append(grpcClientConns, conn)
grpcPDClient := pdpb.NewPDClient(conn)
cctx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := grpcPDClient.Tso(cctx)
re.NoError(err)
streams = append(streams, stream)
}

// Benchmark TSO proxy
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := TSOProxy(streams, tsoReq)
re.NoError(err)
}
b.StopTimer()

for _, stream := range streams {
stream.CloseSend()
}
for _, conn := range grpcClientConns {
conn.Close()
}
}
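
The benchmark loop above calls a TSOProxy helper whose body lies outside this hunk. As a hedged illustration only, assuming the helper performs one synchronous Send/Recv round per client stream and sanity-checks the granted count, it could look roughly like the sketch below; the name tsoProxyRound, the package clause, and the count check are assumptions, not the repository's code.

package tso_test

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/pdpb"
)

// tsoProxyRound drives one synchronous request/response round over every
// client stream. Hypothetical sketch of a helper like the TSOProxy call in
// the benchmark; not the repository's actual implementation.
func tsoProxyRound(streams []pdpb.PD_TsoClient, req *pdpb.TsoRequest) error {
	for _, stream := range streams {
		if err := stream.Send(req); err != nil {
			return err
		}
		resp, err := stream.Recv()
		if err != nil {
			return err
		}
		// Only a sanity check here; a real helper would also verify that the
		// returned timestamps are monotonically increasing.
		if resp.GetCount() != req.GetCount() {
			return fmt.Errorf("expected count %d, got %d", req.GetCount(), resp.GetCount())
		}
	}
	return nil
}
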
