Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-1102: fine-tuning http server settings #545

Merged
merged 1 commit into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/flowlogs-pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,15 @@ func run() {
}

// Start health report server
operational.NewHealthServer(&opts, mainPipeline.IsAlive, mainPipeline.IsReady)
healthServer := operational.NewHealthServer(&opts, mainPipeline.IsAlive, mainPipeline.IsReady)

// Starts the flows pipeline
mainPipeline.Run()

if promServer != nil {
_ = promServer.Shutdown(context.Background())
}
_ = healthServer.Shutdown(context.Background())

// Give all threads a chance to exit and then exit the process
time.Sleep(time.Second)
Expand Down
35 changes: 14 additions & 21 deletions pkg/operational/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,28 @@ import (

"github.com/heptiolabs/healthcheck"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/server"
log "github.com/sirupsen/logrus"
)

type Server struct {
handler healthcheck.Handler
Address string
}

func (hs *Server) Serve() {
for {
err := http.ListenAndServe(hs.Address, hs.handler)
log.Errorf("http.ListenAndServe error %v", err)
time.Sleep(60 * time.Second)
}
}

func NewHealthServer(opts *config.Options, isAlive healthcheck.Check, isReady healthcheck.Check) *Server {

func NewHealthServer(opts *config.Options, isAlive healthcheck.Check, isReady healthcheck.Check) *http.Server {
handler := healthcheck.NewHandler()
address := net.JoinHostPort(opts.Health.Address, opts.Health.Port)
handler.AddLivenessCheck("PipelineCheck", isAlive)
handler.AddReadinessCheck("PipelineCheck", isReady)

server := &Server{
handler: handler,
Address: address,
}

go server.Serve()
server := server.Default(&http.Server{
Handler: handler,
Addr: address,
})

go func() {
for {
err := server.ListenAndServe()
log.Errorf("http.ListenAndServe error %v", err)
time.Sleep(60 * time.Second)
}
}()

return server
}
2 changes: 1 addition & 1 deletion pkg/pipeline/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestNewHealthServer(t *testing.T) {
expectedAddr := fmt.Sprintf("%s:%s", opts.Health.Address, opts.Health.Port)
server := operational.NewHealthServer(&opts, tt.args.pipeline.IsAlive, tt.args.pipeline.IsReady)
require.NotNil(t, server)
require.Equal(t, expectedAddr, server.Address)
require.Equal(t, expectedAddr, server.Addr)

client := &http.Client{}

Expand Down
6 changes: 4 additions & 2 deletions pkg/prometheus/prom_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/server"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -63,7 +64,7 @@ func StartServerAsync(conn *api.PromConnectionInfo, registry *prom.Registry) *ht
addr := fmt.Sprintf("%s:%v", conn.Address, port)
plog.Infof("StartServerAsync: addr = %s", addr)

httpServer := http.Server{
httpServer := &http.Server{
Addr: addr,
// TLS clients must use TLS 1.2 or higher
TLSConfig: &tls.Config{
Expand All @@ -79,6 +80,7 @@ func StartServerAsync(conn *api.PromConnectionInfo, registry *prom.Registry) *ht
mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
}
httpServer.Handler = mux
httpServer = server.Default(httpServer)

go func() {
var err error
Expand All @@ -92,5 +94,5 @@ func StartServerAsync(conn *api.PromConnectionInfo, registry *prom.Registry) *ht
}
}()

return &httpServer
return httpServer
}
81 changes: 81 additions & 0 deletions pkg/prometheus/prom_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package prometheus

import (
"context"
"fmt"
"io"
"net/http"
"strings"
"testing"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestStartPromServer(t *testing.T) {
srv := InitializePrometheus(&config.MetricsSettings{})

serverURL := "http://0.0.0.0:9090"
t.Logf("Started test http server: %v", serverURL)

httpClient := &http.Client{}

// wait for our test http server to come up
checkHTTPReady(httpClient, serverURL)

r, err := http.NewRequest("GET", serverURL+"/metrics", nil)
require.NoError(t, err)

resp, err := httpClient.Do(r)
require.NoError(t, err)
defer resp.Body.Close()

bodyBytes, err := io.ReadAll(resp.Body)
require.NoError(t, err)

bodyString := string(bodyBytes)
require.Equal(t, http.StatusOK, resp.StatusCode)
require.Contains(t, bodyString, "go_gc_duration_seconds")

_ = srv.Shutdown(context.Background())
}

func TestStartPromServer_HeadersLimit(t *testing.T) {
srv := InitializePrometheus(&config.MetricsSettings{})

serverURL := "http://0.0.0.0:9090"
t.Logf("Started test http server: %v", serverURL)

httpClient := &http.Client{}

// wait for our test http server to come up
checkHTTPReady(httpClient, serverURL)

r, err := http.NewRequest("GET", serverURL+"/metrics", nil)
require.NoError(t, err)

// Set many headers
oneKBString := strings.Repeat(".", 1024)
for i := 0; i < 1025; i++ {
r.Header.Set(fmt.Sprintf("test-header-%d", i), oneKBString)
}

resp, err := httpClient.Do(r)
require.NoError(t, err)
defer resp.Body.Close()
assert.Equal(t, http.StatusRequestHeaderFieldsTooLarge, resp.StatusCode)

_ = srv.Shutdown(context.Background())
}

func checkHTTPReady(httpClient *http.Client, url string) {
for i := 0; i < 60; i++ {
if r, err := httpClient.Get(url); err == nil {
r.Body.Close()
break
}
time.Sleep(time.Second)
}
}
46 changes: 46 additions & 0 deletions pkg/server/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package server

import (
"crypto/tls"
"net/http"
"time"

"github.com/sirupsen/logrus"
)

var slog = logrus.WithField("module", "server")

func Default(srv *http.Server) *http.Server {
// defaults taken from https://bruinsslot.jp/post/go-secure-webserver/ can be overriden by caller
if srv.Handler != nil {
// No more than 2MB body
srv.Handler = http.MaxBytesHandler(srv.Handler, 2<<20)
} else {
slog.Warnf("Handler not yet set on server while securing defaults. Make sure a MaxByte middleware is used.")
}
if srv.ReadTimeout == 0 {
srv.ReadTimeout = 10 * time.Second
}
if srv.ReadHeaderTimeout == 0 {
srv.ReadHeaderTimeout = 5 * time.Second
}
if srv.WriteTimeout == 0 {
srv.WriteTimeout = 10 * time.Second
}
if srv.IdleTimeout == 0 {
srv.IdleTimeout = 120 * time.Second
}
if srv.MaxHeaderBytes == 0 {
srv.MaxHeaderBytes = 1 << 20 // 1MB
}
if srv.TLSConfig == nil {
srv.TLSConfig = &tls.Config{}
}
if srv.TLSConfig.MinVersion == 0 {
srv.TLSConfig.MinVersion = tls.VersionTLS13
}
// Disable http/2
srv.TLSNextProto = make(map[string]func(*http.Server, *tls.Conn, http.Handler), 0)

return srv
}