Skip to content

Commit

Permalink
Fix metric registration by solving the "component" label conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
JoaoBraveCoding committed Nov 23, 2023
1 parent 184ea74 commit e816a3c
Show file tree
Hide file tree
Showing 14 changed files with 52 additions and 52 deletions.
2 changes: 1 addition & 1 deletion pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func New(
}

// Configure ObjectClient and IndexShipper for series and chunk management
objectClient, err := storage.NewObjectClient(periodicConfig.ObjectType, storageCfg, clientMetrics)
objectClient, err := storage.NewObjectClient("bloom-compactor", periodicConfig.ObjectType, storageCfg, clientMetrics, r)
if err != nil {
return nil, fmt.Errorf("error creating object client '%s': %w", periodicConfig.ObjectType, err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/logcli/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,10 +497,11 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string

func GetObjectClient(store string, conf loki.Config, cm storage.ClientMetrics) (chunk.ObjectClient, error) {
oc, err := storage.NewObjectClient(
"log-cli-query",
store,
conf.StorageConfig,
cm,
)
prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
Expand Down
12 changes: 5 additions & 7 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,9 +607,7 @@ func (t *Loki) initTableManager() (services.Service, error) {
os.Exit(1)
}

reg := prometheus.WrapRegistererWith(prometheus.Labels{"component": "table-manager-store"}, prometheus.DefaultRegisterer)

tableClient, err := storage.NewTableClient(lastConfig.IndexType, *lastConfig, t.Cfg.StorageConfig, t.clientMetrics, reg, util_log.Logger)
tableClient, err := storage.NewTableClient("table-manager-store", lastConfig.IndexType, *lastConfig, t.Cfg.StorageConfig, t.clientMetrics, prometheus.DefaultRegisterer, util_log.Logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1215,7 +1213,7 @@ func (t *Loki) initCompactor() (services.Service, error) {
continue
}

objectClient, err := storage.NewObjectClient(periodConfig.ObjectType, t.Cfg.StorageConfig, t.clientMetrics)
objectClient, err := storage.NewObjectClient("compactor", periodConfig.ObjectType, t.Cfg.StorageConfig, t.clientMetrics, prometheus.DefaultRegisterer)
if err != nil {
return nil, fmt.Errorf("failed to create object client: %w", err)
}
Expand All @@ -1226,7 +1224,7 @@ func (t *Loki) initCompactor() (services.Service, error) {
var deleteRequestStoreClient client.ObjectClient
if t.Cfg.CompactorConfig.RetentionEnabled {
if deleteStore := t.Cfg.CompactorConfig.DeleteRequestStore; deleteStore != "" {
if deleteRequestStoreClient, err = storage.NewObjectClient(deleteStore, t.Cfg.StorageConfig, t.clientMetrics); err != nil {
if deleteRequestStoreClient, err = storage.NewObjectClient("compactor", deleteStore, t.Cfg.StorageConfig, t.clientMetrics, prometheus.DefaultRegisterer); err != nil {
return nil, fmt.Errorf("failed to create delete request store object client: %w", err)
}
} else {
Expand Down Expand Up @@ -1319,7 +1317,7 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
}
tableRange := period.GetIndexTableNumberRange(periodEndTime)

indexClient, err := storage.NewIndexClient(period, tableRange, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.clientMetrics, shardingStrategy,
indexClient, err := storage.NewIndexClient("index-gateway", period, tableRange, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.clientMetrics, shardingStrategy,
prometheus.DefaultRegisterer, log.With(util_log.Logger, "index-store", fmt.Sprintf("%s-%s", period.IndexType, period.From.String())), t.Cfg.MetricsNamespace,
)
if err != nil {
Expand Down Expand Up @@ -1519,7 +1517,7 @@ func (t *Loki) initAnalytics() (services.Service, error) {
return nil, err
}

objectClient, err := storage.NewObjectClient(period.ObjectType, t.Cfg.StorageConfig, t.clientMetrics)
objectClient, err := storage.NewObjectClient("analytics", period.ObjectType, t.Cfg.StorageConfig, t.clientMetrics, prometheus.DefaultRegisterer)
if err != nil {
level.Info(util_log.Logger).Log("msg", "failed to initialize usage report", "err", err)
return nil, nil
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/bucket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,11 @@ func bucketWithMetrics(bucketClient objstore.Bucket, name string, reg prometheus
return bucketClient
}

reg = prometheus.WrapRegistererWithPrefix("loki_", reg)
reg = prometheus.WrapRegistererWith(prometheus.Labels{"component": name}, reg)

return objstore.WrapWithMetrics(
bucketClient,
prometheus.WrapRegistererWith(prometheus.Labels{"component": name}, reg),
reg,
"")
}
34 changes: 18 additions & 16 deletions pkg/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func (cfg *Config) Validate() error {
}

// NewIndexClient creates a new index client of the desired type specified in the PeriodConfig
func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange, cfg Config, schemaCfg config.SchemaConfig, limits StoreLimits, cm ClientMetrics, shardingStrategy indexgateway.ShardingStrategy, registerer prometheus.Registerer, logger log.Logger, metricsNamespace string) (index.Client, error) {
func NewIndexClient(component string, periodCfg config.PeriodConfig, tableRange config.TableRange, cfg Config, schemaCfg config.SchemaConfig, limits StoreLimits, cm ClientMetrics, shardingStrategy indexgateway.ShardingStrategy, registerer prometheus.Registerer, logger log.Logger, metricsNamespace string) (index.Client, error) {

switch true {
case util.StringsContain(testingStorageTypes, periodCfg.IndexType):
Expand Down Expand Up @@ -444,7 +444,7 @@ func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange,
return client, nil
}

objectClient, err := NewObjectClient(periodCfg.ObjectType, cfg, cm)
objectClient, err := NewObjectClient(component, periodCfg.ObjectType, cfg, cm, registerer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -505,7 +505,7 @@ func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange,
}

// NewChunkClient makes a new chunk.Client of the desired types.
func NewChunkClient(name string, cfg Config, schemaCfg config.SchemaConfig, cc congestion.Controller, registerer prometheus.Registerer, clientMetrics ClientMetrics, logger log.Logger) (client.Client, error) {
func NewChunkClient(component, name string, cfg Config, schemaCfg config.SchemaConfig, cc congestion.Controller, registerer prometheus.Registerer, clientMetrics ClientMetrics, logger log.Logger) (client.Client, error) {
var storeType = name

// lookup storeType for named stores
Expand All @@ -518,7 +518,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg config.SchemaConfig, cc c
case util.StringsContain(testingStorageTypes, storeType):
switch storeType {
case config.StorageTypeInMemory:
c, err := NewObjectClient(name, cfg, clientMetrics)
c, err := NewObjectClient(component, name, cfg, clientMetrics, registerer)
if err != nil {
return nil, err
}
Expand All @@ -528,21 +528,21 @@ func NewChunkClient(name string, cfg Config, schemaCfg config.SchemaConfig, cc c
case util.StringsContain(supportedStorageTypes, storeType):
switch storeType {
case config.StorageTypeFileSystem:
c, err := NewObjectClient(name, cfg, clientMetrics)
c, err := NewObjectClient(component, name, cfg, clientMetrics, registerer)
if err != nil {
return nil, err
}
return client.NewClientWithMaxParallel(c, client.FSEncoder, cfg.MaxParallelGetChunk, schemaCfg), nil

case config.StorageTypeAWS, config.StorageTypeS3, config.StorageTypeAzure, config.StorageTypeBOS, config.StorageTypeSwift, config.StorageTypeCOS, config.StorageTypeAlibabaCloud:
c, err := NewObjectClient(name, cfg, clientMetrics)
c, err := NewObjectClient(component, name, cfg, clientMetrics, registerer)
if err != nil {
return nil, err
}
return client.NewClientWithMaxParallel(c, nil, cfg.MaxParallelGetChunk, schemaCfg), nil

case config.StorageTypeGCS:
c, err := NewObjectClient(name, cfg, clientMetrics)
c, err := NewObjectClient(component, name, cfg, clientMetrics, registerer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -583,7 +583,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg config.SchemaConfig, cc c
}

// NewTableClient makes a new table client based on the configuration.
func NewTableClient(name string, periodCfg config.PeriodConfig, cfg Config, cm ClientMetrics, registerer prometheus.Registerer, logger log.Logger) (index.TableClient, error) {
func NewTableClient(component, name string, periodCfg config.PeriodConfig, cfg Config, cm ClientMetrics, registerer prometheus.Registerer, logger log.Logger) (index.TableClient, error) {
switch true {
case util.StringsContain(testingStorageTypes, name):
switch name {
Expand All @@ -592,7 +592,7 @@ func NewTableClient(name string, periodCfg config.PeriodConfig, cfg Config, cm C
}

case util.StringsContain(supportedIndexTypes, name):
objectClient, err := NewObjectClient(periodCfg.ObjectType, cfg, cm)
objectClient, err := NewObjectClient(component, periodCfg.ObjectType, cfg, cm, registerer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -647,8 +647,8 @@ func (c *ClientMetrics) Unregister() {
}

// NewObjectClient makes a new StorageClient with the prefix in the front.
func NewObjectClient(name string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) {
actual, err := internalNewObjectClient(name, cfg, clientMetrics)
func NewObjectClient(component, name string, cfg Config, clientMetrics ClientMetrics, reg prometheus.Registerer) (client.ObjectClient, error) {
actual, err := internalNewObjectClient(component, name, cfg, clientMetrics, reg)
if err != nil {
return nil, err
}
Expand All @@ -662,12 +662,17 @@ func NewObjectClient(name string, cfg Config, clientMetrics ClientMetrics) (clie
}

// internalNewObjectClient makes the underlying StorageClient of the desired types.
func internalNewObjectClient(name string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) {
func internalNewObjectClient(component, name string, cfg Config, clientMetrics ClientMetrics, reg prometheus.Registerer) (client.ObjectClient, error) {
var (
namedStore string
storeType = name
)

// preserve olf reg behaviour
if !cfg.ThanosObjStore {
reg = prometheus.WrapRegistererWith(prometheus.Labels{"component": component}, reg)
}

// lookup storeType for named stores
if nsType, ok := cfg.NamedStores.storeType[name]; ok {
storeType = nsType
Expand Down Expand Up @@ -717,10 +722,7 @@ func internalNewObjectClient(name string, cfg Config, clientMetrics ClientMetric
gcsCfg.EnableRetries = false
}
if cfg.ThanosObjStore {
// Passing "gcs" as the component name as currently it's not
// possible to get the component called this method
// TODO(JoaoBraveCoding) update compoent when bigger refactor happens
return gcp.NewGCSThanosObjectClient(context.Background(), cfg.ObjStoreConf, "gcs", utilLog.Logger, cfg.Hedging, prometheus.WrapRegistererWithPrefix("loki_", prometheus.NewRegistry()))
return gcp.NewGCSThanosObjectClient(context.Background(), cfg.ObjStoreConf, component, utilLog.Logger, cfg.Hedging, reg)
}
return gcp.NewGCSObjectClient(context.Background(), gcsCfg, cfg.Hedging)

Expand Down
9 changes: 5 additions & 4 deletions pkg/storage/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -233,7 +234,7 @@ func TestNewObjectClient_prefixing(t *testing.T) {
var cfg Config
flagext.DefaultValues(&cfg)

objectClient, err := NewObjectClient("inmemory", cfg, cm)
objectClient, err := NewObjectClient("inmemory", "inmemory", cfg, cm, prometheus.NewRegistry())
require.NoError(t, err)

_, ok := objectClient.(client.PrefixedObjectClient)
Expand All @@ -245,7 +246,7 @@ func TestNewObjectClient_prefixing(t *testing.T) {
flagext.DefaultValues(&cfg)
cfg.ObjectPrefix = "my/prefix/"

objectClient, err := NewObjectClient("inmemory", cfg, cm)
objectClient, err := NewObjectClient("inmemory", "inmemory", cfg, cm, prometheus.NewRegistry())
require.NoError(t, err)

prefixed, ok := objectClient.(client.PrefixedObjectClient)
Expand All @@ -258,7 +259,7 @@ func TestNewObjectClient_prefixing(t *testing.T) {
flagext.DefaultValues(&cfg)
cfg.ObjectPrefix = "my/prefix"

objectClient, err := NewObjectClient("inmemory", cfg, cm)
objectClient, err := NewObjectClient("inmemory", "inmemory", cfg, cm, prometheus.NewRegistry())
require.NoError(t, err)

prefixed, ok := objectClient.(client.PrefixedObjectClient)
Expand All @@ -271,7 +272,7 @@ func TestNewObjectClient_prefixing(t *testing.T) {
flagext.DefaultValues(&cfg)
cfg.ObjectPrefix = "/my/prefix/"

objectClient, err := NewObjectClient("inmemory", cfg, cm)
objectClient, err := NewObjectClient("inmemory", "inmemory", cfg, cm, prometheus.NewRegistry())
require.NoError(t, err)

prefixed, ok := objectClient.(client.PrefixedObjectClient)
Expand Down
20 changes: 6 additions & 14 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,6 @@ func (s *LokiStore) chunkClientForPeriod(p config.PeriodConfig) (client.Client,
if objectStoreType == "" {
objectStoreType = p.IndexType
}
chunkClientReg := prometheus.WrapRegistererWith(
prometheus.Labels{"component": "chunk-store-" + p.From.String()}, s.registerer)

var cc congestion.Controller
ccCfg := s.cfg.CongestionControl

Expand All @@ -230,7 +227,8 @@ func (s *LokiStore) chunkClientForPeriod(p config.PeriodConfig) (client.Client,
)
}

chunks, err := NewChunkClient(objectStoreType, s.cfg, s.schemaCfg, cc, chunkClientReg, s.clientMetrics, s.logger)
component := "chunk-store-" + p.From.String()
chunks, err := NewChunkClient(component, objectStoreType, s.cfg, s.schemaCfg, cc, s.registerer, s.clientMetrics, s.logger)
if err != nil {
return nil, errors.Wrap(err, "error creating object client")
}
Expand All @@ -253,14 +251,8 @@ func shouldUseIndexGatewayClient(cfg indexshipper.Config) bool {
}

func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.TableRange, chunkClient client.Client, f *fetcher.Fetcher) (stores.ChunkWriter, index.ReaderWriter, func(), error) {
indexClientReg := prometheus.WrapRegistererWith(
prometheus.Labels{
"component": fmt.Sprintf(
"index-store-%s-%s",
p.IndexType,
p.From.String(),
),
}, s.registerer)
component := fmt.Sprintf("index-store-%s-%s", p.IndexType, p.From.String())
indexClientReg := prometheus.WrapRegistererWith(prometheus.Labels{"component": component}, s.registerer)
indexClientLogger := log.With(s.logger, "index-store", fmt.Sprintf("%s-%s", p.IndexType, p.From.String()))

if p.IndexType == config.TSDBType {
Expand All @@ -278,7 +270,7 @@ func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.Tabl
}, nil
}

objectClient, err := NewObjectClient(p.ObjectType, s.cfg, s.clientMetrics)
objectClient, err := NewObjectClient(component, p.ObjectType, s.cfg, s.clientMetrics, s.registerer)
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -301,7 +293,7 @@ func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.Tabl
}, nil
}

idx, err := NewIndexClient(p, tableRange, s.cfg, s.schemaCfg, s.limits, s.clientMetrics, nil, indexClientReg, indexClientLogger, s.metricsNamespace)
idx, err := NewIndexClient(component, p, tableRange, s.cfg, s.schemaCfg, s.limits, s.clientMetrics, nil, s.registerer, indexClientLogger, s.metricsNamespace)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "error creating index client")
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/stores/shipper/bloomshipper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

"github.com/grafana/dskit/concurrency"
Expand Down Expand Up @@ -94,7 +95,7 @@ type Client interface {
func NewBloomClient(periodicConfigs []config.PeriodConfig, storageConfig storage.Config, clientMetrics storage.ClientMetrics) (*BloomClient, error) {
periodicObjectClients := make(map[config.DayTime]client.ObjectClient)
for _, periodicConfig := range periodicConfigs {
objectClient, err := storage.NewObjectClient(periodicConfig.ObjectType, storageConfig, clientMetrics)
objectClient, err := storage.NewObjectClient("bloom-shipper", periodicConfig.ObjectType, storageConfig, clientMetrics, prometheus.DefaultRegisterer)
if err != nil {
return nil, fmt.Errorf("error creating object client '%s': %w", periodicConfig.ObjectType, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

ww "github.com/grafana/dskit/server"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -128,11 +129,11 @@ type testObjectClient struct {
}

func newTestObjectClient(path string, clientMetrics storage.ClientMetrics) client.ObjectClient {
c, err := storage.NewObjectClient("filesystem", storage.Config{
c, err := storage.NewObjectClient("compactor", "filesystem", storage.Config{
FSConfig: local.FSConfig{
Directory: path,
},
}, clientMetrics)
}, clientMetrics, prometheus.NewRegistry())
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion tools/tsdb/bloom-tester/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func execute() {
periodCfg, tableRange, tableName, err := helpers.GetPeriodConfigForTableNumber(bucket, conf.SchemaConfig.Configs)
helpers.ExitErr("find period config for bucket", err)

objectClient, err := storage.NewObjectClient(periodCfg.ObjectType, conf.StorageConfig, clientMetrics)
objectClient, err := storage.NewObjectClient("bloom-tester", periodCfg.ObjectType, conf.StorageConfig, clientMetrics, prometheus.DefaultRegisterer)
helpers.ExitErr("creating object client", err)

chunkClient := client.NewClient(objectClient, nil, conf.SchemaConfig)
Expand Down
2 changes: 1 addition & 1 deletion tools/tsdb/bloom-tester/readlib.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func executeRead() {
periodCfg, tableRange, tableName, err := helpers.GetPeriodConfigForTableNumber(bucket, conf.SchemaConfig.Configs)
helpers.ExitErr("find period config for bucket", err)

objectClient, err := storage.NewObjectClient(periodCfg.ObjectType, conf.StorageConfig, clientMetrics)
objectClient, err := storage.NewObjectClient("bloom-tester", periodCfg.ObjectType, conf.StorageConfig, clientMetrics, prometheus.DefaultRegisterer)
helpers.ExitErr("creating object client", err)

chunkClient := client.NewClient(objectClient, nil, conf.SchemaConfig)
Expand Down
2 changes: 1 addition & 1 deletion tools/tsdb/index-analyzer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func main() {
periodCfg, tableRange, tableName, err := helpers.GetPeriodConfigForTableNumber(bucket, conf.SchemaConfig.Configs)
helpers.ExitErr("find period config for bucket", err)

objectClient, err := storage.NewObjectClient(periodCfg.ObjectType, conf.StorageConfig, clientMetrics)
objectClient, err := storage.NewObjectClient("index-analyzer", periodCfg.ObjectType, conf.StorageConfig, clientMetrics, prometheus.DefaultRegisterer)
helpers.ExitErr("creating object client", err)

shipper, err := indexshipper.NewIndexShipper(
Expand Down
2 changes: 1 addition & 1 deletion tools/tsdb/migrate-versions/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func main() {
}

func migrateTables(pCfg config.PeriodConfig, storageCfg storage.Config, clientMetrics storage.ClientMetrics, tableRange config.TableRange) error {
objClient, err := storage.NewObjectClient(pCfg.ObjectType, storageCfg, clientMetrics)
objClient, err := storage.NewObjectClient("tables-migration-tool", pCfg.ObjectType, storageCfg, clientMetrics, prometheus.DefaultRegisterer)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion tools/tsdb/migrate-versions/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -52,7 +53,7 @@ func TestMigrateTables(t *testing.T) {
}
clientMetrics := storage.NewClientMetrics()

objClient, err := storage.NewObjectClient(pcfg.ObjectType, storageCfg, clientMetrics)
objClient, err := storage.NewObjectClient("tables-migration-tool", pcfg.ObjectType, storageCfg, clientMetrics, prometheus.DefaultRegisterer)
require.NoError(t, err)
indexStorageClient := shipperstorage.NewIndexStorageClient(objClient, pcfg.IndexTables.PathPrefix)

Expand Down

0 comments on commit e816a3c

Please sign in to comment.