From e9ea41dedc6e78a836bdbe9665743c903cba6b0f Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 13 Dec 2023 23:21:01 +0000 Subject: [PATCH 1/5] grpc: eliminate panics in server worker implementation This PR eliminates two possible panics: - write to a closed channel - this was possible when the close of the serverWorkerChannel from Stop/GracefulStop was racing with handling of a new stream. - close of a closed channel - this was possible when Stop/GracefulStop was called more than once. --- server.go | 22 +++++++++------- test/server_test.go | 64 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 9 deletions(-) diff --git a/server.go b/server.go index 2fa694d555e5..682fa1831ec8 100644 --- a/server.go +++ b/server.go @@ -144,7 +144,8 @@ type Server struct { channelzID *channelz.Identifier czData *channelzData - serverWorkerChannel chan func() + serverWorkerChannel chan func() + serverWorkerChannelClose func() } type serverOptions struct { @@ -623,15 +624,14 @@ func (s *Server) serverWorker() { // connections to reduce the time spent overall on runtime.morestack. func (s *Server) initServerWorkers() { s.serverWorkerChannel = make(chan func()) + s.serverWorkerChannelClose = grpcsync.OnceFunc(func() { + close(s.serverWorkerChannel) + }) for i := uint32(0); i < s.opts.numServerWorkers; i++ { go s.serverWorker() } } -func (s *Server) stopServerWorkers() { - close(s.serverWorkerChannel) -} - // NewServer creates a gRPC server which has no service registered and has not // started to accept requests yet. func NewServer(opt ...ServerOption) *Server { @@ -1898,15 +1898,19 @@ func (s *Server) stop(graceful bool) { s.closeServerTransportsLocked() } - if s.opts.numServerWorkers > 0 { - s.stopServerWorkers() - } - for len(s.conns) != 0 { s.cv.Wait() } s.conns = nil + if s.opts.numServerWorkers > 0 { + // Closing the channel (only once, via grpcsync.OnceFunc) after all the + // connections have been closed above ensures that there are no + // goroutines executing the callback passed to st.HandleStreams (where + // the channel is written to). + s.serverWorkerChannelClose() + } + if s.events != nil { s.events.Finish() s.events = nil diff --git a/test/server_test.go b/test/server_test.go index eb0872d0c113..bf169fb947e7 100644 --- a/test/server_test.go +++ b/test/server_test.go @@ -21,10 +21,13 @@ package test import ( "context" "io" + "runtime" + "sync" "testing" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/status" @@ -32,6 +35,67 @@ import ( testpb "google.golang.org/grpc/interop/grpc_testing" ) +// Tests the case where the server worker goroutine option is enabled, and a +// number of RPCs are initiated around the same time that Stop() is called. This +// used to result in a write to a closed channel. This test verifies that there +// is no panic. +func (s) TestServerWorkers_RPCsAndStop(t *testing.T) { + ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU()))) + // This deferred stop takes care of stopping the server when one of the + // below grpc.Dials fail, and the test exits early. + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + const numChannels = 20 + const numRPCLoops = 20 + var wg sync.WaitGroup + for i := 0; i < numChannels; i++ { + cc, err := grpc.Dial(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("[iteration: %d] grpc.Dial(%s) failed: %v", i, ss.Address, err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + for j := 0; j < numRPCLoops; j++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Logf("EmptyCall() failed: %v", err) + return + } + } + }() + } + } + // Call Stop() concurrently with the above RPC attempts. + ss.Stop() + wg.Wait() +} + +// Tests the case where the server worker goroutine option is enabled, and both +// Stop() and GracefulStop() care called. This used to result in a close of a +// closed channel. This test verifies that there is no panic. +func (s) TestServerGracefulStopAndStop(t *testing.T) { + ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU()))) + defer ss.Stop() + + if err := ss.StartClient(grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil { + t.Fatalf("Failed to create client to stub server: %v", err) + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + client := testgrpc.NewTestServiceClient(ss.CC) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) + } + + ss.S.GracefulStop() +} + type ctxKey string // TestServerReturningContextError verifies that if a context error is returned From e1d3fab08358c5159c082d13b6f0c752e6cc92e0 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 14 Dec 2023 20:38:36 +0000 Subject: [PATCH 2/5] review comments; pass 1 --- server_ext_test.go | 93 +++++++++++++++++++++++++++++++++++++++++++++ stream_test.go | 1 + test/server_test.go | 64 ------------------------------- 3 files changed, 94 insertions(+), 64 deletions(-) diff --git a/server_ext_test.go b/server_ext_test.go index df79755f3255..ddda32e81b8b 100644 --- a/server_ext_test.go +++ b/server_ext_test.go @@ -21,14 +21,20 @@ package grpc_test import ( "context" "io" + "runtime" + "sync" "testing" "time" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/status" testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" ) // TestServer_MaxHandlers ensures that no more than MaxConcurrentStreams server @@ -97,3 +103,90 @@ func (s) TestServer_MaxHandlers(t *testing.T) { t.Fatal("Received unexpected RPC error:", err) } } + +// Tests the case where the stream worker goroutine option is enabled, and a +// number of RPCs are initiated around the same time that Stop() is called. This +// used to result in a write to a closed channel. This test verifies that there +// is no panic. +func (s) TestStreamWorkers_RPCsAndStop(t *testing.T) { + ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU()))) + // This deferred stop takes care of stopping the server when one of the + // below grpc.Dials fail, and the test exits early. + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + const numChannels = 20 + const numRPCLoops = 20 + + // Create a bunch of clientconns and ensure that they are READY by making an + // RPC on them. + ccs := make([]*grpc.ClientConn, numChannels) + for i := 0; i < numChannels; i++ { + var err error + ccs[i], err = grpc.Dial(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("[iteration: %d] grpc.Dial(%s) failed: %v", i, ss.Address, err) + } + defer ccs[i].Close() + client := testgrpc.NewTestServiceClient(ccs[i]) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) + } + } + + // Make a bunch of concurrent RPCs on the above clientconns. These will + // eventually race with Stop(), and will start to fail. + var wg sync.WaitGroup + for i := 0; i < numChannels; i++ { + client := testgrpc.NewTestServiceClient(ccs[i]) + for j := 0; j < numRPCLoops; j++ { + wg.Add(1) + go func(client testgrpc.TestServiceClient) { + defer wg.Done() + for { + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + _, err := client.EmptyCall(sCtx, &testpb.Empty{}) + if err == nil { + continue + } + if code := status.Code(err); code == codes.DeadlineExceeded || code == codes.Unavailable { + // Once Stop() has been called on the server, we expect + // subsequent calls to fail with Unavailable or + // DeadlineExceeded, the latter happens when the client + // channel moves to TRANSIENT_FAILURE and will never + // recover from there. + return + } + t.Errorf("EmptyCall() failed: %v", err) + return + } + }(client) + } + } + + // Call Stop() concurrently with the above RPC attempts. + ss.Stop() + wg.Wait() +} + +// Tests the case where the stream worker goroutine option is enabled, and both +// Stop() and GracefulStop() care called. This used to result in a close of a +// closed channel. This test verifies that there is no panic. +func (s) TestStreamWorkers_GracefulStopAndStop(t *testing.T) { + ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU()))) + defer ss.Stop() + + if err := ss.StartClient(grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil { + t.Fatalf("Failed to create client to stub server: %v", err) + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + client := testgrpc.NewTestServiceClient(ss.CC) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) + } + + ss.S.GracefulStop() +} diff --git a/stream_test.go b/stream_test.go index 6f1649554f6c..13782a27e750 100644 --- a/stream_test.go +++ b/stream_test.go @@ -32,6 +32,7 @@ import ( ) const defaultTestTimeout = 10 * time.Second +const defaultTestShortTimeout = 10 * time.Millisecond type s struct { grpctest.Tester diff --git a/test/server_test.go b/test/server_test.go index bf169fb947e7..eb0872d0c113 100644 --- a/test/server_test.go +++ b/test/server_test.go @@ -21,13 +21,10 @@ package test import ( "context" "io" - "runtime" - "sync" "testing" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/status" @@ -35,67 +32,6 @@ import ( testpb "google.golang.org/grpc/interop/grpc_testing" ) -// Tests the case where the server worker goroutine option is enabled, and a -// number of RPCs are initiated around the same time that Stop() is called. This -// used to result in a write to a closed channel. This test verifies that there -// is no panic. -func (s) TestServerWorkers_RPCsAndStop(t *testing.T) { - ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU()))) - // This deferred stop takes care of stopping the server when one of the - // below grpc.Dials fail, and the test exits early. - defer ss.Stop() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - const numChannels = 20 - const numRPCLoops = 20 - var wg sync.WaitGroup - for i := 0; i < numChannels; i++ { - cc, err := grpc.Dial(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - t.Fatalf("[iteration: %d] grpc.Dial(%s) failed: %v", i, ss.Address, err) - } - defer cc.Close() - - client := testgrpc.NewTestServiceClient(cc) - for j := 0; j < numRPCLoops; j++ { - wg.Add(1) - go func() { - defer wg.Done() - for { - if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { - t.Logf("EmptyCall() failed: %v", err) - return - } - } - }() - } - } - // Call Stop() concurrently with the above RPC attempts. - ss.Stop() - wg.Wait() -} - -// Tests the case where the server worker goroutine option is enabled, and both -// Stop() and GracefulStop() care called. This used to result in a close of a -// closed channel. This test verifies that there is no panic. -func (s) TestServerGracefulStopAndStop(t *testing.T) { - ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU()))) - defer ss.Stop() - - if err := ss.StartClient(grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil { - t.Fatalf("Failed to create client to stub server: %v", err) - } - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - client := testgrpc.NewTestServiceClient(ss.CC) - if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { - t.Fatalf("EmptyCall() failed: %v", err) - } - - ss.S.GracefulStop() -} - type ctxKey string // TestServerReturningContextError verifies that if a context error is returned From fe69cb2e44983d10884e1b44f74ccbea1b3c0f0b Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 14 Dec 2023 22:30:51 +0000 Subject: [PATCH 3/5] use context with defaultTestTimeout for RPCs --- server_ext_test.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/server_ext_test.go b/server_ext_test.go index ddda32e81b8b..f7aa657554dd 100644 --- a/server_ext_test.go +++ b/server_ext_test.go @@ -145,18 +145,13 @@ func (s) TestStreamWorkers_RPCsAndStop(t *testing.T) { go func(client testgrpc.TestServiceClient) { defer wg.Done() for { - sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) - defer sCancel() - _, err := client.EmptyCall(sCtx, &testpb.Empty{}) + _, err := client.EmptyCall(ctx, &testpb.Empty{}) if err == nil { continue } - if code := status.Code(err); code == codes.DeadlineExceeded || code == codes.Unavailable { + if code := status.Code(err); code == codes.Unavailable { // Once Stop() has been called on the server, we expect - // subsequent calls to fail with Unavailable or - // DeadlineExceeded, the latter happens when the client - // channel moves to TRANSIENT_FAILURE and will never - // recover from there. + // subsequent calls to fail with Unavailable. return } t.Errorf("EmptyCall() failed: %v", err) From b31d719951fc8d8474b5d9b47f6f6ac93172cf23 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 15 Dec 2023 00:08:20 +0000 Subject: [PATCH 4/5] make vet happy --- stream_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/stream_test.go b/stream_test.go index 13782a27e750..6f1649554f6c 100644 --- a/stream_test.go +++ b/stream_test.go @@ -32,7 +32,6 @@ import ( ) const defaultTestTimeout = 10 * time.Second -const defaultTestShortTimeout = 10 * time.Millisecond type s struct { grpctest.Tester From afc5e88fbf2e8060169001caa9c2ff6313d70b1f Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 15 Dec 2023 00:50:03 +0000 Subject: [PATCH 5/5] remove wait for ready call option --- server_ext_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server_ext_test.go b/server_ext_test.go index f7aa657554dd..c065e4ad42a8 100644 --- a/server_ext_test.go +++ b/server_ext_test.go @@ -130,7 +130,7 @@ func (s) TestStreamWorkers_RPCsAndStop(t *testing.T) { } defer ccs[i].Close() client := testgrpc.NewTestServiceClient(ccs[i]) - if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatalf("EmptyCall() failed: %v", err) } }