Skip to content

Commit

Permalink
Refactor service idle support
Browse files Browse the repository at this point in the history
* Move connection tracking into APIServer using ConnState()
* Remove Connection counters from CLI code
* Update events handler to support client not closing connection
* Improve logging messages

Fixes containers#5599

Signed-off-by: Jhon Honce <jhonce@redhat.com>
  • Loading branch information
jwhonce authored and snj33v committed May 31, 2020
1 parent f301d4d commit 5582eef
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 98 deletions.
4 changes: 1 addition & 3 deletions cmd/podman/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,7 @@ func runREST(r *libpod.Runtime, uri string, timeout time.Duration) error {
}
}()

err = server.Serve()
logrus.Debugf("%d/%d Active connections/Total connections\n", server.ActiveConnections, server.TotalConnections)
return err
return server.Serve()
}

func runVarlink(r *libpod.Runtime, uri string, timeout time.Duration, c *cliconfig.ServiceValues) error {
Expand Down
19 changes: 16 additions & 3 deletions pkg/api/handlers/compat/events.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package compat

import (
"encoding/json"
"fmt"
"net/http"

Expand All @@ -10,6 +9,7 @@ import (
"github.com/containers/libpod/pkg/api/handlers"
"github.com/containers/libpod/pkg/api/handlers/utils"
"github.com/gorilla/schema"
jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -48,14 +48,27 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
}()
if eventsError != nil {
utils.InternalServerError(w, eventsError)
close(eventChannel)
return
}

coder := json.NewEncoder(w)
coder.SetEscapeHTML(true)
// If client disappears we need to stop listening for events
go func(done <-chan struct{}) {
<-done
close(eventChannel)
}(r.Context().Done())

// Headers need to be written out before turning Writer() over to json encoder
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}

json := jsoniter.ConfigCompatibleWithStandardLibrary
coder := json.NewEncoder(w)
coder.SetEscapeHTML(true)

for event := range eventChannel {
e := handlers.EventToApiEvent(event)
if err := coder.Encode(e); err != nil {
Expand Down
9 changes: 2 additions & 7 deletions pkg/api/server/handler_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,15 @@ func (s *APIServer) APIHandler(h http.HandlerFunc) http.HandlerFunc {
if err != nil {
buf := make([]byte, 1<<20)
n := runtime.Stack(buf, true)
log.Warnf("Recovering from podman handler panic: %v, %s", err, buf[:n])
log.Warnf("Recovering from API handler panic: %v, %s", err, buf[:n])
// Try to inform client things went south... won't work if handler already started writing response body
utils.InternalServerError(w, fmt.Errorf("%v", err))
}
}()

// Wrapper to hide some boiler plate
fn := func(w http.ResponseWriter, r *http.Request) {
// Connection counting, ugh. Needed to support the sliding window for idle checking.
s.ConnectionCh <- EnterHandler
defer func() { s.ConnectionCh <- ExitHandler }()

log.Debugf("APIHandler -- Method: %s URL: %s (conn %d/%d)",
r.Method, r.URL.String(), s.ActiveConnections, s.TotalConnections)
log.Debugf("APIHandler -- Method: %s URL: %s", r.Method, r.URL.String())

if err := r.ParseForm(); err != nil {
log.Infof("Failed Request: unable to parse form: %q", err)
Expand Down
173 changes: 89 additions & 84 deletions pkg/api/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package server

import (
"context"
"log"
"net"
"net/http"
"os"
"os/signal"
"runtime"
"strings"
"sync"
"syscall"
"time"

Expand All @@ -20,26 +23,19 @@ import (
)

type APIServer struct {
http.Server // The HTTP work happens here
*schema.Decoder // Decoder for Query parameters to structs
context.Context // Context to carry objects to handlers
*libpod.Runtime // Where the real work happens
net.Listener // mux for routing HTTP API calls to libpod routines
context.CancelFunc // Stop APIServer
*time.Timer // Hold timer for sliding window
time.Duration // Duration of client access sliding window
ActiveConnections uint64 // Number of handlers holding a connection
TotalConnections uint64 // Number of connections handled
ConnectionCh chan int // Channel for signalling handler enter/exit
http.Server // The HTTP work happens here
*schema.Decoder // Decoder for Query parameters to structs
context.Context // Context to carry objects to handlers
*libpod.Runtime // Where the real work happens
net.Listener // mux for routing HTTP API calls to libpod routines
context.CancelFunc // Stop APIServer
idleTracker *IdleTracker // Track connections to support idle shutdown
}

// Number of seconds to wait for next request, if exceeded shutdown server
const (
DefaultServiceDuration = 300 * time.Second
UnlimitedServiceDuration = 0 * time.Second
EnterHandler = 1
ExitHandler = -1
NOOPHandler = 0
)

// NewServer will create and configure a new API server with all defaults
Expand Down Expand Up @@ -70,17 +66,20 @@ func newServer(runtime *libpod.Runtime, duration time.Duration, listener *net.Li
}

router := mux.NewRouter().UseEncodedPath()
idle := NewIdleTracker(duration)

server := APIServer{
Server: http.Server{
Handler: router,
ReadHeaderTimeout: 20 * time.Second,
IdleTimeout: duration,
ConnState: idle.ConnState,
ErrorLog: log.New(logrus.StandardLogger().Out, "", 0),
},
Decoder: handlers.NewAPIDecoder(),
Runtime: runtime,
Listener: *listener,
Duration: duration,
ConnectionCh: make(chan int),
Decoder: handlers.NewAPIDecoder(),
idleTracker: idle,
Listener: *listener,
Runtime: runtime,
}

router.NotFoundHandler = http.HandlerFunc(
Expand Down Expand Up @@ -120,11 +119,11 @@ func newServer(runtime *libpod.Runtime, duration time.Duration, listener *net.Li
router.Walk(func(route *mux.Route, r *mux.Router, ancestors []*mux.Route) error { // nolint
path, err := route.GetPathTemplate()
if err != nil {
path = ""
path = "<N/A>"
}
methods, err := route.GetMethods()
if err != nil {
methods = []string{}
methods = []string{"<N/A>"}
}
logrus.Debugf("Methods: %s Path: %s", strings.Join(methods, ", "), path)
return nil
Expand All @@ -136,24 +135,20 @@ func newServer(runtime *libpod.Runtime, duration time.Duration, listener *net.Li

// Serve starts responding to HTTP requests
func (s *APIServer) Serve() error {
// This is initialized here as Timer is not needed until Serve'ing
if s.Duration > 0 {
s.Timer = time.AfterFunc(s.Duration, func() {
s.ConnectionCh <- NOOPHandler
})
go s.ReadChannelWithTimeout()
} else {
go s.ReadChannelNoTimeout()
}

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
errChan := make(chan error, 1)

go func() {
<-s.idleTracker.Done()
logrus.Debugf("API Server idle for %v", s.idleTracker.Duration)
_ = s.Shutdown()
}()

go func() {
err := s.Server.Serve(s.Listener)
if err != nil && err != http.ErrServerClosed {
errChan <- errors.Wrap(err, "Failed to start APIServer")
errChan <- errors.Wrap(err, "failed to start API server")
return
}
errChan <- nil
Expand All @@ -169,76 +164,86 @@ func (s *APIServer) Serve() error {
return nil
}

func (s *APIServer) ReadChannelWithTimeout() {
// stalker to count the connections. Should the timer expire it will shutdown the service.
for delta := range s.ConnectionCh {
switch delta {
case EnterHandler:
s.Timer.Stop()
s.ActiveConnections += 1
s.TotalConnections += 1
case ExitHandler:
s.Timer.Stop()
s.ActiveConnections -= 1
if s.ActiveConnections == 0 {
// Server will be shutdown iff the timer expires before being reset or stopped
s.Timer = time.AfterFunc(s.Duration, func() {
if err := s.Shutdown(); err != nil {
logrus.Errorf("Failed to shutdown APIServer: %v", err)
os.Exit(1)
}
})
} else {
s.Timer.Reset(s.Duration)
}
case NOOPHandler:
// push the check out another duration...
s.Timer.Reset(s.Duration)
default:
logrus.Warnf("ConnectionCh received unsupported input %d", delta)
}
}
}

func (s *APIServer) ReadChannelNoTimeout() {
// stalker to count the connections.
for delta := range s.ConnectionCh {
switch delta {
case EnterHandler:
s.ActiveConnections += 1
s.TotalConnections += 1
case ExitHandler:
s.ActiveConnections -= 1
case NOOPHandler:
default:
logrus.Warnf("ConnectionCh received unsupported input %d", delta)
}
}
}

// Shutdown is a clean shutdown waiting on existing clients
func (s *APIServer) Shutdown() error {
if logrus.IsLevelEnabled(logrus.DebugLevel) {
_, file, line, _ := runtime.Caller(1)
logrus.Debugf("APIServer.Shutdown by %s:%d, %d/%d connection(s)",
file, line, s.idleTracker.ActiveConnections(), s.idleTracker.TotalConnections())
}

// Duration == 0 flags no auto-shutdown of the server
if s.Duration == 0 {
if s.idleTracker.Duration == 0 {
logrus.Debug("APIServer.Shutdown ignored as Duration == 0")
return nil
}
logrus.Debugf("APIServer.Shutdown called %v, conn %d/%d", time.Now(), s.ActiveConnections, s.TotalConnections)

// Gracefully shutdown server
ctx, cancel := context.WithTimeout(context.Background(), s.Duration)
// Gracefully shutdown server, duration of wait same as idle window
ctx, cancel := context.WithTimeout(context.Background(), s.idleTracker.Duration)
defer cancel()

go func() {
err := s.Server.Shutdown(ctx)
if err != nil && err != context.Canceled && err != http.ErrServerClosed {
logrus.Errorf("Failed to cleanly shutdown APIServer: %s", err.Error())
}
}()
<-ctx.Done()
return nil
}

// Close immediately stops responding to clients and exits
func (s *APIServer) Close() error {
return s.Server.Close()
}

type IdleTracker struct {
active map[net.Conn]struct{}
total int
mux sync.Mutex
timer *time.Timer
Duration time.Duration
}

func NewIdleTracker(idle time.Duration) *IdleTracker {
return &IdleTracker{
active: make(map[net.Conn]struct{}),
Duration: idle,
timer: time.NewTimer(idle),
}
}

func (t *IdleTracker) ConnState(conn net.Conn, state http.ConnState) {
t.mux.Lock()
defer t.mux.Unlock()

oldActive := len(t.active)
logrus.Debugf("IdleTracker %p:%v %d/%d connection(s)", conn, state, t.ActiveConnections(), t.TotalConnections())
switch state {
case http.StateNew, http.StateActive, http.StateHijacked:
t.active[conn] = struct{}{}
// stop the timer if we transitioned from idle
if oldActive == 0 {
t.timer.Stop()
}
t.total += 1
case http.StateIdle, http.StateClosed:
delete(t.active, conn)
// Restart the timer if we've become idle
if oldActive > 0 && len(t.active) == 0 {
t.timer.Stop()
t.timer.Reset(t.Duration)
}
}
}

func (t *IdleTracker) ActiveConnections() int {
return len(t.active)
}

func (t *IdleTracker) TotalConnections() int {
return t.total
}

func (t *IdleTracker) Done() <-chan time.Time {
return t.timer.C
}
1 change: 0 additions & 1 deletion pkg/domain/infra/abi/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func (ic *ContainerEngine) RestService(_ context.Context, opts entities.ServiceO
}()

err = server.Serve()
logrus.Debugf("%d/%d Active connections/Total connections\n", server.ActiveConnections, server.TotalConnections)
_ = listener.Close()
return err
}
Expand Down

0 comments on commit 5582eef

Please sign in to comment.