Option to include quorum zone results for DistributorQueryable metadata API #5779

Merged
merged 8 commits into from
Feb 21, 2024
Changes from 7 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@
* [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731
* [FEATURE] Tracing: Add `tracing.otel.round-robin` flag to use `round_robin` gRPC client side LB policy for sending OTLP traces. #5731
* [FEATURE] Ruler: Add `ruler.concurrent-evals-enabled` flag to enable concurrent evaluation within a single rule group for independent rules. Maximum concurrency can be configured via `ruler.max-concurrent-evals`. #5766
* [FEATURE] Distributor Queryable: Experimental: Add config `zone_results_quorum_metadata`. When querying ingesters using metadata APIs such as label names, values, and series, only results from a quorum of zones will be included and merged. #5779
* [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638
* [ENHANCEMENT] Compactor: Add new compactor metric `cortex_compactor_start_duration_seconds`. #5683
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684
92 changes: 92 additions & 0 deletions integration/zone_aware_test.go
@@ -151,3 +151,95 @@ func TestZoneAwareReplication(t *testing.T) {
require.Equal(t, 500, res.StatusCode)

}

func TestZoneResultsQuorum(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

flags := BlocksStorageFlags()
flags["-distributor.shard-by-all-labels"] = "true"
flags["-distributor.replication-factor"] = "3"
flags["-distributor.zone-awareness-enabled"] = "true"

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Start Cortex components.
ingesterFlags := func(zone string) map[string]string {
return mergeFlags(flags, map[string]string{
"-ingester.availability-zone": zone,
})
}

ingester1 := e2ecortex.NewIngesterWithConfigFile("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-a"), "")
ingester2 := e2ecortex.NewIngesterWithConfigFile("ingester-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-a"), "")
ingester3 := e2ecortex.NewIngesterWithConfigFile("ingester-3", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-b"), "")
ingester4 := e2ecortex.NewIngesterWithConfigFile("ingester-4", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-b"), "")
ingester5 := e2ecortex.NewIngesterWithConfigFile("ingester-5", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-c"), "")
ingester6 := e2ecortex.NewIngesterWithConfigFile("ingester-6", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-c"), "")
require.NoError(t, s.StartAndWaitReady(ingester1, ingester2, ingester3, ingester4, ingester5, ingester6))

distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
flagsZoneResultsQuorum := mergeFlags(flags, map[string]string{
"-distributor.zone-results-quorum-metadata": "true",
})
querierZoneResultsQuorum := e2ecortex.NewQuerier("querier-zrq", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flagsZoneResultsQuorum, "")
require.NoError(t, s.StartAndWaitReady(distributor, querier, querierZoneResultsQuorum))

// Wait until distributor and queriers have updated the ring.
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

require.NoError(t, querierZoneResultsQuorum.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)
clientZoneResultsQuorum, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querierZoneResultsQuorum.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)

// Push some series
now := time.Now()
numSeries := 100
expectedVectors := map[string]model.Vector{}

for i := 1; i <= numSeries; i++ {
metricName := fmt.Sprintf("series_%d", i)
series, expectedVector := generateSeries(metricName, now)
res, err := client.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

expectedVectors[metricName] = expectedVector
}

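// Query both queriers over the same range: the quorum-enabled querier
// must return exactly the same metadata as the default one; only the
// number of zones merged differs.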
start := now.Add(-time.Hour)
end := now.Add(time.Hour)
res1, err := client.LabelNames(start, end)
require.NoError(t, err)
res2, err := clientZoneResultsQuorum.LabelNames(start, end)
require.NoError(t, err)
assert.Equal(t, res1, res2)

values1, err := client.LabelValues(labels.MetricName, start, end, nil)
require.NoError(t, err)
values2, err := clientZoneResultsQuorum.LabelValues(labels.MetricName, start, end, nil)
require.NoError(t, err)
assert.Equal(t, values1, values2)

series1, err := client.Series([]string{`{__name__=~"series_1|series_2|series_3|series_4|series_5"}`}, start, end)
require.NoError(t, err)
series2, err := clientZoneResultsQuorum.Series([]string{`{__name__=~"series_1|series_2|series_3|series_4|series_5"}`}, start, end)
require.NoError(t, err)
assert.Equal(t, series1, series2)
}
26 changes: 16 additions & 10 deletions pkg/distributor/distributor.go
@@ -145,6 +145,11 @@ type Config struct {
// This config is dynamically injected because defined in the querier config.
ShuffleShardingLookbackPeriod time.Duration `yaml:"-"`

// ZoneResultsQuorumMetadata enables zone results quorum when querying the ingester replication set
// with metadata APIs (label names, values, and series). When zone awareness is enabled, only results
// from a quorum of zones are included, reducing the data merged and improving performance.
ZoneResultsQuorumMetadata bool `yaml:"zone_results_quorum_metadata" doc:"hidden"`

// Limits for distributor
InstanceLimits InstanceLimits `yaml:"instance_limits"`
}
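For context on the quorum count (background, not part of this diff): with zone awareness, the zone-aware tracker declares success once every zone except MaxUnavailableZones has fully responded. A minimal, self-contained sketch of that arithmetic, with illustrative names rather than Cortex's own:

```go
package main

import "fmt"

// quorumZones mirrors the zone-aware tracker's success condition: a
// query succeeds once all zones except maxUnavailableZones have fully
// responded. Names here are illustrative, not Cortex identifiers.
func quorumZones(totalZones, maxUnavailableZones int) int {
	return totalZones - maxUnavailableZones
}

func main() {
	// Replication factor 3 across zones a/b/c tolerates 1 unavailable
	// zone, so metadata results are merged from only 2 zones.
	fmt.Println(quorumZones(3, 1)) // 2
}
```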
@@ -167,6 +172,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.SignWriteRequestsEnabled, "distributor.sign-write-requests", false, "EXPERIMENTAL: If enabled, sign the write request between distributors and ingesters.")
f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.")
f.BoolVar(&cfg.ZoneResultsQuorumMetadata, "distributor.zone-results-quorum-metadata", false, "Experimental; this flag may change in the future. If both zone awareness and this flag are enabled, metadata API queries (label names, values, and series) include only results from a quorum of zones.")

f.Float64Var(&cfg.InstanceLimits.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.")
f.IntVar(&cfg.InstanceLimits.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.")
@@ -924,8 +930,8 @@ func getErrorStatus(err error) string {
}

// ForReplicationSet runs f, in parallel, for all ingesters in the input replication set.
func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, ingester_client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, zoneResultsQuorum bool, f func(context.Context, ingester_client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, zoneResultsQuorum, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
client, err := d.ingesterPool.GetClientFor(ing.Addr)
if err != nil {
return nil, err
@@ -981,7 +987,7 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
// LabelValuesForLabelName returns all the label values that are associated with a given label name.
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.LabelValues(ctx, req)
if err != nil {
return nil, err
@@ -994,7 +1000,7 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode
// LabelValuesForLabelNameStream returns all the label values that are associated with a given label name.
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.LabelValuesStream(ctx, req)
if err != nil {
return nil, err
@@ -1059,7 +1065,7 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,

func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.LabelNamesStream(ctx, req)
if err != nil {
return nil, err
@@ -1085,7 +1091,7 @@ func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time)
// LabelNames returns all the label names.
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.LabelNames(ctx, req)
if err != nil {
return nil, err
@@ -1098,7 +1104,7 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]st
// MetricsForLabelMatchers gets the metrics that match said matchers
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) {
return d.metricsForLabelMatchersCommon(ctx, from, through, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
_, err := d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
_, err := d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.MetricsForLabelMatchers(ctx, req)
if err != nil {
return nil, err
@@ -1127,7 +1133,7 @@

func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) {
return d.metricsForLabelMatchersCommon(ctx, from, through, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
_, err := d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
_, err := d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.MetricsForLabelMatchersStream(ctx, req)
if err != nil {
return nil, err
@@ -1205,7 +1211,7 @@ func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetad

req := &ingester_client.MetricsMetadataRequest{}
// TODO(gotjosh): We only need to look in all the ingesters if shardByAllLabels is enabled.
resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resps, err := d.ForReplicationSet(ctx, replicationSet, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return client.MetricsMetadata(ctx, req)
})
if err != nil {
@@ -1247,7 +1253,7 @@ func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
replicationSet.MaxErrors = 0

req := &ingester_client.UserStatsRequest{}
resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resps, err := d.ForReplicationSet(ctx, replicationSet, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return client.UserStats(ctx, req)
})
if err != nil {
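Background on what "reduce data merged" means in practice: with replication across zones, each zone's ingesters return largely overlapping metadata, and the distributor union-merges the per-ingester responses. A toy, self-contained sketch (not the Cortex merge code) showing that merging two quorum zones can produce the same answer as merging all three:

```go
package main

import (
	"fmt"
	"sort"
)

// mergeLabelNames union-merges per-zone label name responses and
// returns a sorted, deduplicated result — a toy stand-in for the
// distributor's metadata merge.
func mergeLabelNames(responses ...[]string) []string {
	seen := map[string]struct{}{}
	for _, resp := range responses {
		for _, name := range resp {
			seen[name] = struct{}{}
		}
	}
	out := make([]string, 0, len(seen))
	for name := range seen {
		out = append(out, name)
	}
	sort.Strings(out)
	return out
}

func main() {
	// With replication factor 3, each zone holds a full copy, so the
	// per-zone responses overlap almost entirely.
	zoneA := []string{"__name__", "cluster", "job"}
	zoneB := []string{"__name__", "cluster", "job"}
	zoneC := []string{"__name__", "cluster", "job"}

	// Default: merge responses from all three zones.
	fmt.Println(mergeLabelNames(zoneA, zoneB, zoneC)) // [__name__ cluster job]
	// zone-results-quorum-metadata: merge only the first two zones to
	// finish — same answer, one third less data to merge.
	fmt.Println(mergeLabelNames(zoneA, zoneB)) // [__name__ cluster job]
}
```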
6 changes: 3 additions & 3 deletions pkg/distributor/query.go
@@ -161,7 +161,7 @@ func (d *Distributor) GetIngestersForMetadata(ctx context.Context) (ring.Replica
func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (model.Matrix, error) {
// Fetch samples from multiple ingesters in parallel, using the replicationSet
// to deal with consistency.
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, false, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
client, err := d.ingesterPool.GetClientFor(ing.Addr)
if err != nil {
return nil, err
@@ -232,7 +232,7 @@ func mergeExemplarSets(a, b []cortexpb.Exemplar) []cortexpb.Exemplar {
func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.ExemplarQueryRequest) (*ingester_client.ExemplarQueryResponse, error) {
// Fetch exemplars from multiple ingesters in parallel, using the replicationSet
// to deal with consistency.
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, false, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
client, err := d.ingesterPool.GetClientFor(ing.Addr)
if err != nil {
return nil, err
@@ -293,7 +293,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
)

// Fetch samples from multiple ingesters
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, false, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
client, err := d.ingesterPool.GetClientFor(ing.Addr)
if err != nil {
return nil, err
17 changes: 7 additions & 10 deletions pkg/ring/replication_set.go
@@ -21,18 +21,19 @@ type ReplicationSet struct {
}

// Do function f in parallel for all replicas in the set, erroring if we exceed
// MaxErrors and returning early otherwise.
func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(context.Context, *InstanceDesc) (interface{}, error)) ([]interface{}, error) {
// MaxErrors and returning early otherwise. zoneResultsQuorum, when enabled, includes
// only results from zones that have reached quorum, to improve performance.
func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResultsQuorum bool, f func(context.Context, *InstanceDesc) (interface{}, error)) ([]interface{}, error) {
type instanceResult struct {
res interface{}
err error
instance *InstanceDesc
}

// Initialise the result tracker, which is use to keep track of successes and failures.
// Initialise the result tracker, which is used to keep track of successes and failures.
var tracker replicationSetResultTracker
if r.MaxUnavailableZones > 0 {
tracker = newZoneAwareResultTracker(r.Instances, r.MaxUnavailableZones)
tracker = newZoneAwareResultTracker(r.Instances, r.MaxUnavailableZones, zoneResultsQuorum)
} else {
tracker = newDefaultResultTracker(r.Instances, r.MaxErrors)
}
@@ -67,12 +68,10 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(cont
}(i, &r.Instances[i])
}

results := make([]interface{}, 0, len(r.Instances))

for !tracker.succeeded() {
select {
case res := <-ch:
tracker.done(res.instance, res.err)
tracker.done(res.instance, res.res, res.err)
if res.err != nil {
if tracker.failed() {
return nil, res.err
@@ -82,16 +81,14 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(cont
if delay > 0 && r.MaxUnavailableZones == 0 {
forceStart <- struct{}{}
}
} else {
results = append(results, res.res)
}

case <-ctx.Done():
return nil, ctx.Err()
}
}

return results, nil
return tracker.getResults(), nil
}

// Includes returns whether the replication set includes the replica with the provided addr.
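The tracker interface this diff calls into changed alongside it — done() now also records the result, and getResults() hands the collected results back to Do() — but replication_set_tracker.go is not shown here. The sketch below is a hedged reconstruction under simplified, illustrative types (instanceDesc stands in for ring.InstanceDesc; failed() and the default tracker are omitted) of how per-zone result collection and quorum trimming could fit together; the real implementation may differ:

```go
package ring

type instanceDesc struct { // stand-in for ring.InstanceDesc
	Addr string
	Zone string
}

type zoneAwareTrackerSketch struct {
	waitingByZone       map[string]int
	failuresByZone      map[string]int
	resultsByZone       map[string][]interface{}
	completedZones      []string // fully succeeded zones, in completion order
	zoneCount           int
	maxUnavailableZones int
	zoneResultsQuorum   bool
}

func newZoneAwareTrackerSketch(instances []instanceDesc, maxUnavailableZones int, zoneResultsQuorum bool) *zoneAwareTrackerSketch {
	t := &zoneAwareTrackerSketch{
		waitingByZone:       map[string]int{},
		failuresByZone:      map[string]int{},
		resultsByZone:       map[string][]interface{}{},
		maxUnavailableZones: maxUnavailableZones,
		zoneResultsQuorum:   zoneResultsQuorum,
	}
	for _, ins := range instances {
		t.waitingByZone[ins.Zone]++
	}
	t.zoneCount = len(t.waitingByZone)
	return t
}

// done records one instance's result (or error) against its zone.
func (t *zoneAwareTrackerSketch) done(ins *instanceDesc, result interface{}, err error) {
	t.waitingByZone[ins.Zone]--
	if err != nil {
		t.failuresByZone[ins.Zone]++
		return
	}
	t.resultsByZone[ins.Zone] = append(t.resultsByZone[ins.Zone], result)
	if t.waitingByZone[ins.Zone] == 0 && t.failuresByZone[ins.Zone] == 0 {
		t.completedZones = append(t.completedZones, ins.Zone)
	}
}

// succeeded reports whether enough zones completed without error.
func (t *zoneAwareTrackerSketch) succeeded() bool {
	return len(t.completedZones) >= t.zoneCount-t.maxUnavailableZones
}

// getResults assumes succeeded() returned true. With quorum enabled it
// trims the output to the first zones that completed; otherwise it
// returns everything collected so far.
func (t *zoneAwareTrackerSketch) getResults() []interface{} {
	var out []interface{}
	if t.zoneResultsQuorum {
		for _, zone := range t.completedZones[:t.zoneCount-t.maxUnavailableZones] {
			out = append(out, t.resultsByZone[zone]...)
		}
		return out
	}
	for _, results := range t.resultsByZone { // map order is unspecified
		out = append(out, results...)
	}
	return out
}
```

This also explains the shape of the Do() rewrite above: results are no longer appended inline as responses arrive, because with quorum trimming the set of results to return is only known once the tracker decides which zones counted.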