Skip to content
This repository has been archived by the owner on Dec 29, 2020. It is now read-only.

Commit

Permalink
fix: make label metric names consistent with samples
Browse files Browse the repository at this point in the history
  • Loading branch information
ssube committed Dec 11, 2019
1 parent 57ded15 commit aed9b40
Showing 1 changed file with 54 additions and 44 deletions.
98 changes: 54 additions & 44 deletions postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ var (
},
[]string{"remote"},
)
pingTime = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "ping_seconds",
Namespace: "adapter",
Subsystem: "connections",
},
[]string{"remote"},
)
labelCacheSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cache_current",
Expand All @@ -89,25 +97,17 @@ var (
},
[]string{"remote"},
)
pingTime = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "ping_seconds",
Namespace: "adapter",
Subsystem: "connections",
},
[]string{"remote"},
)
totalNewLabels = prometheus.NewCounterVec(
totalSkippedLabels = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "new_total",
Name: "skipped_total",
Namespace: "adapter",
Subsystem: "labels",
},
[]string{"remote"},
)
totalSkipLabels = prometheus.NewCounterVec(
totalWrittenLabels = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "skip_total",
Name: "written_total",
Namespace: "adapter",
Subsystem: "labels",
},
Expand All @@ -130,8 +130,8 @@ var (
[]string{"remote"},
)

labels_nothing = "INSERT INTO metric_labels(lid, time, labels) VALUES ( $1, $2, $3 ) ON CONFLICT (lid) DO NOTHING"
labels_update = "INSERT INTO metric_labels(lid, time, labels) VALUES ( $1, $2, $3 ) ON CONFLICT (lid) DO UPDATE SET time = EXCLUDED.time"
labelsNothing = "INSERT INTO metric_labels(lid, time, labels) VALUES ( $1, $2, $3 ) ON CONFLICT (lid) DO NOTHING"
labelsUpdate = "INSERT INTO metric_labels(lid, time, labels) VALUES ( $1, $2, $3 ) ON CONFLICT (lid) DO UPDATE SET time = EXCLUDED.time"
)

func init() {
Expand All @@ -141,8 +141,8 @@ func init() {
prometheus.MustRegister(maxOpenConns)
prometheus.MustRegister(labelCacheSize)
prometheus.MustRegister(pingTime)
prometheus.MustRegister(totalNewLabels)
prometheus.MustRegister(totalSkipLabels)
prometheus.MustRegister(totalSkippedLabels)
prometheus.MustRegister(totalWrittenLabels)
prometheus.MustRegister(totalInvalidSamples)
prometheus.MustRegister(totalWrittenSamples)
}
Expand Down Expand Up @@ -220,7 +220,7 @@ func (c *Client) Write(metrics Metrics, samples model.Samples) error {
}

func (c *Client) WriteLabels(metrics Metrics) error {
txn, stmt, err := c.PrepareStmt(labels_update)
txn, stmt, err := c.PrepareStmt(labelsUpdate)
if err != nil {
level.Error(c.logger).Log("msg", "cannot prepare label statement", "err", err)
return err
Expand All @@ -229,37 +229,19 @@ func (c *Client) WriteLabels(metrics Metrics) error {
defer stmt.Close()
defer txn.Rollback()

newLabels := 0
skipLabels := 0
writtenLabels := 0
skippedLabels := 0
t := time.Now()

for _, m := range metrics {
lid, err := c.makeLid(m)
if err != nil {
level.Warn(c.logger).Log("msg", "error hashing labels", "err", err)
continue
}

if c.cache.Contains(lid) {
skipLabels++
level.Debug(c.logger).Log("msg", "skipping duplicate labels", "lid", lid)
continue
}

labels, err := c.marshalMetric(m)
written, skipped, err := c.WriteLabel(m, stmt, t)
if err != nil {
level.Warn(c.logger).Log("msg", "error marshaling metric", "err", err, "lid", lid)
continue
}

_, err = stmt.Exec(lid, t, labels)
if err != nil {
level.Warn(c.logger).Log("msg", "error in single label execution", "err", err, "labels", labels, "lid", lid)
continue
level.Error(c.logger).Log("msg", "error writing single label", "err", err)
return err
}

c.cache.Add(lid, nil)
newLabels++
writtenLabels += written
skippedLabels += skipped
}

err = stmt.Close()
Expand All @@ -273,12 +255,40 @@ func (c *Client) WriteLabels(metrics Metrics) error {
return err
}

totalNewLabels.WithLabelValues(c.Name()).Add(float64(newLabels))
totalSkipLabels.WithLabelValues(c.Name()).Add(float64(skipLabels))
totalSkippedLabels.WithLabelValues(c.Name()).Add(float64(skippedLabels))
totalWrittenLabels.WithLabelValues(c.Name()).Add(float64(writtenLabels))

return nil
}

func (c *Client) WriteLabel(m *model.Metric, stmt *sql.Stmt, t time.Time) (int, int, error) {
lid, err := c.makeLid(m)
if err != nil {
level.Warn(c.logger).Log("msg", "error hashing labels", "err", err)
return 0, 0, err
}

if c.cache.Contains(lid) {
level.Debug(c.logger).Log("msg", "skipping duplicate labels", "lid", lid)
return 0, 1, err
}

labels, err := c.marshalMetric(m)
if err != nil {
level.Warn(c.logger).Log("msg", "error marshaling metric", "err", err, "lid", lid)
return 0, 0, err
}

_, err = stmt.Exec(lid, t, labels)
if err != nil {
level.Warn(c.logger).Log("msg", "error in single label execution", "err", err, "labels", labels, "lid", lid)
return 0, 0, err
}

c.cache.Add(lid, nil)
return 1, 0, nil
}

func (c *Client) WriteSamples(samples model.Samples) error {
txn, stmt, err := c.PrepareStmt(pq.CopyIn("metric_samples", "time", "name", "value", "lid"))
if err != nil {
Expand Down

0 comments on commit aed9b40

Please sign in to comment.