Skip to content

Commit

Permalink
Add SAMPLING_STORAGE_TYPE environment variable (#3573)
Browse files Browse the repository at this point in the history
* Added SAMPLING_STORAGE_TYPE support in factory config

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added factory support and tests

Signed-off-by: Joe Elliott <number101010@gmail.com>

* review

Signed-off-by: Joe Elliott <number101010@gmail.com>
  • Loading branch information
joe-elliott authored Mar 10, 2022
1 parent f55a1a2 commit 4f55a70
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 2 deletions.
2 changes: 1 addition & 1 deletion plugin/sampling/strategystore/adaptive/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) {
// Initialize implements strategystore.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.SamplingStoreFactory, logger *zap.Logger) error {
if ssFactory == nil {
return errors.New("lock or SamplingStore nil. Please configure a backend that supports adaptive sampling")
return errors.New("sampling store factory is nil. Please configure a backend that supports adaptive sampling")
}

var err error
Expand Down
18 changes: 18 additions & 0 deletions plugin/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func NewFactory(config FactoryConfig) (*Factory, error) {
for _, storageType := range f.SpanWriterTypes {
uniqueTypes[storageType] = struct{}{}
}
// skip SamplingStorageType if it is empty. See CreateSamplingStoreFactory for details
if f.SamplingStorageType != "" {
uniqueTypes[f.SamplingStorageType] = struct{}{}
}
f.factories = make(map[string]storage.Factory)
for t := range uniqueTypes {
ff, err := f.getFactoryOfType(t)
Expand Down Expand Up @@ -162,6 +166,20 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {

// CreateSamplingStoreFactory creates a distributedlock.Lock and samplingstore.Store for use with adaptive sampling
func (f *Factory) CreateSamplingStoreFactory() (storage.SamplingStoreFactory, error) {
// if a sampling storage type was specified then use it, otherwise search all factories
// for compatibility
if f.SamplingStorageType != "" {
factory, ok := f.factories[f.SamplingStorageType]
if !ok {
return nil, fmt.Errorf("no %s backend registered for sampling store", f.SamplingStorageType)
}
ss, ok := factory.(storage.SamplingStoreFactory)
if !ok {
return nil, fmt.Errorf("storage factory of type %s does not support sampling store", f.SamplingStorageType)
}
return ss, nil
}

for _, factory := range f.factories {
ss, ok := factory.(storage.SamplingStoreFactory)
if ok {
Expand Down
6 changes: 6 additions & 0 deletions plugin/storage/factory_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@ const (
// DependencyStorageTypeEnvVar is the name of the env var that defines the type of backend used for dependencies storage.
DependencyStorageTypeEnvVar = "DEPENDENCY_STORAGE_TYPE"

// SamplingStorageTypeEnvVar is the name of the env var that defines the type of backend used for sampling data storage when using adaptive sampling.
SamplingStorageTypeEnvVar = "SAMPLING_STORAGE_TYPE"

spanStorageFlag = "--span-storage.type"
)

// FactoryConfig tells the Factory which types of backends it needs to create for different storage types.
type FactoryConfig struct {
SpanWriterTypes []string
SpanReaderType string
SamplingStorageType string
DependenciesStorageType string
DownsamplingRatio float64
DownsamplingHashSalt string
Expand Down Expand Up @@ -73,11 +77,13 @@ func FactoryConfigFromEnvAndCLI(args []string, log io.Writer) FactoryConfig {
if depStorageType == "" {
depStorageType = spanWriterTypes[0]
}
samplingStorageType := os.Getenv(SamplingStorageTypeEnvVar)
// TODO support explicit configuration for readers
return FactoryConfig{
SpanWriterTypes: spanWriterTypes,
SpanReaderType: spanWriterTypes[0],
DependenciesStorageType: depStorageType,
SamplingStorageType: samplingStorageType,
}
}

Expand Down
4 changes: 4 additions & 0 deletions plugin/storage/factory_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
func clearEnv() {
os.Setenv(SpanStorageTypeEnvVar, "")
os.Setenv(DependencyStorageTypeEnvVar, "")
os.Setenv(SamplingStorageTypeEnvVar, "")
}

func TestFactoryConfigFromEnv(t *testing.T) {
Expand All @@ -37,15 +38,18 @@ func TestFactoryConfigFromEnv(t *testing.T) {
assert.Equal(t, cassandraStorageType, f.SpanWriterTypes[0])
assert.Equal(t, cassandraStorageType, f.SpanReaderType)
assert.Equal(t, cassandraStorageType, f.DependenciesStorageType)
assert.Empty(t, f.SamplingStorageType)

os.Setenv(SpanStorageTypeEnvVar, elasticsearchStorageType)
os.Setenv(DependencyStorageTypeEnvVar, memoryStorageType)
os.Setenv(SamplingStorageTypeEnvVar, cassandraStorageType)

f = FactoryConfigFromEnvAndCLI(nil, &bytes.Buffer{})
assert.Equal(t, 1, len(f.SpanWriterTypes))
assert.Equal(t, elasticsearchStorageType, f.SpanWriterTypes[0])
assert.Equal(t, elasticsearchStorageType, f.SpanReaderType)
assert.Equal(t, memoryStorageType, f.DependenciesStorageType)
assert.Equal(t, cassandraStorageType, f.SamplingStorageType)

os.Setenv(SpanStorageTypeEnvVar, elasticsearchStorageType+","+kafkaStorageType)

Expand Down
21 changes: 20 additions & 1 deletion plugin/storage/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,20 +296,39 @@ func TestCreateError(t *testing.T) {
}
}

func CreateSamplingStoreFactory(t *testing.T) {
func TestCreateSamplingStoreFactory(t *testing.T) {
f, err := NewFactory(defaultCfg())
require.NoError(t, err)
assert.NotEmpty(t, f.factories)
assert.NotEmpty(t, f.factories[cassandraStorageType])

// if not specified sampling store is chosen from available factories
ssFactory, err := f.CreateSamplingStoreFactory()
assert.Equal(t, f.factories[cassandraStorageType], ssFactory)
assert.NoError(t, err)

// if not specified and there's no compatible factories then return nil
delete(f.factories, cassandraStorageType)
ssFactory, err = f.CreateSamplingStoreFactory()
assert.Nil(t, ssFactory)
assert.NoError(t, err)

// if an incompatible factory is specified return err
cfg := defaultCfg()
cfg.SamplingStorageType = "elasticsearch"
f, err = NewFactory(cfg)
require.NoError(t, err)
ssFactory, err = f.CreateSamplingStoreFactory()
assert.Nil(t, ssFactory)
assert.EqualError(t, err, "storage factory of type elasticsearch does not support sampling store")

// if a compatible factory is specified then return it
cfg.SamplingStorageType = "cassandra"
f, err = NewFactory(cfg)
require.NoError(t, err)
ssFactory, err = f.CreateSamplingStoreFactory()
assert.Equal(t, ssFactory, f.factories["cassandra"])
assert.NoError(t, err)
}

type configurable struct {
Expand Down

0 comments on commit 4f55a70

Please sign in to comment.