
Commit

Option to include quorum zone results for DistributorQueryable metadata API (#5779)

* add zone-results-quorum for metadata APIs

Signed-off-by: Ben Ye <benye@amazon.com>

* update doc

Signed-off-by: Ben Ye <benye@amazon.com>

* integration test

Signed-off-by: Ben Ye <benye@amazon.com>

* fix tests

Signed-off-by: Ben Ye <benye@amazon.com>

* mark flag hidden

Signed-off-by: Ben Ye <benye@amazon.com>

* changelog

Signed-off-by: Ben Ye <benye@amazon.com>

* refactor interfaces

Signed-off-by: Ben Ye <benye@amazon.com>

* update comment

Signed-off-by: Ben Ye <benye@amazon.com>

---------

Signed-off-by: Ben Ye <benye@amazon.com>
yeya24 authored Feb 21, 2024
1 parent 73c8e5a commit 0ecf317
Showing 8 changed files with 363 additions and 61 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@
* [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731
* [FEATURE] Tracing: Add `tracing.otel.round-robin` flag to use `round_robin` gRPC client side LB policy for sending OTLP traces. #5731
* [FEATURE] Ruler: Add `ruler.concurrent-evals-enabled` flag to enable concurrent evaluation within a single rule group for independent rules. Maximum concurrency can be configured via `ruler.max-concurrent-evals`. #5766
* [FEATURE] Distributor Queryable: Experimental: Add config `zone_results_quorum_metadata`. When querying ingesters using metadata APIs such as label names, label values and series, only results from a quorum number of zones will be included and merged. #5779
* [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638
* [ENHANCEMENT] Compactor: Add new compactor metric `cortex_compactor_start_duration_seconds`. #5683
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684
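For intuition about the "quorum number of zones" wording above: with zone-aware replication, a read completes once all but MaxUnavailableZones zones have fully responded, so with three zones and one tolerated unavailable zone, two zones form the quorum. A minimal sketch of that arithmetic (the helper below is illustrative, not part of the Cortex API):

package main

import "fmt"

// zonesForQuorum is an illustrative helper, not Cortex code: the number of
// zones whose results are merged under zone results quorum, assuming
// quorum = total zones - MaxUnavailableZones.
func zonesForQuorum(totalZones, maxUnavailableZones int) int {
	return totalZones - maxUnavailableZones
}

func main() {
	// Replication factor 3 across 3 zones, 1 zone allowed to be unavailable:
	// metadata results are merged from 2 zones only.
	fmt.Println(zonesForQuorum(3, 1)) // 2
}
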
92 changes: 92 additions & 0 deletions integration/zone_aware_test.go
@@ -151,3 +151,95 @@ func TestZoneAwareReplication(t *testing.T) {
require.Equal(t, 500, res.StatusCode)

}

func TestZoneResultsQuorum(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

flags := BlocksStorageFlags()
flags["-distributor.shard-by-all-labels"] = "true"
flags["-distributor.replication-factor"] = "3"
flags["-distributor.zone-awareness-enabled"] = "true"

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Start Cortex components.
ingesterFlags := func(zone string) map[string]string {
return mergeFlags(flags, map[string]string{
"-ingester.availability-zone": zone,
})
}

ingester1 := e2ecortex.NewIngesterWithConfigFile("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-a"), "")
ingester2 := e2ecortex.NewIngesterWithConfigFile("ingester-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-a"), "")
ingester3 := e2ecortex.NewIngesterWithConfigFile("ingester-3", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-b"), "")
ingester4 := e2ecortex.NewIngesterWithConfigFile("ingester-4", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-b"), "")
ingester5 := e2ecortex.NewIngesterWithConfigFile("ingester-5", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-c"), "")
ingester6 := e2ecortex.NewIngesterWithConfigFile("ingester-6", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-c"), "")
require.NoError(t, s.StartAndWaitReady(ingester1, ingester2, ingester3, ingester4, ingester5, ingester6))

distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
flagsZoneResultsQuorum := mergeFlags(flags, map[string]string{
"-distributor.zone-results-quorum-metadata": "true",
})
querierZoneResultsQuorum := e2ecortex.NewQuerier("querier-zrq", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flagsZoneResultsQuorum, "")
require.NoError(t, s.StartAndWaitReady(distributor, querier, querierZoneResultsQuorum))

// Wait until distributor and queriers have updated the ring.
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

require.NoError(t, querierZoneResultsQuorum.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)
clientZoneResultsQuorum, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querierZoneResultsQuorum.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)

// Push some series
now := time.Now()
numSeries := 100
expectedVectors := map[string]model.Vector{}

for i := 1; i <= numSeries; i++ {
metricName := fmt.Sprintf("series_%d", i)
series, expectedVector := generateSeries(metricName, now)
res, err := client.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

expectedVectors[metricName] = expectedVector
}

start := now.Add(-time.Hour)
end := now.Add(time.Hour)
res1, err := client.LabelNames(start, end)
require.NoError(t, err)
res2, err := clientZoneResultsQuorum.LabelNames(start, end)
require.NoError(t, err)
assert.Equal(t, res1, res2)

values1, err := client.LabelValues(labels.MetricName, start, end, nil)
require.NoError(t, err)
values2, err := clientZoneResultsQuorum.LabelValues(labels.MetricName, start, end, nil)
require.NoError(t, err)
assert.Equal(t, values1, values2)

series1, err := client.Series([]string{`{__name__=~"series_1|series_2|series_3|series_4|series_5"}`}, start, end)
require.NoError(t, err)
series2, err := clientZoneResultsQuorum.Series([]string{`{__name__=~"series_1|series_2|series_3|series_4|series_5"}`}, start, end)
require.NoError(t, err)
assert.Equal(t, series1, series2)
}
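
Note that the test covers only the happy path: all six ingesters stay healthy, so the quorum-enabled querier and the plain querier must return identical label names, label values, and series. The two code paths would diverge only if a zone were slow or failing.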
26 changes: 16 additions & 10 deletions pkg/distributor/distributor.go
@@ -145,6 +145,11 @@ type Config struct {
// This config is dynamically injected because defined in the querier config.
ShuffleShardingLookbackPeriod time.Duration `yaml:"-"`

// ZoneResultsQuorumMetadata enables zone results quorum when querying the ingester
// replication set with metadata APIs (label names, values and series). When zone
// awareness is enabled, only results from a quorum number of zones will be included,
// reducing the data merged and improving performance.
ZoneResultsQuorumMetadata bool `yaml:"zone_results_quorum_metadata" doc:"hidden"`

// Limits for distributor
InstanceLimits InstanceLimits `yaml:"instance_limits"`
}
@@ -167,6 +172,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.SignWriteRequestsEnabled, "distributor.sign-write-requests", false, "EXPERIMENTAL: If enabled, sign the write request between distributors and ingesters.")
f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.")
f.BoolVar(&cfg.ZoneResultsQuorumMetadata, "distributor.zone-results-quorum-metadata", false, "Experimental, this flag may change in the future. If both zone awareness and this flag are enabled, metadata API queries (label names, values and series) include only results from a quorum number of zones.")

f.Float64Var(&cfg.InstanceLimits.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.")
f.IntVar(&cfg.InstanceLimits.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.")
@@ -924,8 +930,8 @@ func getErrorStatus(err error) string {
}

// ForReplicationSet runs f, in parallel, for all ingesters in the input replication set.
func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, ingester_client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, zoneResultsQuorum bool, f func(context.Context, ingester_client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, zoneResultsQuorum, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
client, err := d.ingesterPool.GetClientFor(ing.Addr)
if err != nil {
return nil, err
@@ -981,7 +987,7 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
// LabelValuesForLabelName returns all the label values that are associated with a given label name.
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.LabelValues(ctx, req)
if err != nil {
return nil, err
@@ -994,7 +1000,7 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode
// LabelValuesForLabelNameStream returns all the label values that are associated with a given label name.
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.LabelValuesStream(ctx, req)
if err != nil {
return nil, err
@@ -1059,7 +1065,7 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,

func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.LabelNamesStream(ctx, req)
if err != nil {
return nil, err
@@ -1085,7 +1091,7 @@ func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time)
// LabelNames returns all the label names.
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.LabelNames(ctx, req)
if err != nil {
return nil, err
@@ -1098,7 +1104,7 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]st
// MetricsForLabelMatchers gets the metrics that match said matchers
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) {
return d.metricsForLabelMatchersCommon(ctx, from, through, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
_, err := d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
_, err := d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.MetricsForLabelMatchers(ctx, req)
if err != nil {
return nil, err
@@ -1127,7 +1133,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through

func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) {
return d.metricsForLabelMatchersCommon(ctx, from, through, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
_, err := d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
_, err := d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.MetricsForLabelMatchersStream(ctx, req)
if err != nil {
return nil, err
@@ -1205,7 +1211,7 @@ func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetad

req := &ingester_client.MetricsMetadataRequest{}
// TODO(gotjosh): We only need to look in all the ingesters if shardByAllLabels is enabled.
resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resps, err := d.ForReplicationSet(ctx, replicationSet, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return client.MetricsMetadata(ctx, req)
})
if err != nil {
@@ -1247,7 +1253,7 @@ func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
replicationSet.MaxErrors = 0

req := &ingester_client.UserStatsRequest{}
resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resps, err := d.ForReplicationSet(ctx, replicationSet, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return client.UserStats(ctx, req)
})
if err != nil {
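The pattern across the hunks above is mechanical: every metadata read forwards d.cfg.ZoneResultsQuorumMetadata as the new boolean argument to ForReplicationSet, while UserStats passes false (it sets replicationSet.MaxErrors = 0 and needs a response from every ingester). Condensed from the diff, with req construction and error handling elided, the call shape for a metadata read is:

	resp, err := d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata,
		func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
			return client.LabelNames(ctx, req)
		})
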
6 changes: 3 additions & 3 deletions pkg/distributor/query.go
@@ -161,7 +161,7 @@ func (d *Distributor) GetIngestersForMetadata(ctx context.Context) (ring.Replica
func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (model.Matrix, error) {
// Fetch samples from multiple ingesters in parallel, using the replicationSet
// to deal with consistency.
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, false, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
client, err := d.ingesterPool.GetClientFor(ing.Addr)
if err != nil {
return nil, err
@@ -232,7 +232,7 @@ func mergeExemplarSets(a, b []cortexpb.Exemplar) []cortexpb.Exemplar {
func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.ExemplarQueryRequest) (*ingester_client.ExemplarQueryResponse, error) {
// Fetch exemplars from multiple ingesters in parallel, using the replicationSet
// to deal with consistency.
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, false, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
client, err := d.ingesterPool.GetClientFor(ing.Addr)
if err != nil {
return nil, err
@@ -293,7 +293,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
)

// Fetch samples from multiple ingesters
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, false, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
client, err := d.ingesterPool.GetClientFor(ing.Addr)
if err != nil {
return nil, err
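All three paths in this file (samples, exemplars, and the streaming variant) pass false: zone results quorum is scoped to the metadata APIs, so time-series query results still merge data from every responding ingester.
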
17 changes: 7 additions & 10 deletions pkg/ring/replication_set.go
@@ -21,18 +21,19 @@ type ReplicationSet struct {
}

// Do function f in parallel for all replicas in the set, erroring if we exceed
// MaxErrors and returning early otherwise.
func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(context.Context, *InstanceDesc) (interface{}, error)) ([]interface{}, error) {
// MaxErrors and returning early otherwise. zoneResultsQuorum, when enabled, includes
// only results from zones that have already reached quorum, to improve performance.
func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResultsQuorum bool, f func(context.Context, *InstanceDesc) (interface{}, error)) ([]interface{}, error) {
type instanceResult struct {
res interface{}
err error
instance *InstanceDesc
}

// Initialise the result tracker, which is use to keep track of successes and failures.
// Initialise the result tracker, which is used to keep track of successes and failures.
var tracker replicationSetResultTracker
if r.MaxUnavailableZones > 0 {
tracker = newZoneAwareResultTracker(r.Instances, r.MaxUnavailableZones)
tracker = newZoneAwareResultTracker(r.Instances, r.MaxUnavailableZones, zoneResultsQuorum)
} else {
tracker = newDefaultResultTracker(r.Instances, r.MaxErrors)
}
@@ -67,12 +68,10 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(cont
}(i, &r.Instances[i])
}

results := make([]interface{}, 0, len(r.Instances))

for !tracker.succeeded() {
select {
case res := <-ch:
tracker.done(res.instance, res.err)
tracker.done(res.instance, res.res, res.err)
if res.err != nil {
if tracker.failed() {
return nil, res.err
Expand All @@ -82,16 +81,14 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(cont
if delay > 0 && r.MaxUnavailableZones == 0 {
forceStart <- struct{}{}
}
} else {
results = append(results, res.res)
}

case <-ctx.Done():
return nil, ctx.Err()
}
}

return results, nil
return tracker.getResults(), nil
}

// Includes returns whether the replication set includes the replica with the provided addr.
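The tracker side of the change (newZoneAwareResultTracker gaining the zoneResultsQuorum argument, done receiving the result value, and the new getResults method) lives in one of the three changed files not rendered in this view. A rough sketch of what getResults plausibly does under zone quorum, reconstructed from the calls in Do above and not taken from the actual Cortex code:

package ring

// Sketch only: the zone-keyed fields and method bodies are assumptions
// inferred from how Do calls the tracker; names do not match the real code.
type zoneAwareTrackerSketch struct {
	pendingByZone     map[string]int           // instances still in flight, per zone
	failedByZone      map[string]int           // errors observed, per zone
	resultsByZone     map[string][]interface{} // successful results, buffered per zone
	zoneResultsQuorum bool
}

func (t *zoneAwareTrackerSketch) done(zone string, res interface{}, err error) {
	t.pendingByZone[zone]--
	if err != nil {
		t.failedByZone[zone]++
		return
	}
	t.resultsByZone[zone] = append(t.resultsByZone[zone], res)
}

// getResults replaces the results slice that Do previously built inline.
// With zoneResultsQuorum set, zones that have not fully succeeded contribute
// nothing, so only complete (quorum) zones reach the merge step.
func (t *zoneAwareTrackerSketch) getResults() []interface{} {
	out := make([]interface{}, 0)
	for zone, results := range t.resultsByZone {
		if t.zoneResultsQuorum && (t.pendingByZone[zone] > 0 || t.failedByZone[zone] > 0) {
			continue
		}
		out = append(out, results...)
	}
	return out
}

The behavioral point is the buffering: results are now collected per zone inside the tracker rather than appended in Do's receive loop, which is what lets a quorum read drop an incomplete zone's partial results.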
(Diffs for the remaining 3 changed files are not rendered in this view.)
