Skip to content
This repository has been archived by the owner on Dec 29, 2020. It is now read-only.

Commit

Permalink
feat(postgres): retain lids in 2Q cache
Browse files Browse the repository at this point in the history
  • Loading branch information
ssube committed Nov 27, 2019
1 parent def5b21 commit 2e58188
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 19 deletions.
5 changes: 4 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (

type config struct {
allowedNames []string
pgCacheSize int
pgConnStr string
pgMaxIdle int
pgMaxOpen int
Expand Down Expand Up @@ -125,6 +126,8 @@ func parseFlags() *config {
a.Flag("allow", "The allowed metric names.").
Default("").StringsVar(&cfg.allowedNames)

a.Flag("pg.cache-size", "The maximum label cache size.").
Default("1000").IntVar(&cfg.pgCacheSize)
a.Flag("pg.conn-str", "The connection string for pq.").
Default("").StringVar(&cfg.pgConnStr)
a.Flag("pg.max-idle", "The max idle connections.").
Expand Down Expand Up @@ -166,7 +169,7 @@ func buildClients(logger log.Logger, cfg *config) ([]writer, []reader) {
level.Info(logger).Log("msg", "Starting postgres...", "conn", cfg.pgConnStr)
c := postgres.NewClient(
log.With(logger, "storage", "postgres"),
cfg.pgConnStr, cfg.pgMaxIdle, cfg.pgMaxOpen)
cfg.pgConnStr, cfg.pgMaxIdle, cfg.pgMaxOpen, cfg.pgCacheSize)
if c != nil {
writers = append(writers, c)
} else {
Expand Down
43 changes: 25 additions & 18 deletions postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
lru "github.com/hashicorp/golang-lru"
"github.com/lib/pq"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
Expand All @@ -33,8 +34,9 @@ import (
type Client struct {
logger log.Logger

db *sql.DB
cron *cron.Cron
cache *lru.TwoQueueCache
cron *cron.Cron
db *sql.DB
}

var (
Expand Down Expand Up @@ -87,7 +89,7 @@ func init() {
}

// NewClient creates a new Client.
func NewClient(logger log.Logger, conn string, idle int, open int) *Client {
func NewClient(logger log.Logger, conn string, idle int, open int, cacheSize int) *Client {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -99,17 +101,23 @@ func NewClient(logger log.Logger, conn string, idle int, open int) *Client {
db.SetMaxIdleConns(idle)
db.SetMaxOpenConns(open)

cr := cron.New(cron.WithSeconds())
cache, err := lru.New2Q(cacheSize)
if err != nil {
level.Error(logger).Log("msg", "error creating lid cache", "err", err)
return nil
}

c := &Client{
cron: cr,
cache: cache,
cron: cron.New(cron.WithSeconds()),
logger: logger,
db: db,
}

cr.AddFunc("@every 15s", func() {
c.cron.AddFunc("@every 15s", func() {
c.UpdateStats()
})
cr.Start()
c.cron.Start()

c.UpdateStats()
return c
Expand All @@ -125,12 +133,12 @@ func (c *Client) Write(samples model.Samples) error {
}
defer txn.Rollback()

lids, err := c.WriteLabels(samples, txn)
err = c.WriteLabels(samples, txn)
if err != nil {
return err
}

err = c.WriteSamples(samples, txn, lids)
err = c.WriteSamples(samples, txn)
if err != nil {
return err
}
Expand All @@ -139,18 +147,17 @@ func (c *Client) Write(samples model.Samples) error {
return err
}

func (c *Client) WriteLabels(samples model.Samples, txn *sql.Tx) (map[string]string, error) {
func (c *Client) WriteLabels(samples model.Samples, txn *sql.Tx) error {
stmt, err := txn.Prepare(labels_update)
if err != nil {
level.Error(c.logger).Log("msg", "cannot prepare label statement", "err", err)
return nil, err
return err
}
defer stmt.Close()

lids := make(map[string]string)
for _, s := range samples {
l := s.Metric.String()
if _, ok := lids[l]; ok {
if c.cache.Contains(l) {
level.Debug(c.logger).Log("msg", "skipping duplicate labels", "labels", l)
continue
}
Expand All @@ -172,13 +179,13 @@ func (c *Client) WriteLabels(samples model.Samples, txn *sql.Tx) (map[string]str
continue
}

lids[l] = string(lid)
c.cache.Add(l, lid)
}

return lids, nil
return nil
}

func (c *Client) WriteSamples(samples model.Samples, txn *sql.Tx, lids map[string]string) error {
func (c *Client) WriteSamples(samples model.Samples, txn *sql.Tx) error {
stmt, err := txn.Prepare(pq.CopyIn("metric_samples", "time", "name", "value", "lid"))
if err != nil {
level.Error(c.logger).Log("msg", "cannot prepare sample statement", "err", err)
Expand All @@ -188,9 +195,9 @@ func (c *Client) WriteSamples(samples model.Samples, txn *sql.Tx, lids map[strin

for _, s := range samples {
k, l := c.parseMetric(s.Metric)
lid, ok := lids[l]
lid, ok := c.cache.Get(l)
if !ok {
level.Error(c.logger).Log("msg", "cannot write sample without labels", "name", k, "labels", l)
level.Warn(c.logger).Log("msg", "cannot write sample without labels", "name", k, "labels", l)
continue
}

Expand Down

0 comments on commit 2e58188

Please sign in to comment.