diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index d9940221ff..b362c1c39f 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -1611,6 +1611,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
 				false,
 				s.metrics.emptyPostingCount.WithLabelValues(tenant),
 				nil,
+				nil,
 			)
 		} else {
 			resp = newLazyRespSet(
diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go
index d9dd7b641f..a10a29f7ab 100644
--- a/pkg/store/proxy.go
+++ b/pkg/store/proxy.go
@@ -273,6 +273,15 @@ func (s *ProxyStore) TSDBInfos() []infopb.TSDBInfo {
 	return infos
 }
 
+type quorumGroup struct {
+	quorumGroupKey string
+	context        context.Context
+	cancel         context.CancelFunc
+	quorumValue    int64
+	quorumCounter  *int64
+	replicas       int
+}
+
 func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
 	// TODO(bwplotka): This should be part of request logger, otherwise it does not make much sense. Also, could be
 	// triggered by tracing span to reduce cognitive load.
@@ -385,10 +394,32 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
 	}
 	defer logGroupReplicaErrors()
 
+	level.Debug(reqLogger).Log("s.retrievalStrategy", s.retrievalStrategy)
+
+	quorumGroups := make(map[string]*quorumGroup)
 	for _, st := range stores {
-		st := st
+		if quorumGroups[st.GroupKey()] == nil {
+			groupCtx, cancel := context.WithCancel(ctx)
+			quorumGroups[st.GroupKey()] = &quorumGroup{
+				context:        groupCtx,
+				cancel:         cancel,
+				quorumGroupKey: st.GroupKey(),
+				quorumValue:    2,
+				quorumCounter:  new(int64),
+				replicas:       1,
+			}
+		} else {
+			quorumGroups[st.GroupKey()].replicas++
+		}
+	}
+	level.Debug(reqLogger).Log("quorumGroups", quorumGroups)
 
-		respSet, err := newAsyncRespSet(ctx, st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses)
+	for _, st := range stores {
+		st := st
+		level.Debug(reqLogger).Log("store", st.String(), "store.group", st.GroupKey(), "store.replica", st.ReplicaKey())
+		level.Debug(reqLogger).Log("store response timeout", s.responseTimeout)
+		qg := quorumGroups[st.GroupKey()]
+		respSet, err := newAsyncRespSet(qg.context, st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses, qg)
 		if err != nil {
 			level.Warn(s.logger).Log("msg", "Store failure", "group", st.GroupKey(), "replica", st.ReplicaKey(), "err", err)
 			s.metrics.storeFailureCount.WithLabelValues(st.GroupKey(), st.ReplicaKey()).Inc()
@@ -418,6 +449,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
 	level.Debug(reqLogger).Log("msg", "Series: started fanout streams", "status", strings.Join(storeDebugMsgs, ";"))
 
 	var respHeap seriesStream = NewProxyResponseLoserTree(storeResponses...)
+
 	if s.enableDedup {
 		respHeap = NewResponseDeduplicatorInternal(respHeap, s.quorumChunkDedup)
 	}
diff --git a/pkg/store/proxy_merge.go b/pkg/store/proxy_merge.go
index 378ac78988..370b089bf1 100644
--- a/pkg/store/proxy_merge.go
+++ b/pkg/store/proxy_merge.go
@@ -9,6 +9,7 @@ import (
 	"io"
 	"sort"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/cespare/xxhash/v2"
@@ -474,6 +475,7 @@ func newAsyncRespSet(
 	shardInfo *storepb.ShardInfo,
 	logger log.Logger,
 	emptyStreamResponses prometheus.Counter,
+	qg *quorumGroup,
 ) (respSet, error) {
 
 	var (
@@ -549,6 +551,7 @@
 			applySharding,
 			emptyStreamResponses,
 			labelsToRemove,
+			qg,
 		), nil
 	default:
 		panic(fmt.Sprintf("unsupported retrieval strategy %s", retrievalStrategy))
@@ -602,6 +605,7 @@ func newEagerRespSet(
 	applySharding bool,
 	emptyStreamResponses prometheus.Counter,
 	removeLabels map[string]struct{},
+	qg *quorumGroup,
 ) respSet {
 	ret := &eagerRespSet{
 		span: span,
@@ -707,6 +711,10 @@
 		// Generally we need to resort here.
 		sortWithoutLabels(l.bufferedResponses, l.removeLabels)
 
+		if qg != nil && atomic.AddInt64(qg.quorumCounter, 1) == qg.quorumValue {
+			qg.cancel()
+		}
+
 	}(ret)
 
 	return ret
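
Note on the mechanism above: all replicas of a store group share one cancellable child context plus an atomic counter. Each eager response set increments the counter once its stream has finished buffering, and the goroutine whose completion brings the count up to quorumValue (hardcoded to 2 here) cancels the group context, aborting the replicas that are still streaming. BucketStore passes nil for the new parameter, so its own eager sets opt out of the quorum check. The standalone sketch below shows the same first-quorum-of-N pattern in isolation; the names in it (fetchReplica, replicas, quorum) are hypothetical and not part of this change.

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// fetchReplica stands in for one replica's Series stream: it either
// completes after its simulated latency or aborts when the shared
// group context is cancelled.
func fetchReplica(ctx context.Context, id int, delay time.Duration) (string, error) {
	select {
	case <-time.After(delay):
		return fmt.Sprintf("replica-%d data", id), nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	const replicas, quorum = 3, 2

	// One cancellable context and one counter per group, mirroring quorumGroup.
	groupCtx, cancel := context.WithCancel(context.Background())
	defer cancel()
	counter := new(int64)

	var wg sync.WaitGroup
	for i := 0; i < replicas; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			data, err := fetchReplica(groupCtx, i, time.Duration(i+1)*100*time.Millisecond)
			if err != nil {
				fmt.Printf("replica %d aborted: %v\n", i, err)
				return
			}
			fmt.Printf("replica %d done: %s\n", i, data)
			// Same shape as the check added to newEagerRespSet: exactly one
			// goroutine observes the counter reaching the quorum value and
			// cancels the rest of its group.
			if atomic.AddInt64(counter, 1) == quorum {
				cancel()
			}
		}()
	}
	wg.Wait()
}

One design consequence worth noting: the counter is incremented only after an eager set has buffered its entire stream, so by the time the group context is cancelled, at least quorumValue complete copies of the group's data are already in hand, and the downstream deduplication step works from whole responses while only the slowest replicas are cut off.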