Skip to content

Commit

Permalink
Endpointset: Do not use info client to obtain metadata (for now) (tha…
Browse files Browse the repository at this point in the history
…nos-io#4714)

* Do not use info client to obtain metadata

Signed-off-by: Matej Gera <matejgera@gmail.com>

* Update CHANGELOG.

Signed-off-by: Matej Gera <matejgera@gmail.com>

* Comment out client.info usage

Signed-off-by: Matej Gera <matejgera@gmail.com>

* Fix lint error

Signed-off-by: Matej Gera <matejgera@gmail.com>
  • Loading branch information
matej-g authored Sep 30, 2021
1 parent fe0d695 commit f7f7061
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 14 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

## Unreleased

### Fixed

- [#4714](https://github.com/thanos-io/thanos/pull/4714) Endpointset: Do not use info client to obtain metadata.

## [v0.23.0](https://github.com/thanos-io/thanos/tree/release-0.23) - 2021.09.23

### Added
Expand Down
39 changes: 29 additions & 10 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import (
)

const (
unhealthyEndpointMessage = "removing endpoint because it's unhealthy or does not exist"
unhealthyEndpointMessage = "removing endpoint because it's unhealthy or does not exist"
noMetadataEndpointMessage = "cannot obtain metadata: neither info nor store client found"

// Default minimum and maximum time values used by Prometheus when they are not passed as query parameter.
MinTime = -9223309901257974
Expand Down Expand Up @@ -76,17 +77,27 @@ func (es *grpcEndpointSpec) Addr() string {
// Metadata method for gRPC endpoint tries to call InfoAPI exposed by Thanos components until context timeout. If we are unable to get metadata after
// that time, we assume that the host is unhealthy and return error.
func (es *grpcEndpointSpec) Metadata(ctx context.Context, client *endpointClients) (*endpointMetadata, error) {
resp, err := client.info.Info(ctx, &infopb.InfoRequest{}, grpc.WaitForReady(true))
if err != nil {
// Call Info method of StoreAPI, this way querier will be able to discover old components not exposing InfoAPI.
metadata, merr := es.getMetadataUsingStoreAPI(ctx, client.store)
if merr != nil {
return nil, errors.Wrapf(merr, "fallback fetching info from %s after err: %v", es.addr, err)
// TODO(@matej-g): Info client should not be used due to https://github.com/thanos-io/thanos/issues/4699
// Uncomment this after it is implemented in https://github.com/thanos-io/thanos/pull/4282.
// if client.info != nil {
// resp, err := client.info.Info(ctx, &infopb.InfoRequest{}, grpc.WaitForReady(true))
// if err != nil {
// return nil, errors.Wrapf(err, "fetching info from %s", es.addr)
// }

// return &endpointMetadata{resp}, nil
// }

// Call Info method of StoreAPI, this way querier will be able to discover old components not exposing InfoAPI.
if client.store != nil {
metadata, err := es.getMetadataUsingStoreAPI(ctx, client.store)
if err != nil {
return nil, errors.Wrapf(err, "fallback fetching info from %s", es.addr)
}
return metadata, nil
}

return &endpointMetadata{resp}, nil
return nil, errors.New(noMetadataEndpointMessage)
}

func (es *grpcEndpointSpec) getMetadataUsingStoreAPI(ctx context.Context, client storepb.StoreClient) (*endpointMetadata, error) {
Expand Down Expand Up @@ -494,7 +505,9 @@ func (e *EndpointSet) getActiveEndpoints(ctx context.Context, endpoints map[stri
logger: e.logger,
StoreClient: storepb.NewStoreClient(conn),
clients: &endpointClients{
info: infopb.NewInfoClient(conn),
// TODO(@matej-g): Info client should not be used due to https://github.com/thanos-io/thanos/issues/4699
// Uncomment this after it is implemented in https://github.com/thanos-io/thanos/pull/4282.
// info: infopb.NewInfoClient(conn),
store: storepb.NewStoreClient(conn),
},
}
Expand Down Expand Up @@ -668,6 +681,10 @@ func (er *endpointRef) ComponentType() component.Component {
er.mtx.RLock()
defer er.mtx.RUnlock()

if er.metadata == nil {
return component.UnknownStoreAPI
}

return component.FromString(er.metadata.ComponentType)
}

Expand Down Expand Up @@ -786,13 +803,15 @@ func (er *endpointRef) apisPresent() []string {
return apisPresent
}

// TODO(@matej-g): Info client should not be used due to https://github.com/thanos-io/thanos/issues/4699
// Remove the nolint directive after https://github.com/thanos-io/thanos/pull/4282.
type endpointClients struct {
store storepb.StoreClient
rule rulespb.RulesClient
metricMetadata metadatapb.MetadataClient
exemplar exemplarspb.ExemplarsClient
target targetspb.TargetsClient
info infopb.InfoClient
info infopb.InfoClient //nolint:structcheck,unused
}

type endpointMetadata struct {
Expand Down
70 changes: 66 additions & 4 deletions pkg/query/endpointset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
)

Expand Down Expand Up @@ -58,7 +59,11 @@ var (
}
ruleInfo = &infopb.InfoResponse{
ComponentType: component.Rule.String(),
Rules: &infopb.RulesInfo{},
Store: &infopb.StoreInfo{
MinTime: math.MinInt64,
MaxTime: math.MaxInt64,
},
Rules: &infopb.RulesInfo{},
}
storeGWInfo = &infopb.InfoResponse{
ComponentType: component.Store.String(),
Expand Down Expand Up @@ -93,6 +98,28 @@ func (c *mockedEndpoint) Info(ctx context.Context, r *infopb.InfoRequest) (*info
return &c.info, nil
}

// mockedStoreSrv is a test double for the StoreAPI gRPC server. It serves a
// canned Info response and can delay that response to simulate a slow endpoint.
type mockedStoreSrv struct {
	// infoDelay is how long Info sleeps before answering; values <= 0 disable the delay.
	infoDelay time.Duration
	// info is the response handed back (by pointer) from every Info call.
	info storepb.InfoResponse
}

// Info returns the preconfigured StoreAPI info response. When a positive
// infoDelay is set, the call blocks for that duration first. The request and
// its context are ignored, and the returned error is always nil.
func (s *mockedStoreSrv) Info(_ context.Context, _ *storepb.InfoRequest) (*storepb.InfoResponse, error) {
	if delay := s.infoDelay; delay > 0 {
		// Simulate a slow endpoint before answering.
		time.Sleep(delay)
	}
	return &s.info, nil
}
// Series is a no-op stub: it sends nothing on the stream and reports success.
func (s *mockedStoreSrv) Series(_ *storepb.SeriesRequest, _ storepb.Store_SeriesServer) error {
	return nil
}
// LabelNames is a no-op stub: it returns a nil response and a nil error.
func (s *mockedStoreSrv) LabelNames(_ context.Context, _ *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
	return nil, nil
}
// LabelValues is a no-op stub: it returns a nil response and a nil error.
func (s *mockedStoreSrv) LabelValues(_ context.Context, _ *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
	return nil, nil
}

type APIs struct {
store bool
metricMetadata bool
Expand All @@ -113,6 +140,25 @@ type testEndpoints struct {
exposedAPIs map[string]*APIs
}

// componentTypeToStoreType translates a component type name (as produced by
// the component's String method) into the matching storepb.StoreType.
// Names that match none of the known components map to StoreType_STORE.
func componentTypeToStoreType(componentType string) storepb.StoreType {
	// Ordered lookup table; the first matching name wins.
	known := []struct {
		name      string
		storeType storepb.StoreType
	}{
		{component.Query.String(), storepb.StoreType_QUERY},
		{component.Rule.String(), storepb.StoreType_RULE},
		{component.Sidecar.String(), storepb.StoreType_SIDECAR},
		{component.Store.String(), storepb.StoreType_STORE},
		{component.Receive.String(), storepb.StoreType_RECEIVE},
		{component.Debug.String(), storepb.StoreType_DEBUG},
	}

	for _, entry := range known {
		if entry.name == componentType {
			return entry.storeType
		}
	}

	// Unrecognized component types fall back to the plain store type.
	return storepb.StoreType_STORE
}

func startTestEndpoints(testEndpointMeta []testEndpointMeta) (*testEndpoints, error) {
e := &testEndpoints{
srvs: map[string]*grpc.Server{},
Expand All @@ -130,6 +176,19 @@ func startTestEndpoints(testEndpointMeta []testEndpointMeta) (*testEndpoints, er
srv := grpc.NewServer()
addr := listener.Addr().String()

storeSrv := &mockedStoreSrv{
info: storepb.InfoResponse{
LabelSets: meta.extlsetFn(listener.Addr().String()),
StoreType: componentTypeToStoreType(meta.ComponentType),
},
infoDelay: meta.infoDelay,
}

if meta.Store != nil {
storeSrv.info.MinTime = meta.Store.MinTime
storeSrv.info.MaxTime = meta.Store.MaxTime
}

endpointSrv := &mockedEndpoint{
info: infopb.InfoResponse{
LabelSets: meta.extlsetFn(listener.Addr().String()),
Expand All @@ -143,6 +202,7 @@ func startTestEndpoints(testEndpointMeta []testEndpointMeta) (*testEndpoints, er
infoDelay: meta.infoDelay,
}
infopb.RegisterInfoServer(srv, endpointSrv)
storepb.RegisterStoreServer(srv, storeSrv)
go func() {
_ = srv.Serve(listener)
}()
Expand Down Expand Up @@ -859,7 +919,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) {
}
return endpointSpec
},
expectedStores: 4, // sidecar + querier + receiver + storeGW
expectedStores: 5, // sidecar + querier + receiver + storeGW + ruler
expectedRules: 3, // sidecar + querier + ruler
expectedTarget: 2, // sidecar + querier
expectedMetricMetadata: 2, // sidecar + querier
Expand Down Expand Up @@ -895,7 +955,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) {
NewGRPCEndpointSpec(endpoints.orderAddrs[1], false),
}
},
expectedStores: 1, // sidecar
expectedStores: 2, // sidecar + ruler
expectedRules: 2, // sidecar + ruler
expectedTarget: 1, // sidecar
expectedMetricMetadata: 1, // sidecar
Expand All @@ -908,7 +968,8 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) {
NewGRPCEndpointSpec(endpoints.orderAddrs[1], false),
}
},
expectedRules: 1, // ruler
expectedStores: 1, // ruler
expectedRules: 1, // ruler
},
},
},
Expand Down Expand Up @@ -1106,6 +1167,7 @@ func exposedAPIs(c string) *APIs {
}
case component.Rule.String():
return &APIs{
store: true,
rules: true,
}
case component.Store.String():
Expand Down

0 comments on commit f7f7061

Please sign in to comment.