Skip to content

Commit

Permalink
Add a Downsampling writer that drops a percentage of spans (#1353)
Browse files Browse the repository at this point in the history
* adding a wrapping writer that automatically drops spans before writing, based on a predefined config

Signed-off-by: Jude Wang <judew@uber.com>

* updating comment

Signed-off-by: Jude Wang <judew@uber.com>

* hash traceID for downsampling and code refactoring

Signed-off-by: Jude Wang <judew@uber.com>

* adding DownSamplingOptions

Signed-off-by: Jude Wang <judew@uber.com>

* address yuri's comments

Signed-off-by: Jude Wang <judew@uber.com>

* committing benchmark file & adding metrics

Signed-off-by: Jude Wang <judew@uber.com>

* address comment from Won

Signed-off-by: Jude Wang <judew@uber.com>

* get rid of defer; benchmark sync.Pool for byte array; make sure 100% code coverage

Signed-off-by: Jude Wang <judew@uber.com>

* making sure sync.Pool benchmark with 0 allocations

Signed-off-by: Jude Wang <judew@uber.com>

* refactor to read ratio and hashsalt from CLI

Signed-off-by: Jude Wang <judew@uber.com>

* address yuri comments

Signed-off-by: Jude Wang <judew@uber.com>

* refinement

Signed-off-by: Jude Wang <judew@uber.com>

* fix merge conflict

Signed-off-by: Jude Wang <judew@uber.com>

* copy hashSalt slice inside bytePool initialization

Signed-off-by: Jude Wang <judew@uber.com>

* further comments from yuri

Signed-off-by: Jude Wang <judew@uber.com>

* omit error check for MarshalTo

Signed-off-by: Jude Wang <judew@uber.com>

* update naming and some logic

Signed-off-by: Jude Wang <judew@uber.com>

* remove pointer

Signed-off-by: Jude Wang <judew@uber.com>

* updating tests

Signed-off-by: Jude Wang <judew@uber.com>

* adding default salt

Signed-off-by: Jude Wang <judew@uber.com>
  • Loading branch information
guanw authored and yurishkuro committed Mar 12, 2019
1 parent 91d9f95 commit 19b6807
Show file tree
Hide file tree
Showing 7 changed files with 423 additions and 9 deletions.
58 changes: 51 additions & 7 deletions plugin/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,22 @@ const (
elasticsearchStorageType = "elasticsearch"
memoryStorageType = "memory"
kafkaStorageType = "kafka"
downsamplingRatio = "downsampling.ratio"
downsamplingHashSalt = "downsampling.hashsalt"

// defaultDownsamplingRatio is the default downsampling ratio.
defaultDownsamplingRatio = 1.0
// defaultDownsamplingHashSalt is the default downsampling hashsalt.
defaultDownsamplingHashSalt = ""
)

var allStorageTypes = []string{cassandraStorageType, elasticsearchStorageType, memoryStorageType, kafkaStorageType}

// Factory implements storage.Factory interface as a meta-factory for storage components.
type Factory struct {
FactoryConfig

factories map[string]storage.Factory
metricsFactory metrics.Factory
factories map[string]storage.Factory
}

// NewFactory creates the meta-factory.
Expand Down Expand Up @@ -84,8 +91,9 @@ func (f *Factory) getFactoryOfType(factoryType string) (storage.Factory, error)
}
}

// Initialize implements storage.Factory
// Initialize implements storage.Factory.
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory = metricsFactory
for _, factory := range f.factories {
if err := factory.Initialize(metricsFactory, logger); err != nil {
return err
Expand All @@ -94,7 +102,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
return nil
}

// CreateSpanReader implements storage.Factory
// CreateSpanReader implements storage.Factory.
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
factory, ok := f.factories[f.SpanReaderType]
if !ok {
Expand All @@ -103,7 +111,7 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return factory.CreateSpanReader()
}

// CreateSpanWriter implements storage.Factory
// CreateSpanWriter implements storage.Factory.
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
var writers []spanstore.Writer
for _, storageType := range f.SpanWriterTypes {
Expand All @@ -117,10 +125,21 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
}
writers = append(writers, writer)
}
var spanWriter spanstore.Writer
if len(f.SpanWriterTypes) == 1 {
return writers[0], nil
spanWriter = writers[0]
} else {
spanWriter = spanstore.NewCompositeWriter(writers...)
}
return spanstore.NewCompositeWriter(writers...), nil
// Turn off DownsamplingWriter entirely if ratio == defaultDownsamplingRatio.
if f.DownsamplingRatio == defaultDownsamplingRatio {
return spanWriter, nil
}
return spanstore.NewDownsamplingWriter(spanWriter, spanstore.DownsamplingOptions{
Ratio: f.DownsamplingRatio,
HashSalt: f.DownsamplingHashSalt,
MetricsFactory: f.metricsFactory.Namespace(metrics.NSOptions{Name: "downsampling_writer"}),
}), nil
}

// CreateDependencyReader implements storage.Factory
Expand All @@ -139,6 +158,21 @@ func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
conf.AddFlags(flagSet)
}
}
addDownsamplingFlags(flagSet)
}

// addDownsamplingFlags registers the command-line flags that configure span
// downsampling: the ratio of spans to keep and the salt used when hashing
// trace IDs. Both flags default to the package-level default constants.
func addDownsamplingFlags(flagSet *flag.FlagSet) {
	const (
		ratioUsage = "Ratio of spans passed to storage after downsampling (between 0 and 1), e.g ratio = 0.3 means we are keeping 30% of spans and dropping 70% of spans; ratio = 1.0 disables downsampling."
		saltUsage  = "Salt used when hashing trace id for downsampling."
	)

	flagSet.Float64(downsamplingRatio, defaultDownsamplingRatio, ratioUsage)
	flagSet.String(downsamplingHashSalt, defaultDownsamplingHashSalt, saltUsage)
}

// InitFromViper implements plugin.Configurable
Expand All @@ -148,6 +182,16 @@ func (f *Factory) InitFromViper(v *viper.Viper) {
conf.InitFromViper(v)
}
}
f.initDownsamplingFromViper(v)
}

// initDownsamplingFromViper loads the downsampling ratio and hash salt from
// viper into the embedded FactoryConfig.
//
// A ratio outside the range [0, 1] is replaced with defaultDownsamplingRatio.
// The range check is written in its positive form and then negated so that
// NaN, for which every ordered comparison is false, is rejected as well
// instead of slipping past a "< 0 || > 1" test.
func (f *Factory) initDownsamplingFromViper(v *viper.Viper) {
	ratio := v.GetFloat64(downsamplingRatio)
	if !(ratio >= 0 && ratio <= 1) {
		ratio = defaultDownsamplingRatio
	}
	f.FactoryConfig.DownsamplingRatio = ratio
	f.FactoryConfig.DownsamplingHashSalt = v.GetString(downsamplingHashSalt)
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
Expand Down
2 changes: 2 additions & 0 deletions plugin/storage/factory_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type FactoryConfig struct {
SpanWriterTypes []string
SpanReaderType string
DependenciesStorageType string
DownsamplingRatio float64
DownsamplingHashSalt string
}

// FactoryConfigFromEnvAndCLI reads the desired types of storage backends from SPAN_STORAGE_TYPE and
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/factory_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestFactoryConfigFromEnv(t *testing.T) {
clearEnv()
defer clearEnv()

f := FactoryConfigFromEnvAndCLI(nil, nil)
f := FactoryConfigFromEnvAndCLI(nil, &bytes.Buffer{})
assert.Equal(t, 1, len(f.SpanWriterTypes))
assert.Equal(t, cassandraStorageType, f.SpanWriterTypes[0])
assert.Equal(t, cassandraStorageType, f.SpanReaderType)
Expand All @@ -40,7 +40,7 @@ func TestFactoryConfigFromEnv(t *testing.T) {
os.Setenv(SpanStorageTypeEnvVar, elasticsearchStorageType)
os.Setenv(DependencyStorageTypeEnvVar, memoryStorageType)

f = FactoryConfigFromEnvAndCLI(nil, nil)
f = FactoryConfigFromEnvAndCLI(nil, &bytes.Buffer{})
assert.Equal(t, 1, len(f.SpanWriterTypes))
assert.Equal(t, elasticsearchStorageType, f.SpanWriterTypes[0])
assert.Equal(t, elasticsearchStorageType, f.SpanReaderType)
Expand Down
67 changes: 67 additions & 0 deletions plugin/storage/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package storage
import (
"errors"
"flag"
"reflect"
"strings"
"testing"

"github.com/spf13/viper"
Expand All @@ -25,6 +27,7 @@ import (
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/storage"
depStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
"github.com/jaegertracing/jaeger/storage/mocks"
Expand All @@ -40,6 +43,8 @@ func defaultCfg() FactoryConfig {
SpanWriterTypes: []string{cassandraStorageType},
SpanReaderType: cassandraStorageType,
DependenciesStorageType: cassandraStorageType,
DownsamplingRatio: 1.0,
DownsamplingHashSalt: "",
}
}

Expand Down Expand Up @@ -129,11 +134,49 @@ func TestCreate(t *testing.T) {
assert.EqualError(t, err, "Archive storage not supported")

mock.On("CreateSpanWriter").Return(spanWriter, nil)
m := metrics.NullFactory
l := zap.NewNop()
mock.On("Initialize", m, l).Return(nil)
f.Initialize(m, l)
w, err = f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, spanWriter, w)
}

// TestCreateDownsamplingWriter verifies that CreateSpanWriter wraps the
// underlying writer in a DownsamplingWriter only when the configured ratio
// differs from 1.0, and returns the underlying writer untouched otherwise.
func TestCreateDownsamplingWriter(t *testing.T) {
	f, err := NewFactory(defaultCfg())
	assert.NoError(t, err)
	assert.NotEmpty(t, f.factories[cassandraStorageType])
	mock := new(mocks.Factory)
	f.factories[cassandraStorageType] = mock
	spanWriter := new(spanStoreMocks.Writer)
	mock.On("CreateSpanWriter").Return(spanWriter, nil)

	m := metrics.NullFactory
	l := zap.NewNop()
	mock.On("Initialize", m, l).Return(nil)

	testParams := []struct {
		ratio      float64
		writerType string
	}{
		{0.5, "*spanstore.DownsamplingWriter"},
		{1.0, "*mocks.Writer"},
	}

	for _, param := range testParams {
		t.Run(param.writerType, func(t *testing.T) {
			f.DownsamplingRatio = param.ratio
			// Initialize stores the metrics factory that CreateSpanWriter
			// uses for the downsampling writer, so a failure here must not
			// go unnoticed.
			assert.NoError(t, f.Initialize(m, l))
			newWriter, err := f.CreateSpanWriter()
			assert.NoError(t, err)
			// Comparing writers with assert.Equal does not work because each
			// DownsamplingWriter is created with its own hasher pool, so the
			// test compares the dynamic type name instead.
			gotType := reflect.TypeOf(newWriter).String()
			assert.True(t, strings.HasPrefix(gotType, param.writerType),
				"unexpected writer type %s, want prefix %s", gotType, param.writerType)
		})
	}
}

func TestCreateMulti(t *testing.T) {
cfg := defaultCfg()
cfg.SpanWriterTypes = append(cfg.SpanWriterTypes, elasticsearchStorageType)
Expand All @@ -156,6 +199,11 @@ func TestCreateMulti(t *testing.T) {

mock.On("CreateSpanWriter").Return(spanWriter, nil)
mock2.On("CreateSpanWriter").Return(spanWriter2, nil)
m := metrics.NullFactory
l := zap.NewNop()
mock.On("Initialize", m, l).Return(nil)
mock2.On("Initialize", m, l).Return(nil)
f.Initialize(m, l)
w, err = f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, spanstore.NewCompositeWriter(spanWriter, spanWriter2), w)
Expand Down Expand Up @@ -264,3 +312,22 @@ func TestConfigurable(t *testing.T) {
assert.Equal(t, fs, mock.flagSet)
assert.Equal(t, v, mock.viper)
}

// TestParsingDownsamplingRatio verifies that the downsampling flags are read
// through viper, that an out-of-range ratio falls back to 1.0, and that an
// in-range ratio is stored as given.
func TestParsingDownsamplingRatio(t *testing.T) {
	f := Factory{}
	v, command := config.Viperize(addDownsamplingFlags)
	err := command.ParseFlags([]string{
		"--downsampling.ratio=1.5",
		"--downsampling.hashsalt=jaeger",
	})
	assert.NoError(t, err)
	f.InitFromViper(v)

	// assert.Equal takes (expected, actual); keeping that order makes the
	// failure output label the two values correctly.
	// 1.5 is outside [0, 1], so the ratio must have been reset to 1.0.
	assert.Equal(t, 1.0, f.FactoryConfig.DownsamplingRatio)
	assert.Equal(t, "jaeger", f.FactoryConfig.DownsamplingHashSalt)

	err = command.ParseFlags([]string{
		"--downsampling.ratio=0.5",
	})
	assert.NoError(t, err)
	f.InitFromViper(v)
	assert.Equal(t, 0.5, f.FactoryConfig.DownsamplingRatio)
}
119 changes: 119 additions & 0 deletions storage/spanstore/downsampling_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package spanstore

import (
"hash"
"hash/fnv"
"math"
"sync"

"github.com/uber/jaeger-lib/metrics"

"github.com/jaegertracing/jaeger/model"
)

// defaultHashSalt is the salt NewDownsamplingWriter falls back to when
// DownsamplingOptions.HashSalt is empty.
const defaultHashSalt = "downsampling-default-salt"

var (
// traceIDByteSize is the value reported by model.TraceID.Size(); it sizes the
// part of each hashing buffer that follows the salt bytes.
traceIDByteSize = (&model.TraceID{}).Size()
)

// hasher bundles the scratch state that DownsamplingWriter keeps in a
// sync.Pool so that a hash object and its input buffer can be reused across
// calls.
type hasher struct {
// hash is the 64-bit hash; the pool in NewDownsamplingWriter creates it with
// fnv.New64a.
hash hash.Hash64
// buffer holds the salt bytes followed by room for one marshaled trace ID.
buffer []byte
}

// downsamplingWriterMetrics keeps track of the total number of dropped spans
// and accepted spans.
type downsamplingWriterMetrics struct {
// SpansDropped is incremented by WriteSpan for every span it discards.
SpansDropped metrics.Counter `metric:"spans_dropped"`
// SpansAccepted is incremented by WriteSpan for every span it passes on to
// the wrapped writer.
SpansAccepted metrics.Counter `metric:"spans_accepted"`
}

// DownsamplingWriter is a span Writer that drops spans with a predefined
// downsamplingRatio. The keep/drop decision is made by hashing the salted
// trace ID and comparing the result against threshold.
type DownsamplingWriter struct {
// spanWriter is the wrapped writer that receives the spans that are kept.
spanWriter Writer
// threshold is the largest hash value for which a span is still kept.
threshold uint64
// lengthOfSalt is the number of salt bytes at the start of each hasher
// buffer, i.e. the offset at which the trace ID is written.
lengthOfSalt int
// hasherPool hands out reusable *hasher values.
hasherPool *sync.Pool
// metrics counts dropped and accepted spans.
metrics downsamplingWriterMetrics
}

// DownsamplingOptions contains the options for constructing a DownsamplingWriter.
type DownsamplingOptions struct {
// Ratio is scaled against the uint64 range by NewDownsamplingWriter to obtain
// the hash threshold below which spans are kept.
Ratio float64
// HashSalt is prepended to the trace ID bytes before hashing; when empty,
// NewDownsamplingWriter substitutes defaultHashSalt.
HashSalt string
// MetricsFactory is passed to metrics.Init to create the writer's counters.
MetricsFactory metrics.Factory
}

// NewDownsamplingWriter creates a DownsamplingWriter that forwards to
// spanWriter only those spans whose salted trace ID hash does not exceed a
// threshold derived from downsamplingOptions.Ratio.
//
// Ratio is clamped: values >= 1 map to the maximum threshold (every span is
// kept), values <= 0 and NaN map to threshold 0. Clamping is required because
// float64(math.MaxUint64) rounds up to 2^64, so converting Ratio*2^64 back to
// uint64 is out of range for Ratio >= 1 (and for negative ratios), and Go
// leaves the result of such a conversion implementation-dependent.
//
// An empty HashSalt is replaced with defaultHashSalt. MetricsFactory is used
// to initialize the dropped/accepted span counters.
func NewDownsamplingWriter(spanWriter Writer, downsamplingOptions DownsamplingOptions) *DownsamplingWriter {
	var threshold uint64 // stays 0 for ratio <= 0 and for NaN
	ratio := downsamplingOptions.Ratio
	switch {
	case ratio >= 1:
		threshold = math.MaxUint64
	case ratio > 0:
		// For 0 < ratio < 1 the product is strictly below 2^64, so the
		// conversion is well defined.
		threshold = uint64(ratio * float64(math.MaxUint64))
	}

	writeMetrics := &downsamplingWriterMetrics{}
	metrics.Init(writeMetrics, downsamplingOptions.MetricsFactory, nil)

	salt := downsamplingOptions.HashSalt
	if salt == "" {
		salt = defaultHashSalt
	}
	hashSaltBytes := []byte(salt)

	pool := &sync.Pool{
		New: func() interface{} {
			// Each pooled buffer is pre-filled with the salt; only the
			// trailing trace ID bytes are rewritten per span.
			buffer := make([]byte, len(hashSaltBytes)+traceIDByteSize)
			copy(buffer, hashSaltBytes)
			return &hasher{
				hash:   fnv.New64a(),
				buffer: buffer,
			}
		},
	}

	return &DownsamplingWriter{
		spanWriter:   spanWriter,
		threshold:    threshold,
		hasherPool:   pool,
		metrics:      *writeMetrics,
		lengthOfSalt: len(hashSaltBytes),
	}
}

// WriteSpan forwards span to the wrapped span writer when the span is selected
// to be kept, and silently discards it otherwise. It returns the wrapped
// writer's result for kept spans and nil for discarded ones, and counts each
// outcome in the writer's metrics.
func (ds *DownsamplingWriter) WriteSpan(span *model.Span) error {
	// Despite its name, shouldDownsample reports true for spans whose hash is
	// within the threshold, i.e. spans that are kept.
	if keep := ds.shouldDownsample(span); keep {
		ds.metrics.SpansAccepted.Inc(1)
		return ds.spanWriter.WriteSpan(span)
	}
	// The hash value fell beyond the computed threshold: drop the span.
	ds.metrics.SpansDropped.Inc(1)
	return nil
}

// shouldDownsample hashes the salted trace ID of span and reports whether the
// hash is less than or equal to the writer's threshold.
func (ds *DownsamplingWriter) shouldDownsample(span *model.Span) bool {
	pooled := ds.hasherPool.Get().(*hasher)

	// The salt already occupies the front of the buffer; the trace ID is
	// written right after it. The MarshalTo error is deliberately ignored:
	// the original author notes it only fails when the destination is too
	// small, and the buffer is allocated with traceIDByteSize bytes after
	// the salt.
	traceIDSlot := pooled.buffer[ds.lengthOfSalt:]
	_, _ = span.TraceID.MarshalTo(traceIDSlot)

	sum := pooled.hashBytes()
	ds.hasherPool.Put(pooled)

	return sum <= ds.threshold
}

// hashBytes resets the hasher's hash, feeds it the entire buffer and returns
// the resulting 64-bit sum.
func (h *hasher) hashBytes() uint64 {
	digest := h.hash
	digest.Reset()
	// The Write error is ignored on purpose: the hash.Hash contract states
	// that Write never returns an error.
	_, _ = digest.Write(h.buffer)
	return digest.Sum64()
}
Loading

0 comments on commit 19b6807

Please sign in to comment.