Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observe index gateway request count per tenant #9781

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions clients/pkg/logentry/stages/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ func (m *limitStage) Name() string {
}

// getDropCountByLabelMetric returns the counter vector used to count log lines
// dropped by a pipeline stage, partitioned by the "label_name" and
// "label_value" labels.
//
// The collector is registered through util.RegisterCollectorAllowExisting, so
// calling this more than once with the same registerer yields the collector
// that was registered first instead of failing on the duplicate.
//
// The fully-qualified metric name must stay
// "logentry_dropped_lines_by_label_total": it is the name this function
// exposed before it was switched to the generic registration helper, and
// changing it would silently break existing dashboards and alerts.
func getDropCountByLabelMetric(registerer prometheus.Registerer) *prometheus.CounterVec {
	return util.RegisterCollectorAllowExisting(registerer, prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "logentry",
		Name:      "dropped_lines_by_label_total",
		Help:      "A count of all log lines dropped as a result of a pipeline stage",
	}, []string{"label_name", "label_value"}))
}
58 changes: 33 additions & 25 deletions pkg/storage/stores/indexshipper/gatewayclient/gateway_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
util_math "github.com/grafana/loki/pkg/util/math"
)
Expand Down Expand Up @@ -85,11 +86,33 @@ func (i *IndexGatewayClientConfig) RegisterFlags(f *flag.FlagSet) {
i.RegisterFlagsWithPrefix("index-gateway-client", f)
}

// gatewayClientMetrics groups the Prometheus collectors owned by the index
// gateway client.
type gatewayClientMetrics struct {
	// latency is the request duration histogram, labelled by "operation" and
	// "status_code". It is handed to the gRPC client instrumentation when the
	// dial options are built.
	latency *prometheus.HistogramVec
	// count is the per-tenant request counter, labelled by "status" and
	// "tenant". In ring mode it is incremented with status "error" or
	// "success" for each attempted instance.
	count *prometheus.CounterVec
}

// newGatewayClientMetrics creates the index gateway client collectors and
// registers each of them with r through util.RegisterCollectorAllowExisting.
//
// The histogram is registered first and the counter second; the returned
// struct holds whatever collectors the registration helper hands back.
func newGatewayClientMetrics(r prometheus.Registerer) *gatewayClientMetrics {
	durationOpts := prometheus.HistogramOpts{
		Namespace: "loki",
		Subsystem: "index_gateway",
		Name:      "request_duration_seconds",
		Help:      "Time (in seconds) spent on index gateway requests",
		Buckets:   instrument.DefBuckets,
	}
	requestOpts := prometheus.CounterOpts{
		Namespace: "loki",
		Subsystem: "index_gateway",
		Name:      "client_requests_total",
		Help:      "Total amount of request performed by index gateway clients when running in ring mode",
	}

	metrics := new(gatewayClientMetrics)
	metrics.latency = util.RegisterCollectorAllowExisting(r,
		prometheus.NewHistogramVec(durationOpts, []string{"operation", "status_code"}))
	metrics.count = util.RegisterCollectorAllowExisting(r,
		prometheus.NewCounterVec(requestOpts, []string{"status", "tenant"}))
	return metrics
}

type GatewayClient struct {
cfg IndexGatewayClientConfig

storeGatewayClientRequestDuration *prometheus.HistogramVec

metrics *gatewayClientMetrics
conn *grpc.ClientConn
grpcClient logproto.IndexGatewayClient

Expand All @@ -105,31 +128,15 @@ type GatewayClient struct {
// If it is configured to be in ring mode, a pool of GRPC connections to all Index Gateway instances is created.
// Otherwise, it creates a single GRPC connection to an Index Gateway instance running in simple mode.
func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, limits indexgateway.Limits, logger log.Logger) (*GatewayClient, error) {
latency := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "loki",
Name: "index_gateway_request_duration_seconds",
Help: "Time (in seconds) spent serving requests when using the index gateway",
Buckets: instrument.DefBuckets,
}, []string{"operation", "status_code"})
if r != nil {
err := r.Register(latency)
if err != nil {
alreadyErr, ok := err.(prometheus.AlreadyRegisteredError)
if !ok {
return nil, err
}
latency = alreadyErr.ExistingCollector.(*prometheus.HistogramVec)
}
}

m := newGatewayClientMetrics(r)
sgClient := &GatewayClient{
cfg: cfg,
storeGatewayClientRequestDuration: latency,
ring: cfg.Ring,
limits: limits,
cfg: cfg,
metrics: m,
ring: cfg.Ring,
limits: limits,
}

dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(sgClient.storeGatewayClientRequestDuration))
dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(sgClient.metrics.latency))
if err != nil {
return nil, errors.Wrap(err, "index gateway grpc dial option")
}
Expand Down Expand Up @@ -357,9 +364,10 @@ func (s *GatewayClient) ringModeDo(ctx context.Context, callback func(client log
if err := callback(client); err != nil {
lastErr = err
level.Error(util_log.Logger).Log("msg", fmt.Sprintf("client do failed for instance %s", addr), "err", err)
s.metrics.count.WithLabelValues("error", userID).Inc()
continue
}

s.metrics.count.WithLabelValues("success", userID).Inc()
return nil
}

Expand Down
26 changes: 12 additions & 14 deletions pkg/util/metrics_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,22 +822,20 @@ type CollectorVec interface {
Delete(labels prometheus.Labels) bool
}

// RegisterCounterVec registers new CounterVec with given name,namespace and labels.
// If metric was already registered it returns existing instance.
func RegisterCounterVec(registerer prometheus.Registerer, namespace, name, help string, labels []string) *prometheus.CounterVec {
vec := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: name,
Help: help,
}, labels)
err := registerer.Register(vec)
// RegisterCollectorAllowExisting registers a new metric of type T with the given registerer.
// If the collector was already registered it returns the existing collector.
func RegisterCollectorAllowExisting[T prometheus.Collector](r prometheus.Registerer, m T) T {
if r == nil {
// Don't register if there is no registerer
return m
}
err := r.Register(m)
if err != nil {
if existing, ok := err.(prometheus.AlreadyRegisteredError); ok {
vec = existing.ExistingCollector.(*prometheus.CounterVec)
} else {
// Same behavior as MustRegister if the error is not for AlreadyRegistered
panic(err)
return existing.ExistingCollector.(T)
}
// Same behavior as MustRegister if the error is not for AlreadyRegistered
panic(err)
}
return vec
return m
}