From 45e603f052b294f5ba1993c6676d991f866881dd Mon Sep 17 00:00:00 2001
From: Raphael 'kena' Poss
Date: Sun, 4 Aug 2019 05:43:54 +0200
Subject: [PATCH 1/2] util/netutil: add missing explanations on MakeServer()

The `netutil.Server` is really a multi-purpose thing. This patch
extends the comment to explain what it does in more detail.

Release note: None
---
 pkg/util/netutil/net.go | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/pkg/util/netutil/net.go b/pkg/util/netutil/net.go
index 672afff72b6a..dfa8026f682a 100644
--- a/pkg/util/netutil/net.go
+++ b/pkg/util/netutil/net.go
@@ -61,8 +61,19 @@ type Server struct {
 	*http.Server
 }
 
-// MakeServer constructs a Server that tracks active connections, closing them
-// when signaled by stopper.
+// MakeServer constructs a Server that tracks active connections,
+// closing them when signaled by stopper.
+//
+// It can serve two different purposes simultaneously:
+//
+// - to serve as an actual HTTP server, using the .Serve(net.Listener) method.
+// - to serve as a plain TCP server, using the .ServeWith(...) method.
+//
+// The latter is used e.g. to accept SQL client connections.
+//
+// When the HTTP facility is not used, the Go HTTP server object is
+// still used internally to maintain/register the connections via the
+// ConnState() method, for convenience.
 func MakeServer(stopper *stop.Stopper, tlsConfig *tls.Config, handler http.Handler) Server {
 	var mu syncutil.Mutex
 	activeConns := make(map[net.Conn]struct{})
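To make the new comment concrete: the pattern it describes is a single http.Server whose ConnState callback doubles as a connection registry, so that every connection, HTTP or not, can be force-closed when shutdown is signaled. Below is a minimal standalone sketch of that idea, using only the standard library; trackingServer, newTrackingServer and closeAll are illustrative names, not part of the patch or of netutil.

package main

import (
	"log"
	"net"
	"net/http"
	"sync"
)

// trackingServer mirrors the shape of netutil.Server: an embedded
// *http.Server plus a registry of live connections.
type trackingServer struct {
	*http.Server
	mu    sync.Mutex
	conns map[net.Conn]struct{}
}

func newTrackingServer(handler http.Handler) *trackingServer {
	ts := &trackingServer{conns: make(map[net.Conn]struct{})}
	ts.Server = &http.Server{
		Handler: handler,
		// ConnState registers and deregisters every connection the
		// server sees; this is the hook that MakeServer reuses even
		// for connections that never speak HTTP.
		ConnState: func(conn net.Conn, state http.ConnState) {
			ts.mu.Lock()
			defer ts.mu.Unlock()
			switch state {
			case http.StateNew:
				ts.conns[conn] = struct{}{}
			case http.StateHijacked, http.StateClosed:
				delete(ts.conns, conn)
			}
		},
	}
	return ts
}

// closeAll force-closes every tracked connection; the moral
// equivalent of what happens when the stopper quiesces.
func (ts *trackingServer) closeAll() {
	ts.mu.Lock()
	defer ts.mu.Unlock()
	for conn := range ts.conns {
		_ = conn.Close()
	}
}

func main() {
	ts := newTrackingServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		_, _ = w.Write([]byte("ok\n"))
	}))
	ln, err := net.Listen("tcp", "127.0.0.1:8080")
	if err != nil {
		log.Fatal(err)
	}
	// A real server would call ts.closeAll() from its shutdown path.
	log.Fatal(ts.Serve(ln))
}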
From 9f2752b2b2f4801a94b3689cd8022e1fee11d347 Mon Sep 17 00:00:00 2001
From: Raphael 'kena' Poss
Date: Sun, 4 Aug 2019 05:55:42 +0200
Subject: [PATCH 2/2] server: factor and simplify the network setup logic

This patch performs the following cleanups and refactors:

- adds explanatory comments;
- factors the call to net.Listen and starting the server for the
  admin UI into a new method `(*Server).startServeUI()`;
- factors the call to net.Listen and starting the RPC server into a
  new method `(*Server).startServeRPC()`;
- factors the start of the SQL server into a new method
  `(*Server).startServeSQL()`;
- extracts the TCP keepalive configuration function into a top-level
  function (it was previously a closure).

Release note: None
---
 pkg/server/server.go | 453 +++++++++++++++++++++++++------------------
 1 file changed, 267 insertions(+), 186 deletions(-)

diff --git a/pkg/server/server.go b/pkg/server/server.go
index 960d294d6e51..78137cbad3ac 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -1103,166 +1103,47 @@ func (s *Server) Start(ctx context.Context) error {
 	}
 	ctx = s.AnnotateCtx(ctx)
 
+	// Start the time sanity checker.
 	s.startTime = timeutil.Now()
 	s.startMonitoringForwardClockJumps(ctx)
 
+	// Connect the node as the loopback handler for RPC requests to
+	// the local node.
+	s.rpcContext.SetLocalInternalServer(s.node)
+
+	// Load the TLS configuration for the HTTP server.
 	uiTLSConfig, err := s.cfg.GetUIServerTLSConfig()
 	if err != nil {
 		return err
 	}
 
-	httpServer := netutil.MakeServer(s.stopper, uiTLSConfig, s)
-
-	// The following code is a specialization of util/net.go's ListenAndServe
-	// which adds pgwire support. A single port is used to serve all protocols
-	// (pg, http, h2) via the following construction:
-	//
-	// non-TLS case:
-	// net.Listen -> cmux.New
-	//               |
-	//               -  -> pgwire.Match -> pgwire.Server.ServeConn
-	//               -  -> cmux.Any -> grpc.(*Server).Serve
-	//
-	// TLS case:
-	// net.Listen -> cmux.New
-	//               |
-	//               -  -> pgwire.Match -> pgwire.Server.ServeConn
-	//               -  -> cmux.Any -> grpc.(*Server).Serve
-	//
-	// Note that the difference between the TLS and non-TLS cases exists due to
-	// Go's lack of an h2c (HTTP2 Clear Text) implementation. See inline comments
-	// in util.ListenAndServe for an explanation of how h2c is implemented there
-	// and here.
-
-	ln, err := net.Listen("tcp", s.cfg.Addr)
-	if err != nil {
-		return ListenError{error: err, Addr: s.cfg.Addr}
-	}
-	if err := base.UpdateAddrs(ctx, &s.cfg.Addr, &s.cfg.AdvertiseAddr, ln.Addr()); err != nil {
-		return errors.Wrapf(err, "internal error: cannot parse listen address")
-	}
-	log.Eventf(ctx, "listening on port %s", s.cfg.Addr)
-
-	s.rpcContext.SetLocalInternalServer(s.node)
-
-	// The cmux matches don't shut down properly unless serve is called on the
-	// cmux at some point. Use serveOnMux to ensure it's called during shutdown
-	// if we wouldn't otherwise reach the point where we start serving on it.
-	var serveOnMux sync.Once
-	m := cmux.New(ln)
-
-	pgL := m.Match(func(r io.Reader) bool {
-		return pgwire.Match(r)
-	})
-
-	anyL := m.Match(cmux.Any())
-
-	httpLn, err := net.Listen("tcp", s.cfg.HTTPAddr)
-	if err != nil {
-		return ListenError{
-			error: err,
-			Addr:  s.cfg.HTTPAddr,
-		}
-	}
-	if err := base.UpdateAddrs(ctx, &s.cfg.HTTPAddr, &s.cfg.HTTPAdvertiseAddr, httpLn.Addr()); err != nil {
-		return errors.Wrapf(err, "internal error: cannot parse http listen address")
-	}
+	// connManager tracks incoming connections accepted via listeners
+	// and automatically closes them when the stopper indicates a
+	// shutdown.
+	// This handles both:
+	// - HTTP connections for the admin UI with an optional TLS handshake over HTTP.
+	// - SQL client connections with a TLS handshake over TCP.
+	// (gRPC connections are handled separately via s.grpc and perform
+	// their TLS handshake on their own.)
+	connManager := netutil.MakeServer(s.stopper, uiTLSConfig, s)
 
+	// Create a context for the asynchronous network workers.
 	workersCtx := s.AnnotateCtx(context.Background())
 
-	s.stopper.RunWorker(workersCtx, func(workersCtx context.Context) {
-		<-s.stopper.ShouldQuiesce()
-		if err := httpLn.Close(); err != nil {
-			log.Fatal(workersCtx, err)
-		}
-	})
-
-	if uiTLSConfig != nil {
-		httpMux := cmux.New(httpLn)
-		clearL := httpMux.Match(cmux.HTTP1())
-		tlsL := httpMux.Match(cmux.Any())
-
-		s.stopper.RunWorker(workersCtx, func(context.Context) {
-			netutil.FatalIfUnexpected(httpMux.Serve())
-		})
-
-		s.stopper.RunWorker(workersCtx, func(context.Context) {
-			mux := http.NewServeMux()
-			mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
-				http.Redirect(w, r, "https://"+r.Host+r.RequestURI, http.StatusTemporaryRedirect)
-			})
-			mux.Handle("/health", s)
-
-			plainRedirectServer := netutil.MakeServer(s.stopper, uiTLSConfig, mux)
-
-			netutil.FatalIfUnexpected(plainRedirectServer.Serve(clearL))
-		})
-
-		httpLn = tls.NewListener(tlsL, uiTLSConfig)
+	// Start the admin UI server. This opens the HTTP listen socket,
+	// optionally sets up TLS, and dispatches the server worker for the
+	// web UI.
+	if err := s.startServeUI(ctx, workersCtx, connManager, uiTLSConfig); err != nil {
+		return err
 	}
 
-	s.stopper.RunWorker(workersCtx, func(context.Context) {
-		netutil.FatalIfUnexpected(httpServer.Serve(httpLn))
-	})
-
-	s.stopper.RunWorker(workersCtx, func(context.Context) {
-		<-s.stopper.ShouldQuiesce()
-		// TODO(bdarnell): Do we need to also close the other listeners?
-		netutil.FatalIfUnexpected(anyL.Close())
-		<-s.stopper.ShouldStop()
-		s.grpc.Stop()
-		serveOnMux.Do(func() {
-			// A cmux can't gracefully shut down without Serve being called on it.
-			netutil.FatalIfUnexpected(m.Serve())
-		})
-	})
-
-	s.stopper.RunWorker(workersCtx, func(context.Context) {
-		netutil.FatalIfUnexpected(s.grpc.Serve(anyL))
-	})
-
-	// Running the SQL migrations safely requires that we aren't serving SQL
-	// requests at the same time -- to ensure that, block the serving of SQL
-	// traffic until the migrations are done, as indicated by this channel.
-	serveSQL := make(chan bool)
-
-	tcpKeepAlive := envutil.EnvOrDefaultDuration("COCKROACH_SQL_TCP_KEEP_ALIVE", time.Minute)
-	var loggedKeepAliveStatus int32
-
-	// Attempt to set TCP keep-alive on connection. Don't fail on errors.
-	setTCPKeepAlive := func(ctx context.Context, conn net.Conn) {
-		if tcpKeepAlive == 0 {
-			return
-		}
-
-		muxConn, ok := conn.(*cmux.MuxConn)
-		if !ok {
-			return
-		}
-		tcpConn, ok := muxConn.Conn.(*net.TCPConn)
-		if !ok {
-			return
-		}
-
-		// Only log success/failure once.
-		doLog := atomic.CompareAndSwapInt32(&loggedKeepAliveStatus, 0, 1)
-		if err := tcpConn.SetKeepAlive(true); err != nil {
-			if doLog {
-				log.Warningf(ctx, "failed to enable TCP keep-alive for pgwire: %v", err)
-			}
-			return
-
-		}
-		if err := tcpConn.SetKeepAlivePeriod(tcpKeepAlive); err != nil {
-			if doLog {
-				log.Warningf(ctx, "failed to set TCP keep-alive duration for pgwire: %v", err)
-			}
-			return
-		}
-
-		if doLog {
-			log.VEventf(ctx, 2, "setting TCP keep-alive to %s for pgwire", tcpKeepAlive)
-		}
+	// Start the RPC server. This opens the RPC/SQL listen socket and
+	// dispatches the server worker for gRPC.
+	// The SQL listener is returned so that the SQL server can be
+	// started later, once the server has initialized.
+	pgL, startRPCServer, err := s.startServeRPC(ctx, workersCtx)
+	if err != nil {
+		return err
 	}
 
 	// Enable the debug endpoints first to provide an earlier window into what's
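For readers unfamiliar with cmux: the construction in the comment removed above (and reinstated in startServeRPC further down) takes a single net.Listener and fans accepted connections out to protocol-specific listeners by sniffing their first bytes. Below is a minimal standalone sketch using the github.com/soheilhy/cmux package that the code relies on; the HTTP/1 matcher stands in for pgwire.Match and the echo server stands in for grpc.(*Server).Serve:

package main

import (
	"io"
	"log"
	"net"

	"github.com/soheilhy/cmux"
)

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:26257")
	if err != nil {
		log.Fatal(err)
	}

	m := cmux.New(ln)

	// Matchers are tried in order on the first bytes of each
	// connection; the real code uses pgwire.Match here to recognize
	// the Postgres wire protocol.
	httpL := m.Match(cmux.HTTP1Fast())
	// Everything that did not match above (e.g. gRPC over HTTP/2)
	// falls through to Any().
	otherL := m.Match(cmux.Any())

	go serveHTTPish(httpL)
	go serveEcho(otherL)

	// Serve dispatches accepted connections to the matched listeners.
	// cmux does not shut down cleanly unless Serve is eventually
	// called, which is why the patch guards it with a sync.Once.
	log.Fatal(m.Serve())
}

func serveHTTPish(l net.Listener) {
	for {
		conn, err := l.Accept()
		if err != nil {
			return
		}
		go func() {
			defer conn.Close()
			_, _ = io.WriteString(conn, "HTTP/1.0 200 OK\r\nContent-Length: 3\r\n\r\nok\n")
		}()
	}
}

func serveEcho(l net.Listener) {
	for {
		conn, err := l.Accept()
		if err != nil {
			return
		}
		go func() {
			defer conn.Close()
			_, _ = io.Copy(conn, conn)
		}()
	}
}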
@@ -1428,11 +1309,7 @@ func (s *Server) Start(ctx context.Context) error {
 	//
 	// TODO(knz): This may need tweaking when #24118 is addressed.
 
-	s.stopper.RunWorker(workersCtx, func(context.Context) {
-		serveOnMux.Do(func() {
-			netutil.FatalIfUnexpected(m.Serve())
-		})
-	})
+	startRPCServer(workersCtx)
 
 	ready := make(chan struct{})
 	if s.cfg.ReadyFn != nil {
@@ -1486,11 +1363,7 @@ func (s *Server) Start(ctx context.Context) error {
 	}
 
 	// This opens the main listener.
-	s.stopper.RunWorker(workersCtx, func(context.Context) {
-		serveOnMux.Do(func() {
-			netutil.FatalIfUnexpected(m.Serve())
-		})
-	})
+	startRPCServer(workersCtx)
 
 	// We ran this before, but might've bootstrapped in the meantime. This time
 	// we'll get the actual list of bootstrapped and empty engines.
@@ -1661,10 +1534,9 @@ func (s *Server) Start(ctx context.Context) error {
 		}
 	}
 	log.Infof(ctx, "done ensuring all necessary migrations have run")
-	close(serveSQL)
-	log.Info(ctx, "serving sql connections")
 
-	// Start servicing SQL connections.
+	// Start garbage collecting system events.
+	s.startSystemLogsGC(ctx)
 
 	// Serve UI assets.
 	//
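The two startRPCServer(workersCtx) calls above rely on a "prepare eagerly, serve lazily" idiom: the listener exists from the moment startServeRPC returns (so the address can be advertised), but no connection is dispatched until the caller decides the node is ready, and a sync.Once lets the shutdown path perform the one mandatory Serve call if startup never reached it. A standalone sketch of the idiom with illustrative names:

package main

import (
	"log"
	"net"
	"sync"
)

// prepareServer opens the listener immediately but defers serving.
// Whichever of start() or shutdown() runs first triggers the single
// accept loop, exactly once.
func prepareServer(addr string, handle func(net.Conn)) (start, shutdown func(), err error) {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, nil, err
	}

	var serveOnce sync.Once
	serve := func() {
		serveOnce.Do(func() {
			for {
				conn, err := ln.Accept()
				if err != nil {
					return // listener closed
				}
				go handle(conn)
			}
		})
	}

	start = func() { go serve() }
	shutdown = func() {
		_ = ln.Close()
		// Guarantee the accept loop ran (and promptly exited), so
		// nothing can later block on a Serve that never happened.
		serve()
	}
	return start, shutdown, nil
}

func main() {
	start, shutdown, err := prepareServer("127.0.0.1:0", func(c net.Conn) { _ = c.Close() })
	if err != nil {
		log.Fatal(err)
	}
	start()    // called once initialization has completed
	shutdown() // or called during shutdown if start was never reached
}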
@@ -1707,22 +1579,188 @@ func (s *Server) Start(ctx context.Context) error {
 	// Attempt to upgrade cluster version.
 	s.startAttemptUpgrade(ctx)
 
+	// Start serving SQL clients.
+	if err := s.startServeSQL(ctx, workersCtx, connManager, pgL); err != nil {
+		return err
+	}
+
+	// Record that this node joined the cluster in the event log. Since this
+	// executes a SQL query, this must be done after the SQL layer is ready.
+	s.node.recordJoinEvent()
+
+	// Delete all orphaned table leases created by a prior instance of this
+	// node. This also uses SQL.
+	s.leaseMgr.DeleteOrphanedLeases(timeThreshold)
+
+	log.Event(ctx, "server ready")
+
+	return nil
+}
+
+// startServeRPC starts the RPC and SQL listeners.
+// It returns the SQL listener, which can be used
+// to start the SQL server once initialization has completed,
+// and a function that starts the RPC server, to be called
+// when the cluster is known to have bootstrapped or
+// when it starts waiting for init().
+func (s *Server) startServeRPC(
+	ctx, workersCtx context.Context,
+) (sqlListener net.Listener, startRPCServer func(ctx context.Context), err error) {
+	ln, err := listen(ctx, &s.cfg.Addr, &s.cfg.AdvertiseAddr, "rpc/sql")
+	if err != nil {
+		return nil, nil, err
+	}
+	log.Eventf(ctx, "listening on port %s", s.cfg.Addr)
+
+	// The following code is a specialization of util/net.go's ListenAndServe
+	// which adds pgwire support. A single port is used to serve all protocols
+	// (pg, http, h2) via the following construction:
+	//
+	// non-TLS case:
+	// net.Listen -> cmux.New
+	//               |
+	//               -  -> pgwire.Match -> pgwire.Server.ServeConn
+	//               -  -> cmux.Any -> grpc.(*Server).Serve
+	//
+	// TLS case:
+	// net.Listen -> cmux.New
+	//               |
+	//               -  -> pgwire.Match -> pgwire.Server.ServeConn
+	//               -  -> cmux.Any -> grpc.(*Server).Serve
+	//
+	// Note that the difference between the TLS and non-TLS cases exists due to
+	// Go's lack of an h2c (HTTP2 Clear Text) implementation. See inline comments
+	// in util.ListenAndServe for an explanation of how h2c is implemented there
+	// and here.
+
+	// serveOnMux is used to ensure that the mux gets listened on eventually,
+	// either via the returned startRPCServer() or upon stopping.
+	var serveOnMux sync.Once
+
+	m := cmux.New(ln)
+
+	pgL := m.Match(func(r io.Reader) bool {
+		return pgwire.Match(r)
+	})
+
+	anyL := m.Match(cmux.Any())
+
+	// The shutdown worker for the RPC/SQL listener and the gRPC server.
+	s.stopper.RunWorker(workersCtx, func(context.Context) {
+		<-s.stopper.ShouldQuiesce()
+		// TODO(bdarnell): Do we need to also close the other listeners?
+		netutil.FatalIfUnexpected(anyL.Close())
+		<-s.stopper.ShouldStop()
+		s.grpc.Stop()
+		serveOnMux.Do(func() {
+			// The cmux matches don't shut down properly unless serve is called on the
+			// cmux at some point. Use serveOnMux to ensure it's called during shutdown
+			// if we wouldn't otherwise reach the point where we start serving on it.
+			netutil.FatalIfUnexpected(m.Serve())
+		})
+	})
+
+	// Serve the gRPC endpoint.
+	s.stopper.RunWorker(workersCtx, func(context.Context) {
+		netutil.FatalIfUnexpected(s.grpc.Serve(anyL))
+	})
+
+	// startRPCServer starts the RPC server. We do not do this
+	// immediately because we want the cluster to be ready (or ready to
+	// initialize) before we accept RPC requests. The caller
+	// (Server.Start) will call this at the right moment.
+	startRPCServer = func(ctx context.Context) {
+		s.stopper.RunWorker(ctx, func(context.Context) {
+			serveOnMux.Do(func() {
+				netutil.FatalIfUnexpected(m.Serve())
+			})
+		})
+	}
+
+	return pgL, startRPCServer, nil
+}
+
+func (s *Server) startServeUI(
+	ctx, workersCtx context.Context, connManager netutil.Server, uiTLSConfig *tls.Config,
+) error {
+	httpLn, err := listen(ctx, &s.cfg.HTTPAddr, &s.cfg.HTTPAdvertiseAddr, "http")
+	if err != nil {
+		return err
+	}
+	log.Eventf(ctx, "listening on http port %s", s.cfg.HTTPAddr)
+
+	// The HTTP listener shutdown worker, which closes everything under
+	// the HTTP port when the stopper indicates we are shutting down.
+	s.stopper.RunWorker(workersCtx, func(workersCtx context.Context) {
+		<-s.stopper.ShouldQuiesce()
+		if err := httpLn.Close(); err != nil {
+			log.Fatal(workersCtx, err)
+		}
+	})
+
+	if uiTLSConfig != nil {
+		httpMux := cmux.New(httpLn)
+		clearL := httpMux.Match(cmux.HTTP1())
+		tlsL := httpMux.Match(cmux.Any())
+
+		// Dispatch incoming requests to either clearL or tlsL.
+		s.stopper.RunWorker(workersCtx, func(context.Context) {
+			netutil.FatalIfUnexpected(httpMux.Serve())
+		})
+
+		// Serve the plain HTTP (non-TLS) connections over clearL.
+		// This produces an HTTP redirect to the `https` URL for the path /,
+		// handles the request normally (via s.ServeHTTP) for the path /health,
+		// and produces a 404 for anything else.
+		s.stopper.RunWorker(workersCtx, func(context.Context) {
+			mux := http.NewServeMux()
+			mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
+				http.Redirect(w, r, "https://"+r.Host+r.RequestURI, http.StatusTemporaryRedirect)
+			})
+			mux.Handle("/health", s)
+
+			plainRedirectServer := netutil.MakeServer(s.stopper, uiTLSConfig, mux)
+
+			netutil.FatalIfUnexpected(plainRedirectServer.Serve(clearL))
+		})
+
+		httpLn = tls.NewListener(tlsL, uiTLSConfig)
+	}
+
+	// Serve the HTTP endpoint. If uiTLSConfig was nil, this is the
+	// original httpLn listening on --http-listen-addr without TLS;
+	// otherwise it was overridden above to carry the TLS-negotiated
+	// connections from the HTTP port.
+	s.stopper.RunWorker(workersCtx, func(context.Context) {
+		netutil.FatalIfUnexpected(connManager.Serve(httpLn))
+	})
+
+	return nil
+}
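The TLS branch of startServeUI above is itself a small instance of the single-port pattern: cleartext HTTP/1 requests are split off and answered with a redirect, while everything else is treated as a TLS handshake destined for the real handler. A condensed standalone sketch of that arrangement; tlsConf is assumed to be a valid *tls.Config and certificate loading is omitted:

package main

import (
	"crypto/tls"
	"log"
	"net"
	"net/http"

	"github.com/soheilhy/cmux"
)

func serveUI(ln net.Listener, tlsConf *tls.Config, handler http.Handler) error {
	m := cmux.New(ln)
	clearL := m.Match(cmux.HTTP1()) // plaintext HTTP/1 requests
	tlsL := m.Match(cmux.Any())     // everything else: TLS handshakes

	// Cleartext requests get a redirect to the https:// equivalent,
	// mirroring the plainRedirectServer in the patch.
	redirect := http.NewServeMux()
	redirect.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		http.Redirect(w, r, "https://"+r.Host+r.RequestURI, http.StatusTemporaryRedirect)
	})
	go func() { _ = http.Serve(clearL, redirect) }()

	// TLS connections are unwrapped by tls.NewListener and served by
	// the real handler, as with connManager.Serve(httpLn) above.
	go func() { _ = http.Serve(tls.NewListener(tlsL, tlsConf), handler) }()

	return m.Serve()
}

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:8443")
	if err != nil {
		log.Fatal(err)
	}
	var tlsConf *tls.Config // assumption: load certificates here
	log.Fatal(serveUI(ln, tlsConf, http.NotFoundHandler()))
}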
+func (s *Server) startServeSQL(
+	ctx, workersCtx context.Context, connManager netutil.Server, pgL net.Listener,
+) error {
+	// Start servicing SQL connections.
+	log.Info(ctx, "serving sql connections")
+
+	pgCtx := s.pgServer.AmbientCtx.AnnotateCtx(context.Background())
+	tcpKeepAlive := tcpKeepAliveManager{
+		tcpKeepAlive: envutil.EnvOrDefaultDuration("COCKROACH_SQL_TCP_KEEP_ALIVE", time.Minute),
+	}
+
 	s.stopper.RunWorker(pgCtx, func(pgCtx context.Context) {
-		select {
-		case <-serveSQL:
-		case <-s.stopper.ShouldQuiesce():
-			return
-		}
-		netutil.FatalIfUnexpected(httpServer.ServeWith(pgCtx, s.stopper, pgL, func(conn net.Conn) {
+		netutil.FatalIfUnexpected(connManager.ServeWith(pgCtx, s.stopper, pgL, func(conn net.Conn) {
 			connCtx := logtags.AddTag(pgCtx, "client", conn.RemoteAddr().String())
-			setTCPKeepAlive(connCtx, conn)
+			tcpKeepAlive.configure(connCtx, conn)
 			if err := s.pgServer.ServeConn(connCtx, conn); err != nil {
 				log.Error(connCtx, err)
 			}
 		}))
 	})
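The ServeWith call above is the plain-TCP half of netutil.Server: an accept loop that gives each connection its own goroutine and its own annotated context, and logs per-connection errors instead of propagating them, so one misbehaving client cannot take the accept loop down. A stdlib-only sketch of that loop, with context.WithValue standing in for logtags.AddTag:

package main

import (
	"context"
	"log"
	"net"
)

type clientAddrKey struct{}

// serveWith mimics the shape of netutil.Server.ServeWith: accept
// until the listener is closed, handing each connection to serveConn
// in its own goroutine with a per-connection context.
func serveWith(ctx context.Context, l net.Listener, serveConn func(context.Context, net.Conn) error) error {
	for {
		conn, err := l.Accept()
		if err != nil {
			return err
		}
		connCtx := context.WithValue(ctx, clientAddrKey{}, conn.RemoteAddr().String())
		go func() {
			defer conn.Close()
			if err := serveConn(connCtx, conn); err != nil {
				log.Printf("client %v: %v", conn.RemoteAddr(), err)
			}
		}()
	}
}

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}
	log.Fatal(serveWith(context.Background(), ln, func(ctx context.Context, c net.Conn) error {
		_, err := c.Write([]byte("hello\n"))
		return err
	}))
}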
+
+	// If a unix socket was requested, start serving there too.
 	if len(s.cfg.SocketFile) != 0 {
 		log.Infof(ctx, "starting postgres server at unix:%s", s.cfg.SocketFile)
@@ -1740,12 +1778,7 @@ func (s *Server) Start(ctx context.Context) error {
 		})
 
 		s.stopper.RunWorker(pgCtx, func(pgCtx context.Context) {
-			select {
-			case <-serveSQL:
-			case <-s.stopper.ShouldQuiesce():
-				return
-			}
-			netutil.FatalIfUnexpected(httpServer.ServeWith(pgCtx, s.stopper, unixLn, func(conn net.Conn) {
+			netutil.FatalIfUnexpected(connManager.ServeWith(pgCtx, s.stopper, unixLn, func(conn net.Conn) {
 				connCtx := logtags.AddTag(pgCtx, "client", conn.RemoteAddr().String())
 				if err := s.pgServer.ServeConn(connCtx, conn); err != nil {
 					log.Error(connCtx, err)
@@ -1753,19 +1786,6 @@ func (s *Server) Start(ctx context.Context) error {
 			}))
 		})
 	}
-
-	s.startSystemLogsGC(ctx)
-
-	// Record that this node joined the cluster in the event log. Since this
-	// executes a SQL query, this must be done after the SQL layer is ready.
-	s.node.recordJoinEvent()
-
-	// Delete all orphaned table leases created by a prior instance of this
-	// node.
-	s.leaseMgr.DeleteOrphanedLeases(timeThreshold)
-
-	log.Event(ctx, "server ready")
-	return nil
 }
@@ -2102,3 +2122,64 @@ func (w *gzipResponseWriter) Close() error {
 
 func init() {
 	tracing.RegisterTagRemapping("n", "node")
 }
+
+// configure attempts to set TCP keep-alive on the
+// connection. It does not fail on errors.
+func (k *tcpKeepAliveManager) configure(ctx context.Context, conn net.Conn) {
+	if k.tcpKeepAlive == 0 {
+		return
+	}
+
+	muxConn, ok := conn.(*cmux.MuxConn)
+	if !ok {
+		return
+	}
+	tcpConn, ok := muxConn.Conn.(*net.TCPConn)
+	if !ok {
+		return
+	}
+
+	// Only log success/failure once.
+	doLog := atomic.CompareAndSwapInt32(&k.loggedKeepAliveStatus, 0, 1)
+	if err := tcpConn.SetKeepAlive(true); err != nil {
+		if doLog {
+			log.Warningf(ctx, "failed to enable TCP keep-alive for pgwire: %v", err)
+		}
+		return
+	}
+	if err := tcpConn.SetKeepAlivePeriod(k.tcpKeepAlive); err != nil {
+		if doLog {
+			log.Warningf(ctx, "failed to set TCP keep-alive duration for pgwire: %v", err)
+		}
+		return
+	}
+
+	if doLog {
+		log.VEventf(ctx, 2, "setting TCP keep-alive to %s for pgwire", k.tcpKeepAlive)
+	}
+}
+
+type tcpKeepAliveManager struct {
+	// The keepalive duration.
+	tcpKeepAlive time.Duration
+	// loggedKeepAliveStatus ensures that errors about setting the TCP
+	// keepalive status are only reported once.
+	loggedKeepAliveStatus int32
+}
+
+func listen(
+	ctx context.Context, addr, advertiseAddr *string, connName string,
+) (net.Listener, error) {
+	ln, err := net.Listen("tcp", *addr)
+	if err != nil {
+		return nil, ListenError{
+			error: err,
+			Addr:  *addr,
+		}
+	}
+	if err := base.UpdateAddrs(ctx, addr, advertiseAddr, ln.Addr()); err != nil {
+		return nil, errors.Wrapf(err, "internal error: cannot parse %s listen address", connName)
+	}
+	return ln, nil
+}
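The tcpKeepAliveManager above wraps two standard-library calls, SetKeepAlive and SetKeepAlivePeriod, and logs their outcome only once across all connections. A standalone sketch of the underlying mechanism; unlike the manager in the patch, it asserts directly to *net.TCPConn rather than unwrapping a *cmux.MuxConn first:

package main

import (
	"log"
	"net"
	"sync/atomic"
	"time"
)

var loggedKeepAlive int32

// configureKeepAlive enables TCP keep-alive probes on conn with the
// given period. Errors are logged once and otherwise ignored: failing
// to set keep-alive is not a reason to reject the connection.
func configureKeepAlive(conn net.Conn, period time.Duration) {
	if period == 0 {
		return
	}
	tcpConn, ok := conn.(*net.TCPConn)
	if !ok {
		return // e.g. a unix socket: nothing to configure
	}

	doLog := atomic.CompareAndSwapInt32(&loggedKeepAlive, 0, 1)
	if err := tcpConn.SetKeepAlive(true); err != nil {
		if doLog {
			log.Printf("failed to enable TCP keep-alive: %v", err)
		}
		return
	}
	if err := tcpConn.SetKeepAlivePeriod(period); err != nil && doLog {
		log.Printf("failed to set TCP keep-alive period: %v", err)
	}
}

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, err := ln.Accept()
		if err != nil {
			log.Fatal(err)
		}
		configureKeepAlive(conn, time.Minute)
		_ = conn.Close()
	}
}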