diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go
index f05c1fc08fc3..37f5c7d616fb 100644
--- a/pkg/bloombuild/builder/builder.go
+++ b/pkg/bloombuild/builder/builder.go
@@ -42,7 +42,7 @@ type Builder struct {
 	logger  log.Logger
 
 	tsdbStore   common.TSDBStore
-	bloomStore  bloomshipper.StoreBase
+	bloomStore  bloomshipper.Store
 	chunkLoader ChunkLoader
 
 	client protos.PlannerForBuilderClient
@@ -55,20 +55,23 @@ func New(
 	storeCfg storage.Config,
 	storageMetrics storage.ClientMetrics,
 	fetcherProvider stores.ChunkFetcherProvider,
-	bloomStore bloomshipper.StoreBase,
+	bloomStore bloomshipper.Store,
 	logger log.Logger,
 	r prometheus.Registerer,
 ) (*Builder, error) {
 	utillog.WarnExperimentalUse("Bloom Builder", logger)
 
+	builderID := uuid.NewString()
+	logger = log.With(logger, "builder_id", builderID)
+
 	tsdbStore, err := common.NewTSDBStores(schemaCfg, storeCfg, storageMetrics, logger)
 	if err != nil {
 		return nil, fmt.Errorf("error creating TSDB store: %w", err)
 	}
 
-	metrics := NewMetrics(r, v1.NewMetrics(r))
+	metrics := NewMetrics(r, bloomStore.BloomMetrics())
 	b := &Builder{
-		ID:      uuid.NewString(),
+		ID:      builderID,
 		cfg:     cfg,
 		limits:  limits,
 		metrics: metrics,
@@ -341,7 +344,7 @@ func (b *Builder) processTask(
 		blocksIter,
 		b.rwFn,
 		nil, // TODO(salvacorts): Pass reporter or remove when we address tracking
-		b.metrics,
+		b.bloomStore.BloomMetrics(),
 		logger,
 	)
 
diff --git a/pkg/bloombuild/builder/builder_test.go b/pkg/bloombuild/builder/builder_test.go
index 764e8cb6350f..b04a34fb6eeb 100644
--- a/pkg/bloombuild/builder/builder_test.go
+++ b/pkg/bloombuild/builder/builder_test.go
@@ -20,8 +20,10 @@ import (
 
 	"github.com/grafana/loki/v3/pkg/bloombuild/protos"
 	"github.com/grafana/loki/v3/pkg/storage"
+	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
 	"github.com/grafana/loki/v3/pkg/storage/chunk/client/local"
 	"github.com/grafana/loki/v3/pkg/storage/config"
+	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
 	bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
 	"github.com/grafana/loki/v3/pkg/storage/types"
 )
@@ -86,7 +88,7 @@ func Test_BuilderLoop(t *testing.T) {
 	}
 	flagext.DefaultValues(&cfg.GrpcConfig)
 
-	builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, nil, logger, prometheus.DefaultRegisterer)
+	builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, fakeBloomStore{}, logger, prometheus.DefaultRegisterer)
 	require.NoError(t, err)
 	t.Cleanup(func() {
 		err = services.StopAndAwaitTerminated(context.Background(), builder)
@@ -240,6 +242,14 @@ func (f fakeLimits) BloomCompactorMaxBloomSize(_ string) int {
 	panic("implement me")
 }
 
+type fakeBloomStore struct {
+	bloomshipper.Store
+}
+
+func (f fakeBloomStore) BloomMetrics() *v1.Metrics {
+	return nil
+}
+
 func parseDayTime(s string) config.DayTime {
 	t, err := time.Parse("2006-01-02", s)
 	if err != nil {
diff --git a/pkg/bloombuild/builder/metrics.go b/pkg/bloombuild/builder/metrics.go
index 658d5a2c43ac..37e53b0d9f08 100644
--- a/pkg/bloombuild/builder/metrics.go
+++ b/pkg/bloombuild/builder/metrics.go
@@ -16,8 +16,7 @@ const (
 )
 
 type Metrics struct {
-	bloomMetrics *v1.Metrics
-	running      prometheus.Gauge
+	running prometheus.Gauge
 
 	taskStarted   prometheus.Counter
 	taskCompleted *prometheus.CounterVec
@@ -35,7 +34,6 @@ type Metrics struct {
 
 func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
 	return &Metrics{
-		bloomMetrics: bloomMetrics,
 		running: promauto.With(r).NewGauge(prometheus.GaugeOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
diff --git a/pkg/bloombuild/builder/spec.go b/pkg/bloombuild/builder/spec.go
index 284c0c6d7fc4..a031a69c9812 100644
--- a/pkg/bloombuild/builder/spec.go
+++ b/pkg/bloombuild/builder/spec.go
@@ -49,7 +49,7 @@ type SimpleBloomGenerator struct {
 	// options to build blocks with
 	opts v1.BlockOptions
 
-	metrics *Metrics
+	metrics *v1.Metrics
 	logger  log.Logger
 
 	readWriterFn func() (v1.BlockWriter, v1.BlockReader)
@@ -70,7 +70,7 @@ func NewSimpleBloomGenerator(
 	blocksIter v1.ResettableIterator[*v1.SeriesWithBlooms],
 	readWriterFn func() (v1.BlockWriter, v1.BlockReader),
 	reporter func(model.Fingerprint),
-	metrics *Metrics,
+	metrics *v1.Metrics,
 	logger log.Logger,
 ) *SimpleBloomGenerator {
 	return &SimpleBloomGenerator{
@@ -92,7 +92,7 @@ func NewSimpleBloomGenerator(
 			opts.Schema.NGramLen(),
 			opts.Schema.NGramSkip(),
 			int(opts.UnencodedBlockOptions.MaxBloomSizeBytes),
-			metrics.bloomMetrics,
+			metrics,
 		),
 	}
 }
@@ -163,7 +163,7 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) *LazyBlockBuilderIt
 type LazyBlockBuilderIterator struct {
 	ctx          context.Context
 	opts         v1.BlockOptions
-	metrics      *Metrics
+	metrics      *v1.Metrics
 	populate     v1.BloomPopulatorFunc
 	readWriterFn func() (v1.BlockWriter, v1.BlockReader)
 	series       v1.PeekingIterator[*v1.Series]
@@ -177,7 +177,7 @@ type LazyBlockBuilderIterator struct {
 func NewLazyBlockBuilderIterator(
 	ctx context.Context,
 	opts v1.BlockOptions,
-	metrics *Metrics,
+	metrics *v1.Metrics,
 	populate v1.BloomPopulatorFunc,
 	readWriterFn func() (v1.BlockWriter, v1.BlockReader),
 	series v1.PeekingIterator[*v1.Series],
@@ -214,7 +214,7 @@ func (b *LazyBlockBuilderIterator) Next() bool {
 		return false
 	}
 
-	mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics.bloomMetrics)
+	mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics)
 	writer, reader := b.readWriterFn()
 	blockBuilder, err := v1.NewBlockBuilder(b.opts, writer)
 	if err != nil {
@@ -229,7 +229,7 @@ func (b *LazyBlockBuilderIterator) Next() bool {
 		return false
 	}
 
-	b.curr = v1.NewBlock(reader, b.metrics.bloomMetrics)
+	b.curr = v1.NewBlock(reader, b.metrics)
 	return true
 }
 
diff --git a/pkg/bloombuild/builder/spec_test.go b/pkg/bloombuild/builder/spec_test.go
index 77bb76f7ecaf..0e3f98c90779 100644
--- a/pkg/bloombuild/builder/spec_test.go
+++ b/pkg/bloombuild/builder/spec_test.go
@@ -107,7 +107,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
 			return v1.NewMemoryBlockWriter(indexBuf, bloomsBuf), v1.NewByteReader(indexBuf, bloomsBuf)
 		},
 		nil,
-		NewMetrics(nil, v1.NewMetrics(nil)),
+		v1.NewMetrics(nil),
 		log.NewNopLogger(),
 	)
 }
diff --git a/pkg/bloombuild/planner/metrics.go b/pkg/bloombuild/planner/metrics.go
index 485bf81ddd76..287c59a0955b 100644
--- a/pkg/bloombuild/planner/metrics.go
+++ b/pkg/bloombuild/planner/metrics.go
@@ -26,7 +26,6 @@ type Metrics struct {
 	inflightRequests prometheus.Summary
 	tasksRequeued    prometheus.Counter
 	taskLost         prometheus.Counter
-	tasksFailed      prometheus.Counter
 
 	buildStarted   prometheus.Counter
 	buildCompleted *prometheus.CounterVec
@@ -86,12 +85,6 @@ func NewMetrics(
 			Name:      "tasks_lost_total",
 			Help:      "Total number of tasks lost due to not being picked up by a builder and failed to be requeued.",
 		}),
-		tasksFailed: promauto.With(r).NewCounter(prometheus.CounterOpts{
-			Namespace: metricsNamespace,
-			Subsystem: metricsSubsystem,
-			Name:      "tasks_failed_total",
-			Help:      "Total number of tasks that failed to be processed by builders (after the configured retries).",
-		}),
 
 		buildStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
@@ -149,7 +142,7 @@ func NewMetrics(
 			Subsystem: metricsSubsystem,
 			Name:      "tenant_tasks_completed",
 			Help:      "Number of tasks completed for a tenant during the current build iteration.",
-		}, []string{"tenant"}),
+		}, []string{"tenant", "status"}),
 	}
 }
 
diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go
index 510ed8f96cec..159c8f791f4a 100644
--- a/pkg/bloombuild/planner/planner.go
+++ b/pkg/bloombuild/planner/planner.go
@@ -524,7 +524,8 @@ func (p *Planner) loadTenantWork(
 			// NOTE(salvacorts): We will reset them multiple times for the same tenant, for each table, but it's not a big deal.
 			// Alternatively, we can use a Counter instead of a Gauge, but I think a Gauge is easier to reason about.
 			p.metrics.tenantTasksPlanned.WithLabelValues(tenant).Set(0)
-			p.metrics.tenantTasksCompleted.WithLabelValues(tenant).Set(0)
+			p.metrics.tenantTasksCompleted.WithLabelValues(tenant, statusSuccess).Set(0)
+			p.metrics.tenantTasksCompleted.WithLabelValues(tenant, statusFailure).Set(0)
 
 			level.Debug(p.logger).Log("msg", "loading work for tenant", "table", table, "tenant", tenant, "splitFactor", splitFactor)
 		}
@@ -799,7 +800,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
 		if err != nil {
 			maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant)
 			if maxRetries > 0 && int(task.timesEnqueued.Load()) >= maxRetries {
-				p.metrics.tasksFailed.Inc()
+				p.metrics.tenantTasksCompleted.WithLabelValues(task.Tenant, statusFailure).Inc()
 				p.removePendingTask(task)
 				level.Error(logger).Log(
 					"msg", "task failed after max retries",
@@ -841,7 +842,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
 			"retries", task.timesEnqueued.Load()-1, // -1 because the first enqueue is not a retry
 		)
 		p.removePendingTask(task)
-		p.metrics.tenantTasksCompleted.WithLabelValues(task.Tenant).Inc()
+		p.metrics.tenantTasksCompleted.WithLabelValues(task.Tenant, statusSuccess).Inc()
 
 		// Send the result back to the task. The channel is buffered, so this should not block.
 		task.resultsChannel <- result