Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up old metrics using metrics timestamp #45

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 52 additions & 7 deletions cmd/prom-aggregation-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"sort"
"sync"
"time"

dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
Expand Down Expand Up @@ -78,14 +79,26 @@ func mergeBuckets(a, b []*dto.Bucket) []*dto.Bucket {
return output
}

func makeTimestampMs() int64 {
return time.Now().UnixNano() / int64(time.Millisecond)
}

func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric {
// Getting the metric timestamp or creating one when that metric is merged
// It will be used to cleanup old metrics that have not been merged lately
metricTimestamp := b.GetTimestampMs()
if metricTimestamp == 0 {
metricTimestamp = makeTimestampMs()
}

switch ty {
case dto.MetricType_COUNTER:
return &dto.Metric{
Label: a.Label,
Counter: &dto.Counter{
Value: float64ptr(*a.Counter.Value + *b.Counter.Value),
},
TimestampMs: &metricTimestamp,
}

case dto.MetricType_GAUGE:
Expand All @@ -97,6 +110,7 @@ func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric {
Gauge: &dto.Gauge{
Value: float64ptr(*a.Gauge.Value + *b.Gauge.Value),
},
TimestampMs: &metricTimestamp,
}

case dto.MetricType_HISTOGRAM:
Expand All @@ -107,6 +121,7 @@ func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric {
SampleSum: float64ptr(*a.Histogram.SampleSum + *b.Histogram.SampleSum),
Bucket: mergeBuckets(a.Histogram.Bucket, b.Histogram.Bucket),
},
TimestampMs: &metricTimestamp,
}

case dto.MetricType_UNTYPED:
Expand All @@ -115,6 +130,7 @@ func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric {
Untyped: &dto.Untyped{
Value: float64ptr(*a.Untyped.Value + *b.Untyped.Value),
},
TimestampMs: &metricTimestamp,
}

case dto.MetricType_SUMMARY:
Expand Down Expand Up @@ -164,13 +180,15 @@ func mergeFamily(a, b *dto.MetricFamily) (*dto.MetricFamily, error) {
}

type aggate struct {
timeToLiveMs int64
familiesLock sync.RWMutex
families map[string]*dto.MetricFamily
}

func newAggate() *aggate {
func newAggate(ttl int64) *aggate {
return &aggate{
families: map[string]*dto.MetricFamily{},
timeToLiveMs: ttl,
families: map[string]*dto.MetricFamily{},
}
}

Expand All @@ -196,6 +214,21 @@ func validateFamily(f *dto.MetricFamily) error {
return nil
}

func cleanupFamily(metrics []*dto.Metric, ttl int64) []*dto.Metric {
// CurrentTS for old metrics check
nowTS := makeTimestampMs()

// Iterating over metrics and filtering out the old, not recently merged ones
var updatedMetrics []*dto.Metric
for _, metric := range metrics {
if nowTS-metric.GetTimestampMs() <= ttl {
updatedMetrics = append(updatedMetrics, metric)
}
}

return updatedMetrics
}

func (a *aggate) parseAndMerge(r io.Reader) error {
var parser expfmt.TextParser
inFamilies, err := parser.TextToMetricFamilies(r)
Expand Down Expand Up @@ -240,11 +273,20 @@ func (a *aggate) handler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", string(contentType))
enc := expfmt.NewEncoder(w, contentType)

a.familiesLock.RLock()
defer a.familiesLock.RUnlock()
a.familiesLock.Lock()
defer a.familiesLock.Unlock()
metricNames := []string{}
for name := range a.families {
metricNames = append(metricNames, name)
// Cleaning up metrics that have not been merged for a while
a.families[name].Metric = cleanupFamily(a.families[name].GetMetric(), a.timeToLiveMs)

// Including only families that still have metrics to be scraped
if len(a.families[name].Metric) > 0 {
metricNames = append(metricNames, name)
} else {
// Remove the empty families
delete(a.families, name)
}
}
sort.Sort(sort.StringSlice(metricNames))

Expand All @@ -259,12 +301,15 @@ func (a *aggate) handler(w http.ResponseWriter, r *http.Request) {
}

func main() {
listen := flag.String("listen", ":80", "Address and port to listen on.")
listen := flag.String("listen", ":8080", "Address and port to listen on.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a breaking change for users, and unrelated to the description. Please remove from this PR.

cors := flag.String("cors", "*", "The 'Access-Control-Allow-Origin' value to be returned.")
pushPath := flag.String("push-path", "/metrics/", "HTTP path to accept pushed metrics.")
timeToLiveMs := flag.Int64("time-to-live-ms", 3600000, "How long unmerged metrics will live, in milliseconds (default 1h)")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little user-hostile; flag.Duration would allow specifying as 1h, 10m, etc.

flag.Parse()

a := newAggate()
log.Println("PAG started on port", *listen)

a := newAggate(*timeToLiveMs)
http.HandleFunc("/metrics", a.handler)
http.HandleFunc(*pushPath, func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", *cors)
Expand Down
140 changes: 73 additions & 67 deletions cmd/prom-aggregation-gateway/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,118 +5,119 @@ import (
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/pmezard/go-difflib/difflib"
)

const (
var (
in1 = `
# HELP gauge A gauge
# TYPE gauge gauge
gauge 42
gauge 42 %[1]d
# HELP counter A counter
# TYPE counter counter
counter 31
counter 31 %[1]d
# HELP histogram A histogram
# TYPE histogram histogram
histogram_bucket{le="1"} 0
histogram_bucket{le="2"} 0
histogram_bucket{le="3"} 3
histogram_bucket{le="4"} 4
histogram_bucket{le="5"} 4
histogram_bucket{le="6"} 4
histogram_bucket{le="7"} 4
histogram_bucket{le="8"} 4
histogram_bucket{le="9"} 4
histogram_bucket{le="10"} 4
histogram_bucket{le="+Inf"} 4
histogram_sum{} 2.5
histogram_count{} 1
histogram_bucket{le="1"} 0 %[1]d
histogram_bucket{le="2"} 0 %[1]d
histogram_bucket{le="3"} 3 %[1]d
histogram_bucket{le="4"} 4 %[1]d
histogram_bucket{le="5"} 4 %[1]d
histogram_bucket{le="6"} 4 %[1]d
histogram_bucket{le="7"} 4 %[1]d
histogram_bucket{le="8"} 4 %[1]d
histogram_bucket{le="9"} 4 %[1]d
histogram_bucket{le="10"} 4 %[1]d
histogram_bucket{le="+Inf"} 4 %[1]d
histogram_sum{} 2.5 %[1]d
histogram_count{} 1 %[1]d
`
in2 = `
# HELP gauge A gauge
# TYPE gauge gauge
gauge 57
gauge 57 %[1]d
# HELP counter A counter
# TYPE counter counter
counter 29
counter 29 %[1]d
# HELP histogram A histogram
# TYPE histogram histogram
histogram_bucket{le="1"} 0
histogram_bucket{le="2"} 0
histogram_bucket{le="3"} 0
histogram_bucket{le="4"} 4
histogram_bucket{le="5"} 5
histogram_bucket{le="6"} 5
histogram_bucket{le="7"} 5
histogram_bucket{le="8"} 5
histogram_bucket{le="9"} 5
histogram_bucket{le="10"} 5
histogram_bucket{le="+Inf"} 5
histogram_sum 4.5
histogram_count 1
histogram_bucket{le="1"} 0 %[1]d
histogram_bucket{le="2"} 0 %[1]d
histogram_bucket{le="3"} 0 %[1]d
histogram_bucket{le="4"} 4 %[1]d
histogram_bucket{le="5"} 5 %[1]d
histogram_bucket{le="6"} 5 %[1]d
histogram_bucket{le="7"} 5 %[1]d
histogram_bucket{le="8"} 5 %[1]d
histogram_bucket{le="9"} 5 %[1]d
histogram_bucket{le="10"} 5 %[1]d
histogram_bucket{le="+Inf"} 5 %[1]d
histogram_sum 4.5 %[1]d
histogram_count 1 %[1]d
`
want = `# HELP counter A counter
# TYPE counter counter
counter 60
counter 60 %[1]d
# HELP gauge A gauge
# TYPE gauge gauge
gauge 99
gauge 99 %[1]d
# HELP histogram A histogram
# TYPE histogram histogram
histogram_bucket{le="1"} 0
histogram_bucket{le="2"} 0
histogram_bucket{le="3"} 3
histogram_bucket{le="4"} 8
histogram_bucket{le="5"} 9
histogram_bucket{le="6"} 9
histogram_bucket{le="7"} 9
histogram_bucket{le="8"} 9
histogram_bucket{le="9"} 9
histogram_bucket{le="10"} 9
histogram_bucket{le="+Inf"} 9
histogram_sum 7
histogram_count 2
histogram_bucket{le="1"} 0 %[1]d
histogram_bucket{le="2"} 0 %[1]d
histogram_bucket{le="3"} 3 %[1]d
histogram_bucket{le="4"} 8 %[1]d
histogram_bucket{le="5"} 9 %[1]d
histogram_bucket{le="6"} 9 %[1]d
histogram_bucket{le="7"} 9 %[1]d
histogram_bucket{le="8"} 9 %[1]d
histogram_bucket{le="9"} 9 %[1]d
histogram_bucket{le="10"} 9 %[1]d
histogram_bucket{le="+Inf"} 9 %[1]d
histogram_sum 7 %[1]d
histogram_count 2 %[1]d
`

multilabel1 = `# HELP counter A counter
# TYPE counter counter
counter{a="a",b="b"} 1
counter{a="a",b="b"} 1 %[1]d
`
multilabel2 = `# HELP counter A counter
# TYPE counter counter
counter{a="a",b="b"} 2
counter{a="a",b="b"} 2 %[1]d
`
multilabelResult = `# HELP counter A counter
# TYPE counter counter
counter{a="a",b="b"} 3
counter{a="a",b="b"} 3 %[1]d
`
labelFields1 = `# HELP ui_page_render_errors A counter
# TYPE ui_page_render_errors counter
ui_page_render_errors{path="/org/:orgId"} 1
ui_page_render_errors{path="/prom/:orgId"} 1
ui_page_render_errors{path="/org/:orgId"} 1 %[1]d
ui_page_render_errors{path="/prom/:orgId"} 1 %[1]d
`
labelFields2 = `# HELP ui_page_render_errors A counter
# TYPE ui_page_render_errors counter
ui_page_render_errors{path="/prom/:orgId"} 1
ui_page_render_errors{path="/prom/:orgId"} 1 %[1]d
`
labelFieldResult = `# HELP ui_page_render_errors A counter
# TYPE ui_page_render_errors counter
ui_page_render_errors{path="/org/:orgId"} 1
ui_page_render_errors{path="/prom/:orgId"} 2
ui_page_render_errors{path="/org/:orgId"} 1 %[1]d
ui_page_render_errors{path="/prom/:orgId"} 2 %[1]d
`
gaugeInput = `
# HELP ui_external_lib_loaded A gauge with entries in un-sorted order
# TYPE ui_external_lib_loaded gauge
ui_external_lib_loaded{name="ga",loaded="true"} 1
ui_external_lib_loaded{name="Intercom",loaded="true"} 1
ui_external_lib_loaded{name="mixpanel",loaded="true"} 1
ui_external_lib_loaded{name="ga",loaded="true"} 1 %[1]d
ui_external_lib_loaded{name="Intercom",loaded="true"} 1 %[1]d
ui_external_lib_loaded{name="mixpanel",loaded="true"} 1 %[1]d
`
gaugeOutput = `# HELP ui_external_lib_loaded A gauge with entries in un-sorted order
# TYPE ui_external_lib_loaded gauge
ui_external_lib_loaded{loaded="true",name="Intercom"} 2
ui_external_lib_loaded{loaded="true",name="ga"} 2
ui_external_lib_loaded{loaded="true",name="mixpanel"} 2
ui_external_lib_loaded{loaded="true",name="Intercom"} 2 %[1]d
ui_external_lib_loaded{loaded="true",name="ga"} 2 %[1]d
ui_external_lib_loaded{loaded="true",name="mixpanel"} 2 %[1]d
`
duplicateLabels = `
# HELP ui_external_lib_loaded Test with duplicate values
Expand All @@ -128,19 +129,21 @@ ui_external_lib_loaded{name="Munchkin",loaded="true"} 1

reorderedLabels1 = `# HELP counter A counter
# TYPE counter counter
counter{a="a",b="b"} 1
counter{a="a",b="b"} 1 %[1]d
`
reorderedLabels2 = `# HELP counter A counter
# TYPE counter counter
counter{b="b",a="a"} 2
counter{b="b",a="a"} 2 %[1]d
`
reorderedLabelsResult = `# HELP counter A counter
# TYPE counter counter
counter{a="a",b="b"} 3
counter{a="a",b="b"} 3 %[1]d
`
)

func TestAggate(t *testing.T) {
now := time.Now().UnixNano() / int64(time.Millisecond)

for _, c := range []struct {
a, b string
want string
Expand All @@ -154,17 +157,20 @@ func TestAggate(t *testing.T) {
{duplicateLabels, "", "", fmt.Errorf("%s", duplicateError), nil},
{reorderedLabels1, reorderedLabels2, reorderedLabelsResult, nil, nil},
} {
a := newAggate()
a := newAggate(3600000)
if c.b != "" {
c.a, c.b, c.want = fmt.Sprintf(c.a, now), fmt.Sprintf(c.b, now), fmt.Sprintf(c.want, now)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this checked or relied on? I can't see that this test makes any use of the timestamp.

}

if err := a.parseAndMerge(strings.NewReader(c.a)); err != nil {
if c.err1 == nil {
t.Fatalf("Unexpected error: %s", err)
t.Fatalf("Unexpected error: '%s'", err)
} else if c.err1.Error() != err.Error() {
t.Fatalf("Expected %s, got %s", c.err1, err)
t.Fatalf("Expected '%s', got '%s'", c.err1, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

%q ?

}
}
if err := a.parseAndMerge(strings.NewReader(c.b)); err != c.err2 {
t.Fatalf("Expected %s, got %s", c.err2, err)
t.Fatalf("Expected '%s', got '%s'", c.err2, err)
}

r := httptest.NewRequest("GET", "http://example.com/foo", nil)
Expand Down