diff --git a/pkg/pattern/tee.go b/pkg/pattern/tee.go
index 42557e39f4bc..b240dc889d27 100644
--- a/pkg/pattern/tee.go
+++ b/pkg/pattern/tee.go
@@ -20,8 +20,8 @@ type Tee struct {
 	logger     log.Logger
 	ringClient RingClient
 
-	ingesterAppends         *prometheus.CounterVec
-	fallbackIngesterAppends *prometheus.CounterVec
+	ingesterAppends       *prometheus.CounterVec
+	ingesterMetricAppends *prometheus.CounterVec
 
 	teedRequests *prometheus.CounterVec
 
@@ -49,11 +49,11 @@ func NewTee(
 		logger:     log.With(logger, "component", "pattern-tee"),
 		ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
 			Name: "pattern_ingester_appends_total",
-			Help: "The total number of batch appends of owned streams sent to pattern ingesters.",
+			Help: "The total number of batch appends sent to pattern ingesters.",
 		}, []string{"ingester", "status"}),
-		fallbackIngesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
-			Name: "pattern_ingester_fallback_appends_total",
-			Help: "The total number of batch appends sent to fallback pattern ingesters, for not owned streams.",
+		ingesterMetricAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
+			Name: "pattern_ingester_metric_appends_total",
+			Help: "The total number of metric only batch appends sent to pattern ingesters. These requests will not be processed for patterns.",
 		}, []string{"ingester", "status"}),
 		teedRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
 			Name: "pattern_ingester_teed_requests_total",
@@ -102,9 +102,10 @@ func (t *Tee) sendStream(ctx context.Context, stream distributor.KeyedStream) er
 	// Only owned streams are processed for patterns, however any pattern ingester can
 	// aggregate metrics for any stream. Therefore, if we can't send the owned stream,
 	// try to forward request to any pattern ingester so we at least capture the metrics.
-	replicationSet, err := t.ringClient.Ring().GetAllHealthy(ring.Read)
+	replicationSet, err := t.ringClient.Ring().GetReplicationSetForOperation(ring.WriteNoExtend)
 	if replicationSet.Instances == nil {
-		return errors.New("no instances found")
+		t.ingesterMetricAppends.WithLabelValues("none", "fail").Inc()
+		return errors.New("no instances found for fallback")
 	}
 
 	for _, instance := range replicationSet.Instances {
@@ -119,10 +120,10 @@
 		_, err = client.(logproto.PatternClient).Push(ctx, req)
 		if err != nil {
-			t.fallbackIngesterAppends.WithLabelValues(addr, "fail").Inc()
+			t.ingesterMetricAppends.WithLabelValues(addr, "fail").Inc()
 			continue
 		}
-		t.fallbackIngesterAppends.WithLabelValues(addr, "success").Inc()
+		t.ingesterMetricAppends.WithLabelValues(addr, "success").Inc()
 		// bail after any success to prevent sending more than one
 		return nil
 	}
 
@@ -156,7 +157,9 @@ func (t *Tee) sendOwnedStream(ctx context.Context, stream distributor.KeyedStrea
 		t.ingesterAppends.WithLabelValues(addr, "fail").Inc()
 		return err
 	}
+	// Success here means the stream will be processed for both metrics and patterns
 	t.ingesterAppends.WithLabelValues(addr, "success").Inc()
+	t.ingesterMetricAppends.WithLabelValues(addr, "success").Inc()
 	return nil
 }
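
A note on the counter semantics this patch establishes: a successful owned-stream push now increments both `pattern_ingester_appends_total` and `pattern_ingester_metric_appends_total`, while the fallback path increments only the metric-only counter (with `ingester="none"` when no instances are available). Below is a minimal runnable sketch of that accounting using the real `client_golang` API; the standalone registry, `main` function, and instance label values are illustrative assumptions — only the metric names, help text, and label set come from the patch.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	// Hypothetical standalone registry; in Loki these counters are
	// registered via the registerer passed to NewTee.
	reg := prometheus.NewRegistry()

	ingesterAppends := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "pattern_ingester_appends_total",
		Help: "The total number of batch appends sent to pattern ingesters.",
	}, []string{"ingester", "status"})

	ingesterMetricAppends := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "pattern_ingester_metric_appends_total",
		Help: "The total number of metric only batch appends sent to pattern ingesters. These requests will not be processed for patterns.",
	}, []string{"ingester", "status"})

	// Owned-stream success (sendOwnedStream): counted as both a pattern
	// append and a metric append, since the ingester does both.
	ingesterAppends.WithLabelValues("ingester-1", "success").Inc()
	ingesterMetricAppends.WithLabelValues("ingester-1", "success").Inc()

	// Fallback with an empty replication set (sendStream): only the
	// metric-only counter moves, labeled with "none" as the instance.
	ingesterMetricAppends.WithLabelValues("none", "fail").Inc()

	// Gather and print the two metric families to show the resulting series.
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Printf("%s: %d series\n", mf.GetName(), len(mf.GetMetric()))
	}
}
```

With that accounting, the difference between metric-only successes and owned-stream successes roughly counts how often the fallback path succeeded, which is presumably the observability gap the rename is meant to close.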