Skip to content

Commit

Permalink
Add config validation
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrannajaryan committed Mar 8, 2022
1 parent 5c3d8dd commit e1ac17b
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 13 deletions.
59 changes: 59 additions & 0 deletions processor/schemaprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
package schemaprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor"

import (
"errors"
"fmt"
"net/url"
"strings"

"github.com/Masterminds/semver/v3"
"go.opentelemetry.io/collector/config"
)

Expand All @@ -39,5 +45,58 @@ var _ config.Processor = (*Config)(nil)

// Validate checks if the processor configuration is valid
func (cfg *Config) Validate() error {

if len(cfg.Transform) == 0 {
return fmt.Errorf("'transform' must contain at least one element")
}

for _, transform := range cfg.Transform {
if len(transform.From) == 0 {
return fmt.Errorf("'from' Schema URL must be specified")
}
if len(transform.To) == 0 {
return fmt.Errorf("'to' Schema URL must be specified")
}

if transform.From == transform.To {
return fmt.Errorf("'from' and 'to' Schema URLs must be different (%s)", transform.From)
}

fromFamily, _, err := splitSchemaURL(transform.From)
if err != nil {
return err
}

toFamily, toVersion, err := splitSchemaURL(transform.To)
if err != nil {
return err
}

if fromFamily != toFamily {
return fmt.Errorf("'from' and 'to' Schema Families do not match (%s, %s)", transform.From, transform.To)
}

_, err = semver.StrictNewVersion(toVersion)
if err != nil {
return fmt.Errorf("'to' Schema URL is invalid, must end with version number (%s)", transform.To)
}
}

return nil
}

func splitSchemaURL(schemaURL string) (family string, version string, err error) {
_, err = url.Parse(schemaURL)
if err != nil {
return "", "", err
}

i := strings.LastIndex(schemaURL, "/")
if i < 0 {
return "", "", errors.New("invalid schema URL, must have at least one forward slash")
}

family = schemaURL[0:i]
version = schemaURL[i+1:]
return
}
18 changes: 18 additions & 0 deletions processor/schemaprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,24 @@ func TestLoadingConfig(t *testing.T) {
p0 := cfg.Processors[config.NewComponentIDWithName(typeStr, "")]
assert.Equal(t, p0, &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentIDWithName(typeStr, "")),
Transform: []TransformConfig{{From: "https://opentelemetry.io/schemas/1.*", To: "https://opentelemetry.io/schemas/1.9.0"}},
})
}

func TestInvalidConfig(t *testing.T) {
cfgs := []Config{
{
Transform: []TransformConfig{},
},
{
Transform: []TransformConfig{
{From: "", To: ""},
},
},
}

for _, cfg := range cfgs {
err := cfg.Validate()
assert.Error(t, err)
}
}
18 changes: 9 additions & 9 deletions processor/schemaprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,20 @@ type factory struct {
schemas map[string]*ast.Schema
}

// NewFactory returns a new factory for the Attributes processor.
// NewFactory returns a new factory for the schema processor.
func NewFactory() component.ProcessorFactory {
return newFactoryWithFetcher(downloadSchema)
}

func newFactory(schemaFetcher schemaFetcherFunc) *factory {
func newFactoryStruct(schemaFetcher schemaFetcherFunc) *factory {
return &factory{
schemaFetcher: schemaFetcher,
schemas: map[string]*ast.Schema{},
}
}

func newFactoryWithFetcher(schemaFetcher schemaFetcherFunc) component.ProcessorFactory {
f := newFactory(schemaFetcher)
f := newFactoryStruct(schemaFetcher)
return component.NewProcessorFactory(
typeStr,
createDefaultConfig,
Expand Down Expand Up @@ -124,14 +124,14 @@ func downloadSchema(context context.Context, schemaURL string) (*ast.Schema, err
}

func (f *factory) createTracesProcessor(
_ context.Context,
ctx context.Context,
_ component.ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Traces,
) (component.TracesProcessor, error) {
oCfg := cfg.(*Config)

p := newSchemaProcessor(f, oCfg)
p := newSchemaProcessor(ctx, f, oCfg)

return processorhelper.NewTracesProcessor(
cfg,
Expand All @@ -141,14 +141,14 @@ func (f *factory) createTracesProcessor(
}

func (f *factory) createMetricsProcessor(
_ context.Context,
ctx context.Context,
_ component.ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Metrics,
) (component.MetricsProcessor, error) {
oCfg := cfg.(*Config)

p := newSchemaProcessor(f, oCfg)
p := newSchemaProcessor(ctx, f, oCfg)

return processorhelper.NewMetricsProcessor(
cfg,
Expand All @@ -158,14 +158,14 @@ func (f *factory) createMetricsProcessor(
}

func (f *factory) createLogProcessor(
_ context.Context,
ctx context.Context,
_ component.ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Logs,
) (component.LogsProcessor, error) {
oCfg := cfg.(*Config)

p := newSchemaProcessor(f, oCfg)
p := newSchemaProcessor(ctx, f, oCfg)

return processorhelper.NewLogsProcessor(
cfg,
Expand Down
14 changes: 11 additions & 3 deletions processor/schemaprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func readTestSchema(_ context.Context, schemaURL string) (*ast.Schema, error) {
}

func TestFactory_GetSchemaConcurrently(t *testing.T) {
factory := newFactory(readTestSchema)
factory := newFactoryStruct(readTestSchema)

wg := sync.WaitGroup{}
var firstSchema *ast.Schema
Expand All @@ -84,7 +84,7 @@ func TestFactory_GetSchemaConcurrently(t *testing.T) {
}

func TestFactory_GetSchemaFail(t *testing.T) {
factory := newFactory(readTestSchema)
factory := newFactoryStruct(readTestSchema)
schema, err := factory.getSchema(
context.Background(), "https://opentelemetry.io/schemas/11.22.33",
)
Expand All @@ -100,7 +100,7 @@ func readBlockSchema(context context.Context, _ string) (*ast.Schema, error) {
}

func TestFactory_GetSchemaCancel(t *testing.T) {
factory := newFactory(readBlockSchema)
factory := newFactoryStruct(readBlockSchema)

ctx, cancelFunc := context.WithCancel(context.Background())

Expand Down Expand Up @@ -131,6 +131,14 @@ func TestFactory_GetSchemaCancel(t *testing.T) {
func TestFactory_ConsumeLogs(t *testing.T) {
factory := newFactoryWithFetcher(readTestSchema)
cfg := factory.CreateDefaultConfig()
pCfg := cfg.(*Config)
pCfg.Transform = []TransformConfig{
{
From: "https://opentelemetry.io/schemas/1.*",
To: "https://opentelemetry.io/schemas/1.7.0",
},
}

sink := &consumertest.LogsSink{}
proc, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateSettings{}, cfg, sink)
require.NoError(t, err)
Expand Down
6 changes: 5 additions & 1 deletion processor/schemaprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ type schemaProcessor struct {
factory *factory
}

func newSchemaProcessor(factory *factory, _ *Config) *schemaProcessor {
func newSchemaProcessor(ctx context.Context, factory *factory, cfg *Config) *schemaProcessor {
// Start prefetching schemas specified in the config.
for _, transform := range cfg.Transform {
go factory.getSchema(ctx, transform.To)
}
return &schemaProcessor{factory: factory}
}

Expand Down
9 changes: 9 additions & 0 deletions processor/schemaprocessor/testdata/config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
processors:
schema:
transform:
# A set of one or more transform rules.
# "from" defines the Schema URL to match the input data. Must be either a full
# Schema URL with version number or a wildcard URL where version number can be partial.
- from: https://opentelemetry.io/schemas/1.*
# "to" defines the Schema URL to transform the input data to. MUST belong to the
# same Schema Family as the "from" setting. MUST be a Schema URL with a specific
# version number, wildcards are not allowed.
to: https://opentelemetry.io/schemas/1.9.0

receivers:
nop:
Expand Down

0 comments on commit e1ac17b

Please sign in to comment.