From aed9b406fb7b37526d163791515585cd07c349d9 Mon Sep 17 00:00:00 2001 From: ssube Date: Wed, 11 Dec 2019 05:59:07 -0600 Subject: [PATCH] fix: make label metric names consistent with samples --- postgres/client.go | 98 +++++++++++++++++++++++++--------------------- 1 file changed, 54 insertions(+), 44 deletions(-) diff --git a/postgres/client.go b/postgres/client.go index 6558dfe..3caec92 100644 --- a/postgres/client.go +++ b/postgres/client.go @@ -80,6 +80,14 @@ var ( }, []string{"remote"}, ) + pingTime = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "ping_seconds", + Namespace: "adapter", + Subsystem: "connections", + }, + []string{"remote"}, + ) labelCacheSize = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "cache_current", @@ -89,25 +97,17 @@ var ( }, []string{"remote"}, ) - pingTime = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "ping_seconds", - Namespace: "adapter", - Subsystem: "connections", - }, - []string{"remote"}, - ) - totalNewLabels = prometheus.NewCounterVec( + totalSkippedLabels = prometheus.NewCounterVec( prometheus.CounterOpts{ - Name: "new_total", + Name: "skipped_total", Namespace: "adapter", Subsystem: "labels", }, []string{"remote"}, ) - totalSkipLabels = prometheus.NewCounterVec( + totalWrittenLabels = prometheus.NewCounterVec( prometheus.CounterOpts{ - Name: "skip_total", + Name: "written_total", Namespace: "adapter", Subsystem: "labels", }, @@ -130,8 +130,8 @@ var ( []string{"remote"}, ) - labels_nothing = "INSERT INTO metric_labels(lid, time, labels) VALUES ( $1, $2, $3 ) ON CONFLICT (lid) DO NOTHING" - labels_update = "INSERT INTO metric_labels(lid, time, labels) VALUES ( $1, $2, $3 ) ON CONFLICT (lid) DO UPDATE SET time = EXCLUDED.time" + labelsNothing = "INSERT INTO metric_labels(lid, time, labels) VALUES ( $1, $2, $3 ) ON CONFLICT (lid) DO NOTHING" + labelsUpdate = "INSERT INTO metric_labels(lid, time, labels) VALUES ( $1, $2, $3 ) ON CONFLICT (lid) DO UPDATE SET time = EXCLUDED.time" ) func init() { @@ -141,8 +141,8 @@ func init() { prometheus.MustRegister(maxOpenConns) prometheus.MustRegister(labelCacheSize) prometheus.MustRegister(pingTime) - prometheus.MustRegister(totalNewLabels) - prometheus.MustRegister(totalSkipLabels) + prometheus.MustRegister(totalSkippedLabels) + prometheus.MustRegister(totalWrittenLabels) prometheus.MustRegister(totalInvalidSamples) prometheus.MustRegister(totalWrittenSamples) } @@ -220,7 +220,7 @@ func (c *Client) Write(metrics Metrics, samples model.Samples) error { } func (c *Client) WriteLabels(metrics Metrics) error { - txn, stmt, err := c.PrepareStmt(labels_update) + txn, stmt, err := c.PrepareStmt(labelsUpdate) if err != nil { level.Error(c.logger).Log("msg", "cannot prepare label statement", "err", err) return err @@ -229,37 +229,19 @@ func (c *Client) WriteLabels(metrics Metrics) error { defer stmt.Close() defer txn.Rollback() - newLabels := 0 - skipLabels := 0 + writtenLabels := 0 + skippedLabels := 0 t := time.Now() for _, m := range metrics { - lid, err := c.makeLid(m) - if err != nil { - level.Warn(c.logger).Log("msg", "error hashing labels", "err", err) - continue - } - - if c.cache.Contains(lid) { - skipLabels++ - level.Debug(c.logger).Log("msg", "skipping duplicate labels", "lid", lid) - continue - } - - labels, err := c.marshalMetric(m) + written, skipped, err := c.WriteLabel(m, stmt, t) if err != nil { - level.Warn(c.logger).Log("msg", "error marshaling metric", "err", err, "lid", lid) - continue - } - - _, err = stmt.Exec(lid, t, labels) - if err != nil { - level.Warn(c.logger).Log("msg", "error in single label execution", "err", err, "labels", labels, "lid", lid) - continue + level.Error(c.logger).Log("msg", "error writing single label", "err", err) + return err } - c.cache.Add(lid, nil) - newLabels++ + writtenLabels += written + skippedLabels += skipped } err = stmt.Close() @@ -273,12 +255,40 @@ func (c *Client) WriteLabels(metrics Metrics) error { return err } - totalNewLabels.WithLabelValues(c.Name()).Add(float64(newLabels)) - totalSkipLabels.WithLabelValues(c.Name()).Add(float64(skipLabels)) + totalSkippedLabels.WithLabelValues(c.Name()).Add(float64(skippedLabels)) + totalWrittenLabels.WithLabelValues(c.Name()).Add(float64(writtenLabels)) return nil } +func (c *Client) WriteLabel(m *model.Metric, stmt *sql.Stmt, t time.Time) (int, int, error) { + lid, err := c.makeLid(m) + if err != nil { + level.Warn(c.logger).Log("msg", "error hashing labels", "err", err) + return 0, 0, err + } + + if c.cache.Contains(lid) { + level.Debug(c.logger).Log("msg", "skipping duplicate labels", "lid", lid) + return 0, 1, err + } + + labels, err := c.marshalMetric(m) + if err != nil { + level.Warn(c.logger).Log("msg", "error marshaling metric", "err", err, "lid", lid) + return 0, 0, err + } + + _, err = stmt.Exec(lid, t, labels) + if err != nil { + level.Warn(c.logger).Log("msg", "error in single label execution", "err", err, "labels", labels, "lid", lid) + return 0, 0, err + } + + c.cache.Add(lid, nil) + return 1, 0, nil +} + func (c *Client) WriteSamples(samples model.Samples) error { txn, stmt, err := c.PrepareStmt(pq.CopyIn("metric_samples", "time", "name", "value", "lid")) if err != nil {