Skip to content

Commit

Permalink
Fix review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay committed Nov 12, 2018
1 parent 95af75f commit 45ddf2f
Show file tree
Hide file tree
Showing 11 changed files with 203 additions and 74 deletions.
6 changes: 3 additions & 3 deletions cmd/agent/app/reporter/grpc/sampling_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@ import (

// SamplingManager returns sampling decisions from collector over gRPC.
type SamplingManager struct {
sampling api_v2.SamplingManagerClient
client api_v2.SamplingManagerClient
}

// NewSamplingManager creates gRPC sampling manager.
func NewSamplingManager(conn *grpc.ClientConn) *SamplingManager {
return &SamplingManager{
sampling: api_v2.NewSamplingManagerClient(conn),
client: api_v2.NewSamplingManagerClient(conn),
}
}

// GetSamplingStrategy returns sampling strategies from collector.
func (s *SamplingManager) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
r, err := s.sampling.GetSamplingStrategy(context.Background(), &api_v2.SamplingStrategyParameters{ServiceName: serviceName})
r, err := s.client.GetSamplingStrategy(context.Background(), &api_v2.SamplingStrategyParameters{ServiceName: serviceName})
if err != nil {
return nil, err
}
Expand Down
8 changes: 5 additions & 3 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,12 @@ func createCollectorProxy(
case reporter.GRPC:
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
return grpc.NewCollectorProxy(grpcRepOpts, logger), nil
default:
logger.Warn("Specified unknown reporter type, falling back to tchannel")
fallthrough
case reporter.TCHANNEL:
fallthrough
default:
if opts.ReporterType != reporter.TCHANNEL {
logger.Warn("Specified unknown reporter type, falling back to tchannel")
}
return tchannel.NewCollectorProxy(tchanRep, mFactory, logger)
}
}
2 changes: 1 addition & 1 deletion cmd/all-in-one/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ EXPOSE 5778
EXPOSE 14268

# Collector gRPC
EXPOSE 14270
EXPOSE 14250

# Web HTTP
EXPOSE 16686
Expand Down
51 changes: 18 additions & 33 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package main
import (
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
Expand All @@ -38,7 +37,6 @@ import (
"github.com/uber/tchannel-go/thrift"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"

agentApp "github.com/jaegertracing/jaeger/cmd/agent/app"
agentRep "github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
Expand All @@ -47,6 +45,7 @@ import (
basic "github.com/jaegertracing/jaeger/cmd/builder"
collectorApp "github.com/jaegertracing/jaeger/cmd/collector/app"
collector "github.com/jaegertracing/jaeger/cmd/collector/app/builder"
"github.com/jaegertracing/jaeger/cmd/collector/app/grpcserver"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/cmd/collector/app/zipkin"
Expand All @@ -60,7 +59,6 @@ import (
"github.com/jaegertracing/jaeger/pkg/version"
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
Expand Down Expand Up @@ -130,7 +128,8 @@ func main() {
logger.Fatal("Failed to create dependency reader", zap.Error(err))
}

strategyStore := initializeStrategyStore(strategyStoreFactory, v, metricsFactory, logger)
strategyStoreFactory.InitFromViper(v)
strategyStore := initSamplingStrategyStore(strategyStoreFactory, metricsFactory, logger)

aOpts := new(agentApp.Builder).InitFromViper(v)
repOpts := new(agentRep.Options).InitFromViper(v)
Expand Down Expand Up @@ -212,22 +211,24 @@ func startAgent(

func createCollectorProxy(
cOpts *collector.CollectorOptions,
opts *agentRep.Options,
tchanRep *agentTchanRep.Builder,
repOpts *agentRep.Options,
tchanRepOpts *agentTchanRep.Builder,
grpcRepOpts *agentGrpcRep.Options,
logger *zap.Logger,
mFactory metrics.Factory,
) (agentApp.CollectorProxy, error) {
switch opts.ReporterType {
switch repOpts.ReporterType {
case agentRep.GRPC:
grpcRepOpts.CollectorHostPort = fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorGRPCPort)
return agentGrpcRep.NewCollectorProxy(grpcRepOpts, logger), nil
default:
logger.Warn("Specified unknown reporter type, falling back to tchannel")
fallthrough
case agentRep.TCHANNEL:
tchanRep.CollectorHostPorts = append(tchanRep.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorPort))
return agentTchanRep.NewCollectorProxy(tchanRep, mFactory, logger)
fallthrough
default:
if repOpts.ReporterType != agentRep.TCHANNEL {
logger.Warn("Specified unknown reporter type, falling back to tchannel")
}
tchanRepOpts.CollectorHostPorts = append(tchanRepOpts.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorPort))
return agentTchanRep.NewCollectorProxy(tchanRepOpts, mFactory, logger)
}
}

Expand Down Expand Up @@ -272,24 +273,10 @@ func startCollector(
}

{
grpcPortStr := ":" + strconv.Itoa(cOpts.CollectorGRPCPort)
lis, err := net.Listen("tcp", grpcPortStr)
if err != nil {
logger.Fatal("Failed to listen on gRPC port", zap.Error(err))
}

grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))

grpcSrv := grpc.NewServer()
api_v2.RegisterCollectorServiceServer(grpcSrv, grpcHandler)
api_v2.RegisterSamplingManagerServer(grpcSrv, sampling.NewGRPCHandler(strategyStore))
logger.Info("Starting Jaeger Collector gRPC server", zap.Int("grpc-port", cOpts.CollectorGRPCPort))
go func() {
if err := grpcSrv.Serve(lis); err != nil {
logger.Fatal("Could not launch gRPC service", zap.Error(err))
}
hc.Set(healthcheck.Unavailable)
}()
grpcserver.StartGRPCCollector(cOpts.CollectorGRPCPort, grpc.NewServer(), grpcHandler, strategyStore, logger,
func(err error) {
logger.Fatal("gRPC collector failed", zap.Error(err))
})
}

{
Expand Down Expand Up @@ -387,13 +374,11 @@ func startQuery(
}()
}

func initializeStrategyStore(
func initSamplingStrategyStore(
samplingStrategyStoreFactory *ss.Factory,
v *viper.Viper,
metricsFactory metrics.Factory,
logger *zap.Logger,
) strategystore.StrategyStore {
samplingStrategyStoreFactory.InitFromViper(v)
if err := samplingStrategyStoreFactory.Initialize(metricsFactory, logger); err != nil {
logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err))
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ FROM scratch
COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt

EXPOSE 14267
EXPOSE 14270
EXPOSE 14250
COPY collector-linux /go/bin/
ENTRYPOINT ["/go/bin/collector-linux"]
2 changes: 1 addition & 1 deletion cmd/collector/app/builder/builder_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (

defaultTChannelPort = 14267
defaultHTTPPort = 14268
defaultGRPCPort = 14270
defaultGRPCPort = 14250
// CollectorDefaultHealthCheckHTTPPort is the default HTTP Port for health check
CollectorDefaultHealthCheckHTTPPort = 14269
)
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"sync"
"testing"

"github.com/crossdock/crossdock-go/assert"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down
71 changes: 71 additions & 0 deletions cmd/collector/app/grpcserver/grpc_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpcserver

import (
"io/ioutil"
"net"
"os"
"strconv"

"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"

"github.com/jaegertracing/jaeger/cmd/collector/app"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

// StartGRPCCollector configures and starts gRPC endpoints exposed by collector.
func StartGRPCCollector(
port int,
server *grpc.Server,
handler *app.GRPCHandler,
samplingStrategy strategystore.StrategyStore,
logger *zap.Logger,
serveErr func(error),
) (net.Addr, error) {
grpcPortStr := ":" + strconv.Itoa(port)
lis, err := net.Listen("tcp", grpcPortStr)
if err != nil {
return nil, errors.Wrap(err, "Failed to listen on gRPC port")
}

grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))

api_v2.RegisterCollectorServiceServer(server, handler)
api_v2.RegisterSamplingManagerServer(server, sampling.NewGRPCHandler(samplingStrategy))
starServer(server, lis, logger, serveErr)
return lis.Addr(), nil
}

func starServer(server *grpc.Server, lis net.Listener, logger *zap.Logger, serveErr func(error)) {
var port string
if tcpAddr, ok := lis.Addr().(*net.TCPAddr); ok {
port = strconv.Itoa(tcpAddr.Port)
} else {
port = lis.Addr().Network()
}
logger.Info("Starting jaeger-collector gRPC server", zap.String("grpc-port", port))
go func() {
if err := server.Serve(lis); err != nil {
logger.Error("Could not launch gRPC service", zap.Error(err))
serveErr(err)
}
}()
}
90 changes: 90 additions & 0 deletions cmd/collector/app/grpcserver/grpc_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpcserver

import (
"context"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"

"github.com/jaegertracing/jaeger/cmd/collector/app"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

// test wrong port number
func TestFailToListen(t *testing.T) {
l, _ := zap.NewDevelopment()
handler := app.NewGRPCHandler(l, &mockSpanProcessor{})
server := grpc.NewServer()
addr, err := StartGRPCCollector(1, server, handler, &mockSamplingStore{}, l, func(e error) {
})
require.Nil(t, addr)
assert.EqualError(t, err, "Failed to listen on gRPC port: listen tcp :1: bind: permission denied")
}

func TestFailServe(t *testing.T) {
lis := bufconn.Listen(0)
lis.Close()
core, logs := observer.New(zap.NewAtomicLevelAt(zapcore.ErrorLevel))
wg := &sync.WaitGroup{}
wg.Add(1)
starServer(grpc.NewServer(), lis, zap.New(core), func(e error) {
assert.Equal(t, 1, len(logs.All()))
assert.Equal(t, "Could not launch gRPC service", logs.All()[0].Message)
wg.Done()
})
wg.Wait()
}

func TestSpanCollector(t *testing.T) {
l, _ := zap.NewDevelopment()
handler := app.NewGRPCHandler(l, &mockSpanProcessor{})
server := grpc.NewServer()
addr, err := StartGRPCCollector(0, server, handler, &mockSamplingStore{}, l, func(e error) {
})
require.NoError(t, err)

conn, err := grpc.Dial(addr.String(), grpc.WithInsecure())
defer conn.Close()
defer server.Stop()
require.NoError(t, err)
c := api_v2.NewCollectorServiceClient(conn)
response, err := c.PostSpans(context.Background(), &api_v2.PostSpansRequest{})
require.NoError(t, err)
assert.Equal(t, true, response.Ok)
}

type mockSamplingStore struct{}

func (s mockSamplingStore) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
return nil, nil
}

type mockSpanProcessor struct {
}

func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, spanFormat string) ([]bool, error) {
return []bool{}, nil
}
Loading

0 comments on commit 45ddf2f

Please sign in to comment.