Skip to content
This repository has been archived by the owner on Dec 29, 2020. It is now read-only.

Commit

Permalink
feat: pass metrics to client writer
Browse files Browse the repository at this point in the history
  • Loading branch information
ssube committed Dec 1, 2019
1 parent 80de783 commit ff383d5
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 17 deletions.
17 changes: 10 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func parseFlags() *config {
}

type writer interface {
Write(samples model.Samples) error
Write(metrics postgres.Metrics, samples model.Samples) error
Name() string
}

Expand Down Expand Up @@ -203,14 +203,14 @@ func serve(logger log.Logger, addr string, writers []writer, readers []reader, a
return
}

samples := protoToSamples(&req, allowed)
metrics, samples := protoToSamples(&req, allowed)
receivedSamples.Add(float64(len(samples)))

var wg sync.WaitGroup
for _, w := range writers {
wg.Add(1)
go func(rw writer) {
sendSamples(logger, rw, samples)
sendSamples(logger, rw, metrics, samples)
wg.Done()
}(w)
}
Expand All @@ -220,7 +220,8 @@ func serve(logger log.Logger, addr string, writers []writer, readers []reader, a
return http.ListenAndServe(addr, nil)
}

func protoToSamples(req *prompb.WriteRequest, allowed []string) model.Samples {
func protoToSamples(req *prompb.WriteRequest, allowed []string) (postgres.Metrics, model.Samples) {
var metrics postgres.Metrics
var samples model.Samples
for _, ts := range req.Timeseries {
metric := make(model.Metric, len(ts.Labels))
Expand All @@ -233,6 +234,8 @@ func protoToSamples(req *prompb.WriteRequest, allowed []string) model.Samples {
continue
}

metrics = append(metrics, &metric)

for _, s := range ts.Samples {
samples = append(samples, &model.Sample{
Metric: metric,
Expand All @@ -241,12 +244,12 @@ func protoToSamples(req *prompb.WriteRequest, allowed []string) model.Samples {
})
}
}
return samples
return metrics, samples
}

func sendSamples(logger log.Logger, w writer, samples model.Samples) {
func sendSamples(logger log.Logger, w writer, metrics postgres.Metrics, samples model.Samples) {
begin := time.Now()
err := w.Write(samples)
err := w.Write(metrics, samples)
duration := time.Since(begin).Seconds()
if err != nil {
level.Warn(logger).Log("msg", "Error sending samples to remote storage", "err", err, "storage", w.Name(), "num_samples", len(samples))
Expand Down
22 changes: 12 additions & 10 deletions postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Client struct {
db *sql.DB
}

type Metrics []*model.Metric

var (
maxOpenConns = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Expand Down Expand Up @@ -184,14 +186,14 @@ func NewClient(logger log.Logger, conn string, idle int, open int, cacheSize int
}

// Write sends a batch of samples to Postgres.
func (c *Client) Write(samples model.Samples) error {
func (c *Client) Write(metrics Metrics, samples model.Samples) error {
txn, err := c.db.Begin()
if err != nil {
return err
}
defer txn.Rollback()

err = c.WriteLabels(samples, txn)
err = c.WriteLabels(metrics, txn)
if err != nil {
return err
}
Expand All @@ -205,7 +207,7 @@ func (c *Client) Write(samples model.Samples) error {
return err
}

func (c *Client) WriteLabels(samples model.Samples, txn *sql.Tx) error {
func (c *Client) WriteLabels(metrics Metrics, txn *sql.Tx) error {
stmt, err := txn.Prepare(labels_update)
if err != nil {
level.Error(c.logger).Log("msg", "cannot prepare label statement", "err", err)
Expand All @@ -215,9 +217,10 @@ func (c *Client) WriteLabels(samples model.Samples, txn *sql.Tx) error {

newLabels := 0
skipLabels := 0
t := time.Now()

for _, s := range samples {
lid, err := c.makeLid(s.Metric)
for _, m := range metrics {
lid, err := c.makeLid(m)
if err != nil {
level.Warn(c.logger).Log("msg", "error hashing labels", "err", err)
continue
Expand All @@ -229,12 +232,11 @@ func (c *Client) WriteLabels(samples model.Samples, txn *sql.Tx) error {
continue
}

labels, err := c.marshalMetric(s.Metric)
labels, err := c.marshalMetric(m)
if err != nil {
continue
}

t := time.Unix(0, s.Timestamp.UnixNano())
_, err = stmt.Exec(lid, t, labels)
if err != nil {
level.Error(c.logger).Log("msg", "error in single label execution", "err", err, "labels", labels, "lid", lid)
Expand Down Expand Up @@ -315,7 +317,7 @@ func (c Client) Name() string {
return "postgres"
}

func (c Client) makeLid(m model.Metric) (string, error) {
func (c Client) makeLid(m *model.Metric) (string, error) {
buf := make([]byte, 16)
binary.LittleEndian.PutUint64(buf[0:], 0)
binary.LittleEndian.PutUint64(buf[8:], uint64(m.Fingerprint()))
Expand All @@ -328,7 +330,7 @@ func (c Client) makeLid(m model.Metric) (string, error) {
return u.String(), nil
}

func (c Client) marshalMetric(m model.Metric) (string, error) {
func (c Client) marshalMetric(m *model.Metric) (string, error) {
buf, err := json.Marshal(m)
if err != nil {
return "", err
Expand All @@ -337,7 +339,7 @@ func (c Client) marshalMetric(m model.Metric) (string, error) {
}

func (c Client) parseMetric(m model.Metric) (string, string, error) {
lid, err := c.makeLid(m)
lid, err := c.makeLid(&m)
if err != nil {
return "", "", err
}
Expand Down

0 comments on commit ff383d5

Please sign in to comment.