diff --git a/CHANGELOG.md b/CHANGELOG.md index 136bc8d474..8bf14b493c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ## Unreleased +### Fixed + +- [#4714](https://github.com/thanos-io/thanos/pull/4714) Endpointset: Do not use info client to obtain metadata. + ## [v0.23.0](https://github.com/thanos-io/thanos/tree/release-0.23) - 2021.09.23 ### Added diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 5b6f04532a..1c05ca6975 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -31,7 +31,8 @@ import ( ) const ( - unhealthyEndpointMessage = "removing endpoint because it's unhealthy or does not exist" + unhealthyEndpointMessage = "removing endpoint because it's unhealthy or does not exist" + noMetadataEndpointMessage = "cannot obtain metadata: neither info nor store client found" // Default minimum and maximum time values used by Prometheus when they are not passed as query parameter. MinTime = -9223309901257974 @@ -76,17 +77,27 @@ func (es *grpcEndpointSpec) Addr() string { // Metadata method for gRPC endpoint tries to call InfoAPI exposed by Thanos components until context timeout. If we are unable to get metadata after // that time, we assume that the host is unhealthy and return error. func (es *grpcEndpointSpec) Metadata(ctx context.Context, client *endpointClients) (*endpointMetadata, error) { - resp, err := client.info.Info(ctx, &infopb.InfoRequest{}, grpc.WaitForReady(true)) - if err != nil { - // Call Info method of StoreAPI, this way querier will be able to discovery old components not exposing InfoAPI. - metadata, merr := es.getMetadataUsingStoreAPI(ctx, client.store) - if merr != nil { - return nil, errors.Wrapf(merr, "fallback fetching info from %s after err: %v", es.addr, err) + // TODO(@matej-g): Info client should not be used due to https://github.com/thanos-io/thanos/issues/4699 + // Uncomment this after it is implemented in https://github.com/thanos-io/thanos/pull/4282. + // if client.info != nil { + // resp, err := client.info.Info(ctx, &infopb.InfoRequest{}, grpc.WaitForReady(true)) + // if err != nil { + // return nil, errors.Wrapf(err, "fetching info from %s", es.addr) + // } + + // return &endpointMetadata{resp}, nil + // } + + // Call Info method of StoreAPI, this way querier will be able to discovery old components not exposing InfoAPI. + if client.store != nil { + metadata, err := es.getMetadataUsingStoreAPI(ctx, client.store) + if err != nil { + return nil, errors.Wrapf(err, "fallback fetching info from %s", es.addr) } return metadata, nil } - return &endpointMetadata{resp}, nil + return nil, errors.New(noMetadataEndpointMessage) } func (es *grpcEndpointSpec) getMetadataUsingStoreAPI(ctx context.Context, client storepb.StoreClient) (*endpointMetadata, error) { @@ -494,7 +505,9 @@ func (e *EndpointSet) getActiveEndpoints(ctx context.Context, endpoints map[stri logger: e.logger, StoreClient: storepb.NewStoreClient(conn), clients: &endpointClients{ - info: infopb.NewInfoClient(conn), + // TODO(@matej-g): Info client should not be used due to https://github.com/thanos-io/thanos/issues/4699 + // Uncomment this after it is implemented in https://github.com/thanos-io/thanos/pull/4282. + // info: infopb.NewInfoClient(conn), store: storepb.NewStoreClient(conn), }, } @@ -668,6 +681,10 @@ func (er *endpointRef) ComponentType() component.Component { er.mtx.RLock() defer er.mtx.RUnlock() + if er.metadata == nil { + return component.UnknownStoreAPI + } + return component.FromString(er.metadata.ComponentType) } @@ -786,13 +803,15 @@ func (er *endpointRef) apisPresent() []string { return apisPresent } +// TODO(@matej-g): Info client should not be used due to https://github.com/thanos-io/thanos/issues/4699 +// Uncomment the nolint directive after https://github.com/thanos-io/thanos/pull/4282. type endpointClients struct { store storepb.StoreClient rule rulespb.RulesClient metricMetadata metadatapb.MetadataClient exemplar exemplarspb.ExemplarsClient target targetspb.TargetsClient - info infopb.InfoClient + info infopb.InfoClient //nolint:structcheck,unused } type endpointMetadata struct { diff --git a/pkg/query/endpointset_test.go b/pkg/query/endpointset_test.go index 3e6b89f38a..5dc7eefa45 100644 --- a/pkg/query/endpointset_test.go +++ b/pkg/query/endpointset_test.go @@ -19,6 +19,7 @@ import ( "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/testutil" ) @@ -58,7 +59,11 @@ var ( } ruleInfo = &infopb.InfoResponse{ ComponentType: component.Rule.String(), - Rules: &infopb.RulesInfo{}, + Store: &infopb.StoreInfo{ + MinTime: math.MinInt64, + MaxTime: math.MaxInt64, + }, + Rules: &infopb.RulesInfo{}, } storeGWInfo = &infopb.InfoResponse{ ComponentType: component.Store.String(), @@ -93,6 +98,28 @@ func (c *mockedEndpoint) Info(ctx context.Context, r *infopb.InfoRequest) (*info return &c.info, nil } +type mockedStoreSrv struct { + infoDelay time.Duration + info storepb.InfoResponse +} + +func (s *mockedStoreSrv) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) { + if s.infoDelay > 0 { + time.Sleep(s.infoDelay) + } + + return &s.info, nil +} +func (s *mockedStoreSrv) Series(*storepb.SeriesRequest, storepb.Store_SeriesServer) error { + return nil +} +func (s *mockedStoreSrv) LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + return nil, nil +} +func (s *mockedStoreSrv) LabelValues(context.Context, *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { + return nil, nil +} + type APIs struct { store bool metricMetadata bool @@ -113,6 +140,25 @@ type testEndpoints struct { exposedAPIs map[string]*APIs } +func componentTypeToStoreType(componentType string) storepb.StoreType { + switch componentType { + case component.Query.String(): + return storepb.StoreType_QUERY + case component.Rule.String(): + return storepb.StoreType_RULE + case component.Sidecar.String(): + return storepb.StoreType_SIDECAR + case component.Store.String(): + return storepb.StoreType_STORE + case component.Receive.String(): + return storepb.StoreType_RECEIVE + case component.Debug.String(): + return storepb.StoreType_DEBUG + default: + return storepb.StoreType_STORE + } +} + func startTestEndpoints(testEndpointMeta []testEndpointMeta) (*testEndpoints, error) { e := &testEndpoints{ srvs: map[string]*grpc.Server{}, @@ -130,6 +176,19 @@ func startTestEndpoints(testEndpointMeta []testEndpointMeta) (*testEndpoints, er srv := grpc.NewServer() addr := listener.Addr().String() + storeSrv := &mockedStoreSrv{ + info: storepb.InfoResponse{ + LabelSets: meta.extlsetFn(listener.Addr().String()), + StoreType: componentTypeToStoreType(meta.ComponentType), + }, + infoDelay: meta.infoDelay, + } + + if meta.Store != nil { + storeSrv.info.MinTime = meta.Store.MinTime + storeSrv.info.MaxTime = meta.Store.MaxTime + } + endpointSrv := &mockedEndpoint{ info: infopb.InfoResponse{ LabelSets: meta.extlsetFn(listener.Addr().String()), @@ -143,6 +202,7 @@ func startTestEndpoints(testEndpointMeta []testEndpointMeta) (*testEndpoints, er infoDelay: meta.infoDelay, } infopb.RegisterInfoServer(srv, endpointSrv) + storepb.RegisterStoreServer(srv, storeSrv) go func() { _ = srv.Serve(listener) }() @@ -859,7 +919,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { } return endpointSpec }, - expectedStores: 4, // sidecar + querier + receiver + storeGW + expectedStores: 5, // sidecar + querier + receiver + storeGW + ruler expectedRules: 3, // sidecar + querier + ruler expectedTarget: 2, // sidecar + querier expectedMetricMetadata: 2, // sidecar + querier @@ -895,7 +955,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { NewGRPCEndpointSpec(endpoints.orderAddrs[1], false), } }, - expectedStores: 1, // sidecar + expectedStores: 2, // sidecar + ruler expectedRules: 2, // sidecar + ruler expectedTarget: 1, // sidecar expectedMetricMetadata: 1, // sidecar @@ -908,7 +968,8 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { NewGRPCEndpointSpec(endpoints.orderAddrs[1], false), } }, - expectedRules: 1, // ruler + expectedStores: 1, // ruler + expectedRules: 1, // ruler }, }, }, @@ -1106,6 +1167,7 @@ func exposedAPIs(c string) *APIs { } case component.Rule.String(): return &APIs{ + store: true, rules: true, } case component.Store.String():