Skip to content

Commit

Permalink
Enable retry and sending queue for Jaeger exporter
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Jul 20, 2020
1 parent 4046234 commit 6ed0990
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 27 deletions.
13 changes: 13 additions & 0 deletions exporter/jaegerexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@ connection. See [grpc.WithInsecure()](https://godoc.org/google.golang.org/grpc#W
of authority (e.g. :authority header field) in requests (typically used for testing).
- `balancer_name`(default = pick_first): Sets the balancer in grpclb_policy to discover the servers.
See [grpc loadbalancing example](https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md).
- `timeout` (default = 5s): Is the timeout for every attempt to send data to the backend.
- `retry_on_failure`
- `disabled` (default = false)
- `initial_interval` (default = 5s): Time to wait after the first failure before retrying; ignored if `disabled` is `true`
- `max_interval` (default = 30s): Is the upper bound on backoff; ignored if `disabled` is `true`
- `max_elapsed_time` (default = 120s): Is the maximum amount of time spent trying to send a batch; ignored if `disabled` is `true`
- `sending_queue`
- `disabled` (default = true)
- `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `disabled` is `true`
- `queue_size` (default = 5000): Maximum number of batches kept in memory before data; ignored if `disabled` is `true`;
User should calculate this as `num_seconds * requests_per_second` where:
- `num_seconds` is the number of seconds to buffer in case of a backend outage
- `requests_per_second` is the average number of requests per seconds.

Example:

Expand Down
6 changes: 5 additions & 1 deletion exporter/jaegerexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ package jaegerexporter
import (
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// Config defines configuration for Jaeger gRPC exporter.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

configgrpc.GRPCClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
}
45 changes: 32 additions & 13 deletions exporter/jaegerexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ import (
"context"
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

func TestLoadConfig(t *testing.T) {
Expand All @@ -40,20 +43,36 @@ func TestLoadConfig(t *testing.T) {
require.NotNil(t, cfg)

e0 := cfg.Exporters["jaeger"]

// Endpoint doesn't have a default value so set it directly.
defaultCfg := factory.CreateDefaultConfig().(*Config)
defaultCfg.Endpoint = "some.target:55678"
defaultCfg.GRPCClientSettings.Endpoint = defaultCfg.Endpoint
defaultCfg.GRPCClientSettings.TLSSetting = configtls.TLSClientSetting{
Insecure: true,
}
assert.Equal(t, defaultCfg, e0)
assert.Equal(t, e0, factory.CreateDefaultConfig())

e1 := cfg.Exporters["jaeger/2"]
assert.Equal(t, "jaeger/2", e1.(*Config).Name())
assert.Equal(t, "a.new.target:1234", e1.(*Config).Endpoint)
assert.Equal(t, "round_robin", e1.(*Config).GRPCClientSettings.BalancerName)
assert.Equal(t, e1,
&Config{
ExporterSettings: configmodels.ExporterSettings{
NameVal: "jaeger/2",
TypeVal: "jaeger",
},
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 10 * time.Second,
},
RetrySettings: exporterhelper.RetrySettings{
Disabled: false,
InitialInterval: 10 * time.Second,
MaxInterval: 1 * time.Minute,
MaxElapsedTime: 10 * time.Minute,
},
QueueSettings: exporterhelper.QueueSettings{
Disabled: false,
NumConsumers: 2,
QueueSize: 10,
},
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: "a.new.target:1234",
WriteBufferSize: 512 * 1024,
BalancerName: "round_robin",
},
})

params := component.ExporterCreateParams{Logger: zap.NewNop()}
te, err := factory.CreateTraceExporter(context.Background(), params, e1)
require.NoError(t, err)
Expand Down
19 changes: 12 additions & 7 deletions exporter/jaegerexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,34 @@ import (
jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger"
)

// New returns a new Jaeger gRPC exporter.
// newTraceExporter returns a new Jaeger gRPC exporter.
// The exporter name is the name to be used in the observability of the exporter.
// The collectorEndpoint should be of the form "hostname:14250" (a gRPC target).
func New(config *Config) (component.TraceExporter, error) {
func newTraceExporter(cfg *Config) (component.TraceExporter, error) {

opts, err := config.GRPCClientSettings.ToDialOptions()
opts, err := cfg.GRPCClientSettings.ToDialOptions()
if err != nil {
return nil, err
}

client, err := grpc.Dial(config.GRPCClientSettings.Endpoint, opts...)
client, err := grpc.Dial(cfg.GRPCClientSettings.Endpoint, opts...)
if err != nil {
return nil, err
}

collectorServiceClient := jaegerproto.NewCollectorServiceClient(client)
s := &protoGRPCSender{
client: collectorServiceClient,
metadata: metadata.New(config.GRPCClientSettings.Headers),
waitForReady: config.WaitForReady,
metadata: metadata.New(cfg.GRPCClientSettings.Headers),
waitForReady: cfg.WaitForReady,
}

exp, err := exporterhelper.NewTraceExporter(config, s.pushTraceData)
exp, err := exporterhelper.NewTraceExporter(
cfg, s.pushTraceData,
exporterhelper.WithTimeout(cfg.TimeoutSettings),
exporterhelper.WithRetry(cfg.RetrySettings),
exporterhelper.WithQueue(cfg.QueueSettings),
)

return exp, err
}
Expand Down
4 changes: 2 additions & 2 deletions exporter/jaegerexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ func TestNew(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := New(&tt.args.config)
got, err := newTraceExporter(&tt.args.config)
if (err != nil) != tt.wantErr {
t.Errorf("New() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("newTraceExporter() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got == nil {
Expand Down
8 changes: 7 additions & 1 deletion exporter/jaegerexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,17 @@ func NewFactory() component.ExporterFactory {
}

func createDefaultConfig() configmodels.Exporter {
// TODO: Enable the queued settings.
qs := exporterhelper.CreateDefaultQueueSettings()
qs.Disabled = true
return &Config{
ExporterSettings: configmodels.ExporterSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
TimeoutSettings: exporterhelper.CreateDefaultTimeoutSettings(),
RetrySettings: exporterhelper.CreateDefaultRetrySettings(),
QueueSettings: qs,
GRPCClientSettings: configgrpc.GRPCClientSettings{
// We almost read 0 bytes, so no need to tune ReadBufferSize.
WriteBufferSize: 512 * 1024,
Expand All @@ -65,7 +71,7 @@ func createTraceExporter(
return nil, err
}

exp, err := New(expCfg)
exp, err := newTraceExporter(expCfg)
if err != nil {
return nil, err
}
Expand Down
13 changes: 10 additions & 3 deletions exporter/jaegerexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,19 @@ processors:

exporters:
jaeger:
endpoint: "some.target:55678"
insecure: true
jaeger/2:
endpoint: "a.new.target:1234"
balancer_name: "round_robin"

timeout: 10s
sending_queue:
disabled: false
num_consumers: 2
queue_size: 10
retry_on_failure:
disabled: false
initial_interval: 10s
max_interval: 60s
max_elapsed_time: 10m

service:
pipelines:
Expand Down
2 changes: 2 additions & 0 deletions testbed/testbed/senders.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ func NewJaegerGRPCDataSender(host string, port int) *JaegerGRPCDataSender {
func (je *JaegerGRPCDataSender) Start() error {
factory := jaegerexporter.NewFactory()
cfg := factory.CreateDefaultConfig().(*jaegerexporter.Config)
// Disable retries, we should push data and if error just log it.
cfg.RetrySettings.Disabled = true
cfg.Endpoint = fmt.Sprintf("%s:%d", je.Host, je.Port)
cfg.TLSSetting = configtls.TLSClientSetting{
Insecure: true,
Expand Down

0 comments on commit 6ed0990

Please sign in to comment.