diff --git a/cmd/agent/app/reporter/grpc/sampling_manager.go b/cmd/agent/app/reporter/grpc/sampling_manager.go index 1eb67bd4bdd..2bf580f2a34 100644 --- a/cmd/agent/app/reporter/grpc/sampling_manager.go +++ b/cmd/agent/app/reporter/grpc/sampling_manager.go @@ -28,19 +28,19 @@ import ( // SamplingManager returns sampling decisions from collector over gRPC. type SamplingManager struct { - sampling api_v2.SamplingManagerClient + client api_v2.SamplingManagerClient } // NewSamplingManager creates gRPC sampling manager. func NewSamplingManager(conn *grpc.ClientConn) *SamplingManager { return &SamplingManager{ - sampling: api_v2.NewSamplingManagerClient(conn), + client: api_v2.NewSamplingManagerClient(conn), } } // GetSamplingStrategy returns sampling strategies from collector. func (s *SamplingManager) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) { - r, err := s.sampling.GetSamplingStrategy(context.Background(), &api_v2.SamplingStrategyParameters{ServiceName: serviceName}) + r, err := s.client.GetSamplingStrategy(context.Background(), &api_v2.SamplingStrategyParameters{ServiceName: serviceName}) if err != nil { return nil, err } diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 5fc2c23c796..debc437eb21 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -118,10 +118,12 @@ func createCollectorProxy( case reporter.GRPC: grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr)) return grpc.NewCollectorProxy(grpcRepOpts, logger), nil - default: - logger.Warn("Specified unknown reporter type, falling back to tchannel") - fallthrough case reporter.TCHANNEL: + fallthrough + default: + if opts.ReporterType != reporter.TCHANNEL { + logger.Warn("Specified unknown reporter type, falling back to tchannel") + } return tchannel.NewCollectorProxy(tchanRep, mFactory, logger) } } diff --git a/cmd/all-in-one/Dockerfile b/cmd/all-in-one/Dockerfile index e654f8b9a9f..1798cfb342d 100644 --- a/cmd/all-in-one/Dockerfile +++ b/cmd/all-in-one/Dockerfile @@ -16,7 +16,7 @@ EXPOSE 5778 EXPOSE 14268 # Collector gRPC -EXPOSE 14270 +EXPOSE 14250 # Web HTTP EXPOSE 16686 diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 924917ba27b..7bc9b5a6ac9 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -17,7 +17,6 @@ package main import ( "fmt" "io" - "io/ioutil" "log" "net" "net/http" @@ -38,7 +37,6 @@ import ( "github.com/uber/tchannel-go/thrift" "go.uber.org/zap" "google.golang.org/grpc" - "google.golang.org/grpc/grpclog" agentApp "github.com/jaegertracing/jaeger/cmd/agent/app" agentRep "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" @@ -47,6 +45,7 @@ import ( basic "github.com/jaegertracing/jaeger/cmd/builder" collectorApp "github.com/jaegertracing/jaeger/cmd/collector/app" collector "github.com/jaegertracing/jaeger/cmd/collector/app/builder" + "github.com/jaegertracing/jaeger/cmd/collector/app/grpcserver" "github.com/jaegertracing/jaeger/cmd/collector/app/sampling" "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" "github.com/jaegertracing/jaeger/cmd/collector/app/zipkin" @@ -60,7 +59,6 @@ import ( "github.com/jaegertracing/jaeger/pkg/version" ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore" "github.com/jaegertracing/jaeger/plugin/storage" - "github.com/jaegertracing/jaeger/proto-gen/api_v2" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/spanstore" storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics" @@ -130,7 +128,8 @@ func main() { logger.Fatal("Failed to create dependency reader", zap.Error(err)) } - strategyStore := initializeStrategyStore(strategyStoreFactory, v, metricsFactory, logger) + strategyStoreFactory.InitFromViper(v) + strategyStore := initSamplingStrategyStore(strategyStoreFactory, metricsFactory, logger) aOpts := new(agentApp.Builder).InitFromViper(v) repOpts := new(agentRep.Options).InitFromViper(v) @@ -212,22 +211,24 @@ func startAgent( func createCollectorProxy( cOpts *collector.CollectorOptions, - opts *agentRep.Options, - tchanRep *agentTchanRep.Builder, + repOpts *agentRep.Options, + tchanRepOpts *agentTchanRep.Builder, grpcRepOpts *agentGrpcRep.Options, logger *zap.Logger, mFactory metrics.Factory, ) (agentApp.CollectorProxy, error) { - switch opts.ReporterType { + switch repOpts.ReporterType { case agentRep.GRPC: grpcRepOpts.CollectorHostPort = fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorGRPCPort) return agentGrpcRep.NewCollectorProxy(grpcRepOpts, logger), nil - default: - logger.Warn("Specified unknown reporter type, falling back to tchannel") - fallthrough case agentRep.TCHANNEL: - tchanRep.CollectorHostPorts = append(tchanRep.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorPort)) - return agentTchanRep.NewCollectorProxy(tchanRep, mFactory, logger) + fallthrough + default: + if repOpts.ReporterType != agentRep.TCHANNEL { + logger.Warn("Specified unknown reporter type, falling back to tchannel") + } + tchanRepOpts.CollectorHostPorts = append(tchanRepOpts.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorPort)) + return agentTchanRep.NewCollectorProxy(tchanRepOpts, mFactory, logger) } } @@ -272,24 +273,10 @@ func startCollector( } { - grpcPortStr := ":" + strconv.Itoa(cOpts.CollectorGRPCPort) - lis, err := net.Listen("tcp", grpcPortStr) - if err != nil { - logger.Fatal("Failed to listen on gRPC port", zap.Error(err)) - } - - grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr)) - - grpcSrv := grpc.NewServer() - api_v2.RegisterCollectorServiceServer(grpcSrv, grpcHandler) - api_v2.RegisterSamplingManagerServer(grpcSrv, sampling.NewGRPCHandler(strategyStore)) - logger.Info("Starting Jaeger Collector gRPC server", zap.Int("grpc-port", cOpts.CollectorGRPCPort)) - go func() { - if err := grpcSrv.Serve(lis); err != nil { - logger.Fatal("Could not launch gRPC service", zap.Error(err)) - } - hc.Set(healthcheck.Unavailable) - }() + grpcserver.StartGRPCCollector(cOpts.CollectorGRPCPort, grpc.NewServer(), grpcHandler, strategyStore, logger, + func(err error) { + logger.Fatal("gRPC collector failed", zap.Error(err)) + }) } { @@ -387,13 +374,11 @@ func startQuery( }() } -func initializeStrategyStore( +func initSamplingStrategyStore( samplingStrategyStoreFactory *ss.Factory, - v *viper.Viper, metricsFactory metrics.Factory, logger *zap.Logger, ) strategystore.StrategyStore { - samplingStrategyStoreFactory.InitFromViper(v) if err := samplingStrategyStoreFactory.Initialize(metricsFactory, logger); err != nil { logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err)) } diff --git a/cmd/collector/Dockerfile b/cmd/collector/Dockerfile index 4a4ee57eecf..0c9f80d67e7 100644 --- a/cmd/collector/Dockerfile +++ b/cmd/collector/Dockerfile @@ -6,6 +6,6 @@ FROM scratch COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt EXPOSE 14267 -EXPOSE 14270 +EXPOSE 14250 COPY collector-linux /go/bin/ ENTRYPOINT ["/go/bin/collector-linux"] diff --git a/cmd/collector/app/builder/builder_flags.go b/cmd/collector/app/builder/builder_flags.go index 04489e6106e..b33337c155d 100644 --- a/cmd/collector/app/builder/builder_flags.go +++ b/cmd/collector/app/builder/builder_flags.go @@ -32,7 +32,7 @@ const ( defaultTChannelPort = 14267 defaultHTTPPort = 14268 - defaultGRPCPort = 14270 + defaultGRPCPort = 14250 // CollectorDefaultHealthCheckHTTPPort is the default HTTP Port for health check CollectorDefaultHealthCheckHTTPPort = 14269 ) diff --git a/cmd/collector/app/grpc_handler_test.go b/cmd/collector/app/grpc_handler_test.go index 6609eb8310c..72d607262ce 100644 --- a/cmd/collector/app/grpc_handler_test.go +++ b/cmd/collector/app/grpc_handler_test.go @@ -21,7 +21,7 @@ import ( "sync" "testing" - "github.com/crossdock/crossdock-go/assert" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" "google.golang.org/grpc" diff --git a/cmd/collector/app/grpcserver/grpc_server.go b/cmd/collector/app/grpcserver/grpc_server.go new file mode 100644 index 00000000000..e1c99d80fa7 --- /dev/null +++ b/cmd/collector/app/grpcserver/grpc_server.go @@ -0,0 +1,71 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpcserver + +import ( + "io/ioutil" + "net" + "os" + "strconv" + + "github.com/pkg/errors" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/grpclog" + + "github.com/jaegertracing/jaeger/cmd/collector/app" + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling" + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" + "github.com/jaegertracing/jaeger/proto-gen/api_v2" +) + +// StartGRPCCollector configures and starts gRPC endpoints exposed by collector. +func StartGRPCCollector( + port int, + server *grpc.Server, + handler *app.GRPCHandler, + samplingStrategy strategystore.StrategyStore, + logger *zap.Logger, + serveErr func(error), +) (net.Addr, error) { + grpcPortStr := ":" + strconv.Itoa(port) + lis, err := net.Listen("tcp", grpcPortStr) + if err != nil { + return nil, errors.Wrap(err, "Failed to listen on gRPC port") + } + + grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr)) + + api_v2.RegisterCollectorServiceServer(server, handler) + api_v2.RegisterSamplingManagerServer(server, sampling.NewGRPCHandler(samplingStrategy)) + starServer(server, lis, logger, serveErr) + return lis.Addr(), nil +} + +func starServer(server *grpc.Server, lis net.Listener, logger *zap.Logger, serveErr func(error)) { + var port string + if tcpAddr, ok := lis.Addr().(*net.TCPAddr); ok { + port = strconv.Itoa(tcpAddr.Port) + } else { + port = lis.Addr().Network() + } + logger.Info("Starting jaeger-collector gRPC server", zap.String("grpc-port", port)) + go func() { + if err := server.Serve(lis); err != nil { + logger.Error("Could not launch gRPC service", zap.Error(err)) + serveErr(err) + } + }() +} diff --git a/cmd/collector/app/grpcserver/grpc_server_test.go b/cmd/collector/app/grpcserver/grpc_server_test.go new file mode 100644 index 00000000000..7feaa4e57f1 --- /dev/null +++ b/cmd/collector/app/grpcserver/grpc_server_test.go @@ -0,0 +1,90 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpcserver + +import ( + "context" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" + "google.golang.org/grpc" + "google.golang.org/grpc/test/bufconn" + + "github.com/jaegertracing/jaeger/cmd/collector/app" + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/proto-gen/api_v2" + "github.com/jaegertracing/jaeger/thrift-gen/sampling" +) + +// test wrong port number +func TestFailToListen(t *testing.T) { + l, _ := zap.NewDevelopment() + handler := app.NewGRPCHandler(l, &mockSpanProcessor{}) + server := grpc.NewServer() + addr, err := StartGRPCCollector(1, server, handler, &mockSamplingStore{}, l, func(e error) { + }) + require.Nil(t, addr) + assert.EqualError(t, err, "Failed to listen on gRPC port: listen tcp :1: bind: permission denied") +} + +func TestFailServe(t *testing.T) { + lis := bufconn.Listen(0) + lis.Close() + core, logs := observer.New(zap.NewAtomicLevelAt(zapcore.ErrorLevel)) + wg := &sync.WaitGroup{} + wg.Add(1) + starServer(grpc.NewServer(), lis, zap.New(core), func(e error) { + assert.Equal(t, 1, len(logs.All())) + assert.Equal(t, "Could not launch gRPC service", logs.All()[0].Message) + wg.Done() + }) + wg.Wait() +} + +func TestSpanCollector(t *testing.T) { + l, _ := zap.NewDevelopment() + handler := app.NewGRPCHandler(l, &mockSpanProcessor{}) + server := grpc.NewServer() + addr, err := StartGRPCCollector(0, server, handler, &mockSamplingStore{}, l, func(e error) { + }) + require.NoError(t, err) + + conn, err := grpc.Dial(addr.String(), grpc.WithInsecure()) + defer conn.Close() + defer server.Stop() + require.NoError(t, err) + c := api_v2.NewCollectorServiceClient(conn) + response, err := c.PostSpans(context.Background(), &api_v2.PostSpansRequest{}) + require.NoError(t, err) + assert.Equal(t, true, response.Ok) +} + +type mockSamplingStore struct{} + +func (s mockSamplingStore) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) { + return nil, nil +} + +type mockSpanProcessor struct { +} + +func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, spanFormat string) ([]bool, error) { + return []bool{}, nil +} diff --git a/cmd/collector/main.go b/cmd/collector/main.go index 7ae01762116..d7d0b6a63b7 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -17,7 +17,6 @@ package main import ( "fmt" "io" - "io/ioutil" "log" "net" "net/http" @@ -34,11 +33,11 @@ import ( "github.com/uber/tchannel-go/thrift" "go.uber.org/zap" "google.golang.org/grpc" - "google.golang.org/grpc/grpclog" basicB "github.com/jaegertracing/jaeger/cmd/builder" "github.com/jaegertracing/jaeger/cmd/collector/app" "github.com/jaegertracing/jaeger/cmd/collector/app/builder" + "github.com/jaegertracing/jaeger/cmd/collector/app/grpcserver" "github.com/jaegertracing/jaeger/cmd/collector/app/sampling" "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" "github.com/jaegertracing/jaeger/cmd/collector/app/zipkin" @@ -51,7 +50,6 @@ import ( "github.com/jaegertracing/jaeger/pkg/version" ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore" "github.com/jaegertracing/jaeger/plugin/storage" - "github.com/jaegertracing/jaeger/proto-gen/api_v2" jc "github.com/jaegertracing/jaeger/thrift-gen/jaeger" sc "github.com/jaegertracing/jaeger/thrift-gen/sampling" zc "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" @@ -121,7 +119,8 @@ func main() { } zipkinSpansHandler, jaegerBatchesHandler, grpcHandler := handlerBuilder.BuildHandlers() - strategyStore := initializeStrategyStore(strategyStoreFactory, v, metricsFactory, logger) + strategyStoreFactory.InitFromViper(v) + strategyStore := initSamplingStrategyStore(strategyStoreFactory, metricsFactory, logger) { ch, err := tchannel.NewChannel(serviceName, &tchannel.ChannelOptions{}) @@ -131,37 +130,21 @@ func main() { server := thrift.NewServer(ch) server.Register(jc.NewTChanCollectorServer(jaegerBatchesHandler)) server.Register(zc.NewTChanZipkinCollectorServer(zipkinSpansHandler)) - server.Register(sc.NewTChanSamplingManagerServer(sampling.NewHandler(strategyStore))) - portStr := ":" + strconv.Itoa(builderOpts.CollectorPort) listener, err := net.Listen("tcp", portStr) if err != nil { logger.Fatal("Unable to start listening on channel", zap.Error(err)) } - logger.Info("Starting Jaeger Collector TChannel server", zap.Int("port", builderOpts.CollectorPort)) + logger.Info("Starting jaeger-collector TChannel server", zap.Int("port", builderOpts.CollectorPort)) ch.Serve(listener) } { - grpcPortStr := ":" + strconv.Itoa(builderOpts.CollectorGRPCPort) - lis, err := net.Listen("tcp", grpcPortStr) - if err != nil { - logger.Fatal("Failed to listen on gRPC port", zap.Error(err)) - } - - grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr)) - - grpcSrv := grpc.NewServer() - api_v2.RegisterCollectorServiceServer(grpcSrv, grpcHandler) - api_v2.RegisterSamplingManagerServer(grpcSrv, sampling.NewGRPCHandler(strategyStore)) - logger.Info("Starting Jaeger Collector gRPC server", zap.Int("grpc-port", builderOpts.CollectorGRPCPort)) - go func() { - if err := grpcSrv.Serve(lis); err != nil { - logger.Fatal("Could not launch gRPC service", zap.Error(err)) - } - hc.Set(healthcheck.Unavailable) - }() + grpcserver.StartGRPCCollector(builderOpts.CollectorGRPCPort, grpc.NewServer(), grpcHandler, strategyStore, logger, + func(err error) { + logger.Fatal("gRPC collector failed", zap.Error(err)) + }) } { @@ -178,7 +161,7 @@ func main() { go startZipkinHTTPAPI(logger, builderOpts.CollectorZipkinHTTPPort, zipkinSpansHandler, recoveryHandler) - logger.Info("Starting Jaeger Collector HTTP server", zap.Int("http-port", builderOpts.CollectorHTTPPort)) + logger.Info("Starting jaeger-collector HTTP server", zap.Int("http-port", builderOpts.CollectorHTTPPort)) go func() { if err := http.ListenAndServe(httpPortStr, httpHandler); err != nil { logger.Fatal("Could not launch service", zap.Error(err)) @@ -244,13 +227,11 @@ func startZipkinHTTPAPI( } } -func initializeStrategyStore( +func initSamplingStrategyStore( samplingStrategyStoreFactory *ss.Factory, - v *viper.Viper, metricsFactory metrics.Factory, logger *zap.Logger, ) strategystore.StrategyStore { - samplingStrategyStoreFactory.InitFromViper(v) if err := samplingStrategyStoreFactory.Initialize(metricsFactory, logger); err != nil { logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err)) } diff --git a/docker-compose/jaeger-docker-compose.yml b/docker-compose/jaeger-docker-compose.yml index d936aed8c6f..7509ac97d3b 100644 --- a/docker-compose/jaeger-docker-compose.yml +++ b/docker-compose/jaeger-docker-compose.yml @@ -8,7 +8,7 @@ services: - "14269" - "14268:14268" - "14267" - - "14270" + - "14250" - "9411:9411" restart: on-failure depends_on: @@ -26,7 +26,7 @@ services: jaeger-agent: image: jaegertracing/jaeger-agent - command: ["--reporter.type=grpc", "--reporter.grpc.collector.host-port=jaeger-collector:14270"] + command: ["--reporter.type=grpc", "--reporter.grpc.collector.host-port=jaeger-collector:14250"] ports: - "5775:5775/udp" - "6831:6831/udp"