Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Downsampling writer that drop a percentage of spans #1353

Merged
merged 20 commits into from
Mar 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 51 additions & 7 deletions plugin/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,22 @@ const (
elasticsearchStorageType = "elasticsearch"
memoryStorageType = "memory"
kafkaStorageType = "kafka"
downsamplingRatio = "downsampling.ratio"
downsamplingHashSalt = "downsampling.hashsalt"

// defaultDownsamplingRatio is the default downsampling ratio.
guanw marked this conversation as resolved.
Show resolved Hide resolved
defaultDownsamplingRatio = 1.0
// defaultDownsamplingHashSalt is the default downsampling hashsalt.
guanw marked this conversation as resolved.
Show resolved Hide resolved
defaultDownsamplingHashSalt = ""
)

var allStorageTypes = []string{cassandraStorageType, elasticsearchStorageType, memoryStorageType, kafkaStorageType}

// Factory implements storage.Factory interface as a meta-factory for storage components.
type Factory struct {
FactoryConfig

factories map[string]storage.Factory
metricsFactory metrics.Factory
factories map[string]storage.Factory
}

// NewFactory creates the meta-factory.
Expand Down Expand Up @@ -84,8 +91,9 @@ func (f *Factory) getFactoryOfType(factoryType string) (storage.Factory, error)
}
}

// Initialize implements storage.Factory
// Initialize implements storage.Factory.
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory = metricsFactory
for _, factory := range f.factories {
if err := factory.Initialize(metricsFactory, logger); err != nil {
return err
Expand All @@ -94,7 +102,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
return nil
}

// CreateSpanReader implements storage.Factory
// CreateSpanReader implements storage.Factory.
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
factory, ok := f.factories[f.SpanReaderType]
if !ok {
Expand All @@ -103,7 +111,7 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return factory.CreateSpanReader()
}

// CreateSpanWriter implements storage.Factory
// CreateSpanWriter implements storage.Factory.
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
var writers []spanstore.Writer
for _, storageType := range f.SpanWriterTypes {
Expand All @@ -117,10 +125,21 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
}
writers = append(writers, writer)
}
var spanWriter spanstore.Writer
if len(f.SpanWriterTypes) == 1 {
guanw marked this conversation as resolved.
Show resolved Hide resolved
return writers[0], nil
spanWriter = writers[0]
} else {
spanWriter = spanstore.NewCompositeWriter(writers...)
}
return spanstore.NewCompositeWriter(writers...), nil
// Turn off DownsamplingWriter entirely if ratio == defaultDownsamplingRatio.
if f.DownsamplingRatio == defaultDownsamplingRatio {
return spanWriter, nil
}
return spanstore.NewDownsamplingWriter(spanWriter, spanstore.DownsamplingOptions{
Ratio: f.DownsamplingRatio,
HashSalt: f.DownsamplingHashSalt,
MetricsFactory: f.metricsFactory.Namespace(metrics.NSOptions{Name: "downsampling_writer"}),
}), nil
}

// CreateDependencyReader implements storage.Factory
Expand All @@ -139,6 +158,21 @@ func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
conf.AddFlags(flagSet)
}
}
addDownsamplingFlags(flagSet)
}

// addDownsamplingFlags add flags for Downsampling params
func addDownsamplingFlags(flagSet *flag.FlagSet) {
flagSet.Float64(
downsamplingRatio,
defaultDownsamplingRatio,
"Ratio of spans passed to storage after downsampling (between 0 and 1), e.g ratio = 0.3 means we are keeping 30% of spans and dropping 70% of spans; ratio = 1.0 disables downsampling.",
)
flagSet.String(
downsamplingHashSalt,
defaultDownsamplingHashSalt,
"Salt used when hashing trace id for downsampling.",
)
}

// InitFromViper implements plugin.Configurable
Expand All @@ -148,6 +182,16 @@ func (f *Factory) InitFromViper(v *viper.Viper) {
conf.InitFromViper(v)
}
}
f.initDownsamplingFromViper(v)
}

func (f *Factory) initDownsamplingFromViper(v *viper.Viper) {
f.FactoryConfig.DownsamplingRatio = v.GetFloat64(downsamplingRatio)
if f.FactoryConfig.DownsamplingRatio < 0 || f.FactoryConfig.DownsamplingRatio > 1 {
// Values not in the range of 0 ~ 1.0 will be set to default.
f.FactoryConfig.DownsamplingRatio = 1.0
guanw marked this conversation as resolved.
Show resolved Hide resolved
}
f.FactoryConfig.DownsamplingHashSalt = v.GetString(downsamplingHashSalt)
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
Expand Down
2 changes: 2 additions & 0 deletions plugin/storage/factory_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type FactoryConfig struct {
SpanWriterTypes []string
SpanReaderType string
DependenciesStorageType string
DownsamplingRatio float64
DownsamplingHashSalt string
}

// FactoryConfigFromEnvAndCLI reads the desired types of storage backends from SPAN_STORAGE_TYPE and
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/factory_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestFactoryConfigFromEnv(t *testing.T) {
clearEnv()
defer clearEnv()

f := FactoryConfigFromEnvAndCLI(nil, nil)
f := FactoryConfigFromEnvAndCLI(nil, &bytes.Buffer{})
assert.Equal(t, 1, len(f.SpanWriterTypes))
assert.Equal(t, cassandraStorageType, f.SpanWriterTypes[0])
assert.Equal(t, cassandraStorageType, f.SpanReaderType)
Expand All @@ -40,7 +40,7 @@ func TestFactoryConfigFromEnv(t *testing.T) {
os.Setenv(SpanStorageTypeEnvVar, elasticsearchStorageType)
os.Setenv(DependencyStorageTypeEnvVar, memoryStorageType)

f = FactoryConfigFromEnvAndCLI(nil, nil)
f = FactoryConfigFromEnvAndCLI(nil, &bytes.Buffer{})
assert.Equal(t, 1, len(f.SpanWriterTypes))
assert.Equal(t, elasticsearchStorageType, f.SpanWriterTypes[0])
assert.Equal(t, elasticsearchStorageType, f.SpanReaderType)
Expand Down
67 changes: 67 additions & 0 deletions plugin/storage/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package storage
import (
"errors"
"flag"
"reflect"
"strings"
"testing"

"github.com/spf13/viper"
Expand All @@ -25,6 +27,7 @@ import (
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config"
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
"github.com/jaegertracing/jaeger/storage"
depStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
"github.com/jaegertracing/jaeger/storage/mocks"
Expand All @@ -40,6 +43,8 @@ func defaultCfg() FactoryConfig {
SpanWriterTypes: []string{cassandraStorageType},
SpanReaderType: cassandraStorageType,
DependenciesStorageType: cassandraStorageType,
DownsamplingRatio: 1.0,
DownsamplingHashSalt: "",
}
}

Expand Down Expand Up @@ -129,11 +134,49 @@ func TestCreate(t *testing.T) {
assert.EqualError(t, err, "Archive storage not supported")

mock.On("CreateSpanWriter").Return(spanWriter, nil)
m := metrics.NullFactory
l := zap.NewNop()
mock.On("Initialize", m, l).Return(nil)
f.Initialize(m, l)
w, err = f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, spanWriter, w)
}

func TestCreateDownsamplingWriter(t *testing.T) {
f, err := NewFactory(defaultCfg())
assert.NoError(t, err)
assert.NotEmpty(t, f.factories[cassandraStorageType])
mock := new(mocks.Factory)
f.factories[cassandraStorageType] = mock
spanWriter := new(spanStoreMocks.Writer)
mock.On("CreateSpanWriter").Return(spanWriter, nil)

m := metrics.NullFactory
l := zap.NewNop()
mock.On("Initialize", m, l).Return(nil)

var testParams = []struct {
ratio float64
writerType string
}{
{0.5, "*spanstore.DownsamplingWriter"},
{1.0, "*mocks.Writer"},
}

for _, param := range testParams {
t.Run(param.writerType, func(t *testing.T) {
f.DownsamplingRatio = param.ratio
f.Initialize(m, l)
newWriter, err := f.CreateSpanWriter()
assert.NoError(t, err)
// Currently directly assertEqual doesn't work since DownsamplingWriter initializes with different
// address for hashPool. The following workaround checks writer type instead
assert.True(t, strings.HasPrefix(reflect.TypeOf(newWriter).String(), param.writerType))
})
}
}

func TestCreateMulti(t *testing.T) {
cfg := defaultCfg()
cfg.SpanWriterTypes = append(cfg.SpanWriterTypes, elasticsearchStorageType)
Expand All @@ -156,6 +199,11 @@ func TestCreateMulti(t *testing.T) {

mock.On("CreateSpanWriter").Return(spanWriter, nil)
mock2.On("CreateSpanWriter").Return(spanWriter2, nil)
m := metrics.NullFactory
l := zap.NewNop()
mock.On("Initialize", m, l).Return(nil)
mock2.On("Initialize", m, l).Return(nil)
f.Initialize(m, l)
w, err = f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, spanstore.NewCompositeWriter(spanWriter, spanWriter2), w)
Expand Down Expand Up @@ -264,3 +312,22 @@ func TestConfigurable(t *testing.T) {
assert.Equal(t, fs, mock.flagSet)
assert.Equal(t, v, mock.viper)
}

func TestParsingDownsamplingRatio(t *testing.T) {
f := Factory{}
v, command := config.Viperize(addDownsamplingFlags)
err := command.ParseFlags([]string{
"--downsampling.ratio=1.5",
"--downsampling.hashsalt=jaeger"})
assert.NoError(t, err)
f.InitFromViper(v)

assert.Equal(t, f.FactoryConfig.DownsamplingRatio, 1.0)
assert.Equal(t, f.FactoryConfig.DownsamplingHashSalt, "jaeger")

err = command.ParseFlags([]string{
"--downsampling.ratio=0.5"})
assert.NoError(t, err)
f.InitFromViper(v)
assert.Equal(t, f.FactoryConfig.DownsamplingRatio, 0.5)
}
119 changes: 119 additions & 0 deletions storage/spanstore/downsampling_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package spanstore

import (
"hash"
"hash/fnv"
"math"
"sync"

"github.com/uber/jaeger-lib/metrics"

"github.com/jaegertracing/jaeger/model"
)

const defaultHashSalt = "downsampling-default-salt"

var (
traceIDByteSize = (&model.TraceID{}).Size()
)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved

// hasher includes data we want to put in sync.Pool.
type hasher struct {
hash hash.Hash64
buffer []byte
}

// downsamplingWriterMetrics keeping track of total number of dropped spans and accepted spans.
type downsamplingWriterMetrics struct {
SpansDropped metrics.Counter `metric:"spans_dropped"`
SpansAccepted metrics.Counter `metric:"spans_accepted"`
}

// DownsamplingWriter is a span Writer that drops spans with a predefined downsamplingRatio.
type DownsamplingWriter struct {
spanWriter Writer
threshold uint64
lengthOfSalt int
hasherPool *sync.Pool
metrics downsamplingWriterMetrics
}

// DownsamplingOptions contains the options for constructing a DownsamplingWriter.
type DownsamplingOptions struct {
Ratio float64
HashSalt string
MetricsFactory metrics.Factory
}

// NewDownsamplingWriter creates a DownsamplingWriter.
func NewDownsamplingWriter(spanWriter Writer, downsamplingOptions DownsamplingOptions) *DownsamplingWriter {
threshold := uint64(downsamplingOptions.Ratio * float64(math.MaxUint64))
writeMetrics := &downsamplingWriterMetrics{}
metrics.Init(writeMetrics, downsamplingOptions.MetricsFactory, nil)
salt := downsamplingOptions.HashSalt
if salt == "" {
salt = defaultHashSalt
}
hashSaltBytes := []byte(salt)
pool := &sync.Pool{
New: func() interface{} {
buffer := make([]byte, len(hashSaltBytes)+traceIDByteSize)
copy(buffer, hashSaltBytes)
return &hasher{
hash: fnv.New64a(),
buffer: buffer,
}
},
}

return &DownsamplingWriter{
spanWriter: spanWriter,
threshold: threshold,
hasherPool: pool,
metrics: *writeMetrics,
lengthOfSalt: len(hashSaltBytes),
}
}

// WriteSpan calls WriteSpan on wrapped span writer.
func (ds *DownsamplingWriter) WriteSpan(span *model.Span) error {
if !ds.shouldDownsample(span) {
// Drops spans when hashVal falls beyond computed threshold.
ds.metrics.SpansDropped.Inc(1)
return nil
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
}
ds.metrics.SpansAccepted.Inc(1)
return ds.spanWriter.WriteSpan(span)
}

func (ds *DownsamplingWriter) shouldDownsample(span *model.Span) bool {
hasherInstance := ds.hasherPool.Get().(*hasher)
// Currently MarshalTo will only return err if size of traceIDBytes is smaller than 16
// Since we force traceIDBytes to be size of 16 metrics is not necessary here.
_, _ = span.TraceID.MarshalTo(hasherInstance.buffer[ds.lengthOfSalt:])
hashVal := hasherInstance.hashBytes()
ds.hasherPool.Put(hasherInstance)
return hashVal <= ds.threshold
}

// hashBytes returns the uint64 hash value of byte slice.
func (h *hasher) hashBytes() uint64 {
h.hash.Reset()
// Currently fnv.Write() implementation doesn't throw any error so metric is not necessary here.
_, _ = h.hash.Write(h.buffer)
return h.hash.Sum64()
}
Loading