Skip to content

Commit

Permalink
Change Zipkin receiver behavior according to host ingestion status (#148
Browse files Browse the repository at this point in the history
)

* Make Zipkin receiver respond to host ingestion status

This change makes the Zipkin receiver respond to received data according to the host status regarding ingestion.

* Nit cleanup

* Use better type to handle back pressure setting
  • Loading branch information
Paulo Janotti authored Jul 15, 2019
1 parent cd5b4d0 commit 8721e3b
Show file tree
Hide file tree
Showing 11 changed files with 334 additions and 32 deletions.
27 changes: 23 additions & 4 deletions config/configmodels/configmodels.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,24 @@ type Pipelines map[string]*Pipeline
// These are helper structs which you can embed when implementing your specific
// receiver/exporter/processor config storage.

// BackPressureSetting defines if back pressure should be exerted or not.
type BackPressureSetting int

const (
// EnableBackPressure indicates that backpressure is enabled.
EnableBackPressure BackPressureSetting = iota
// DisableBackPressure indicates that backpressure is disabled.
DisableBackPressure
)

// ReceiverSettings defines common settings for a single-protocol receiver configuration.
// Specific receivers can embed this struct and extend it with more fields if needed.
type ReceiverSettings struct {
TypeVal string `mapstructure:"-"`
NameVal string `mapstructure:"-"`
Enabled bool `mapstructure:"enabled"`
Endpoint string `mapstructure:"endpoint"`
TypeVal string `mapstructure:"-"`
NameVal string `mapstructure:"-"`
Enabled bool `mapstructure:"enabled"`
Endpoint string `mapstructure:"endpoint"`
DisableBackPressure bool `mapstructure:"disable-backpressure"`
}

// Name gets the receiver name.
Expand All @@ -155,6 +166,14 @@ func (rs *ReceiverSettings) SetType(typeStr string) {
rs.TypeVal = typeStr
}

// BackPressureSetting gets the back pressure setting of the configuration.
func (rs *ReceiverSettings) BackPressureSetting() BackPressureSetting {
if rs.DisableBackPressure {
return DisableBackPressure
}
return EnableBackPressure
}

// ExporterSettings defines common settings for an exporter configuration.
// Specific exporters can embed this struct and extend it with more fields if needed.
type ExporterSettings struct {
Expand Down
3 changes: 2 additions & 1 deletion exporter/zipkinexporter/zipkin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
zipkinmodel "github.com/openzipkin/zipkin-go/model"
zipkinreporter "github.com/openzipkin/zipkin-go/reporter"

"github.com/open-telemetry/opentelemetry-service/config/configmodels"
"github.com/open-telemetry/opentelemetry-service/internal/config/viperutils"
"github.com/open-telemetry/opentelemetry-service/internal/testutils"
"github.com/open-telemetry/opentelemetry-service/processor/multiconsumer"
Expand Down Expand Up @@ -158,7 +159,7 @@ zipkin:

// Run the Zipkin receiver to "receive spans upload from a client application"
zexp := multiconsumer.NewTraceProcessor(tes)
zi, err := zipkinreceiver.New(":0", zexp)
zi, err := zipkinreceiver.New(":0", configmodels.EnableBackPressure, zexp)
if err != nil {
t.Fatalf("Failed to create a new Zipkin receiver: %v", err)
}
Expand Down
52 changes: 52 additions & 0 deletions observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,20 @@ import (
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opencensus.io/trace"

"github.com/open-telemetry/opentelemetry-service/config/configmodels"
)

var (
mReceiverIngestionBlockedRPCs = stats.Int64(
"oc.io/receiver/ingestion_blocked_rpcs",
"Counts the number of RPCs blocked by the receiver host",
"1")
mReceiverIngestionBlockedRPCsWithDataLoss = stats.Int64(
"oc.io/receiver/ingestion_blocked_silent_data_loss",
"Counts the number of RPCs blocked by the receiver host without back pressure causing data loss",
"1")

mReceiverReceivedSpans = stats.Int64("oc.io/receiver/received_spans", "Counts the number of spans received by the receiver", "1")
mReceiverDroppedSpans = stats.Int64("oc.io/receiver/dropped_spans", "Counts the number of spans dropped by the receiver", "1")

Expand All @@ -44,6 +55,33 @@ var TagKeyReceiver, _ = tag.NewKey("oc_receiver")
// TagKeyExporter defines tag key for Exporter.
var TagKeyExporter, _ = tag.NewKey("oc_exporter")

// ViewReceiverIngestionBlockedRPCs defines the view for the receiver ingestion
// blocked metric. If it causes data loss or not depends if back pressure is
// enabled and the client has available resources to buffer and retry.
// The metric used by the view does not use number of spans to avoid requiring
// de-serializing the RPC message.
var ViewReceiverIngestionBlockedRPCs = &view.View{
Name: mReceiverIngestionBlockedRPCs.Name(),
Description: mReceiverIngestionBlockedRPCs.Description(),
Measure: mReceiverIngestionBlockedRPCs,
Aggregation: view.Sum(),
TagKeys: []tag.Key{TagKeyReceiver},
}

// ViewReceiverIngestionBlockedRPCsWithDataLoss defines the view for the receiver
// ingestion blocked without back pressure to the client. Since there is no back
// pressure the client will assume that the data was ingested and there will be
// data loss.
// The metric used by the view does not use number of spans to avoid requiring
// de-serializing the RPC message.
var ViewReceiverIngestionBlockedRPCsWithDataLoss = &view.View{
Name: mReceiverIngestionBlockedRPCsWithDataLoss.Name(),
Description: mReceiverIngestionBlockedRPCsWithDataLoss.Description(),
Measure: mReceiverIngestionBlockedRPCsWithDataLoss,
Aggregation: view.Sum(),
TagKeys: []tag.Key{TagKeyReceiver},
}

// ViewReceiverReceivedSpans defines the view for the receiver received spans metric.
var ViewReceiverReceivedSpans = &view.View{
Name: mReceiverReceivedSpans.Name(),
Expand Down Expand Up @@ -82,6 +120,8 @@ var ViewExporterDroppedSpans = &view.View{

// AllViews has the views for the metrics provided by the agent.
var AllViews = []*view.View{
ViewReceiverIngestionBlockedRPCs,
ViewReceiverIngestionBlockedRPCsWithDataLoss,
ViewReceiverReceivedSpans,
ViewReceiverDroppedSpans,
ViewExporterReceivedSpans,
Expand All @@ -96,6 +136,18 @@ func ContextWithReceiverName(ctx context.Context, receiverName string) context.C
return ctx
}

// RecordIngestionBlockedMetrics records metrics related to the receiver responses
// when the host blocks ingestion. If back pressure is disabled the metric for
// respective data loss is recorded.
// Use it with a context.Context generated using ContextWithReceiverName().
func RecordIngestionBlockedMetrics(ctxWithTraceReceiverName context.Context, backPressureSetting configmodels.BackPressureSetting) {
if backPressureSetting == configmodels.DisableBackPressure {
// In this case data loss will happen, record the proper metric.
stats.Record(ctxWithTraceReceiverName, mReceiverIngestionBlockedRPCsWithDataLoss.M(1))
}
stats.Record(ctxWithTraceReceiverName, mReceiverIngestionBlockedRPCs.M(1))
}

// RecordTraceReceiverMetrics records the number of the spans received and dropped by the receiver.
// Use it with a context.Context generated using ContextWithReceiverName().
func RecordTraceReceiverMetrics(ctxWithTraceReceiverName context.Context, receivedSpans int, droppedSpans int) {
Expand Down
9 changes: 9 additions & 0 deletions observability/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"testing"

"github.com/open-telemetry/opentelemetry-service/config/configmodels"
"github.com/open-telemetry/opentelemetry-service/observability"
"github.com/open-telemetry/opentelemetry-service/observability/observabilitytest"
)
Expand All @@ -35,11 +36,19 @@ func TestTracePieplineRecordedMetrics(t *testing.T) {

receiverCtx := observability.ContextWithReceiverName(context.Background(), receiverName)
observability.RecordTraceReceiverMetrics(receiverCtx, 17, 13)
observability.RecordIngestionBlockedMetrics(receiverCtx, configmodels.EnableBackPressure)
observability.RecordIngestionBlockedMetrics(receiverCtx, configmodels.DisableBackPressure)
exporterCtx := observability.ContextWithExporterName(receiverCtx, exporterName)
observability.RecordTraceExporterMetrics(exporterCtx, 27, 23)
if err := observabilitytest.CheckValueViewReceiverReceivedSpans(receiverName, 17); err != nil {
t.Fatalf("When check recorded values: want nil got %v", err)
}
if err := observabilitytest.CheckValueViewReceiverIngestionBlockedRPCs(receiverName, 2); err != nil {
t.Fatalf("When check recorded values: want nil got %v", err)
}
if err := observabilitytest.CheckValueViewReceiverIngestionBlockedRPCsWithDataLoss(receiverName, 1); err != nil {
t.Fatalf("When check recorded values: want nil got %v", err)
}
if err := observabilitytest.CheckValueViewReceiverDroppedSpans(receiverName, 13); err != nil {
t.Fatalf("When check recorded values: want nil got %v", err)
}
Expand Down
16 changes: 16 additions & 0 deletions observability/observabilitytest/observabilitytest.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,22 @@ func SetupRecordedMetricsTest() (doneFn func()) {
}
}

// CheckValueViewReceiverIngestionBlockedRPCs checks that for the current exported value in the ViewReceiverIngestionBlockedRPCs
// for {TagKeyReceiver: receiverName, TagKeyExporter: exporterTagName} is equal to "value".
// In tests that this function is called it is required to also call SetupRecordedMetricsTest as first thing.
func CheckValueViewReceiverIngestionBlockedRPCs(receiverName string, value int) error {
return checkValueForView(observability.ViewReceiverIngestionBlockedRPCs.Name,
wantsTagsForReceiverView(receiverName), int64(value))
}

// CheckValueViewReceiverIngestionBlockedRPCsWithDataLoss checks that for the current exported value in the ViewReceiverIngestionBlockedRPCsWithDataLoss
// for {TagKeyReceiver: receiverName, TagKeyExporter: exporterTagName} is equal to "value".
// In tests that this function is called it is required to also call SetupRecordedMetricsTest as first thing.
func CheckValueViewReceiverIngestionBlockedRPCsWithDataLoss(receiverName string, value int) error {
return checkValueForView(observability.ViewReceiverIngestionBlockedRPCsWithDataLoss.Name,
wantsTagsForReceiverView(receiverName), int64(value))
}

// CheckValueViewExporterReceivedSpans checks that for the current exported value in the ViewExporterReceivedSpans
// for {TagKeyReceiver: receiverName, TagKeyExporter: exporterTagName} is equal to "value".
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
Expand Down
9 changes: 5 additions & 4 deletions receiver/zipkinreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ func TestLoadConfig(t *testing.T) {
assert.Equal(t, r1,
&Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: "zipkin/customname",
Endpoint: "127.0.0.1:8765",
Enabled: true,
TypeVal: typeStr,
NameVal: "zipkin/customname",
Endpoint: "127.0.0.1:8765",
Enabled: true,
DisableBackPressure: true,
},
})
}
2 changes: 1 addition & 1 deletion receiver/zipkinreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (f *factory) CreateTraceReceiver(
) (receiver.TraceReceiver, error) {

rCfg := cfg.(*Config)
return New(rCfg.Endpoint, nextConsumer)
return New(rCfg.Endpoint, rCfg.BackPressureSetting(), nextConsumer)
}

// CreateMetricsReceiver creates a metrics receiver based on provided config.
Expand Down
9 changes: 9 additions & 0 deletions receiver/zipkinreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/open-telemetry/opentelemetry-service/config/configerror"
"github.com/open-telemetry/opentelemetry-service/config/configmodels"
"github.com/open-telemetry/opentelemetry-service/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-service/receiver"
)
Expand All @@ -47,6 +48,14 @@ func TestCreateReceiver(t *testing.T) {
tReceiver, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, &mockTraceConsumer{})
assert.Nil(t, err, "receiver creation failed")
assert.NotNil(t, tReceiver, "receiver creation failed")
assert.Equal(t, configmodels.EnableBackPressure, tReceiver.(*ZipkinReceiver).backPressureSetting)

rCfg := cfg.(*Config)
rCfg.DisableBackPressure = true
tReceiver, err = factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, &mockTraceConsumer{})
assert.Nil(t, err, "receiver creation failed")
assert.NotNil(t, tReceiver, "receiver creation failed")
assert.Equal(t, configmodels.DisableBackPressure, tReceiver.(*ZipkinReceiver).backPressureSetting)

mReceiver, err := factory.CreateMetricsReceiver(zap.NewNop(), cfg, nil)
assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported)
Expand Down
1 change: 1 addition & 0 deletions receiver/zipkinreceiver/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ receivers:
zipkin/customname:
endpoint: "127.0.0.1:8765"
enabled: true
disable-backpressure: true

processors:
exampleprocessor:
Expand Down
Loading

0 comments on commit 8721e3b

Please sign in to comment.