cache: capture metrics related to cache records and pruning #4476

Draft · wants to merge 1 commit into master
32 changes: 25 additions & 7 deletions cache/manager.go
@@ -28,6 +28,8 @@ import (
imagespecidentity "github.com/opencontainers/image-spec/identity"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"golang.org/x/sync/errgroup"
)

@@ -49,6 +51,7 @@ type ManagerOpt struct {
Differ diff.Comparer
MetadataStore *metadata.Store
MountPoolRoot string
MeterProvider metric.MeterProvider
}

type Accessor interface {
@@ -97,6 +100,7 @@ type cacheManager struct {

muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results
unlazyG flightcontrol.Group[struct{}]
metrics *metrics
}

func NewManager(opt ManagerOpt) (Manager, error) {
@@ -124,6 +128,12 @@ func NewManager(opt ManagerOpt) (Manager, error) {

// cm.scheduleGC(5 * time.Minute)

mp := opt.MeterProvider
if mp == nil {
mp = noop.NewMeterProvider()
}
cm.metrics = newMetrics(cm, mp)

return cm, nil
}

@@ -339,6 +349,7 @@ func (cm *cacheManager) IdentityMapping() *idtools.IdentityMapping {
// method should be called after Close.
func (cm *cacheManager) Close() error {
// TODO: allocate internal context and cancel it here
_ = cm.metrics.Close()
return cm.MetadataStore.Close()
}

@@ -1000,17 +1011,24 @@ func (cm *cacheManager) createDiffRef(ctx context.Context, parents parentRefs, d
}

func (cm *cacheManager) Prune(ctx context.Context, ch chan client.UsageInfo, opts ...client.PruneInfo) error {
if err := func() error {
record := cm.metrics.MeasurePrune()
defer record(ctx)

cm.muPrune.Lock()
defer cm.muPrune.Unlock()

for _, opt := range opts {
if err := cm.pruneOnce(ctx, ch, opt); err != nil {
return err
}
}
return nil
}(); err != nil {
return err
}

if cm.GarbageCollect != nil {
if _, err := cm.GarbageCollect(ctx); err != nil {
return err
85 changes: 85 additions & 0 deletions cache/metrics.go
@@ -0,0 +1,85 @@
package cache

import (
"context"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)

const (
instrumentationName = "github.com/moby/buildkit/cache"
metricCacheRecords = "cache.records.count"
Member:
I was wondering if we should have a cache.records.size metric to collect the cache size, or maybe split it between cache.records.shared.size, cache.records.private.size, and cache.records.reclaimable.size?

Collaborator (Author):
I'm not really sure, to be honest. I do think that will likely take a bit more effort to implement, so I left it out of this initial version. The reason is that I wasn't quite sure how to determine disk usage efficiently while taking mutable and immutable records into account: I figured immutable entries wouldn't need to be continuously re-measured, while mutable ones would have to be rechecked. I also didn't want to hold a lock on the disk usage or perform a potentially expensive computation to count the size.

I do think it's worth opening a new issue for cache size metrics and adding them there. I'd say all of the above are good metrics.

Member:
> to hold a lock on the disk usage or perform a potentially expensive computation to count the size.

Oh right, that's a very good point; disk usage is indeed a resource-intensive call.
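For illustration of the suggestion above only (this is not part of the PR): a minimal sketch of what per-category size gauges might look like, assuming a hypothetical readSizeStats helper that returns cheap, pre-computed byte counts so that collection avoids the disk-usage and locking cost discussed here.

```go
package cache

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

// sizeStats is a hypothetical snapshot of per-category cache sizes in bytes.
type sizeStats struct {
	Shared, Private, Reclaimable int64
}

// registerSizeGauges sketches the reviewer's suggestion: three observable
// gauges fed from a single callback. readSizeStats would have to return
// cached values so that collection stays cheap.
func registerSizeGauges(meter metric.Meter, readSizeStats func() sizeStats) metric.Registration {
	shared, err := meter.Int64ObservableGauge("cache.records.shared.size",
		metric.WithDescription("Total size of shared cache records."),
		metric.WithUnit("By"),
	)
	if err != nil {
		otel.Handle(err)
	}
	private, err := meter.Int64ObservableGauge("cache.records.private.size",
		metric.WithDescription("Total size of private cache records."),
		metric.WithUnit("By"),
	)
	if err != nil {
		otel.Handle(err)
	}
	reclaimable, err := meter.Int64ObservableGauge("cache.records.reclaimable.size",
		metric.WithDescription("Total size of reclaimable cache records."),
		metric.WithUnit("By"),
	)
	if err != nil {
		otel.Handle(err)
	}

	// One callback observes all three gauges from a single stats snapshot.
	reg, err := meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
		s := readSizeStats()
		o.ObserveInt64(shared, s.Shared)
		o.ObserveInt64(private, s.Private)
		o.ObserveInt64(reclaimable, s.Reclaimable)
		return nil
	}, shared, private, reclaimable)
	if err != nil {
		otel.Handle(err)
	}
	return reg
}
```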

metricCachePruneDuration = "cache.prune.duration"
)

type metrics struct {
CacheRecords metric.Int64ObservableGauge
CachePruneDuration metric.Int64Histogram
meter metric.Meter
regs []metric.Registration
}

func newMetrics(cm *cacheManager, mp metric.MeterProvider) *metrics {
m := &metrics{}

var err error
m.meter = mp.Meter(instrumentationName)

m.CacheRecords, err = m.meter.Int64ObservableGauge(metricCacheRecords,
metric.WithDescription("Number of cache records."),
)
if err != nil {
otel.Handle(err)
}

m.CachePruneDuration, err = m.meter.Int64Histogram(metricCachePruneDuration,
metric.WithDescription("Measures the duration of cache prune operations."),
metric.WithUnit("ms"),
)
if err != nil {
otel.Handle(err)
}

reg, err := m.meter.RegisterCallback(cm.collectMetrics, m.CacheRecords)
if err != nil {
otel.Handle(err)
}
m.regs = append(m.regs, reg)

return m
}

func (m *metrics) MeasurePrune() (record func(ctx context.Context)) {
start := time.Now()
return func(ctx context.Context) {
dur := int64(time.Since(start) / time.Millisecond)
m.CachePruneDuration.Record(ctx, dur)
}
}

func (m *metrics) Close() error {
for _, reg := range m.regs {
_ = reg.Unregister()
}
return nil
}

type cacheStats struct {
NumRecords int64
}

func (cm *cacheManager) readStats() (stats cacheStats) {
cm.mu.Lock()
defer cm.mu.Unlock()

stats.NumRecords = int64(len(cm.records))
return stats
}

func (cm *cacheManager) collectMetrics(ctx context.Context, o metric.Observer) error {
Member:
When does this get called?

Collaborator (Author):
This gets called automatically during each metrics collection interval (defined by the reader). So for the periodic reader, every 60 seconds by default. For the Prometheus reader, whenever /metrics is called.

See the invocation of RegisterCallback earlier in this file and the observable gauge.

Member:
> So for the periodic reader, every 60 seconds by default. For the Prometheus reader, whenever /metrics is called.

Question: does this mean that the calls are duplicated because there are two readers?

Collaborator (Author):
Just checked this. Short answer: yes.

The callback runs once per reader, so it will fire each time /metrics is hit and also every 60 seconds.

Conversely, if the metrics are never collected, the callback never runs at all: for example, if the OTel collector isn't configured and the Prometheus endpoint doesn't exist or is never invoked.

stats := cm.readStats()
o.ObserveInt64(cm.metrics.CacheRecords, stats.NumRecords)
return nil
}
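To make the collection flow described in the thread above concrete, here is a minimal, self-contained sketch (not part of this PR) of how a reader drives registered callbacks; the stdout exporter, the example meter name, and the placeholder value are illustrative assumptions.

```go
package main

import (
	"context"
	"time"

	"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	exp, err := stdoutmetric.New()
	if err != nil {
		panic(err)
	}

	// The periodic reader collects on an interval (60s by default); each
	// collection invokes every callback registered on the provider's meters.
	// A Prometheus reader would instead collect on each /metrics scrape.
	reader := sdkmetric.NewPeriodicReader(exp, sdkmetric.WithInterval(60*time.Second))
	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
	defer mp.Shutdown(context.Background())

	meter := mp.Meter("example")
	records, err := meter.Int64ObservableGauge("cache.records.count")
	if err != nil {
		panic(err)
	}

	// With two readers registered on the provider, this callback would run
	// once per reader per collection, which is the duplication discussed above.
	_, err = meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
		o.ObserveInt64(records, 42) // placeholder value
		return nil
	}, records)
	if err != nil {
		panic(err)
	}

	time.Sleep(65 * time.Second) // let one collection cycle run
}
```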
13 changes: 9 additions & 4 deletions cmd/buildkitd/main.go
@@ -62,6 +62,7 @@ import (
"github.com/urfave/cli"
"go.etcd.io/bbolt"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
@@ -86,6 +87,9 @@ func init() {

// enable in memory recording for buildkitd traces
detect.Recorder = detect.NewTraceRecorder()

// register alternative handler for otel
otel.SetErrorHandler(bklog.OTELErrorHandler{})
}

var propagators = propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
Expand All @@ -94,6 +98,7 @@ type workerInitializerOpt struct {
config *config.Config
sessionManager *session.Manager
traceSocket string
meterProvider metric.MeterProvider
}

type workerInitializer struct {
@@ -316,7 +321,7 @@ func main() {
os.RemoveAll(lockPath)
}()

controller, err := newController(c, &cfg, mp)
if err != nil {
return err
}
@@ -711,7 +716,7 @@ func serverCredentials(cfg config.TLSConfig) (*tls.Config, error) {
return tlsConf, nil
}

func newController(c *cli.Context, cfg *config.Config) (*control.Controller, error) {
func newController(c *cli.Context, cfg *config.Config, mp metric.MeterProvider) (*control.Controller, error) {
sessionManager, err := session.NewManager()
if err != nil {
return nil, err
@@ -734,6 +739,7 @@ func newController(c *cli.Context, cfg *config.Config) (*control.Controller, err
config: cfg,
sessionManager: sessionManager,
traceSocket: traceSocket,
meterProvider: mp,
})
if err != nil {
return nil, err
@@ -922,8 +928,7 @@ type traceCollector struct {
}

func (t *traceCollector) Export(ctx context.Context, req *tracev1.ExportTraceServiceRequest) (*tracev1.ExportTraceServiceResponse, error) {
if err := t.exporter.ExportSpans(ctx, transform.Spans(req.GetResourceSpans())); err != nil {
return nil, err
}
return &tracev1.ExportTraceServiceResponse{}, nil
1 change: 1 addition & 0 deletions cmd/buildkitd/main_containerd_worker.go
@@ -333,6 +333,7 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([
opt.GCPolicy = getGCPolicy(cfg.GCConfig, common.config.Root)
opt.BuildkitVersion = getBuildkitVersion()
opt.RegistryHosts = resolverFunc(common.config)
opt.MeterProvider = common.meterProvider

if platformsStr := cfg.Platforms; len(platformsStr) != 0 {
platforms, err := parsePlatforms(platformsStr)
1 change: 1 addition & 0 deletions cmd/buildkitd/main_oci_worker.go
@@ -316,6 +316,7 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker
opt.GCPolicy = getGCPolicy(cfg.GCConfig, common.config.Root)
opt.BuildkitVersion = getBuildkitVersion()
opt.RegistryHosts = hosts
opt.MeterProvider = common.meterProvider

if platformsStr := cfg.Platforms; len(platformsStr) != 0 {
platforms, err := parsePlatforms(platformsStr)
6 changes: 6 additions & 0 deletions util/bklog/log.go
@@ -73,3 +73,9 @@ func TraceLevelOnlyStack() string {
}
return ""
}

type OTELErrorHandler struct{}

func (o OTELErrorHandler) Handle(err error) {
G(context.Background()).Error(err)
}
3 changes: 3 additions & 0 deletions worker/base/worker.go
@@ -50,6 +50,7 @@ import (
digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/metric"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
@@ -81,6 +82,7 @@ type WorkerOpt struct {
MetadataStore *metadata.Store
MountPoolRoot string
ResourceMonitor *resources.Monitor
MeterProvider metric.MeterProvider
}

// Worker is a local worker instance with dedicated snapshotter, cache, and so on.
@@ -111,6 +113,7 @@ func NewWorker(ctx context.Context, opt WorkerOpt) (*Worker, error) {
Differ: opt.Differ,
MetadataStore: opt.MetadataStore,
MountPoolRoot: opt.MountPoolRoot,
MeterProvider: opt.MeterProvider,
})
if err != nil {
return nil, err