From a8bc6f82a0bf698e6ee76b482cb37801ae0df532 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 28 Mar 2023 13:44:23 -0700 Subject: [PATCH] PoC --- sdk/go.mod | 1 + sdk/go.sum | 4 +- sdk/resource/internal/schema/compare.go | 67 +++++++++++++++++++ sdk/resource/internal/schema/registry.go | 82 ++++++++++++++++++++++++ sdk/resource/internal/schema/upgrade.go | 21 ++++-- sdk/resource/resource.go | 82 ++++++++++++++++++++++++ 6 files changed, 249 insertions(+), 8 deletions(-) create mode 100644 sdk/resource/internal/schema/compare.go create mode 100644 sdk/resource/internal/schema/registry.go diff --git a/sdk/go.mod b/sdk/go.mod index d02036a1fff..9f640d522bf 100644 --- a/sdk/go.mod +++ b/sdk/go.mod @@ -20,6 +20,7 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/otel/metric v1.15.0-rc.2 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/sdk/go.sum b/sdk/go.sum index dcf8a49b229..a901bcbeab4 100644 --- a/sdk/go.sum +++ b/sdk/go.sum @@ -19,12 +19,12 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -go.opentelemetry.io/otel/schema v0.0.4 h1:xgqNjF5c5oy7F1PDm4q6a6wDUJTm+po4jEiXmcN5ncI= -go.opentelemetry.io/otel/schema v0.0.4/go.mod h1:LBBdyW+43YB5XmeQtH4b2ET5k0hx7dh3yJgRGY4Qw+A= golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/sdk/resource/internal/schema/compare.go b/sdk/resource/internal/schema/compare.go new file mode 100644 index 00000000000..3423d1f6b80 --- /dev/null +++ b/sdk/resource/internal/schema/compare.go @@ -0,0 +1,67 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import ( + "fmt" + "net/url" + "strings" + + "github.com/Masterminds/semver/v3" +) + +type Comparison uint8 + +const ( + invalidComparison Comparison = iota + EqualTo + GreaterThan + LessThan +) + +// CompareVersions compares schema URL versions and returns the Comparison of a +// vs b (i.e. a is [comparison value] b). +func CompareVersions(a, b string) (Comparison, error) { + aVer, err := version(a) + if err != nil { + return invalidComparison, fmt.Errorf("invalid version for %q: %w", a, err) + } + + bVer, err := version(b) + if err != nil { + return invalidComparison, fmt.Errorf("invalid version for %q: %w", b, err) + } + + switch aVer.Compare(bVer) { + case -1: + return LessThan, nil + case 0: + return EqualTo, nil + case 1: + return GreaterThan, nil + default: + return invalidComparison, fmt.Errorf("unable to compare versions: %s, %s", aVer, bVer) + } +} + +func version(schemaURL string) (*semver.Version, error) { + // https://github.com/open-telemetry/oteps/blob/main/text/0152-telemetry-schemas.md#schema-url + u, err := url.Parse(schemaURL) + if err != nil { + return nil, err + } + + return semver.NewVersion(u.Path[strings.LastIndex(u.Path, "/")+1:]) +} diff --git a/sdk/resource/internal/schema/registry.go b/sdk/resource/internal/schema/registry.go new file mode 100644 index 00000000000..ceed92ebf26 --- /dev/null +++ b/sdk/resource/internal/schema/registry.go @@ -0,0 +1,82 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import ( + "context" + "net/http" + + sUtil "go.opentelemetry.io/otel/schema/v1.1" + "go.opentelemetry.io/otel/schema/v1.1/ast" +) + +type cache struct { + data map[string]*ast.Schema +} + +func (c *cache) lookup(key string, f func() (*ast.Schema, error)) (*ast.Schema, error) { + if c.data == nil { + s, err := f() + if err != nil { + return nil, err + } + c.data = map[string]*ast.Schema{key: s} + return s, nil + } + + if s, ok := c.data[key]; ok { + return s, nil + } + + s, err := f() + if err != nil { + return nil, err + } + c.data = map[string]*ast.Schema{key: s} + return s, nil +} + +// Registry hold a registration of schema files. It will cache any schema files +// it gets from external URLs. +type Registry struct { + client *http.Client + + cache cache +} + +// NewRegistry returns a Registry that uses the HTTP client. If client is nil +// it will use the default client from "net/http". +func NewRegistry(client *http.Client) *Registry { + if client == nil { + client = http.DefaultClient + } + return &Registry{client: client} +} + +// Get returns the Schema at the target schemaURL using the registry client. +func (r *Registry) Get(ctx context.Context, schemaURL string) (*ast.Schema, error) { + return r.cache.lookup(schemaURL, func() (*ast.Schema, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, schemaURL, http.NoBody) + if err != nil { + return nil, err + } + resp, err := r.client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + return sUtil.Parse(resp.Body) + }) +} diff --git a/sdk/resource/internal/schema/upgrade.go b/sdk/resource/internal/schema/upgrade.go index efb4fe6f2f1..85f6dc35aae 100644 --- a/sdk/resource/internal/schema/upgrade.go +++ b/sdk/resource/internal/schema/upgrade.go @@ -25,9 +25,9 @@ import ( "go.opentelemetry.io/otel/schema/v1.1/types" ) -// Upgrade upgrades attrs in place using schema. +// Upgrade upgrades attrs in place with schema. func Upgrade(schema *ast.Schema, attrs []attribute.KeyValue) error { - vers, err := versions(schema, false) + vers, err := versions(schema, nil, false) if err != nil { return fmt.Errorf("upgrade error: %w", err) } @@ -52,9 +52,14 @@ func Upgrade(schema *ast.Schema, attrs []attribute.KeyValue) error { return nil } -// Downgrade downgrade attrs in place using schema. -func Downgrade(schema *ast.Schema, attrs []attribute.KeyValue) error { - vers, err := versions(schema, true) +// Downgrade downgrade attrs to the schema version of url in place with schema. +func Downgrade(schema *ast.Schema, url string, attrs []attribute.KeyValue) error { + min, err := version(url) + if err != nil { + return fmt.Errorf("downgrade error: %w", err) + } + + vers, err := versions(schema, min, true) if err != nil { return fmt.Errorf("downgrade error: %w", err) } @@ -79,7 +84,8 @@ func Downgrade(schema *ast.Schema, attrs []attribute.KeyValue) error { return nil } -func versions(schema *ast.Schema, reverse bool) ([]types.TelemetryVersion, error) { +// versions returns the sorted versions contained in schema. +func versions(schema *ast.Schema, min *semver.Version, reverse bool) ([]types.TelemetryVersion, error) { // The transformations specified in each version are applied one by one. // Order the versions to ensure correct application. versions := make([]*semver.Version, 0, len(schema.Versions)) @@ -99,6 +105,9 @@ func versions(schema *ast.Schema, reverse bool) ([]types.TelemetryVersion, error out := make([]types.TelemetryVersion, len(versions)) for i := range versions { + if min != nil && min.GreaterThan(versions[i]) { + continue + } out[i] = types.TelemetryVersion(versions[i].String()) } return out, nil diff --git a/sdk/resource/resource.go b/sdk/resource/resource.go index 49958a9a60a..76687d63eba 100644 --- a/sdk/resource/resource.go +++ b/sdk/resource/resource.go @@ -21,6 +21,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/resource/internal/schema" ) // Resource describes an entity about which identifying information @@ -127,6 +128,62 @@ func (r *Resource) SchemaURL() string { return r.schemaURL } +// WithSchemaURL returns a copy of r with the schema URL set to url and all +// attributes transformed based on the associated schema. If the schema +// transformation fails, or url is empty, an error is returned. +func (r *Resource) WithSchemaURL(ctx context.Context, url string) (*Resource, error) { + return r.withSchemaURL(ctx, schema.NewRegistry(nil), url) +} + +func (r *Resource) withSchemaURL(ctx context.Context, reg *schema.Registry, url string) (*Resource, error) { + if url == "" { + return nil, errors.New(`invalid schema url: ""`) + } + + if r == nil || r.attrs.Len() == 0 { + return NewWithAttributes(url), nil + } + + if r.schemaURL == url { + // Resources are immutable, just return the ptr to the same value. + return r, nil + } + + comp, err := schema.CompareVersions(r.schemaURL, url) + if err != nil { + return nil, err + } + switch comp { + case schema.EqualTo: + // Resources are immutable, just return the ptr to the same value. + return r, nil + case schema.LessThan: + s, err := reg.Get(ctx, url) + if err != nil { + return nil, err + } + attrs := r.Attributes() + err = schema.Upgrade(s, attrs) + if err != nil { + return nil, err + } + return NewWithAttributes(url, attrs...), nil + case schema.GreaterThan: + s, err := reg.Get(ctx, r.schemaURL) + if err != nil { + return nil, err + } + attrs := r.Attributes() + err = schema.Downgrade(s, url, attrs) + if err != nil { + return nil, err + } + return NewWithAttributes(url, attrs...), nil + default: + panic("unknown schema URL comparison") + } +} + // Iter returns an iterator of the Resource attributes. // This is ideal to use if you do not want a copy of the attributes. func (r *Resource) Iter() attribute.Iterator { @@ -192,6 +249,31 @@ func Merge(a, b *Resource) (*Resource, error) { return merged, nil } +// MergeAt creates a new resource by combining resources at the target +// schemaURL version. +// +// If there are common keys between resources the latter resource will +// overwrite the former. +// +// Any of the resources not already at schemaURL version will be appropriately +// upgraded or downgraded to match the version. An error is returned if this is +// not possible. +func MergeAt(ctx context.Context, schemaURL string, resources ...*Resource) (*Resource, error) { + reg := schema.NewRegistry(nil) + merged := NewWithAttributes(schemaURL) + for _, r := range resources { + versioned, err := r.withSchemaURL(ctx, reg, schemaURL) + if err != nil { + return nil, err + } + merged, err = Merge(merged, versioned) + if err != nil { + return nil, err + } + } + return merged, nil +} + // Empty returns an instance of Resource with no attributes. It is // equivalent to a `nil` Resource. func Empty() *Resource {