Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hook up MetricsQueryService to main funcs #3079

Merged
merged 11 commits into from
Jun 12, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package main

import (
"fmt"
"io"
"log"
"os"
Expand Down Expand Up @@ -44,10 +45,12 @@ import (
"github.com/jaegertracing/jaeger/cmd/status"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/version"
metricsPlugin "github.com/jaegertracing/jaeger/plugin/metrics"
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)
Expand All @@ -71,6 +74,12 @@ func main() {
log.Fatalf("Cannot initialize sampling strategy store factory: %v", err)
}

fc := metricsPlugin.FactoryConfigFromEnv()
metricsReaderFactory, err := metricsPlugin.NewFactory(fc)
if err != nil {
log.Fatalf("Cannot initialize metrics store factory: %v", err)
}

v := viper.New()
command := &cobra.Command{
Use: "jaeger-all-in-one",
Expand Down Expand Up @@ -107,6 +116,11 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to create dependency reader", zap.Error(err))
}

metricsReader, err := createMetricsReader(metricsReaderFactory, v, logger)
if err != nil {
logger.Fatal("Failed to create metrics reader", zap.Error(err))
}

strategyStoreFactory.InitFromViper(v)
if err := strategyStoreFactory.Initialize(metricsFactory, logger); err != nil {
logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err))
Expand Down Expand Up @@ -157,8 +171,8 @@ by default uses only in-memory database.`,
// query
querySrv := startQuery(
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger),
spanReader, dependencyReader,
rootMetricsFactory, metricsFactory,
spanReader, dependencyReader, metricsReader,
metricsFactory,
)

svc.RunAndThen(func() {
Expand Down Expand Up @@ -196,6 +210,7 @@ by default uses only in-memory database.`,
collectorApp.AddFlags,
queryApp.AddFlags,
strategyStoreFactory.AddFlags,
metricsReaderFactory.AddFlags,
)

if err := command.Execute(); err != nil {
Expand Down Expand Up @@ -229,12 +244,13 @@ func startQuery(
queryOpts *querysvc.QueryServiceOptions,
spanReader spanstore.Reader,
depReader dependencystore.Reader,
rootFactory metrics.Factory,
metricsReader metricsstore.Reader,
baseFactory metrics.Factory,
) *queryApp.Server {
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, baseFactory.Namespace(metrics.NSOptions{Name: "query"}))
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
server, err := queryApp.NewServer(svc.Logger, qs, qOpts, opentracing.GlobalTracer())
mqs := querysvc.NewMetricsQueryService(metricsReader)
server, err := queryApp.NewServer(svc.Logger, qs, mqs, qOpts, opentracing.GlobalTracer())
if err != nil {
svc.Logger.Fatal("Could not start jaeger-query service", zap.Error(err))
}
Expand Down Expand Up @@ -272,3 +288,13 @@ func initTracer(metricsFactory metrics.Factory, logger *zap.Logger) io.Closer {
opentracing.SetGlobalTracer(tracer)
return closer
}

func createMetricsReader(factory *metricsPlugin.Factory, v *viper.Viper, logger *zap.Logger) (metricsstore.Reader, error) {
if err := factory.Initialize(logger); err != nil {
return nil, fmt.Errorf("failed to init metrics reader factory: %w", err)
}

// Ensure default parameter values are loaded correctly.
factory.InitFromViper(v)
return factory.CreateMetricsReader()
}
18 changes: 1 addition & 17 deletions cmd/query/app/querysvc/metrics_query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,18 @@ package querysvc

import (
"context"
"errors"
"time"

"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
"github.com/jaegertracing/jaeger/storage/metricsstore"
)

// MetricsQueryService contains the underlying reader required for querying the metrics store.
// MetricsQueryService provides a means of querying R.E.D metrics from an underlying metrics store.
type MetricsQueryService struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is MetricsQueryService supposed to add on top of metricsstore.Reader?

Note that we used to have HTTP handler use SpanReader directly, but then refactored it into QueryService because we needed to share it with GRPC Handler and the query service was an abstraction on top of two span readers: primary and archive. In your case there doesn't seem to be any additional abstraction needed, you just proxy all calls into the reader directly. Not a huge issue to leave as is, but just curious if there's any thinking that a layer of abstraction would be needed in the future.

Copy link
Contributor Author

@albertteoh albertteoh Jun 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed MetricsQueryService adds little on top of metricsstore.Reader. It has exactly the same function signature as the Reader and yes, it simply forwards the call. I also don't see a need for multiple metrics readers at runtime, for now at least.

It was added to follow the same design as span/dependency querying to avoid confusing/surprising those following the code and seeing a QueryService being used by handlers to query spans/dependencies, then a Reader used to query metrics; when they may expect the handlers to refer to "QueryService" abstractions for querying spans, dependencies and metrics.

One possibility to avoid confusion, yet minimizing unnecessary code, is to make MetricsQueryService an interface, embedding metricsstore.Reader, meaning we can remove the MetricsQueryService struct and its proxy functions altogether and simply give the handlers a metricsstore.Reader instance. That way, handlers still refer to the "MetricsQueryService" alias for metrics, but instead using a Reader implementation. Something like:

type MetricsQueryService interface {
	metricsstore.Reader
}

metricsReader metricsstore.Reader
}

var errNilReader = errors.New("no reader defined for MetricsQueryService")

// NewMetricsQueryService returns a new MetricsQueryService.
// A nil reader will result in a nil MetricsQueryService being returned.
func NewMetricsQueryService(reader metricsstore.Reader) *MetricsQueryService {
return &MetricsQueryService{
metricsReader: reader,
Expand All @@ -40,32 +36,20 @@ func NewMetricsQueryService(reader metricsstore.Reader) *MetricsQueryService {

// GetLatencies is the queryService implementation of metricsstore.Reader.
func (mqs MetricsQueryService) GetLatencies(ctx context.Context, params *metricsstore.LatenciesQueryParameters) (*metrics.MetricFamily, error) {
if mqs.metricsReader == nil {
return nil, errNilReader
}
return mqs.metricsReader.GetLatencies(ctx, params)
}

// GetCallRates is the queryService implementation of metricsstore.Reader.
func (mqs MetricsQueryService) GetCallRates(ctx context.Context, params *metricsstore.CallRateQueryParameters) (*metrics.MetricFamily, error) {
if mqs.metricsReader == nil {
return nil, errNilReader
}
return mqs.metricsReader.GetCallRates(ctx, params)
}

// GetErrorRates is the queryService implementation of metricsstore.Reader.
func (mqs MetricsQueryService) GetErrorRates(ctx context.Context, params *metricsstore.ErrorRateQueryParameters) (*metrics.MetricFamily, error) {
if mqs.metricsReader == nil {
return nil, errNilReader
}
return mqs.metricsReader.GetErrorRates(ctx, params)
}

// GetMinStepDuration is the queryService implementation of metricsstore.Reader.
func (mqs MetricsQueryService) GetMinStepDuration(ctx context.Context, params *metricsstore.MinStepDurationQueryParameters) (time.Duration, error) {
if mqs.metricsReader == nil {
return 0, errNilReader
}
return mqs.metricsReader.GetMinStepDuration(ctx, params)
}
34 changes: 0 additions & 34 deletions cmd/query/app/querysvc/metrics_query_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ type testMetricsQueryService struct {

func initializeTestMetricsQueryService() *testMetricsQueryService {
metricsReader := &metricsmocks.Reader{}

tqs := testMetricsQueryService{
metricsReader: metricsReader,
}

tqs.queryService = NewMetricsQueryService(metricsReader)
return &tqs
}
Expand All @@ -58,14 +56,6 @@ func TestGetLatencies(t *testing.T) {
assert.Equal(t, expectedLatencies, actualLatencies)
}

func TestGetLatenciesNilReader(t *testing.T) {
qs := NewMetricsQueryService(nil)
qParams := &metricsstore.LatenciesQueryParameters{}
r, err := qs.GetLatencies(context.Background(), qParams)
assert.Zero(t, r)
assert.EqualError(t, err, errNilReader.Error())
}

// Test QueryService.GetCallRates()
func TestGetCallRates(t *testing.T) {
tqs := initializeTestMetricsQueryService()
Expand All @@ -81,14 +71,6 @@ func TestGetCallRates(t *testing.T) {
assert.Equal(t, expectedCallRates, actualCallRates)
}

func TestGetCallRatesNilReader(t *testing.T) {
qs := NewMetricsQueryService(nil)
qParams := &metricsstore.CallRateQueryParameters{}
r, err := qs.GetCallRates(context.Background(), qParams)
assert.Zero(t, r)
assert.EqualError(t, err, errNilReader.Error())
}

// Test QueryService.GetErrorRates()
func TestGetErrorRates(t *testing.T) {
tqs := initializeTestMetricsQueryService()
Expand All @@ -101,14 +83,6 @@ func TestGetErrorRates(t *testing.T) {
assert.Equal(t, expectedErrorRates, actualErrorRates)
}

func TestGetErrorRatesNilReader(t *testing.T) {
qs := NewMetricsQueryService(nil)
qParams := &metricsstore.ErrorRateQueryParameters{}
r, err := qs.GetErrorRates(context.Background(), qParams)
assert.Zero(t, r)
assert.EqualError(t, err, errNilReader.Error())
}

// Test QueryService.GetMinStepDurations()
func TestGetMinStepDurations(t *testing.T) {
tqs := initializeTestMetricsQueryService()
Expand All @@ -120,11 +94,3 @@ func TestGetMinStepDurations(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, expectedMinStep, actualMinStep)
}

func TestGetMinStepDurationsNilReader(t *testing.T) {
qs := NewMetricsQueryService(nil)
qParams := &metricsstore.MinStepDurationQueryParameters{}
r, err := qs.GetMinStepDuration(context.Background(), qParams)
assert.Zero(t, r)
assert.EqualError(t, err, errNilReader.Error())
}
14 changes: 9 additions & 5 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Server struct {
}

// NewServer creates and initializes Server
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *QueryOptions, tracer opentracing.Tracer) (*Server, error) {
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuerySvc *querysvc.MetricsQueryService, options *QueryOptions, tracer opentracing.Tracer) (*Server, error) {

_, httpPort, err := net.SplitHostPort(options.HTTPHostPort)
if err != nil {
Expand All @@ -67,12 +67,12 @@ func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *Que
return nil, errors.New("server with TLS enabled can not use same host ports for gRPC and HTTP. Use dedicated HTTP and gRPC host ports instead")
}

grpcServer, err := createGRPCServer(querySvc, options, logger, tracer)
grpcServer, err := createGRPCServer(querySvc, metricsQuerySvc, options, logger, tracer)
if err != nil {
return nil, err
}

httpServer, err := createHTTPServer(querySvc, options, tracer, logger)
httpServer, err := createHTTPServer(querySvc, metricsQuerySvc, options, tracer, logger)
if err != nil {
return nil, err
}
Expand All @@ -94,7 +94,7 @@ func (s Server) HealthCheckStatus() chan healthcheck.Status {
return s.unavailableChannel
}

func createGRPCServer(querySvc *querysvc.QueryService, options *QueryOptions, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, error) {
func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc *querysvc.MetricsQueryService, options *QueryOptions, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, error) {
var grpcOpts []grpc.ServerOption

if options.TLSGRPC.Enabled {
Expand All @@ -111,11 +111,15 @@ func createGRPCServer(querySvc *querysvc.QueryService, options *QueryOptions, lo
server := grpc.NewServer(grpcOpts...)

handler := NewGRPCHandler(querySvc, logger, tracer)

// TODO: Register MetricsQueryService
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be addressed in a follow-up PR implementing MetricsQuery support for the gRPC handler.

api_v2.RegisterQueryServiceServer(server, handler)

return server, nil
}

func createHTTPServer(querySvc *querysvc.QueryService, queryOpts *QueryOptions, tracer opentracing.Tracer, logger *zap.Logger) (*http.Server, error) {
func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc *querysvc.MetricsQueryService, queryOpts *QueryOptions, tracer opentracing.Tracer, logger *zap.Logger) (*http.Server, error) {
// TODO: Add HandlerOptions.MetricsQueryService
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be addressed in a follow-up PR implementing MetricsQuery support for the HTTP handler.

apiHandlerOptions := []HandlerOption{
HandlerOptions.Logger(logger),
HandlerOptions.Tracer(tracer),
Expand Down
30 changes: 18 additions & 12 deletions cmd/query/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"testing"
"time"

opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -64,7 +64,7 @@ func TestCreateTLSServerSinglePortError(t *testing.T) {
ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem",
}

_, err := NewServer(zap.NewNop(), &querysvc.QueryService{},
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, querysvc.NewMetricsQueryService(nil),
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8080", TLSGRPC: tlsCfg, TLSHTTP: tlsCfg}, opentracing.NoopTracer{})
assert.NotNil(t, err)
}
Expand All @@ -77,7 +77,7 @@ func TestCreateTLSGrpcServerError(t *testing.T) {
ClientCAPath: "invalid/path",
}

_, err := NewServer(zap.NewNop(), &querysvc.QueryService{},
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, querysvc.NewMetricsQueryService(nil),
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSGRPC: tlsCfg}, opentracing.NoopTracer{})
assert.NotNil(t, err)
}
Expand All @@ -90,7 +90,7 @@ func TestCreateTLSHttpServerError(t *testing.T) {
ClientCAPath: "invalid/path",
}

_, err := NewServer(zap.NewNop(), &querysvc.QueryService{},
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, querysvc.NewMetricsQueryService(nil),
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSHTTP: tlsCfg}, opentracing.NoopTracer{})
assert.NotNil(t, err)
}
Expand Down Expand Up @@ -331,7 +331,8 @@ func TestServerHTTPTLS(t *testing.T) {
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
server, err := NewServer(flagsSvc.Logger, querySvc,
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc,
serverOptions,
opentracing.NoopTracer{})
assert.Nil(t, err)
Expand Down Expand Up @@ -491,7 +492,8 @@ func TestServerGRPCTLS(t *testing.T) {
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
server, err := NewServer(flagsSvc.Logger, querySvc,
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc,
serverOptions,
opentracing.NoopTracer{})
assert.Nil(t, err)
Expand Down Expand Up @@ -545,12 +547,12 @@ func TestServerGRPCTLS(t *testing.T) {

}
func TestServerBadHostPort(t *testing.T) {
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{},
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, querysvc.NewMetricsQueryService(nil),
&QueryOptions{HTTPHostPort: "8080", GRPCHostPort: "127.0.0.1:8081", BearerTokenPropagation: true},
opentracing.NoopTracer{})

assert.NotNil(t, err)
_, err = NewServer(zap.NewNop(), &querysvc.QueryService{},
_, err = NewServer(zap.NewNop(), &querysvc.QueryService{}, querysvc.NewMetricsQueryService(nil),
&QueryOptions{HTTPHostPort: "127.0.0.1:8081", GRPCHostPort: "9123", BearerTokenPropagation: true},
opentracing.NoopTracer{})

Expand All @@ -576,6 +578,7 @@ func TestServerInUseHostPort(t *testing.T) {
server, err := NewServer(
zap.NewNop(),
&querysvc.QueryService{},
querysvc.NewMetricsQueryService(nil),
&QueryOptions{
HTTPHostPort: tc.httpHostPort,
GRPCHostPort: tc.grpcHostPort,
Expand Down Expand Up @@ -608,8 +611,8 @@ func TestServerSinglePort(t *testing.T) {
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})

server, err := NewServer(flagsSvc.Logger, querySvc,
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc,
&QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort, BearerTokenPropagation: true},
opentracing.NoopTracer{})
assert.Nil(t, err)
Expand Down Expand Up @@ -658,8 +661,10 @@ func TestServerGracefulExit(t *testing.T) {
hostPort := ports.PortToHostPort(ports.QueryAdminHTTP)

querySvc := &querysvc.QueryService{}
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
tracer := opentracing.NoopTracer{}
server, err := NewServer(flagsSvc.Logger, querySvc, &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort}, tracer)

server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc, &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort}, tracer)
assert.Nil(t, err)
assert.NoError(t, server.Start())
go func() {
Expand All @@ -685,8 +690,9 @@ func TestServerHandlesPortZero(t *testing.T) {
flagsSvc.Logger = zap.New(zapCore)

querySvc := &querysvc.QueryService{}
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
tracer := opentracing.NoopTracer{}
server, err := NewServer(flagsSvc.Logger, querySvc, &QueryOptions{GRPCHostPort: ":0", HTTPHostPort: ":0"}, tracer)
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc, &QueryOptions{GRPCHostPort: ":0", HTTPHostPort: ":0"}, tracer)
assert.Nil(t, err)
assert.NoError(t, server.Start())
server.Close()
Expand Down
Loading