Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: allow querier and query frontend targets to run on same process #4301

Merged
merged 7 commits into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c
github.com/fsouza/fake-gcs-server v1.7.0
github.com/go-kit/kit v0.11.0
github.com/go-kit/log v0.1.0
github.com/go-logfmt/logfmt v0.5.0
github.com/go-redis/redis/v8 v8.9.0
github.com/gocql/gocql v0.0.0-20200526081602-cd04bd7f22a7
Expand Down Expand Up @@ -66,6 +67,7 @@ require (
github.com/ncw/swift v1.0.52
github.com/oklog/ulid v1.3.1
github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e
github.com/opentracing-contrib/go-stdlib v1.0.0
github.com/opentracing/opentracing-go v1.2.0
// github.com/pierrec/lz4 v2.0.5+incompatible
github.com/pierrec/lz4/v4 v4.1.7
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func (t *Loki) setupModuleManager() error {
Compactor: {Server, Overrides},
IndexGateway: {Server},
IngesterQuerier: {Ring},
All: {Querier, Ingester, Distributor, TableManager, Ruler},
All: {QueryFrontend, Querier, Ingester, Distributor, TableManager, Ruler},
}

// Add IngesterQuerier as a dependency for store when target is either ingester or querier.
Expand Down
66 changes: 28 additions & 38 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
cortex_querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/ring"
cortex_ruler "github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/scheduler"
Expand All @@ -29,7 +28,6 @@ import (
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/discovery/dns"
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
"github.com/weaveworks/common/user"
Expand Down Expand Up @@ -190,51 +188,43 @@ func (t *Loki) initDistributor() (services.Service, error) {
}

func (t *Loki) initQuerier() (services.Service, error) {
var (
worker services.Service
err error
)

// NewQuerierWorker now expects Frontend (or Scheduler) address to be set.
if t.Cfg.Worker.FrontendAddress != "" || t.Cfg.Worker.SchedulerAddress != "" {
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
level.Debug(util_log.Logger).Log("msg", "initializing querier worker", "config", fmt.Sprintf("%+v", t.Cfg.Worker))
worker, err = cortex_querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(t.Server.HTTPServer.Handler), util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
}

if t.Cfg.Ingester.QueryStoreMaxLookBackPeriod != 0 {
t.Cfg.Querier.IngesterQueryStoreMaxLookback = t.Cfg.Ingester.QueryStoreMaxLookBackPeriod
}

var err error
t.Querier, err = querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides)
if err != nil {
return nil, err
}

httpMiddleware := middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
t.HTTPAuthMiddleware,
serverutil.NewPrepopulateMiddleware(),
serverutil.ResponseJSONMiddleware(),
querierWorkerServiceConfig := querier.WorkerServiceConfig{
AllEnabled: t.Cfg.isModuleEnabled(All),
GrpcListenPort: t.Cfg.Server.GRPCListenPort,
QuerierMaxConcurrent: t.Cfg.Querier.MaxConcurrent,
QuerierWorkerConfig: &t.Cfg.Worker,
QueryFrontendEnabled: t.Cfg.isModuleEnabled(QueryFrontend),
QuerySchedulerEnabled: t.Cfg.isModuleEnabled(QueryScheduler),
}

var queryHandlers = map[string]http.Handler{
"/loki/api/v1/query_range": http.HandlerFunc(t.Querier.RangeQueryHandler),
"/loki/api/v1/query": http.HandlerFunc(t.Querier.InstantQueryHandler),
"/loki/api/v1/label": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/labels": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/tail": http.HandlerFunc(t.Querier.TailHandler),
"/loki/api/v1/series": http.HandlerFunc(t.Querier.SeriesHandler),

"/api/prom/query": http.HandlerFunc(t.Querier.LogQueryHandler),
"/api/prom/label": http.HandlerFunc(t.Querier.LabelHandler),
"/api/prom/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler),
"/api/prom/tail": http.HandlerFunc(t.Querier.TailHandler),
"/api/prom/series": http.HandlerFunc(t.Querier.SeriesHandler),
}
return querier.InitWorkerService(
querierWorkerServiceConfig, queryHandlers, t.Server.HTTP, t.Server.HTTPServer.Handler, t.HTTPAuthMiddleware,
)
t.Server.HTTP.Handle("/loki/api/v1/query_range", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.RangeQueryHandler)))
t.Server.HTTP.Handle("/loki/api/v1/query", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.InstantQueryHandler)))
// Prometheus compatibility requires `loki/api/v1/labels` however we already released `loki/api/v1/label`
// which is a little more consistent with `/loki/api/v1/label/{name}/values` so we are going to handle both paths.
t.Server.HTTP.Handle("/loki/api/v1/label", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LabelHandler)))
t.Server.HTTP.Handle("/loki/api/v1/labels", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LabelHandler)))
t.Server.HTTP.Handle("/loki/api/v1/label/{name}/values", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LabelHandler)))
t.Server.HTTP.Handle("/loki/api/v1/tail", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.TailHandler)))
t.Server.HTTP.Handle("/loki/api/v1/series", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.SeriesHandler)))

t.Server.HTTP.Handle("/api/prom/query", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LogQueryHandler)))
t.Server.HTTP.Handle("/api/prom/label", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LabelHandler)))
t.Server.HTTP.Handle("/api/prom/label/{name}/values", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LabelHandler)))
t.Server.HTTP.Handle("/api/prom/tail", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.TailHandler)))
t.Server.HTTP.Handle("/api/prom/series", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.SeriesHandler)))
return worker, nil // ok if worker is nil here
}

func (t *Loki) initIngester() (_ services.Service, err error) {
Expand Down
146 changes: 146 additions & 0 deletions pkg/querier/worker_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package querier

import (
"fmt"
"net/http"

querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/log/level"
"github.com/gorilla/mux"
"github.com/grafana/dskit/services"
"github.com/opentracing-contrib/go-stdlib/nethttp"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/middleware"

serverutil "github.com/grafana/loki/pkg/util/server"
)

// WorkerServiceConfig bundles the settings InitWorkerService needs to decide
// whether (and how) to start a querier worker for this process.
type WorkerServiceConfig struct {
// AllEnabled reports whether the "all" target is enabled; when true the
// querier is not considered to be running standalone.
AllEnabled bool
// GrpcListenPort is the port used to build the default 127.0.0.1 frontend
// address when no frontend or scheduler address has been configured.
GrpcListenPort int
// QuerierMaxConcurrent is the querier's max-concurrent setting; it is copied
// into the worker config's MaxConcurrentRequests before a worker is created.
QuerierMaxConcurrent int
// QuerierWorkerConfig points at the worker configuration. InitWorkerService
// writes to it through this pointer (FrontendAddress, MaxConcurrentRequests),
// so the caller's config is modified.
QuerierWorkerConfig *querier_worker.Config
// QueryFrontendEnabled reports whether the query-frontend target is enabled
// in this process.
QueryFrontendEnabled bool
// QuerySchedulerEnabled reports whether the query-scheduler target is enabled
// in this process.
QuerySchedulerEnabled bool
}

// InitWorkerService takes a config object, a map of routes to handlers, an external HTTP router and external
// HTTP handler, and an auth middleware wrapper. It creates an internal HTTP router that responds to all the
// provided query routes/handlers. That router is either registered with the external Loki HTTP server, or used
// internally by a querier worker so that it does not conflict with the routes registered by the query-frontend
// module.
//
//  1. Query-frontend (or query-scheduler, or "all") enabled: the internal HTTP router is wrapped with tracing,
//     auth and form-prepopulating middleware and handed directly to the querier worker. If neither a frontend
//     nor a scheduler address is configured, the worker is pointed at 127.0.0.1 on cfg.GrpcListenPort.
//
//  2. Querier standalone: the internal HTTP router is registered on externalRouter for every query route, and
//     externalHandler is handed to the querier worker. If neither a frontend nor a scheduler address is
//     configured, no worker is created and (nil, nil) is returned.
//
// In both cases the worker's MaxConcurrentRequests is set to cfg.QuerierMaxConcurrent before the worker is
// created. cfg.QuerierWorkerConfig is modified through its pointer; a nil pointer yields an error.
func InitWorkerService(
	cfg WorkerServiceConfig,
	queryRoutesToHandlers map[string]http.Handler,
	externalRouter *mux.Router,
	externalHandler http.Handler,
	authMiddleware middleware.Interface,
) (serve services.Service, err error) {
	workerCfg := cfg.QuerierWorkerConfig
	if workerCfg == nil {
		return nil, fmt.Errorf("querier worker config must not be nil")
	}

	// newWorker is the single place a querier worker gets built, so the concurrency setting below is applied
	// regardless of which deployment mode we end up in.
	newWorker := func(handler http.Handler) (services.Service, error) {
		// Querier worker's max concurrent requests must be the same as the querier setting.
		workerCfg.MaxConcurrentRequests = cfg.QuerierMaxConcurrent
		return querier_worker.NewQuerierWorker(
			*workerCfg, httpgrpc_server.NewServer(handler), util_log.Logger, prometheus.DefaultRegisterer)
	}

	internalRouter := mux.NewRouter()
	for route, handler := range queryRoutesToHandlers {
		internalRouter.Handle(route, handler)
	}

	noWorkerAddress := workerCfg.FrontendAddress == "" && workerCfg.SchedulerAddress == ""

	// If the querier is running standalone without the query-frontend or query-scheduler, we must register the
	// internal HTTP handler externally (as it's the only handler that needs to register on querier routes) and
	// provide the external Loki Server HTTP handler to the worker, to ensure requests it processes use the
	// default middleware instrumentation.
	if querierRunningStandalone(cfg) {
		// First, register the internal querier handler with the external HTTP server.
		routes := make([]string, 0, len(queryRoutesToHandlers))
		for route := range queryRoutesToHandlers {
			routes = append(routes, route)
		}
		registerRoutesExternally(routes, externalRouter, internalRouter, authMiddleware)

		// If no frontend or scheduler address has been configured, then there is no place for the
		// querier worker to request work from, so there is no need to start a worker service.
		if noWorkerAddress {
			return nil, nil
		}

		// A frontend or scheduler address has been configured: return a querier worker that uses the external
		// Loki Server HTTP handler, which now has the internal handler's routes registered with it.
		return newWorker(externalHandler)
	}

	// Since we must be running a querier together with a frontend and/or scheduler at this point, if no frontend
	// or scheduler address is configured, default to the frontend on localhost on its own gRPC listening port.
	if noWorkerAddress {
		address := fmt.Sprintf("127.0.0.1:%d", cfg.GrpcListenPort)
		level.Warn(util_log.Logger).Log(
			"msg", "Worker address is empty, attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.",
			"address", address)
		workerCfg.FrontendAddress = address
	}

	// Add a middleware to extract the trace context and add a header.
	var internalHandler http.Handler = nethttp.MiddlewareFunc(
		opentracing.GlobalTracer(),
		internalRouter.ServeHTTP,
		nethttp.OperationNameFunc(func(r *http.Request) string {
			return "internalQuerier"
		}))

	// The worker calls the internal router directly, so the middleware that registerRoutesExternally would have
	// applied is not in the path. Wrap the internal handler with the auth middleware (to parse the tenant ID
	// from the HTTP header and inject it into the request context) and the prepopulate middleware (to make sure
	// any x-www-form-urlencoded params are parsed).
	internalHandler = middleware.Merge(
		authMiddleware,
		serverutil.NewPrepopulateMiddleware(),
	).Wrap(internalHandler)

	// Return a querier worker pointed at the internal querier HTTP handler so there is no conflict in routes
	// between the querier and the query frontend.
	return newWorker(internalHandler)
}

// registerRoutesExternally mounts internalHandler on externalRouter under every path in routes.
// Each registration is wrapped, outermost first, with panic recovery, the supplied auth middleware,
// form prepopulation and the JSON response middleware.
func registerRoutesExternally(routes []string, externalRouter *mux.Router, internalHandler http.Handler, authMiddleware middleware.Interface) {
	chain := middleware.Merge(
		serverutil.RecoveryHTTPMiddleware,
		authMiddleware,
		serverutil.NewPrepopulateMiddleware(),
		serverutil.ResponseJSONMiddleware(),
	)

	for i := range routes {
		wrapped := chain.Wrap(internalHandler)
		externalRouter.Handle(routes[i], wrapped)
	}
}

// querierRunningStandalone reports whether the querier is the only query-path target in this
// process, i.e. none of the query-frontend, query-scheduler or "all" targets is enabled.
// The decision and its inputs are emitted at debug level.
func querierRunningStandalone(cfg WorkerServiceConfig) bool {
	anyCompanionEnabled := cfg.QueryFrontendEnabled || cfg.QuerySchedulerEnabled || cfg.AllEnabled
	standalone := !anyCompanionEnabled

	debugLogger := level.Debug(util_log.Logger)
	debugLogger.Log(
		"msg", "determining if querier is running as standalone target",
		"runningStandalone", standalone,
		"queryFrontendEnabled", cfg.QueryFrontendEnabled,
		"queryScheduleEnabled", cfg.QuerySchedulerEnabled,
		"allEnabled", cfg.AllEnabled,
	)

	return standalone
}
Loading