Skip to content

Commit

Permalink
query/storeset: do not close the connection if strict mode enabled (#…
Browse files Browse the repository at this point in the history
…2568)

* query/storeset: do not close the connection if strict mode enabled

Do not close the gRPC connection if establishing a connection has
succeeded but we have failed to get response to a Info() call. Without
this and with strict mode in such a case, we will always keep around a
closed connection that won't work anymore unless the whole Thanos Query
process will be restarted.

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>

* query/storeset: add test, add CHANGELOG item

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>
  • Loading branch information
GiedriusS authored May 6, 2020
1 parent f9e634e commit d9e2c3d
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#2536](https://github.com/thanos-io/thanos/pull/2536) minio-go: Fixed AWS STS endpoint url to https for Web Identity providers on AWS EKS
- [#2501](https://github.com/thanos-io/thanos/pull/2501) Query: gracefully handle additional fields in `SeriesResponse` protobuf message that may be added in the future.
- [#2568](https://github.com/thanos-io/thanos/pull/2568) Query: does not close the connection of strict, static nodes if establishing a connection had succeeded but Info() call failed

### Added

Expand Down
7 changes: 4 additions & 3 deletions pkg/query/storeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,14 +414,15 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*store
level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "dialing connection"), "address", addr)
return
}
st = &storeRef{StoreClient: storepb.NewStoreClient(conn), cc: conn, addr: addr, logger: s.logger}
st = &storeRef{StoreClient: storepb.NewStoreClient(conn), storeType: component.UnknownStoreAPI, cc: conn, addr: addr, logger: s.logger}
}

// Check existing or new store. Is it healthy? What are current metadata?
labelSets, minTime, maxTime, storeType, err := spec.Metadata(ctx, st.StoreClient)
if err != nil {
if !seenAlready {
// Close only if new. Unactive `s.stores` will be closed later on.
if !seenAlready && !spec.StrictStatic() {
// Close only if new and not a strict static node.
// Unactive `s.stores` will be closed later on.
st.Close()
}
s.updateStoreStatus(st, err)
Expand Down
34 changes: 31 additions & 3 deletions pkg/query/storeset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ var testGRPCOpts = []grpc.DialOption{
}

type testStore struct {
info storepb.InfoResponse
infoDelay time.Duration
info storepb.InfoResponse
}

func (s *testStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.InfoResponse, error) {
if s.infoDelay > 0 {
time.Sleep(s.infoDelay)
}
return &s.info, nil
}

Expand All @@ -54,6 +58,7 @@ type testStoreMeta struct {
extlsetFn func(addr string) []storepb.LabelSet
storeType component.StoreAPI
minTime, maxTime int64
infoDelay time.Duration
}

type testStores struct {
Expand Down Expand Up @@ -82,6 +87,7 @@ func startTestStores(storeMetas []testStoreMeta) (*testStores, error) {
MaxTime: meta.maxTime,
MinTime: meta.minTime,
},
infoDelay: meta.infoDelay,
}
if meta.storeType != nil {
storeSrv.info.StoreType = meta.storeType.ToProto()
Expand Down Expand Up @@ -585,6 +591,25 @@ func TestQuerierStrict(t *testing.T) {
},
storeType: component.Sidecar,
},
// Slow store.
{
minTime: 65644,
maxTime: 77777,
extlsetFn: func(addr string) []storepb.LabelSet {
return []storepb.LabelSet{
{
Labels: []storepb.Label{
{
Name: "addr",
Value: addr,
},
},
},
}
},
storeType: component.Sidecar,
infoDelay: 2 * time.Second,
},
})

testutil.Ok(t, err)
Expand All @@ -595,14 +620,17 @@ func TestQuerierStrict(t *testing.T) {
return []StoreSpec{
NewGRPCStoreSpec(st.StoreAddresses()[0], true),
NewGRPCStoreSpec(st.StoreAddresses()[1], false),
NewGRPCStoreSpec(st.StoreAddresses()[2], true),
}
}, testGRPCOpts, time.Minute)
defer storeSet.Close()
storeSet.gRPCInfoCallTimeout = 1 * time.Second

// Initial update.
storeSet.Update(context.Background())
testutil.Equals(t, 2, len(storeSet.stores), "two clients must be available for running store nodes")
testutil.Equals(t, 3, len(storeSet.stores), "three clients must be available for running store nodes")

testutil.Assert(t, storeSet.stores[st.StoreAddresses()[2]].cc.GetState().String() != "SHUTDOWN", "slow store's connection should not be closed")

// The store is statically defined + strict mode is enabled
// so its client + information must be retained.
Expand All @@ -619,7 +647,7 @@ func TestQuerierStrict(t *testing.T) {
storeSet.Update(context.Background())

// Check that the information is the same.
testutil.Equals(t, 1, len(storeSet.stores), "one client must remain available for a store node that is down")
testutil.Equals(t, 2, len(storeSet.stores), "two static clients must remain available")
testutil.Equals(t, curMin, storeSet.stores[staticStoreAddr].minTime, "minimum time reported by the store node is different")
testutil.Equals(t, curMax, storeSet.stores[staticStoreAddr].maxTime, "minimum time reported by the store node is different")
testutil.NotOk(t, storeSet.storeStatuses[staticStoreAddr].LastError)
Expand Down

0 comments on commit d9e2c3d

Please sign in to comment.