From 4f6c5f2348afe333a3552aa4c4854eae62e22353 Mon Sep 17 00:00:00 2001 From: Arjan Singh Bal <46515553+arjan-bal@users.noreply.github.com> Date: Mon, 30 Sep 2024 15:04:22 +0530 Subject: [PATCH] xds/server: Fix xDS Server leak (#7664) (#7681) --- xds/internal/server/conn_wrapper.go | 1 + xds/internal/server/listener_wrapper.go | 34 +++--- xds/internal/server/listener_wrapper_test.go | 105 +++++++++++++++++++ 3 files changed, 125 insertions(+), 15 deletions(-) diff --git a/xds/internal/server/conn_wrapper.go b/xds/internal/server/conn_wrapper.go index d2a13d75c591..92d07e7fb6d1 100644 --- a/xds/internal/server/conn_wrapper.go +++ b/xds/internal/server/conn_wrapper.go @@ -161,6 +161,7 @@ func (c *connWrapper) Close() error { if c.rootProvider != nil { c.rootProvider.Close() } + c.parent.removeConn(c) return c.Conn.Close() } diff --git a/xds/internal/server/listener_wrapper.go b/xds/internal/server/listener_wrapper.go index e2246f41afec..09d320018aee 100644 --- a/xds/internal/server/listener_wrapper.go +++ b/xds/internal/server/listener_wrapper.go @@ -86,6 +86,7 @@ func NewListenerWrapper(params ListenerWrapperParams) net.Listener { xdsC: params.XDSClient, modeCallback: params.ModeCallback, isUnspecifiedAddr: params.Listener.Addr().(*net.TCPAddr).IP.IsUnspecified(), + conns: make(map[*connWrapper]bool), mode: connectivity.ServingModeNotServing, closed: grpcsync.NewEvent(), @@ -135,13 +136,13 @@ type listenerWrapper struct { // mu guards access to the current serving mode and the active filter chain // manager. - mu sync.RWMutex + mu sync.Mutex // Current serving mode. mode connectivity.ServingMode // Filter chain manager currently serving. activeFilterChainManager *xdsresource.FilterChainManager // conns accepted with configuration from activeFilterChainManager. - conns []*connWrapper + conns map[*connWrapper]bool // These fields are read/written to in the context of xDS updates, which are // guaranteed to be emitted synchronously from the xDS Client. Thus, they do @@ -202,17 +203,14 @@ func (l *listenerWrapper) maybeUpdateFilterChains() { // gracefully shut down with a grace period of 10 minutes for long-lived // RPC's, such that clients will reconnect and have the updated // configuration apply." - A36 - var connsToClose []*connWrapper - if l.activeFilterChainManager != nil { // If there is a filter chain manager to clean up. - connsToClose = l.conns - l.conns = nil - } + connsToClose := l.conns + l.conns = make(map[*connWrapper]bool) l.activeFilterChainManager = l.pendingFilterChainManager l.pendingFilterChainManager = nil l.instantiateFilterChainRoutingConfigurationsLocked() l.mu.Unlock() go func() { - for _, conn := range connsToClose { + for conn := range connsToClose { conn.Drain() } }() @@ -304,7 +302,7 @@ func (l *listenerWrapper) Accept() (net.Conn, error) { return nil, fmt.Errorf("received connection with non-TCP address (local: %T, remote %T)", conn.LocalAddr(), conn.RemoteAddr()) } - l.mu.RLock() + l.mu.Lock() if l.mode == connectivity.ServingModeNotServing { // Close connections as soon as we accept them when we are in // "not-serving" mode. Since we accept a net.Listener from the user @@ -312,7 +310,7 @@ func (l *listenerWrapper) Accept() (net.Conn, error) { // "not-serving". Closing the connection immediately upon accepting // is one of the other ways to implement the "not-serving" mode as // outlined in gRFC A36. - l.mu.RUnlock() + l.mu.Unlock() conn.Close() continue } @@ -324,7 +322,7 @@ func (l *listenerWrapper) Accept() (net.Conn, error) { SourcePort: srcAddr.Port, }) if err != nil { - l.mu.RUnlock() + l.mu.Unlock() // When a matching filter chain is not found, we close the // connection right away, but do not return an error back to // `grpc.Serve()` from where this Accept() was invoked. Returning an @@ -341,12 +339,18 @@ func (l *listenerWrapper) Accept() (net.Conn, error) { continue } cw := &connWrapper{Conn: conn, filterChain: fc, parent: l, urc: fc.UsableRouteConfiguration} - l.conns = append(l.conns, cw) - l.mu.RUnlock() + l.conns[cw] = true + l.mu.Unlock() return cw, nil } } +func (l *listenerWrapper) removeConn(conn *connWrapper) { + l.mu.Lock() + defer l.mu.Unlock() + delete(l.conns, conn) +} + // Close closes the underlying listener. It also cancels the xDS watch // registered in Serve() and closes any certificate provider instances created // based on security configuration received in the LDS response. @@ -376,9 +380,9 @@ func (l *listenerWrapper) switchModeLocked(newMode connectivity.ServingMode, err l.mode = newMode if l.mode == connectivity.ServingModeNotServing { connsToClose := l.conns - l.conns = nil + l.conns = make(map[*connWrapper]bool) go func() { - for _, conn := range connsToClose { + for conn := range connsToClose { conn.Drain() } }() diff --git a/xds/internal/server/listener_wrapper_test.go b/xds/internal/server/listener_wrapper_test.go index 1fc0dd49b23c..981d65e46ba8 100644 --- a/xds/internal/server/listener_wrapper_test.go +++ b/xds/internal/server/listener_wrapper_test.go @@ -23,12 +23,18 @@ import ( "fmt" "net" "strconv" + "sync" "testing" + "time" + "google.golang.org/grpc" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" xdsinternal "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" @@ -151,5 +157,104 @@ func (s) TestListenerWrapper(t *testing.T) { t.Fatalf("mode change received: %v, want: %v", mode, connectivity.ServingModeNotServing) } } +} + +type testService struct { + testgrpc.TestServiceServer +} + +func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil +} + +// TestConnsCleanup tests that the listener wrapper clears it's connection +// references when connections close. It sets up a listener wrapper and gRPC +// Server, and connects to the server 100 times and makes an RPC each time, and +// then closes the connection. After these 100 connections Close, the listener +// wrapper should have no more references to any connections. +func (s) TestConnsCleanup(t *testing.T) { + mgmtServer, nodeID, _, _, xdsC := xdsSetupForTests(t) + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("Failed to create a local TCP listener: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + modeCh := make(chan connectivity.ServingMode, 1) + vm := verifyMode{ + modeCh: modeCh, + } + + host, port := hostPortFromListener(t, lis) + lisResourceName := fmt.Sprintf(e2e.ServerListenerResourceNameTemplate, net.JoinHostPort(host, strconv.Itoa(int(port)))) + params := ListenerWrapperParams{ + Listener: lis, + ListenerResourceName: lisResourceName, + XDSClient: xdsC, + ModeCallback: vm.verifyModeCallback, + } + lw := NewListenerWrapper(params) + if lw == nil { + t.Fatalf("NewListenerWrapper(%+v) returned nil", params) + } + defer lw.Close() + + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, route1)}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Wait for Listener Mode to go serving. + select { + case <-ctx.Done(): + t.Fatalf("timeout waiting for mode change") + case mode := <-modeCh: + if mode != connectivity.ServingModeServing { + t.Fatalf("mode change received: %v, want: %v", mode, connectivity.ServingModeServing) + } + } + + server := grpc.NewServer(grpc.Creds(insecure.NewCredentials())) + testgrpc.RegisterTestServiceServer(server, &testService{}) + wg := sync.WaitGroup{} + go func() { + if err := server.Serve(lw); err != nil { + t.Errorf("failed to serve: %v", err) + } + }() + + // Make 100 connections to the server, and make an RPC on each one. + for i := 0; i < 100; i++ { + cc, err := grpc.NewClient(lw.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient failed with err: %v", err) + } + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("client.EmptyCall() failed: %v", err) + } + cc.Close() + } + + lisWrapper := lw.(*listenerWrapper) + // Eventually when the server processes the connection shutdowns, the + // listener wrapper should clear its references to the wrapped connections. + lenConns := 1 + for ; ctx.Err() == nil && lenConns > 0; <-time.After(time.Millisecond) { + lisWrapper.mu.Lock() + lenConns = len(lisWrapper.conns) + lisWrapper.mu.Unlock() + } + if lenConns > 0 { + t.Fatalf("timeout waiting for lis wrapper conns to clear, size: %v", lenConns) + } + server.Stop() + wg.Wait() }