Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hook up MetricsQueryService to main funcs #3079

Merged
merged 11 commits into from
Jun 12, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 58 additions & 7 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package main

import (
"flag"
"fmt"
"io"
"log"
"os"
Expand Down Expand Up @@ -44,10 +46,12 @@ import (
"github.com/jaegertracing/jaeger/cmd/status"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/version"
metricsPlugin "github.com/jaegertracing/jaeger/plugin/metrics"
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)
Expand All @@ -71,6 +75,12 @@ func main() {
log.Fatalf("Cannot initialize sampling strategy store factory: %v", err)
}

fc := metricsPlugin.FactoryConfigFromEnv()
metricsReaderFactory, err := createMetricsReaderFactory(fc)
if err != nil {
log.Fatalf("Cannot initialize metrics store factory: %v", err)
}

v := viper.New()
command := &cobra.Command{
Use: "jaeger-all-in-one",
Expand Down Expand Up @@ -107,6 +117,13 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to create dependency reader", zap.Error(err))
}

// Ensure default parameter values are loaded correctly.
metricsReaderFactory.InitFromViper(v)
metricsReader, err := createMetricsReader(fc, metricsReaderFactory, logger)
if err != nil {
logger.Fatal("Failed to create metrics reader", zap.Error(err))
}

strategyStoreFactory.InitFromViper(v)
if err := strategyStoreFactory.Initialize(metricsFactory, logger); err != nil {
logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err))
Expand Down Expand Up @@ -157,8 +174,8 @@ by default uses only in-memory database.`,
// query
querySrv := startQuery(
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger),
spanReader, dependencyReader,
rootMetricsFactory, metricsFactory,
spanReader, dependencyReader, metricsReader,
metricsFactory,
)

svc.RunAndThen(func() {
Expand All @@ -185,9 +202,7 @@ by default uses only in-memory database.`,
command.AddCommand(docs.Command(v))
command.AddCommand(status.Command(v, ports.CollectorAdminHTTP))

config.AddFlags(
v,
command,
inits := []func(*flag.FlagSet){
svc.AddFlags,
storageFactory.AddPipelineFlags,
agentApp.AddFlags,
Expand All @@ -196,6 +211,16 @@ by default uses only in-memory database.`,
collectorApp.AddFlags,
queryApp.AddFlags,
strategyStoreFactory.AddFlags,
}

// Only display the metrics backing store's config if the metrics query feature is enabled.
if metricsQueryEnabled(fc) {
inits = append(inits, metricsReaderFactory.AddFlags)
}
config.AddFlags(
v,
command,
inits...,
)

if err := command.Execute(); err != nil {
Expand Down Expand Up @@ -229,12 +254,13 @@ func startQuery(
queryOpts *querysvc.QueryServiceOptions,
spanReader spanstore.Reader,
depReader dependencystore.Reader,
rootFactory metrics.Factory,
metricsReader metricsstore.Reader,
baseFactory metrics.Factory,
) *queryApp.Server {
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, baseFactory.Namespace(metrics.NSOptions{Name: "query"}))
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
server, err := queryApp.NewServer(svc.Logger, qs, qOpts, opentracing.GlobalTracer())
mqs := querysvc.NewMetricsQueryService(metricsReader)
server, err := queryApp.NewServer(svc.Logger, qs, mqs, qOpts, opentracing.GlobalTracer())
if err != nil {
svc.Logger.Fatal("Could not start jaeger-query service", zap.Error(err))
}
Expand Down Expand Up @@ -272,3 +298,28 @@ func initTracer(metricsFactory metrics.Factory, logger *zap.Logger) io.Closer {
opentracing.SetGlobalTracer(tracer)
return closer
}

// metricsQueryEnabled returns whether if metrics querying capabilities are enabled.
// To avoid introducing a breaking change, the Metrics Querying feature must be
// explicitly opted-in by setting the METRICS_STORAGE_TYPE env var.
// An unset METRICS_STORAGE_TYPE will disable this feature.
func metricsQueryEnabled(fc metricsPlugin.FactoryConfig) bool {
return fc.MetricsStorageType != ""
}

func createMetricsReaderFactory(fc metricsPlugin.FactoryConfig) (*metricsPlugin.Factory, error) {
if !metricsQueryEnabled(fc) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are these checks still necessary? wouldn't you always get something back (perhaps disabled service)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's still necessary otherwise: Cannot initialize metrics store factory: unknown metrics type "". Valid types are [prometheus]

Do you think it's worth doing something similar with the factories? i.e. have disabledMetricsReaderFactory that simply returns a nil Reader? That way, these checks won't be necessary.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can extend the same approach - consider default metrics store type to be "none" and return a "disabled reader" in that case. Basically push this all the way down (you won't need "disabled service" in this case).

return nil, nil
}
return metricsPlugin.NewFactory(fc)
}

func createMetricsReader(fc metricsPlugin.FactoryConfig, factory *metricsPlugin.Factory, logger *zap.Logger) (metricsstore.Reader, error) {
if !metricsQueryEnabled(fc) {
return nil, nil
}
if err := factory.Initialize(logger); err != nil {
return nil, fmt.Errorf("failed to init metrics reader factory: %w", err)
}
return factory.CreateMetricsReader()
}
23 changes: 13 additions & 10 deletions cmd/query/app/querysvc/metrics_query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,44 +28,47 @@ type MetricsQueryService struct {
metricsReader metricsstore.Reader
}

var errNilReader = errors.New("no reader defined for MetricsQueryService")
var errMetricsQueryDisabled = errors.New("metrics querying is currently disabled")

// NewMetricsQueryService returns a new MetricsQueryService.
// A nil reader will result in a nil MetricsQueryService being returned.
func NewMetricsQueryService(reader metricsstore.Reader) *MetricsQueryService {
return &MetricsQueryService{
metricsReader: reader,
}
}

func (mqs MetricsQueryService) Enabled() bool {
Copy link
Contributor Author

@albertteoh albertteoh Jun 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hopefully I understood your suggestion correctly, @yurishkuro.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, what I meant was adding this:

type nullMetricsQueryService struct {}
func (nullMetricsQueryService) GetLatencies(ctx context.Context, params *metricsstore.LatenciesQueryParameters) (*metrics.MetricFamily, error) { return nil, errNotEnabled }
...

and using it in RPC handlers. Basically, solve the problem with polymorphism, not with a bunch of nil checks all over the place.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's pretty cool :) although, unless if I'm mistaken, I see two minor issues:

  • The handlers will need to parse inputs, etc. then call GetLatencies, before it knows that Metrics Querying is disabled. I think it would be better to return early; but not sure it's possible with this approach.
  • Once calling GetLatencies, the handler needs to differentiate the errNotEnabled from an "exceptional" error case so it can return an appropriate error code rather than the generic StatusInternalServerError code, which is important as the UI needs to handle both cases differently.
    We could follow a pattern like this but there's no guarantee the error could be wrapped up, so I imagine we'd need use something like errors.Is.

what do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

point (1) is probably not particularly important, what's the downside of parsing inputs?

point (2) is valid if you want to differentiate real failure from not-implemented (grpc has UNIMPLEMENTED status code iirc). But the handler should look like this:

handler() {
    res, err := service.call()
    if err != nil {
      return nil, errWithStatusCode(err)
    }
}

There will be only one place in the code, errWithStatusCode(), that needs to understand errNotEnabled and transform it. Easy to test, rather than each handler branching to make checks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and yes, you'd want to use error.Is, it's the new Go way, what we have with if err == spanstore.ErrTraceNotFound { predates that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the downside of parsing inputs?

not a large downside, and the emphasis wasn't so much on the input parsing but on when the handler knows if the feature is enabled or disabled. I feel the intent is clearer when reading the handler code instead of needing to follow it to the point where the error is handled:

metricsHandler() {
    if notEnabled {
       return notImplementedErrorCode
    }
    res, err := service.call()
    ...
}

In any case, it's a small matter I think, and happy to go ahead with your suggestion.

return mqs.metricsReader != nil
}

// GetLatencies is the queryService implementation of metricsstore.Reader.
func (mqs MetricsQueryService) GetLatencies(ctx context.Context, params *metricsstore.LatenciesQueryParameters) (*metrics.MetricFamily, error) {
if mqs.metricsReader == nil {
return nil, errNilReader
if !mqs.Enabled() {
return nil, errMetricsQueryDisabled
}
return mqs.metricsReader.GetLatencies(ctx, params)
}

// GetCallRates is the queryService implementation of metricsstore.Reader.
func (mqs MetricsQueryService) GetCallRates(ctx context.Context, params *metricsstore.CallRateQueryParameters) (*metrics.MetricFamily, error) {
if mqs.metricsReader == nil {
return nil, errNilReader
if !mqs.Enabled() {
return nil, errMetricsQueryDisabled
}
return mqs.metricsReader.GetCallRates(ctx, params)
}

// GetErrorRates is the queryService implementation of metricsstore.Reader.
func (mqs MetricsQueryService) GetErrorRates(ctx context.Context, params *metricsstore.ErrorRateQueryParameters) (*metrics.MetricFamily, error) {
if mqs.metricsReader == nil {
return nil, errNilReader
if !mqs.Enabled() {
return nil, errMetricsQueryDisabled
}
return mqs.metricsReader.GetErrorRates(ctx, params)
}

// GetMinStepDuration is the queryService implementation of metricsstore.Reader.
func (mqs MetricsQueryService) GetMinStepDuration(ctx context.Context, params *metricsstore.MinStepDurationQueryParameters) (time.Duration, error) {
if mqs.metricsReader == nil {
return 0, errNilReader
if !mqs.Enabled() {
return 0, errMetricsQueryDisabled
}
return mqs.metricsReader.GetMinStepDuration(ctx, params)
}
8 changes: 4 additions & 4 deletions cmd/query/app/querysvc/metrics_query_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestGetLatenciesNilReader(t *testing.T) {
qParams := &metricsstore.LatenciesQueryParameters{}
r, err := qs.GetLatencies(context.Background(), qParams)
assert.Zero(t, r)
assert.EqualError(t, err, errNilReader.Error())
assert.EqualError(t, err, errMetricsQueryDisabled.Error())
}

// Test QueryService.GetCallRates()
Expand All @@ -86,7 +86,7 @@ func TestGetCallRatesNilReader(t *testing.T) {
qParams := &metricsstore.CallRateQueryParameters{}
r, err := qs.GetCallRates(context.Background(), qParams)
assert.Zero(t, r)
assert.EqualError(t, err, errNilReader.Error())
assert.EqualError(t, err, errMetricsQueryDisabled.Error())
}

// Test QueryService.GetErrorRates()
Expand All @@ -106,7 +106,7 @@ func TestGetErrorRatesNilReader(t *testing.T) {
qParams := &metricsstore.ErrorRateQueryParameters{}
r, err := qs.GetErrorRates(context.Background(), qParams)
assert.Zero(t, r)
assert.EqualError(t, err, errNilReader.Error())
assert.EqualError(t, err, errMetricsQueryDisabled.Error())
}

// Test QueryService.GetMinStepDurations()
Expand All @@ -126,5 +126,5 @@ func TestGetMinStepDurationsNilReader(t *testing.T) {
qParams := &metricsstore.MinStepDurationQueryParameters{}
r, err := qs.GetMinStepDuration(context.Background(), qParams)
assert.Zero(t, r)
assert.EqualError(t, err, errNilReader.Error())
assert.EqualError(t, err, errMetricsQueryDisabled.Error())
}
14 changes: 9 additions & 5 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Server struct {
}

// NewServer creates and initializes Server
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *QueryOptions, tracer opentracing.Tracer) (*Server, error) {
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuerySvc *querysvc.MetricsQueryService, options *QueryOptions, tracer opentracing.Tracer) (*Server, error) {

_, httpPort, err := net.SplitHostPort(options.HTTPHostPort)
if err != nil {
Expand All @@ -67,12 +67,12 @@ func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *Que
return nil, errors.New("server with TLS enabled can not use same host ports for gRPC and HTTP. Use dedicated HTTP and gRPC host ports instead")
}

grpcServer, err := createGRPCServer(querySvc, options, logger, tracer)
grpcServer, err := createGRPCServer(querySvc, metricsQuerySvc, options, logger, tracer)
if err != nil {
return nil, err
}

httpServer, err := createHTTPServer(querySvc, options, tracer, logger)
httpServer, err := createHTTPServer(querySvc, metricsQuerySvc, options, tracer, logger)
if err != nil {
return nil, err
}
Expand All @@ -94,7 +94,7 @@ func (s Server) HealthCheckStatus() chan healthcheck.Status {
return s.unavailableChannel
}

func createGRPCServer(querySvc *querysvc.QueryService, options *QueryOptions, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, error) {
func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc *querysvc.MetricsQueryService, options *QueryOptions, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, error) {
var grpcOpts []grpc.ServerOption

if options.TLSGRPC.Enabled {
Expand All @@ -111,11 +111,15 @@ func createGRPCServer(querySvc *querysvc.QueryService, options *QueryOptions, lo
server := grpc.NewServer(grpcOpts...)

handler := NewGRPCHandler(querySvc, logger, tracer)

// TODO: Register MetricsQueryService
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be addressed in a follow-up PR implementing MetricsQuery support for the gRPC handler.

api_v2.RegisterQueryServiceServer(server, handler)

return server, nil
}

func createHTTPServer(querySvc *querysvc.QueryService, queryOpts *QueryOptions, tracer opentracing.Tracer, logger *zap.Logger) (*http.Server, error) {
func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc *querysvc.MetricsQueryService, queryOpts *QueryOptions, tracer opentracing.Tracer, logger *zap.Logger) (*http.Server, error) {
// TODO: Add HandlerOptions.MetricsQueryService
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be addressed in a follow-up PR implementing MetricsQuery support for the HTTP handler.

apiHandlerOptions := []HandlerOption{
HandlerOptions.Logger(logger),
HandlerOptions.Tracer(tracer),
Expand Down
30 changes: 18 additions & 12 deletions cmd/query/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"testing"
"time"

opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -64,7 +64,7 @@ func TestCreateTLSServerSinglePortError(t *testing.T) {
ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem",
}

_, err := NewServer(zap.NewNop(), &querysvc.QueryService{},
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, &querysvc.MetricsQueryService{},
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8080", TLSGRPC: tlsCfg, TLSHTTP: tlsCfg}, opentracing.NoopTracer{})
assert.NotNil(t, err)
}
Expand All @@ -77,7 +77,7 @@ func TestCreateTLSGrpcServerError(t *testing.T) {
ClientCAPath: "invalid/path",
}

_, err := NewServer(zap.NewNop(), &querysvc.QueryService{},
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, &querysvc.MetricsQueryService{},
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSGRPC: tlsCfg}, opentracing.NoopTracer{})
assert.NotNil(t, err)
}
Expand All @@ -90,7 +90,7 @@ func TestCreateTLSHttpServerError(t *testing.T) {
ClientCAPath: "invalid/path",
}

_, err := NewServer(zap.NewNop(), &querysvc.QueryService{},
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, &querysvc.MetricsQueryService{},
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSHTTP: tlsCfg}, opentracing.NoopTracer{})
assert.NotNil(t, err)
}
Expand Down Expand Up @@ -331,7 +331,8 @@ func TestServerHTTPTLS(t *testing.T) {
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
server, err := NewServer(flagsSvc.Logger, querySvc,
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc,
serverOptions,
opentracing.NoopTracer{})
assert.Nil(t, err)
Expand Down Expand Up @@ -491,7 +492,8 @@ func TestServerGRPCTLS(t *testing.T) {
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
server, err := NewServer(flagsSvc.Logger, querySvc,
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc,
serverOptions,
opentracing.NoopTracer{})
assert.Nil(t, err)
Expand Down Expand Up @@ -545,12 +547,12 @@ func TestServerGRPCTLS(t *testing.T) {

}
func TestServerBadHostPort(t *testing.T) {
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{},
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, &querysvc.MetricsQueryService{},
&QueryOptions{HTTPHostPort: "8080", GRPCHostPort: "127.0.0.1:8081", BearerTokenPropagation: true},
opentracing.NoopTracer{})

assert.NotNil(t, err)
_, err = NewServer(zap.NewNop(), &querysvc.QueryService{},
_, err = NewServer(zap.NewNop(), &querysvc.QueryService{}, &querysvc.MetricsQueryService{},
&QueryOptions{HTTPHostPort: "127.0.0.1:8081", GRPCHostPort: "9123", BearerTokenPropagation: true},
opentracing.NoopTracer{})

Expand All @@ -576,6 +578,7 @@ func TestServerInUseHostPort(t *testing.T) {
server, err := NewServer(
zap.NewNop(),
&querysvc.QueryService{},
&querysvc.MetricsQueryService{},
&QueryOptions{
HTTPHostPort: tc.httpHostPort,
GRPCHostPort: tc.grpcHostPort,
Expand Down Expand Up @@ -608,8 +611,8 @@ func TestServerSinglePort(t *testing.T) {
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})

server, err := NewServer(flagsSvc.Logger, querySvc,
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc,
&QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort, BearerTokenPropagation: true},
opentracing.NoopTracer{})
assert.Nil(t, err)
Expand Down Expand Up @@ -658,8 +661,10 @@ func TestServerGracefulExit(t *testing.T) {
hostPort := ports.PortToHostPort(ports.QueryAdminHTTP)

querySvc := &querysvc.QueryService{}
metricsQuerySvc := &querysvc.MetricsQueryService{}
tracer := opentracing.NoopTracer{}
server, err := NewServer(flagsSvc.Logger, querySvc, &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort}, tracer)

server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc, &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort}, tracer)
assert.Nil(t, err)
assert.NoError(t, server.Start())
go func() {
Expand All @@ -685,8 +690,9 @@ func TestServerHandlesPortZero(t *testing.T) {
flagsSvc.Logger = zap.New(zapCore)

querySvc := &querysvc.QueryService{}
metricsQuerySvc := &querysvc.MetricsQueryService{}
tracer := opentracing.NoopTracer{}
server, err := NewServer(flagsSvc.Logger, querySvc, &QueryOptions{GRPCHostPort: ":0", HTTPHostPort: ":0"}, tracer)
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc, &QueryOptions{GRPCHostPort: ":0", HTTPHostPort: ":0"}, tracer)
assert.Nil(t, err)
assert.NoError(t, server.Start())
server.Close()
Expand Down
Loading