Skip to content

Commit

Permalink
Cleanup after the Otel upgrade (#4359)
Browse files Browse the repository at this point in the history
* Cleanup after the Otel upgrade

* Update docs/sources/flow/reference/components/otelcol.processor.batch.md

Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com>

* PR review improvements

* Update comment

* Fix linter issue, document remote_sampling change

---------

Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com>
  • Loading branch information
ptodev and clayton-cornell authored Jul 12, 2023
1 parent 90caf99 commit 1b7e538
Show file tree
Hide file tree
Showing 24 changed files with 957 additions and 246 deletions.
49 changes: 48 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Main (unreleased)

- `otelcol.exporter.loki` now includes the instrumentation scope in its output. (@ptodev)

- `otelcol.extension.jaeger_remote_sampling` removes the `\` HTTP endpoint. The `/sampling` endpoint is still functional.
- `otelcol.extension.jaeger_remote_sampling` removes the `/` HTTP endpoint. The `/sampling` endpoint is still functional.
The change was made in PR [#18070](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/18070) of opentelemetry-collector-contrib. (@ptodev)

- The field `version` and `auth` struct block from `walk_params` in `prometheus.exporter.snmp` and SNMP integration have been removed. The auth block now can be configured at top level, together with `modules` (@marctc)
Expand All @@ -29,6 +29,33 @@ Main (unreleased)
discovers file on the local filesystem, and so it doesn't get confused with
Prometheus' file discovery. (@rfratto)

- In the traces subsystem for Static mode, some metrics are removed and others are renamed. (@ptodev)
- Removed metrics:
- "blackbox_exporter_config_last_reload_success_timestamp_seconds" (gauge)
- "blackbox_exporter_config_last_reload_successful" (gauge)
- "blackbox_module_unknown_total" (counter)
- "traces_processor_tail_sampling_count_traces_sampled" (counter)
- "traces_processor_tail_sampling_new_trace_id_received" (counter)
- "traces_processor_tail_sampling_sampling_decision_latency" (histogram)
- "traces_processor_tail_sampling_sampling_decision_timer_latency" (histogram)
- "traces_processor_tail_sampling_sampling_policy_evaluation_error" (counter)
- "traces_processor_tail_sampling_sampling_trace_dropped_too_early" (counter)
- "traces_processor_tail_sampling_sampling_traces_on_memory" (gauge)
- "traces_receiver_accepted_spans" (counter)
- "traces_receiver_refused_spans" (counter)
- "traces_exporter_enqueue_failed_log_records" (counter)
- "traces_exporter_enqueue_failed_metric_points" (counter)
- "traces_exporter_enqueue_failed_spans" (counter)
- "traces_exporter_queue_capacity" (gauge)
- "traces_exporter_queue_size" (gauge)

- Renamed metrics:
- "traces_receiver_refused_spans" is renamed to "traces_receiver_refused_spans_total"
- "traces_receiver_accepted_spans" is renamed to "traces_receiver_refused_spans_total"
- "traces_exporter_sent_metric_points" is renamed to "traces_exporter_sent_metric_points_total"

- The `remote_sampling` block has been removed from `otelcol.receiver.jaeger`. (@ptodev)

### Features

- The Pyroscope scrape component computes and sends delta profiles automatically when required to reduce bandwidth usage. (@cyriltovena)
Expand Down Expand Up @@ -72,6 +99,14 @@ Main (unreleased)
- `grafana-agent tools prometheus.remote_write` holds a collection of remote
write-specific tools. These have been ported over from the `agentctl` command. (@rfratto)

- A new `action` argument for `otelcol.auth.headers`. (@ptodev)

- New `metadata_keys` and `metadata_cardinality_limit` arguments for `otelcol.processor.batch`. (@ptodev)

- New `boolean_attribute` and `ottl_condition` sampling policies for `otelcol.processor.tail_sampling`. (@ptodev)

- A new `initial_offset` argument for `otelcol.receiver.kafka`. (@ptodev)

### Enhancements

- Attributes and blocks set to their default values will no longer be shown in the Flow UI. (@rfratto)
Expand Down Expand Up @@ -109,6 +144,16 @@ Main (unreleased)

- Allow `prometheus.exporter.snmp` and SNMP integration to be configured passing a YAML block. (@marctc)

- Some metrics have been added to the traces subsystem for Static mode. (@ptodev)
- "traces_processor_batch_batch_send_size" (histogram)
- "traces_processor_batch_batch_size_trigger_send_total" (counter)
- "traces_processor_batch_metadata_cardinality" (gauge)
- "traces_processor_batch_timeout_trigger_send_total" (counter)
- "traces_rpc_server_duration" (histogram)
- "traces_exporter_send_failed_metric_points_total" (counter)
- "traces_exporter_send_failed_spans_total" (counter)
- "traces_exporter_sent_spans_total" (counter)

### Bugfixes

- Add signing region to remote.s3 component for use with custom endpoints so that Authorization Headers work correctly when
Expand Down Expand Up @@ -143,6 +188,8 @@ Main (unreleased)
- Mongodb integration has been re-enabled. (@jcreixell, @marctc)
- Build with go 1.20.6 (@captncraig)

- `otelcol.exporter.jaeger` has been deprecated and will be removed soon. (@ptodev)

v0.34.3 (2023-06-27)
--------------------

Expand Down
11 changes: 2 additions & 9 deletions component/otelcol/auth/bearer/bearer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,9 @@ var DefaultArguments = Arguments{
Scheme: "Bearer",
}

// UnmarshalRiver implements river.Unmarshaler.
func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
// SetToDefault implements river.Defaulter.
func (args *Arguments) SetToDefault() {
*args = DefaultArguments

type arguments Arguments
if err := f((*arguments)(args)); err != nil {
return err
}

return nil
}

// Convert implements auth.Arguments.
Expand Down
80 changes: 80 additions & 0 deletions component/otelcol/auth/headers/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
package headers

import (
"encoding"
"fmt"
"strings"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol/auth"
"github.com/grafana/agent/pkg/river"
"github.com/grafana/agent/pkg/river/rivertypes"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/headerssetterextension"
otelcomponent "go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -40,6 +43,11 @@ func (args Arguments) Convert() (otelcomponent.Config, error) {
Key: &h.Key,
}

err := h.Action.Convert(&upstreamHeader)
if err != nil {
return nil, err
}

if h.Value != nil {
upstreamHeader.Value = &h.Value.Value
}
Expand All @@ -65,15 +73,87 @@ func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.I
return nil
}

type Action string

const (
ActionInsert Action = "insert"
ActionUpdate Action = "update"
ActionUpsert Action = "upsert"
ActionDelete Action = "delete"
)

var (
_ river.Validator = (*Action)(nil)
_ encoding.TextUnmarshaler = (*Action)(nil)
)

// Validate implements river.Validator.
func (a *Action) Validate() error {
switch *a {
case ActionInsert, ActionUpdate, ActionUpsert, ActionDelete:
// This is a valid value, do not error
default:
return fmt.Errorf("action is set to an invalid value of %q", *a)
}
return nil
}

// Convert the River type to the Otel type.
// TODO: When headerssetterextension.actionValue is made external,
// remove the input parameter and make this output the Otel type.
func (a *Action) Convert(hc *headerssetterextension.HeaderConfig) error {
switch *a {
case ActionInsert:
hc.Action = headerssetterextension.INSERT
case ActionUpdate:
hc.Action = headerssetterextension.UPDATE
case ActionUpsert:
hc.Action = headerssetterextension.UPSERT
case ActionDelete:
hc.Action = headerssetterextension.DELETE
default:
return fmt.Errorf("action is set to an invalid value of %q", *a)
}
return nil
}

func (a *Action) UnmarshalText(text []byte) error {
str := Action(strings.ToLower(string(text)))
switch str {
case ActionInsert, ActionUpdate, ActionUpsert, ActionDelete:
*a = str
return nil
default:
return fmt.Errorf("unknown action %v", str)
}
}

// Header is an individual Header to send along with requests.
type Header struct {
Key string `river:"key,attr"`
Value *rivertypes.OptionalSecret `river:"value,attr,optional"`
FromContext *string `river:"from_context,attr,optional"`
Action Action `river:"action,attr,optional"`
}

var _ river.Defaulter = &Header{}

var DefaultHeader = Header{
Action: ActionUpsert,
}

// SetToDefault implements river.Defaulter.
func (h *Header) SetToDefault() {
*h = DefaultHeader
}

// Validate implements river.Validator.
func (h *Header) Validate() error {
err := h.Action.Validate()
if err != nil {
return err
}

switch {
case h.Key == "":
return fmt.Errorf("key must be set to a non-empty string")
Expand Down
103 changes: 103 additions & 0 deletions component/otelcol/auth/headers/headers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grafana/agent/pkg/flow/componenttest"
"github.com/grafana/agent/pkg/river"
"github.com/grafana/agent/pkg/util"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/headerssetterextension"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
extauth "go.opentelemetry.io/collector/extension/auth"
Expand Down Expand Up @@ -78,3 +79,105 @@ func Test(t *testing.T) {
require.NoError(t, err, "HTTP request failed")
require.Equal(t, http.StatusOK, resp.StatusCode)
}

func TestArguments_UnmarshalRiver(t *testing.T) {
tests := []struct {
cfg string
expectedKey string
expectedValue string
expectedAction any
expectUnmarshalError bool
}{
{
cfg: `
header {
key = "X-Scope-Org-ID"
value = "fake"
}
`,
expectedKey: "X-Scope-Org-ID",
expectedValue: "fake",
expectedAction: headerssetterextension.UPSERT,
},
{
cfg: `
header {
key = "X-Scope-Org-ID"
value = "fake"
action = "insert"
}
`,
expectedKey: "X-Scope-Org-ID",
expectedValue: "fake",
expectedAction: headerssetterextension.INSERT,
},
{
cfg: `
header {
key = "X-Scope-Org-ID"
value = "fake"
action = "update"
}
`,
expectedKey: "X-Scope-Org-ID",
expectedValue: "fake",
expectedAction: headerssetterextension.UPDATE,
},
{
cfg: `
header {
key = "X-Scope-Org-ID"
value = "fake"
action = "upsert"
}
`,
expectedKey: "X-Scope-Org-ID",
expectedValue: "fake",
expectedAction: headerssetterextension.UPSERT,
},
{
cfg: `
header {
key = "X-Scope-Org-ID"
value = "fake"
action = "delete"
}
`,
expectedKey: "X-Scope-Org-ID",
expectedValue: "fake",
expectedAction: headerssetterextension.DELETE,
},
{
cfg: `
header {
key = "X-Scope-Org-ID"
value = "fake"
action = "NonExistingAction"
}
`,
expectUnmarshalError: true,
},
}

for _, tc := range tests {
var args headers.Arguments
err := river.Unmarshal([]byte(tc.cfg), &args)

if tc.expectUnmarshalError {
require.Error(t, err)
continue
}
require.NoError(t, err)

ext, err := args.Convert()

require.NoError(t, err)
otelArgs, ok := (ext).(*headerssetterextension.Config)
require.True(t, ok)

require.Equal(t, len(otelArgs.HeadersConfig), 1)
require.Equal(t, *otelArgs.HeadersConfig[0].Key, tc.expectedKey)
require.Equal(t, *otelArgs.HeadersConfig[0].Value, tc.expectedValue)
require.Equal(t, otelArgs.HeadersConfig[0].Action, tc.expectedAction)
}
}
21 changes: 13 additions & 8 deletions component/otelcol/processor/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ func init() {

// Arguments configures the otelcol.processor.batch component.
type Arguments struct {
Timeout time.Duration `river:"timeout,attr,optional"`
SendBatchSize uint32 `river:"send_batch_size,attr,optional"`
SendBatchMaxSize uint32 `river:"send_batch_max_size,attr,optional"`
Timeout time.Duration `river:"timeout,attr,optional"`
SendBatchSize uint32 `river:"send_batch_size,attr,optional"`
SendBatchMaxSize uint32 `river:"send_batch_max_size,attr,optional"`
MetadataKeys []string `river:"metadata_keys,attr,optional"`
MetadataCardinalityLimit uint32 `river:"metadata_cardinality_limit,attr,optional"`

// Output configures where to send processed data. Required.
Output *otelcol.ConsumerArguments `river:"output,block"`
Expand All @@ -42,8 +44,9 @@ var (

// DefaultArguments holds default settings for Arguments.
var DefaultArguments = Arguments{
Timeout: 200 * time.Millisecond,
SendBatchSize: 8192,
Timeout: 200 * time.Millisecond,
SendBatchSize: 8192,
MetadataCardinalityLimit: 1000,
}

// SetToDefault implements river.Defaulter.
Expand All @@ -62,9 +65,11 @@ func (args *Arguments) Validate() error {
// Convert implements processor.Arguments.
func (args Arguments) Convert() (otelcomponent.Config, error) {
return &batchprocessor.Config{
Timeout: args.Timeout,
SendBatchSize: args.SendBatchSize,
SendBatchMaxSize: args.SendBatchMaxSize,
Timeout: args.Timeout,
SendBatchSize: args.SendBatchSize,
SendBatchMaxSize: args.SendBatchMaxSize,
MetadataKeys: args.MetadataKeys,
MetadataCardinalityLimit: args.MetadataCardinalityLimit,
}, nil
}

Expand Down
Loading

0 comments on commit 1b7e538

Please sign in to comment.