Skip to content

Commit

Permalink
init http detector
Browse files Browse the repository at this point in the history
Signed-off-by: Jared Tan <jian.tan@daocloud.io>
  • Loading branch information
JaredTan95 committed Aug 30, 2024
1 parent bf7f46a commit 5bb8a12
Show file tree
Hide file tree
Showing 9 changed files with 272 additions and 8 deletions.
13 changes: 13 additions & 0 deletions processor/resourcedetectionprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
package resourcedetectionprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor"

import (
"time"

"go.opentelemetry.io/collector/config/confighttp"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/http"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/aws/ec2"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/aws/ecs"
Expand Down Expand Up @@ -41,6 +45,9 @@ type Config struct {
// If a supplied attribute is not a valid attribute of a supplied detector it will be ignored.
// Deprecated: Please use detector's resource_attributes config instead
Attributes []string `mapstructure:"attributes"`

// interval of detect action
DetectInterval time.Duration `mapstructure:"detect_interval"`
}

// DetectorConfig contains user-specified configurations unique to all individual detectors
Expand Down Expand Up @@ -86,6 +93,9 @@ type DetectorConfig struct {

// K8SNode contains user-specified configurations for the K8SNode detector
K8SNodeConfig k8snode.Config `mapstructure:"k8snode"`

// Http contains user-specified configurations for the Http detector
HttpConfig http.Config `mapstructure:"http"`
}

func detectorCreateDefaultConfig() DetectorConfig {
Expand All @@ -104,6 +114,7 @@ func detectorCreateDefaultConfig() DetectorConfig {
SystemConfig: system.CreateDefaultConfig(),
OpenShiftConfig: openshift.CreateDefaultConfig(),
K8SNodeConfig: k8snode.CreateDefaultConfig(),
HttpConfig: http.CreateDefaultConfig(),
}
}

Expand Down Expand Up @@ -137,6 +148,8 @@ func (d *DetectorConfig) GetConfigFromType(detectorType internal.DetectorType) i
return d.OpenShiftConfig
case k8snode.TypeStr:
return d.K8SNodeConfig
case http.TypeStr:
return d.HttpConfig
default:
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions processor/resourcedetectionprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/http"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
Expand Down Expand Up @@ -63,6 +65,7 @@ func NewFactory() processor.Factory {
system.TypeStr: system.NewDetector,
openshift.TypeStr: openshift.NewDetector,
k8snode.TypeStr: k8snode.NewDetector,
http.TypeStr: http.NewDetector,
})

f := &factory{
Expand Down Expand Up @@ -90,6 +93,7 @@ func createDefaultConfig() component.Config {
Override: true,
Attributes: nil,
DetectorConfig: detectorCreateDefaultConfig(),
DetectInterval: time.Minute * 10,
// TODO: Once issue(https://github.com/open-telemetry/opentelemetry-collector/issues/4001) gets resolved,
// Set the default value of 'hostname_source' here instead of 'system' detector
}
Expand Down Expand Up @@ -182,6 +186,7 @@ func (f *factory) getResourceDetectionProcessor(
override: oCfg.Override,
httpClientSettings: oCfg.ClientConfig,
telemetrySettings: params.TelemetrySettings,
detectInterval: oCfg.DetectInterval,
}, nil
}

Expand Down
24 changes: 24 additions & 0 deletions processor/resourcedetectionprocessor/internal/http/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package http // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/http"
import (
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configopaque"
)

type Config struct {
// APIKey is the authentication token
APIKey configopaque.String `mapstructure:"api_key"`

// API URL to use
APIURL string `mapstructure:"api_url"`

confighttp.ClientConfig `mapstructure:",squash"`
}

func CreateDefaultConfig() Config {
return Config{
APIURL: "http://localhost:8080",
}
}
98 changes: 98 additions & 0 deletions processor/resourcedetectionprocessor/internal/http/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package http // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/http"
import (
"context"
"fmt"
"io"
"net/http"
"time"

jsoniter "github.com/json-iterator/go"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/processor"
conventions "go.opentelemetry.io/collector/semconv/v1.22.0"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal"
)

const (
// TypeStr is type of detector.
TypeStr = "http"
)

type resourceAttribute struct {
Key string
Value string
}

var _ internal.Detector = (*detector)(nil)

type detector struct {
logger *zap.Logger
//
set component.TelemetrySettings
interval time.Duration
client *http.Client
apiURL string
apiKey configopaque.String
requestIntervalTicker *time.Ticker
}

// NewDetector returns a detector which can detect resource attributes on Heroku
func NewDetector(set processor.Settings, dcfg internal.DetectorConfig) (internal.Detector, error) {
cfg := dcfg.(Config)

if cfg.APIURL == "" {
return nil, fmt.Errorf("apiUrl could not be empty")
}

return &detector{
apiKey: cfg.APIKey,
apiURL: cfg.APIURL,
logger: set.Logger,
// TODO: interval request
interval: time.Second * 5,
}, nil
}

// Detect detects http response metadata and returns a resource with the available ones
func (d detector) Detect(ctx context.Context) (resource pcommon.Resource, schemaURL string, err error) {
res := pcommon.NewResource()

detectedResources := d.requestResourceAttributes()

for _, resAttr := range detectedResources {
res.Attributes().PutStr(resAttr.Key, resAttr.Value)
}

return res, conventions.SchemaURL, nil
}

func (d detector) requestResourceAttributes() []resourceAttribute {
var resources []resourceAttribute
resp, err := http.Get(d.apiURL)
if err != nil {
d.logger.Warn("Failed to fetch resource", zap.Error(err))
return resources
}

defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
d.logger.Warn("Failed to fetch resource", zap.Error(err))
return resources
}

err = jsoniter.Unmarshal(body, &resources)
if err != nil {
d.logger.Warn("Failed to fetch resource", zap.Error(err))
return resources
}

return resources
}
40 changes: 40 additions & 0 deletions processor/resourcedetectionprocessor/internal/http/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package http

import (
"context"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/processor/processortest"
conventions "go.opentelemetry.io/collector/semconv/v1.22.0"
)

type mockMetadata struct {
mock.Mock
}

func TestDetect(t *testing.T) {
handler := http.NotFound
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handler(w, r)
}))
defer ts.Close()
handler = func(w http.ResponseWriter, r *http.Request) {
outPut := `[{"key":"attributes_1","value":"foo"},{"key":"attributes_2","value":"bar"}]`
_, _ = w.Write([]byte(outPut))
}
defaultCfg := CreateDefaultConfig()
defaultCfg.APIURL = ts.URL

d, err := NewDetector(processortest.NewNopSettings(), defaultCfg)
require.NoError(t, err)
res, schemaURL, err := d.Detect(context.Background())
require.NoError(t, err)
require.Equal(t, 2, res.Attributes().Len())
require.NotNil(t, res)
assert.Equal(t, conventions.SchemaURL, schemaURL)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
type: resourcedetectionprocessor/http

parent: resourcedetection

status:
class: pkg
codeowners:
active: [JaredTan95]
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,9 @@ func NewResourceProvider(logger *zap.Logger, timeout time.Duration, attributesTo
}

func (p *ResourceProvider) Get(ctx context.Context, client *http.Client) (resource pcommon.Resource, schemaURL string, err error) {
p.once.Do(func() {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, client.Timeout)
defer cancel()
p.detectResource(ctx)
})
ctx, cancel := context.WithTimeout(ctx, client.Timeout)
defer cancel()
p.detectResource(ctx)

return p.detectedResource.resource, p.detectedResource.schemaURL, p.detectedResource.err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ package resourcedetectionprocessor // import "github.com/open-telemetry/opentele

import (
"context"
"net/http"
"time"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
Expand All @@ -23,15 +27,28 @@ type resourceDetectionProcessor struct {
override bool
httpClientSettings confighttp.ClientConfig
telemetrySettings component.TelemetrySettings
detectInterval time.Duration
}

// Start is invoked during service startup.
func (rdp *resourceDetectionProcessor) Start(ctx context.Context, host component.Host) error {
client, _ := rdp.httpClientSettings.ToClient(ctx, host, rdp.telemetrySettings)
ctx = internal.ContextWithClient(ctx, client)
go rdp.tickerDetect(ctx, client)
return nil
}

func (rdp *resourceDetectionProcessor) tickerDetect(ctx context.Context, client *http.Client) {
intervalTicker := time.NewTicker(rdp.detectInterval)
defer intervalTicker.Stop()

var err error
rdp.resource, rdp.schemaURL, err = rdp.provider.Get(ctx, client)
return err
for range intervalTicker.C {
rdp.resource, rdp.schemaURL, err = rdp.provider.Get(ctx, client)
if err != nil {
rdp.telemetrySettings.Logger.Error("failed to retrieve resource from provider: %v", zap.Error(err))
}
}
}

// processTraces implements the ProcessTracesFunc type.
Expand Down
62 changes: 62 additions & 0 deletions processor/resourcedetectionprocessor/testdata/otel-col-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317

exporters:
debug:
verbosity: detailed
sampling_initial: 5
sampling_thereafter: 200
otlp/jaeger:
endpoint: localhost:14317
tls:
insecure: true
retry_on_failure:
enabled: true
max_elapsed_time: 500s
sending_queue:
enabled: true

processors:
batch:
resourcedetection/sgm:
detectors: [env,http]
timeout: 2s
override: false
detect_interval: 10s
http:
api_url: https://apifoxmock.com/m1/3552517-2307922-default/mock/auto-tagging
resourcedetection/system:
detectors: [env, system]
timeout: 2s
override: false
system:
resource_attributes:
host.name:
enabled: true
host.id:
enabled: true
os.type:
enabled: true
attributes: ["a", "b"]

extensions:
health_check:
pprof:
endpoint: :1888
zpages:
endpoint: :55679

service:
extensions: [pprof, zpages, health_check]
pipelines:
traces:
receivers: [otlp]
processors: [batch,resourcedetection/sgm]
exporters: [debug,otlp/jaeger]
metrics:
receivers: [otlp]
processors: [batch,resourcedetection/sgm]
exporters: [debug]

0 comments on commit 5bb8a12

Please sign in to comment.