Skip to content

Commit

Permalink
e2e: initial implementation and fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Sergiusz Urbaniak <sergiusz.urbaniak@gmail.com>
  • Loading branch information
s-urbaniak committed Apr 29, 2020
1 parent 94ce497 commit c92ed81
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 26 deletions.
3 changes: 2 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
time.Duration(*storeResponseTimeout),
*queryReplicaLabels,
selectorLset,
*stores, *rules,
*stores,
*rules,
*enableAutodownsampling,
*enablePartialResponse,
fileSD,
Expand Down
7 changes: 6 additions & 1 deletion pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,5 +622,10 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin
var m struct {
Data *storepb.RuleGroups `json:"data"`
}
return m.Data.Groups, c.get2xxResultWithGRPCErrors(ctx, "/prom_rules HTTP[client]", &u, &m)

if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_rules HTTP[client]", &u, &m); err != nil {
return nil, err
}

return m.Data.Groups, nil
}
2 changes: 2 additions & 0 deletions pkg/query/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,8 @@ func (api *API) rules(r *http.Request) (interface{}, []error, *ApiError) {
PartialResponseStrategy: grp.PartialResponseStrategy,
}

apiRuleGroup.Rules = make([]*storepb.Rule, 0, len(grp.Rules))

for _, r := range grp.Rules {
switch {
case r.GetAlert() != nil:
Expand Down
5 changes: 5 additions & 0 deletions pkg/query/storeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,11 @@ func (s *StoreSet) Update(ctx context.Context) {

stores[addr] = st
s.updateStoreStatus(st, nil)

if st.rule != nil {
level.Info(s.logger).Log("msg", "adding new rulesAPI to query storeset", "address", addr)
}

level.Info(s.logger).Log("msg", "adding new storeAPI to query storeset", "address", addr, "extLset", extLset)
}

Expand Down
15 changes: 11 additions & 4 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ func (s *ProxyStore) Rules(req *storepb.RulesRequest, srv storepb.Rules_RulesSer
groups []*storepb.RuleGroup
)

defer func() { close(respChan) }()

for _, rulesClient := range s.rules() {
rs := &rulesStream{
client: rulesClient,
Expand All @@ -77,6 +75,11 @@ func (s *ProxyStore) Rules(req *storepb.RulesRequest, srv storepb.Rules_RulesSer
g.Go(func() error { return rs.receive(gctx) })
}

go func() {
_ = g.Wait()
close(respChan)
}()

for resp := range respChan {
groups = append(groups, resp)
}
Expand Down Expand Up @@ -122,7 +125,7 @@ func dedupGroups(groups []*storepb.RuleGroup) []*storepb.RuleGroup {
type rulesStream struct {
client storepb.RulesClient
request *storepb.RulesRequest
channel chan *storepb.RuleGroup
channel chan<- *storepb.RuleGroup
server storepb.Rules_RulesServer
}

Expand Down Expand Up @@ -167,7 +170,11 @@ func (stream *rulesStream) receive(ctx context.Context) error {
continue
}

stream.channel <- rule.GetGroup()
select {
case stream.channel <- rule.GetGroup():
case <-ctx.Done():
return ctx.Err()
}
}
}

Expand Down
6 changes: 5 additions & 1 deletion test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewPrometheusWithSidecar(sharedDir string, netName string, name string, con
return prom, sidecar, nil
}

func NewQuerier(sharedDir string, name string, storeAddresses []string, fileSDStoreAddresses []string) (*Service, error) {
func NewQuerier(sharedDir string, name string, storeAddresses, fileSDStoreAddresses, ruleAddresses []string) (*Service, error) {
const replicaLabel = "replica"

args := e2e.BuildArgs(map[string]string{
Expand All @@ -115,6 +115,10 @@ func NewQuerier(sharedDir string, name string, storeAddresses []string, fileSDSt
args = append(args, "--store="+addr)
}

for _, addr := range ruleAddresses {
args = append(args, "--rule="+addr)
}

if len(fileSDStoreAddresses) > 0 {
queryFileSDDir := filepath.Join(sharedDir, "data", "querier", name)
container := filepath.Join(e2e.ContainerSharedDir, "data", "querier", name)
Expand Down
80 changes: 75 additions & 5 deletions test/e2e/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ import (
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"sort"
"testing"
"time"

"github.com/thanos-io/thanos/pkg/store/storepb"

"github.com/cortexproject/cortex/integration/e2e"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
Expand All @@ -27,7 +31,7 @@ const queryUpWithoutInstance = "sum(up) without (instance)"
// * expose 2 external labels, source and replica.
// * scrape fake target. This will produce up == 0 metric which we can assert on.
// * optionally remote write endpoint to write into.
func defaultPromConfig(name string, replica int, remoteWriteEndpoint string) string {
func defaultPromConfig(name string, replica int, remoteWriteEndpoint, ruleFile string) string {
config := fmt.Sprintf(`
global:
external_labels:
Expand All @@ -53,6 +57,15 @@ remote_write:
max_backoff: 10s
`, config, remoteWriteEndpoint)
}

if ruleFile != "" {
config = fmt.Sprintf(`
%s
rule_files:
- "%s"
`, config, ruleFile)
}

return config
}

Expand All @@ -73,13 +86,13 @@ func TestQuery(t *testing.T) {
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(receiver))

prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), "e2e_test_query", "alone", defaultPromConfig("prom-alone", 0, ""), e2ethanos.DefaultPrometheusImage())
prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), "e2e_test_query", "alone", defaultPromConfig("prom-alone", 0, "", ""), e2ethanos.DefaultPrometheusImage())
testutil.Ok(t, err)
prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), "e2e_test_query", "remote-and-sidecar", defaultPromConfig("prom-both-remote-write-and-sidecar", 1234, e2ethanos.RemoteWriteEndpoint(receiver.NetworkEndpoint(81))), e2ethanos.DefaultPrometheusImage())
prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), "e2e_test_query", "remote-and-sidecar", defaultPromConfig("prom-both-remote-write-and-sidecar", 1234, e2ethanos.RemoteWriteEndpoint(receiver.NetworkEndpoint(81)), ""), e2ethanos.DefaultPrometheusImage())
testutil.Ok(t, err)
prom3, sidecar3, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), "e2e_test_query", "ha1", defaultPromConfig("prom-ha", 0, ""), e2ethanos.DefaultPrometheusImage())
prom3, sidecar3, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), "e2e_test_query", "ha1", defaultPromConfig("prom-ha", 0, "", filepath.Join(e2e.ContainerSharedDir, "", "*.yaml")), e2ethanos.DefaultPrometheusImage())
testutil.Ok(t, err)
prom4, sidecar4, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), "e2e_test_query", "ha2", defaultPromConfig("prom-ha", 1, ""), e2ethanos.DefaultPrometheusImage())
prom4, sidecar4, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), "e2e_test_query", "ha2", defaultPromConfig("prom-ha", 1, "", filepath.Join(e2e.ContainerSharedDir, "", "*.yaml")), e2ethanos.DefaultPrometheusImage())
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2, prom3, sidecar3, prom4, sidecar4))

Expand All @@ -88,6 +101,7 @@ func TestQuery(t *testing.T) {
s.SharedDir(), "1",
[]string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), receiver.GRPCNetworkEndpoint()},
[]string{sidecar3.GRPCNetworkEndpoint(), sidecar4.GRPCNetworkEndpoint()},
nil,
)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(q))
Expand Down Expand Up @@ -152,6 +166,39 @@ func TestQuery(t *testing.T) {
})
}

func TestRulesFanout(t *testing.T) {
t.Parallel()

netName := "e2e_test_rules_fanout"

s, err := e2e.NewScenario(netName)
testutil.Ok(t, err)
defer s.Close()

rulesSubDir := filepath.Join("rules")
testutil.Ok(t, os.MkdirAll(filepath.Join(s.SharedDir(), rulesSubDir), os.ModePerm))
createRuleFiles(t, filepath.Join(s.SharedDir(), rulesSubDir))

prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), netName, "alone", defaultPromConfig("prom-alone", 0, "", filepath.Join(e2e.ContainerSharedDir, rulesSubDir, "*.yaml")), e2ethanos.DefaultPrometheusImage())
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1))

q, err := e2ethanos.NewQuerier(s.SharedDir(), "1",
[]string{sidecar1.GRPCNetworkEndpoint()},
nil,
[]string{sidecar1.GRPCNetworkEndpoint()},
)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(q))

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

testutil.Ok(t, q.WaitSumMetrics(e2e.Equals(1), "thanos_store_nodes_grpc_connections"))

ruleAndAssert(t, ctx, q.HTTPEndpoint(), "", 1)
}

func urlParse(t *testing.T, addr string) *url.URL {
u, err := url.Parse(addr)
testutil.Ok(t, err)
Expand Down Expand Up @@ -191,3 +238,26 @@ func queryAndAssert(t *testing.T, ctx context.Context, addr string, query string
testutil.Equals(t, exp, result[i].Metric)
}
}

func ruleAndAssert(t *testing.T, ctx context.Context, addr string, typ string, expectedLen int) {
t.Helper()

fmt.Println("ruleAndAssert: Waiting for", expectedLen, "results for rules type", typ)
var result []*storepb.RuleGroup
testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
res, err := promclient.NewDefaultClient().RulesInGRPC(ctx, urlParse(t, "http://"+addr), typ)
if err != nil {
return err
}

if len(result) != len(res) {
fmt.Println("ruleAndAssert: New result:", res)
}

if len(res) != expectedLen {
return errors.Errorf("unexpected result size, expected %d; result: %v", expectedLen, res)
}
result = res
return nil
}))
}
16 changes: 8 additions & 8 deletions test/e2e/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ func TestReceive(t *testing.T) {
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(r1, r2, r3))

prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.NetworkEndpoint(81))), e2ethanos.DefaultPrometheusImage())
prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.NetworkEndpoint(81)), ""), e2ethanos.DefaultPrometheusImage())
testutil.Ok(t, err)
prom2, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "2", defaultPromConfig("prom2", 0, e2ethanos.RemoteWriteEndpoint(r2.NetworkEndpoint(81))), e2ethanos.DefaultPrometheusImage())
prom2, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "2", defaultPromConfig("prom2", 0, e2ethanos.RemoteWriteEndpoint(r2.NetworkEndpoint(81)), ""), e2ethanos.DefaultPrometheusImage())
testutil.Ok(t, err)
prom3, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "3", defaultPromConfig("prom3", 0, e2ethanos.RemoteWriteEndpoint(r3.NetworkEndpoint(81))), e2ethanos.DefaultPrometheusImage())
prom3, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "3", defaultPromConfig("prom3", 0, e2ethanos.RemoteWriteEndpoint(r3.NetworkEndpoint(81)), ""), e2ethanos.DefaultPrometheusImage())
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(prom1, prom2, prom3))

q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil)
q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(q))

Expand Down Expand Up @@ -132,11 +132,11 @@ func TestReceive(t *testing.T) {
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(r1, r2, r3))

prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.NetworkEndpoint(81))), e2ethanos.DefaultPrometheusImage())
prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.NetworkEndpoint(81)), ""), e2ethanos.DefaultPrometheusImage())
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(prom1))

q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil)
q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(q))

Expand Down Expand Up @@ -201,11 +201,11 @@ func TestReceive(t *testing.T) {
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(r1, r2))

prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.NetworkEndpoint(81))), e2ethanos.DefaultPrometheusImage())
prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.NetworkEndpoint(81)), ""), e2ethanos.DefaultPrometheusImage())
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(prom1))

q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint()}, nil)
q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint()}, nil, nil)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(q))

Expand Down
6 changes: 3 additions & 3 deletions test/e2e/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestRule_AlertmanagerHTTPClient(t *testing.T) {
{
EndpointsConfig: http_util.EndpointsConfig{
StaticAddresses: func() []string {
q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", nil, nil)
q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", nil, nil, nil)
testutil.Ok(t, err)
return []string{q.NetworkHTTPEndpointFor(s.NetworkName())}
}(),
Expand All @@ -246,7 +246,7 @@ func TestRule_AlertmanagerHTTPClient(t *testing.T) {
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(r))

q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r.GRPCNetworkEndpoint()}, nil)
q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r.GRPCNetworkEndpoint()}, nil, nil)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(q))

Expand Down Expand Up @@ -322,7 +322,7 @@ func TestRule(t *testing.T) {
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(r))

q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r.GRPCNetworkEndpoint()}, nil)
q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r.GRPCNetworkEndpoint()}, nil, nil)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(q))

Expand Down
4 changes: 1 addition & 3 deletions test/e2e/store_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ func TestStoreGateway(t *testing.T) {
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(s1))

q, err := e2ethanos.NewQuerier(
s.SharedDir(), "1",
[]string{s1.GRPCNetworkEndpoint()}, nil)
q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{s1.GRPCNetworkEndpoint()}, nil, nil)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(q))

Expand Down

0 comments on commit c92ed81

Please sign in to comment.