Skip to content

Commit

Permalink
removing explicit listener creation code
Browse files Browse the repository at this point in the history
  • Loading branch information
janardhankrishna-sai committed Jan 8, 2025
1 parent 96cd53a commit 136cc82
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 124 deletions.
46 changes: 22 additions & 24 deletions authz/audit/audit_logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,21 @@ func (*loggerBuilder) ParseLoggerConfig(config json.RawMessage) (audit.LoggerCon
// and 'deny' outcomes. Additionally, it checks if SPIFFE ID from a certificate
// is propagated correctly.
func (s) TestAuditLogger(t *testing.T) {
// Construct the credentials for the tests and the stub server
serverCreds := loadServerCreds(t)
clientCreds := loadClientCreds(t)
ss := &stubserver.StubServer{
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
_, err := stream.Recv()
if err != io.EOF {
return err
}
return nil
},
}
// Each test data entry contains an authz policy for a grpc server,
// how many 'allow' and 'deny' outcomes we expect (each test case makes 2
// unary calls and one client-streaming call), and a structure to check if
Expand Down Expand Up @@ -239,21 +254,7 @@ func (s) TestAuditLogger(t *testing.T) {
wantStreamingCallCode: codes.PermissionDenied,
},
}
// Construct the credentials for the tests and the stub server
serverCreds := loadServerCreds(t)
clientCreds := loadClientCreds(t)
ss := &stubserver.StubServer{
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
_, err := stream.Recv()
if err != io.EOF {
return err
}
return nil
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Setup test statAuditLogger, gRPC test server with authzPolicy, unary
Expand All @@ -265,21 +266,18 @@ func (s) TestAuditLogger(t *testing.T) {
audit.RegisterLoggerBuilder(lb)
i, _ := authz.NewStatic(test.authzPolicy)

s := grpc.NewServer(
grpc.Creds(serverCreds),
grpc.ChainUnaryInterceptor(i.UnaryInterceptor),
grpc.ChainStreamInterceptor(i.StreamInterceptor))
s := grpc.NewServer(grpc.Creds(serverCreds), grpc.ChainUnaryInterceptor(i.UnaryInterceptor), grpc.ChainStreamInterceptor(i.StreamInterceptor))
defer s.Stop()
ss.S = s
stubserver.StartTestService(t, ss)

// Setup gRPC test client with certificates containing a SPIFFE Id.
clientConn, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(clientCreds))
cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(clientCreds))
if err != nil {
t.Fatalf("grpc.NewClient(%v) failed: %v", ss.Address, err)
}
defer clientConn.Close()
client := testgrpc.NewTestServiceClient(clientConn)
defer cc.Close()
client := testgrpc.NewTestServiceClient(cc)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

Expand All @@ -291,15 +289,15 @@ func (s) TestAuditLogger(t *testing.T) {
}
stream, err := client.StreamingInputCall(ctx)
if err != nil {
t.Fatalf("StreamingInputCall failed:%v", err)
t.Fatalf("StreamingInputCall failed: %v", err)
}
req := &testpb.StreamingInputCallRequest{
Payload: &testpb.Payload{
Body: []byte("hi"),
},
}
if err := stream.Send(req); err != nil && err != io.EOF {
t.Fatalf("stream.Send failed:%v", err)
t.Fatalf("stream.Send failed: %v", err)
}
if _, err := stream.CloseAndRecv(); status.Code(err) != test.wantStreamingCallCode {
t.Errorf("Unexpected stream.CloseAndRecv fail: got %v want %v", err, test.wantStreamingCallCode)
Expand Down
141 changes: 43 additions & 98 deletions authz/grpc_authz_end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"crypto/tls"
"crypto/x509"
"io"
"net"
"os"
"testing"
"time"
Expand Down Expand Up @@ -295,12 +294,7 @@ func (s) TestStaticPolicyEnd2End(t *testing.T) {
// Start a gRPC server with gRPC authz unary and stream server interceptors.
i, _ := authz.NewStatic(test.authzPolicy)

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}
stub := &stubserver.StubServer{
Listener: lis,
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
Expand All @@ -315,20 +309,18 @@ func (s) TestStaticPolicyEnd2End(t *testing.T) {
}
}
},
S: grpc.NewServer(
grpc.ChainUnaryInterceptor(i.UnaryInterceptor),
grpc.ChainStreamInterceptor(i.StreamInterceptor)),
S: grpc.NewServer(grpc.ChainUnaryInterceptor(i.UnaryInterceptor), grpc.ChainStreamInterceptor(i.StreamInterceptor)),
}
stubserver.StartTestService(t, stub)
defer stub.S.Stop()
defer stub.Stop()

// Establish a connection to the server.
clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
cc, err := grpc.NewClient(stub.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient(%v) failed: %v", lis.Addr().String(), err)
t.Fatalf("grpc.NewClient(%v) failed: %v", stub.Address, err)
}
defer clientConn.Close()
client := testgrpc.NewTestServiceClient(clientConn)
defer cc.Close()
client := testgrpc.NewTestServiceClient(cc)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -381,18 +373,11 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnTLSAuthenticatedConnection(t *
t.Fatalf("failed to generate credentials: %v", err)
}

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}
stub := &stubserver.StubServer{
Listener: lis,
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
S: grpc.NewServer(
grpc.Creds(creds),
grpc.ChainUnaryInterceptor(i.UnaryInterceptor)),
S: grpc.NewServer(grpc.Creds(creds), grpc.ChainUnaryInterceptor(i.UnaryInterceptor)),
}
stubserver.StartTestService(t, stub)
defer stub.S.Stop()
Expand All @@ -402,12 +387,12 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnTLSAuthenticatedConnection(t *
if err != nil {
t.Fatalf("failed to load credentials: %v", err)
}
clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(creds))
cc, err := grpc.NewClient(stub.Address, grpc.WithTransportCredentials(creds))
if err != nil {
t.Fatalf("grpc.NewClient(%v) failed: %v", lis.Addr().String(), err)
t.Fatalf("grpc.NewClient(%v) failed: %v", stub.Address, err)
}
defer clientConn.Close()
client := testgrpc.NewTestServiceClient(clientConn)
defer cc.Close()
client := testgrpc.NewTestServiceClient(cc)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -450,22 +435,14 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnMTLSAuthenticatedConnection(t
Certificates: []tls.Certificate{cert},
ClientCAs: certPool,
})

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}
stub := &stubserver.StubServer{
Listener: lis,
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
S: grpc.NewServer(
grpc.Creds(creds),
grpc.ChainUnaryInterceptor(i.UnaryInterceptor)),
S: grpc.NewServer(grpc.Creds(creds), grpc.ChainUnaryInterceptor(i.UnaryInterceptor)),
}
stubserver.StartTestService(t, stub)
defer stub.S.Stop()
defer stub.Stop()

// Establish a connection to the server.
cert, err = tls.LoadX509KeyPair(testdata.Path("x509/client1_cert.pem"), testdata.Path("x509/client1_key.pem"))
Expand All @@ -485,12 +462,12 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnMTLSAuthenticatedConnection(t
RootCAs: roots,
ServerName: "x.test.example.com",
})
clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(creds))
cc, err := grpc.NewClient(stub.Address, grpc.WithTransportCredentials(creds))
if err != nil {
t.Fatalf("grpc.NewClient(%v) failed: %v", lis.Addr().String(), err)
t.Fatalf("grpc.NewClient(%v) failed: %v", stub.Address, err)
}
defer clientConn.Close()
client := testgrpc.NewTestServiceClient(clientConn)
defer cc.Close()
client := testgrpc.NewTestServiceClient(cc)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand All @@ -508,13 +485,7 @@ func (s) TestFileWatcherEnd2End(t *testing.T) {
i, _ := authz.NewFileWatcher(file, 1*time.Second)
defer i.Close()

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}
defer lis.Close()
stub := &stubserver.StubServer{
Listener: lis,
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
Expand All @@ -530,20 +501,18 @@ func (s) TestFileWatcherEnd2End(t *testing.T) {
}
},
// Start a gRPC server with gRPC authz unary and stream server interceptors.
S: grpc.NewServer(
grpc.ChainUnaryInterceptor(i.UnaryInterceptor),
grpc.ChainStreamInterceptor(i.StreamInterceptor)),
S: grpc.NewServer(grpc.ChainUnaryInterceptor(i.UnaryInterceptor), grpc.ChainStreamInterceptor(i.StreamInterceptor)),
}
stubserver.StartTestService(t, stub)
defer stub.S.Stop()
defer stub.Stop()

// Establish a connection to the server.
clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
cc, err := grpc.NewClient(stub.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient(%v) failed: %v", lis.Addr().String(), err)
t.Fatalf("grpc.NewClient(%v) failed: %v", stub.Address, err)
}
defer clientConn.Close()
client := testgrpc.NewTestServiceClient(clientConn)
defer cc.Close()
client := testgrpc.NewTestServiceClient(cc)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand All @@ -558,15 +527,15 @@ func (s) TestFileWatcherEnd2End(t *testing.T) {
// Verifying authorization decision for Streaming RPC.
stream, err := client.StreamingInputCall(ctx)
if err != nil {
t.Fatalf("failed StreamingInputCall err: %v", err)
t.Fatalf("failed StreamingInputCall : %v", err)
}
req := &testpb.StreamingInputCallRequest{
Payload: &testpb.Payload{
Body: []byte("hi"),
},
}
if err := stream.Send(req); err != nil && err != io.EOF {
t.Fatalf("failed stream.Send err: %v", err)
t.Fatalf("failed stream.Send : %v", err)
}
_, err = stream.CloseAndRecv()
if got := status.Convert(err); got.Code() != test.wantStatus.Code() || got.Message() != test.wantStatus.Message() {
Expand All @@ -593,30 +562,22 @@ func (s) TestFileWatcher_ValidPolicyRefresh(t *testing.T) {
i, _ := authz.NewFileWatcher(file, 100*time.Millisecond)
defer i.Close()

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}
defer lis.Close()
stub := &stubserver.StubServer{
Listener: lis,
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
// Start a gRPC server with gRPC authz unary server interceptor.
S: grpc.NewServer(
grpc.ChainUnaryInterceptor(i.UnaryInterceptor)),
}
S: grpc.NewServer(grpc.ChainUnaryInterceptor(i.UnaryInterceptor))}
stubserver.StartTestService(t, stub)
defer stub.S.Stop()
defer stub.Stop()

// Establish a connection to the server.
clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
cc, err := grpc.NewClient(stub.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient(%v) failed: %v", lis.Addr().String(), err)
t.Fatalf("grpc.NewClient(%v) failed: %v", stub.Address, err)
}
defer clientConn.Close()
client := testgrpc.NewTestServiceClient(clientConn)
defer cc.Close()
client := testgrpc.NewTestServiceClient(cc)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -645,30 +606,23 @@ func (s) TestFileWatcher_InvalidPolicySkipReload(t *testing.T) {
i, _ := authz.NewFileWatcher(file, 20*time.Millisecond)
defer i.Close()

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}
defer lis.Close()
stub := &stubserver.StubServer{
Listener: lis,
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
// Start a gRPC server with gRPC authz unary server interceptors.
S: grpc.NewServer(
grpc.ChainUnaryInterceptor(i.UnaryInterceptor)),
S: grpc.NewServer(grpc.ChainUnaryInterceptor(i.UnaryInterceptor)),
}
stubserver.StartTestService(t, stub)
defer stub.S.Stop()
defer stub.Stop()

// Establish a connection to the server.
clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
cc, err := grpc.NewClient(stub.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient(%v) failed: %v", lis.Addr().String(), err)
t.Fatalf("grpc.NewClient(%v) failed: %v", stub.Address, err)
}
defer clientConn.Close()
client := testgrpc.NewTestServiceClient(clientConn)
defer cc.Close()
client := testgrpc.NewTestServiceClient(cc)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -700,30 +654,21 @@ func (s) TestFileWatcher_RecoversFromReloadFailure(t *testing.T) {
i, _ := authz.NewFileWatcher(file, 100*time.Millisecond)
defer i.Close()

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}
defer lis.Close()

stub := &stubserver.StubServer{
Listener: lis,
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
S: grpc.NewServer(
grpc.ChainUnaryInterceptor(i.UnaryInterceptor)),
}
S: grpc.NewServer(grpc.ChainUnaryInterceptor(i.UnaryInterceptor))}
stubserver.StartTestService(t, stub)
defer stub.S.Stop()
defer stub.Stop()

// Establish a connection to the server.
clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
cc, err := grpc.NewClient(stub.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient(%v) failed: %v", lis.Addr().String(), err)
t.Fatalf("grpc.NewClient(%v) failed: %v", stub.Address, err)
}
defer clientConn.Close()
client := testgrpc.NewTestServiceClient(clientConn)
defer cc.Close()
client := testgrpc.NewTestServiceClient(cc)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down
4 changes: 2 additions & 2 deletions internal/stubserver/stubserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ type StubServer struct {
EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error)
UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
FullDuplexCallF func(stream testgrpc.TestService_FullDuplexCallServer) error
StreamingInputCallF func(stream testgrpc.TestService_StreamingInputCallServer) error // Client-Streaming request
StreamingOutputCallF func(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error // Server-streaming response
StreamingInputCallF func(stream testgrpc.TestService_StreamingInputCallServer) error
StreamingOutputCallF func(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error

// A client connected to this service the test may use. Created in Start().
Client testgrpc.TestServiceClient
Expand Down

0 comments on commit 136cc82

Please sign in to comment.