From cb0a479556e92d5e685ec49d87fc33e5c4c9d486 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Mon, 2 Dec 2024 13:58:39 +0000 Subject: [PATCH 01/12] Initializing StreamingInputCall and SteamingOutputCall in the stubserver --- internal/stubserver/stubserver.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/internal/stubserver/stubserver.go b/internal/stubserver/stubserver.go index 2e404e294bf6..3c3f4fb067f2 100644 --- a/internal/stubserver/stubserver.go +++ b/internal/stubserver/stubserver.go @@ -56,9 +56,11 @@ type StubServer struct { testgrpc.TestServiceServer // Customizable implementations of server handlers. - EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) - UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) - FullDuplexCallF func(stream testgrpc.TestService_FullDuplexCallServer) error + EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) + UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) + FullDuplexCallF func(stream testgrpc.TestService_FullDuplexCallServer) error + StreamingInputCallF func(stream testgrpc.TestService_StreamingInputCallServer) error + StreamingOutputCallF func(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error // A client connected to this service the test may use. Created in Start(). Client testgrpc.TestServiceClient @@ -101,6 +103,16 @@ func (ss *StubServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallS return ss.FullDuplexCallF(stream) } +// StreamingInputCall is the handler for testpb.StreamingInputCall +func (ss *StubServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error { + return ss.StreamingInputCallF(stream) +} + +// StreamingOutputCall is the handler for testpb.StreamingOutputCall +func (ss *StubServer) StreamingOutputCall(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error { + return ss.StreamingOutputCallF(req, stream) +} + // Start starts the server and creates a client connected to it. func (ss *StubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error { if err := ss.StartServer(sopts...); err != nil { From d364fdc3d9e0ed208343a1c78f3a05c4e424d851 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Tue, 3 Dec 2024 07:00:39 +0000 Subject: [PATCH 02/12] switching to stubserver in tests instead of testserviceimpl --- authz/grpc_authz_end2end_test.go | 116 ++++++++++++++++++++++--------- 1 file changed, 82 insertions(+), 34 deletions(-) diff --git a/authz/grpc_authz_end2end_test.go b/authz/grpc_authz_end2end_test.go index 4e798f7ca3d7..8008a2275d40 100644 --- a/authz/grpc_authz_end2end_test.go +++ b/authz/grpc_authz_end2end_test.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/grpc/testdata" @@ -42,26 +43,6 @@ import ( testpb "google.golang.org/grpc/interop/grpc_testing" ) -type testServer struct { - testgrpc.UnimplementedTestServiceServer -} - -func (s *testServer) UnaryCall(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { - return &testpb.SimpleResponse{}, nil -} - -func (s *testServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error { - for { - _, err := stream.Recv() - if err == io.EOF { - return stream.SendAndClose(&testpb.StreamingInputCallResponse{}) - } - if err != nil { - return err - } - } -} - type s struct { grpctest.Tester } @@ -317,13 +298,30 @@ func (s) TestStaticPolicyEnd2End(t *testing.T) { grpc.ChainUnaryInterceptor(i.UnaryInterceptor), grpc.ChainStreamInterceptor(i.StreamInterceptor)) defer s.Stop() - testgrpc.RegisterTestServiceServer(s, &testServer{}) lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) } - go s.Serve(lis) + stub := &stubserver.StubServer{ + Listener: lis, + UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil + }, + StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { + for { + _, err := stream.Recv() + if err == io.EOF { + return stream.SendAndClose(&testpb.StreamingInputCallResponse{}) + } + if err != nil { + return err + } + } + }, + } + stub.S = s + stubserver.StartTestService(t, stub) // Establish a connection to the server. clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -387,13 +385,19 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnTLSAuthenticatedConnection(t * grpc.Creds(creds), grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) defer s.Stop() - testgrpc.RegisterTestServiceServer(s, &testServer{}) lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) } - go s.Serve(lis) + stub := &stubserver.StubServer{ + Listener: lis, + UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil + }, + } + stub.S = s + stubserver.StartTestService(t, stub) // Establish a connection to the server. creds, err = credentials.NewClientTLSFromFile(testdata.Path("x509/server_ca_cert.pem"), "x.test.example.com") @@ -452,13 +456,19 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnMTLSAuthenticatedConnection(t grpc.Creds(creds), grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) defer s.Stop() - testgrpc.RegisterTestServiceServer(s, &testServer{}) lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) } - go s.Serve(lis) + stub := &stubserver.StubServer{ + Listener: lis, + UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil + }, + } + stub.S = s + stubserver.StartTestService(t, stub) // Establish a connection to the server. cert, err = tls.LoadX509KeyPair(testdata.Path("x509/client1_cert.pem"), testdata.Path("x509/client1_key.pem")) @@ -506,14 +516,31 @@ func (s) TestFileWatcherEnd2End(t *testing.T) { grpc.ChainUnaryInterceptor(i.UnaryInterceptor), grpc.ChainStreamInterceptor(i.StreamInterceptor)) defer s.Stop() - testgrpc.RegisterTestServiceServer(s, &testServer{}) lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) } defer lis.Close() - go s.Serve(lis) + stub := &stubserver.StubServer{ + Listener: lis, + UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil + }, + StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { + for { + _, err := stream.Recv() + if err == io.EOF { + return stream.SendAndClose(&testpb.StreamingInputCallResponse{}) + } + if err != nil { + return err + } + } + }, + } + stub.S = s + stubserver.StartTestService(t, stub) // Establish a connection to the server. clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -575,14 +602,21 @@ func (s) TestFileWatcher_ValidPolicyRefresh(t *testing.T) { s := grpc.NewServer( grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) defer s.Stop() - testgrpc.RegisterTestServiceServer(s, &testServer{}) lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) } defer lis.Close() - go s.Serve(lis) + + stub := &stubserver.StubServer{ + Listener: lis, + UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil + }, + } + stub.S = s + stubserver.StartTestService(t, stub) // Establish a connection to the server. clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -623,14 +657,21 @@ func (s) TestFileWatcher_InvalidPolicySkipReload(t *testing.T) { s := grpc.NewServer( grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) defer s.Stop() - testgrpc.RegisterTestServiceServer(s, &testServer{}) lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) } defer lis.Close() - go s.Serve(lis) + + stub := &stubserver.StubServer{ + Listener: lis, + UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil + }, + } + stub.S = s + stubserver.StartTestService(t, stub) // Establish a connection to the server. clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -674,14 +715,21 @@ func (s) TestFileWatcher_RecoversFromReloadFailure(t *testing.T) { s := grpc.NewServer( grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) defer s.Stop() - testgrpc.RegisterTestServiceServer(s, &testServer{}) lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) } defer lis.Close() - go s.Serve(lis) + + stub := &stubserver.StubServer{ + Listener: lis, + UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil + }, + } + stub.S = s + stubserver.StartTestService(t, stub) // Establish a connection to the server. clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) From 309abd15146c379dd73e53fcb9c064780d577c3e Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Tue, 3 Dec 2024 12:02:40 +0000 Subject: [PATCH 03/12] switching to stubserver instead of testservice --- authz/audit/audit_logging_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/authz/audit/audit_logging_test.go b/authz/audit/audit_logging_test.go index ea84db099608..2344cc12329f 100644 --- a/authz/audit/audit_logging_test.go +++ b/authz/audit/audit_logging_test.go @@ -271,7 +271,8 @@ func (s) TestAuditLogger(t *testing.T) { grpc.ChainUnaryInterceptor(i.UnaryInterceptor), grpc.ChainStreamInterceptor(i.StreamInterceptor)) defer s.Stop() - testgrpc.RegisterTestServiceServer(s, ss) + ss.S = s + stubserver.StartTestService(t, ss) lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("Error listening: %v", err) From fa74a99642833e691eec69eeb5675ae1ee1ff633 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Fri, 6 Dec 2024 06:42:57 +0000 Subject: [PATCH 04/12] removing redundant server code --- authz/audit/audit_logging_test.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/authz/audit/audit_logging_test.go b/authz/audit/audit_logging_test.go index 2344cc12329f..5db4487e227f 100644 --- a/authz/audit/audit_logging_test.go +++ b/authz/audit/audit_logging_test.go @@ -24,7 +24,6 @@ import ( "crypto/x509" "encoding/json" "io" - "net" "os" "testing" "time" @@ -273,16 +272,11 @@ func (s) TestAuditLogger(t *testing.T) { defer s.Stop() ss.S = s stubserver.StartTestService(t, ss) - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Error listening: %v", err) - } - go s.Serve(lis) // Setup gRPC test client with certificates containing a SPIFFE Id. - clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(clientCreds)) + clientConn, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(clientCreds)) if err != nil { - t.Fatalf("grpc.NewClient(%v) failed: %v", lis.Addr().String(), err) + t.Fatalf("grpc.NewClient(%v) failed: %v", ss.Address, err) } defer clientConn.Close() client := testgrpc.NewTestServiceClient(clientConn) From 1e7a099279df1f99ccd3e6518cd4b00a0b88a689 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Wed, 18 Dec 2024 09:49:28 +0000 Subject: [PATCH 05/12] renaming streamingcalls --- authz/grpc_authz_end2end_test.go | 4 ++-- internal/stubserver/stubserver.go | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/authz/grpc_authz_end2end_test.go b/authz/grpc_authz_end2end_test.go index 8008a2275d40..24ceacd43d81 100644 --- a/authz/grpc_authz_end2end_test.go +++ b/authz/grpc_authz_end2end_test.go @@ -308,7 +308,7 @@ func (s) TestStaticPolicyEnd2End(t *testing.T) { UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, - StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { + ClientStreamingInputCall: func(stream testgrpc.TestService_StreamingInputCallServer) error { for { _, err := stream.Recv() if err == io.EOF { @@ -527,7 +527,7 @@ func (s) TestFileWatcherEnd2End(t *testing.T) { UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, - StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { + ClientStreamingInputCall: func(stream testgrpc.TestService_StreamingInputCallServer) error { for { _, err := stream.Recv() if err == io.EOF { diff --git a/internal/stubserver/stubserver.go b/internal/stubserver/stubserver.go index 3c3f4fb067f2..a9d766a31a39 100644 --- a/internal/stubserver/stubserver.go +++ b/internal/stubserver/stubserver.go @@ -56,11 +56,11 @@ type StubServer struct { testgrpc.TestServiceServer // Customizable implementations of server handlers. - EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) - UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) - FullDuplexCallF func(stream testgrpc.TestService_FullDuplexCallServer) error - StreamingInputCallF func(stream testgrpc.TestService_StreamingInputCallServer) error - StreamingOutputCallF func(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error + EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) + UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) + FullDuplexCallF func(stream testgrpc.TestService_FullDuplexCallServer) error + ClientStreamingInputCall func(stream testgrpc.TestService_StreamingInputCallServer) error + ClientStreamingOutputCall func(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error // A client connected to this service the test may use. Created in Start(). Client testgrpc.TestServiceClient @@ -105,12 +105,12 @@ func (ss *StubServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallS // StreamingInputCall is the handler for testpb.StreamingInputCall func (ss *StubServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error { - return ss.StreamingInputCallF(stream) + return ss.ClientStreamingInputCall(stream) } // StreamingOutputCall is the handler for testpb.StreamingOutputCall func (ss *StubServer) StreamingOutputCall(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error { - return ss.StreamingOutputCallF(req, stream) + return ss.ClientStreamingOutputCall(req, stream) } // Start starts the server and creates a client connected to it. From 924a9e74781cbc9d9e672da86f8aeaa2c46ec472 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Wed, 18 Dec 2024 10:28:28 +0000 Subject: [PATCH 06/12] moving initialization of NewServer inside stub --- authz/grpc_authz_end2end_test.go | 70 +++++++++++++------------------- 1 file changed, 29 insertions(+), 41 deletions(-) diff --git a/authz/grpc_authz_end2end_test.go b/authz/grpc_authz_end2end_test.go index 24ceacd43d81..45dcc30f69f9 100644 --- a/authz/grpc_authz_end2end_test.go +++ b/authz/grpc_authz_end2end_test.go @@ -294,10 +294,6 @@ func (s) TestStaticPolicyEnd2End(t *testing.T) { t.Run(name, func(t *testing.T) { // Start a gRPC server with gRPC authz unary and stream server interceptors. i, _ := authz.NewStatic(test.authzPolicy) - s := grpc.NewServer( - grpc.ChainUnaryInterceptor(i.UnaryInterceptor), - grpc.ChainStreamInterceptor(i.StreamInterceptor)) - defer s.Stop() lis, err := net.Listen("tcp", "localhost:0") if err != nil { @@ -319,9 +315,12 @@ func (s) TestStaticPolicyEnd2End(t *testing.T) { } } }, + S: grpc.NewServer( + grpc.ChainUnaryInterceptor(i.UnaryInterceptor), + grpc.ChainStreamInterceptor(i.StreamInterceptor)), } - stub.S = s stubserver.StartTestService(t, stub) + defer stub.S.Stop() // Establish a connection to the server. clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -381,10 +380,6 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnTLSAuthenticatedConnection(t * if err != nil { t.Fatalf("failed to generate credentials: %v", err) } - s := grpc.NewServer( - grpc.Creds(creds), - grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) - defer s.Stop() lis, err := net.Listen("tcp", "localhost:0") if err != nil { @@ -395,9 +390,12 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnTLSAuthenticatedConnection(t * UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, + S: grpc.NewServer( + grpc.Creds(creds), + grpc.ChainUnaryInterceptor(i.UnaryInterceptor)), } - stub.S = s stubserver.StartTestService(t, stub) + defer stub.S.Stop() // Establish a connection to the server. creds, err = credentials.NewClientTLSFromFile(testdata.Path("x509/server_ca_cert.pem"), "x.test.example.com") @@ -452,10 +450,6 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnMTLSAuthenticatedConnection(t Certificates: []tls.Certificate{cert}, ClientCAs: certPool, }) - s := grpc.NewServer( - grpc.Creds(creds), - grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) - defer s.Stop() lis, err := net.Listen("tcp", "localhost:0") if err != nil { @@ -466,9 +460,12 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnMTLSAuthenticatedConnection(t UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, + S: grpc.NewServer( + grpc.Creds(creds), + grpc.ChainUnaryInterceptor(i.UnaryInterceptor)), } - stub.S = s stubserver.StartTestService(t, stub) + defer stub.S.Stop() // Establish a connection to the server. cert, err = tls.LoadX509KeyPair(testdata.Path("x509/client1_cert.pem"), testdata.Path("x509/client1_key.pem")) @@ -511,12 +508,6 @@ func (s) TestFileWatcherEnd2End(t *testing.T) { i, _ := authz.NewFileWatcher(file, 1*time.Second) defer i.Close() - // Start a gRPC server with gRPC authz unary and stream server interceptors. - s := grpc.NewServer( - grpc.ChainUnaryInterceptor(i.UnaryInterceptor), - grpc.ChainStreamInterceptor(i.StreamInterceptor)) - defer s.Stop() - lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) @@ -538,9 +529,13 @@ func (s) TestFileWatcherEnd2End(t *testing.T) { } } }, + // Start a gRPC server with gRPC authz unary and stream server interceptors. + S: grpc.NewServer( + grpc.ChainUnaryInterceptor(i.UnaryInterceptor), + grpc.ChainStreamInterceptor(i.StreamInterceptor)), } - stub.S = s stubserver.StartTestService(t, stub) + defer stub.S.Stop() // Establish a connection to the server. clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -598,11 +593,6 @@ func (s) TestFileWatcher_ValidPolicyRefresh(t *testing.T) { i, _ := authz.NewFileWatcher(file, 100*time.Millisecond) defer i.Close() - // Start a gRPC server with gRPC authz unary server interceptor. - s := grpc.NewServer( - grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) - defer s.Stop() - lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) @@ -614,9 +604,12 @@ func (s) TestFileWatcher_ValidPolicyRefresh(t *testing.T) { UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, + // Start a gRPC server with gRPC authz unary server interceptor. + S: grpc.NewServer( + grpc.ChainUnaryInterceptor(i.UnaryInterceptor)), } - stub.S = s stubserver.StartTestService(t, stub) + defer stub.S.Stop() // Establish a connection to the server. clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -653,11 +646,6 @@ func (s) TestFileWatcher_InvalidPolicySkipReload(t *testing.T) { i, _ := authz.NewFileWatcher(file, 20*time.Millisecond) defer i.Close() - // Start a gRPC server with gRPC authz unary server interceptors. - s := grpc.NewServer( - grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) - defer s.Stop() - lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) @@ -669,9 +657,12 @@ func (s) TestFileWatcher_InvalidPolicySkipReload(t *testing.T) { UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, + // Start a gRPC server with gRPC authz unary server interceptors. + S: grpc.NewServer( + grpc.ChainUnaryInterceptor(i.UnaryInterceptor)), } - stub.S = s stubserver.StartTestService(t, stub) + defer stub.S.Stop() // Establish a connection to the server. clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -705,17 +696,12 @@ func (s) TestFileWatcher_InvalidPolicySkipReload(t *testing.T) { } } -func (s) TestFileWatcher_RecoversFromReloadFailure(t *testing.T) { +func TestFileWatcher_RecoversFromReloadFailure(t *testing.T) { valid1 := authzTests["DeniesRPCMatchInDenyAndAllow"] file := createTmpPolicyFile(t, "recovers_from_reload_failure", []byte(valid1.authzPolicy)) i, _ := authz.NewFileWatcher(file, 100*time.Millisecond) defer i.Close() - // Start a gRPC server with gRPC authz unary server interceptors. - s := grpc.NewServer( - grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) - defer s.Stop() - lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) @@ -727,9 +713,11 @@ func (s) TestFileWatcher_RecoversFromReloadFailure(t *testing.T) { UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, + S: grpc.NewServer( + grpc.ChainUnaryInterceptor(i.UnaryInterceptor)), } - stub.S = s stubserver.StartTestService(t, stub) + defer stub.S.Stop() // Establish a connection to the server. clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) From ecdab23b72f154d6e6efa74abf06574526121fd4 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Fri, 20 Dec 2024 09:55:48 +0000 Subject: [PATCH 07/12] renaming client and server handlers --- authz/grpc_authz_end2end_test.go | 4 ++-- internal/stubserver/stubserver.go | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/authz/grpc_authz_end2end_test.go b/authz/grpc_authz_end2end_test.go index 45dcc30f69f9..4c0340111cd8 100644 --- a/authz/grpc_authz_end2end_test.go +++ b/authz/grpc_authz_end2end_test.go @@ -304,7 +304,7 @@ func (s) TestStaticPolicyEnd2End(t *testing.T) { UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, - ClientStreamingInputCall: func(stream testgrpc.TestService_StreamingInputCallServer) error { + StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { for { _, err := stream.Recv() if err == io.EOF { @@ -518,7 +518,7 @@ func (s) TestFileWatcherEnd2End(t *testing.T) { UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, - ClientStreamingInputCall: func(stream testgrpc.TestService_StreamingInputCallServer) error { + StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { for { _, err := stream.Recv() if err == io.EOF { diff --git a/internal/stubserver/stubserver.go b/internal/stubserver/stubserver.go index a9d766a31a39..1c92793d8df6 100644 --- a/internal/stubserver/stubserver.go +++ b/internal/stubserver/stubserver.go @@ -56,11 +56,11 @@ type StubServer struct { testgrpc.TestServiceServer // Customizable implementations of server handlers. - EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) - UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) - FullDuplexCallF func(stream testgrpc.TestService_FullDuplexCallServer) error - ClientStreamingInputCall func(stream testgrpc.TestService_StreamingInputCallServer) error - ClientStreamingOutputCall func(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error + EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) + UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) + FullDuplexCallF func(stream testgrpc.TestService_FullDuplexCallServer) error + StreamingInputCallF func(stream testgrpc.TestService_StreamingInputCallServer) error // ClientStreaming + StreamingOutputCallF func(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error // ServerStreaming // A client connected to this service the test may use. Created in Start(). Client testgrpc.TestServiceClient @@ -105,12 +105,12 @@ func (ss *StubServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallS // StreamingInputCall is the handler for testpb.StreamingInputCall func (ss *StubServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error { - return ss.ClientStreamingInputCall(stream) + return ss.StreamingInputCallF(stream) } // StreamingOutputCall is the handler for testpb.StreamingOutputCall func (ss *StubServer) StreamingOutputCall(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error { - return ss.ClientStreamingOutputCall(req, stream) + return ss.StreamingOutputCallF(req, stream) } // Start starts the server and creates a client connected to it. From 72879839ecb0434f9c91b7182a936fa2daa48635 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Fri, 20 Dec 2024 10:10:26 +0000 Subject: [PATCH 08/12] updating grpc_authz_end2end_test.go --- authz/grpc_authz_end2end_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/authz/grpc_authz_end2end_test.go b/authz/grpc_authz_end2end_test.go index 4c0340111cd8..d54c88d67ff6 100644 --- a/authz/grpc_authz_end2end_test.go +++ b/authz/grpc_authz_end2end_test.go @@ -696,7 +696,7 @@ func (s) TestFileWatcher_InvalidPolicySkipReload(t *testing.T) { } } -func TestFileWatcher_RecoversFromReloadFailure(t *testing.T) { +func (s) TestFileWatcher_RecoversFromReloadFailure(t *testing.T) { valid1 := authzTests["DeniesRPCMatchInDenyAndAllow"] file := createTmpPolicyFile(t, "recovers_from_reload_failure", []byte(valid1.authzPolicy)) i, _ := authz.NewFileWatcher(file, 100*time.Millisecond) From 4588fc111159824ce72e97d9590481d2dc8efe75 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Sun, 22 Dec 2024 06:23:31 +0000 Subject: [PATCH 09/12] updating client and server streaming comments --- internal/stubserver/stubserver.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/stubserver/stubserver.go b/internal/stubserver/stubserver.go index 1c92793d8df6..92262af877a6 100644 --- a/internal/stubserver/stubserver.go +++ b/internal/stubserver/stubserver.go @@ -59,8 +59,8 @@ type StubServer struct { EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) FullDuplexCallF func(stream testgrpc.TestService_FullDuplexCallServer) error - StreamingInputCallF func(stream testgrpc.TestService_StreamingInputCallServer) error // ClientStreaming - StreamingOutputCallF func(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error // ServerStreaming + StreamingInputCallF func(stream testgrpc.TestService_StreamingInputCallServer) error // Client-Streaming request + StreamingOutputCallF func(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error // Server-streaming response // A client connected to this service the test may use. Created in Start(). Client testgrpc.TestServiceClient From 4298e86ab2788f20e6d69f37f521cd6904a34607 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Sun, 22 Dec 2024 06:32:33 +0000 Subject: [PATCH 10/12] removing empty lines for re-trigger --- authz/grpc_authz_end2end_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/authz/grpc_authz_end2end_test.go b/authz/grpc_authz_end2end_test.go index d54c88d67ff6..6ddc8dbf78f0 100644 --- a/authz/grpc_authz_end2end_test.go +++ b/authz/grpc_authz_end2end_test.go @@ -598,7 +598,6 @@ func (s) TestFileWatcher_ValidPolicyRefresh(t *testing.T) { t.Fatalf("error listening: %v", err) } defer lis.Close() - stub := &stubserver.StubServer{ Listener: lis, UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { @@ -651,7 +650,6 @@ func (s) TestFileWatcher_InvalidPolicySkipReload(t *testing.T) { t.Fatalf("error listening: %v", err) } defer lis.Close() - stub := &stubserver.StubServer{ Listener: lis, UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { From 136cc82dd3fcbd6f25264cec6fd13a44f97ed4cd Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Wed, 8 Jan 2025 08:00:26 +0000 Subject: [PATCH 11/12] removing explicit listener creation code --- authz/audit/audit_logging_test.go | 46 +++++----- authz/grpc_authz_end2end_test.go | 141 +++++++++--------------------- internal/stubserver/stubserver.go | 4 +- 3 files changed, 67 insertions(+), 124 deletions(-) diff --git a/authz/audit/audit_logging_test.go b/authz/audit/audit_logging_test.go index 5db4487e227f..b889852be2e1 100644 --- a/authz/audit/audit_logging_test.go +++ b/authz/audit/audit_logging_test.go @@ -88,6 +88,21 @@ func (*loggerBuilder) ParseLoggerConfig(config json.RawMessage) (audit.LoggerCon // and 'deny' outcomes. Additionally, it checks if SPIFFE ID from a certificate // is propagated correctly. func (s) TestAuditLogger(t *testing.T) { + // Construct the credentials for the tests and the stub server + serverCreds := loadServerCreds(t) + clientCreds := loadClientCreds(t) + ss := &stubserver.StubServer{ + UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil + }, + FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { + _, err := stream.Recv() + if err != io.EOF { + return err + } + return nil + }, + } // Each test data entry contains an authz policy for a grpc server, // how many 'allow' and 'deny' outcomes we expect (each test case makes 2 // unary calls and one client-streaming call), and a structure to check if @@ -239,21 +254,7 @@ func (s) TestAuditLogger(t *testing.T) { wantStreamingCallCode: codes.PermissionDenied, }, } - // Construct the credentials for the tests and the stub server - serverCreds := loadServerCreds(t) - clientCreds := loadClientCreds(t) - ss := &stubserver.StubServer{ - UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { - return &testpb.SimpleResponse{}, nil - }, - FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { - _, err := stream.Recv() - if err != io.EOF { - return err - } - return nil - }, - } + for _, test := range tests { t.Run(test.name, func(t *testing.T) { // Setup test statAuditLogger, gRPC test server with authzPolicy, unary @@ -265,21 +266,18 @@ func (s) TestAuditLogger(t *testing.T) { audit.RegisterLoggerBuilder(lb) i, _ := authz.NewStatic(test.authzPolicy) - s := grpc.NewServer( - grpc.Creds(serverCreds), - grpc.ChainUnaryInterceptor(i.UnaryInterceptor), - grpc.ChainStreamInterceptor(i.StreamInterceptor)) + s := grpc.NewServer(grpc.Creds(serverCreds), grpc.ChainUnaryInterceptor(i.UnaryInterceptor), grpc.ChainStreamInterceptor(i.StreamInterceptor)) defer s.Stop() ss.S = s stubserver.StartTestService(t, ss) // Setup gRPC test client with certificates containing a SPIFFE Id. - clientConn, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(clientCreds)) + cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(clientCreds)) if err != nil { t.Fatalf("grpc.NewClient(%v) failed: %v", ss.Address, err) } - defer clientConn.Close() - client := testgrpc.NewTestServiceClient(clientConn) + defer cc.Close() + client := testgrpc.NewTestServiceClient(cc) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -291,7 +289,7 @@ func (s) TestAuditLogger(t *testing.T) { } stream, err := client.StreamingInputCall(ctx) if err != nil { - t.Fatalf("StreamingInputCall failed:%v", err) + t.Fatalf("StreamingInputCall failed: %v", err) } req := &testpb.StreamingInputCallRequest{ Payload: &testpb.Payload{ @@ -299,7 +297,7 @@ func (s) TestAuditLogger(t *testing.T) { }, } if err := stream.Send(req); err != nil && err != io.EOF { - t.Fatalf("stream.Send failed:%v", err) + t.Fatalf("stream.Send failed: %v", err) } if _, err := stream.CloseAndRecv(); status.Code(err) != test.wantStreamingCallCode { t.Errorf("Unexpected stream.CloseAndRecv fail: got %v want %v", err, test.wantStreamingCallCode) diff --git a/authz/grpc_authz_end2end_test.go b/authz/grpc_authz_end2end_test.go index 6ddc8dbf78f0..089dc0a0b467 100644 --- a/authz/grpc_authz_end2end_test.go +++ b/authz/grpc_authz_end2end_test.go @@ -23,7 +23,6 @@ import ( "crypto/tls" "crypto/x509" "io" - "net" "os" "testing" "time" @@ -295,12 +294,7 @@ func (s) TestStaticPolicyEnd2End(t *testing.T) { // Start a gRPC server with gRPC authz unary and stream server interceptors. i, _ := authz.NewStatic(test.authzPolicy) - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("error listening: %v", err) - } stub := &stubserver.StubServer{ - Listener: lis, UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, @@ -315,20 +309,18 @@ func (s) TestStaticPolicyEnd2End(t *testing.T) { } } }, - S: grpc.NewServer( - grpc.ChainUnaryInterceptor(i.UnaryInterceptor), - grpc.ChainStreamInterceptor(i.StreamInterceptor)), + S: grpc.NewServer(grpc.ChainUnaryInterceptor(i.UnaryInterceptor), grpc.ChainStreamInterceptor(i.StreamInterceptor)), } stubserver.StartTestService(t, stub) - defer stub.S.Stop() + defer stub.Stop() // Establish a connection to the server. - clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.NewClient(stub.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - t.Fatalf("grpc.NewClient(%v) failed: %v", lis.Addr().String(), err) + t.Fatalf("grpc.NewClient(%v) failed: %v", stub.Address, err) } - defer clientConn.Close() - client := testgrpc.NewTestServiceClient(clientConn) + defer cc.Close() + client := testgrpc.NewTestServiceClient(cc) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -381,18 +373,11 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnTLSAuthenticatedConnection(t * t.Fatalf("failed to generate credentials: %v", err) } - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("error listening: %v", err) - } stub := &stubserver.StubServer{ - Listener: lis, UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, - S: grpc.NewServer( - grpc.Creds(creds), - grpc.ChainUnaryInterceptor(i.UnaryInterceptor)), + S: grpc.NewServer(grpc.Creds(creds), grpc.ChainUnaryInterceptor(i.UnaryInterceptor)), } stubserver.StartTestService(t, stub) defer stub.S.Stop() @@ -402,12 +387,12 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnTLSAuthenticatedConnection(t * if err != nil { t.Fatalf("failed to load credentials: %v", err) } - clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(creds)) + cc, err := grpc.NewClient(stub.Address, grpc.WithTransportCredentials(creds)) if err != nil { - t.Fatalf("grpc.NewClient(%v) failed: %v", lis.Addr().String(), err) + t.Fatalf("grpc.NewClient(%v) failed: %v", stub.Address, err) } - defer clientConn.Close() - client := testgrpc.NewTestServiceClient(clientConn) + defer cc.Close() + client := testgrpc.NewTestServiceClient(cc) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -450,22 +435,14 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnMTLSAuthenticatedConnection(t Certificates: []tls.Certificate{cert}, ClientCAs: certPool, }) - - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("error listening: %v", err) - } stub := &stubserver.StubServer{ - Listener: lis, UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, - S: grpc.NewServer( - grpc.Creds(creds), - grpc.ChainUnaryInterceptor(i.UnaryInterceptor)), + S: grpc.NewServer(grpc.Creds(creds), grpc.ChainUnaryInterceptor(i.UnaryInterceptor)), } stubserver.StartTestService(t, stub) - defer stub.S.Stop() + defer stub.Stop() // Establish a connection to the server. cert, err = tls.LoadX509KeyPair(testdata.Path("x509/client1_cert.pem"), testdata.Path("x509/client1_key.pem")) @@ -485,12 +462,12 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnMTLSAuthenticatedConnection(t RootCAs: roots, ServerName: "x.test.example.com", }) - clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(creds)) + cc, err := grpc.NewClient(stub.Address, grpc.WithTransportCredentials(creds)) if err != nil { - t.Fatalf("grpc.NewClient(%v) failed: %v", lis.Addr().String(), err) + t.Fatalf("grpc.NewClient(%v) failed: %v", stub.Address, err) } - defer clientConn.Close() - client := testgrpc.NewTestServiceClient(clientConn) + defer cc.Close() + client := testgrpc.NewTestServiceClient(cc) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -508,13 +485,7 @@ func (s) TestFileWatcherEnd2End(t *testing.T) { i, _ := authz.NewFileWatcher(file, 1*time.Second) defer i.Close() - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("error listening: %v", err) - } - defer lis.Close() stub := &stubserver.StubServer{ - Listener: lis, UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, @@ -530,20 +501,18 @@ func (s) TestFileWatcherEnd2End(t *testing.T) { } }, // Start a gRPC server with gRPC authz unary and stream server interceptors. - S: grpc.NewServer( - grpc.ChainUnaryInterceptor(i.UnaryInterceptor), - grpc.ChainStreamInterceptor(i.StreamInterceptor)), + S: grpc.NewServer(grpc.ChainUnaryInterceptor(i.UnaryInterceptor), grpc.ChainStreamInterceptor(i.StreamInterceptor)), } stubserver.StartTestService(t, stub) - defer stub.S.Stop() + defer stub.Stop() // Establish a connection to the server. - clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.NewClient(stub.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - t.Fatalf("grpc.NewClient(%v) failed: %v", lis.Addr().String(), err) + t.Fatalf("grpc.NewClient(%v) failed: %v", stub.Address, err) } - defer clientConn.Close() - client := testgrpc.NewTestServiceClient(clientConn) + defer cc.Close() + client := testgrpc.NewTestServiceClient(cc) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -558,7 +527,7 @@ func (s) TestFileWatcherEnd2End(t *testing.T) { // Verifying authorization decision for Streaming RPC. stream, err := client.StreamingInputCall(ctx) if err != nil { - t.Fatalf("failed StreamingInputCall err: %v", err) + t.Fatalf("failed StreamingInputCall : %v", err) } req := &testpb.StreamingInputCallRequest{ Payload: &testpb.Payload{ @@ -566,7 +535,7 @@ func (s) TestFileWatcherEnd2End(t *testing.T) { }, } if err := stream.Send(req); err != nil && err != io.EOF { - t.Fatalf("failed stream.Send err: %v", err) + t.Fatalf("failed stream.Send : %v", err) } _, err = stream.CloseAndRecv() if got := status.Convert(err); got.Code() != test.wantStatus.Code() || got.Message() != test.wantStatus.Message() { @@ -593,30 +562,22 @@ func (s) TestFileWatcher_ValidPolicyRefresh(t *testing.T) { i, _ := authz.NewFileWatcher(file, 100*time.Millisecond) defer i.Close() - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("error listening: %v", err) - } - defer lis.Close() stub := &stubserver.StubServer{ - Listener: lis, UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, // Start a gRPC server with gRPC authz unary server interceptor. - S: grpc.NewServer( - grpc.ChainUnaryInterceptor(i.UnaryInterceptor)), - } + S: grpc.NewServer(grpc.ChainUnaryInterceptor(i.UnaryInterceptor))} stubserver.StartTestService(t, stub) - defer stub.S.Stop() + defer stub.Stop() // Establish a connection to the server. - clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.NewClient(stub.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - t.Fatalf("grpc.NewClient(%v) failed: %v", lis.Addr().String(), err) + t.Fatalf("grpc.NewClient(%v) failed: %v", stub.Address, err) } - defer clientConn.Close() - client := testgrpc.NewTestServiceClient(clientConn) + defer cc.Close() + client := testgrpc.NewTestServiceClient(cc) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -645,30 +606,23 @@ func (s) TestFileWatcher_InvalidPolicySkipReload(t *testing.T) { i, _ := authz.NewFileWatcher(file, 20*time.Millisecond) defer i.Close() - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("error listening: %v", err) - } - defer lis.Close() stub := &stubserver.StubServer{ - Listener: lis, UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, // Start a gRPC server with gRPC authz unary server interceptors. - S: grpc.NewServer( - grpc.ChainUnaryInterceptor(i.UnaryInterceptor)), + S: grpc.NewServer(grpc.ChainUnaryInterceptor(i.UnaryInterceptor)), } stubserver.StartTestService(t, stub) - defer stub.S.Stop() + defer stub.Stop() // Establish a connection to the server. - clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.NewClient(stub.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - t.Fatalf("grpc.NewClient(%v) failed: %v", lis.Addr().String(), err) + t.Fatalf("grpc.NewClient(%v) failed: %v", stub.Address, err) } - defer clientConn.Close() - client := testgrpc.NewTestServiceClient(clientConn) + defer cc.Close() + client := testgrpc.NewTestServiceClient(cc) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -700,30 +654,21 @@ func (s) TestFileWatcher_RecoversFromReloadFailure(t *testing.T) { i, _ := authz.NewFileWatcher(file, 100*time.Millisecond) defer i.Close() - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("error listening: %v", err) - } - defer lis.Close() - stub := &stubserver.StubServer{ - Listener: lis, UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, - S: grpc.NewServer( - grpc.ChainUnaryInterceptor(i.UnaryInterceptor)), - } + S: grpc.NewServer(grpc.ChainUnaryInterceptor(i.UnaryInterceptor))} stubserver.StartTestService(t, stub) - defer stub.S.Stop() + defer stub.Stop() // Establish a connection to the server. - clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.NewClient(stub.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - t.Fatalf("grpc.NewClient(%v) failed: %v", lis.Addr().String(), err) + t.Fatalf("grpc.NewClient(%v) failed: %v", stub.Address, err) } - defer clientConn.Close() - client := testgrpc.NewTestServiceClient(clientConn) + defer cc.Close() + client := testgrpc.NewTestServiceClient(cc) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() diff --git a/internal/stubserver/stubserver.go b/internal/stubserver/stubserver.go index 92262af877a6..3c3f4fb067f2 100644 --- a/internal/stubserver/stubserver.go +++ b/internal/stubserver/stubserver.go @@ -59,8 +59,8 @@ type StubServer struct { EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) FullDuplexCallF func(stream testgrpc.TestService_FullDuplexCallServer) error - StreamingInputCallF func(stream testgrpc.TestService_StreamingInputCallServer) error // Client-Streaming request - StreamingOutputCallF func(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error // Server-streaming response + StreamingInputCallF func(stream testgrpc.TestService_StreamingInputCallServer) error + StreamingOutputCallF func(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error // A client connected to this service the test may use. Created in Start(). Client testgrpc.TestServiceClient From 05ad4cc89dbb5b25f3122a3702f7de2a95f2d02a Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Wed, 8 Jan 2025 12:35:36 +0000 Subject: [PATCH 12/12] adding new settings under dialoptions --- dialoptions.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/dialoptions.go b/dialoptions.go index f3a045296a46..2c4c590cd66b 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -94,6 +94,8 @@ type dialOptions struct { idleTimeout time.Duration defaultScheme string maxCallAttempts int + staticConnWindowSize int32 + staticStreamWindowSize int32 } // DialOption configures how we set up the connection. @@ -662,6 +664,32 @@ func WithDisableHealthCheck() DialOption { }) } +// WithStaticConnWindowSize returns a DialOption which sets a static value for +// the connection window size. This disables BDP estimation and keeps the +// window size fixed at the provided value. The lower bound for window size is +// 64K, and any value smaller than that will be ignored. +func WithStaticConnWindowSize(s int32) DialOption { + return newFuncDialOption(func(o *dialOptions) { + if s < 64*1024 { + return + } + o.staticConnWindowSize = s + }) +} + +// WithStaticStreamWindowSize returns a DialOption which sets a static value for +// the stream window size. This disables BDP estimation and keeps the +// window size fixed at the provided value. The lower bound for window size is +// 64K, and any value smaller than that will be ignored. +func WithStaticStreamWindowSize(s int32) DialOption { + return newFuncDialOption(func(o *dialOptions) { + if s < 64*1024 { + return + } + o.staticStreamWindowSize = s + }) +} + func defaultDialOptions() dialOptions { return dialOptions{ copts: transport.ConnectOptions{