Skip to content

Commit

Permalink
Feature/ruler (take 2) (#2458)
Browse files Browse the repository at this point in the history
* begins speccing out ruler

ruler memhistory encapsulates metricsHistory

removes mutex from memhistory

ruler module

* upstream conflicts

* implicit ast impls, parser for ruler

* /api/prom ruler routes, ruler enabled in single binary

* registers ruler flags, doesnt double instantiate metrics

* cleanup for old samples in ruler

* begins ruler tests

* ForStateAppenderQuerier tests

* memhistory stop

* RestoreForState test

* upstream querier ifc

* introducing loki ruler metrics

* removes rule granularity metric -- to be discussed in pr

* validates ruler cfg

* renames gauge metrics to not use total

* removes unnecessary logs

* logs synthetic restoreforstate

* logs tenant in ruler

* sets cortex to owen's unmerged fork

* begins porting rules pkg

* memstore work

* work on queryable based in memory series store

* removes unused pkgs, adds memstore test

* MemStore must be started after construction

* MemstoreTenantManager

* ruler loading

* ruler instantiation

* better metrics & logging in ruler

* grpc cortex compatibility in go.mod

* cortex vendoring compat

* increments memory cache hits only if cached

* loki in memory metrics use prometheus default registerer

* ruler only depends on ring

* managerfactory rename

* revendors cortex

* ignore emacs stashing

* adds comments

* ruler /loki/api/v1 prefix

* revendoring compat

* comment
  • Loading branch information
owen-d authored Aug 25, 2020
1 parent dab393a commit aea6c3a
Show file tree
Hide file tree
Showing 9 changed files with 868 additions and 189 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ dist
coverage.txt
.DS_Store
.aws-sam

# emacs
.#*
188 changes: 13 additions & 175 deletions go.sum

Large diffs are not rendered by default.

24 changes: 10 additions & 14 deletions pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ type QueryParams interface {
GetShards() []string
}

// implicit holds default implementations
type implicit struct{}

func (implicit) logQLExpr() {}

// SelectParams specifies parameters passed to data selections.
type SelectLogParams struct {
*logproto.QueryRequest
Expand Down Expand Up @@ -75,6 +80,7 @@ type LogSelectorExpr interface {

type matchersExpr struct {
matchers []*labels.Matcher
implicit
}

func newMatcherExpr(matchers []*labels.Matcher) LogSelectorExpr {
Expand Down Expand Up @@ -102,13 +108,11 @@ func (e *matchersExpr) Filter() (LineFilter, error) {
return nil, nil
}

// impl Expr
func (e *matchersExpr) logQLExpr() {}

type filterExpr struct {
left LogSelectorExpr
ty labels.MatchType
match string
implicit
}

// NewFilterExpr wraps an existing Expr with a next filter expression.
Expand Down Expand Up @@ -163,9 +167,6 @@ func (e *filterExpr) Filter() (LineFilter, error) {
return f, nil
}

// impl Expr
func (e *filterExpr) logQLExpr() {}

func mustNewMatcher(t labels.MatchType, n, v string) *labels.Matcher {
m, err := labels.NewMatcher(t, n, v)
if err != nil {
Expand Down Expand Up @@ -275,6 +276,7 @@ type SampleExpr interface {
type rangeAggregationExpr struct {
left *logRange
operation string
implicit
}

func newRangeAggregationExpr(left *logRange, operation string) SampleExpr {
Expand All @@ -288,9 +290,6 @@ func (e *rangeAggregationExpr) Selector() LogSelectorExpr {
return e.left.left
}

// impl Expr
func (e *rangeAggregationExpr) logQLExpr() {}

// impls Stringer
func (e *rangeAggregationExpr) String() string {
return formatOperation(e.operation, nil, e.left.String())
Expand Down Expand Up @@ -330,6 +329,7 @@ type vectorAggregationExpr struct {
grouping *grouping
params int
operation string
implicit
}

func mustNewVectorAggregationExpr(left SampleExpr, operation string, gr *grouping, params *string) SampleExpr {
Expand Down Expand Up @@ -368,9 +368,6 @@ func (e *vectorAggregationExpr) Extractor() (SampleExtractor, error) {
return e.left.Extractor()
}

// impl Expr
func (e *vectorAggregationExpr) logQLExpr() {}

func (e *vectorAggregationExpr) String() string {
var params []string
if e.params != 0 {
Expand Down Expand Up @@ -479,6 +476,7 @@ func reduceBinOp(op string, left, right *literalExpr) *literalExpr {

type literalExpr struct {
value float64
implicit
}

func mustNewLiteralExpr(s string, invert bool) *literalExpr {
Expand All @@ -496,8 +494,6 @@ func mustNewLiteralExpr(s string, invert bool) *literalExpr {
}
}

func (e *literalExpr) logQLExpr() {}

func (e *literalExpr) String() string {
return fmt.Sprintf("%f", e.value)
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
cortex_ruler "github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/ruler/rules"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/runtimeconfig"
"github.com/cortexproject/cortex/pkg/util/services"
Expand All @@ -35,6 +37,7 @@ import (
"github.com/grafana/loki/pkg/lokifrontend"
"github.com/grafana/loki/pkg/querier"
"github.com/grafana/loki/pkg/querier/queryrange"
"github.com/grafana/loki/pkg/ruler"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/tracing"
serverutil "github.com/grafana/loki/pkg/util/server"
Expand All @@ -59,6 +62,7 @@ type Config struct {
TableManager chunk.TableManagerConfig `yaml:"table_manager,omitempty"`
Worker frontend.WorkerConfig `yaml:"frontend_worker,omitempty"`
Frontend lokifrontend.Config `yaml:"frontend,omitempty"`
Ruler ruler.Config `yaml:"ruler,omitempty"`
QueryRange queryrange.Config `yaml:"query_range,omitempty"`
RuntimeConfig runtimeconfig.ManagerConfig `yaml:"runtime_config,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
Expand All @@ -85,6 +89,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.LimitsConfig.RegisterFlags(f)
c.TableManager.RegisterFlags(f)
c.Frontend.RegisterFlags(f)
c.Ruler.RegisterFlags(f)
c.Worker.RegisterFlags(f)
c.QueryRange.RegisterFlags(f)
c.RuntimeConfig.RegisterFlags(f)
Expand Down Expand Up @@ -115,6 +120,9 @@ func (c *Config) Validate(log log.Logger) error {
if err := c.TableManager.Validate(); err != nil {
return errors.Wrap(err, "invalid tablemanager config")
}
if err := c.Ruler.Validate(); err != nil {
return errors.Wrap(err, "invalid ruler config")
}
return nil
}

Expand All @@ -135,6 +143,8 @@ type Loki struct {
store storage.Store
tableManager *chunk.TableManager
frontend *frontend.Frontend
ruler *cortex_ruler.Ruler
RulerStorage rules.RuleStore
stopper queryrange.Stopper
runtimeConfig *runtimeconfig.Manager
memberlistKV *memberlist.KVInitService
Expand Down Expand Up @@ -308,6 +318,8 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(Ingester, t.initIngester)
mm.RegisterModule(Querier, t.initQuerier)
mm.RegisterModule(QueryFrontend, t.initQueryFrontend)
mm.RegisterModule(RulerStorage, t.initRulerStorage, modules.UserInvisibleModule)
mm.RegisterModule(Ruler, t.initRuler)
mm.RegisterModule(TableManager, t.initTableManager)
mm.RegisterModule(Compactor, t.initCompactor)
mm.RegisterModule(All, nil)
Expand All @@ -321,6 +333,7 @@ func (t *Loki) setupModuleManager() error {
Ingester: {Store, Server, MemberlistKV},
Querier: {Store, Ring, Server},
QueryFrontend: {Server, Overrides},
Ruler: {Ring, Server, Store, RulerStorage},
TableManager: {Server},
Compactor: {Server},
All: {Querier, Ingester, Distributor, TableManager},
Expand Down
71 changes: 71 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
cortex_ruler "github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/runtimeconfig"
"github.com/cortexproject/cortex/pkg/util/services"
Expand All @@ -34,8 +35,10 @@ import (
"github.com/grafana/loki/pkg/distributor"
"github.com/grafana/loki/pkg/ingester"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/querier"
"github.com/grafana/loki/pkg/querier/queryrange"
"github.com/grafana/loki/pkg/ruler"
loki_storage "github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper"
serverutil "github.com/grafana/loki/pkg/util/server"
Expand All @@ -54,6 +57,8 @@ const (
Ingester string = "ingester"
Querier string = "querier"
QueryFrontend string = "query-frontend"
RulerStorage string = "ruler-storage"
Ruler string = "ruler"
Store string = "store"
TableManager string = "table-manager"
MemberlistKV string = "memberlist-kv"
Expand Down Expand Up @@ -353,6 +358,72 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
}), nil
}

func (t *Loki) initRulerStorage() (_ services.Service, err error) {
// if the ruler is not configured and we're in single binary then let's just log an error and continue.
// unfortunately there is no way to generate a "default" config and compare default against actual
// to determine if it's unconfigured. the following check, however, correctly tests this.
// Single binary integration tests will break if this ever drifts
if t.cfg.Target == All && t.cfg.Ruler.StoreConfig.IsDefaults() {
level.Info(util.Logger).Log("msg", "RulerStorage is not configured in single binary mode and will not be started.")
return
}

t.RulerStorage, err = cortex_ruler.NewRuleStorage(t.cfg.Ruler.StoreConfig)

return
}

func (t *Loki) initRuler() (_ services.Service, err error) {
if t.RulerStorage == nil {
level.Info(util.Logger).Log("msg", "RulerStorage is nil. Not starting the ruler.")
return nil, nil
}

t.cfg.Ruler.Ring.ListenPort = t.cfg.Server.GRPCListenPort
t.cfg.Ruler.Ring.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV
q, err := querier.New(t.cfg.Querier, t.cfg.IngesterClient, t.ring, t.store, t.overrides)
if err != nil {
return nil, err
}

engine := logql.NewEngine(t.cfg.Querier.Engine, q)

t.ruler, err = ruler.NewRuler(
t.cfg.Ruler,
engine,
prometheus.DefaultRegisterer,
util.Logger,
t.RulerStorage,
)

if err != nil {
return
}

// Expose HTTP endpoints.
if t.cfg.Ruler.EnableAPI {

t.server.HTTP.Handle("/ruler/ring", t.ruler)
cortex_ruler.RegisterRulerServer(t.server.GRPC, t.ruler)

// Ruler Legacy API Routes
t.server.HTTP.Path("/api/prom/rules").Methods("GET").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.ruler.ListRules)))
t.server.HTTP.Path("/api/prom/rules/{namespace}").Methods("GET").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.ruler.ListRules)))
t.server.HTTP.Path("/api/prom/rules/{namespace}/{groupName}").Methods("GET").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.ruler.GetRuleGroup)))
t.server.HTTP.Path("/api/prom/rules/{namespace}").Methods("POST").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.ruler.CreateRuleGroup)))
t.server.HTTP.Path("/api/prom/rules/{namespace}/{groupName}").Methods("DELETE").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.ruler.DeleteRuleGroup)))

// Ruler API Routes
t.server.HTTP.Path("/loki/api/v1/rules").Methods("GET").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.ruler.ListRules)))
t.server.HTTP.Path("/loki/api/v1/rules/{namespace}").Methods("GET").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.ruler.ListRules)))
t.server.HTTP.Path("/loki/api/v1/rules/{namespace}/{groupName}").Methods("GET").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.ruler.GetRuleGroup)))
t.server.HTTP.Path("/loki/api/v1/rules/{namespace}").Methods("POST").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.ruler.CreateRuleGroup)))
t.server.HTTP.Path("/loki/api/v1/rules/{namespace}/{groupName}").Methods("DELETE").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.ruler.DeleteRuleGroup)))
}

return t.ruler, nil
}

func (t *Loki) initMemberlistKV() (services.Service, error) {
t.cfg.MemberlistKV.MetricsRegisterer = prometheus.DefaultRegisterer
t.cfg.MemberlistKV.Codecs = []codec.Codec{
Expand Down
123 changes: 123 additions & 0 deletions pkg/ruler/manager/compat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package manager

import (
"context"
"time"

"github.com/cortexproject/cortex/pkg/ruler"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/rules"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
)

// engineQueryFunc returns a new query function using the rules.EngineQueryFunc function
// and passing an altered timestamp.
func engineQueryFunc(engine *logql.Engine, delay time.Duration) rules.QueryFunc {
return rules.QueryFunc(func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
adjusted := t.Add(-delay)
params := logql.NewLiteralParams(
qs,
adjusted,
adjusted,
0,
0,
logproto.FORWARD,
0,
nil,
)
q := engine.Query(params)

res, err := q.Exec(ctx)
if err != nil {
return nil, err
}
switch v := res.Data.(type) {
case promql.Vector:
return v, nil
case promql.Scalar:
return promql.Vector{promql.Sample{
Point: promql.Point(v),
Metric: labels.Labels{},
}}, nil
default:
return nil, errors.New("rule result is not a vector or scalar")
}
})

}

func MemstoreTenantManager(
cfg ruler.Config,
engine *logql.Engine,
) ruler.ManagerFactory {
var metrics *Metrics

return func(
ctx context.Context,
userID string,
notifier *notifier.Manager,
logger log.Logger,
reg prometheus.Registerer,
) *rules.Manager {

// We'll ignore the passed registere and use the default registerer to avoid prefix issues and other weirdness.
// This closure prevents re-registering.
if metrics == nil {
metrics = NewMetrics(prometheus.DefaultRegisterer)
}
logger = log.With(logger, "user", userID)
queryFunc := engineQueryFunc(engine, cfg.EvaluationDelay)
memStore := NewMemStore(userID, queryFunc, metrics, 5*time.Minute, log.With(logger, "subcomponent", "MemStore"))

mgr := rules.NewManager(&rules.ManagerOptions{
Appendable: NoopAppender{},
Queryable: memStore,
QueryFunc: queryFunc,
Context: user.InjectOrgID(ctx, userID),
ExternalURL: cfg.ExternalURL.URL,
NotifyFunc: ruler.SendAlerts(notifier, cfg.ExternalURL.URL.String()),
Logger: logger,
Registerer: reg,
OutageTolerance: cfg.OutageTolerance,
ForGracePeriod: cfg.ForGracePeriod,
ResendDelay: cfg.ResendDelay,
GroupLoader: groupLoader{},
})

// initialize memStore, bound to the manager's alerting rules
memStore.Start(mgr)

return mgr
}
}

type groupLoader struct {
rules.FileLoader // embed the default and override the parse method for logql queries
}

func (groupLoader) Parse(query string) (parser.Expr, error) {
expr, err := logql.ParseExpr(query)
if err != nil {
return nil, err
}

return exprAdapter{expr}, nil
}

// Allows logql expressions to be treated as promql expressions by the prometheus rules pkg.
type exprAdapter struct {
logql.Expr
}

func (exprAdapter) PositionRange() parser.PositionRange { return parser.PositionRange{} }
func (exprAdapter) PromQLExpr() {}
func (exprAdapter) Type() parser.ValueType { return parser.ValueType("unimplemented") }
Loading

0 comments on commit aea6c3a

Please sign in to comment.