Skip to content

Commit

Permalink
Add gRPC communication between agent and collector (#1165)
Browse files Browse the repository at this point in the history
* Add gRPC communication between agent and collector

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* uncomment make and fix comment

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Fix grpc handler initialization

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Copy process if missing

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* log only warn and error from grpc

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Fix review comments

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Add todo

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Add Roundrobin grpc load balancer

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Fix comment

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Fix review comments

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Change grpc target name

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Defaul return error

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Fix back tchannel collector addr

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Remove collector from grpc reporter flag

Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay authored Nov 13, 2018
1 parent f2eb7d1 commit 9635a33
Show file tree
Hide file tree
Showing 43 changed files with 3,967 additions and 1,259 deletions.
44 changes: 27 additions & 17 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ TOP_PKGS := $(shell glide novendor | \
grep -v \
-e ./thrift-gen/... \
-e ./swagger-gen/... \
-e ./proto-gen/... \
-e ./examples/... \
-e ./scripts/...\
)
Expand All @@ -17,6 +18,7 @@ ALL_SRC := $(shell find . -name "*.go" | \
-e vendor \
-e /thrift-gen/ \
-e /swagger-gen/ \
-e /proto-gen/ \
-e /examples/ \
-e doc.go \
-e model.pb.go \
Expand Down Expand Up @@ -335,6 +337,20 @@ generate-mocks: install-mockery
echo-version:
@echo $(GIT_CLOSEST_TAG)

PROTO_INCLUDES := \
-I model/proto \
-I vendor/github.com/grpc-ecosystem/grpc-gateway \
-I vendor/github.com/gogo/googleapis \
-I vendor/github.com/gogo/protobuf
# Remapping of std types to gogo types (must not contain spaces)
PROTO_GOGO_MAPPINGS := $(shell echo \
Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types, \
Mgoogle/protobuf/duration.proto=github.com/gogo/protobuf/types, \
Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types, \
Mgoogle/api/annotations.proto=github.com/gogo/googleapis/google/api, \
Mmodel.proto=github.com/jaegertracing/jaeger/model \
| sed 's/ //g')

.PHONY: proto
proto:
# Generate gogo, gRPC-Gateway, swagger, go-validators output.
Expand Down Expand Up @@ -362,31 +378,25 @@ proto:
# (https://medium.com/@linchenon/generate-grpc-and-protobuf-libraries-with-containers-c15ba4e4f3ad)
#
protoc \
-I model/proto \
-I vendor/github.com/grpc-ecosystem/grpc-gateway/ \
-I vendor/github.com/gogo/googleapis/ \
-I vendor/ \
--gogo_out=plugins=grpc,\
Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,\
Mgoogle/protobuf/duration.proto=github.com/gogo/protobuf/types,\
Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types,\
Mgoogle/api/annotations.proto=github.com/gogo/googleapis/google/api:\
$$GOPATH/src/github.com/jaegertracing/jaeger/model/ \
--grpc-gateway_out=\
Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,\
Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types,\
Mgoogle/api/annotations.proto=github.com/gogo/googleapis/google/api:\
$$GOPATH/src/github.com/jaegertracing/jaeger/model \
--swagger_out=model/proto/openapi/ \
$(PROTO_INCLUDES) \
--gogo_out=plugins=grpc,$(PROTO_GOGO_MAPPINGS):$(PWD)/model/ \
model/proto/model.proto

protoc \
$(PROTO_INCLUDES) \
--gogo_out=plugins=grpc,$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/api_v2/ \
--grpc-gateway_out=$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/api_v2/ \
--swagger_out=$(PWD)/proto-gen/openapi/ \
model/proto/api_v2.proto

protoc \
-I model/proto \
--go_out=$$GOPATH/src/github.com/jaegertracing/jaeger/model/prototest/ \
--go_out=$(PWD)/model/prototest/ \
model/proto/model_test.proto

.PHONY: proto-install
proto-install:
go get -u github.com/golang/glog
go install \
./vendor/github.com/golang/protobuf/protoc-gen-go \
./vendor/github.com/gogo/protobuf/protoc-gen-gogo \
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestAgentSamplingEndpoint(t *testing.T) {
require.NoError(t, err)
body, err := ioutil.ReadAll(resp.Body)
assert.NoError(t, err)
assert.Equal(t, "tcollector error: no peers available\n", string(body))
assert.Equal(t, "collector error: no peers available\n", string(body))
})
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func AddFlags(flags *flag.FlagSet) {
flags.String(
httpServerHostPort,
defaultHTTPServerHostPort,
"host:port of the http server (e.g. for /sampling point and /baggage endpoint)")
"host:port of the http server (e.g. for /sampling point and /baggageRestrictions endpoint)")
}

// InitFromViper initializes Builder with properties retrieved from Viper.
Expand Down
4 changes: 2 additions & 2 deletions cmd/agent/app/httpserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (h *httpHandler) serveSamplingHTTP(w http.ResponseWriter, r *http.Request,
resp, err := h.manager.GetSamplingStrategy(service)
if err != nil {
h.metrics.TCollectorProxyFailures.Inc(1)
http.Error(w, fmt.Sprintf("tcollector error: %+v", err), http.StatusInternalServerError)
http.Error(w, fmt.Sprintf("collector error: %+v", err), http.StatusInternalServerError)
return
}
jsonBytes, err := json.Marshal(resp)
Expand Down Expand Up @@ -138,7 +138,7 @@ func (h *httpHandler) serveBaggageHTTP(w http.ResponseWriter, r *http.Request) {
resp, err := h.manager.GetBaggageRestrictions(service)
if err != nil {
h.metrics.TCollectorProxyFailures.Inc(1)
http.Error(w, fmt.Sprintf("tcollector error: %+v", err), http.StatusInternalServerError)
http.Error(w, fmt.Sprintf("collector error: %+v", err), http.StatusInternalServerError)
return
}
// NB. it's literally impossible for this Marshal to fail
Expand Down
6 changes: 3 additions & 3 deletions cmd/agent/app/httpserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ func TestHTTPHandlerErrors(t *testing.T) {
},
},
{
description: "sampler tcollector error",
description: "sampler collector error",
url: "?service=Y",
statusCode: http.StatusInternalServerError,
body: "tcollector error: no mock response provided\n",
body: "collector error: no mock response provided\n",
metrics: []mTestutils.ExpectedMetric{
{Name: "http-server.errors", Tags: map[string]string{"source": "tcollector-proxy", "status": "5xx"}, Value: 1},
},
Expand All @@ -158,7 +158,7 @@ func TestHTTPHandlerErrors(t *testing.T) {
description: "baggage tcollector error",
url: "/baggageRestrictions?service=Y",
statusCode: http.StatusInternalServerError,
body: "tcollector error: no mock response provided\n",
body: "collector error: no mock response provided\n",
metrics: []mTestutils.ExpectedMetric{
{Name: "http-server.errors", Tags: map[string]string{"source": "tcollector-proxy", "status": "5xx"}, Value: 1},
},
Expand Down
49 changes: 49 additions & 0 deletions cmd/agent/app/reporter/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package reporter

import (
"flag"
"fmt"

"github.com/spf13/viper"
)

const (
reporterType = "reporter.type"
// TCHANNEL is name of tchannel reporter.
TCHANNEL Type = "tchannel"
// GRPC is name of gRPC reporter.
GRPC Type = "grpc"
)

// Type defines type of reporter.
type Type string

// Options holds generic reporter configuration.
type Options struct {
ReporterType Type
}

// AddFlags adds flags for Options.
func AddFlags(flags *flag.FlagSet) {
flags.String(reporterType, string(TCHANNEL), fmt.Sprintf("Reporter type to use e.g. %s, %s", string(TCHANNEL), string(GRPC)))
}

// InitFromViper initializes Options with properties retrieved from Viper.
func (b *Options) InitFromViper(v *viper.Viper) *Options {
b.ReporterType = Type(v.GetString(reporterType))
return b
}
43 changes: 43 additions & 0 deletions cmd/agent/app/reporter/flags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package reporter

import (
"flag"
"testing"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestBingFlags(t *testing.T) {
v := viper.New()
command := cobra.Command{}
flags := &flag.FlagSet{}
AddFlags(flags)
command.PersistentFlags().AddGoFlagSet(flags)
v.BindPFlags(command.PersistentFlags())

err := command.ParseFlags([]string{
"--reporter.type=grpc",
})
require.NoError(t, err)

b := &Options{}
b.InitFromViper(v)
assert.Equal(t, Type("grpc"), b.ReporterType)
}
63 changes: 63 additions & 0 deletions cmd/agent/app/reporter/grpc/collector_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc

import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"

"github.com/jaegertracing/jaeger/cmd/agent/app/httpserver"
aReporter "github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
)

// ProxyBuilder holds objects communicating with collector
type ProxyBuilder struct {
reporter aReporter.Reporter
manager httpserver.ClientConfigManager
}

// NewCollectorProxy creates ProxyBuilder
func NewCollectorProxy(o *Options, logger *zap.Logger) *ProxyBuilder {
// It does not return error if the collector is not running
// a way to fail immediately is to call WithBlock and WithTimeout
var conn *grpc.ClientConn
if len(o.CollectorHostPort) > 1 {
r, _ := manual.GenerateAndRegisterManualResolver()
var resolvedAddrs []resolver.Address
for _, addr := range o.CollectorHostPort {
resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: addr})
}
r.InitialAddrs(resolvedAddrs)
conn, _ = grpc.Dial(r.Scheme()+":///round_robin", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
} else {
conn, _ = grpc.Dial(o.CollectorHostPort[0], grpc.WithInsecure())
}
return &ProxyBuilder{
reporter: NewReporter(conn, logger),
manager: NewSamplingManager(conn)}
}

// GetReporter returns Reporter
func (b ProxyBuilder) GetReporter() aReporter.Reporter {
return b.reporter
}

// GetManager returns manager
func (b ProxyBuilder) GetManager() httpserver.ClientConfigManager {
return b.manager
}
65 changes: 65 additions & 0 deletions cmd/agent/app/reporter/grpc/collector_proxy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
)

func TestProxyBuilder(t *testing.T) {
proxy := NewCollectorProxy(&Options{CollectorHostPort: []string{"localhost:0000"}}, zap.NewNop())
require.NotNil(t, proxy)
assert.NotNil(t, proxy.GetReporter())
assert.NotNil(t, proxy.GetManager())
}

func TestMultipleCollectors(t *testing.T) {
spanHandler1 := &mockSpanHandler{}
s1, addr1 := initializeGRPCTestServer(t, func(s *grpc.Server) {
api_v2.RegisterCollectorServiceServer(s, spanHandler1)
})
defer s1.Stop()
spanHandler2 := &mockSpanHandler{}
s2, addr2 := initializeGRPCTestServer(t, func(s *grpc.Server) {
api_v2.RegisterCollectorServiceServer(s, spanHandler2)
})
defer s2.Stop()

proxy := NewCollectorProxy(&Options{CollectorHostPort: []string{addr1.String(), addr2.String()}}, zap.NewNop())
require.NotNil(t, proxy)
assert.NotNil(t, proxy.GetReporter())
assert.NotNil(t, proxy.GetManager())

var bothServers = false
// TODO do not iterate, just create two batches
for i := 0; i < 10; i++ {
r := proxy.GetReporter()
err := r.EmitBatch(&jaeger.Batch{Spans: []*jaeger.Span{{OperationName: "op"}}, Process: &jaeger.Process{ServiceName: "service"}})
require.NoError(t, err)
if len(spanHandler1.getRequests()) > 0 && len(spanHandler2.getRequests()) > 0 {
bothServers = true
break
}
}
assert.Equal(t, true, bothServers)
}
Loading

0 comments on commit 9635a33

Please sign in to comment.