Skip to content

Commit

Permalink
Merge branch 'main' into exporter/postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
destrex271 authored Sep 29, 2024
2 parents a3b2930 + 70b26af commit 1183327
Show file tree
Hide file tree
Showing 34 changed files with 1,041 additions and 40 deletions.
27 changes: 27 additions & 0 deletions .chloggen/opampsupervisor_bootstrap_timeout_option.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add config option for setting the timeout for the initial bootstrap information retrieval from the agent

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34996]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
27 changes: 27 additions & 0 deletions .chloggen/ottl-get-xml.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/ottl

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add GetXML Converter

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35462]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
27 changes: 27 additions & 0 deletions .chloggen/sqlqueryreceiver-collect-errors.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: sqlqueryreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fail if value for log column in result set is missing, collect errors

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35068]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
3 changes: 3 additions & 0 deletions cmd/opampsupervisor/specification/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ agent:
# The interval on which the Collector checks to see if it's been orphaned.
orphan_detection_interval: 5s

# The maximum wait duration for retrieving bootstrapping information from the agent
bootstrap_timeout: 3s

# Extra command line flags to pass to the Collector executable.
args:

Expand Down
6 changes: 6 additions & 0 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ type Agent struct {
Executable string
OrphanDetectionInterval time.Duration `mapstructure:"orphan_detection_interval"`
Description AgentDescription `mapstructure:"description"`
BootstrapTimeout time.Duration `mapstructure:"bootstrap_timeout"`
HealthCheckPort int `mapstructure:"health_check_port"`
}

Expand All @@ -129,6 +130,10 @@ func (a Agent) Validate() error {
return errors.New("agent::orphan_detection_interval must be positive")
}

if a.BootstrapTimeout <= 0 {
return errors.New("agent::bootstrap_timeout must be positive")
}

if a.HealthCheckPort < 0 || a.HealthCheckPort > 65535 {
return errors.New("agent::health_check_port must be a valid port number")
}
Expand Down Expand Up @@ -180,6 +185,7 @@ func DefaultSupervisor() Supervisor {
},
Agent: Agent{
OrphanDetectionInterval: 5 * time.Second,
BootstrapTimeout: 3 * time.Second,
},
}
}
32 changes: 32 additions & 0 deletions cmd/opampsupervisor/supervisor/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestValidate(t *testing.T) {
Agent: Agent{
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
AcceptsRemoteConfig: true,
Expand Down Expand Up @@ -163,6 +164,7 @@ func TestValidate(t *testing.T) {
Agent: Agent{
Executable: "",
OrphanDetectionInterval: 5 * time.Second,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
AcceptsRemoteConfig: true,
Expand All @@ -188,6 +190,7 @@ func TestValidate(t *testing.T) {
Agent: Agent{
Executable: "./path/does/not/exist",
OrphanDetectionInterval: 5 * time.Second,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
AcceptsRemoteConfig: true,
Expand Down Expand Up @@ -239,6 +242,7 @@ func TestValidate(t *testing.T) {
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
HealthCheckPort: 65536,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
AcceptsRemoteConfig: true,
Expand All @@ -265,6 +269,7 @@ func TestValidate(t *testing.T) {
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
HealthCheckPort: 0,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
AcceptsRemoteConfig: true,
Expand All @@ -290,6 +295,7 @@ func TestValidate(t *testing.T) {
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
HealthCheckPort: 29848,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
AcceptsRemoteConfig: true,
Expand All @@ -299,6 +305,32 @@ func TestValidate(t *testing.T) {
},
},
},
{
name: "config with invalid agent bootstrap timeout",
config: Supervisor{
Server: OpAMPServer{
Endpoint: "wss://localhost:9090/opamp",
Headers: http.Header{
"Header1": []string{"HeaderValue"},
},
TLSSetting: configtls.ClientConfig{
Insecure: true,
},
},
Agent: Agent{
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
BootstrapTimeout: -5 * time.Second,
},
Capabilities: Capabilities{
AcceptsRemoteConfig: true,
},
Storage: Storage{
Directory: "/etc/opamp-supervisor/storage",
},
},
expectedError: "agent::bootstrap_timeout must be positive",
},
}

// create some fake files for validating agent config
Expand Down
3 changes: 1 addition & 2 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,7 @@ func (s *Supervisor) getBootstrapInfo() (err error) {
}()

select {
// TODO make timeout configurable
case <-time.After(3 * time.Second):
case <-time.After(s.config.Agent.BootstrapTimeout):
if connected.Load() {
return errors.New("collector connected but never responded with an AgentDescription message")
} else {
Expand Down
30 changes: 30 additions & 0 deletions internal/exp/metrics/staleness/staleness.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,33 @@ func (s *Staleness[T]) Evict() (identity.Stream, bool) {
func (s *Staleness[T]) Clear() {
s.items.Clear()
}

type Tracker struct {
pq PriorityQueue
}

func NewTracker() Tracker {
return Tracker{pq: NewPriorityQueue()}
}

func (stale Tracker) Refresh(ts time.Time, ids ...identity.Stream) {
for _, id := range ids {
stale.pq.Update(id, ts)
}
}

func (stale Tracker) Collect(max time.Duration) []identity.Stream {
now := NowFunc()

var ids []identity.Stream
for stale.pq.Len() > 0 {
_, ts := stale.pq.Peek()
if now.Sub(ts) < max {
break
}
id, _ := stale.pq.Pop()
ids = append(ids, id)
}

return ids
}
19 changes: 11 additions & 8 deletions internal/sqlquery/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package sqlquery // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery"

import (
"errors"
"fmt"
"strconv"

Expand All @@ -18,36 +19,38 @@ func rowToMetric(row StringMap, cfg MetricCfg, dest pmetric.Metric, startTime pc
dest.SetUnit(cfg.Unit)
dataPointSlice := setMetricFields(cfg, dest)
dataPoint := dataPointSlice.AppendEmpty()
var errs []error
if cfg.StartTsColumn != "" {
if val, found := row[cfg.StartTsColumn]; found {
timestamp, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return fmt.Errorf("failed to parse uint64 for %q, value was %q: %w", cfg.StartTsColumn, val, err)
errs = append(errs, fmt.Errorf("failed to parse uint64 for %q, value was %q: %w", cfg.StartTsColumn, val, err))
}
startTime = pcommon.Timestamp(timestamp)
} else {
return fmt.Errorf("rowToMetric: start_ts_column not found")
errs = append(errs, fmt.Errorf("rowToMetric: start_ts_column not found"))
}
}
if cfg.TsColumn != "" {
if val, found := row[cfg.TsColumn]; found {
timestamp, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return fmt.Errorf("failed to parse uint64 for %q, value was %q: %w", cfg.TsColumn, val, err)
errs = append(errs, fmt.Errorf("failed to parse uint64 for %q, value was %q: %w", cfg.TsColumn, val, err))
}
ts = pcommon.Timestamp(timestamp)
} else {
return fmt.Errorf("rowToMetric: ts_column not found")
errs = append(errs, fmt.Errorf("rowToMetric: ts_column not found"))
}
}
setTimestamp(cfg, dataPoint, startTime, ts, scrapeCfg)
value, found := row[cfg.ValueColumn]
if !found {
return fmt.Errorf("rowToMetric: value_column '%s' not found in result set", cfg.ValueColumn)
errs = append(errs, fmt.Errorf("rowToMetric: value_column '%s' not found in result set", cfg.ValueColumn))
}

err := setDataPointValue(cfg, value, dataPoint)
if err != nil {
return fmt.Errorf("rowToMetric: %w", err)
errs = append(errs, fmt.Errorf("rowToMetric: %w", err))
}
attrs := dataPoint.Attributes()
for k, v := range cfg.StaticAttributes {
Expand All @@ -57,10 +60,10 @@ func rowToMetric(row StringMap, cfg MetricCfg, dest pmetric.Metric, startTime pc
if attrVal, found := row[columnName]; found {
attrs.PutStr(columnName, attrVal)
} else {
return fmt.Errorf("rowToMetric: attribute_column not found: '%s'", columnName)
errs = append(errs, fmt.Errorf("rowToMetric: attribute_column '%s' not found in result set", columnName))
}
}
return nil
return errors.Join(errs...)
}

func setTimestamp(cfg MetricCfg, dp pmetric.NumberDataPoint, startTime pcommon.Timestamp, ts pcommon.Timestamp, scrapeCfg scraperhelper.ControllerConfig) {
Expand Down
29 changes: 29 additions & 0 deletions internal/sqlquery/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,35 @@ func TestScraper_StartAndTS_ErrorOnColumnNotFound(t *testing.T) {
assert.Error(t, err)
}

func TestScraper_CollectRowToMetricsErrors(t *testing.T) {
client := &FakeDBClient{
StringMaps: [][]StringMap{{
{
"mycol": "42",
},
}},
}
scrpr := Scraper{
Client: client,
Query: Query{
Metrics: []MetricCfg{{
MetricName: "my.name",
ValueColumn: "mycol_na",
TsColumn: "Ts",
StartTsColumn: "StartTs",
AttributeColumns: []string{"attr_na"},
DataType: MetricTypeSum,
Aggregation: MetricAggregationCumulative,
}},
},
}
_, err := scrpr.Scrape(context.Background())
assert.ErrorContains(t, err, "rowToMetric: start_ts_column not found")
assert.ErrorContains(t, err, "rowToMetric: ts_column not found")
assert.ErrorContains(t, err, "rowToMetric: value_column 'mycol_na' not found in result set")
assert.ErrorContains(t, err, "rowToMetric: attribute_column 'attr_na' not found in result set")
}

func TestScraper_StartAndTS_ErrorOnParse(t *testing.T) {
client := &FakeDBClient{
StringMaps: [][]StringMap{{
Expand Down
6 changes: 6 additions & 0 deletions pkg/ottl/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,12 @@ func Test_e2e_converters(t *testing.T) {
tCtx.GetLogRecord().Attributes().PutInt("test", 1)
},
},
{
statement: `set(attributes["test"], GetXML("<a><b>1</b><c><b>2</b></c></a>", "/a//b"))`,
want: func(tCtx ottllog.TransformContext) {
tCtx.GetLogRecord().Attributes().PutStr("test", "<b>1</b><b>2</b>")
},
},
{
statement: `set(attributes["test"], Hex(1.0))`,
want: func(tCtx ottllog.TransformContext) {
Expand Down
Loading

0 comments on commit 1183327

Please sign in to comment.