Skip to content

Commit

Permalink
[datasetexporter]: Upgrade library to 0.17.0 (#29446)
Browse files Browse the repository at this point in the history
Upgrade to new version of the library.

This PR is implementing following issues:

* #27650 - metrics are not collected via open telemetry, so they can be
monitored. It's better version of the previous PR #27487 which was not
working.
* #27652 - it's configurable with the `debug` option whether
`session_key` is included or not

Other change is that fields that are specified as part of the `group_by`
configuration are now transferred as part of the session info.

**Link to tracking Issue:** #27650, #27652

**Testing:** 

1. Build docker image - make docker-otelcontribcol
2. Checkout https://github.com/open-telemetry/opentelemetry-demo
3. Update configuration in `docker-compose.yaml` and in the
`src/otelcollector/otelcol-config.yml`:
* In `docker-compose.yaml` switch image to the newly build one in step 1
* In `docker-compose.yaml` enable feature gate for collecting metrics -
`--feature-gates=telemetry.useOtelForInternalMetrics`
* In `src/otelcollector/otelcol-config.yml` enable metrics scraping by
prometheus
* In `src/otelcollector/otelcol-config.yml` add configuration for
dataset
```diff
diff --git a/docker-compose.yml b/docker-compose.yml
index 001f7c8..d7edd0d 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -646,14 +646,16 @@ services:

   # OpenTelemetry Collector
   otelcol:
-    image: otel/opentelemetry-collector-contrib:0.86.0
+    image: otelcontribcol:latest
     container_name: otel-col
     deploy:
       resources:
         limits:
           memory: 125M
     restart: unless-stopped
-    command: [ "--config=/etc/otelcol-config.yml", "--config=/etc/otelcol-config-extras.yml" ]
+    command: [ "--config=/etc/otelcol-config.yml", "--config=/etc/otelcol-config-extras.yml", "--feature-gates=telemetry.useOtelForInternalMetrics" ]
     volumes:
       - ./src/otelcollector/otelcol-config.yml:/etc/otelcol-config.yml
       - ./src/otelcollector/otelcol-config-extras.yml:/etc/otelcol-config-extras.yml
diff --git a/src/otelcollector/otelcol-config.yml b/src/otelcollector/otelcol-config.yml
index f2568ae..9944562 100644
--- a/src/otelcollector/otelcol-config.yml
+++ b/src/otelcollector/otelcol-config.yml
@@ -15,6 +15,14 @@ receivers:
     targets:
       - endpoint: http://frontendproxy:${env:ENVOY_PORT}

+  prometheus:
+    config:
+      scrape_configs:
+        - job_name: 'otel-collector'
+          scrape_interval: 5s
+          static_configs:
+            - targets: ['0.0.0.0:8888']
+
 exporters:
   debug:
   otlp:
@@ -29,6 +37,22 @@ exporters:
     endpoint: "http://prometheus:9090/api/v1/otlp"
     tls:
       insecure: true
+  logging:
+  dataset:
+    api_key: API_KEY
+    dataset_url: https://SERVER.scalyr.com
+    debug: true
+    buffer:
+      group_by:
+        - resource_name
+        - resource_type
+    logs:
+      export_resource_info_on_event: true
+    server_host:
+      server_host: Martin
+      use_hostname: false
+  dataset/aaa:
+    api_key: API_KEY
+    dataset_url: https://SERVER.scalyr.com
+    debug: true
+    buffer:
+      group_by:
+        - resource_name
+        - resource_type
+    logs:
+      export_resource_info_on_event: true
+    server_host:
+      server_host: MartinAAA
+      use_hostname: false

 processors:
   batch:
@@ -47,6 +71,11 @@ processors:
           - set(description, "") where name == "queueSize"
           # FIXME: remove when this issue is resolved: open-telemetry/opentelemetry-python-contrib#1958
           - set(description, "") where name == "http.client.duration"
+  attributes:
+    actions:
+      - key: otel.demo
+        value: 29446
+        action: upsert

 connectors:
   spanmetrics:
@@ -55,13 +84,13 @@ service:
   pipelines:
     traces:
       receivers: [otlp]
-      processors: [batch]
-      exporters: [otlp, debug, spanmetrics]
+      processors: [batch, attributes]
+      exporters: [otlp, debug, spanmetrics, dataset, dataset/aaa]
     metrics:
-      receivers: [httpcheck/frontendproxy, otlp, spanmetrics]
+      receivers: [httpcheck/frontendproxy, otlp, spanmetrics, prometheus]
       processors: [filter/ottl, transform, batch]
       exporters: [otlphttp/prometheus, debug]
     logs:
       receivers: [otlp]
-      processors: [batch]
-      exporters: [otlp/logs, debug]
+      processors: [batch, attributes]
+      exporters: [otlp/logs, debug, dataset, dataset/aaa]
```
4. Run the demo - `docker compose up --abort-on-container-exit`
5. Check, that metrics are in Grafana -
http://localhost:8080/grafana/explore?
<img width="838" alt="Screenshot 2023-11-27 at 12 29 29"
src="https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/122797378/43d365dd-37d8-4528-b768-1d7f0ac34989">
6. Check some metrics
![Screenshot 2023-11-22 at 14 06
56](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/122797378/81306486-eb5e-49b1-87ed-25d1eb8afcf8)
<img width="1356" alt="Screenshot 2023-11-27 at 12 59 10"
src="https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/122797378/34c36e45-850e-4e74-a18a-0a54ce97cbd3">
7. Check that data are available in dataset ![Screenshot 2023-11-22 at
13 33
50](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/122797378/77cb2f31-be14-463b-91a7-fd10f8dbfe3a)

**Documentation:** 

**Library changes:**
* Group By & Debug - scalyr/dataset-go#62
* Metrics  - scalyr/dataset-go#61

---------

Co-authored-by: Andrzej Stencel <astencel@sumologic.com>
  • Loading branch information
martin-majlis-s1 and andrzej-stencel authored Nov 28, 2023
1 parent 24eca0b commit 3a29d0f
Show file tree
Hide file tree
Showing 19 changed files with 301 additions and 162 deletions.
27 changes: 27 additions & 0 deletions .chloggen/datasetexporter-update-to-0.16.0.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: datasetexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Collect usage metrics with Otel and send grouped attributes in session info.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27650, 27652]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 1 addition & 1 deletion cmd/configschema/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ require (
github.com/sagikazarmark/locafero v0.3.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.21 // indirect
github.com/scalyr/dataset-go v0.14.0 // indirect
github.com/scalyr/dataset-go v0.17.0 // indirect
github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646 // indirect
github.com/secure-systems-lab/go-securesystemslib v0.7.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions cmd/configschema/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ require (
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/samber/lo v1.38.1 // indirect
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.21 // indirect
github.com/scalyr/dataset-go v0.14.0 // indirect
github.com/scalyr/dataset-go v0.17.0 // indirect
github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646 // indirect
github.com/secure-systems-lab/go-securesystemslib v0.7.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions cmd/otelcontribcol/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 40 additions & 3 deletions exporter/datasetexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ Make sure to provide the appropriate server host value in the `serverHost` attri

### Optional Settings

- `debug` (default = false): Adds `session_key` to the server fields. It's useful for debugging throughput issues.
- `buffer`:
- `max_lifetime` (default = 5s): The maximum delay between sending batches from the same source.
- `group_by` (default = []): The list of attributes based on which events should be grouped.
- `group_by` (default = []): The list of attributes based on which events should be grouped. They are moved from the event attributes to the session info and shown as server fields in the UI.
- `retry_initial_interval` (default = 5s): Time to wait after the first failure before retrying.
- `retry_max_interval` (default = 30s): Is the upper bound on backoff.
- `retry_max_elapsed_time` (default = 300s): Is the maximum amount of time spent trying to send a buffer.
Expand Down Expand Up @@ -259,8 +260,6 @@ service:
exporters: [dataset/traces]
```

## Examples

### Handling `serverHost` Attribute

Based on the given configuration and scenarios, here's the expected behavior:
Expand All @@ -280,3 +279,41 @@ Based on the given configuration and scenarios, here's the expected behavior:
5. Resource: `{}`, Log: `{'attribute.foo': 'Bar'}`, Env: `SERVER_HOST=''`, Hostname: `ip-172-31-27-19`
* Since the attribute `container_id` is not set and the environmental variable `SERVER_HOST` is empty, the `hostname` of the node (`ip-172-31-27-19`) will be used as the fallback value for `serverHost`.
* Used `serverHost` will be `ip-172-31-27-19`.

## Metrics

To enable metrics you have to:
1. Run collector with enabled feature gate `telemetry.useOtelForInternalMetrics`. This can be done by executing it with one additional parameter - `--feature-gates=telemetry.useOtelForInternalMetrics`.
2. Enable metrics scraping as part of the configuration and add receiver into services:
```yaml
receivers:
prometheus:
config:
scrape_configs:
- job_name: 'otel-collector'
scrape_interval: 5s
static_configs:
- targets: ['0.0.0.0:8888']
...
service:
pipelines:
metrics:
# add prometheus among metrics receivers
receivers: [prometheus]
processors: [batch]
exporters: [otlphttp/prometheus, debug]
```
### Available Metrics
Available metrics contain `dataset` in their name. There are counters related to the
number of processed events (`events`), buffers (`buffer`), and transferred bytes (`bytes`).
There are also histograms related to response times (`responseTime`) and payload size (`payloadSize`).

There are several counters related to events/buffers:
* `enqueued` - the number of received entities
* `processed` - the number of entities that were accepted by the next layer
* `dropped` - the number of entities that were not accepted by the next layer
* `broken` - the number of entities that were somehow corrupted during processing (should be 0)

The number of entities, that are still in the queue can be computed as `enqueued - (processed + dropped + broken)`.
8 changes: 7 additions & 1 deletion exporter/datasetexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,12 @@ func newDefaultServerHostSettings() ServerHostSettings {
}
}

const debugDefault = false

type Config struct {
DatasetURL string `mapstructure:"dataset_url"`
APIKey configopaque.String `mapstructure:"api_key"`
Debug bool `mapstructure:"debug"`
BufferSettings `mapstructure:"buffer"`
TracesSettings `mapstructure:"traces"`
LogsSettings `mapstructure:"logs"`
Expand Down Expand Up @@ -180,16 +183,18 @@ func (c *Config) Validate() error {
// String returns a string representation of the Config object.
// It includes all the fields and their values in the format "field_name: field_value".
func (c *Config) String() string {
apiKey, _ := c.APIKey.MarshalText()
s := ""
s += fmt.Sprintf("%s: %s; ", "DatasetURL", c.DatasetURL)
s += fmt.Sprintf("%s: %s (%d); ", "APIKey", apiKey, len(c.APIKey))
s += fmt.Sprintf("%s: %t; ", "Debug", c.Debug)
s += fmt.Sprintf("%s: %+v; ", "BufferSettings", c.BufferSettings)
s += fmt.Sprintf("%s: %+v; ", "LogsSettings", c.LogsSettings)
s += fmt.Sprintf("%s: %+v; ", "TracesSettings", c.TracesSettings)
s += fmt.Sprintf("%s: %+v; ", "ServerHostSettings", c.ServerHostSettings)
s += fmt.Sprintf("%s: %+v; ", "RetrySettings", c.RetrySettings)
s += fmt.Sprintf("%s: %+v; ", "QueueSettings", c.QueueSettings)
s += fmt.Sprintf("%s: %+v", "TimeoutSettings", c.TimeoutSettings)

return s
}

Expand Down Expand Up @@ -218,6 +223,7 @@ func (c *Config) convert() (*ExporterConfig, error) {
UseHostName: c.ServerHostSettings.UseHostName,
ServerHost: c.ServerHostSettings.ServerHost,
},
Debug: c.Debug,
},
tracesSettings: c.TracesSettings,
logsSettings: c.LogsSettings,
Expand Down
3 changes: 2 additions & 1 deletion exporter/datasetexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func TestConfigString(t *testing.T) {
config := Config{
DatasetURL: "https://example.com",
APIKey: "secret",
Debug: true,
BufferSettings: BufferSettings{
MaxLifetime: 123,
GroupBy: []string{"field1", "field2"},
Expand Down Expand Up @@ -140,7 +141,7 @@ func TestConfigString(t *testing.T) {
}

assert.Equal(t,
"DatasetURL: https://example.com; BufferSettings: {MaxLifetime:123ns GroupBy:[field1 field2] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s RetryShutdownTimeout:0s}; LogsSettings: {ExportResourceInfo:true ExportResourcePrefix:AAA ExportScopeInfo:true ExportScopePrefix:BBB DecomposeComplexMessageField:true DecomposedComplexMessagePrefix:EEE exportSettings:{ExportSeparator:CCC ExportDistinguishingSuffix:DDD}}; TracesSettings: {exportSettings:{ExportSeparator:TTT ExportDistinguishingSuffix:UUU}}; ServerHostSettings: {UseHostName:false ServerHost:foo-bar}; RetrySettings: {Enabled:true InitialInterval:5s RandomizationFactor:0.5 Multiplier:1.5 MaxInterval:30s MaxElapsedTime:5m0s}; QueueSettings: {Enabled:true NumConsumers:10 QueueSize:1000 StorageID:<nil>}; TimeoutSettings: {Timeout:5s}",
"DatasetURL: https://example.com; APIKey: [REDACTED] (6); Debug: true; BufferSettings: {MaxLifetime:123ns GroupBy:[field1 field2] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s RetryShutdownTimeout:0s}; LogsSettings: {ExportResourceInfo:true ExportResourcePrefix:AAA ExportScopeInfo:true ExportScopePrefix:BBB DecomposeComplexMessageField:true DecomposedComplexMessagePrefix:EEE exportSettings:{ExportSeparator:CCC ExportDistinguishingSuffix:DDD}}; TracesSettings: {exportSettings:{ExportSeparator:TTT ExportDistinguishingSuffix:UUU}}; ServerHostSettings: {UseHostName:false ServerHost:foo-bar}; RetrySettings: {Enabled:true InitialInterval:5s RandomizationFactor:0.5 Multiplier:1.5 MaxInterval:30s MaxElapsedTime:5m0s}; QueueSettings: {Enabled:true NumConsumers:10 QueueSize:1000 StorageID:<nil>}; TimeoutSettings: {Timeout:5s}",
config.String(),
)
}
Expand Down
15 changes: 12 additions & 3 deletions exporter/datasetexporter/datasetexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@ import (
"github.com/google/uuid"
"github.com/scalyr/dataset-go/pkg/api/add_events"
"github.com/scalyr/dataset-go/pkg/client"
"github.com/scalyr/dataset-go/pkg/meter_config"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

type DatasetExporter struct {
client *client.DataSetClient
limiter *rate.Limiter
logger *zap.Logger
session string
exporterCfg *ExporterConfig
Expand All @@ -34,6 +33,8 @@ func newDatasetExporter(entity string, config *Config, set exporter.CreateSettin
logger.Info("Creating new DataSetExporter",
zap.String("config", config.String()),
zap.String("entity", entity),
zap.String("id.string", set.ID.String()),
zap.String("id.name", set.ID.Name()),
)
exporterCfg, err := config.convert()
if err != nil {
Expand All @@ -48,11 +49,20 @@ func newDatasetExporter(entity string, config *Config, set exporter.CreateSettin
set.BuildInfo.Version,
entity,
)

meter := set.MeterProvider.Meter("datasetexporter")
meterConfig := meter_config.NewMeterConfig(
&meter,
entity,
set.ID.Name(),
)

client, err := client.NewClient(
exporterCfg.datasetConfig,
&http.Client{Timeout: time.Second * 60},
logger,
&userAgent,
meterConfig,
)
if err != nil {
logger.Error("Cannot create DataSetClient: ", zap.Error(err))
Expand All @@ -61,7 +71,6 @@ func newDatasetExporter(entity string, config *Config, set exporter.CreateSettin

return &DatasetExporter{
client: client,
limiter: rate.NewLimiter(100*rate.Every(1*time.Minute), 100), // 100 requests / minute
session: uuid.New().String(),
logger: logger,
exporterCfg: exporterCfg,
Expand Down
1 change: 1 addition & 0 deletions exporter/datasetexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func createDefaultConfig() component.Config {
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
Debug: debugDefault,
}
}

Expand Down
3 changes: 2 additions & 1 deletion exporter/datasetexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func TestLoadConfig(t *testing.T) {
expected: &Config{
DatasetURL: "https://app.scalyr.com",
APIKey: "key-full",
Debug: true,
BufferSettings: BufferSettings{
MaxLifetime: 3456 * time.Millisecond,
GroupBy: []string{"body.map.kubernetes.pod_id", "body.map.kubernetes.docker_id", "body.map.stream"},
Expand Down Expand Up @@ -160,7 +161,7 @@ func createExporterTests() []CreateTest {
{
name: "broken",
config: &Config{},
expectedError: fmt.Errorf("cannot get DataSetExpoter: cannot convert config: DatasetURL: ; BufferSettings: {MaxLifetime:0s GroupBy:[] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s RetryShutdownTimeout:0s}; LogsSettings: {ExportResourceInfo:false ExportResourcePrefix: ExportScopeInfo:false ExportScopePrefix: DecomposeComplexMessageField:false DecomposedComplexMessagePrefix: exportSettings:{ExportSeparator: ExportDistinguishingSuffix:}}; TracesSettings: {exportSettings:{ExportSeparator: ExportDistinguishingSuffix:}}; ServerHostSettings: {UseHostName:false ServerHost:}; RetrySettings: {Enabled:false InitialInterval:0s RandomizationFactor:0 Multiplier:0 MaxInterval:0s MaxElapsedTime:0s}; QueueSettings: {Enabled:false NumConsumers:0 QueueSize:0 StorageID:<nil>}; TimeoutSettings: {Timeout:0s}; config is not valid: api_key is required"),
expectedError: fmt.Errorf("cannot get DataSetExporter: cannot convert config: DatasetURL: ; APIKey: [REDACTED] (0); Debug: false; BufferSettings: {MaxLifetime:0s GroupBy:[] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s RetryShutdownTimeout:0s}; LogsSettings: {ExportResourceInfo:false ExportResourcePrefix: ExportScopeInfo:false ExportScopePrefix: DecomposeComplexMessageField:false DecomposedComplexMessagePrefix: exportSettings:{ExportSeparator: ExportDistinguishingSuffix:}}; TracesSettings: {exportSettings:{ExportSeparator: ExportDistinguishingSuffix:}}; ServerHostSettings: {UseHostName:false ServerHost:}; RetrySettings: {Enabled:false InitialInterval:0s RandomizationFactor:0 Multiplier:0 MaxInterval:0s MaxElapsedTime:0s}; QueueSettings: {Enabled:false NumConsumers:0 QueueSize:0 StorageID:<nil>}; TimeoutSettings: {Timeout:0s}; config is not valid: api_key is required"),
},
{
name: "valid",
Expand Down
3 changes: 1 addition & 2 deletions exporter/datasetexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ require (
github.com/google/uuid v1.4.0
// github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/filestorage v0.77.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.89.0
github.com/scalyr/dataset-go v0.14.0
github.com/scalyr/dataset-go v0.17.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/component v0.89.1-0.20231127181443-575c5f5e2531
go.opentelemetry.io/collector/confmap v0.89.1-0.20231127181443-575c5f5e2531
go.opentelemetry.io/collector/exporter v0.89.1-0.20231127181443-575c5f5e2531
go.opentelemetry.io/collector/pdata v1.0.0-rcv0018.0.20231127181443-575c5f5e2531
go.uber.org/zap v1.26.0
golang.org/x/time v0.4.0

)

Expand Down
6 changes: 2 additions & 4 deletions exporter/datasetexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion exporter/datasetexporter/logs_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func createLogsExporter(ctx context.Context, set exporter.CreateSettings, config
cfg := castConfig(config)
e, err := newDatasetExporter("logs", cfg, set)
if err != nil {
return nil, fmt.Errorf("cannot get DataSetExpoter: %w", err)
return nil, fmt.Errorf("cannot get DataSetExporter: %w", err)
}

return exporterhelper.NewLogsExporter(
Expand Down
Loading

0 comments on commit 3a29d0f

Please sign in to comment.