diff --git a/plugin/sampling/strategystore/adaptive/factory.go b/plugin/sampling/strategystore/adaptive/factory.go index d10e3208efb..c17a7b1b0cf 100644 --- a/plugin/sampling/strategystore/adaptive/factory.go +++ b/plugin/sampling/strategystore/adaptive/factory.go @@ -61,7 +61,7 @@ func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) { // Initialize implements strategystore.Factory func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.SamplingStoreFactory, logger *zap.Logger) error { if ssFactory == nil { - return errors.New("lock or SamplingStore nil. Please configure a backend that supports adaptive sampling") + return errors.New("sampling store factory is nil. Please configure a backend that supports adaptive sampling") } var err error diff --git a/plugin/storage/factory.go b/plugin/storage/factory.go index 5a6f2261638..7a1a7f61526 100644 --- a/plugin/storage/factory.go +++ b/plugin/storage/factory.go @@ -77,6 +77,10 @@ func NewFactory(config FactoryConfig) (*Factory, error) { for _, storageType := range f.SpanWriterTypes { uniqueTypes[storageType] = struct{}{} } + // skip SamplingStorageType if it is empty. See CreateSamplingStoreFactory for details + if f.SamplingStorageType != "" { + uniqueTypes[f.SamplingStorageType] = struct{}{} + } f.factories = make(map[string]storage.Factory) for t := range uniqueTypes { ff, err := f.getFactoryOfType(t) @@ -162,6 +166,20 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { // CreateSamplingStoreFactory creates a distributedlock.Lock and samplingstore.Store for use with adaptive sampling func (f *Factory) CreateSamplingStoreFactory() (storage.SamplingStoreFactory, error) { + // if a sampling storage type was specified then use it, otherwise search all factories + // for compatibility + if f.SamplingStorageType != "" { + factory, ok := f.factories[f.SamplingStorageType] + if !ok { + return nil, fmt.Errorf("no %s backend registered for sampling store", f.SamplingStorageType) + } + ss, ok := factory.(storage.SamplingStoreFactory) + if !ok { + return nil, fmt.Errorf("storage factory of type %s does not support sampling store", f.SamplingStorageType) + } + return ss, nil + } + for _, factory := range f.factories { ss, ok := factory.(storage.SamplingStoreFactory) if ok { diff --git a/plugin/storage/factory_config.go b/plugin/storage/factory_config.go index 8432f129c41..6a2761fab69 100644 --- a/plugin/storage/factory_config.go +++ b/plugin/storage/factory_config.go @@ -29,6 +29,9 @@ const ( // DependencyStorageTypeEnvVar is the name of the env var that defines the type of backend used for dependencies storage. DependencyStorageTypeEnvVar = "DEPENDENCY_STORAGE_TYPE" + // SamplingStorageTypeEnvVar is the name of the env var that defines the type of backend used for sampling data storage when using adaptive sampling. + SamplingStorageTypeEnvVar = "SAMPLING_STORAGE_TYPE" + spanStorageFlag = "--span-storage.type" ) @@ -36,6 +39,7 @@ const ( type FactoryConfig struct { SpanWriterTypes []string SpanReaderType string + SamplingStorageType string DependenciesStorageType string DownsamplingRatio float64 DownsamplingHashSalt string @@ -73,11 +77,13 @@ func FactoryConfigFromEnvAndCLI(args []string, log io.Writer) FactoryConfig { if depStorageType == "" { depStorageType = spanWriterTypes[0] } + samplingStorageType := os.Getenv(SamplingStorageTypeEnvVar) // TODO support explicit configuration for readers return FactoryConfig{ SpanWriterTypes: spanWriterTypes, SpanReaderType: spanWriterTypes[0], DependenciesStorageType: depStorageType, + SamplingStorageType: samplingStorageType, } } diff --git a/plugin/storage/factory_config_test.go b/plugin/storage/factory_config_test.go index 369de9e741b..cfce7ca22ab 100644 --- a/plugin/storage/factory_config_test.go +++ b/plugin/storage/factory_config_test.go @@ -26,6 +26,7 @@ import ( func clearEnv() { os.Setenv(SpanStorageTypeEnvVar, "") os.Setenv(DependencyStorageTypeEnvVar, "") + os.Setenv(SamplingStorageTypeEnvVar, "") } func TestFactoryConfigFromEnv(t *testing.T) { @@ -37,15 +38,18 @@ func TestFactoryConfigFromEnv(t *testing.T) { assert.Equal(t, cassandraStorageType, f.SpanWriterTypes[0]) assert.Equal(t, cassandraStorageType, f.SpanReaderType) assert.Equal(t, cassandraStorageType, f.DependenciesStorageType) + assert.Empty(t, f.SamplingStorageType) os.Setenv(SpanStorageTypeEnvVar, elasticsearchStorageType) os.Setenv(DependencyStorageTypeEnvVar, memoryStorageType) + os.Setenv(SamplingStorageTypeEnvVar, cassandraStorageType) f = FactoryConfigFromEnvAndCLI(nil, &bytes.Buffer{}) assert.Equal(t, 1, len(f.SpanWriterTypes)) assert.Equal(t, elasticsearchStorageType, f.SpanWriterTypes[0]) assert.Equal(t, elasticsearchStorageType, f.SpanReaderType) assert.Equal(t, memoryStorageType, f.DependenciesStorageType) + assert.Equal(t, cassandraStorageType, f.SamplingStorageType) os.Setenv(SpanStorageTypeEnvVar, elasticsearchStorageType+","+kafkaStorageType) diff --git a/plugin/storage/factory_test.go b/plugin/storage/factory_test.go index b2762d7c1a0..4ea9f8b8ef1 100644 --- a/plugin/storage/factory_test.go +++ b/plugin/storage/factory_test.go @@ -296,20 +296,39 @@ func TestCreateError(t *testing.T) { } } -func CreateSamplingStoreFactory(t *testing.T) { +func TestCreateSamplingStoreFactory(t *testing.T) { f, err := NewFactory(defaultCfg()) require.NoError(t, err) assert.NotEmpty(t, f.factories) assert.NotEmpty(t, f.factories[cassandraStorageType]) + // if not specified sampling store is chosen from available factories ssFactory, err := f.CreateSamplingStoreFactory() assert.Equal(t, f.factories[cassandraStorageType], ssFactory) assert.NoError(t, err) + // if not specified and there's no compatible factories then return nil delete(f.factories, cassandraStorageType) ssFactory, err = f.CreateSamplingStoreFactory() assert.Nil(t, ssFactory) assert.NoError(t, err) + + // if an incompatible factory is specified return err + cfg := defaultCfg() + cfg.SamplingStorageType = "elasticsearch" + f, err = NewFactory(cfg) + require.NoError(t, err) + ssFactory, err = f.CreateSamplingStoreFactory() + assert.Nil(t, ssFactory) + assert.EqualError(t, err, "storage factory of type elasticsearch does not support sampling store") + + // if a compatible factory is specified then return it + cfg.SamplingStorageType = "cassandra" + f, err = NewFactory(cfg) + require.NoError(t, err) + ssFactory, err = f.CreateSamplingStoreFactory() + assert.Equal(t, ssFactory, f.factories["cassandra"]) + assert.NoError(t, err) } type configurable struct {