Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DONT REVIEW] Begin implementation of schema processor #8344

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor v0.46.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.46.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/routingprocessor v0.46.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor v0.46.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor v0.46.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor v0.46.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor v0.46.0
Expand Down Expand Up @@ -673,6 +674,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/reso

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/routingprocessor => ./processor/routingprocessor/

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor => ./processor/schemaprocessor/

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor => ./processor/spanmetricsprocessor/

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor => ./processor/spanprocessor/
Expand Down
2 changes: 2 additions & 0 deletions internal/components/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/routingprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor"
Expand Down Expand Up @@ -293,6 +294,7 @@ func Components() (component.Factories, error) {
spanprocessor.NewFactory(),
cumulativetodeltaprocessor.NewFactory(),
deltatorateprocessor.NewFactory(),
schemaprocessor.NewFactory(),
}
factories.Processors, err = component.MakeProcessorFactoryMap(processors...)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/components/processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestDefaultProcessors(t *testing.T) {
},
}

assert.Equal(t, len(tests)+11 /* not tested */, len(procFactories))
assert.Equal(t, len(tests)+12 /* not tested */, len(procFactories))
for _, tt := range tests {
t.Run(string(tt.processor), func(t *testing.T) {
factory, ok := procFactories[tt.processor]
Expand Down
1 change: 1 addition & 0 deletions processor/schemaprocessor/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
30 changes: 30 additions & 0 deletions processor/schemaprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Schema Processor

Supported pipeline types: traces, metrics, logs.

The schema processor modifies transforms the schema of the input data from one version to
another. Please refer to [config.go](./config.go) for the config spec.


```yaml
processors:
schema:
transform:
# A set of one or more transform rules.
# "from" defines the Schema URL to match the input data. Must be either a full
# Schema URL with version number or a wildcard URL where version number can be partial.
- from: https://opentelemetry.io/schemas/1.*
# "to" defines the Schema URL to transform the input data to. MUST belong to the
# same Schema Family as the "from" setting. MUST be a Schema URL with a specific
# version number, wildcards are not allowed.
to: https://opentelemetry.io/schemas/1.9.0

extensions:
# Optional storage where schema files can be cached. schema processor will automatically
# use a storage extension if it detects one.
file_storage:
directory: /var/lib/otelcol/mydir
```

Refer to [config.yaml](./testdata/config.yaml) for detailed
examples on using the processor.
102 changes: 102 additions & 0 deletions processor/schemaprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schemaprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor"

import (
"errors"
"fmt"
"net/url"
"strings"

"github.com/Masterminds/semver/v3"
"go.opentelemetry.io/collector/config"
)

// Config specifies the set of attributes to be inserted, updated, upserted and
// deleted and the properties to include/exclude a span from being processed.
// This processor handles all forms of modifications to attributes within a span.
// Prior to any actions being applied, each span is compared against
// the include properties and then the exclude properties if they are specified.
// This determines if a span is to be processed or not.
// The list of actions is applied in order specified in the configuration.
type Config struct {
config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
Transform []TransformConfig `mapstructure:"transform"`
}

type TransformConfig struct {
From string `mapstructure:"from"`
To string `mapstructure:"to"`
}

var _ config.Processor = (*Config)(nil)

// Validate checks if the processor configuration is valid
func (cfg *Config) Validate() error {

if len(cfg.Transform) == 0 {
return fmt.Errorf("'transform' must contain at least one element")
}

for _, transform := range cfg.Transform {
if len(transform.From) == 0 {
return fmt.Errorf("'from' Schema URL must be specified")
}
if len(transform.To) == 0 {
return fmt.Errorf("'to' Schema URL must be specified")
}

if transform.From == transform.To {
return fmt.Errorf("'from' and 'to' Schema URLs must be different (%s)", transform.From)
}

fromFamily, _, err := splitSchemaURL(transform.From)
if err != nil {
return err
}

toFamily, toVersion, err := splitSchemaURL(transform.To)
if err != nil {
return err
}

if fromFamily != toFamily {
return fmt.Errorf("'from' and 'to' Schema Families do not match (%s, %s)", transform.From, transform.To)
}

_, err = semver.StrictNewVersion(toVersion)
if err != nil {
return fmt.Errorf("'to' Schema URL is invalid, must end with version number (%s)", transform.To)
}
}

return nil
}

func splitSchemaURL(schemaURL string) (family string, version string, err error) {
_, err = url.Parse(schemaURL)
if err != nil {
return "", "", err
}

i := strings.LastIndex(schemaURL, "/")
if i < 0 {
return "", "", errors.New("invalid schema URL, must have at least one forward slash")
}

family = schemaURL[0:i]
version = schemaURL[i+1:]
return
}
61 changes: 61 additions & 0 deletions processor/schemaprocessor/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schemaprocessor

import (
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/servicetest"
)

func TestLoadingConfig(t *testing.T) {
factories, err := componenttest.NopFactories()
assert.NoError(t, err)

factory := NewFactory()
factories.Processors[typeStr] = factory
cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "config.yaml"), factories)
assert.NoError(t, err)
require.NotNil(t, cfg)

p0 := cfg.Processors[config.NewComponentIDWithName(typeStr, "")]
assert.Equal(t, p0, &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentIDWithName(typeStr, "")),
Transform: []TransformConfig{{From: "https://opentelemetry.io/schemas/1.*", To: "https://opentelemetry.io/schemas/1.9.0"}},
})
}

func TestInvalidConfig(t *testing.T) {
cfgs := []Config{
{
Transform: []TransformConfig{},
},
{
Transform: []TransformConfig{
{From: "", To: ""},
},
},
}

for _, cfg := range cfgs {
err := cfg.Validate()
assert.Error(t, err)
}
}
17 changes: 17 additions & 0 deletions processor/schemaprocessor/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package schemaprocessor contains the logic to modify attributes of a span.
// It supports insert, update, upsert and delete as actions.
package schemaprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor"
Loading