-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add factory and update config for tail sampling processor #200
Changes from all commits
362e144
41d7adf
d93134d
c08f836
653a656
374fa6e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
// Copyright 2019, OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package tailsampling | ||
|
||
import (
	"fmt"
	"time"

	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-service/config/configerror"
	"github.com/open-telemetry/opentelemetry-service/config/configmodels"
	"github.com/open-telemetry/opentelemetry-service/consumer"
	"github.com/open-telemetry/opentelemetry-service/processor"
	"github.com/open-telemetry/opentelemetry-service/service/builder"
)
|
||
// typeStr is the value of the "type" key that selects the Tail Sampling
// processor in configuration.
const typeStr = "tail-sampling"
|
||
// Factory creates Tail Sampling processors and their default configuration.
// It has no fields, so its zero value is ready to use.
type Factory struct{}
|
||
// Type gets the type of the config created by this factory. | ||
func (f *Factory) Type() string { | ||
return typeStr | ||
} | ||
|
||
// CreateDefaultConfig creates the default configuration for processor. | ||
func (f *Factory) CreateDefaultConfig() configmodels.Processor { | ||
return &builder.TailBasedCfg{ | ||
DecisionWait: 30 * time.Second, | ||
NumTraces: 50000, | ||
} | ||
} | ||
|
||
// CreateTraceProcessor creates a trace processor based on this config. | ||
func (f *Factory) CreateTraceProcessor( | ||
logger *zap.Logger, | ||
nextConsumer consumer.TraceConsumer, | ||
cfg configmodels.Processor, | ||
) (processor.TraceProcessor, error) { | ||
tCfg := cfg.(*builder.TailBasedCfg) | ||
return NewTraceProcessor(logger, nextConsumer, *tCfg) | ||
} | ||
|
||
// CreateMetricsProcessor creates a metrics processor based on this config. | ||
func (f *Factory) CreateMetricsProcessor( | ||
logger *zap.Logger, | ||
nextConsumer consumer.MetricsConsumer, | ||
cfg configmodels.Processor, | ||
) (processor.MetricsProcessor, error) { | ||
return nil, configerror.ErrDataTypeIsNotSupported | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
// Copyright 2019, OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package tailsampling | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"go.uber.org/zap" | ||
|
||
"github.com/open-telemetry/opentelemetry-service/exporter/exportertest" | ||
) | ||
|
||
func TestCreateDefaultConfig(t *testing.T) { | ||
factory := &Factory{} | ||
|
||
cfg := factory.CreateDefaultConfig() | ||
assert.NotNil(t, cfg, "failed to create default config") | ||
} | ||
|
||
func TestCreateProcessor(t *testing.T) { | ||
factory := &Factory{} | ||
|
||
cfg := factory.CreateDefaultConfig() | ||
|
||
tp, err := factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg) | ||
assert.NotNil(t, tp) | ||
assert.NoError(t, err, "cannot create trace processor") | ||
|
||
mp, err := factory.CreateMetricsProcessor(zap.NewNop(), nil, cfg) | ||
assert.Nil(t, mp) | ||
assert.Error(t, err, "should not be able to create metric processor") | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ package tailsampling | |
|
||
import ( | ||
"context" | ||
"errors" | ||
"runtime" | ||
"sync" | ||
"sync/atomic" | ||
|
@@ -30,7 +31,8 @@ import ( | |
"github.com/open-telemetry/opentelemetry-service/consumer/consumerdata" | ||
"github.com/open-telemetry/opentelemetry-service/internal/collector/processor/idbatcher" | ||
"github.com/open-telemetry/opentelemetry-service/internal/collector/sampling" | ||
"github.com/open-telemetry/opentelemetry-service/observability" | ||
"github.com/open-telemetry/opentelemetry-service/processor" | ||
"github.com/open-telemetry/opentelemetry-service/service/builder" | ||
) | ||
|
||
// Policy combines a sampling policy evaluator with the destinations to be | ||
|
@@ -54,6 +56,7 @@ type traceKey string | |
// policies to sample traces. | ||
type tailSamplingSpanProcessor struct { | ||
ctx context.Context | ||
nextConsumer consumer.TraceConsumer | ||
start sync.Once | ||
maxNumTraces uint64 | ||
policies []*Policy | ||
|
@@ -69,40 +72,33 @@ const ( | |
sourceFormat = "tail-sampling" | ||
) | ||
|
||
var _ consumer.TraceConsumer = (*tailSamplingSpanProcessor)(nil) | ||
var _ processor.TraceProcessor = (*tailSamplingSpanProcessor)(nil) | ||
|
||
// NewTailSamplingSpanProcessor creates a TailSamplingSpanProcessor with the given policies. | ||
// It will keep maxNumTraces on memory and will attempt to wait until decisionWait before evaluating if | ||
// a trace should be sampled or not. Providing expectedNewTracesPerSec helps with allocating data structures | ||
// with closer to actual usage size. | ||
func NewTailSamplingSpanProcessor( | ||
policies []*Policy, | ||
maxNumTraces, expectedNewTracesPerSec uint64, | ||
decisionWait time.Duration, | ||
logger *zap.Logger) (consumer.TraceConsumer, error) { | ||
// NewTraceProcessor returns a processor.TraceProcessor that will perform tail sampling according to the given | ||
// configuration. | ||
func NewTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumer, cfg builder.TailBasedCfg) (processor.TraceProcessor, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a good reason that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that config is inherited from opencensus-service where sampling is configured through builder. I'll move it to processor/tailsampling and clean up the old configs in the next PR. |
||
if nextConsumer == nil { | ||
return nil, errors.New("nextConsumer is nil") | ||
} | ||
|
||
numDecisionBatches := uint64(decisionWait.Seconds()) | ||
inBatcher, err := idbatcher.New(numDecisionBatches, expectedNewTracesPerSec, uint64(2*runtime.NumCPU())) | ||
numDecisionBatches := uint64(cfg.DecisionWait.Seconds()) | ||
inBatcher, err := idbatcher.New(numDecisionBatches, cfg.ExpectedNewTracesPerSec, uint64(2*runtime.NumCPU())) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
tsp := &tailSamplingSpanProcessor{ | ||
ctx: context.Background(), | ||
maxNumTraces: maxNumTraces, | ||
policies: policies, | ||
nextConsumer: nextConsumer, | ||
maxNumTraces: cfg.NumTraces, | ||
logger: logger, | ||
decisionBatcher: inBatcher, | ||
} | ||
|
||
for _, policy := range policies { | ||
policyCtx, err := tag.New(tsp.ctx, tag.Upsert(tagPolicyKey, policy.Name), tag.Upsert(observability.TagKeyReceiver, sourceFormat)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
policy.ctx = policyCtx | ||
} | ||
// TODO(#146): add policies to TailBasedCfg | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Policies have an interface and we will need to add the config factories for each one (as a temporary alternative we can use the old code to read them but I want to double-check that before going in that direction). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it, either way I would prefer to add it in a follow-up PR. |
||
tsp.policyTicker = &policyTicker{onTick: tsp.samplingPolicyOnTick} | ||
tsp.deleteChan = make(chan traceKey, maxNumTraces) | ||
tsp.deleteChan = make(chan traceKey, cfg.NumTraces) | ||
|
||
return tsp, nil | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,8 @@ import ( | |
"time" | ||
|
||
"github.com/spf13/viper" | ||
|
||
"github.com/open-telemetry/opentelemetry-service/config/configmodels" | ||
) | ||
|
||
const ( | ||
|
@@ -159,20 +161,16 @@ func (sCfg *SamplingCfg) InitFromViper(v *viper.Viper) *SamplingCfg { | |
|
||
// TailBasedCfg holds the configuration for tail-based sampling. | ||
type TailBasedCfg struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: config as a name should be consistent with other processors. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will rename it when I remove the old config and move it to tailsampling/config.go (in the follow-up PR). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good, that's good to know. |
||
configmodels.ProcessorSettings `mapstructure:",squash"` | ||
// DecisionWait is the desired wait time from the arrival of the first span of | ||
// trace until the decision about sampling it or not is evaluated. | ||
DecisionWait time.Duration `mapstructure:"decision-wait"` | ||
// NumTraces is the number of traces kept in memory. Typically most of the data | ||
// of a trace is released after a sampling decision is taken. | ||
NumTraces uint64 `mapstructure:"num-traces"` | ||
} | ||
|
||
// NewDefaultTailBasedCfg creates a TailBasedCfg with the default values. | ||
func NewDefaultTailBasedCfg() *TailBasedCfg { | ||
return &TailBasedCfg{ | ||
DecisionWait: 30 * time.Second, | ||
NumTraces: 50000, | ||
} | ||
// ExpectedNewTracesPerSec sets the expected number of new traces sent to the tail sampling processor | ||
// per second. This helps with allocating data structures with closer to actual usage size. | ||
ExpectedNewTracesPerSec uint64 `mapstructure:"expected-new-traces-per-sec"` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this a new config setting? I cannot find it in the old implementation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not in the old config but it's required by the tail sampler (even before this PR): |
||
} | ||
|
||
// InitFromViper initializes TailBasedCfg with properties from viper. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason for those numbers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure either. This is copied from https://github.com/open-telemetry/opentelemetry-service/blob/4c7c3ca57110c32803b56cec3fe4733afd74c952/service/builder/sampling_builder.go#L170-L176
@pjanotti any idea on those numbers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are arbitrary; as far as I am aware :), there are no good criteria other than "reasonableness" for selecting those defaults.