diff --git a/processor/schemaprocessor/config.go b/processor/schemaprocessor/config.go index 0c1d194fc61b..e77a885270b4 100644 --- a/processor/schemaprocessor/config.go +++ b/processor/schemaprocessor/config.go @@ -15,6 +15,12 @@ package schemaprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor" import ( + "errors" + "fmt" + "net/url" + "strings" + + "github.com/Masterminds/semver/v3" "go.opentelemetry.io/collector/config" ) @@ -39,5 +45,58 @@ var _ config.Processor = (*Config)(nil) // Validate checks if the processor configuration is valid func (cfg *Config) Validate() error { + + if len(cfg.Transform) == 0 { + return fmt.Errorf("'transform' must contain at least one element") + } + + for _, transform := range cfg.Transform { + if len(transform.From) == 0 { + return fmt.Errorf("'from' Schema URL must be specified") + } + if len(transform.To) == 0 { + return fmt.Errorf("'to' Schema URL must be specified") + } + + if transform.From == transform.To { + return fmt.Errorf("'from' and 'to' Schema URLs must be different (%s)", transform.From) + } + + fromFamily, _, err := splitSchemaURL(transform.From) + if err != nil { + return err + } + + toFamily, toVersion, err := splitSchemaURL(transform.To) + if err != nil { + return err + } + + if fromFamily != toFamily { + return fmt.Errorf("'from' and 'to' Schema Families do not match (%s, %s)", transform.From, transform.To) + } + + _, err = semver.StrictNewVersion(toVersion) + if err != nil { + return fmt.Errorf("'to' Schema URL is invalid, must end with version number (%s)", transform.To) + } + } + return nil } + +func splitSchemaURL(schemaURL string) (family string, version string, err error) { + _, err = url.Parse(schemaURL) + if err != nil { + return "", "", err + } + + i := strings.LastIndex(schemaURL, "/") + if i < 0 { + return "", "", errors.New("invalid schema URL, must have at least one forward slash") + } + + family = schemaURL[0:i] + version = schemaURL[i+1:] + return +} diff --git a/processor/schemaprocessor/config_test.go b/processor/schemaprocessor/config_test.go index 36b2e1370ac0..617d9bd50059 100644 --- a/processor/schemaprocessor/config_test.go +++ b/processor/schemaprocessor/config_test.go @@ -38,6 +38,24 @@ func TestLoadingConfig(t *testing.T) { p0 := cfg.Processors[config.NewComponentIDWithName(typeStr, "")] assert.Equal(t, p0, &Config{ ProcessorSettings: config.NewProcessorSettings(config.NewComponentIDWithName(typeStr, "")), + Transform: []TransformConfig{{From: "https://opentelemetry.io/schemas/1.*", To: "https://opentelemetry.io/schemas/1.9.0"}}, }) +} + +func TestInvalidConfig(t *testing.T) { + cfgs := []Config{ + { + Transform: []TransformConfig{}, + }, + { + Transform: []TransformConfig{ + {From: "", To: ""}, + }, + }, + } + for _, cfg := range cfgs { + err := cfg.Validate() + assert.Error(t, err) + } } diff --git a/processor/schemaprocessor/factory.go b/processor/schemaprocessor/factory.go index 733ea6a42547..798d91a00902 100644 --- a/processor/schemaprocessor/factory.go +++ b/processor/schemaprocessor/factory.go @@ -44,12 +44,12 @@ type factory struct { schemas map[string]*ast.Schema } -// NewFactory returns a new factory for the Attributes processor. +// NewFactory returns a new factory for the schema processor. func NewFactory() component.ProcessorFactory { return newFactoryWithFetcher(downloadSchema) } -func newFactory(schemaFetcher schemaFetcherFunc) *factory { +func newFactoryStruct(schemaFetcher schemaFetcherFunc) *factory { return &factory{ schemaFetcher: schemaFetcher, schemas: map[string]*ast.Schema{}, @@ -57,7 +57,7 @@ func newFactory(schemaFetcher schemaFetcherFunc) *factory { } func newFactoryWithFetcher(schemaFetcher schemaFetcherFunc) component.ProcessorFactory { - f := newFactory(schemaFetcher) + f := newFactoryStruct(schemaFetcher) return component.NewProcessorFactory( typeStr, createDefaultConfig, @@ -124,14 +124,14 @@ func downloadSchema(context context.Context, schemaURL string) (*ast.Schema, err } func (f *factory) createTracesProcessor( - _ context.Context, + ctx context.Context, _ component.ProcessorCreateSettings, cfg config.Processor, nextConsumer consumer.Traces, ) (component.TracesProcessor, error) { oCfg := cfg.(*Config) - p := newSchemaProcessor(f, oCfg) + p := newSchemaProcessor(ctx, f, oCfg) return processorhelper.NewTracesProcessor( cfg, @@ -141,14 +141,14 @@ func (f *factory) createTracesProcessor( } func (f *factory) createMetricsProcessor( - _ context.Context, + ctx context.Context, _ component.ProcessorCreateSettings, cfg config.Processor, nextConsumer consumer.Metrics, ) (component.MetricsProcessor, error) { oCfg := cfg.(*Config) - p := newSchemaProcessor(f, oCfg) + p := newSchemaProcessor(ctx, f, oCfg) return processorhelper.NewMetricsProcessor( cfg, @@ -158,14 +158,14 @@ func (f *factory) createMetricsProcessor( } func (f *factory) createLogProcessor( - _ context.Context, + ctx context.Context, _ component.ProcessorCreateSettings, cfg config.Processor, nextConsumer consumer.Logs, ) (component.LogsProcessor, error) { oCfg := cfg.(*Config) - p := newSchemaProcessor(f, oCfg) + p := newSchemaProcessor(ctx, f, oCfg) return processorhelper.NewLogsProcessor( cfg, diff --git a/processor/schemaprocessor/factory_test.go b/processor/schemaprocessor/factory_test.go index af075885b0fc..2d6064cd7ca4 100644 --- a/processor/schemaprocessor/factory_test.go +++ b/processor/schemaprocessor/factory_test.go @@ -58,7 +58,7 @@ func readTestSchema(_ context.Context, schemaURL string) (*ast.Schema, error) { } func TestFactory_GetSchemaConcurrently(t *testing.T) { - factory := newFactory(readTestSchema) + factory := newFactoryStruct(readTestSchema) wg := sync.WaitGroup{} var firstSchema *ast.Schema @@ -84,7 +84,7 @@ func TestFactory_GetSchemaConcurrently(t *testing.T) { } func TestFactory_GetSchemaFail(t *testing.T) { - factory := newFactory(readTestSchema) + factory := newFactoryStruct(readTestSchema) schema, err := factory.getSchema( context.Background(), "https://opentelemetry.io/schemas/11.22.33", ) @@ -100,7 +100,7 @@ func readBlockSchema(context context.Context, _ string) (*ast.Schema, error) { } func TestFactory_GetSchemaCancel(t *testing.T) { - factory := newFactory(readBlockSchema) + factory := newFactoryStruct(readBlockSchema) ctx, cancelFunc := context.WithCancel(context.Background()) @@ -131,6 +131,14 @@ func TestFactory_GetSchemaCancel(t *testing.T) { func TestFactory_ConsumeLogs(t *testing.T) { factory := newFactoryWithFetcher(readTestSchema) cfg := factory.CreateDefaultConfig() + pCfg := cfg.(*Config) + pCfg.Transform = []TransformConfig{ + { + From: "https://opentelemetry.io/schemas/1.*", + To: "https://opentelemetry.io/schemas/1.7.0", + }, + } + sink := &consumertest.LogsSink{} proc, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateSettings{}, cfg, sink) require.NoError(t, err) diff --git a/processor/schemaprocessor/processor.go b/processor/schemaprocessor/processor.go index b310fe730157..bfa80058d66c 100644 --- a/processor/schemaprocessor/processor.go +++ b/processor/schemaprocessor/processor.go @@ -10,7 +10,11 @@ type schemaProcessor struct { factory *factory } -func newSchemaProcessor(factory *factory, _ *Config) *schemaProcessor { +func newSchemaProcessor(ctx context.Context, factory *factory, cfg *Config) *schemaProcessor { + // Start prefetching schemas specified in the config. + for _, transform := range cfg.Transform { + go factory.getSchema(ctx, transform.To) + } return &schemaProcessor{factory: factory} } diff --git a/processor/schemaprocessor/testdata/config.yaml b/processor/schemaprocessor/testdata/config.yaml index b44f4357826a..ee9de69a5c57 100644 --- a/processor/schemaprocessor/testdata/config.yaml +++ b/processor/schemaprocessor/testdata/config.yaml @@ -1,5 +1,14 @@ processors: schema: + transform: + # A set of one or more transform rules. + # "from" defines the Schema URL to match the input data. Must be either a full + # Schema URL with version number or a wildcard URL where version number can be partial. + - from: https://opentelemetry.io/schemas/1.* + # "to" defines the Schema URL to transform the input data to. MUST belong to the + # same Schema Family as the "from" setting. MUST be a Schema URL with a specific + # version number, wildcards are not allowed. + to: https://opentelemetry.io/schemas/1.9.0 receivers: nop: