Skip to content

Commit

Permalink
Add config, logging for healthcheck
Browse files Browse the repository at this point in the history
  • Loading branch information
dchw committed Aug 2, 2022
1 parent 6805940 commit 99ff8de
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 9 deletions.
10 changes: 10 additions & 0 deletions cmd/buildkitd/config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package config

import (
"time"

resolverconfig "github.com/moby/buildkit/util/resolver/config"
)

Expand All @@ -24,6 +26,8 @@ type Config struct {
Registries map[string]resolverconfig.RegistryConfig `toml:"registry"`

DNS *DNSConfig `toml:"dns"`

Health HealthConfig `toml:"health"`
}

type GRPCConfig struct {
Expand All @@ -37,6 +41,12 @@ type GRPCConfig struct {
// MaxSendMsgSize int `toml:"max_send_message_size"`
}

// HealthConfig holds the healthcheck settings decoded from the "health"
// table of the TOML configuration. Any field left at its zero value is
// replaced with the matching appdefaults value by setDefaultConfig.
type HealthConfig struct {
// Frequency is how often the healthcheck is run.
Frequency time.Duration `toml:"frequency"`
// Timeout is how long a single healthcheck may take to respond.
Timeout time.Duration `toml:"timeout"`
// AllowedFailures is the healthcheck failure threshold.
// NOTE(review): because 0 is treated as "unset" by setDefaultConfig,
// an explicit 0 cannot be configured here.
AllowedFailures int `toml:"allowedFailures"`
}

type TLSConfig struct {
Cert string `toml:"cert"`
Key string `toml:"key"`
Expand Down
18 changes: 17 additions & 1 deletion cmd/buildkitd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,18 @@ func setDefaultConfig(cfg *config.Config) {
cfg.GRPC.Address = []string{appdefaults.Address}
}

if cfg.Health.Frequency == 0 {
cfg.Health.Frequency = appdefaults.HealthFrequency
}

if cfg.Health.Timeout == 0 {
cfg.Health.Timeout = appdefaults.HealthTimeout
}

if cfg.Health.AllowedFailures == 0 {
cfg.Health.AllowedFailures = appdefaults.HealthAllowedFailures
}

if cfg.Workers.OCI.Platforms == nil {
cfg.Workers.OCI.Platforms = formatPlatforms(archutil.SupportedPlatforms(false))
}
Expand Down Expand Up @@ -612,7 +624,11 @@ func serverCredentials(cfg config.TLSConfig) (*tls.Config, error) {
}

func newController(c *cli.Context, cfg *config.Config) (*control.Controller, error) {
sessionManager, err := session.NewManager()
sessionManager, err := session.NewManager(&session.ManagerOpt{
HealthFrequency: cfg.Health.Frequency,
HealthAllowedFailures: cfg.Health.AllowedFailures,
HealthTimeout: cfg.Health.Timeout,
})
if err != nil {
return nil, err
}
Expand Down
8 changes: 8 additions & 0 deletions docs/buildkitd.toml.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ insecure-entitlements = [ "network.host", "security.insecure" ]
key = "/etc/buildkit/tls.key"
ca = "/etc/buildkit/tlsca.crt"
[health]
# How often to run the healthcheck
frequency = "10s"
# How long a healthcheck can take to respond
timeout = "1m"
# Number of consecutive healthcheck failures after which the connection is cancelled
allowedFailures = 3
[worker.oci]
enabled = true
# platforms manually configures the supported platforms; they are detected automatically if unset.
Expand Down
35 changes: 29 additions & 6 deletions session/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/grpcerrors"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/trace"
"golang.org/x/net/http2"
Expand All @@ -27,7 +28,7 @@ func serve(ctx context.Context, grpcServer *grpc.Server, conn net.Conn) {
(&http2.Server{}).ServeConn(conn, &http2.ServeConnOpts{Handler: grpcServer})
}

func grpcClientConn(ctx context.Context, conn net.Conn) (context.Context, *grpc.ClientConn, error) {
func grpcClientConn(ctx context.Context, conn net.Conn, healthCfg ManagerHealthCfg) (context.Context, *grpc.ClientConn, error) {
var unary []grpc.UnaryClientInterceptor
var stream []grpc.StreamClientInterceptor

Expand Down Expand Up @@ -70,29 +71,51 @@ func grpcClientConn(ctx context.Context, conn net.Conn) (context.Context, *grpc.
}

ctx, cancel := context.WithCancel(ctx)
go monitorHealth(ctx, cc, cancel)
go monitorHealth(ctx, cc, cancel, healthCfg)

return ctx, cc, nil
}

func monitorHealth(ctx context.Context, cc *grpc.ClientConn, cancelConn func()) {
func monitorHealth(ctx context.Context, cc *grpc.ClientConn, cancelConn func(), healthCfg ManagerHealthCfg) {
defer cancelConn()
defer cc.Close()

ticker := time.NewTicker(1 * time.Second)
ticker := time.NewTicker(healthCfg.frequency)
defer ticker.Stop()
healthClient := grpc_health_v1.NewHealthClient(cc)

consecutiveFailures := 0

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
timeoutStart := time.Now().UTC()

ctx, cancel := context.WithTimeout(ctx, healthCfg.timeout)
_, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
cancel()

logFields := logrus.Fields{
"timeout": healthCfg.timeout,
"actualDuration": time.Since(timeoutStart),
}

if err != nil {
return
consecutiveFailures++

logFields["allowedFailures"] = healthCfg.allowedFailures
logFields["consecutiveFailures"] = consecutiveFailures
bklog.G(ctx).WithFields(logFields).Warn("healthcheck failed")

if consecutiveFailures >= healthCfg.allowedFailures {
bklog.G(ctx).Error("healthcheck failed too many times")
return
}
} else {
bklog.G(ctx).WithFields(logFields).Debug("healthcheck completed")
consecutiveFailures = 0
}
}
}
Expand Down
24 changes: 22 additions & 2 deletions session/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"strings"
"sync"
"time"

"github.com/pkg/errors"
"google.golang.org/grpc"
Expand All @@ -31,12 +32,31 @@ type Manager struct {
sessions map[string]*client
mu sync.Mutex
updateCondition *sync.Cond
healthCfg ManagerHealthCfg
}

// ManagerHealthCfg is the healthcheck configuration for gRPC healthchecks.
// It is populated by NewManager from a ManagerOpt and passed to
// grpcClientConn for every connection the Manager handles.
type ManagerHealthCfg struct {
// frequency is the ticker interval between healthchecks.
frequency time.Duration
// timeout is the deadline applied to each individual healthcheck call.
timeout time.Duration
// allowedFailures is the number of consecutive failed healthchecks at
// which monitoring stops and the connection is cancelled and closed.
allowedFailures int
}

// ManagerOpt carries the options accepted by NewManager. Each field is
// copied into the Manager's ManagerHealthCfg.
type ManagerOpt struct {
// HealthFrequency is the interval between gRPC healthchecks.
HealthFrequency time.Duration
// HealthTimeout is the deadline for a single gRPC healthcheck.
HealthTimeout time.Duration
// HealthAllowedFailures is the consecutive-failure threshold for
// gRPC healthchecks.
HealthAllowedFailures int
}

// NewManager returns a new Manager
func NewManager() (*Manager, error) {
func NewManager(opt *ManagerOpt) (*Manager, error) {
sm := &Manager{
sessions: make(map[string]*client),
healthCfg: ManagerHealthCfg{
frequency: opt.HealthFrequency,
timeout: opt.HealthTimeout,
allowedFailures: opt.HealthAllowedFailures,
},
}
sm.updateCondition = sync.NewCond(&sm.mu)
return sm, nil
Expand Down Expand Up @@ -109,7 +129,7 @@ func (sm *Manager) handleConn(ctx context.Context, conn net.Conn, opts map[strin
name := h.Get(headerSessionName)
sharedKey := h.Get(headerSessionSharedKey)

ctx, cc, err := grpcClientConn(ctx, conn)
ctx, cc, err := grpcClientConn(ctx, conn, sm.healthCfg)
if err != nil {
sm.mu.Unlock()
return err
Expand Down
9 changes: 9 additions & 0 deletions util/appdefaults/appdefaults_all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package appdefaults

import "time"

// Default healthcheck settings, applied by buildkitd when the
// corresponding value in the "health" configuration is left at zero.
const (
// HealthAllowedFailures is the default healthcheck failure threshold.
HealthAllowedFailures = 1
// HealthFrequency is the default interval between healthchecks.
HealthFrequency = 1 * time.Second
// HealthTimeout is the default deadline for a single healthcheck.
HealthTimeout = 10 * time.Second
)

0 comments on commit 99ff8de

Please sign in to comment.