Skip to content

Commit

Permalink
fix: change metrics around metric appends
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed Aug 2, 2024
1 parent e430a96 commit b28572a
Showing 1 changed file with 13 additions and 10 deletions.
23 changes: 13 additions & 10 deletions pkg/pattern/tee.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type Tee struct {
logger log.Logger
ringClient RingClient

ingesterAppends *prometheus.CounterVec
fallbackIngesterAppends *prometheus.CounterVec
ingesterAppends *prometheus.CounterVec
ingesterMetricAppends *prometheus.CounterVec

teedRequests *prometheus.CounterVec

Expand Down Expand Up @@ -49,11 +49,11 @@ func NewTee(
logger: log.With(logger, "component", "pattern-tee"),
ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "pattern_ingester_appends_total",
Help: "The total number of batch appends of owned streams sent to pattern ingesters.",
Help: "The total number of batch appends sent to pattern ingesters.",
}, []string{"ingester", "status"}),
fallbackIngesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "pattern_ingester_fallback_appends_total",
Help: "The total number of batch appends sent to fallback pattern ingesters, for not owned streams.",
ingesterMetricAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "pattern_ingester_metric_appends_total",
Help: "The total number of metric only batch appends sent to pattern ingesters. These requests will not be processed for patterns.",
}, []string{"ingester", "status"}),
teedRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "pattern_ingester_teed_requests_total",
Expand Down Expand Up @@ -102,9 +102,10 @@ func (t *Tee) sendStream(ctx context.Context, stream distributor.KeyedStream) er
// Only owned streams are processed for patterns, however any pattern ingester can
// aggregate metrics for any stream. Therefore, if we can't send the owned stream,
// try to forward request to any pattern ingester so we at least capture the metrics.
replicationSet, err := t.ringClient.Ring().GetAllHealthy(ring.Read)
replicationSet, err := t.ringClient.Ring().GetReplicationSetForOperation(ring.WriteNoExtend)
if replicationSet.Instances == nil {
return errors.New("no instances found")
t.ingesterMetricAppends.WithLabelValues("none", "fail").Inc()
return errors.New("no instances found for fallback")
}

for _, instance := range replicationSet.Instances {
Expand All @@ -119,10 +120,10 @@ func (t *Tee) sendStream(ctx context.Context, stream distributor.KeyedStream) er

_, err = client.(logproto.PatternClient).Push(ctx, req)
if err != nil {
t.fallbackIngesterAppends.WithLabelValues(addr, "fail").Inc()
t.ingesterMetricAppends.WithLabelValues(addr, "fail").Inc()
continue
}
t.fallbackIngesterAppends.WithLabelValues(addr, "success").Inc()
t.ingesterMetricAppends.WithLabelValues(addr, "success").Inc()
// bail after any success to prevent sending more than one
return nil
}
Expand Down Expand Up @@ -156,7 +157,9 @@ func (t *Tee) sendOwnedStream(ctx context.Context, stream distributor.KeyedStrea
t.ingesterAppends.WithLabelValues(addr, "fail").Inc()
return err
}
// Success here means the stream will be processed for both metrics and patterns
t.ingesterAppends.WithLabelValues(addr, "success").Inc()
t.ingesterMetricAppends.WithLabelValues(addr, "success").Inc()
return nil
}

Expand Down

0 comments on commit b28572a

Please sign in to comment.