diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index e159f159af4..8e2032265df 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -19,23 +19,16 @@ import ( "fmt" "io" "log" - "net" - "net/http" "os" - "strconv" - "github.com/gorilla/mux" "github.com/opentracing/opentracing-go" "github.com/spf13/cobra" "github.com/spf13/viper" jaegerClientConfig "github.com/uber/jaeger-client-go/config" jaegerClientZapLog "github.com/uber/jaeger-client-go/log/zap" "github.com/uber/jaeger-lib/metrics" - "github.com/uber/tchannel-go" - "github.com/uber/tchannel-go/thrift" _ "go.uber.org/automaxprocs" "go.uber.org/zap" - "google.golang.org/grpc" agentApp "github.com/jaegertracing/jaeger/cmd/agent/app" agentRep "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" @@ -43,20 +36,12 @@ import ( agentTchanRep "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel" "github.com/jaegertracing/jaeger/cmd/all-in-one/setupcontext" collectorApp "github.com/jaegertracing/jaeger/cmd/collector/app" - collector "github.com/jaegertracing/jaeger/cmd/collector/app/builder" - "github.com/jaegertracing/jaeger/cmd/collector/app/grpcserver" - "github.com/jaegertracing/jaeger/cmd/collector/app/sampling" - "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" - "github.com/jaegertracing/jaeger/cmd/collector/app/zipkin" "github.com/jaegertracing/jaeger/cmd/docs" "github.com/jaegertracing/jaeger/cmd/env" "github.com/jaegertracing/jaeger/cmd/flags" queryApp "github.com/jaegertracing/jaeger/cmd/query/app" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" - clientcfgHandler "github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp" "github.com/jaegertracing/jaeger/pkg/config" - "github.com/jaegertracing/jaeger/pkg/healthcheck" - "github.com/jaegertracing/jaeger/pkg/recoveryhandler" "github.com/jaegertracing/jaeger/pkg/version" ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore" "github.com/jaegertracing/jaeger/plugin/storage" @@ -65,9 +50,6 @@ import ( "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/spanstore" storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics" - jc "github.com/jaegertracing/jaeger/thrift-gen/jaeger" - sc "github.com/jaegertracing/jaeger/thrift-gen/sampling" - zc "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) // all-in-one/main is a standalone full-stack jaeger backend, backed by a memory store @@ -108,6 +90,7 @@ by default uses only in-memory database.`, if err := storageFactory.Initialize(metricsFactory, logger); err != nil { logger.Fatal("Failed to init storage factory", zap.Error(err)) } + spanReader, err := storageFactory.CreateSpanReader() if err != nil { logger.Fatal("Failed to create span reader", zap.Error(err)) @@ -122,17 +105,32 @@ by default uses only in-memory database.`, } strategyStoreFactory.InitFromViper(v) - strategyStore := initSamplingStrategyStore(strategyStoreFactory, metricsFactory, logger) + if err := strategyStoreFactory.Initialize(metricsFactory, logger); err != nil { + logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err)) + } + strategyStore, err := strategyStoreFactory.CreateStrategyStore() + if err != nil { + logger.Fatal("Failed to create sampling strategy store", zap.Error(err)) + } aOpts := new(agentApp.Builder).InitFromViper(v) repOpts := new(agentRep.Options).InitFromViper(v, logger) tchanBuilder := agentTchanRep.NewBuilder().InitFromViper(v, logger) grpcBuilder := agentGrpcRep.NewConnBuilder().InitFromViper(v) - cOpts := new(collector.CollectorOptions).InitFromViper(v) + cOpts := new(collectorApp.CollectorOptions).InitFromViper(v) qOpts := new(queryApp.QueryOptions).InitFromViper(v, logger) - collectorSrv := startCollector(cOpts, spanWriter, logger, metricsFactory, strategyStore, svc.HC()) - startAgent(aOpts, repOpts, tchanBuilder, grpcBuilder, cOpts, logger, metricsFactory) + c := collectorApp.New(&collectorApp.CollectorParams{ + ServiceName: "jaeger-collector", + Logger: logger, + MetricsFactory: metricsFactory, + SpanWriter: spanWriter, + StrategyStore: strategyStore, + HealthCheck: svc.HC(), + }) + c.Start(cOpts) + + startAgent(aOpts, repOpts, tchanBuilder, grpcBuilder, cOpts.CollectorGRPCPort, logger, metricsFactory) querySrv := startQuery( svc, qOpts, archiveOptions(storageFactory, logger), spanReader, dependencyReader, @@ -140,7 +138,7 @@ by default uses only in-memory database.`, ) svc.RunAndThen(func() { - collectorSrv.GracefulStop() + c.Close() querySrv.Close() if closer, ok := spanWriter.(io.Closer); ok { err := closer.Close() @@ -167,7 +165,7 @@ by default uses only in-memory database.`, agentRep.AddFlags, agentTchanRep.AddFlags, agentGrpcRep.AddFlags, - collector.AddFlags, + collectorApp.AddFlags, queryApp.AddFlags, strategyStoreFactory.AddFlags, ) @@ -183,13 +181,13 @@ func startAgent( repOpts *agentRep.Options, tchanBuilder *agentTchanRep.Builder, grpcBuilder *agentGrpcRep.ConnBuilder, - cOpts *collector.CollectorOptions, + collectorGRPCPort int, logger *zap.Logger, baseFactory metrics.Factory, ) { metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil}) - grpcBuilder.CollectorHostPorts = append(grpcBuilder.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorGRPCPort)) + grpcBuilder.CollectorHostPorts = append(grpcBuilder.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", collectorGRPCPort)) cp, err := agentApp.CreateCollectorProxy(repOpts, tchanBuilder, grpcBuilder, logger, metricsFactory) if err != nil { logger.Fatal("Could not create collector proxy", zap.Error(err)) @@ -206,116 +204,6 @@ func startAgent( } } -func startCollector( - cOpts *collector.CollectorOptions, - spanWriter spanstore.Writer, - logger *zap.Logger, - baseFactory metrics.Factory, - strategyStore strategystore.StrategyStore, - hc *healthcheck.HealthCheck, -) *grpc.Server { - metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "collector", Tags: nil}) - - spanHandlerBuilder := &collector.SpanHandlerBuilder{ - SpanWriter: spanWriter, - CollectorOpts: *cOpts, - Logger: logger, - MetricsFactory: metricsFactory, - } - - zipkinSpansHandler, jaegerBatchesHandler, grpcHandler := spanHandlerBuilder.BuildHandlers() - - { - ch, err := tchannel.NewChannel("jaeger-collector", &tchannel.ChannelOptions{}) - if err != nil { - logger.Fatal("Unable to create new TChannel", zap.Error(err)) - } - server := thrift.NewServer(ch) - batchHandler := collectorApp.NewTChannelHandler(jaegerBatchesHandler, zipkinSpansHandler) - server.Register(jc.NewTChanCollectorServer(batchHandler)) - server.Register(zc.NewTChanZipkinCollectorServer(batchHandler)) - server.Register(sc.NewTChanSamplingManagerServer(sampling.NewHandler(strategyStore))) - portStr := ":" + strconv.Itoa(cOpts.CollectorPort) - listener, err := net.Listen("tcp", portStr) - if err != nil { - logger.Fatal("Unable to start listening on channel", zap.Error(err)) - } - logger.Info("Starting jaeger-collector TChannel server", zap.Int("port", cOpts.CollectorPort)) - logger.Warn("TChannel has been deprecated and will be removed in a future release") - ch.Serve(listener) - } - - server, err := startGRPCServer(cOpts.CollectorGRPCPort, grpcHandler, strategyStore, logger) - if err != nil { - logger.Fatal("Could not start gRPC collector", zap.Error(err)) - } - - { - r := mux.NewRouter() - apiHandler := collectorApp.NewAPIHandler(jaegerBatchesHandler) - apiHandler.RegisterRoutes(r) - - cfgHandler := clientcfgHandler.NewHTTPHandler(clientcfgHandler.HTTPHandlerParams{ - ConfigManager: &clientcfgHandler.ConfigManager{ - SamplingStrategyStore: strategyStore, - // TODO provide baggage manager - }, - MetricsFactory: metricsFactory, - BasePath: "/api", - LegacySamplingEndpoint: false, - }) - cfgHandler.RegisterRoutes(r) - - recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true) - go startZipkinHTTPAPI(logger, cOpts.CollectorZipkinHTTPPort, zipkinSpansHandler, recoveryHandler) - - httpPortStr := ":" + strconv.Itoa(cOpts.CollectorHTTPPort) - logger.Info("Starting jaeger-collector HTTP server", zap.String("http-host-port", httpPortStr)) - go func() { - if err := http.ListenAndServe(httpPortStr, recoveryHandler(r)); err != nil { - logger.Fatal("Could not launch jaeger-collector HTTP server", zap.Error(err)) - } - hc.Set(healthcheck.Unavailable) - }() - } - return server -} - -func startGRPCServer( - port int, - handler *collectorApp.GRPCHandler, - samplingStore strategystore.StrategyStore, - logger *zap.Logger, -) (*grpc.Server, error) { - server := grpc.NewServer() - _, err := grpcserver.StartGRPCCollector(port, server, handler, samplingStore, logger, func(err error) { - logger.Fatal("gRPC collector failed", zap.Error(err)) - }) - if err != nil { - return nil, err - } - return server, err -} - -func startZipkinHTTPAPI( - logger *zap.Logger, - zipkinPort int, - zipkinSpansHandler collectorApp.ZipkinSpansHandler, - recoveryHandler func(http.Handler) http.Handler, -) { - if zipkinPort != 0 { - r := mux.NewRouter() - zHandler := zipkin.NewAPIHandler(zipkinSpansHandler) - zHandler.RegisterRoutes(r) - httpPortStr := ":" + strconv.Itoa(zipkinPort) - logger.Info("Listening for Zipkin HTTP traffic", zap.Int("zipkin.http-port", zipkinPort)) - - if err := http.ListenAndServe(httpPortStr, recoveryHandler(r)); err != nil { - logger.Fatal("Could not launch service", zap.Error(err)) - } - } -} - func startQuery( svc *flags.Service, qOpts *queryApp.QueryOptions, @@ -334,21 +222,6 @@ func startQuery( return server } -func initSamplingStrategyStore( - samplingStrategyStoreFactory *ss.Factory, - metricsFactory metrics.Factory, - logger *zap.Logger, -) strategystore.StrategyStore { - if err := samplingStrategyStoreFactory.Initialize(metricsFactory, logger); err != nil { - logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err)) - } - strategyStore, err := samplingStrategyStoreFactory.CreateStrategyStore() - if err != nil { - logger.Fatal("Failed to create sampling strategy store", zap.Error(err)) - } - return strategyStore -} - func archiveOptions(storageFactory istorage.Factory, logger *zap.Logger) *querysvc.QueryServiceOptions { opts := &querysvc.QueryServiceOptions{} if !opts.InitArchiveStorage(storageFactory, logger) { diff --git a/cmd/collector/app/builder/builder_flags.go b/cmd/collector/app/builder_flags.go similarity index 94% rename from cmd/collector/app/builder/builder_flags.go rename to cmd/collector/app/builder_flags.go index 4a9b5f598b3..ca207f6b37e 100644 --- a/cmd/collector/app/builder/builder_flags.go +++ b/cmd/collector/app/builder_flags.go @@ -13,14 +13,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -package builder +package app import ( "flag" "github.com/spf13/viper" - "github.com/jaegertracing/jaeger/cmd/collector/app" "github.com/jaegertracing/jaeger/cmd/flags" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/ports" @@ -74,8 +73,8 @@ type CollectorOptions struct { // AddFlags adds flags for CollectorOptions func AddFlags(flags *flag.FlagSet) { flags.Uint(collectorDynQueueSizeMemory, 0, "(experimental) The max memory size in MiB to use for the dynamic queue.") - flags.Int(collectorQueueSize, app.DefaultQueueSize, "The queue size of the collector") - flags.Int(collectorNumWorkers, app.DefaultNumWorkers, "The number of workers pulling items from the queue") + flags.Int(collectorQueueSize, DefaultQueueSize, "The queue size of the collector") + flags.Int(collectorNumWorkers, DefaultNumWorkers, "The number of workers pulling items from the queue") flags.Int(collectorPort, ports.CollectorTChannel, "The TChannel port for the collector service") flags.Int(collectorHTTPPort, ports.CollectorHTTP, "The HTTP port for the collector service") flags.Int(collectorGRPCPort, ports.CollectorGRPC, "The gRPC port for the collector service") diff --git a/cmd/collector/app/collector.go b/cmd/collector/app/collector.go new file mode 100644 index 00000000000..59ab4372d4b --- /dev/null +++ b/cmd/collector/app/collector.go @@ -0,0 +1,183 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "context" + "io" + "net/http" + "time" + + "github.com/uber/jaeger-lib/metrics" + "github.com/uber/tchannel-go" + "go.uber.org/zap" + "google.golang.org/grpc" + + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" + "github.com/jaegertracing/jaeger/cmd/collector/app/server" + "github.com/jaegertracing/jaeger/pkg/healthcheck" + "github.com/jaegertracing/jaeger/pkg/recoveryhandler" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +// Collector returns the collector as a manageable unit of work +type Collector struct { + // required to start a new collector + serviceName string + logger *zap.Logger + metricsFactory metrics.Factory + spanWriter spanstore.Writer + strategyStore strategystore.StrategyStore + hCheck *healthcheck.HealthCheck + + // state, read only + hServer *http.Server + zkServer *http.Server + grpcServer *grpc.Server + tchServer *tchannel.Channel +} + +// CollectorParams to construct a new Jaeger Collector. +type CollectorParams struct { + ServiceName string + Logger *zap.Logger + MetricsFactory metrics.Factory + SpanWriter spanstore.Writer + StrategyStore strategystore.StrategyStore + HealthCheck *healthcheck.HealthCheck +} + +// New constructs a new collector component, ready to be started +func New(params *CollectorParams) *Collector { + return &Collector{ + serviceName: params.ServiceName, + logger: params.Logger, + metricsFactory: params.MetricsFactory, + spanWriter: params.SpanWriter, + strategyStore: params.StrategyStore, + hCheck: params.HealthCheck, + } +} + +// Start the component and underlying dependencies +func (c *Collector) Start(builderOpts *CollectorOptions) error { + handlerBuilder := &SpanHandlerBuilder{ + SpanWriter: c.spanWriter, + CollectorOpts: *builderOpts, + Logger: c.logger, + MetricsFactory: c.metricsFactory, + } + zipkinSpansHandler, jaegerBatchesHandler, grpcHandler := handlerBuilder.BuildHandlers() + recoveryHandler := recoveryhandler.NewRecoveryHandler(c.logger, true) + + if tchServer, err := server.StartThriftServer(&server.ThriftServerParams{ + ServiceName: c.serviceName, + Port: builderOpts.CollectorPort, + JaegerBatchesHandler: jaegerBatchesHandler, + ZipkinSpansHandler: zipkinSpansHandler, + StrategyStore: c.strategyStore, + Logger: c.logger, + }); err != nil { + c.logger.Fatal("Could not start Thrift collector", zap.Error(err)) + } else { + c.tchServer = tchServer + } + + if grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{ + Port: builderOpts.CollectorGRPCPort, + Handler: grpcHandler, + TLSConfig: builderOpts.TLS, + SamplingStore: c.strategyStore, + Logger: c.logger, + }); err != nil { + c.logger.Fatal("Could not start gRPC collector", zap.Error(err)) + } else { + c.grpcServer = grpcServer + } + + if httpServer, err := server.StartHTTPServer(&server.HTTPServerParams{ + Port: builderOpts.CollectorHTTPPort, + Handler: jaegerBatchesHandler, + RecoveryHandler: recoveryHandler, + HealthCheck: c.hCheck, + MetricsFactory: c.metricsFactory, + SamplingStore: c.strategyStore, + Logger: c.logger, + }); err != nil { + c.logger.Fatal("Could not start the HTTP server", zap.Error(err)) + } else { + c.hServer = httpServer + } + + if zkServer, err := server.StartZipkinServer(&server.ZipkinServerParams{ + Port: builderOpts.CollectorZipkinHTTPPort, + Handler: zipkinSpansHandler, + RecoveryHandler: recoveryHandler, + AllowedHeaders: builderOpts.CollectorZipkinAllowedHeaders, + AllowedOrigins: builderOpts.CollectorZipkinAllowedOrigins, + Logger: c.logger, + }); err != nil { + c.logger.Fatal("Could not start the Zipkin server", zap.Error(err)) + } else { + c.zkServer = zkServer + } + + return nil +} + +// Close the component and all its underlying dependencies +func (c *Collector) Close() error { + // gRPC server + if c.grpcServer != nil { + c.grpcServer.GracefulStop() + } + + // TChannel server + if c.tchServer != nil { + c.tchServer.Close() + } + + // HTTP server + if c.hServer != nil { + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + err := c.hServer.Shutdown(timeout) + if err != nil { + c.logger.Error("Failed to stop the main HTTP server", zap.Error(err)) + } + defer cancel() + } + + // Zipkin server + if c.zkServer != nil { + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + err := c.zkServer.Shutdown(timeout) + if err != nil { + c.logger.Error("Failed to stop the Zipkin server", zap.Error(err)) + } + defer cancel() + } + + // by now, we shouldn't have any in-flight requests anymore + if c.spanWriter != nil { + if closer, ok := c.spanWriter.(io.Closer); ok { + err := closer.Close() // SpanWriter + if err != nil { + c.logger.Error("Failed to close span writer", zap.Error(err)) + } + } + } + + return nil +} diff --git a/cmd/collector/app/collector_test.go b/cmd/collector/app/collector_test.go new file mode 100644 index 00000000000..d9c1767e530 --- /dev/null +++ b/cmd/collector/app/collector_test.go @@ -0,0 +1,62 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "io" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/uber/jaeger-lib/metrics/metricstest" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/pkg/healthcheck" + "github.com/jaegertracing/jaeger/thrift-gen/sampling" +) + +var _ (io.Closer) = (*Collector)(nil) + +func TestNewCollector(t *testing.T) { + // prepare + hc := healthcheck.New() + logger := zap.NewNop() + baseMetrics := metricstest.NewFactory(time.Hour) + spanWriter := &fakeSpanWriter{} + strategyStore := &mockStrategyStore{} + + c := New(&CollectorParams{ + ServiceName: "collector", + Logger: logger, + MetricsFactory: baseMetrics, + SpanWriter: spanWriter, + StrategyStore: strategyStore, + HealthCheck: hc, + }) + collectorOpts := &CollectorOptions{} + + // test + c.Start(collectorOpts) + + // verify + assert.NoError(t, c.Close()) +} + +type mockStrategyStore struct { +} + +func (m *mockStrategyStore) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) { + return &sampling.SamplingStrategyResponse{}, nil +} diff --git a/cmd/collector/app/grpcserver/grpc_server.go b/cmd/collector/app/grpcserver/grpc_server.go deleted file mode 100644 index 745af9f49bd..00000000000 --- a/cmd/collector/app/grpcserver/grpc_server.go +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright (c) 2018 The Jaeger Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcserver - -import ( - "io/ioutil" - "net" - "os" - "strconv" - - "github.com/pkg/errors" - "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/grpclog" - - "github.com/jaegertracing/jaeger/cmd/collector/app" - "github.com/jaegertracing/jaeger/cmd/collector/app/sampling" - "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" - "github.com/jaegertracing/jaeger/proto-gen/api_v2" -) - -// StartGRPCCollector configures and starts gRPC endpoints exposed by collector. -func StartGRPCCollector( - port int, - server *grpc.Server, - handler *app.GRPCHandler, - samplingStrategy strategystore.StrategyStore, - logger *zap.Logger, - serveErr func(error), -) (net.Addr, error) { - grpcPortStr := ":" + strconv.Itoa(port) - lis, err := net.Listen("tcp", grpcPortStr) - if err != nil { - return nil, errors.Wrap(err, "failed to listen on gRPC port") - } - - grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr)) - - api_v2.RegisterCollectorServiceServer(server, handler) - api_v2.RegisterSamplingManagerServer(server, sampling.NewGRPCHandler(samplingStrategy)) - startServer(server, lis, logger, serveErr) - return lis.Addr(), nil -} - -func startServer(server *grpc.Server, lis net.Listener, logger *zap.Logger, serveErr func(error)) { - var port string - if tcpAddr, ok := lis.Addr().(*net.TCPAddr); ok { - port = strconv.Itoa(tcpAddr.Port) - } else { - port = lis.Addr().Network() - } - logger.Info("Starting jaeger-collector gRPC server", zap.String("grpc-port", port)) - go func() { - if err := server.Serve(lis); err != nil { - logger.Error("Could not launch gRPC service", zap.Error(err)) - serveErr(err) - } - }() -} diff --git a/cmd/collector/app/grpc_handler.go b/cmd/collector/app/handler/grpc_handler.go similarity index 78% rename from cmd/collector/app/grpc_handler.go rename to cmd/collector/app/handler/grpc_handler.go index dc989cb270d..351fce5a255 100644 --- a/cmd/collector/app/grpc_handler.go +++ b/cmd/collector/app/handler/grpc_handler.go @@ -12,24 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. -package app +package handler import ( "context" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) // GRPCHandler implements gRPC CollectorService. type GRPCHandler struct { logger *zap.Logger - spanProcessor SpanProcessor + spanProcessor processor.SpanProcessor } // NewGRPCHandler registers routes for this handler on the given router. -func NewGRPCHandler(logger *zap.Logger, spanProcessor SpanProcessor) *GRPCHandler { +func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor) *GRPCHandler { return &GRPCHandler{ logger: logger, spanProcessor: spanProcessor, @@ -43,9 +44,9 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) span.Process = r.Batch.Process } } - _, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, ProcessSpansOptions{ - InboundTransport: GRPCTransport, - SpanFormat: ProtoSpanFormat, + _, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, processor.SpansOptions{ + InboundTransport: processor.GRPCTransport, + SpanFormat: processor.ProtoSpanFormat, }) if err != nil { g.logger.Error("cannot process spans", zap.Error(err)) diff --git a/cmd/collector/app/grpc_handler_test.go b/cmd/collector/app/handler/grpc_handler_test.go similarity index 96% rename from cmd/collector/app/grpc_handler_test.go rename to cmd/collector/app/handler/grpc_handler_test.go index 329a83a05e4..f969fbc4b3b 100644 --- a/cmd/collector/app/grpc_handler_test.go +++ b/cmd/collector/app/handler/grpc_handler_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package app +package handler import ( "context" @@ -26,6 +26,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) @@ -36,7 +37,7 @@ type mockSpanProcessor struct { spans []*model.Span } -func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, opts ProcessSpansOptions) ([]bool, error) { +func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, opts processor.SpansOptions) ([]bool, error) { p.mux.Lock() defer p.mux.Unlock() p.spans = append(p.spans, spans...) diff --git a/cmd/collector/app/http_handler.go b/cmd/collector/app/handler/http_handler.go similarity index 95% rename from cmd/collector/app/http_handler.go rename to cmd/collector/app/handler/http_handler.go index 1869a4c7816..a243bd81057 100644 --- a/cmd/collector/app/http_handler.go +++ b/cmd/collector/app/handler/http_handler.go @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package app +package handler import ( "fmt" @@ -24,6 +24,7 @@ import ( "github.com/apache/thrift/lib/go/thrift" "github.com/gorilla/mux" + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" tJaeger "github.com/jaegertracing/jaeger/thrift-gen/jaeger" ) @@ -87,7 +88,7 @@ func (aH *APIHandler) SaveSpan(w http.ResponseWriter, r *http.Request) { return } batches := []*tJaeger.Batch{batch} - opts := SubmitBatchOptions{InboundTransport: HTTPTransport} + opts := SubmitBatchOptions{InboundTransport: processor.HTTPTransport} if _, err = aH.jaegerBatchesHandler.SubmitBatches(batches, opts); err != nil { http.Error(w, fmt.Sprintf("Cannot submit Jaeger batch: %v", err), http.StatusInternalServerError) return diff --git a/cmd/collector/app/http_handler_test.go b/cmd/collector/app/handler/http_handler_test.go similarity index 99% rename from cmd/collector/app/http_handler_test.go rename to cmd/collector/app/handler/http_handler_test.go index 82106012e90..d4939f04ed2 100644 --- a/cmd/collector/app/http_handler_test.go +++ b/cmd/collector/app/handler/http_handler_test.go @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package app +package handler import ( "bytes" diff --git a/cmd/collector/app/tchannel_handler.go b/cmd/collector/app/handler/tchannel_handler.go similarity index 90% rename from cmd/collector/app/tchannel_handler.go rename to cmd/collector/app/handler/tchannel_handler.go index 80cdd798e77..42c1713f2c6 100644 --- a/cmd/collector/app/tchannel_handler.go +++ b/cmd/collector/app/handler/tchannel_handler.go @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -package app +package handler import ( "github.com/uber/tchannel-go/thrift" + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) @@ -44,7 +45,7 @@ func (h *TChannelHandler) SubmitZipkinBatch( spans []*zipkincore.Span, ) ([]*zipkincore.Response, error) { return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{ - InboundTransport: TChannelTransport, + InboundTransport: processor.TChannelTransport, }) } @@ -54,6 +55,6 @@ func (h *TChannelHandler) SubmitBatches( batches []*jaeger.Batch, ) ([]*jaeger.BatchSubmitResponse, error) { return h.jaegerHandler.SubmitBatches(batches, SubmitBatchOptions{ - InboundTransport: TChannelTransport, + InboundTransport: processor.TChannelTransport, }) } diff --git a/cmd/collector/app/tchannel_handler_test.go b/cmd/collector/app/handler/tchannel_handler_test.go similarity index 98% rename from cmd/collector/app/tchannel_handler_test.go rename to cmd/collector/app/handler/tchannel_handler_test.go index a6e505820c9..fa908ec5b77 100644 --- a/cmd/collector/app/tchannel_handler_test.go +++ b/cmd/collector/app/handler/tchannel_handler_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package app +package handler import ( "testing" diff --git a/cmd/collector/app/thrift_span_handler.go b/cmd/collector/app/handler/thrift_span_handler.go similarity index 85% rename from cmd/collector/app/thrift_span_handler.go rename to cmd/collector/app/handler/thrift_span_handler.go index efd8957a82f..fd232a846f0 100644 --- a/cmd/collector/app/thrift_span_handler.go +++ b/cmd/collector/app/handler/thrift_span_handler.go @@ -13,11 +13,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -package app +package handler import ( "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" zipkinS "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin" "github.com/jaegertracing/jaeger/model" jConv "github.com/jaegertracing/jaeger/model/converter/thrift/jaeger" @@ -28,7 +29,7 @@ import ( // SubmitBatchOptions are passed to Submit methods of the handlers. type SubmitBatchOptions struct { - InboundTransport InboundTransport + InboundTransport processor.InboundTransport } // ZipkinSpansHandler consumes and handles zipkin spans @@ -45,11 +46,11 @@ type JaegerBatchesHandler interface { type jaegerBatchesHandler struct { logger *zap.Logger - modelProcessor SpanProcessor + modelProcessor processor.SpanProcessor } // NewJaegerSpanHandler returns a JaegerBatchesHandler -func NewJaegerSpanHandler(logger *zap.Logger, modelProcessor SpanProcessor) JaegerBatchesHandler { +func NewJaegerSpanHandler(logger *zap.Logger, modelProcessor processor.SpanProcessor) JaegerBatchesHandler { return &jaegerBatchesHandler{ logger: logger, modelProcessor: modelProcessor, @@ -64,9 +65,9 @@ func (jbh *jaegerBatchesHandler) SubmitBatches(batches []*jaeger.Batch, options mSpan := jConv.ToDomainSpan(span, batch.Process) mSpans = append(mSpans, mSpan) } - oks, err := jbh.modelProcessor.ProcessSpans(mSpans, ProcessSpansOptions{ + oks, err := jbh.modelProcessor.ProcessSpans(mSpans, processor.SpansOptions{ InboundTransport: options.InboundTransport, - SpanFormat: JaegerSpanFormat, + SpanFormat: processor.JaegerSpanFormat, }) if err != nil { jbh.logger.Error("Collector failed to process span batch", zap.Error(err)) @@ -92,11 +93,11 @@ func (jbh *jaegerBatchesHandler) SubmitBatches(batches []*jaeger.Batch, options type zipkinSpanHandler struct { logger *zap.Logger sanitizer zipkinS.Sanitizer - modelProcessor SpanProcessor + modelProcessor processor.SpanProcessor } // NewZipkinSpanHandler returns a ZipkinSpansHandler -func NewZipkinSpanHandler(logger *zap.Logger, modelHandler SpanProcessor, sanitizer zipkinS.Sanitizer) ZipkinSpansHandler { +func NewZipkinSpanHandler(logger *zap.Logger, modelHandler processor.SpanProcessor, sanitizer zipkinS.Sanitizer) ZipkinSpansHandler { return &zipkinSpanHandler{ logger: logger, modelProcessor: modelHandler, @@ -111,9 +112,9 @@ func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options sanitized := h.sanitizer.Sanitize(span) mSpans = append(mSpans, convertZipkinToModel(sanitized, h.logger)...) } - bools, err := h.modelProcessor.ProcessSpans(mSpans, ProcessSpansOptions{ + bools, err := h.modelProcessor.ProcessSpans(mSpans, processor.SpansOptions{ InboundTransport: options.InboundTransport, - SpanFormat: ZipkinSpanFormat, + SpanFormat: processor.ZipkinSpanFormat, }) if err != nil { h.logger.Error("Collector failed to process Zipkin span batch", zap.Error(err)) diff --git a/cmd/collector/app/thrift_span_handler_test.go b/cmd/collector/app/handler/thrift_span_handler_test.go similarity index 95% rename from cmd/collector/app/thrift_span_handler_test.go rename to cmd/collector/app/handler/thrift_span_handler_test.go index d520cd7bd95..150645ba18f 100644 --- a/cmd/collector/app/thrift_span_handler_test.go +++ b/cmd/collector/app/handler/thrift_span_handler_test.go @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package app +package handler import ( "errors" @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" @@ -65,7 +66,7 @@ type shouldIErrorProcessor struct { var errTestError = errors.New("Whoops") -func (s *shouldIErrorProcessor) ProcessSpans(mSpans []*model.Span, _ ProcessSpansOptions) ([]bool, error) { +func (s *shouldIErrorProcessor) ProcessSpans(mSpans []*model.Span, _ processor.SpansOptions) ([]bool, error) { if s.shouldError { return nil, errTestError } diff --git a/cmd/collector/app/metrics.go b/cmd/collector/app/metrics.go index f4eb8722099..9f68fd83dbe 100644 --- a/cmd/collector/app/metrics.go +++ b/cmd/collector/app/metrics.go @@ -21,7 +21,9 @@ import ( "github.com/uber/jaeger-lib/metrics" + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/normalizer" ) const ( @@ -96,39 +98,11 @@ type metricsBySvc struct { traces traceCountsBySvc // number of traces originated per service } -// InboundTransport identifies the transport used to receive spans. -type InboundTransport string - -const ( - // GRPCTransport indicates spans received over gRPC. - GRPCTransport InboundTransport = "grpc" - // TChannelTransport indicates spans received over TChannel. - TChannelTransport InboundTransport = "tchannel" - // HTTPTransport indicates spans received over HTTP. - HTTPTransport InboundTransport = "http" - // UnknownTransport is the fallback/catch-all category. - UnknownTransport InboundTransport = "unknown" -) - -// SpanFormat identifies the data format in which the span was originally received. -type SpanFormat string - -const ( - // JaegerSpanFormat is for Jaeger Thrift spans. - JaegerSpanFormat SpanFormat = "jaeger" - // ZipkinSpanFormat is for Zipkin Thrift spans. - ZipkinSpanFormat SpanFormat = "zipkin" - // ProtoSpanFormat is for Jaeger protobuf Spans. - ProtoSpanFormat SpanFormat = "proto" - // UnknownSpanFormat is the fallback/catch-all category. - UnknownSpanFormat SpanFormat = "unknown" -) - // SpanCountsByFormat groups metrics by different span formats (thrift, proto, etc.) -type SpanCountsByFormat map[SpanFormat]SpanCountsByTransport +type SpanCountsByFormat map[processor.SpanFormat]SpanCountsByTransport // SpanCountsByTransport groups metrics by inbound transport (e.g http, grpc, tchannel) -type SpanCountsByTransport map[InboundTransport]SpanCounts +type SpanCountsByTransport map[processor.InboundTransport]SpanCounts // SpanCounts contains counts for received and rejected spans. type SpanCounts struct { @@ -139,12 +113,12 @@ type SpanCounts struct { } // NewSpanProcessorMetrics returns a SpanProcessorMetrics -func NewSpanProcessorMetrics(serviceMetrics metrics.Factory, hostMetrics metrics.Factory, otherFormatTypes []SpanFormat) *SpanProcessorMetrics { +func NewSpanProcessorMetrics(serviceMetrics metrics.Factory, hostMetrics metrics.Factory, otherFormatTypes []processor.SpanFormat) *SpanProcessorMetrics { spanCounts := SpanCountsByFormat{ - ZipkinSpanFormat: newCountsByTransport(serviceMetrics, ZipkinSpanFormat), - JaegerSpanFormat: newCountsByTransport(serviceMetrics, JaegerSpanFormat), - ProtoSpanFormat: newCountsByTransport(serviceMetrics, ProtoSpanFormat), - UnknownSpanFormat: newCountsByTransport(serviceMetrics, UnknownSpanFormat), + processor.ZipkinSpanFormat: newCountsByTransport(serviceMetrics, processor.ZipkinSpanFormat), + processor.JaegerSpanFormat: newCountsByTransport(serviceMetrics, processor.JaegerSpanFormat), + processor.ProtoSpanFormat: newCountsByTransport(serviceMetrics, processor.ProtoSpanFormat), + processor.UnknownSpanFormat: newCountsByTransport(serviceMetrics, processor.UnknownSpanFormat), } for _, otherFormatType := range otherFormatTypes { spanCounts[otherFormatType] = newCountsByTransport(serviceMetrics, otherFormatType) @@ -217,17 +191,17 @@ func newSpanCountsBySvc(factory metrics.Factory, category string, maxServiceName } } -func newCountsByTransport(factory metrics.Factory, format SpanFormat) SpanCountsByTransport { +func newCountsByTransport(factory metrics.Factory, format processor.SpanFormat) SpanCountsByTransport { factory = factory.Namespace(metrics.NSOptions{Tags: map[string]string{"format": string(format)}}) return SpanCountsByTransport{ - HTTPTransport: newCounts(factory, HTTPTransport), - TChannelTransport: newCounts(factory, TChannelTransport), - GRPCTransport: newCounts(factory, GRPCTransport), - UnknownTransport: newCounts(factory, UnknownTransport), + processor.HTTPTransport: newCounts(factory, processor.HTTPTransport), + processor.TChannelTransport: newCounts(factory, processor.TChannelTransport), + processor.GRPCTransport: newCounts(factory, processor.GRPCTransport), + processor.UnknownTransport: newCounts(factory, processor.UnknownTransport), } } -func newCounts(factory metrics.Factory, transport InboundTransport) SpanCounts { +func newCounts(factory metrics.Factory, transport processor.InboundTransport) SpanCounts { factory = factory.Namespace(metrics.NSOptions{Tags: map[string]string{"transport": string(transport)}}) return SpanCounts{ RejectedBySvc: newMetricsBySvc(factory, "rejected"), @@ -236,14 +210,14 @@ func newCounts(factory metrics.Factory, transport InboundTransport) SpanCounts { } // GetCountsForFormat gets the SpanCounts for a given format and transport. If none exists, we use the Unknown format. -func (m *SpanProcessorMetrics) GetCountsForFormat(spanFormat SpanFormat, transport InboundTransport) SpanCounts { +func (m *SpanProcessorMetrics) GetCountsForFormat(spanFormat processor.SpanFormat, transport processor.InboundTransport) SpanCounts { c, ok := m.spanCounts[spanFormat] if !ok { - c = m.spanCounts[UnknownSpanFormat] + c = m.spanCounts[processor.UnknownSpanFormat] } t, ok := c[transport] if !ok { - t = c[UnknownTransport] + t = c[processor.UnknownTransport] } return t } @@ -288,7 +262,7 @@ func (m metricsBySvc) countTracesByServiceName(serviceName string, isDebug bool, // an alert should be raised to investigate what's causing so many unique // service names. func (m *traceCountsBySvc) countByServiceName(serviceName string, isDebug bool, samplerType string) { - serviceName = NormalizeServiceName(serviceName) + serviceName = normalizer.ServiceName(serviceName) counts := m.counts if isDebug { counts = m.debugCounts @@ -341,7 +315,7 @@ func (m *traceCountsBySvc) countByServiceName(serviceName string, isDebug bool, // an alert should be raised to investigate what's causing so many unique // service names. func (m *spanCountsBySvc) countByServiceName(serviceName string, isDebug bool) { - serviceName = NormalizeServiceName(serviceName) + serviceName = normalizer.ServiceName(serviceName) counts := m.counts if isDebug { counts = m.debugCounts diff --git a/cmd/collector/app/metrics_test.go b/cmd/collector/app/metrics_test.go index 198bf4fb4d0..3e8b9486354 100644 --- a/cmd/collector/app/metrics_test.go +++ b/cmd/collector/app/metrics_test.go @@ -23,6 +23,7 @@ import ( jaegerM "github.com/uber/jaeger-lib/metrics" "github.com/uber/jaeger-lib/metrics/metricstest" + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" ) @@ -30,13 +31,13 @@ func TestProcessorMetrics(t *testing.T) { baseMetrics := metricstest.NewFactory(time.Hour) serviceMetrics := baseMetrics.Namespace(jaegerM.NSOptions{Name: "service", Tags: nil}) hostMetrics := baseMetrics.Namespace(jaegerM.NSOptions{Name: "host", Tags: nil}) - spm := NewSpanProcessorMetrics(serviceMetrics, hostMetrics, []SpanFormat{SpanFormat("scruffy")}) - benderFormatHTTPMetrics := spm.GetCountsForFormat("bender", HTTPTransport) + spm := NewSpanProcessorMetrics(serviceMetrics, hostMetrics, []processor.SpanFormat{processor.SpanFormat("scruffy")}) + benderFormatHTTPMetrics := spm.GetCountsForFormat("bender", processor.HTTPTransport) assert.NotNil(t, benderFormatHTTPMetrics) - benderFormatGRPCMetrics := spm.GetCountsForFormat("bender", GRPCTransport) + benderFormatGRPCMetrics := spm.GetCountsForFormat("bender", processor.GRPCTransport) assert.NotNil(t, benderFormatGRPCMetrics) - jTChannelFormat := spm.GetCountsForFormat(JaegerSpanFormat, TChannelTransport) + jTChannelFormat := spm.GetCountsForFormat(processor.JaegerSpanFormat, processor.TChannelTransport) assert.NotNil(t, jTChannelFormat) jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&model.Span{ Process: &model.Process{}, diff --git a/cmd/collector/app/options.go b/cmd/collector/app/options.go index a24957a2fd2..46a73271ebe 100644 --- a/cmd/collector/app/options.go +++ b/cmd/collector/app/options.go @@ -19,6 +19,7 @@ import ( "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer" "github.com/jaegertracing/jaeger/model" ) @@ -44,7 +45,7 @@ type options struct { dynQueueSizeWarmup uint dynQueueSizeMemory uint reportBusy bool - extraFormatTypes []SpanFormat + extraFormatTypes []processor.SpanFormat collectorTags map[string]string } @@ -146,7 +147,7 @@ func (options) ReportBusy(reportBusy bool) Option { } // ExtraFormatTypes creates an Option that initializes the extra list of format types -func (options) ExtraFormatTypes(extraFormatTypes []SpanFormat) Option { +func (options) ExtraFormatTypes(extraFormatTypes []processor.SpanFormat) Option { return func(b *options) { b.extraFormatTypes = extraFormatTypes } diff --git a/cmd/collector/app/options_test.go b/cmd/collector/app/options_test.go index 4e0341175f3..78d32fc538e 100644 --- a/cmd/collector/app/options_test.go +++ b/cmd/collector/app/options_test.go @@ -22,11 +22,12 @@ import ( "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" ) func TestAllOptionSet(t *testing.T) { - types := []SpanFormat{SpanFormat("sneh")} + types := []processor.SpanFormat{processor.SpanFormat("sneh")} opts := Options.apply( Options.ReportBusy(true), Options.BlockingSubmit(true), diff --git a/cmd/collector/app/processor/empty_test.go b/cmd/collector/app/processor/empty_test.go new file mode 100644 index 00000000000..270cab51e63 --- /dev/null +++ b/cmd/collector/app/processor/empty_test.go @@ -0,0 +1,15 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor diff --git a/cmd/collector/app/processor/span.go b/cmd/collector/app/processor/span.go new file mode 100644 index 00000000000..65ca34da22d --- /dev/null +++ b/cmd/collector/app/processor/span.go @@ -0,0 +1,59 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "github.com/jaegertracing/jaeger/model" +) + +// SpansOptions additional options passed to processor along with the spans. +type SpansOptions struct { + SpanFormat SpanFormat + InboundTransport InboundTransport +} + +// SpanProcessor handles model spans +type SpanProcessor interface { + // ProcessSpans processes model spans and return with either a list of true/false success or an error + ProcessSpans(mSpans []*model.Span, options SpansOptions) ([]bool, error) +} + +// InboundTransport identifies the transport used to receive spans. +type InboundTransport string + +const ( + // GRPCTransport indicates spans received over gRPC. + GRPCTransport InboundTransport = "grpc" + // TChannelTransport indicates spans received over TChannel. + TChannelTransport InboundTransport = "tchannel" + // HTTPTransport indicates spans received over HTTP. + HTTPTransport InboundTransport = "http" + // UnknownTransport is the fallback/catch-all category. + UnknownTransport InboundTransport = "unknown" +) + +// SpanFormat identifies the data format in which the span was originally received. +type SpanFormat string + +const ( + // JaegerSpanFormat is for Jaeger Thrift spans. + JaegerSpanFormat SpanFormat = "jaeger" + // ZipkinSpanFormat is for Zipkin Thrift spans. + ZipkinSpanFormat SpanFormat = "zipkin" + // ProtoSpanFormat is for Jaeger protobuf Spans. + ProtoSpanFormat SpanFormat = "proto" + // UnknownSpanFormat is the fallback/catch-all category. + UnknownSpanFormat SpanFormat = "unknown" +) diff --git a/cmd/collector/app/server/grpc.go b/cmd/collector/app/server/grpc.go new file mode 100644 index 00000000000..bbba6664074 --- /dev/null +++ b/cmd/collector/app/server/grpc.go @@ -0,0 +1,93 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "io/ioutil" + "net" + "os" + "strconv" + + "github.com/pkg/errors" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/grpclog" + + "github.com/jaegertracing/jaeger/cmd/collector/app/handler" + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling" + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" + "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/proto-gen/api_v2" +) + +// GRPCServerParams to construct a new Jaeger Collector gRPC Server +type GRPCServerParams struct { + TLSConfig tlscfg.Options + Port int + Handler *handler.GRPCHandler + SamplingStore strategystore.StrategyStore + Logger *zap.Logger + OnError func(error) +} + +// StartGRPCServer based on the given parameters +func StartGRPCServer(params *GRPCServerParams) (*grpc.Server, error) { + var server *grpc.Server + grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr)) + + if params.TLSConfig.Enabled { + // user requested a server with TLS, setup creds + tlsCfg, err := params.TLSConfig.Config() + if err != nil { + return nil, err + } + + creds := credentials.NewTLS(tlsCfg) + server = grpc.NewServer(grpc.Creds(creds)) + } else { + // server without TLS + server = grpc.NewServer() + } + + grpcPortStr := ":" + strconv.Itoa(params.Port) + listener, err := net.Listen("tcp", grpcPortStr) + if err != nil { + return nil, errors.Wrap(err, "failed to listen on gRPC port") + } + + if err := serveGRPC(server, listener, params); err != nil { + return nil, err + } + + return server, nil +} + +func serveGRPC(server *grpc.Server, listener net.Listener, params *GRPCServerParams) error { + api_v2.RegisterCollectorServiceServer(server, params.Handler) + api_v2.RegisterSamplingManagerServer(server, sampling.NewGRPCHandler(params.SamplingStore)) + + params.Logger.Info("Starting jaeger-collector gRPC server", zap.Int("grpc-port", params.Port)) + go func(server *grpc.Server) { + if err := server.Serve(listener); err != nil { + params.Logger.Error("Could not launch gRPC service", zap.Error(err)) + if params.OnError != nil { + params.OnError(err) + } + } + }(server) + + return nil +} diff --git a/cmd/collector/app/grpcserver/grpc_server_test.go b/cmd/collector/app/server/grpc_test.go similarity index 55% rename from cmd/collector/app/grpcserver/grpc_server_test.go rename to cmd/collector/app/server/grpc_test.go index 0e7db20a4a1..4629452896e 100644 --- a/cmd/collector/app/grpcserver/grpc_server_test.go +++ b/cmd/collector/app/server/grpc_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2018 The Jaeger Authors. +// Copyright (c) 2020 The Jaeger Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -package grpcserver +package server import ( "context" + "net" "sync" "testing" @@ -27,21 +28,20 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/test/bufconn" - "github.com/jaegertracing/jaeger/cmd/collector/app" - "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/cmd/collector/app/handler" "github.com/jaegertracing/jaeger/proto-gen/api_v2" - "github.com/jaegertracing/jaeger/thrift-gen/sampling" ) // test wrong port number func TestFailToListen(t *testing.T) { - l, _ := zap.NewDevelopment() - handler := app.NewGRPCHandler(l, &mockSpanProcessor{}) - server := grpc.NewServer() - const invalidPort = -1 - addr, err := StartGRPCCollector(invalidPort, server, handler, &mockSamplingStore{}, l, func(e error) { + logger, _ := zap.NewDevelopment() + server, err := StartGRPCServer(&GRPCServerParams{ + Port: -1, + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}), + SamplingStore: &mockSamplingStore{}, + Logger: logger, }) - assert.Nil(t, addr) + assert.Nil(t, server) assert.EqualError(t, err, "failed to listen on gRPC port: listen tcp: address -1: invalid port") } @@ -51,42 +51,44 @@ func TestFailServe(t *testing.T) { core, logs := observer.New(zap.NewAtomicLevelAt(zapcore.ErrorLevel)) var wg sync.WaitGroup wg.Add(1) - startServer(grpc.NewServer(), lis, zap.New(core), func(e error) { - assert.Equal(t, 1, len(logs.All())) - assert.Equal(t, "Could not launch gRPC service", logs.All()[0].Message) - wg.Done() + + logger := zap.New(core) + serveGRPC(grpc.NewServer(), lis, &GRPCServerParams{ + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}), + SamplingStore: &mockSamplingStore{}, + Logger: logger, + OnError: func(e error) { + assert.Equal(t, 1, len(logs.All())) + assert.Equal(t, "Could not launch gRPC service", logs.All()[0].Message) + wg.Done() + }, }) wg.Wait() } func TestSpanCollector(t *testing.T) { - l, _ := zap.NewDevelopment() - handler := app.NewGRPCHandler(l, &mockSpanProcessor{}) + logger, _ := zap.NewDevelopment() + params := &GRPCServerParams{ + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}), + SamplingStore: &mockSamplingStore{}, + Logger: logger, + } + server := grpc.NewServer() - addr, err := StartGRPCCollector(0, server, handler, &mockSamplingStore{}, l, func(e error) { - }) + defer server.Stop() + + listener, err := net.Listen("tcp", ":0") require.NoError(t, err) + defer listener.Close() - conn, err := grpc.Dial(addr.String(), grpc.WithInsecure()) - //lint:ignore SA5001 don't care about errors - defer conn.Close() - defer server.Stop() + serveGRPC(server, listener, params) + + conn, err := grpc.Dial(listener.Addr().String(), grpc.WithInsecure()) require.NoError(t, err) + defer conn.Close() + c := api_v2.NewCollectorServiceClient(conn) response, err := c.PostSpans(context.Background(), &api_v2.PostSpansRequest{}) require.NoError(t, err) require.NotNil(t, response) } - -type mockSamplingStore struct{} - -func (s mockSamplingStore) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) { - return nil, nil -} - -type mockSpanProcessor struct { -} - -func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, _ app.ProcessSpansOptions) ([]bool, error) { - return []bool{}, nil -} diff --git a/cmd/collector/app/server/http.go b/cmd/collector/app/server/http.go new file mode 100644 index 00000000000..4748f6c6b44 --- /dev/null +++ b/cmd/collector/app/server/http.go @@ -0,0 +1,84 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "net" + "net/http" + "strconv" + + "github.com/gorilla/mux" + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/collector/app/handler" + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" + clientcfgHandler "github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp" + "github.com/jaegertracing/jaeger/pkg/healthcheck" +) + +// HTTPServerParams to construct a new Jaeger Collector HTTP Server +type HTTPServerParams struct { + Port int + Handler handler.JaegerBatchesHandler + RecoveryHandler func(http.Handler) http.Handler + SamplingStore strategystore.StrategyStore + MetricsFactory metrics.Factory + HealthCheck *healthcheck.HealthCheck + Logger *zap.Logger +} + +// StartHTTPServer based on the given parameters +func StartHTTPServer(params *HTTPServerParams) (*http.Server, error) { + httpPortStr := ":" + strconv.Itoa(params.Port) + params.Logger.Info("Starting jaeger-collector HTTP server", zap.String("http-host-port", httpPortStr)) + + listener, err := net.Listen("tcp", httpPortStr) + if err != nil { + return nil, err + } + + server := &http.Server{Addr: httpPortStr} + serveHTTP(server, listener, params) + + return server, nil +} + +func serveHTTP(server *http.Server, listener net.Listener, params *HTTPServerParams) { + r := mux.NewRouter() + apiHandler := handler.NewAPIHandler(params.Handler) + apiHandler.RegisterRoutes(r) + + cfgHandler := clientcfgHandler.NewHTTPHandler(clientcfgHandler.HTTPHandlerParams{ + ConfigManager: &clientcfgHandler.ConfigManager{ + SamplingStrategyStore: params.SamplingStore, + // TODO provide baggage manager + }, + MetricsFactory: params.MetricsFactory, + BasePath: "/api", + LegacySamplingEndpoint: false, + }) + cfgHandler.RegisterRoutes(r) + + server.Handler = params.RecoveryHandler(r) + go func(listener net.Listener, hServer *http.Server) { + if err := hServer.Serve(listener); err != nil { + if err != http.ErrServerClosed { + params.Logger.Fatal("Could not start HTTP collector", zap.Error(err)) + } + } + params.HealthCheck.Set(healthcheck.Unavailable) + }(listener, server) +} diff --git a/cmd/collector/app/server/test.go b/cmd/collector/app/server/test.go new file mode 100644 index 00000000000..7a4bf7eaf47 --- /dev/null +++ b/cmd/collector/app/server/test.go @@ -0,0 +1,34 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/thrift-gen/sampling" +) + +type mockSamplingStore struct{} + +func (s mockSamplingStore) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) { + return nil, nil +} + +type mockSpanProcessor struct { +} + +func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, _ processor.SpansOptions) ([]bool, error) { + return []bool{}, nil +} diff --git a/cmd/collector/app/server/thrift.go b/cmd/collector/app/server/thrift.go new file mode 100644 index 00000000000..c0051c43ce6 --- /dev/null +++ b/cmd/collector/app/server/thrift.go @@ -0,0 +1,80 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "net" + "strconv" + + "github.com/uber/tchannel-go" + "github.com/uber/tchannel-go/thrift" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/collector/app/handler" + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling" + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" + jc "github.com/jaegertracing/jaeger/thrift-gen/jaeger" + sc "github.com/jaegertracing/jaeger/thrift-gen/sampling" + zc "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" +) + +// ThriftServerParams to construct a new Jaeger Collector Thrift Server +type ThriftServerParams struct { + JaegerBatchesHandler handler.JaegerBatchesHandler + ZipkinSpansHandler handler.ZipkinSpansHandler + StrategyStore strategystore.StrategyStore + ServiceName string + Port int + Logger *zap.Logger +} + +// StartThriftServer based on the given parameters +func StartThriftServer(params *ThriftServerParams) (*tchannel.Channel, error) { + portStr := ":" + strconv.Itoa(params.Port) + listener, err := net.Listen("tcp", portStr) + if err != nil { + params.Logger.Fatal("Unable to start listening on channel", zap.Error(err)) + return nil, err + } + + var tchServer *tchannel.Channel + if tchServer, err = tchannel.NewChannel(params.ServiceName, &tchannel.ChannelOptions{}); err != nil { + params.Logger.Fatal("Unable to create new TChannel", zap.Error(err)) + return nil, err + } + + if err := serveThrift(tchServer, listener, params); err != nil { + return nil, err + } + + return tchServer, nil +} + +func serveThrift(tchServer *tchannel.Channel, listener net.Listener, params *ThriftServerParams) error { + server := thrift.NewServer(tchServer) + batchHandler := handler.NewTChannelHandler(params.JaegerBatchesHandler, params.ZipkinSpansHandler) + server.Register(jc.NewTChanCollectorServer(batchHandler)) + server.Register(zc.NewTChanZipkinCollectorServer(batchHandler)) + server.Register(sc.NewTChanSamplingManagerServer(sampling.NewHandler(params.StrategyStore))) + + params.Logger.Info("Starting jaeger-collector TChannel server", zap.Int("port", params.Port)) + params.Logger.Warn("TChannel has been deprecated and will be removed in a future release") + + if err := tchServer.Serve(listener); err != nil { + return err + } + + return nil +} diff --git a/cmd/collector/app/server/zipkin.go b/cmd/collector/app/server/zipkin.go new file mode 100644 index 00000000000..0a8e116be95 --- /dev/null +++ b/cmd/collector/app/server/zipkin.go @@ -0,0 +1,84 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "net" + "net/http" + "strconv" + "strings" + + "github.com/gorilla/mux" + "github.com/rs/cors" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/collector/app/handler" + "github.com/jaegertracing/jaeger/cmd/collector/app/zipkin" + "github.com/jaegertracing/jaeger/pkg/healthcheck" +) + +// ZipkinServerParams to construct a new Jaeger Collector Zipkin Server +type ZipkinServerParams struct { + Port int + Handler handler.ZipkinSpansHandler + RecoveryHandler func(http.Handler) http.Handler + AllowedOrigins string + AllowedHeaders string + HealthCheck *healthcheck.HealthCheck + Logger *zap.Logger +} + +// StartZipkinServer based on the given parameters +func StartZipkinServer(params *ZipkinServerParams) (*http.Server, error) { + if params.Port == 0 { + return nil, nil + } + + httpPortStr := ":" + strconv.Itoa(params.Port) + params.Logger.Info("Listening for Zipkin HTTP traffic", zap.Int("zipkin.http-port", params.Port)) + + listener, err := net.Listen("tcp", httpPortStr) + if err != nil { + return nil, err + } + + server := &http.Server{Addr: httpPortStr} + serveZipkin(server, listener, params) + + return server, nil +} + +func serveZipkin(server *http.Server, listener net.Listener, params *ZipkinServerParams) { + r := mux.NewRouter() + zHandler := zipkin.NewAPIHandler(params.Handler) + zHandler.RegisterRoutes(r) + + origins := strings.Split(strings.ReplaceAll(params.AllowedOrigins, " ", ""), ",") + headers := strings.Split(strings.ReplaceAll(params.AllowedHeaders, " ", ""), ",") + + cors := cors.New(cors.Options{ + AllowedOrigins: origins, + AllowedMethods: []string{"POST"}, // Allowing only POST, because that's the only handled one + AllowedHeaders: headers, + }) + + server.Handler = cors.Handler(params.RecoveryHandler(r)) + go func(listener net.Listener, server *http.Server) { + if err := server.Serve(listener); err != nil { + params.Logger.Fatal("Could not launch Zipkin server", zap.Error(err)) + } + params.HealthCheck.Set(healthcheck.Unavailable) + }(listener, server) +} diff --git a/cmd/collector/app/builder/span_handler_builder.go b/cmd/collector/app/span_handler_builder.go similarity index 66% rename from cmd/collector/app/builder/span_handler_builder.go rename to cmd/collector/app/span_handler_builder.go index 9c512b7af63..8c49395ae94 100644 --- a/cmd/collector/app/builder/span_handler_builder.go +++ b/cmd/collector/app/span_handler_builder.go @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package builder +package app import ( "os" @@ -21,7 +21,7 @@ import ( "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" - "github.com/jaegertracing/jaeger/cmd/collector/app" + "github.com/jaegertracing/jaeger/cmd/collector/app/handler" zs "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/storage/spanstore" @@ -37,30 +37,30 @@ type SpanHandlerBuilder struct { // BuildHandlers builds span handlers (Zipkin, Jaeger) func (b *SpanHandlerBuilder) BuildHandlers() ( - app.ZipkinSpansHandler, - app.JaegerBatchesHandler, - *app.GRPCHandler, + handler.ZipkinSpansHandler, + handler.JaegerBatchesHandler, + *handler.GRPCHandler, ) { hostname, _ := os.Hostname() svcMetrics := b.metricsFactory() hostMetrics := svcMetrics.Namespace(metrics.NSOptions{Tags: map[string]string{"host": hostname}}) - spanProcessor := app.NewSpanProcessor( + spanProcessor := NewSpanProcessor( b.SpanWriter, - app.Options.ServiceMetrics(svcMetrics), - app.Options.HostMetrics(hostMetrics), - app.Options.Logger(b.logger()), - app.Options.SpanFilter(defaultSpanFilter), - app.Options.NumWorkers(b.CollectorOpts.NumWorkers), - app.Options.QueueSize(b.CollectorOpts.QueueSize), - app.Options.CollectorTags(b.CollectorOpts.CollectorTags), - app.Options.DynQueueSizeWarmup(uint(b.CollectorOpts.QueueSize)), // same as queue size for now - app.Options.DynQueueSizeMemory(b.CollectorOpts.DynQueueSizeMemory), + Options.ServiceMetrics(svcMetrics), + Options.HostMetrics(hostMetrics), + Options.Logger(b.logger()), + Options.SpanFilter(defaultSpanFilter), + Options.NumWorkers(b.CollectorOpts.NumWorkers), + Options.QueueSize(b.CollectorOpts.QueueSize), + Options.CollectorTags(b.CollectorOpts.CollectorTags), + Options.DynQueueSizeWarmup(uint(b.CollectorOpts.QueueSize)), // same as queue size for now + Options.DynQueueSizeMemory(b.CollectorOpts.DynQueueSizeMemory), ) - return app.NewZipkinSpanHandler(b.Logger, spanProcessor, zs.NewChainedSanitizer(zs.StandardSanitizers...)), - app.NewJaegerSpanHandler(b.Logger, spanProcessor), - app.NewGRPCHandler(b.Logger, spanProcessor) + return handler.NewZipkinSpanHandler(b.Logger, spanProcessor, zs.NewChainedSanitizer(zs.StandardSanitizers...)), + handler.NewJaegerSpanHandler(b.Logger, spanProcessor), + handler.NewGRPCHandler(b.Logger, spanProcessor) } func defaultSpanFilter(*model.Span) bool { diff --git a/cmd/collector/app/builder/span_handler_builder_test.go b/cmd/collector/app/span_handler_builder_test.go similarity index 99% rename from cmd/collector/app/builder/span_handler_builder_test.go rename to cmd/collector/app/span_handler_builder_test.go index 2c99ec496cc..e6313c405b7 100644 --- a/cmd/collector/app/builder/span_handler_builder_test.go +++ b/cmd/collector/app/span_handler_builder_test.go @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package builder +package app import ( "testing" diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index 3a77cd3b86a..5f2d77e49a6 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -23,6 +23,7 @@ import ( "go.uber.org/atomic" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/queue" @@ -37,18 +38,6 @@ const ( minRequiredChange = 1.2 ) -// ProcessSpansOptions additional options passed to processor along with the spans. -type ProcessSpansOptions struct { - SpanFormat SpanFormat - InboundTransport InboundTransport -} - -// SpanProcessor handles model spans -type SpanProcessor interface { - // ProcessSpans processes model spans and return with either a list of true/false success or an error - ProcessSpans(mSpans []*model.Span, options ProcessSpansOptions) ([]bool, error) -} - type spanProcessor struct { queue *queue.BoundedQueue queueResizeMu sync.Mutex @@ -78,7 +67,7 @@ type queueItem struct { func NewSpanProcessor( spanWriter spanstore.Writer, opts ...Option, -) SpanProcessor { +) processor.SpanProcessor { sp := newSpanProcessor(spanWriter, opts...) sp.queue.StartConsumers(sp.numWorkers, func(item interface{}) { @@ -167,7 +156,7 @@ func (sp *spanProcessor) countSpan(span *model.Span) { sp.spansProcessed.Inc() } -func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, options ProcessSpansOptions) ([]bool, error) { +func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, options processor.SpansOptions) ([]bool, error) { sp.preProcessSpans(mSpans) sp.metrics.BatchSize.Update(int64(len(mSpans))) retMe := make([]bool, len(mSpans)) @@ -193,7 +182,7 @@ func (sp *spanProcessor) addCollectorTags(span *model.Span) { } } -func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat SpanFormat, transport InboundTransport) bool { +func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat processor.SpanFormat, transport processor.InboundTransport) bool { spanCounts := sp.metrics.GetCountsForFormat(originalFormat, transport) spanCounts.ReceivedBySvc.ReportServiceNameForSpan(span) diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index fc0c9d01ea8..e9184007103 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -17,6 +17,7 @@ package app import ( "fmt" + "io" "sync" "testing" "time" @@ -27,6 +28,8 @@ import ( "go.uber.org/atomic" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/collector/app/handler" + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" zipkinSanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/testutils" @@ -34,19 +37,20 @@ import ( zc "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) +var _ (io.Closer) = (*fakeSpanWriter)(nil) var blackListedService = "zoidberg" func TestBySvcMetrics(t *testing.T) { allowedService := "bender" type TestCase struct { - format SpanFormat + format processor.SpanFormat serviceName string rootSpan bool debug bool } - spanFormat := [2]SpanFormat{ZipkinSpanFormat, JaegerSpanFormat} + spanFormat := [2]processor.SpanFormat{processor.ZipkinSpanFormat, processor.JaegerSpanFormat} serviceNames := [2]string{allowedService, blackListedService} rootSpanEnabled := [2]bool{true, false} debugEnabled := [2]bool{true, false} @@ -73,7 +77,7 @@ func TestBySvcMetrics(t *testing.T) { logger := zap.NewNop() serviceMetrics := mb.Namespace(metrics.NSOptions{Name: "service", Tags: nil}) hostMetrics := mb.Namespace(metrics.NSOptions{Name: "host", Tags: nil}) - processor := newSpanProcessor( + sp := newSpanProcessor( &fakeSpanWriter{}, Options.ServiceMetrics(serviceMetrics), Options.HostMetrics(hostMetrics), @@ -85,15 +89,15 @@ func TestBySvcMetrics(t *testing.T) { ) var metricPrefix, format string switch test.format { - case ZipkinSpanFormat: + case processor.ZipkinSpanFormat: span := makeZipkinSpan(test.serviceName, test.rootSpan, test.debug) - zHandler := NewZipkinSpanHandler(logger, processor, zipkinSanitizer.NewParentIDSanitizer()) - zHandler.SubmitZipkinBatch([]*zc.Span{span, span}, SubmitBatchOptions{}) + zHandler := handler.NewZipkinSpanHandler(logger, sp, zipkinSanitizer.NewParentIDSanitizer()) + zHandler.SubmitZipkinBatch([]*zc.Span{span, span}, handler.SubmitBatchOptions{}) metricPrefix = "service" format = "zipkin" - case JaegerSpanFormat: + case processor.JaegerSpanFormat: span, process := makeJaegerSpan(test.serviceName, test.rootSpan, test.debug) - jHandler := NewJaegerSpanHandler(logger, processor) + jHandler := handler.NewJaegerSpanHandler(logger, sp) jHandler.SubmitBatches([]*jaeger.Batch{ { Spans: []*jaeger.Span{ @@ -102,7 +106,7 @@ func TestBySvcMetrics(t *testing.T) { }, Process: process, }, - }, SubmitBatchOptions{}) + }, handler.SubmitBatchOptions{}) metricPrefix = "service" format = "jaeger" default: @@ -162,6 +166,10 @@ func (n *fakeSpanWriter) WriteSpan(span *model.Span) error { return n.err } +func (n *fakeSpanWriter) Close() error { + return nil +} + func makeZipkinSpan(service string, rootSpan bool, debugEnabled bool) *zc.Span { var parentID *int64 if !rootSpan { @@ -215,7 +223,7 @@ func TestSpanProcessor(t *testing.T) { ServiceName: "x", }, }, - }, ProcessSpansOptions{SpanFormat: JaegerSpanFormat}) + }, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) assert.NoError(t, err) assert.Equal(t, []bool{true}, res) } @@ -239,7 +247,7 @@ func TestSpanProcessorErrors(t *testing.T) { ServiceName: "x", }, }, - }, ProcessSpansOptions{SpanFormat: JaegerSpanFormat}) + }, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) assert.NoError(t, err) assert.Equal(t, []bool{true}, res) @@ -297,7 +305,7 @@ func TestSpanProcessorBusy(t *testing.T) { ServiceName: "x", }, }, - }, ProcessSpansOptions{SpanFormat: JaegerSpanFormat}) + }, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) assert.Error(t, err, "expcting busy error") assert.Nil(t, res) diff --git a/cmd/collector/app/zipkin/http_handler.go b/cmd/collector/app/zipkin/http_handler.go index 4883350cdc7..4b74a6b5ad4 100644 --- a/cmd/collector/app/zipkin/http_handler.go +++ b/cmd/collector/app/zipkin/http_handler.go @@ -30,7 +30,8 @@ import ( "github.com/golang/protobuf/proto" "github.com/gorilla/mux" - "github.com/jaegertracing/jaeger/cmd/collector/app" + "github.com/jaegertracing/jaeger/cmd/collector/app/handler" + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model/converter/thrift/zipkin" zipkinProto "github.com/jaegertracing/jaeger/proto-gen/zipkin" "github.com/jaegertracing/jaeger/swagger-gen/models" @@ -41,13 +42,13 @@ import ( // APIHandler handles all HTTP calls to the collector type APIHandler struct { - zipkinSpansHandler app.ZipkinSpansHandler + zipkinSpansHandler handler.ZipkinSpansHandler zipkinV2Formats strfmt.Registry } // NewAPIHandler returns a new APIHandler func NewAPIHandler( - zipkinSpansHandler app.ZipkinSpansHandler, + zipkinSpansHandler handler.ZipkinSpansHandler, ) *APIHandler { swaggerSpec, _ := loads.Analyzed(restapi.SwaggerJSON, "") return &APIHandler{ @@ -68,7 +69,7 @@ func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) { if strings.Contains(r.Header.Get("Content-Encoding"), "gzip") { gz, err := gunzip(bRead) if err != nil { - http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest) + http.Error(w, fmt.Sprintf(handler.UnableToReadBodyErrFormat, err), http.StatusBadRequest) return } defer gz.Close() @@ -77,7 +78,7 @@ func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) { bodyBytes, err := ioutil.ReadAll(bRead) if err != nil { - http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusInternalServerError) + http.Error(w, fmt.Sprintf(handler.UnableToReadBodyErrFormat, err), http.StatusInternalServerError) return } @@ -99,7 +100,7 @@ func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) { return } if err != nil { - http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest) + http.Error(w, fmt.Sprintf(handler.UnableToReadBodyErrFormat, err), http.StatusBadRequest) return } @@ -117,7 +118,7 @@ func (aH *APIHandler) saveSpansV2(w http.ResponseWriter, r *http.Request) { if strings.Contains(r.Header.Get("Content-Encoding"), "gzip") { gz, err := gunzip(bRead) if err != nil { - http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest) + http.Error(w, fmt.Sprintf(handler.UnableToReadBodyErrFormat, err), http.StatusBadRequest) return } defer gz.Close() @@ -126,7 +127,7 @@ func (aH *APIHandler) saveSpansV2(w http.ResponseWriter, r *http.Request) { bodyBytes, err := ioutil.ReadAll(bRead) if err != nil { - http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusInternalServerError) + http.Error(w, fmt.Sprintf(handler.UnableToReadBodyErrFormat, err), http.StatusInternalServerError) return } @@ -149,7 +150,7 @@ func (aH *APIHandler) saveSpansV2(w http.ResponseWriter, r *http.Request) { } if err != nil { - http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest) + http.Error(w, fmt.Sprintf(handler.UnableToReadBodyErrFormat, err), http.StatusBadRequest) return } @@ -200,7 +201,7 @@ func gunzip(r io.ReadCloser) (*gzip.Reader, error) { func (aH *APIHandler) saveThriftSpans(tSpans []*zipkincore.Span) error { if len(tSpans) > 0 { - opts := app.SubmitBatchOptions{InboundTransport: app.HTTPTransport} + opts := handler.SubmitBatchOptions{InboundTransport: processor.HTTPTransport} if _, err := aH.zipkinSpansHandler.SubmitZipkinBatch(tSpans, opts); err != nil { return err } diff --git a/cmd/collector/app/zipkin/http_handler_test.go b/cmd/collector/app/zipkin/http_handler_test.go index 383996cc0a1..01639b9109b 100644 --- a/cmd/collector/app/zipkin/http_handler_test.go +++ b/cmd/collector/app/zipkin/http_handler_test.go @@ -34,7 +34,7 @@ import ( jaegerClient "github.com/uber/jaeger-client-go" zipkinTransport "github.com/uber/jaeger-client-go/transport/zipkin" - "github.com/jaegertracing/jaeger/cmd/collector/app" + "github.com/jaegertracing/jaeger/cmd/collector/app/handler" zipkinTrift "github.com/jaegertracing/jaeger/model/converter/thrift/zipkin" zipkinProto "github.com/jaegertracing/jaeger/proto-gen/zipkin" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" @@ -48,7 +48,7 @@ type mockZipkinHandler struct { spans []*zipkincore.Span } -func (p *mockZipkinHandler) SubmitZipkinBatch(spans []*zipkincore.Span, opts app.SubmitBatchOptions) ([]*zipkincore.Response, error) { +func (p *mockZipkinHandler) SubmitZipkinBatch(spans []*zipkincore.Span, opts handler.SubmitBatchOptions) ([]*zipkincore.Response, error) { p.mux.Lock() defer p.mux.Unlock() p.spans = append(p.spans, spans...) diff --git a/cmd/collector/main.go b/cmd/collector/main.go index 79aba598232..60a6efce8f1 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -17,46 +17,24 @@ package main import ( "fmt" - "io" "log" - "net" - "net/http" "os" - "strconv" - "strings" - "github.com/gorilla/mux" - "github.com/rs/cors" "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/uber/jaeger-lib/metrics" - "github.com/uber/tchannel-go" - "github.com/uber/tchannel-go/thrift" _ "go.uber.org/automaxprocs" "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" "github.com/jaegertracing/jaeger/cmd/collector/app" - "github.com/jaegertracing/jaeger/cmd/collector/app/builder" - "github.com/jaegertracing/jaeger/cmd/collector/app/grpcserver" - "github.com/jaegertracing/jaeger/cmd/collector/app/sampling" - "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" - "github.com/jaegertracing/jaeger/cmd/collector/app/zipkin" "github.com/jaegertracing/jaeger/cmd/docs" "github.com/jaegertracing/jaeger/cmd/env" "github.com/jaegertracing/jaeger/cmd/flags" - clientcfgHandler "github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp" "github.com/jaegertracing/jaeger/pkg/config" - "github.com/jaegertracing/jaeger/pkg/healthcheck" - "github.com/jaegertracing/jaeger/pkg/recoveryhandler" "github.com/jaegertracing/jaeger/pkg/version" ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore" "github.com/jaegertracing/jaeger/plugin/storage" "github.com/jaegertracing/jaeger/ports" - jc "github.com/jaegertracing/jaeger/thrift-gen/jaeger" - sc "github.com/jaegertracing/jaeger/thrift-gen/sampling" - zc "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) const serviceName = "jaeger-collector" @@ -85,6 +63,7 @@ func main() { logger := svc.Logger // shortcut baseFactory := svc.MetricsFactory.Namespace(metrics.NSOptions{Name: "jaeger"}) metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "collector"}) + strategyStoreFactory.InitFromViper(v) storageFactory.InitFromViper(v) if err := storageFactory.Initialize(baseFactory, logger); err != nil { @@ -95,82 +74,28 @@ func main() { logger.Fatal("Failed to create span writer", zap.Error(err)) } - builderOpts := new(builder.CollectorOptions).InitFromViper(v) - handlerBuilder := &builder.SpanHandlerBuilder{ - SpanWriter: spanWriter, - CollectorOpts: *builderOpts, - Logger: logger, - MetricsFactory: metricsFactory, - } - - zipkinSpansHandler, jaegerBatchesHandler, grpcHandler := handlerBuilder.BuildHandlers() strategyStoreFactory.InitFromViper(v) - strategyStore := initSamplingStrategyStore(strategyStoreFactory, metricsFactory, logger) - - { - ch, err := tchannel.NewChannel(serviceName, &tchannel.ChannelOptions{}) - if err != nil { - logger.Fatal("Unable to create new TChannel", zap.Error(err)) - } - server := thrift.NewServer(ch) - batchHandler := app.NewTChannelHandler(jaegerBatchesHandler, zipkinSpansHandler) - server.Register(jc.NewTChanCollectorServer(batchHandler)) - server.Register(zc.NewTChanZipkinCollectorServer(batchHandler)) - server.Register(sc.NewTChanSamplingManagerServer(sampling.NewHandler(strategyStore))) - portStr := ":" + strconv.Itoa(builderOpts.CollectorPort) - listener, err := net.Listen("tcp", portStr) - if err != nil { - logger.Fatal("Unable to start listening on channel", zap.Error(err)) - } - logger.Info("Starting jaeger-collector TChannel server", zap.Int("port", builderOpts.CollectorPort)) - logger.Warn("TChannel has been deprecated and will be removed in a future release") - ch.Serve(listener) + if err := strategyStoreFactory.Initialize(metricsFactory, logger); err != nil { + logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err)) } - - server, err := startGRPCServer(builderOpts, grpcHandler, strategyStore, logger) + strategyStore, err := strategyStoreFactory.CreateStrategyStore() if err != nil { - logger.Fatal("Could not start gRPC collector", zap.Error(err)) + logger.Fatal("Failed to create sampling strategy store", zap.Error(err)) } - { - r := mux.NewRouter() - apiHandler := app.NewAPIHandler(jaegerBatchesHandler) - apiHandler.RegisterRoutes(r) - - cfgHandler := clientcfgHandler.NewHTTPHandler(clientcfgHandler.HTTPHandlerParams{ - ConfigManager: &clientcfgHandler.ConfigManager{ - SamplingStrategyStore: strategyStore, - // TODO provide baggage manager - }, - MetricsFactory: metricsFactory, - BasePath: "/api", - LegacySamplingEndpoint: false, - }) - cfgHandler.RegisterRoutes(r) - - recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true) - httpHandler := recoveryHandler(r) - - go startZipkinHTTPAPI(logger, builderOpts.CollectorZipkinHTTPPort, builderOpts.CollectorZipkinAllowedOrigins, builderOpts.CollectorZipkinAllowedHeaders, zipkinSpansHandler, recoveryHandler) - - httpPortStr := ":" + strconv.Itoa(builderOpts.CollectorHTTPPort) - logger.Info("Starting jaeger-collector HTTP server", zap.String("http-host-port", httpPortStr)) - go func() { - if err := http.ListenAndServe(httpPortStr, httpHandler); err != nil { - logger.Fatal("Could not launch service", zap.Error(err)) - } - svc.HC().Set(healthcheck.Unavailable) - }() - } + c := app.New(&app.CollectorParams{ + ServiceName: serviceName, + Logger: logger, + MetricsFactory: metricsFactory, + SpanWriter: spanWriter, + StrategyStore: strategyStore, + HealthCheck: svc.HC(), + }) + collectorOpts := new(app.CollectorOptions).InitFromViper(v) + c.Start(collectorOpts) svc.RunAndThen(func() { - if closer, ok := spanWriter.(io.Closer); ok { - server.GracefulStop() - err := closer.Close() - if err != nil { - logger.Error("Failed to close span writer", zap.Error(err)) - } - } + c.Close() }) return nil }, @@ -184,7 +109,7 @@ func main() { v, command, svc.AddFlags, - builder.AddFlags, + app.AddFlags, storageFactory.AddFlags, strategyStoreFactory.AddFlags, ) @@ -194,77 +119,3 @@ func main() { os.Exit(1) } } - -func startGRPCServer( - opts *builder.CollectorOptions, - handler *app.GRPCHandler, - samplingStore strategystore.StrategyStore, - logger *zap.Logger, -) (*grpc.Server, error) { - var server *grpc.Server - - if opts.TLS.Enabled { // user requested a server with TLS, setup creds - tlsCfg, err := opts.TLS.Config() - if err != nil { - return nil, err - } - - creds := credentials.NewTLS(tlsCfg) - server = grpc.NewServer(grpc.Creds(creds)) - } else { // server without TLS - server = grpc.NewServer() - } - _, err := grpcserver.StartGRPCCollector(opts.CollectorGRPCPort, server, handler, samplingStore, logger, func(err error) { - logger.Fatal("gRPC collector failed", zap.Error(err)) - }) - if err != nil { - return nil, err - } - return server, err -} - -func startZipkinHTTPAPI( - logger *zap.Logger, - zipkinPort int, - allowedOrigins string, - allowedHeaders string, - zipkinSpansHandler app.ZipkinSpansHandler, - recoveryHandler func(http.Handler) http.Handler, -) { - if zipkinPort != 0 { - zHandler := zipkin.NewAPIHandler(zipkinSpansHandler) - r := mux.NewRouter() - zHandler.RegisterRoutes(r) - - origins := strings.Split(strings.Replace(allowedOrigins, " ", "", -1), ",") - headers := strings.Split(strings.Replace(allowedHeaders, " ", "", -1), ",") - - c := cors.New(cors.Options{ - AllowedOrigins: origins, - AllowedMethods: []string{"POST"}, // Allowing only POST, because that's the only handled one - AllowedHeaders: headers, - }) - - httpPortStr := ":" + strconv.Itoa(zipkinPort) - logger.Info("Listening for Zipkin HTTP traffic", zap.Int("zipkin.http-port", zipkinPort)) - - if err := http.ListenAndServe(httpPortStr, c.Handler(recoveryHandler(r))); err != nil { - logger.Fatal("Could not launch service", zap.Error(err)) - } - } -} - -func initSamplingStrategyStore( - samplingStrategyStoreFactory *ss.Factory, - metricsFactory metrics.Factory, - logger *zap.Logger, -) strategystore.StrategyStore { - if err := samplingStrategyStoreFactory.Initialize(metricsFactory, logger); err != nil { - logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err)) - } - strategyStore, err := samplingStrategyStoreFactory.CreateStrategyStore() - if err != nil { - logger.Fatal("Failed to create sampling strategy store", zap.Error(err)) - } - return strategyStore -} diff --git a/cmd/collector/app/service_name_normalizer.go b/pkg/normalizer/service_name.go similarity index 90% rename from cmd/collector/app/service_name_normalizer.go rename to pkg/normalizer/service_name.go index e243d225238..8070f8048d3 100644 --- a/cmd/collector/app/service_name_normalizer.go +++ b/pkg/normalizer/service_name.go @@ -13,14 +13,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -package app +package normalizer import ( "strings" ) -// NormalizeServiceName converts service name to a lowercase string that is safe to use in metrics -func NormalizeServiceName(serviceName string) string { +// ServiceName converts service name to a lowercase string that is safe to use in metrics +func ServiceName(serviceName string) string { return serviceNameReplacer.Replace(serviceName) } diff --git a/cmd/collector/app/service_name_normalizer_test.go b/pkg/normalizer/service_name_test.go similarity index 72% rename from cmd/collector/app/service_name_normalizer_test.go rename to pkg/normalizer/service_name_test.go index f369b49074f..be9155348e8 100644 --- a/cmd/collector/app/service_name_normalizer_test.go +++ b/pkg/normalizer/service_name_test.go @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package app +package normalizer import ( "testing" @@ -22,7 +22,7 @@ import ( ) func TestServiceNameReplacer(t *testing.T) { - assert.Equal(t, "abc", NormalizeServiceName("ABC"), "lower case conversion") - assert.Equal(t, "a_b_c__", NormalizeServiceName("a&b%c/:"), "disallowed runes to underscore") - assert.Equal(t, "a_z_0123456789.", NormalizeServiceName("A_Z_0123456789."), "allowed runes") + assert.Equal(t, "abc", ServiceName("ABC"), "lower case conversion") + assert.Equal(t, "a_b_c__", ServiceName("a&b%c/:"), "disallowed runes to underscore") + assert.Equal(t, "a_z_0123456789.", ServiceName("A_Z_0123456789."), "allowed runes") }