diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a0b21228bc0..329d6a591a43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,7 +20,7 @@ Main (unreleased) - `otelcol.exporter.loki` now includes the instrumentation scope in its output. (@ptodev) -- `otelcol.extension.jaeger_remote_sampling` removes the `\` HTTP endpoint. The `/sampling` endpoint is still functional. +- `otelcol.extension.jaeger_remote_sampling` removes the `/` HTTP endpoint. The `/sampling` endpoint is still functional. The change was made in PR [#18070](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/18070) of opentelemetry-collector-contrib. (@ptodev) - The field `version` and `auth` struct block from `walk_params` in `prometheus.exporter.snmp` and SNMP integration have been removed. The auth block now can be configured at top level, together with `modules` (@marctc) @@ -29,6 +29,33 @@ Main (unreleased) discovers file on the local filesystem, and so it doesn't get confused with Prometheus' file discovery. (@rfratto) +- In the traces subsystem for Static mode, some metrics are removed and others are renamed. 
(@ptodev) + - Removed metrics: + - "blackbox_exporter_config_last_reload_success_timestamp_seconds" (gauge) + - "blackbox_exporter_config_last_reload_successful" (gauge) + - "blackbox_module_unknown_total" (counter) + - "traces_processor_tail_sampling_count_traces_sampled" (counter) + - "traces_processor_tail_sampling_new_trace_id_received" (counter) + - "traces_processor_tail_sampling_sampling_decision_latency" (histogram) + - "traces_processor_tail_sampling_sampling_decision_timer_latency" (histogram) + - "traces_processor_tail_sampling_sampling_policy_evaluation_error" (counter) + - "traces_processor_tail_sampling_sampling_trace_dropped_too_early" (counter) + - "traces_processor_tail_sampling_sampling_traces_on_memory" (gauge) + - "traces_receiver_accepted_spans" (counter) + - "traces_receiver_refused_spans" (counter) + - "traces_exporter_enqueue_failed_log_records" (counter) + - "traces_exporter_enqueue_failed_metric_points" (counter) + - "traces_exporter_enqueue_failed_spans" (counter) + - "traces_exporter_queue_capacity" (gauge) + - "traces_exporter_queue_size" (gauge) + + - Renamed metrics: + - "traces_receiver_refused_spans" is renamed to "traces_receiver_refused_spans_total" + - "traces_receiver_accepted_spans" is renamed to "traces_receiver_accepted_spans_total" + - "traces_exporter_sent_metric_points" is renamed to "traces_exporter_sent_metric_points_total" + +- The `remote_sampling` block has been removed from `otelcol.receiver.jaeger`. (@ptodev) + ### Features - The Pyroscope scrape component computes and sends delta profiles automatically when required to reduce bandwidth usage. (@cyriltovena) @@ -72,6 +99,14 @@ Main (unreleased) - `grafana-agent tools prometheus.remote_write` holds a collection of remote write-specific tools. These have been ported over from the `agentctl` command. (@rfratto) +- A new `action` argument for `otelcol.auth.headers`. 
(@ptodev) + +- New `metadata_keys` and `metadata_cardinality_limit` arguments for `otelcol.processor.batch`. (@ptodev) + +- New `boolean_attribute` and `ottl_condition` sampling policies for `otelcol.processor.tail_sampling`. (@ptodev) + +- A new `initial_offset` argument for `otelcol.receiver.kafka`. (@ptodev) + ### Enhancements - Attributes and blocks set to their default values will no longer be shown in the Flow UI. (@rfratto) @@ -109,6 +144,16 @@ Main (unreleased) - Allow `prometheus.exporter.snmp` and SNMP integration to be configured passing a YAML block. (@marctc) +- Some metrics have been added to the traces subsystem for Static mode. (@ptodev) + - "traces_processor_batch_batch_send_size" (histogram) + - "traces_processor_batch_batch_size_trigger_send_total" (counter) + - "traces_processor_batch_metadata_cardinality" (gauge) + - "traces_processor_batch_timeout_trigger_send_total" (counter) + - "traces_rpc_server_duration" (histogram) + - "traces_exporter_send_failed_metric_points_total" (counter) + - "traces_exporter_send_failed_spans_total" (counter) + - "traces_exporter_sent_spans_total" (counter) + ### Bugfixes - Add signing region to remote.s3 component for use with custom endpoints so that Authorization Headers work correctly when @@ -143,6 +188,8 @@ Main (unreleased) - Mongodb integration has been re-enabled. (@jcreixell, @marctc) - Build with go 1.20.6 (@captncraig) +- `otelcol.exporter.jaeger` has been deprecated and will be removed soon. (@ptodev) + v0.34.3 (2023-06-27) -------------------- diff --git a/component/otelcol/auth/bearer/bearer.go b/component/otelcol/auth/bearer/bearer.go index 53978ff054f9..d25747737ef7 100644 --- a/component/otelcol/auth/bearer/bearer.go +++ b/component/otelcol/auth/bearer/bearer.go @@ -38,16 +38,9 @@ var DefaultArguments = Arguments{ Scheme: "Bearer", } -// UnmarshalRiver implements river.Unmarshaler. 
-func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error { +// SetToDefault implements river.Defaulter. +func (args *Arguments) SetToDefault() { *args = DefaultArguments - - type arguments Arguments - if err := f((*arguments)(args)); err != nil { - return err - } - - return nil } // Convert implements auth.Arguments. diff --git a/component/otelcol/auth/headers/headers.go b/component/otelcol/auth/headers/headers.go index 4f470bb38ff3..1b17cd52c47d 100644 --- a/component/otelcol/auth/headers/headers.go +++ b/component/otelcol/auth/headers/headers.go @@ -2,10 +2,13 @@ package headers import ( + "encoding" "fmt" + "strings" "github.com/grafana/agent/component" "github.com/grafana/agent/component/otelcol/auth" + "github.com/grafana/agent/pkg/river" "github.com/grafana/agent/pkg/river/rivertypes" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/headerssetterextension" otelcomponent "go.opentelemetry.io/collector/component" @@ -40,6 +43,11 @@ func (args Arguments) Convert() (otelcomponent.Config, error) { Key: &h.Key, } + err := h.Action.Convert(&upstreamHeader) + if err != nil { + return nil, err + } + if h.Value != nil { upstreamHeader.Value = &h.Value.Value } @@ -65,15 +73,87 @@ func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.I return nil } +type Action string + +const ( + ActionInsert Action = "insert" + ActionUpdate Action = "update" + ActionUpsert Action = "upsert" + ActionDelete Action = "delete" +) + +var ( + _ river.Validator = (*Action)(nil) + _ encoding.TextUnmarshaler = (*Action)(nil) +) + +// Validate implements river.Validator. +func (a *Action) Validate() error { + switch *a { + case ActionInsert, ActionUpdate, ActionUpsert, ActionDelete: + // This is a valid value, do not error + default: + return fmt.Errorf("action is set to an invalid value of %q", *a) + } + return nil +} + +// Convert the River type to the Otel type. 
+// TODO: When headerssetterextension.actionValue is made external, +// remove the input parameter and make this output the Otel type. +func (a *Action) Convert(hc *headerssetterextension.HeaderConfig) error { + switch *a { + case ActionInsert: + hc.Action = headerssetterextension.INSERT + case ActionUpdate: + hc.Action = headerssetterextension.UPDATE + case ActionUpsert: + hc.Action = headerssetterextension.UPSERT + case ActionDelete: + hc.Action = headerssetterextension.DELETE + default: + return fmt.Errorf("action is set to an invalid value of %q", *a) + } + return nil +} + +func (a *Action) UnmarshalText(text []byte) error { + str := Action(strings.ToLower(string(text))) + switch str { + case ActionInsert, ActionUpdate, ActionUpsert, ActionDelete: + *a = str + return nil + default: + return fmt.Errorf("unknown action %v", str) + } +} + // Header is an individual Header to send along with requests. type Header struct { Key string `river:"key,attr"` Value *rivertypes.OptionalSecret `river:"value,attr,optional"` FromContext *string `river:"from_context,attr,optional"` + Action Action `river:"action,attr,optional"` +} + +var _ river.Defaulter = &Header{} + +var DefaultHeader = Header{ + Action: ActionUpsert, +} + +// SetToDefault implements river.Defaulter. +func (h *Header) SetToDefault() { + *h = DefaultHeader } // Validate implements river.Validator. 
func (h *Header) Validate() error { + err := h.Action.Validate() + if err != nil { + return err + } + switch { case h.Key == "": return fmt.Errorf("key must be set to a non-empty string") diff --git a/component/otelcol/auth/headers/headers_test.go b/component/otelcol/auth/headers/headers_test.go index 66a60d8ea427..c0ea19bd5982 100644 --- a/component/otelcol/auth/headers/headers_test.go +++ b/component/otelcol/auth/headers/headers_test.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/agent/pkg/flow/componenttest" "github.com/grafana/agent/pkg/river" "github.com/grafana/agent/pkg/util" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/headerssetterextension" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" extauth "go.opentelemetry.io/collector/extension/auth" @@ -78,3 +79,105 @@ func Test(t *testing.T) { require.NoError(t, err, "HTTP request failed") require.Equal(t, http.StatusOK, resp.StatusCode) } + +func TestArguments_UnmarshalRiver(t *testing.T) { + tests := []struct { + cfg string + expectedKey string + expectedValue string + expectedAction any + expectUnmarshalError bool + }{ + { + cfg: ` + header { + key = "X-Scope-Org-ID" + value = "fake" + } + `, + expectedKey: "X-Scope-Org-ID", + expectedValue: "fake", + expectedAction: headerssetterextension.UPSERT, + }, + { + cfg: ` + header { + key = "X-Scope-Org-ID" + value = "fake" + action = "insert" + } + `, + expectedKey: "X-Scope-Org-ID", + expectedValue: "fake", + expectedAction: headerssetterextension.INSERT, + }, + { + cfg: ` + header { + key = "X-Scope-Org-ID" + value = "fake" + action = "update" + } + `, + expectedKey: "X-Scope-Org-ID", + expectedValue: "fake", + expectedAction: headerssetterextension.UPDATE, + }, + { + cfg: ` + header { + key = "X-Scope-Org-ID" + value = "fake" + action = "upsert" + } + `, + expectedKey: "X-Scope-Org-ID", + expectedValue: "fake", + expectedAction: headerssetterextension.UPSERT, + }, + { + cfg: ` + header { + key = 
"X-Scope-Org-ID" + value = "fake" + action = "delete" + } + `, + expectedKey: "X-Scope-Org-ID", + expectedValue: "fake", + expectedAction: headerssetterextension.DELETE, + }, + { + cfg: ` + header { + key = "X-Scope-Org-ID" + value = "fake" + action = "NonExistingAction" + } + `, + expectUnmarshalError: true, + }, + } + + for _, tc := range tests { + var args headers.Arguments + err := river.Unmarshal([]byte(tc.cfg), &args) + + if tc.expectUnmarshalError { + require.Error(t, err) + continue + } + require.NoError(t, err) + + ext, err := args.Convert() + + require.NoError(t, err) + otelArgs, ok := (ext).(*headerssetterextension.Config) + require.True(t, ok) + + require.Equal(t, len(otelArgs.HeadersConfig), 1) + require.Equal(t, *otelArgs.HeadersConfig[0].Key, tc.expectedKey) + require.Equal(t, *otelArgs.HeadersConfig[0].Value, tc.expectedValue) + require.Equal(t, otelArgs.HeadersConfig[0].Action, tc.expectedAction) + } +} diff --git a/component/otelcol/processor/batch/batch.go b/component/otelcol/processor/batch/batch.go index 54a5260134c2..3c205a0e4320 100644 --- a/component/otelcol/processor/batch/batch.go +++ b/component/otelcol/processor/batch/batch.go @@ -28,9 +28,11 @@ func init() { // Arguments configures the otelcol.processor.batch component. type Arguments struct { - Timeout time.Duration `river:"timeout,attr,optional"` - SendBatchSize uint32 `river:"send_batch_size,attr,optional"` - SendBatchMaxSize uint32 `river:"send_batch_max_size,attr,optional"` + Timeout time.Duration `river:"timeout,attr,optional"` + SendBatchSize uint32 `river:"send_batch_size,attr,optional"` + SendBatchMaxSize uint32 `river:"send_batch_max_size,attr,optional"` + MetadataKeys []string `river:"metadata_keys,attr,optional"` + MetadataCardinalityLimit uint32 `river:"metadata_cardinality_limit,attr,optional"` // Output configures where to send processed data. Required. 
Output *otelcol.ConsumerArguments `river:"output,block"` @@ -42,8 +44,9 @@ var ( // DefaultArguments holds default settings for Arguments. var DefaultArguments = Arguments{ - Timeout: 200 * time.Millisecond, - SendBatchSize: 8192, + Timeout: 200 * time.Millisecond, + SendBatchSize: 8192, + MetadataCardinalityLimit: 1000, } // SetToDefault implements river.Defaulter. @@ -62,9 +65,11 @@ func (args *Arguments) Validate() error { // Convert implements processor.Arguments. func (args Arguments) Convert() (otelcomponent.Config, error) { return &batchprocessor.Config{ - Timeout: args.Timeout, - SendBatchSize: args.SendBatchSize, - SendBatchMaxSize: args.SendBatchMaxSize, + Timeout: args.Timeout, + SendBatchSize: args.SendBatchSize, + SendBatchMaxSize: args.SendBatchMaxSize, + MetadataKeys: args.MetadataKeys, + MetadataCardinalityLimit: args.MetadataCardinalityLimit, }, nil } diff --git a/component/otelcol/processor/batch/batch_test.go b/component/otelcol/processor/batch/batch_test.go index a5056366859f..75d1fa96af08 100644 --- a/component/otelcol/processor/batch/batch_test.go +++ b/component/otelcol/processor/batch/batch_test.go @@ -15,6 +15,7 @@ import ( "github.com/grafana/dskit/backoff" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor/batchprocessor" ) // Test performs a basic integration test which runs the @@ -116,3 +117,73 @@ func createTestTraces() ptrace.Traces { } return data } + +func TestArguments_UnmarshalRiver(t *testing.T) { + tests := []struct { + cfg string + expectedArguments batch.Arguments + }{ + { + cfg: ` + output {} + `, + expectedArguments: batch.Arguments{ + Timeout: batch.DefaultArguments.Timeout, + SendBatchSize: batch.DefaultArguments.SendBatchSize, + SendBatchMaxSize: 0, + MetadataKeys: nil, + MetadataCardinalityLimit: batch.DefaultArguments.MetadataCardinalityLimit, + }, + }, + { + cfg: ` + timeout = "11s" + send_batch_size = 8000 + send_batch_max_size = 10000 + 
output {} + `, + expectedArguments: batch.Arguments{ + Timeout: 11 * time.Second, + SendBatchSize: 8000, + SendBatchMaxSize: 10000, + MetadataKeys: nil, + MetadataCardinalityLimit: batch.DefaultArguments.MetadataCardinalityLimit, + }, + }, + { + cfg: ` + timeout = "11s" + send_batch_size = 8000 + send_batch_max_size = 10000 + metadata_keys = ["tenant_id"] + metadata_cardinality_limit = 123 + output {} + `, + expectedArguments: batch.Arguments{ + Timeout: 11 * time.Second, + SendBatchSize: 8000, + SendBatchMaxSize: 10000, + MetadataKeys: []string{"tenant_id"}, + MetadataCardinalityLimit: 123, + }, + }, + } + + for _, tc := range tests { + var args batch.Arguments + err := river.Unmarshal([]byte(tc.cfg), &args) + require.NoError(t, err) + + ext, err := args.Convert() + require.NoError(t, err) + + otelArgs, ok := (ext).(*batchprocessor.Config) + require.True(t, ok) + + require.Equal(t, otelArgs.Timeout, tc.expectedArguments.Timeout) + require.Equal(t, otelArgs.SendBatchSize, tc.expectedArguments.SendBatchSize) + require.Equal(t, otelArgs.SendBatchMaxSize, tc.expectedArguments.SendBatchMaxSize) + require.Equal(t, otelArgs.MetadataKeys, tc.expectedArguments.MetadataKeys) + require.Equal(t, otelArgs.MetadataCardinalityLimit, tc.expectedArguments.MetadataCardinalityLimit) + } +} diff --git a/component/otelcol/processor/tail_sampling/tail_sampling.go b/component/otelcol/processor/tail_sampling/tail_sampling.go index 60ea4006575a..dc2f33bb661d 100644 --- a/component/otelcol/processor/tail_sampling/tail_sampling.go +++ b/component/otelcol/processor/tail_sampling/tail_sampling.go @@ -28,10 +28,10 @@ func init() { // Arguments configures the otelcol.processor.tail_sampling component. 
type Arguments struct { - PolicyCfgs []PolicyCfg `river:"policy,block"` - DecisionWait time.Duration `river:"decision_wait,attr,optional"` - NumTraces uint64 `river:"num_traces,attr,optional"` - ExpectedNewTracesPerSec uint64 `river:"expected_new_traces_per_sec,attr,optional"` + PolicyCfgs []PolicyConfig `river:"policy,block"` + DecisionWait time.Duration `river:"decision_wait,attr,optional"` + NumTraces uint64 `river:"num_traces,attr,optional"` + ExpectedNewTracesPerSec uint64 `river:"expected_new_traces_per_sec,attr,optional"` // Output configures where to send processed data. Required. Output *otelcol.ConsumerArguments `river:"output,block"` } diff --git a/component/otelcol/processor/tail_sampling/tail_sampling_test.go b/component/otelcol/processor/tail_sampling/tail_sampling_test.go index 27524ea217be..581cbd491c28 100644 --- a/component/otelcol/processor/tail_sampling/tail_sampling_test.go +++ b/component/otelcol/processor/tail_sampling/tail_sampling_test.go @@ -36,6 +36,35 @@ func TestBadRiverConfig(t *testing.T) { require.Error(t, river.Unmarshal([]byte(exampleBadRiverConfig), &args), "num_traces must be greater than zero") } +func TestBadRiverConfigErrorMode(t *testing.T) { + exampleBadRiverConfig := ` + decision_wait = "10s" + num_traces = 5 + expected_new_traces_per_sec = 10 + policy { + name = "test-policy-1" + type = "ottl_condition" + ottl_condition { + error_mode = "" + span = [ + "attributes[\"test_attr_key_1\"] == \"test_attr_val_1\"", + "attributes[\"test_attr_key_2\"] != \"test_attr_val_1\"", + ] + spanevent = [ + "name != \"test_span_event_name\"", + "attributes[\"test_event_attr_key_2\"] != \"test_event_attr_val_1\"", + ] + } + } + output { + // no-op: will be overridden by test code. 
+ } +` + + var args Arguments + require.ErrorContains(t, river.Unmarshal([]byte(exampleBadRiverConfig), &args), "\"\" unknown error mode") +} + func TestBadOtelConfig(t *testing.T) { var exampleBadOtelConfig = ` decision_wait = "10s" @@ -160,6 +189,51 @@ func TestBigConfig(t *testing.T) { values = ["value1", "value2"] } } + policy { + name = "test-policy-12" + type = "boolean_attribute" + boolean_attribute { + key = "key4" + value = true + } + } + policy { + name = "test-policy-13" + type = "ottl_condition" + ottl_condition { + error_mode = "ignore" + span = [ + "attributes[\"test_attr_key_1\"] == \"test_attr_val_1\"", + "attributes[\"test_attr_key_2\"] != \"test_attr_val_1\"", + ] + spanevent = [ + "name != \"test_span_event_name\"", + "attributes[\"test_event_attr_key_2\"] != \"test_event_attr_val_1\"", + ] + } + } + policy { + name = "test-policy-14" + type = "ottl_condition" + ottl_condition { + error_mode = "ignore" + span = [ + "attributes[\"test_attr_key_1\"] == \"test_attr_val_1\"", + "attributes[\"test_attr_key_2\"] != \"test_attr_val_1\"", + ] + } + } + policy { + name = "test-policy-15" + type = "ottl_condition" + ottl_condition { + error_mode = "propagate" + spanevent = [ + "name != \"test_span_event_name\"", + "attributes[\"test_event_attr_key_2\"] != \"test_event_attr_val_1\"", + ] + } + } policy{ name = "and-policy-1" type = "and" @@ -181,6 +255,29 @@ func TestBigConfig(t *testing.T) { values = ["value1", "value2"] } } + and_sub_policy { + name = "test-and-policy-3" + type = "boolean_attribute" + boolean_attribute { + key = "key4" + value = true + } + } + and_sub_policy { + name = "test-and-policy-4" + type = "ottl_condition" + ottl_condition { + error_mode = "ignore" + span = [ + "attributes[\"test_attr_key_1\"] == \"test_attr_val_1\"", + "attributes[\"test_attr_key_2\"] != \"test_attr_val_1\"", + ] + spanevent = [ + "name != \"test_span_event_name\"", + "attributes[\"test_event_attr_key_2\"] != \"test_event_attr_val_1\"", + ] + } + } } } policy{ @@ 
-210,6 +307,29 @@ func TestBigConfig(t *testing.T) { name = "test-composite-policy-3" type = "always_sample" } + composite_sub_policy { + name = "test-composite-policy-4" + type = "boolean_attribute" + boolean_attribute { + key = "key4" + value = true + } + } + composite_sub_policy { + name = "test-composite-policy-5" + type = "ottl_condition" + ottl_condition { + error_mode = "ignore" + span = [ + "attributes[\"test_attr_key_1\"] == \"test_attr_val_1\"", + "attributes[\"test_attr_key_2\"] != \"test_attr_val_1\"", + ] + spanevent = [ + "name != \"test_span_event_name\"", + "attributes[\"test_event_attr_key_2\"] != \"test_event_attr_val_1\"", + ] + } + } rate_allocation { policy = "test-composite-policy-1" percent = 50 diff --git a/component/otelcol/processor/tail_sampling/types.go b/component/otelcol/processor/tail_sampling/types.go index 6dad8dcacce6..c461aaafb113 100644 --- a/component/otelcol/processor/tail_sampling/types.go +++ b/component/otelcol/processor/tail_sampling/types.go @@ -1,75 +1,85 @@ package tail_sampling import ( + "encoding" + "fmt" + "strings" + + "github.com/grafana/agent/pkg/river" "github.com/mitchellh/mapstructure" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" tsp "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor" ) -type PolicyCfg struct { - SharedPolicyCfg SharedPolicyCfg `river:",squash"` +type PolicyConfig struct { + SharedPolicyConfig SharedPolicyConfig `river:",squash"` // Configs for defining composite policy - CompositeCfg CompositeCfg `river:"composite,block,optional"` + CompositeConfig CompositeConfig `river:"composite,block,optional"` // Configs for defining and policy - AndCfg AndCfg `river:"and,block,optional"` + AndConfig AndConfig `river:"and,block,optional"` } -func (policyCfg PolicyCfg) Convert() tsp.PolicyCfg { - var otelCfg tsp.PolicyCfg +func (policyConfig PolicyConfig) Convert() tsp.PolicyCfg { + var otelConfig tsp.PolicyCfg 
mustDecodeMapStructure(map[string]interface{}{ - "name": policyCfg.SharedPolicyCfg.Name, - "type": policyCfg.SharedPolicyCfg.Type, - "latency": policyCfg.SharedPolicyCfg.LatencyCfg.Convert(), - "numeric_attribute": policyCfg.SharedPolicyCfg.NumericAttributeCfg.Convert(), - "probabilistic": policyCfg.SharedPolicyCfg.ProbabilisticCfg.Convert(), - "status_code": policyCfg.SharedPolicyCfg.StatusCodeCfg.Convert(), - "string_attribute": policyCfg.SharedPolicyCfg.StringAttributeCfg.Convert(), - "rate_limiting": policyCfg.SharedPolicyCfg.RateLimitingCfg.Convert(), - "span_count": policyCfg.SharedPolicyCfg.SpanCountCfg.Convert(), - "trace_state": policyCfg.SharedPolicyCfg.TraceStateCfg.Convert(), - "composite": policyCfg.CompositeCfg.Convert(), - "and": policyCfg.AndCfg.Convert(), - }, &otelCfg) - - return otelCfg + "name": policyConfig.SharedPolicyConfig.Name, + "type": policyConfig.SharedPolicyConfig.Type, + "latency": policyConfig.SharedPolicyConfig.LatencyConfig.Convert(), + "numeric_attribute": policyConfig.SharedPolicyConfig.NumericAttributeConfig.Convert(), + "probabilistic": policyConfig.SharedPolicyConfig.ProbabilisticConfig.Convert(), + "status_code": policyConfig.SharedPolicyConfig.StatusCodeConfig.Convert(), + "string_attribute": policyConfig.SharedPolicyConfig.StringAttributeConfig.Convert(), + "rate_limiting": policyConfig.SharedPolicyConfig.RateLimitingConfig.Convert(), + "span_count": policyConfig.SharedPolicyConfig.SpanCountConfig.Convert(), + "boolean_attribute": policyConfig.SharedPolicyConfig.BooleanAttributeConfig.Convert(), + "ottl_condition": policyConfig.SharedPolicyConfig.OttlConditionConfig.Convert(), + "trace_state": policyConfig.SharedPolicyConfig.TraceStateConfig.Convert(), + "composite": policyConfig.CompositeConfig.Convert(), + "and": policyConfig.AndConfig.Convert(), + }, &otelConfig) + + return otelConfig } // This cannot currently have a Convert() because tsp.sharedPolicyCfg isn't public -type SharedPolicyCfg struct { - Name string 
`river:"name,attr"` - Type string `river:"type,attr"` - LatencyCfg LatencyCfg `river:"latency,block,optional"` - NumericAttributeCfg NumericAttributeCfg `river:"numeric_attribute,block,optional"` - ProbabilisticCfg ProbabilisticCfg `river:"probabilistic,block,optional"` - StatusCodeCfg StatusCodeCfg `river:"status_code,block,optional"` - StringAttributeCfg StringAttributeCfg `river:"string_attribute,block,optional"` - RateLimitingCfg RateLimitingCfg `river:"rate_limiting,block,optional"` - SpanCountCfg SpanCountCfg `river:"span_count,block,optional"` - TraceStateCfg TraceStateCfg `river:"trace_state,block,optional"` -} - -// LatencyCfg holds the configurable settings to create a latency filter sampling policy +type SharedPolicyConfig struct { + Name string `river:"name,attr"` + Type string `river:"type,attr"` + LatencyConfig LatencyConfig `river:"latency,block,optional"` + NumericAttributeConfig NumericAttributeConfig `river:"numeric_attribute,block,optional"` + ProbabilisticConfig ProbabilisticConfig `river:"probabilistic,block,optional"` + StatusCodeConfig StatusCodeConfig `river:"status_code,block,optional"` + StringAttributeConfig StringAttributeConfig `river:"string_attribute,block,optional"` + RateLimitingConfig RateLimitingConfig `river:"rate_limiting,block,optional"` + SpanCountConfig SpanCountConfig `river:"span_count,block,optional"` + BooleanAttributeConfig BooleanAttributeConfig `river:"boolean_attribute,block,optional"` + OttlConditionConfig OttlConditionConfig `river:"ottl_condition,block,optional"` + TraceStateConfig TraceStateConfig `river:"trace_state,block,optional"` +} + +// LatencyConfig holds the configurable settings to create a latency filter sampling policy // evaluator -type LatencyCfg struct { +type LatencyConfig struct { // ThresholdMs in milliseconds. 
ThresholdMs int64 `river:"threshold_ms,attr"` } -func (latencyCfg LatencyCfg) Convert() tsp.LatencyCfg { - otelCfg := tsp.LatencyCfg{} +func (latencyConfig LatencyConfig) Convert() tsp.LatencyCfg { + otelConfig := tsp.LatencyCfg{} mustDecodeMapStructure(map[string]interface{}{ - "threshold_ms": latencyCfg.ThresholdMs, - }, &otelCfg) + "threshold_ms": latencyConfig.ThresholdMs, + }, &otelConfig) - return otelCfg + return otelConfig } -// NumericAttributeCfg holds the configurable settings to create a numeric attribute filter +// NumericAttributeConfig holds the configurable settings to create a numeric attribute filter // sampling policy evaluator. -type NumericAttributeCfg struct { +type NumericAttributeConfig struct { // Tag that the filter is going to be matching against. Key string `river:"key,attr"` // MinValue is the minimum value of the attribute to be considered a match. @@ -78,21 +88,21 @@ type NumericAttributeCfg struct { MaxValue int64 `river:"max_value,attr"` } -func (numericAttributeCfg NumericAttributeCfg) Convert() tsp.NumericAttributeCfg { - var otelCfg tsp.NumericAttributeCfg +func (numericAttributeConfig NumericAttributeConfig) Convert() tsp.NumericAttributeCfg { + var otelConfig tsp.NumericAttributeCfg mustDecodeMapStructure(map[string]interface{}{ - "key": numericAttributeCfg.Key, - "min_value": numericAttributeCfg.MinValue, - "max_value": numericAttributeCfg.MaxValue, - }, &otelCfg) + "key": numericAttributeConfig.Key, + "min_value": numericAttributeConfig.MinValue, + "max_value": numericAttributeConfig.MaxValue, + }, &otelConfig) - return otelCfg + return otelConfig } -// ProbabilisticCfg holds the configurable settings to create a probabilistic +// ProbabilisticConfig holds the configurable settings to create a probabilistic // sampling policy evaluator. -type ProbabilisticCfg struct { +type ProbabilisticConfig struct { // HashSalt allows one to configure the hashing salts. 
This is important in scenarios where multiple layers of collectors // have different sampling rates: if they use the same salt all passing one layer may pass the other even if they have // different sampling rates, configuring different salts avoids that. @@ -102,36 +112,36 @@ type ProbabilisticCfg struct { SamplingPercentage float64 `river:"sampling_percentage,attr"` } -func (probabilisticCfg ProbabilisticCfg) Convert() tsp.ProbabilisticCfg { - var otelCfg tsp.ProbabilisticCfg +func (probabilisticConfig ProbabilisticConfig) Convert() tsp.ProbabilisticCfg { + var otelConfig tsp.ProbabilisticCfg mustDecodeMapStructure(map[string]interface{}{ - "hash_salt": probabilisticCfg.HashSalt, - "sampling_percentage": probabilisticCfg.SamplingPercentage, - }, &otelCfg) + "hash_salt": probabilisticConfig.HashSalt, + "sampling_percentage": probabilisticConfig.SamplingPercentage, + }, &otelConfig) - return otelCfg + return otelConfig } -// StatusCodeCfg holds the configurable settings to create a status code filter sampling +// StatusCodeConfig holds the configurable settings to create a status code filter sampling // policy evaluator. -type StatusCodeCfg struct { +type StatusCodeConfig struct { StatusCodes []string `river:"status_codes,attr"` } -func (statusCodeCfg StatusCodeCfg) Convert() tsp.StatusCodeCfg { - var otelCfg tsp.StatusCodeCfg +func (statusCodeConfig StatusCodeConfig) Convert() tsp.StatusCodeCfg { + var otelConfig tsp.StatusCodeCfg mustDecodeMapStructure(map[string]interface{}{ - "status_codes": statusCodeCfg.StatusCodes, - }, &otelCfg) + "status_codes": statusCodeConfig.StatusCodes, + }, &otelConfig) - return otelCfg + return otelConfig } -// StringAttributeCfg holds the configurable settings to create a string attribute filter +// StringAttributeConfig holds the configurable settings to create a string attribute filter // sampling policy evaluator. 
-type StringAttributeCfg struct { +type StringAttributeConfig struct { // Tag that the filter is going to be matching against. Key string `river:"key,attr"` // Values indicate the set of values or regular expressions to use when matching against attribute values. @@ -149,97 +159,196 @@ type StringAttributeCfg struct { InvertMatch bool `river:"invert_match,attr,optional"` } -func (stringAttributeCfg StringAttributeCfg) Convert() tsp.StringAttributeCfg { - var otelCfg tsp.StringAttributeCfg +func (stringAttributeConfig StringAttributeConfig) Convert() tsp.StringAttributeCfg { + var otelConfig tsp.StringAttributeCfg mustDecodeMapStructure(map[string]interface{}{ - "key": stringAttributeCfg.Key, - "values": stringAttributeCfg.Values, - "enabled_regex_matching": stringAttributeCfg.EnabledRegexMatching, - "cache_max_size": stringAttributeCfg.CacheMaxSize, - "invert_match": stringAttributeCfg.InvertMatch, - }, &otelCfg) + "key": stringAttributeConfig.Key, + "values": stringAttributeConfig.Values, + "enabled_regex_matching": stringAttributeConfig.EnabledRegexMatching, + "cache_max_size": stringAttributeConfig.CacheMaxSize, + "invert_match": stringAttributeConfig.InvertMatch, + }, &otelConfig) - return otelCfg + return otelConfig } -// RateLimitingCfg holds the configurable settings to create a rate limiting +// RateLimitingConfig holds the configurable settings to create a rate limiting // sampling policy evaluator. -type RateLimitingCfg struct { +type RateLimitingConfig struct { // SpansPerSecond sets the limit on the maximum nuber of spans that can be processed each second. 
SpansPerSecond int64 `river:"spans_per_second,attr"` } -func (rateLimitingCfg RateLimitingCfg) Convert() tsp.RateLimitingCfg { - var otelCfg tsp.RateLimitingCfg +func (rateLimitingConfig RateLimitingConfig) Convert() tsp.RateLimitingCfg { + var otelConfig tsp.RateLimitingCfg mustDecodeMapStructure(map[string]interface{}{ - "spans_per_second": rateLimitingCfg.SpansPerSecond, - }, &otelCfg) + "spans_per_second": rateLimitingConfig.SpansPerSecond, + }, &otelConfig) - return otelCfg + return otelConfig } -// SpanCountCfg holds the configurable settings to create a Span Count filter sampling policy +// SpanCountConfig holds the configurable settings to create a Span Count filter sampling policy // sampling policy evaluator -type SpanCountCfg struct { +type SpanCountConfig struct { // Minimum number of spans in a Trace MinSpans int32 `river:"min_spans,attr"` } -func (spanCountCfg SpanCountCfg) Convert() tsp.SpanCountCfg { - var otelCfg tsp.SpanCountCfg +func (spanCountConfig SpanCountConfig) Convert() tsp.SpanCountCfg { + var otelConfig tsp.SpanCountCfg + + mustDecodeMapStructure(map[string]interface{}{ + "min_spans": spanCountConfig.MinSpans, + }, &otelConfig) + + return otelConfig +} + +// BooleanAttributeConfig holds the configurable settings to create a boolean attribute filter +// sampling policy evaluator. +type BooleanAttributeConfig struct { + // Tag that the filter is going to be matching against. + Key string `river:"key,attr"` + // Value indicate the bool value, either true or false to use when matching against attribute values. 
+ // BooleanAttribute Policy will apply exact value match on Value + Value bool `river:"value,attr"` +} + +func (booleanAttributeConfig BooleanAttributeConfig) Convert() tsp.BooleanAttributeCfg { + var otelConfig tsp.BooleanAttributeCfg + + mustDecodeMapStructure(map[string]interface{}{ + "key": booleanAttributeConfig.Key, + "value": booleanAttributeConfig.Value, + }, &otelConfig) + + return otelConfig +} + +// The error mode determines whether to ignore or propagate +// errors with evaluating OTTL conditions. +type ErrorMode string + +const ( + // "ignore" causes evaluation to continue to the next statement. + ErrorModeIgnore ErrorMode = "ignore" + // "propagate" causes the evaluation to be false and an error is returned. + ErrorModePropagate ErrorMode = "propagate" +) + +var ( + _ river.Validator = (*ErrorMode)(nil) + _ encoding.TextUnmarshaler = (*ErrorMode)(nil) +) + +// Validate implements river.Validator. +func (e *ErrorMode) Validate() error { + if e == nil { + return nil + } + + var ottlError ottl.ErrorMode + return ottlError.UnmarshalText([]byte(string(*e))) +} + +// Convert the River type to the Otel type +func (e *ErrorMode) Convert() ottl.ErrorMode { + if e == nil || *e == "" { + return ottl.ErrorMode("") + } + + var ottlError ottl.ErrorMode + err := ottlError.UnmarshalText([]byte(string(*e))) + + //TODO: Rework this to return an error instead of panicking + if err != nil { + panic(err) + } + + return ottlError +} + +func (e *ErrorMode) UnmarshalText(text []byte) error { + if e == nil { + return nil + } + + str := ErrorMode(strings.ToLower(string(text))) + switch str { + case ErrorModeIgnore, ErrorModePropagate: + *e = str + return nil + default: + return fmt.Errorf("unknown error mode %v", str) + } +} + +// OttlConditionConfig holds the configurable setting to create a OTTL condition filter +// sampling policy evaluator. 
+type OttlConditionConfig struct { + ErrorMode ErrorMode `river:"error_mode,attr"` + SpanConditions []string `river:"span,attr,optional"` + SpanEventConditions []string `river:"spanevent,attr,optional"` +} + +func (ottlConditionConfig OttlConditionConfig) Convert() tsp.OTTLConditionCfg { + var otelConfig tsp.OTTLConditionCfg mustDecodeMapStructure(map[string]interface{}{ - "min_spans": spanCountCfg.MinSpans, - }, &otelCfg) + "error_mode": ottlConditionConfig.ErrorMode.Convert(), + "span": ottlConditionConfig.SpanConditions, + "spanevent": ottlConditionConfig.SpanEventConditions, + }, &otelConfig) - return otelCfg + return otelConfig } -type TraceStateCfg struct { +type TraceStateConfig struct { // Tag that the filter is going to be matching against. Key string `river:"key,attr"` // Values indicate the set of values to use when matching against trace_state values. Values []string `river:"values,attr"` } -func (traceStateCfg TraceStateCfg) Convert() tsp.TraceStateCfg { - var otelCfg tsp.TraceStateCfg +func (traceStateConfig TraceStateConfig) Convert() tsp.TraceStateCfg { + var otelConfig tsp.TraceStateCfg mustDecodeMapStructure(map[string]interface{}{ - "key": traceStateCfg.Key, - "values": traceStateCfg.Values, - }, &otelCfg) + "key": traceStateConfig.Key, + "values": traceStateConfig.Values, + }, &otelConfig) - return otelCfg + return otelConfig } -// CompositeCfg holds the configurable settings to create a composite +// CompositeConfig holds the configurable settings to create a composite // sampling policy evaluator. 
-type CompositeCfg struct { - MaxTotalSpansPerSecond int64 `river:"max_total_spans_per_second,attr"` - PolicyOrder []string `river:"policy_order,attr"` - SubPolicyCfg []CompositeSubPolicyCfg `river:"composite_sub_policy,block,optional"` - RateAllocation []RateAllocationCfg `river:"rate_allocation,block,optional"` +type CompositeConfig struct { + MaxTotalSpansPerSecond int64 `river:"max_total_spans_per_second,attr"` + PolicyOrder []string `river:"policy_order,attr"` + SubPolicyCfg []CompositeSubPolicyConfig `river:"composite_sub_policy,block,optional"` + RateAllocation []RateAllocationConfig `river:"rate_allocation,block,optional"` } -func (compositeCfg CompositeCfg) Convert() tsp.CompositeCfg { +func (compositeConfig CompositeConfig) Convert() tsp.CompositeCfg { var otelConfig tsp.CompositeCfg var otelCompositeSubPolicyCfg []tsp.CompositeSubPolicyCfg - for _, subPolicyCfg := range compositeCfg.SubPolicyCfg { + for _, subPolicyCfg := range compositeConfig.SubPolicyCfg { otelCompositeSubPolicyCfg = append(otelCompositeSubPolicyCfg, subPolicyCfg.Convert()) } var otelRateAllocationCfg []tsp.RateAllocationCfg - for _, rateAllocation := range compositeCfg.RateAllocation { + for _, rateAllocation := range compositeConfig.RateAllocation { otelRateAllocationCfg = append(otelRateAllocationCfg, rateAllocation.Convert()) } mustDecodeMapStructure(map[string]interface{}{ - "max_total_spans_per_second": compositeCfg.MaxTotalSpansPerSecond, - "policy_order": compositeCfg.PolicyOrder, + "max_total_spans_per_second": compositeConfig.MaxTotalSpansPerSecond, + "policy_order": compositeConfig.PolicyOrder, "composite_sub_policy": otelCompositeSubPolicyCfg, "rate_allocation": otelRateAllocationCfg, }, &otelConfig) @@ -247,60 +356,62 @@ func (compositeCfg CompositeCfg) Convert() tsp.CompositeCfg { return otelConfig } -// CompositeSubPolicyCfg holds the common configuration to all policies under composite policy. 
-type CompositeSubPolicyCfg struct { - SharedPolicyCfg SharedPolicyCfg `river:",squash"` +// CompositeSubPolicyConfig holds the common configuration to all policies under composite policy. +type CompositeSubPolicyConfig struct { + SharedPolicyConfig SharedPolicyConfig `river:",squash"` // Configs for and policy evaluator. - AndCfg AndCfg `river:"and,block,optional"` + AndConfig AndConfig `river:"and,block,optional"` } -func (compositeSubPolicyCfg CompositeSubPolicyCfg) Convert() tsp.CompositeSubPolicyCfg { - var otelCfg tsp.CompositeSubPolicyCfg +func (compositeSubPolicyConfig CompositeSubPolicyConfig) Convert() tsp.CompositeSubPolicyCfg { + var otelConfig tsp.CompositeSubPolicyCfg mustDecodeMapStructure(map[string]interface{}{ - "name": compositeSubPolicyCfg.SharedPolicyCfg.Name, - "type": compositeSubPolicyCfg.SharedPolicyCfg.Type, - "latency": compositeSubPolicyCfg.SharedPolicyCfg.LatencyCfg.Convert(), - "numeric_attribute": compositeSubPolicyCfg.SharedPolicyCfg.NumericAttributeCfg.Convert(), - "probabilistic": compositeSubPolicyCfg.SharedPolicyCfg.ProbabilisticCfg.Convert(), - "status_code": compositeSubPolicyCfg.SharedPolicyCfg.StatusCodeCfg.Convert(), - "string_attribute": compositeSubPolicyCfg.SharedPolicyCfg.StringAttributeCfg.Convert(), - "rate_limiting": compositeSubPolicyCfg.SharedPolicyCfg.RateLimitingCfg.Convert(), - "span_count": compositeSubPolicyCfg.SharedPolicyCfg.SpanCountCfg.Convert(), - "trace_state": compositeSubPolicyCfg.SharedPolicyCfg.TraceStateCfg.Convert(), - "and": compositeSubPolicyCfg.AndCfg.Convert(), - }, &otelCfg) - - return otelCfg -} - -// RateAllocationCfg used within composite policy -type RateAllocationCfg struct { + "name": compositeSubPolicyConfig.SharedPolicyConfig.Name, + "type": compositeSubPolicyConfig.SharedPolicyConfig.Type, + "latency": compositeSubPolicyConfig.SharedPolicyConfig.LatencyConfig.Convert(), + "numeric_attribute": compositeSubPolicyConfig.SharedPolicyConfig.NumericAttributeConfig.Convert(), + 
"probabilistic": compositeSubPolicyConfig.SharedPolicyConfig.ProbabilisticConfig.Convert(), + "status_code": compositeSubPolicyConfig.SharedPolicyConfig.StatusCodeConfig.Convert(), + "string_attribute": compositeSubPolicyConfig.SharedPolicyConfig.StringAttributeConfig.Convert(), + "rate_limiting": compositeSubPolicyConfig.SharedPolicyConfig.RateLimitingConfig.Convert(), + "span_count": compositeSubPolicyConfig.SharedPolicyConfig.SpanCountConfig.Convert(), + "boolean_attribute": compositeSubPolicyConfig.SharedPolicyConfig.BooleanAttributeConfig.Convert(), + "ottl_condition": compositeSubPolicyConfig.SharedPolicyConfig.OttlConditionConfig.Convert(), + "trace_state": compositeSubPolicyConfig.SharedPolicyConfig.TraceStateConfig.Convert(), + "and": compositeSubPolicyConfig.AndConfig.Convert(), + }, &otelConfig) + + return otelConfig +} + +// RateAllocationConfig used within composite policy +type RateAllocationConfig struct { Policy string `river:"policy,attr"` Percent int64 `river:"percent,attr"` } -func (rateAllocationCfg RateAllocationCfg) Convert() tsp.RateAllocationCfg { - var otelCfg tsp.RateAllocationCfg +func (rateAllocationConfig RateAllocationConfig) Convert() tsp.RateAllocationCfg { + var otelConfig tsp.RateAllocationCfg mustDecodeMapStructure(map[string]interface{}{ - "policy": rateAllocationCfg.Policy, - "percent": rateAllocationCfg.Percent, - }, &otelCfg) + "policy": rateAllocationConfig.Policy, + "percent": rateAllocationConfig.Percent, + }, &otelConfig) - return otelCfg + return otelConfig } -type AndCfg struct { - SubPolicyCfg []AndSubPolicyCfg `river:"and_sub_policy,block"` +type AndConfig struct { + SubPolicyConfig []AndSubPolicyConfig `river:"and_sub_policy,block"` } -func (andCfg AndCfg) Convert() tsp.AndCfg { +func (andConfig AndConfig) Convert() tsp.AndCfg { var otelConfig tsp.AndCfg var otelPolicyCfgs []tsp.AndSubPolicyCfg - for _, subPolicyCfg := range andCfg.SubPolicyCfg { + for _, subPolicyCfg := range andConfig.SubPolicyConfig { 
otelPolicyCfgs = append(otelPolicyCfgs, subPolicyCfg.Convert()) } @@ -311,33 +422,37 @@ func (andCfg AndCfg) Convert() tsp.AndCfg { return otelConfig } -// AndSubPolicyCfg holds the common configuration to all policies under and policy. -type AndSubPolicyCfg struct { - SharedPolicyCfg SharedPolicyCfg `river:",squash"` +// AndSubPolicyConfig holds the common configuration to all policies under and policy. +type AndSubPolicyConfig struct { + SharedPolicyConfig SharedPolicyConfig `river:",squash"` } -func (andSubPolicyCfg AndSubPolicyCfg) Convert() tsp.AndSubPolicyCfg { - var otelCfg tsp.AndSubPolicyCfg +func (andSubPolicyConfig AndSubPolicyConfig) Convert() tsp.AndSubPolicyCfg { + var otelConfig tsp.AndSubPolicyCfg mustDecodeMapStructure(map[string]interface{}{ - "name": andSubPolicyCfg.SharedPolicyCfg.Name, - "type": andSubPolicyCfg.SharedPolicyCfg.Type, - "latency": andSubPolicyCfg.SharedPolicyCfg.LatencyCfg.Convert(), - "numeric_attribute": andSubPolicyCfg.SharedPolicyCfg.NumericAttributeCfg.Convert(), - "probabilistic": andSubPolicyCfg.SharedPolicyCfg.ProbabilisticCfg.Convert(), - "status_code": andSubPolicyCfg.SharedPolicyCfg.StatusCodeCfg.Convert(), - "string_attribute": andSubPolicyCfg.SharedPolicyCfg.StringAttributeCfg.Convert(), - "rate_limiting": andSubPolicyCfg.SharedPolicyCfg.RateLimitingCfg.Convert(), - "span_count": andSubPolicyCfg.SharedPolicyCfg.SpanCountCfg.Convert(), - "trace_state": andSubPolicyCfg.SharedPolicyCfg.TraceStateCfg.Convert(), - }, &otelCfg) + "name": andSubPolicyConfig.SharedPolicyConfig.Name, + "type": andSubPolicyConfig.SharedPolicyConfig.Type, + "latency": andSubPolicyConfig.SharedPolicyConfig.LatencyConfig.Convert(), + "numeric_attribute": andSubPolicyConfig.SharedPolicyConfig.NumericAttributeConfig.Convert(), + "probabilistic": andSubPolicyConfig.SharedPolicyConfig.ProbabilisticConfig.Convert(), + "status_code": andSubPolicyConfig.SharedPolicyConfig.StatusCodeConfig.Convert(), + "string_attribute": 
andSubPolicyConfig.SharedPolicyConfig.StringAttributeConfig.Convert(), + "rate_limiting": andSubPolicyConfig.SharedPolicyConfig.RateLimitingConfig.Convert(), + "span_count": andSubPolicyConfig.SharedPolicyConfig.SpanCountConfig.Convert(), + "boolean_attribute": andSubPolicyConfig.SharedPolicyConfig.BooleanAttributeConfig.Convert(), + "ottl_condition": andSubPolicyConfig.SharedPolicyConfig.OttlConditionConfig.Convert(), + "trace_state": andSubPolicyConfig.SharedPolicyConfig.TraceStateConfig.Convert(), + }, &otelConfig) - return otelCfg + return otelConfig } -func mustDecodeMapStructure(source map[string]interface{}, otelCfg interface{}) { - err := mapstructure.Decode(source, otelCfg) +// TODO: Why do we do this? Can we not just create the Otel types directly? +func mustDecodeMapStructure(source map[string]interface{}, otelConfig interface{}) { + err := mapstructure.Decode(source, otelConfig) + //TODO: Rework this to return an error instead of panicking if err != nil { panic(err) } diff --git a/component/otelcol/receiver/jaeger/jaeger.go b/component/otelcol/receiver/jaeger/jaeger.go index ce8dfd306775..9d483c608845 100644 --- a/component/otelcol/receiver/jaeger/jaeger.go +++ b/component/otelcol/receiver/jaeger/jaeger.go @@ -3,7 +3,6 @@ package jaeger import ( "fmt" - "time" "github.com/alecthomas/units" "github.com/grafana/agent/component" @@ -30,8 +29,7 @@ func init() { // Arguments configures the otelcol.receiver.jaeger component. type Arguments struct { - Protocols ProtocolsArguments `river:"protocols,block"` - RemoteSampling *RemoteSamplingArguments `river:"remote_sampling,block,optional"` + Protocols ProtocolsArguments `river:"protocols,block"` // Output configures where to send received data. Required. 
Output *otelcol.ConsumerArguments `river:"output,block"` @@ -63,16 +61,12 @@ func (args Arguments) Convert() (otelcomponent.Config, error) { ThriftBinary: args.Protocols.ThriftBinary.Convert(), ThriftCompact: args.Protocols.ThriftCompact.Convert(), }, - RemoteSampling: args.RemoteSampling.Convert(), }, nil } // Extensions implements receiver.Arguments. func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension { - if args.RemoteSampling == nil { - return nil - } - return args.RemoteSampling.Client.Extensions() + return nil } // Exporters implements receiver.Arguments. @@ -216,31 +210,3 @@ func (args *ThriftBinary) Convert() *jaegerreceiver.ProtocolUDP { return args.ProtocolUDP.Convert() } - -// RemoteSamplingArguments configures remote sampling settings. -type RemoteSamplingArguments struct { - // TODO(rfratto): can we work with upstream to provide a hook to provide a - // custom strategy file and bypass the reload interval? - // - // That would let users connect a local.file to otelcol.receiver.jaeger for - // the remote sampling. - - HostEndpoint string `river:"host_endpoint,attr"` - StrategyFile string `river:"strategy_file,attr"` - StrategyFileReloadInterval time.Duration `river:"strategy_file_reload_interval,attr"` - Client otelcol.GRPCClientArguments `river:"client,block"` -} - -// Convert converts args into the upstream type. 
-func (args *RemoteSamplingArguments) Convert() *jaegerreceiver.RemoteSamplingConfig { - if args == nil { - return nil - } - - return &jaegerreceiver.RemoteSamplingConfig{ - HostEndpoint: args.HostEndpoint, - StrategyFile: args.StrategyFile, - StrategyFileReloadInterval: args.StrategyFileReloadInterval, - GRPCClientSettings: *args.Client.Convert(), - } -} diff --git a/component/otelcol/receiver/kafka/kafka.go b/component/otelcol/receiver/kafka/kafka.go index af86cbb3a4cc..313387a699ff 100644 --- a/component/otelcol/receiver/kafka/kafka.go +++ b/component/otelcol/receiver/kafka/kafka.go @@ -34,6 +34,7 @@ type Arguments struct { Encoding string `river:"encoding,attr,optional"` GroupID string `river:"group_id,attr,optional"` ClientID string `river:"client_id,attr,optional"` + InitialOffset string `river:"initial_offset,attr,optional"` Authentication AuthenticationArguments `river:"authentication,block,optional"` Metadata MetadataArguments `river:"metadata,block,optional"` @@ -54,11 +55,12 @@ var DefaultArguments = Arguments{ // for compatibility, even though that means using a client and group ID of // "otel-collector". 
- Topic: "otlp_spans", - Encoding: "otlp_proto", - Brokers: []string{"localhost:9092"}, - ClientID: "otel-collector", - GroupID: "otel-collector", + Topic: "otlp_spans", + Encoding: "otlp_proto", + Brokers: []string{"localhost:9092"}, + ClientID: "otel-collector", + GroupID: "otel-collector", + InitialOffset: "latest", Metadata: MetadataArguments{ IncludeAllTopics: true, Retry: MetadataRetryArguments{ @@ -90,6 +92,7 @@ func (args Arguments) Convert() (otelcomponent.Config, error) { Encoding: args.Encoding, GroupID: args.GroupID, ClientID: args.ClientID, + InitialOffset: args.InitialOffset, Authentication: args.Authentication.Convert(), Metadata: args.Metadata.Convert(), diff --git a/docs/sources/flow/reference/components/otelcol.auth.basic.md b/docs/sources/flow/reference/components/otelcol.auth.basic.md index 181f7349ae9d..894d9090259a 100644 --- a/docs/sources/flow/reference/components/otelcol.auth.basic.md +++ b/docs/sources/flow/reference/components/otelcol.auth.basic.md @@ -7,6 +7,8 @@ title: otelcol.auth.basic `otelcol.auth.basic` exposes a `handler` that can be used by other `otelcol` components to authenticate requests using basic authentication. +This extension supports both server and client authentication. + > **NOTE**: `otelcol.auth.basic` is a wrapper over the upstream OpenTelemetry > Collector `basicauth` extension. Bug reports or feature requests will be > redirected to the upstream repository, if necessary. diff --git a/docs/sources/flow/reference/components/otelcol.auth.bearer.md b/docs/sources/flow/reference/components/otelcol.auth.bearer.md index a3dd7a32381a..2ad7706b65d9 100644 --- a/docs/sources/flow/reference/components/otelcol.auth.bearer.md +++ b/docs/sources/flow/reference/components/otelcol.auth.bearer.md @@ -7,6 +7,8 @@ title: otelcol.auth.bearer `otelcol.auth.bearer` exposes a `handler` that can be used by other `otelcol` components to authenticate requests using bearer token authentication. 
+This extension supports both server and client authentication. + > **NOTE**: `otelcol.auth.bearer` is a wrapper over the upstream OpenTelemetry > Collector `bearertokenauth` extension. Bug reports or feature requests will > be redirected to the upstream repository, if necessary. diff --git a/docs/sources/flow/reference/components/otelcol.auth.headers.md b/docs/sources/flow/reference/components/otelcol.auth.headers.md index 8ebfe988b663..990274ec249b 100644 --- a/docs/sources/flow/reference/components/otelcol.auth.headers.md +++ b/docs/sources/flow/reference/components/otelcol.auth.headers.md @@ -51,6 +51,13 @@ Name | Type | Description | Default | Required `key` | `string` | Name of the header to set. | | yes `value` | `string` or `secret` | Value of the header. | | no `from_context` | `string` | Metadata name to get header value from. | | no +`action` | `string` | An action to perform on the header | "upsert" | no + +The supported values for `action` are: +* `insert`: Inserts the new header if it does not exist. +* `update`: Updates the header value if it exists. +* `upsert`: Inserts a header if it does not exist and updates the header if it exists. +* `delete`: Deletes the header. Exactly one of `value` or `from_context` must be provided for each `header` block. diff --git a/docs/sources/flow/reference/components/otelcol.exporter.jaeger.md b/docs/sources/flow/reference/components/otelcol.exporter.jaeger.md index e5d5474b0449..4dac30408b60 100644 --- a/docs/sources/flow/reference/components/otelcol.exporter.jaeger.md +++ b/docs/sources/flow/reference/components/otelcol.exporter.jaeger.md @@ -7,9 +7,11 @@ title: otelcol.exporter.jaeger `otelcol.exporter.jaeger` accepts telemetry data from other `otelcol` components and writes them over the network using the Jaeger protocol. -> **NOTE**: `otelcol.exporter.jaeger` is a wrapper over the upstream -> OpenTelemetry Collector `jaeger` exporter. 
Bug reports or feature requests will -> be redirected to the upstream repository, if necessary. +> **NOTE**: `otelcol.exporter.jaeger` is a wrapper over the +> [upstream](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/jaegerexporter) +> OpenTelemetry Collector `jaeger` exporter. The upstream +> exporter has been deprecated and will be removed from future versions of +> both OpenTelemetry Collector and Grafana Agent because Jaeger supports OTLP directly. Multiple `otelcol.exporter.jaeger` components can be specified by giving them different labels. diff --git a/docs/sources/flow/reference/components/otelcol.processor.batch.md b/docs/sources/flow/reference/components/otelcol.processor.batch.md index ee5e22b56e1f..848d1c79670a 100644 --- a/docs/sources/flow/reference/components/otelcol.processor.batch.md +++ b/docs/sources/flow/reference/components/otelcol.processor.batch.md @@ -7,7 +7,13 @@ title: otelcol.processor.batch `otelcol.processor.batch` accepts telemetry data from other `otelcol` components and places them into batches. Batching improves the compression of data and reduces the number of outgoing network requests required to transmit -data. +data. This processor supports both size and time based batching. + +We strongly recommend that you configure the batch processor on every Agent that +uses OpenTelemetry (otelcol) Flow components. The batch processor should be +defined in the pipeline after the `otelcol.processor.memory_limiter` as well +as any sampling processors. This is because batching should happen after any +data drops such as sampling. > **NOTE**: `otelcol.processor.batch` is a wrapper over the upstream > OpenTelemetry Collector `batch` processor. Bug reports or feature requests @@ -37,6 +43,8 @@ Name | Type | Description | Default | Required `timeout` | `duration` | How long to wait before flushing the batch. | `"200ms"` | no `send_batch_size` | `number` | Amount of data to buffer before flushing the batch. 
| `8192` | no `send_batch_max_size` | `number` | Upper limit of a batch size. | `0` | no +`metadata_keys` | `list(string)` | Creates a different batcher for each key/value combination of metadata. | `[]` | no +`metadata_cardinality_limit` | `number` | Limit of the unique metadata key/value combinations. | `1000` | no `otelcol.processor.batch` accumulates data into a batch until one of the following events happens: @@ -57,6 +65,19 @@ as a single batch. `send_batch_max_size` constrains how big a batch can get. When set to a non-zero value, `send_batch_max_size` must be greater or equal to `send_batch_size`. +`metadata_cardinality_limit` applies for the lifetime of the process. + +Receivers should be configured with `include_metadata = true` so that metadata +keys are available to the processor. + +Each distinct combination of metadata triggers the allocation of a new +background task in the Agent that runs for the lifetime of the process, and each +background task holds one pending batch of up to `send_batch_size` records. Batching +by metadata can therefore substantially increase the amount of memory dedicated to batching. + +The maximum number of distinct combinations is limited to the configured `metadata_cardinality_limit`, +which defaults to 1000 to limit memory impact. + ## Blocks The following blocks are supported inside the definition of @@ -93,7 +114,9 @@ configuration. `otelcol.processor.batch` does not expose any component-specific debug information. -## Example +## Examples + +### Basic usage This example batches telemetry data before sending it to [otelcol.exporter.otlp][] for further processing: @@ -114,4 +137,43 @@ otelcol.exporter.otlp "production" { } ``` +### Batching based on metadata + +Batching by metadata enables support for multi-tenant OpenTelemetry pipelines +with batching over groups of data having the same authorization metadata. 
+ +```river +otelcol.receiver.jaeger "default" { + protocols { + grpc { + include_metadata = true + } + thrift_http {} + thrift_binary {} + thrift_compact {} + } + + output { + traces = [otelcol.processor.batch.default.input] + } +} + +otelcol.processor.batch "default" { + // batch data by tenant id + metadata_keys = ["tenant_id"] + // limit to 123 batcher processes before raising errors + metadata_cardinality_limit = 123 + + output { + traces = [otelcol.exporter.otlp.production.input] + } +} + +otelcol.exporter.otlp "production" { + client { + endpoint = env("OTLP_SERVER_ENDPOINT") + } +} +``` + [otelcol.exporter.otlp]: {{< relref "./otelcol.exporter.otlp.md" >}} diff --git a/docs/sources/flow/reference/components/otelcol.processor.tail_sampling.md b/docs/sources/flow/reference/components/otelcol.processor.tail_sampling.md index 976555eef78e..741c25bd330e 100644 --- a/docs/sources/flow/reference/components/otelcol.processor.tail_sampling.md +++ b/docs/sources/flow/reference/components/otelcol.processor.tail_sampling.md @@ -73,6 +73,8 @@ policy > status_code | [status_code] | policy > string_attribute | [string_attribute] | The policy will sample based on string attributes (resource and record) value matches. | no policy > rate_limiting | [rate_limiting] | The policy will sample based on rate. | no policy > span_count | [span_count] | The policy will sample based on the minimum number of spans within a batch. | no +policy > boolean_attribute | [boolean_attribute] | The policy will sample based on a boolean attribute (resource and record). | no +policy > ottl_condition | [ottl_condition] | The policy will sample based on a given boolean OTTL condition (span and span event).| no policy > trace_state | [trace_state] | The policy will sample based on TraceState value matches. | no policy > and | [and] | The policy will sample based on multiple policies, creates an `and` policy. 
| no policy > and > and_sub_policy | [and_sub_policy] [] | A set of policies underneath an `and` policy type. | no @@ -83,6 +85,8 @@ policy > and > and_sub_policy > status_code | [status_code] | policy > and > and_sub_policy > string_attribute | [string_attribute] | The policy will sample based on string attributes (resource and record) value matches. | no policy > and > and_sub_policy > rate_limiting | [rate_limiting] | The policy will sample based on rate. | no policy > and > and_sub_policy > span_count | [span_count] | The policy will sample based on the minimum number of spans within a batch. | no +policy > and > and_sub_policy > boolean_attribute | [boolean_attribute] | The policy will sample based on a boolean attribute (resource and record). | no +policy > and > and_sub_policy > ottl_condition | [ottl_condition] | The policy will sample based on a given boolean OTTL condition (span and span event). | no policy > and > and_sub_policy > trace_state | [trace_state] | The policy will sample based on TraceState value matches. | no policy > composite | [composite] | This policy will sample based on a combination of above samplers, with ordering and rate allocation per sampler. | no policy > composite > composite_sub_policy | [composite_sub_policy] [] | A set of policies underneath a `composite` policy type. | no @@ -93,6 +97,8 @@ policy > composite > composite_sub_policy > status_code | [status_code] | policy > composite > composite_sub_policy > string_attribute | [string_attribute] | The policy will sample based on string attributes (resource and record) value matches. | no policy > composite > composite_sub_policy > rate_limiting | [rate_limiting] | The policy will sample based on rate. | no policy > composite > composite_sub_policy > span_count | [span_count] | The policy will sample based on the minimum number of spans within a batch. 
| no +policy > composite > composite_sub_policy > boolean_attribute | [boolean_attribute] | The policy will sample based on a boolean attribute (resource and record). | no +policy > composite > composite_sub_policy > ottl_condition | [ottl_condition] | The policy will sample based on a given boolean OTTL condition (span and span event). | no policy > composite > composite_sub_policy > trace_state | [trace_state] | The policy will sample based on TraceState value matches. | no output | [output] [] | Configures where to send received telemetry data. | yes @@ -104,6 +110,8 @@ output | [output] [] | Co [string_attribute]: #string_attribute-block [rate_limiting]: #rate_limiting-block [span_count]: #span_count-block +[boolean_attribute]: #boolean_attribute-block +[ottl_condition]: #ottl_condition-block [trace_state]: #trace_state-block [and]: #and-block [and_sub_policy]: #and_sub_policy-block @@ -215,6 +223,37 @@ Name | Type | Description | Default | Required ---- | ---- | ----------- | ------- | -------- `min_spans` | `number` | Minimum number of spans in a trace. | | yes +### boolean_attribute block + +The `boolean_attribute` block configures a policy of type `boolean_attribute`. +The policy samples based on a boolean attribute (resource and record). + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`key` | `string` | Attribute key to match against. | | yes +`value` | `bool` | The bool value (`true` or `false`) to use when matching against attribute values. | | yes + +### ottl_condition block + +The `ottl_condition` block configures a policy of type `ottl_condition`. The policy samples based on a given boolean +[OTTL](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl) condition (span and span event). 
+ +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`error_mode` | `string` | Error handling if OTTL conditions fail to evaluate. | | yes +`span` | `list(string)` | OTTL conditions for spans. | `[]` | no +`spanevent` | `list(string)` | OTTL conditions for span events. | `[]` | no + +The supported values for `error_mode` are: +* `ignore`: Errors cause evaluation to continue to the next statement. +* `propagate`: Errors cause the evaluation to be false and an error is returned. + +At least one of `span` or `spanevent` should be specified. Both `span` and `spanevent` can also be specified. + ### trace_state block The `trace_state` block configures a policy of type `trace_state`. The policy samples based on TraceState value matches. @@ -406,6 +445,22 @@ otelcol.processor.tail_sampling "default" { } } + policy { + name = "test-policy-12" + type = "ottl_condition" + ottl_condition { + error_mode = "ignore" + span = [ + "attributes[\"test_attr_key_1\"] == \"test_attr_val_1\"", + "attributes[\"test_attr_key_2\"] != \"test_attr_val_1\"", + ] + spanevent = [ + "name != \"test_span_event_name\"", + "attributes[\"test_event_attr_key_2\"] != \"test_event_attr_val_1\"", + ] + } + } + policy { name = "and-policy-1" type = "and" diff --git a/docs/sources/flow/reference/components/otelcol.receiver.kafka.md b/docs/sources/flow/reference/components/otelcol.receiver.kafka.md index 4fce03ce3164..6b65757ea13e 100644 --- a/docs/sources/flow/reference/components/otelcol.receiver.kafka.md +++ b/docs/sources/flow/reference/components/otelcol.receiver.kafka.md @@ -42,6 +42,7 @@ Name | Type | Description | Default | Required `encoding` | `string` | Encoding of payload read from Kafka. | `"otlp_proto"` | no `group_id` | `string` | Consumer group to consume messages from. | `"otel-collector"` | no `client_id` | `string` | Consumer client ID to use. 
| `"otel-collector"` | no +`initial_offset` | `string` | Initial offset to use if no offset was previously committed. | `"latest"` | no The `encoding` argument determines how to decode messages read from Kafka. `encoding` must be one of the following strings: @@ -52,11 +53,15 @@ The `encoding` argument determines how to decode messages read from Kafka. * `"zipkin_proto"`: Decode messages as a list of Zipkin protobuf spans. * `"zipkin_json"`: Decode messages as a list of Zipkin JSON spans. * `"zipkin_thrift"`: Decode messages as a list of Zipkin Thrift spans. -* `"raw"`: Copy the message bytes into the body of a log record. +* `"raw"`: Copy the log message bytes into the body of a log record. +* `"text"`: Decode the log message as text and insert it into the body of a log record. +By default, UTF-8 is used to decode. A different encoding can be chosen by using `text_`. For example, `text_utf-8` or `text_shift_jis`. `"otlp_proto"` must be used to read all telemetry types from Kafka; other encodings are signal-specific. +`initial_offset` must be either `"latest"` or `"earliest"`. + ## Blocks The following blocks are supported inside the definition of diff --git a/docs/sources/flow/upgrade-guide.md b/docs/sources/flow/upgrade-guide.md index 8847d4b90da4..ff6cf5cc101b 100644 --- a/docs/sources/flow/upgrade-guide.md +++ b/docs/sources/flow/upgrade-guide.md @@ -18,6 +18,45 @@ Grafana Agent Flow. > [upgrade-guide-static]: {{< relref "../static/upgrade-guide.md" >}} > [upgrade-guide-operator]: {{< relref "../operator/upgrade-guide.md" >}} +## Main (unreleased) + +### Breaking change: The algorithm for the "hash" action of `otelcol.processor.attributes` has changed +The hash produced when using `action = "hash"` in the `otelcol.processor.attributes` flow component is now using the more secure SHA-256 algorithm. +The change was made in PR [#22831](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/22831) of opentelemetry-collector-contrib. 
+ +### Breaking change: `otelcol.exporter.loki` now includes instrumentation scope in its output + +Additional `instrumentation_scope` information will be added to the OTLP log signal, like this: +``` +{ + "body": "Example log", + "traceid": "01020304000000000000000000000000", + "spanid": "0506070800000000", + "severity": "error", + "attributes": { + "attr1": "1", + "attr2": "2" + }, + "resources": { + "host.name": "something" + }, + "instrumentation_scope": { + "name": "example-logger-name", + "version": "v1" + } +} +``` + +### Breaking change: `otelcol.extension.jaeger_remote_sampling` removes the `/` HTTP endpoint + +The `/` HTTP endpoint was the same as the `/sampling` endpoint. The `/sampling` endpoint is still functional. +The change was made in PR [#18070](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/18070) of opentelemetry-collector-contrib. + +### The `remote_sampling` block has been removed from `otelcol.receiver.jaeger` + +The `remote_sampling` block in `otelcol.receiver.jaeger` has been an undocumented no-op configuration for some time, and has now been removed. +Customers are advised to use `otelcol.extension.jaeger_remote_sampling` instead. + ## v0.35 ### Breaking change: `auth` and `version` attributes from `walk_params` block of `prometheus.exporter.snmp` have been removed diff --git a/docs/sources/static/upgrade-guide.md b/docs/sources/static/upgrade-guide.md index f6126d4eb2f2..f5dfae40276c 100644 --- a/docs/sources/static/upgrade-guide.md +++ b/docs/sources/static/upgrade-guide.md @@ -19,6 +19,37 @@ static mode. > [upgrade-guide-operator]: {{< relref "../operator/upgrade-guide.md" >}} > [upgrade-guide-flow]: {{< relref "../flow/upgrade-guide.md" >}} +## Main (unreleased) + +### Breaking change: Removed and renamed tracing metrics + +In the traces subsystem for Static mode some metrics are removed and others are renamed. 
+The reason for the removal is a bug which caused the metrics to be incorrect if more than one instance of a traces configuration is specified.
+
+Removed metrics:
+- "blackbox_exporter_config_last_reload_success_timestamp_seconds" (gauge)
+- "blackbox_exporter_config_last_reload_successful" (gauge)
+- "blackbox_module_unknown_total" (counter)
+- "traces_processor_tail_sampling_count_traces_sampled" (counter)
+- "traces_processor_tail_sampling_new_trace_id_received" (counter)
+- "traces_processor_tail_sampling_sampling_decision_latency" (histogram)
+- "traces_processor_tail_sampling_sampling_decision_timer_latency" (histogram)
+- "traces_processor_tail_sampling_sampling_policy_evaluation_error" (counter)
+- "traces_processor_tail_sampling_sampling_trace_dropped_too_early" (counter)
+- "traces_processor_tail_sampling_sampling_traces_on_memory" (gauge)
+- "traces_receiver_accepted_spans" (counter)
+- "traces_receiver_refused_spans" (counter)
+- "traces_exporter_enqueue_failed_log_records" (counter)
+- "traces_exporter_enqueue_failed_metric_points" (counter)
+- "traces_exporter_enqueue_failed_spans" (counter)
+- "traces_exporter_queue_capacity" (gauge)
+- "traces_exporter_queue_size" (gauge)
+
+Renamed metrics:
+- "traces_receiver_refused_spans" is renamed to "traces_receiver_refused_spans_total"
+- "traces_receiver_accepted_spans" is renamed to "traces_receiver_accepted_spans_total"
+- "traces_exporter_sent_metric_points" is renamed to "traces_exporter_sent_metric_points_total"
+
 ## v0.35
 
 ### Breaking change: `auth` and `version` attributes from `walk_params` block of SNMP integration have been removed
diff --git a/go.mod b/go.mod
index 109727abf2f9..cf0047b93334 100644
--- a/go.mod
+++ b/go.mod
@@ -694,14 +694,13 @@ replace (
 // Excluding fixes a conflict in test packages and allows "go mod tidy" to run.
exclude google.golang.org/grpc/examples v0.0.0-20200728065043-dfc0c05b2da9 -// Replacing for an internal fork that exposes internal folders -// Some funtionalities of the collector have been made internal and it's more difficult to build and configure pipelines in the newer versions. -// This is a temporary solution while a new configuration design is discussed for the collector (ref: https://github.com/open-telemetry/opentelemetry-collector/issues/3482). -replace ( - go.opentelemetry.io/collector => github.com/grafana/opentelemetry-collector v0.4.1-0.20230630115148-4bcac41449bb - go.opentelemetry.io/collector/featuregate => github.com/grafana/opentelemetry-collector/featuregate v0.0.0-20230630115148-4bcac41449bb - go.opentelemetry.io/collector/pdata => github.com/grafana/opentelemetry-collector/pdata v0.0.0-20230630115148-4bcac41449bb -) +// Replacing for an internal fork which allows us to observe metrics produced by the Collector. +// This is a temporary solution while a new configuration design is discussed for the collector. 
Related issues: +// https://github.com/open-telemetry/opentelemetry-collector/issues/7532 +// https://github.com/open-telemetry/opentelemetry-collector/pull/7644 +// https://github.com/open-telemetry/opentelemetry-collector/pull/7696 +// https://github.com/open-telemetry/opentelemetry-collector/issues/4970 +replace go.opentelemetry.io/collector => github.com/grafana/opentelemetry-collector v0.4.1-0.20230705075940-537e02565521 // Required until https://github.com/weaveworks/common/pull/240 is merged replace google.golang.org/grpc => google.golang.org/grpc v1.45.0 diff --git a/go.sum b/go.sum index 1541ec9096db..e91c804aa698 100644 --- a/go.sum +++ b/go.sum @@ -1744,12 +1744,8 @@ github.com/grafana/loki/pkg/push v0.0.0-20230127102416-571f88bc5765 h1:VXitROTlm github.com/grafana/loki/pkg/push v0.0.0-20230127102416-571f88bc5765/go.mod h1:DhJMrd2QInI/1CNtTN43BZuTmkccdizW1jZ+F6aHkhY= github.com/grafana/mysqld_exporter v0.12.2-0.20201015182516-5ac885b2d38a h1:D5NSR64/6xMXnSFD9y1m1DPYIcBcHvtfeuI9/M/0qtI= github.com/grafana/mysqld_exporter v0.12.2-0.20201015182516-5ac885b2d38a/go.mod h1:rjb/swXiCWLlC3gWlyugy/xEOZioF5PclbB8sf/9p/Q= -github.com/grafana/opentelemetry-collector v0.4.1-0.20230630115148-4bcac41449bb h1:B8n6Yqhqrd3wkpNMBqpBZd5mFhvLOc617KVBxeireHc= -github.com/grafana/opentelemetry-collector v0.4.1-0.20230630115148-4bcac41449bb/go.mod h1:WuFR0SJJbUsBF7YF8w0vjvfoxodzIAkFt+HIeF9eQ5A= -github.com/grafana/opentelemetry-collector/featuregate v0.0.0-20230630115148-4bcac41449bb h1:GecVP0/p7APPxfNssL3EKiCQVAs/cY/fOsi8ouhKO3s= -github.com/grafana/opentelemetry-collector/featuregate v0.0.0-20230630115148-4bcac41449bb/go.mod h1:0mE3mDLmUrOXVoNsuvj+7dV14h/9HFl/Fy9YTLoLObo= -github.com/grafana/opentelemetry-collector/pdata v0.0.0-20230630115148-4bcac41449bb h1:/vcyMvHh7wDMQCYs2bY7euMmAwP8Cvm+03t7y9r7+hM= -github.com/grafana/opentelemetry-collector/pdata v0.0.0-20230630115148-4bcac41449bb/go.mod h1:x09G/4KjEcDKNuWCjC5ZtnuDE0XEqiRwI+yrHSVjIy8= 
+github.com/grafana/opentelemetry-collector v0.4.1-0.20230705075940-537e02565521 h1:br7vc9p+3OoKdhCyjtxAxutrqNaqicQfDKuW7b3rDec= +github.com/grafana/opentelemetry-collector v0.4.1-0.20230705075940-537e02565521/go.mod h1:WuFR0SJJbUsBF7YF8w0vjvfoxodzIAkFt+HIeF9eQ5A= github.com/grafana/perflib_exporter v0.1.1-0.20230511173423-6166026bd090 h1:Ko80Xcl7xo1eYqkqLUb9AVVCLGVmuQp2jOV69hEEeZw= github.com/grafana/perflib_exporter v0.1.1-0.20230511173423-6166026bd090/go.mod h1:MinSWm88jguXFFrGsP56PtleUb4Qtm4tNRH/wXNXRTI= github.com/grafana/phlare/api v0.1.2 h1:1jrwd3KnsXMzj/tJih9likx5EvbY3pbvLbDqAAYem30= @@ -3309,6 +3305,10 @@ go.opentelemetry.io/collector/extension v0.80.0/go.mod h1:r61aYWq9NYZP22+LVqiLOE go.opentelemetry.io/collector/extension/auth v0.80.0 h1:BElM8HXYVho2ZikMS8OpQQjmaMizB3qFGJ+kGZ4cyoI= go.opentelemetry.io/collector/extension/auth v0.80.0/go.mod h1:wDpwb37PxV/aH/kecpPXtJqGSmiOYUyeLuQvRmWciAA= go.opentelemetry.io/collector/extension/zpagesextension v0.80.0 h1:30cJEhbyvnGdFjV+AiOW6l84GCMaU4bb1kdX2VcGzH4= +go.opentelemetry.io/collector/featuregate v1.0.0-rcv0013 h1:tiTUG9X/gEDN1oDYQOBVUFYQfhUG2CvgW9VhBc2uk1U= +go.opentelemetry.io/collector/featuregate v1.0.0-rcv0013/go.mod h1:0mE3mDLmUrOXVoNsuvj+7dV14h/9HFl/Fy9YTLoLObo= +go.opentelemetry.io/collector/pdata v1.0.0-rcv0013 h1:4sONXE9hAX+4Di8m0bQ/KaoH3Mi+OPt04cXkZ7A8W3k= +go.opentelemetry.io/collector/pdata v1.0.0-rcv0013/go.mod h1:x09G/4KjEcDKNuWCjC5ZtnuDE0XEqiRwI+yrHSVjIy8= go.opentelemetry.io/collector/processor v0.80.0 h1:UUKPh2E/pIru2WChWiFVrugc48m2Al5kHQ0d66cxh6w= go.opentelemetry.io/collector/processor v0.80.0/go.mod h1:ZFtJUIgpJmgkSnBGd4F+q4zb1TrhrQsNHGdDmijYlU8= go.opentelemetry.io/collector/processor/batchprocessor v0.80.0 h1:B3phgqGYRbAM1ZjR7OlAS4hK1XcM2/FDLvzOCycyE6A= diff --git a/pkg/traces/instance.go b/pkg/traces/instance.go index 0075112c466e..ca0eb8cd0fe8 100644 --- a/pkg/traces/instance.go +++ b/pkg/traces/instance.go @@ -166,7 +166,8 @@ func (i *Instance) buildAndStartPipeline(ctx 
context.Context, cfg InstanceConfig
 		Connectors:               connector.NewBuilder(otelConfig.Connectors, factories.Connectors),
 		Extensions:               extension.NewBuilder(otelConfig.Extensions, factories.Extensions),
 		OtelMetricViews:          servicegraphprocessor.OtelMetricViews(),
-		OtelMetricReader:         *promExporter,
+		OtelMetricReader:         promExporter,
+		DisableProcessMetrics:    true,
 		UseExternalMetricsServer: true,
 		TracerProvider:           trace.NewNoopTracerProvider(),
 		//TODO: Plug in an AsyncErrorChannel to shut down the Agent in case of a fatal event
diff --git a/pkg/traces/internal/traceutils/otel_meter_settings.go b/pkg/traces/internal/traceutils/otel_meter_settings.go
index b02a3cb07a4a..89934d3337a8 100644
--- a/pkg/traces/internal/traceutils/otel_meter_settings.go
+++ b/pkg/traces/internal/traceutils/otel_meter_settings.go
@@ -3,9 +3,12 @@ package traceutils
 import (
 	"github.com/prometheus/client_golang/prometheus"
 	otelprom "go.opentelemetry.io/otel/exporters/prometheus"
+	otelmetric "go.opentelemetry.io/otel/sdk/metric"
 )
 
-func PrometheusExporter(reg prometheus.Registerer) (*otelprom.Exporter, error) {
+// This function is used by both production code and unit tests.
+// It makes sure that unit tests use the same conventions for metric readers as production code.
+func PrometheusExporter(reg prometheus.Registerer) (otelmetric.Reader, error) {
 	return otelprom.New(
 		otelprom.WithRegisterer(reg),
 		otelprom.WithoutUnits(),