Skip to content

Commit

Permalink
fix: use UpDounCounter rather than gauge for pending request count and
Browse files Browse the repository at this point in the history
specify otel image for e2e tests

Signed-off-by: Joe Wogan <joe.wogan@10xbanking.com>
  • Loading branch information
zorocloud committed Apr 24, 2024
1 parent f5d5f03 commit 06fb14a
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 67 deletions.
25 changes: 10 additions & 15 deletions interceptor/metrics/otelmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
)

type OtelMetrics struct {
meter api.Meter
requestCounter api.Int64Counter
pendingRequestGauge api.Int64ObservableGauge
meter api.Meter
requestCounter api.Int64Counter
pendingRequestCounter api.Int64UpDownCounter
}

func NewOtelMetrics(metricsConfig *config.Metrics, options ...metric.Option) *OtelMetrics {
Expand Down Expand Up @@ -63,15 +63,15 @@ func NewOtelMetrics(metricsConfig *config.Metrics, options ...metric.Option) *Ot
log.Fatalf("could not create new otelhttpmetric request counter: %v", err)
}

pendingRequestGauge, err := meter.Int64ObservableGauge("interceptor_pending_request_count", api.WithDescription("a gauge of requests pending forwarding by the interceptor proxy"))
pendingRequestCounter, err := meter.Int64UpDownCounter("interceptor_pending_request_count", api.WithDescription("a count of requests pending forwarding by the interceptor proxy"))
if err != nil {
log.Fatalf("could not create new otelhttpmetric pending request gauge: %v", err)
log.Fatalf("could not create new otelhttpmetric pending request counter: %v", err)
}

return &OtelMetrics{
meter: meter,
requestCounter: reqCounter,
pendingRequestGauge: pendingRequestGauge,
meter: meter,
requestCounter: reqCounter,
pendingRequestCounter: pendingRequestCounter,
}
}

Expand All @@ -89,19 +89,14 @@ func (om *OtelMetrics) RecordRequestCount(method string, path string, responseCo
}

func (om *OtelMetrics) RecordPendingRequestCount(host string, value int64) {
ctx := context.Background()
opt := api.WithAttributeSet(
attribute.NewSet(
attribute.Key("host").String(host),
),
)

_, err := om.meter.RegisterCallback(func(_ context.Context, o api.Observer) error {
o.ObserveInt64(om.pendingRequestGauge, value, opt)
return nil
}, om.pendingRequestGauge)
if err != nil {
log.Printf("error recording pending request values in prometheus gauge: %v", err)
}
om.pendingRequestCounter.Add(ctx, value, opt)
}

func getHeaders(s string) map[string]string {
Expand Down
4 changes: 2 additions & 2 deletions interceptor/metrics/otelmetrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestRequestCounter(t *testing.T) {
assert.Equal(t, data.Value, int64(1))
}

func TestPendingRequestGuage(t *testing.T) {
func TestPendingRequestCounter(t *testing.T) {
testOtel.RecordPendingRequestCount("test-host", 5)
got := metricdata.ResourceMetrics{}
err := testReader.Collect(context.Background(), &got)
Expand All @@ -47,7 +47,7 @@ func TestPendingRequestGuage(t *testing.T) {
assert.NotEqual(t, len(scopeMetrics.Metrics), 0)

metricInfo := retrieveMetric(scopeMetrics.Metrics, "interceptor_pending_request_count")
data := metricInfo.Data.(metricdata.Gauge[int64]).DataPoints[0]
data := metricInfo.Data.(metricdata.Sum[int64]).DataPoints[0]
assert.Equal(t, data.Value, int64(5))
}

Expand Down
25 changes: 10 additions & 15 deletions interceptor/metrics/prommetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
)

type PrometheusMetrics struct {
meter api.Meter
requestCounter api.Int64Counter
pendingRequestGauge api.Int64ObservableGauge
meter api.Meter
requestCounter api.Int64Counter
pendingRequestCounter api.Int64UpDownCounter
}

func NewPrometheusMetrics(options ...prometheus.Option) *PrometheusMetrics {
Expand Down Expand Up @@ -49,15 +49,15 @@ func NewPrometheusMetrics(options ...prometheus.Option) *PrometheusMetrics {
log.Fatalf("could not create new Prometheus request counter: %v", err)
}

pendingRequestGauge, err := meter.Int64ObservableGauge("interceptor_pending_request_count", api.WithDescription("a gauge of requests pending forwarding by the interceptor proxy"))
pendingRequestCounter, err := meter.Int64UpDownCounter("interceptor_pending_request_count", api.WithDescription("a count of requests pending forwarding by the interceptor proxy"))
if err != nil {
log.Fatalf("could not create new Prometheus pending request gauge: %v", err)
log.Fatalf("could not create new Prometheus pending request counter: %v", err)
}

return &PrometheusMetrics{
meter: meter,
requestCounter: reqCounter,
pendingRequestGauge: pendingRequestGauge,
meter: meter,
requestCounter: reqCounter,
pendingRequestCounter: pendingRequestCounter,
}
}

Expand All @@ -75,17 +75,12 @@ func (p *PrometheusMetrics) RecordRequestCount(method string, path string, respo
}

func (p *PrometheusMetrics) RecordPendingRequestCount(host string, value int64) {
ctx := context.Background()
opt := api.WithAttributeSet(
attribute.NewSet(
attribute.Key("host").String(host),
),
)

_, err := p.meter.RegisterCallback(func(_ context.Context, o api.Observer) error {
o.ObserveInt64(p.pendingRequestGauge, value, opt)
return nil
}, p.pendingRequestGauge)
if err != nil {
log.Printf("error recording pending request values in prometheus gauge: %v", err)
}
p.pendingRequestCounter.Add(ctx, value, opt)
}
2 changes: 1 addition & 1 deletion interceptor/metrics/prommetrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestPromPendingRequestCountMetric(t *testing.T) {
options := []promexporter.Option{promexporter.WithRegisterer(testRegistry)}
testPrometheus := NewPrometheusMetrics(options...)
expectedOutput := `
# HELP interceptor_pending_request_count a gauge of requests pending forwarding by the interceptor proxy
# HELP interceptor_pending_request_count a count of requests pending forwarding by the interceptor proxy
# TYPE interceptor_pending_request_count gauge
interceptor_pending_request_count{host="test-host",otel_scope_name="keda-interceptor-proxy",otel_scope_version=""} 10
# HELP otel_scope_info Instrumentation Scope metadata
Expand Down
10 changes: 2 additions & 8 deletions interceptor/middleware/counting.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,7 @@ func (cm *Counting) inc(logger logr.Logger, key string) bool {
return false
}

currentCount, validCount := cm.queueCounter.CurrentForHost(key)
if validCount {
metrics.RecordPendingRequestCount(key, int64(currentCount))
}
metrics.RecordPendingRequestCount(key, int64(1))

return true
}
Expand All @@ -84,10 +81,7 @@ func (cm *Counting) dec(logger logr.Logger, key string) bool {
return false
}

currentCount, validCount := cm.queueCounter.CurrentForHost(key)
if validCount {
metrics.RecordPendingRequestCount(key, int64(currentCount))
}
metrics.RecordPendingRequestCount(key, int64(-1))

return true
}
10 changes: 0 additions & 10 deletions pkg/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ type CountReader interface {
// Current returns the current count of pending requests
// for the given hostname
Current() (*Counts, error)
CurrentForHost(host string) (int, bool)
}

// QueueCounter represents a virtual HTTP queue, possibly distributed across
Expand Down Expand Up @@ -133,12 +132,3 @@ func (r *Memory) Current() (*Counts, error) {
}
return cts, nil
}

func (r *Memory) CurrentForHost(host string) (int, bool) {
r.mut.RLock()
defer r.mut.RUnlock()
if _, ok := r.concurrentMap[host]; ok {
return r.concurrentMap[host], true
}
return 0, false
}
13 changes: 0 additions & 13 deletions pkg/queue/queue_fakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,6 @@ func (f *FakeCounter) Current() (*Counts, error) {
return ret, nil
}

func (f *FakeCounter) CurrentForHost(host string) (int, bool) {
f.mapMut.RLock()
defer f.mapMut.RUnlock()
if _, ok := f.RetMap[host]; ok {
return f.RetMap[host].Concurrency, true
}
return 0, false
}

var _ CountReader = &FakeCountReader{}

type FakeCountReader struct {
Expand All @@ -117,7 +108,3 @@ func (f *FakeCountReader) Current() (*Counts, error) {
}
return ret, f.err
}

func (f *FakeCountReader) CurrentForHost(_ string) (int, bool) {
return f.concurrency, true
}
3 changes: 0 additions & 3 deletions pkg/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ func TestCurrent(t *testing.T) {
r.NoError(err)
r.Equal(current.Counts[host].Concurrency, memory.concurrentMap[host])
r.Equal(current.Counts[host].RPS, memory.rpsMap[host].WindowAverage(now))
currentForHost, validCount := memory.CurrentForHost("host1")
r.Equal(validCount, true)
r.Equal(currentForHost, 1)

err = memory.Increase(host, 1)
r.NoError(err)
Expand Down
2 changes: 2 additions & 0 deletions tests/utils/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (

var (
OtlpConfig = `mode: deployment
image:
repository: "otel/opentelemetry-collector-contrib"
config:
exporters:
logging:
Expand Down

0 comments on commit 06fb14a

Please sign in to comment.