Commit 2c28f24

Ruler: optimised <prefix>/api/v1/rules and <prefix>/api/v1/alerts (cortexproject#3916)

* Use a grpc clients pool in the ruler

Signed-off-by: Marco Pracucci <marco@pracucci.com>
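
Context for this bullet, not part of the commit itself: the pool added in pkg/ruler/client_pool.go (shown further down) hands out long-lived gRPC clients keyed by replica address. A minimal sketch of the caller side, which lives in pkg/ruler/ruler.go and is not included in this listing; the helper name getRulesFromRuler and its exact signature are assumptions.

// Sketch only: the real caller-side wiring lives in pkg/ruler/ruler.go, which is
// not part of this listing, so the function name and arguments are hypothetical.
package ruler

import (
	"context"

	"github.com/pkg/errors"

	"github.com/cortexproject/cortex/pkg/ring/client"
)

// getRulesFromRuler asks a single ruler replica (addr, typically taken from the
// ring) for its rule groups, reusing a gRPC connection from the shared pool.
func getRulesFromRuler(ctx context.Context, pool *client.Pool, addr string) (*RulesResponse, error) {
	// GetClientFor returns a cached client for addr, dialing it on first use.
	c, err := pool.GetClientFor(addr)
	if err != nil {
		return nil, errors.Wrapf(err, "unable to get client for ruler %s", addr)
	}

	// The pooled client embeds RulerClient (see rulerExtendedClient in
	// pkg/ruler/client_pool.go below), so Rules() can be called directly.
	return c.(RulerClient).Rules(ctx, &RulesRequest{})
}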

* Concurrently fetch rules from all rulers

Signed-off-by: Marco Pracucci <marco@pracucci.com>
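
A hedged sketch of the concurrent fan-out this bullet describes, reusing the pool from the previous sketch and an errgroup for coordination; the committed implementation in pkg/ruler/ruler.go is not shown in this listing and may differ.

// Sketch only: fan out the Rules() gRPC call to every ruler replica address and
// collect the per-replica responses; merging them into the API response is left
// to the caller. Helper names are assumptions, not the committed code.
package ruler

import (
	"context"
	"sync"

	"golang.org/x/sync/errgroup"

	"github.com/cortexproject/cortex/pkg/ring/client"
)

func fetchRulesConcurrently(ctx context.Context, pool *client.Pool, addrs []string) ([]*RulesResponse, error) {
	var (
		mtx       sync.Mutex
		responses []*RulesResponse
	)

	g, gCtx := errgroup.WithContext(ctx)
	for _, addr := range addrs {
		addr := addr // capture the loop variable for the goroutine

		g.Go(func() error {
			c, err := pool.GetClientFor(addr)
			if err != nil {
				return err
			}

			resp, err := c.(RulerClient).Rules(gCtx, &RulesRequest{})
			if err != nil {
				return err
			}

			mtx.Lock()
			responses = append(responses, resp)
			mtx.Unlock()
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}
	return responses, nil
}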

* Added subservices manager

Signed-off-by: Marco Pracucci <marco@pracucci.com>
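
The subservices manager refers to Cortex's services framework (pkg/util/services), which starts and stops dependent services, such as the new client pool, together with the ruler. A minimal sketch of the pattern; the helper name startSubservices and the exact set of subservices the ruler registers are assumptions, since the ruler.go changes are not visible in this diff.

// Sketch of the subservices-manager pattern used across Cortex services.
package ruler

import (
	"context"

	"github.com/pkg/errors"

	"github.com/cortexproject/cortex/pkg/util/services"
)

// startSubservices builds a manager over the given subservices (e.g. the ring
// lifecycler and the gRPC client pool) and starts them, blocking until healthy.
func startSubservices(ctx context.Context, subs ...services.Service) (*services.Manager, error) {
	manager, err := services.NewManager(subs...)
	if err != nil {
		return nil, errors.Wrap(err, "unable to create ruler subservices manager")
	}

	if err := services.StartManagerAndAwaitHealthy(ctx, manager); err != nil {
		return nil, errors.Wrap(err, "unable to start ruler subservices")
	}

	return manager, nil
}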

* Fixed Rules() grpc call

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Added integration test

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Added CHANGELOG entry

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Addressed review comments

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Fixed CHANGELOG

Signed-off-by: Marco Pracucci <marco@pracucci.com>
pracucci authored and harry671003 committed Mar 11, 2021
1 parent d27b2d0 commit 2c28f24
Showing 8 changed files with 345 additions and 42 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,11 @@

## master / unreleased

* [ENHANCEMENT] Ruler: optimized `<prefix>/api/v1/rules` and `<prefix>/api/v1/alerts` when ruler sharding is enabled. #3916
* [ENHANCEMENT] Ruler: added the following metrics when ruler sharding is enabled: #3916
* `cortex_ruler_clients`
* `cortex_ruler_client_request_duration_seconds`

## 1.8.0 in progress

* [CHANGE] Alertmanager: Don't expose cluster information to tenants via the `/alertmanager/api/v1/status` API endpoint when operating with clustering enabled.
8 changes: 8 additions & 0 deletions integration/configs.go
@@ -168,6 +168,14 @@ var (
}
}

RulerShardingFlags = func(consulAddress string) map[string]string {
return map[string]string{
"-ruler.enable-sharding": "true",
"-ruler.ring.store": "consul",
"-ruler.ring.consul.hostname": consulAddress,
}
}

BlocksStorageFlags = func() map[string]string {
return map[string]string{
"-store.engine": blocksStorageEngine,
50 changes: 47 additions & 3 deletions integration/e2ecortex/client.go
@@ -23,6 +23,8 @@ import (
"github.com/prometheus/prometheus/pkg/rulefmt"
"github.com/prometheus/prometheus/prompb"
yaml "gopkg.in/yaml.v3"

"github.com/cortexproject/cortex/pkg/ruler"
)

var (
@@ -213,7 +215,49 @@ type ServerStatus struct {
} `json:"data"`
}

// GetRuleGroups gets the status of an alertmanager instance
// GetPrometheusRules fetches the rules from the Prometheus endpoint /api/v1/rules.
func (c *Client) GetPrometheusRules() ([]*ruler.RuleGroup, error) {
// Create HTTP request
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/api/prom/api/v1/rules", c.rulerAddress), nil)
if err != nil {
return nil, err
}
req.Header.Set("X-Scope-OrgID", c.orgID)

ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

// Execute HTTP request
res, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}
defer res.Body.Close()

body, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, err
}

// Decode the response.
type response struct {
Status string `json:"status"`
Data ruler.RuleDiscovery `json:"data"`
}

decoded := &response{}
if err := json.Unmarshal(body, decoded); err != nil {
return nil, err
}

if decoded.Status != "success" {
return nil, fmt.Errorf("unexpected response status '%s'", decoded.Status)
}

return decoded.Data.RuleGroups, nil
}

// GetRuleGroups gets the configured rule groups from the ruler.
func (c *Client) GetRuleGroups() (map[string][]rulefmt.RuleGroup, error) {
// Create HTTP request
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/api/prom/rules", c.rulerAddress), nil)
@@ -247,7 +291,7 @@ func (c *Client) GetRuleGroups() (map[string][]rulefmt.RuleGroup, error) {
return rgs, nil
}

// SetRuleGroup gets the status of an alertmanager instance
// SetRuleGroup configures the provided rulegroup to the ruler.
func (c *Client) SetRuleGroup(rulegroup rulefmt.RuleGroup, namespace string) error {
// Create write request
data, err := yaml.Marshal(rulegroup)
@@ -277,7 +321,7 @@ func (c *Client) SetRuleGroup(rulegroup rulefmt.RuleGroup, namespace string) err
return nil
}

// DeleteRuleGroup gets the status of an alertmanager instance
// DeleteRuleGroup deletes a rule group.
func (c *Client) DeleteRuleGroup(namespace string, groupName string) error {
// Create HTTP request
req, err := http.NewRequest("DELETE", fmt.Sprintf("http://%s/api/prom/rules/%s/%s", c.rulerAddress, url.PathEscape(namespace), url.PathEscape(groupName)), nil)
82 changes: 82 additions & 0 deletions integration/ruler_test.go
@@ -11,6 +11,7 @@ import (
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
@@ -20,6 +21,7 @@
"github.com/prometheus/prometheus/pkg/rulefmt"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

@@ -301,7 +303,87 @@ func TestRulerEvaluationDelay(t *testing.T) {
}
}
require.Equal(t, len(series.Samples), inputPos, "expect to have returned all evaluations")
}

func TestRulerSharding(t *testing.T) {
const numRulesGroups = 100

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Generate multiple rule groups, with 1 rule each.
ruleGroups := make([]rulefmt.RuleGroup, numRulesGroups)
expectedNames := make([]string, numRulesGroups)
for i := 0; i < numRulesGroups; i++ {
var recordNode yaml.Node
var exprNode yaml.Node

recordNode.SetString(fmt.Sprintf("rule_%d", i))
exprNode.SetString(strconv.Itoa(i))
ruleName := fmt.Sprintf("test_%d", i)

expectedNames[i] = ruleName
ruleGroups[i] = rulefmt.RuleGroup{
Name: ruleName,
Interval: 60,
Rules: []rulefmt.RuleNode{{
Record: recordNode,
Expr: exprNode,
}},
}
}

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, rulestoreBucketName)
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Configure the ruler.
rulerFlags := mergeFlags(
BlocksStorageFlags(),
RulerFlags(false),
RulerShardingFlags(consul.NetworkHTTPEndpoint()),
map[string]string{
// Since we're not going to run any rule, we don't need the
// store-gateway to be configured to a valid address.
"-querier.store-gateway-addresses": "localhost:12345",
// Enable the bucket index so we can skip the initial bucket scan.
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
},
)

// Start rulers.
ruler1 := e2ecortex.NewRuler("ruler-1", rulerFlags, "")
ruler2 := e2ecortex.NewRuler("ruler-2", rulerFlags, "")
rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2)
require.NoError(t, s.StartAndWaitReady(ruler1, ruler2))

// Upload rule groups to one of the rulers.
c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
require.NoError(t, err)

for _, ruleGroup := range ruleGroups {
require.NoError(t, c.SetRuleGroup(ruleGroup, "test"))
}

// Wait until rulers have loaded all rules.
require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))

// Since rulers have loaded all rules, we expect that rules have been sharded
// between the two rulers.
require.NoError(t, ruler1.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))

// Fetch the rules and ensure they match the configured ones.
actualGroups, err := c.GetPrometheusRules()
require.NoError(t, err)

var actualNames []string
for _, group := range actualGroups {
actualNames = append(actualNames, group.Name)
}
assert.ElementsMatch(t, expectedNames, actualNames)
}

func TestRulerAlertmanager(t *testing.T) {
2 changes: 1 addition & 1 deletion pkg/ruler/api_test.go
@@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
"errors"
io "io"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
79 changes: 79 additions & 0 deletions pkg/ruler/client_pool.go
@@ -0,0 +1,79 @@
package ruler

import (
"time"

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/cortexproject/cortex/pkg/ring/client"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
)

func newRulerClientPool(clientCfg grpcclient.Config, logger log.Logger, reg prometheus.Registerer) *client.Pool {
// We prefer sane defaults instead of exposing further config options.
poolCfg := client.PoolConfig{
CheckInterval: time.Minute,
HealthCheckEnabled: true,
HealthCheckTimeout: 10 * time.Second,
}

clientsCount := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "cortex_ruler_clients",
Help: "The current number of ruler clients in the pool.",
})

return client.NewPool("ruler", poolCfg, nil, newRulerClientFactory(clientCfg, reg), clientsCount, logger)
}

func newRulerClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory {
requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_ruler_client_request_duration_seconds",
Help: "Time spent executing requests to the ruler.",
Buckets: prometheus.ExponentialBuckets(0.008, 4, 7),
}, []string{"operation", "status_code"})

return func(addr string) (client.PoolClient, error) {
return dialRulerClient(clientCfg, addr, requestDuration)
}
}

func dialRulerClient(clientCfg grpcclient.Config, addr string, requestDuration *prometheus.HistogramVec) (*rulerExtendedClient, error) {
opts, err := clientCfg.DialOption(grpcclient.Instrument(requestDuration))
if err != nil {
return nil, err
}

conn, err := grpc.Dial(addr, opts...)
if err != nil {
return nil, errors.Wrapf(err, "failed to dial ruler %s", addr)
}

return &rulerExtendedClient{
RulerClient: NewRulerClient(conn),
HealthClient: grpc_health_v1.NewHealthClient(conn),
conn: conn,
}, nil
}

type rulerExtendedClient struct {
RulerClient
grpc_health_v1.HealthClient
conn *grpc.ClientConn
}

func (c *rulerExtendedClient) Close() error {
return c.conn.Close()
}

func (c *rulerExtendedClient) String() string {
return c.RemoteAddress()
}

func (c *rulerExtendedClient) RemoteAddress() string {
return c.conn.Target()
}
68 changes: 68 additions & 0 deletions pkg/ruler/client_pool_test.go
@@ -0,0 +1,68 @@
package ruler

import (
"context"
"net"
"testing"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"google.golang.org/grpc"

"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
)

func Test_newRulerClientFactory(t *testing.T) {
// Create a GRPC server used to query the mocked service.
grpcServer := grpc.NewServer()
defer grpcServer.GracefulStop()

srv := &mockRulerServer{}
RegisterRulerServer(grpcServer, srv)

listener, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)

go func() {
require.NoError(t, grpcServer.Serve(listener))
}()

// Create a client factory and query back the mocked service
// with different clients.
cfg := grpcclient.Config{}
flagext.DefaultValues(&cfg)

reg := prometheus.NewPedanticRegistry()
factory := newRulerClientFactory(cfg, reg)

for i := 0; i < 2; i++ {
client, err := factory(listener.Addr().String())
require.NoError(t, err)
defer client.Close() //nolint:errcheck

ctx := user.InjectOrgID(context.Background(), "test")
_, err = client.(*rulerExtendedClient).Rules(ctx, &RulesRequest{})
assert.NoError(t, err)
}

// Assert on the request duration metric, but since it's a duration histogram and
// we can't predict the exact time it took, we need to workaround it.
metrics, err := reg.Gather()
require.NoError(t, err)

assert.Len(t, metrics, 1)
assert.Equal(t, "cortex_ruler_client_request_duration_seconds", metrics[0].GetName())
assert.Equal(t, dto.MetricType_HISTOGRAM, metrics[0].GetType())
assert.Len(t, metrics[0].GetMetric(), 1)
assert.Equal(t, uint64(2), metrics[0].GetMetric()[0].GetHistogram().GetSampleCount())
}

type mockRulerServer struct{}

func (m *mockRulerServer) Rules(context.Context, *RulesRequest) (*RulesResponse, error) {
return &RulesResponse{}, nil
}
(1 changed file not shown)
