Add factory and update config for tail sampling processor #200

Merged
merged 6 commits into from
Jul 31, 2019
68 changes: 68 additions & 0 deletions processor/tailsampling/factory.go
@@ -0,0 +1,68 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tailsampling

import (
	"time"

	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-service/config/configerror"
	"github.com/open-telemetry/opentelemetry-service/config/configmodels"
	"github.com/open-telemetry/opentelemetry-service/consumer"
	"github.com/open-telemetry/opentelemetry-service/processor"
	"github.com/open-telemetry/opentelemetry-service/service/builder"
)

const (
	// The value of "type" Tail Sampling in configuration.
	typeStr = "tail-sampling"
)

// Factory is the factory for Tail Sampling processor.
type Factory struct {
}

// Type gets the type of the config created by this factory.
func (f *Factory) Type() string {
	return typeStr
}

// CreateDefaultConfig creates the default configuration for processor.
func (f *Factory) CreateDefaultConfig() configmodels.Processor {
	return &builder.TailBasedCfg{
		DecisionWait: 30 * time.Second,
		NumTraces:    50000,
Review comment (Contributor): Is there a reason for those numbers?

Review comment (Contributor): These are arbitrary; as far as I am aware :), there is no good criterion for selecting those defaults other than "reasonableness".

	}
}

// CreateTraceProcessor creates a trace processor based on this config.
func (f *Factory) CreateTraceProcessor(
	logger *zap.Logger,
	nextConsumer consumer.TraceConsumer,
	cfg configmodels.Processor,
) (processor.TraceProcessor, error) {
	tCfg := cfg.(*builder.TailBasedCfg)
	return NewTraceProcessor(logger, nextConsumer, *tCfg)
}

// CreateMetricsProcessor creates a metrics processor based on this config.
func (f *Factory) CreateMetricsProcessor(
	logger *zap.Logger,
	nextConsumer consumer.MetricsConsumer,
	cfg configmodels.Processor,
) (processor.MetricsProcessor, error) {
	return nil, configerror.ErrDataTypeIsNotSupported
}
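
The single-value assertion `cfg.(*builder.TailBasedCfg)` in `CreateTraceProcessor` panics if the factory is handed a config of another type. Below is a minimal sketch of the two-value form, which returns an error instead. It is not part of this PR: `Processor`, `TailBasedCfg`, `OtherCfg`, and `asTailBasedCfg` are hypothetical stand-ins defined locally so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Processor is a local stand-in for configmodels.Processor (assumption).
type Processor interface{}

// TailBasedCfg mirrors the fields shown in the diff (local copy, not the real type).
type TailBasedCfg struct {
	DecisionWait            time.Duration
	NumTraces               uint64
	ExpectedNewTracesPerSec uint64
}

// OtherCfg is a hypothetical config that belongs to a different processor.
type OtherCfg struct{}

var errWrongConfigType = errors.New("unexpected config type for tail-sampling")

// asTailBasedCfg uses the two-value type assertion so that a mismatched or
// nil config yields an error rather than a runtime panic.
func asTailBasedCfg(cfg Processor) (*TailBasedCfg, error) {
	tCfg, ok := cfg.(*TailBasedCfg)
	if !ok {
		return nil, errWrongConfigType
	}
	return tCfg, nil
}

func main() {
	good := &TailBasedCfg{DecisionWait: 30 * time.Second, NumTraces: 50000}
	if tCfg, err := asTailBasedCfg(good); err == nil {
		fmt.Println("decision wait:", tCfg.DecisionWait)
	}
	if _, err := asTailBasedCfg(&OtherCfg{}); err != nil {
		fmt.Println("error:", err)
	}
}
```

Whether the factory should guard against this depends on how the service wires factories to configs; if the caller always passes the value returned by `CreateDefaultConfig`, the unchecked assertion is safe in practice.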
45 changes: 45 additions & 0 deletions processor/tailsampling/factory_test.go
@@ -0,0 +1,45 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tailsampling

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-service/exporter/exportertest"
)

func TestCreateDefaultConfig(t *testing.T) {
	factory := &Factory{}

	cfg := factory.CreateDefaultConfig()
	assert.NotNil(t, cfg, "failed to create default config")
}

func TestCreateProcessor(t *testing.T) {
	factory := &Factory{}

	cfg := factory.CreateDefaultConfig()

	tp, err := factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg)
	assert.NotNil(t, tp)
	assert.NoError(t, err, "cannot create trace processor")

	mp, err := factory.CreateMetricsProcessor(zap.NewNop(), nil, cfg)
	assert.Nil(t, mp)
	assert.Error(t, err, "should not be able to create metric processor")
}
@@ -16,6 +16,7 @@ package tailsampling

import (
"context"
"errors"
"runtime"
"sync"
"sync/atomic"
@@ -30,7 +31,8 @@ import (
"github.com/open-telemetry/opentelemetry-service/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-service/internal/collector/processor/idbatcher"
"github.com/open-telemetry/opentelemetry-service/internal/collector/sampling"
"github.com/open-telemetry/opentelemetry-service/observability"
"github.com/open-telemetry/opentelemetry-service/processor"
"github.com/open-telemetry/opentelemetry-service/service/builder"
)

// Policy combines a sampling policy evaluator with the destinations to be
@@ -54,6 +56,7 @@ type traceKey string
// policies to sample traces.
type tailSamplingSpanProcessor struct {
ctx context.Context
nextConsumer consumer.TraceConsumer
start sync.Once
maxNumTraces uint64
policies []*Policy
@@ -69,40 +72,33 @@ const (
sourceFormat = "tail-sampling"
)

var _ consumer.TraceConsumer = (*tailSamplingSpanProcessor)(nil)
var _ processor.TraceProcessor = (*tailSamplingSpanProcessor)(nil)

// NewTailSamplingSpanProcessor creates a TailSamplingSpanProcessor with the given policies.
// It will keep maxNumTraces on memory and will attempt to wait until decisionWait before evaluating if
// a trace should be sampled or not. Providing expectedNewTracesPerSec helps with allocating data structures
// with closer to actual usage size.
func NewTailSamplingSpanProcessor(
policies []*Policy,
maxNumTraces, expectedNewTracesPerSec uint64,
decisionWait time.Duration,
logger *zap.Logger) (consumer.TraceConsumer, error) {
// NewTraceProcessor returns a processor.TraceProcessor that will perform tail sampling according to the given
// configuration.
func NewTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumer, cfg builder.TailBasedCfg) (processor.TraceProcessor, error) {
Review comment (Member): Is there a good reason that TailBasedCfg is in the builder package? If not, then perhaps it should be in this package.
Actually, I am not sure sampling_builder.go should exist in the builder package at all, but that's probably a topic for a separate cleanup PR.

Review comment (Member Author): I think that config is inherited from opencensus-service, where sampling is configured through builder. I'll move it to processor/tailsampling and clean up the old configs in the next PR.

if nextConsumer == nil {
return nil, errors.New("nextConsumer is nil")
}

numDecisionBatches := uint64(decisionWait.Seconds())
inBatcher, err := idbatcher.New(numDecisionBatches, expectedNewTracesPerSec, uint64(2*runtime.NumCPU()))
numDecisionBatches := uint64(cfg.DecisionWait.Seconds())
inBatcher, err := idbatcher.New(numDecisionBatches, cfg.ExpectedNewTracesPerSec, uint64(2*runtime.NumCPU()))
if err != nil {
return nil, err
}

tsp := &tailSamplingSpanProcessor{
ctx: context.Background(),
maxNumTraces: maxNumTraces,
policies: policies,
nextConsumer: nextConsumer,
maxNumTraces: cfg.NumTraces,
logger: logger,
decisionBatcher: inBatcher,
}

for _, policy := range policies {
policyCtx, err := tag.New(tsp.ctx, tag.Upsert(tagPolicyKey, policy.Name), tag.Upsert(observability.TagKeyReceiver, sourceFormat))
if err != nil {
return nil, err
}
policy.ctx = policyCtx
}
// TODO(#146): add policies to TailBasedCfg
Review comment (Contributor): Policies have an interface, and we will need to add the config factories for each one (as a temporary alternative we can use the old code to read them, but I want to double-check that before going in that direction).

Review comment (Member Author): Got it. Either way, I would prefer to add it in a follow-up PR.

tsp.policyTicker = &policyTicker{onTick: tsp.samplingPolicyOnTick}
tsp.deleteChan = make(chan traceKey, maxNumTraces)
tsp.deleteChan = make(chan traceKey, cfg.NumTraces)

return tsp, nil
}

@@ -20,14 +20,16 @@ import (
"testing"
"time"

tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-service/exporter/exportertest"
"github.com/open-telemetry/opentelemetry-service/internal/collector/processor/idbatcher"
"github.com/open-telemetry/opentelemetry-service/internal/collector/sampling"
"github.com/open-telemetry/opentelemetry-service/service/builder"
tracetranslator "github.com/open-telemetry/opentelemetry-service/translator/trace"

tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"go.uber.org/zap"
)

const (
@@ -36,7 +38,12 @@ const (

func TestSequentialTraceArrival(t *testing.T) {
traceIds, batches := generateIdsAndBatches(128)
sp, _ := NewTailSamplingSpanProcessor(newTestPolicy(), uint64(2*len(traceIds)), 64, defaultTestDecisionWait, zap.NewNop())
cfg := builder.TailBasedCfg{
DecisionWait: defaultTestDecisionWait,
NumTraces: uint64(2 * len(traceIds)),
ExpectedNewTracesPerSec: 64,
}
sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg)
tsp := sp.(*tailSamplingSpanProcessor)
for _, batch := range batches {
tsp.ConsumeTraceData(context.Background(), batch)
@@ -57,7 +64,12 @@ func TestConcurrentTraceArrival(t *testing.T) {
traceIds, batches := generateIdsAndBatches(128)

var wg sync.WaitGroup
sp, _ := NewTailSamplingSpanProcessor(newTestPolicy(), uint64(2*len(traceIds)), 64, defaultTestDecisionWait, zap.NewNop())
cfg := builder.TailBasedCfg{
DecisionWait: defaultTestDecisionWait,
NumTraces: uint64(2 * len(traceIds)),
ExpectedNewTracesPerSec: 64,
}
sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg)
tsp := sp.(*tailSamplingSpanProcessor)
for _, batch := range batches {
// Add the same traceId twice.
@@ -90,7 +102,12 @@ func TestConcurrentTraceArrival(t *testing.T) {
func TestSequentialTraceMapSize(t *testing.T) {
traceIds, batches := generateIdsAndBatches(210)
const maxSize = 100
sp, _ := NewTailSamplingSpanProcessor(newTestPolicy(), uint64(maxSize), 64, defaultTestDecisionWait, zap.NewNop())
cfg := builder.TailBasedCfg{
DecisionWait: defaultTestDecisionWait,
NumTraces: uint64(maxSize),
ExpectedNewTracesPerSec: 64,
}
sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg)
tsp := sp.(*tailSamplingSpanProcessor)
for _, batch := range batches {
tsp.ConsumeTraceData(context.Background(), batch)
@@ -108,7 +125,12 @@ func TestConcurrentTraceMapSize(t *testing.T) {
_, batches := generateIdsAndBatches(210)
const maxSize = 100
var wg sync.WaitGroup
sp, _ := NewTailSamplingSpanProcessor(newTestPolicy(), uint64(maxSize), 64, defaultTestDecisionWait, zap.NewNop())
cfg := builder.TailBasedCfg{
DecisionWait: defaultTestDecisionWait,
NumTraces: uint64(maxSize),
ExpectedNewTracesPerSec: 64,
}
sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg)
tsp := sp.(*tailSamplingSpanProcessor)
for _, batch := range batches {
wg.Add(1)
@@ -133,19 +155,17 @@ func TestConcurrentTraceMapSize(t *testing.T) {
}

func TestSamplingPolicyTypicalPath(t *testing.T) {
t.Skip("TODO(#146): add policies to TailBasedCfg and fix this test")
const maxSize = 100
const decisionWaitSeconds = 5
decisionWait := time.Second * decisionWaitSeconds
msp := &mockSpanProcessor{}
mpe := &mockPolicyEvaluator{}
testPolicy := []*Policy{
{
Name: "test",
Evaluator: mpe,
Destination: msp,
},
cfg := builder.TailBasedCfg{
DecisionWait: defaultTestDecisionWait,
NumTraces: uint64(maxSize),
ExpectedNewTracesPerSec: 64,
}
sp, _ := NewTailSamplingSpanProcessor(testPolicy, maxSize, 64, decisionWait, zap.NewNop())
sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg)
tsp := sp.(*tailSamplingSpanProcessor)

// For this test explicitly control the timer calls and batcher.
16 changes: 0 additions & 16 deletions service/builder/builder_test.go
@@ -138,22 +138,6 @@ func TestTailSamplingPoliciesConfiguration(t *testing.T) {
}
}

func TestTailSamplingConfig(t *testing.T) {
v, err := loadViperFromFile("./testdata/sampling_config.yaml")
if err != nil {
t.Fatalf("Failed to load viper from test file: %v", err)
}

wCfg := NewDefaultTailBasedCfg()
wCfg.DecisionWait = 31 * time.Second
wCfg.NumTraces = 20001

gCfg := NewDefaultTailBasedCfg().InitFromViper(v)
if !reflect.DeepEqual(gCfg, wCfg) {
t.Fatalf("Wanted %+v but got %+v", *wCfg, *gCfg)
}
}

func loadViperFromFile(file string) (*viper.Viper, error) {
v := viper.New()
v.SetConfigFile(file)
14 changes: 6 additions & 8 deletions service/builder/sampling_builder.go
@@ -18,6 +18,8 @@ import (
"time"

"github.com/spf13/viper"

"github.com/open-telemetry/opentelemetry-service/config/configmodels"
)

const (
@@ -159,20 +161,16 @@ func (sCfg *SamplingCfg) InitFromViper(v *viper.Viper) *SamplingCfg {

// TailBasedCfg holds the configuration for tail-based sampling.
type TailBasedCfg struct {
Review comment (Contributor): nit: the config's name should be consistent with other processors.

Review comment (Member Author): I will rename it when I remove the old config and move it to tailsampling/config.go (in the follow-up PR).

Review comment (Member): Sounds good, that's good to know.

configmodels.ProcessorSettings `mapstructure:",squash"`
// DecisionWait is the desired wait time from the arrival of the first span of
// a trace until the decision about sampling it or not is evaluated.
DecisionWait time.Duration `mapstructure:"decision-wait"`
// NumTraces is the number of traces kept in memory. Typically most of the data
// of a trace is released after a sampling decision is taken.
NumTraces uint64 `mapstructure:"num-traces"`
}

// NewDefaultTailBasedCfg creates a TailBasedCfg with the default values.
func NewDefaultTailBasedCfg() *TailBasedCfg {
return &TailBasedCfg{
DecisionWait: 30 * time.Second,
NumTraces: 50000,
}
// ExpectedNewTracesPerSec sets the expected number of new traces sent to the tail sampling processor
// per second. This helps allocate data structures sized closer to actual usage.
ExpectedNewTracesPerSec uint64 `mapstructure:"expected-new-traces-per-sec"`
Review comment (Member): Is this a new config setting? I cannot find it in the old implementation.
}

// InitFromViper initializes TailBasedCfg with properties from viper.