Skip to content

Commit

Permalink
Merge pull request grafana#1024 from grafana/cache-index-writes
Browse files Browse the repository at this point in the history
Cache index writes
  • Loading branch information
tomwilkie authored Oct 17, 2018
2 parents a6b3f72 + fe719d0 commit 9f7dc34
Show file tree
Hide file tree
Showing 17 changed files with 503 additions and 160 deletions.
8 changes: 4 additions & 4 deletions cache/background.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ type BackgroundConfig struct {
WriteBackBuffer int
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *BackgroundConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.WriteBackGoroutines, "memcache.write-back-goroutines", 10, "How many goroutines to use to write back to memcache.")
f.IntVar(&cfg.WriteBackBuffer, "memcache.write-back-buffer", 10000, "How many chunks to buffer for background write back.")
// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *BackgroundConfig) RegisterFlagsWithPrefix(prefix string, description string, f *flag.FlagSet) {
f.IntVar(&cfg.WriteBackGoroutines, prefix+"memcache.write-back-goroutines", 10, description+"How many goroutines to use to write back to memcache.")
f.IntVar(&cfg.WriteBackBuffer, prefix+"memcache.write-back-buffer", 10000, description+"How many chunks to buffer for background write back.")
}

type backgroundCache struct {
Expand Down
50 changes: 40 additions & 10 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cache
import (
"context"
"flag"
"time"
)

// Cache byte arrays by key.
Expand All @@ -15,24 +16,36 @@ type Cache interface {
// Config for building Caches.
type Config struct {
EnableDiskcache bool
EnableFifoCache bool

DefaultValidity time.Duration

background BackgroundConfig
memcache MemcachedConfig
memcacheClient MemcachedClientConfig
diskcache DiskcacheConfig
fifocache FifoCacheConfig

// This is to name the cache metrics properly.
prefix string

// For tests to inject specific implementations.
Cache Cache
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.EnableDiskcache, "cache.enable-diskcache", false, "Enable on-disk cache")
// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f *flag.FlagSet) {
cfg.background.RegisterFlagsWithPrefix(prefix, description, f)
cfg.memcache.RegisterFlagsWithPrefix(prefix, description, f)
cfg.memcacheClient.RegisterFlagsWithPrefix(prefix, description, f)
cfg.diskcache.RegisterFlagsWithPrefix(prefix, description, f)
cfg.fifocache.RegisterFlagsWithPrefix(prefix, description, f)

f.BoolVar(&cfg.EnableDiskcache, prefix+"cache.enable-diskcache", false, description+"Enable on-disk cache.")
f.BoolVar(&cfg.EnableFifoCache, prefix+"cache.enable-fifocache", false, description+"Enable in-memory cache.")
f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", 0, description+"The default validity of entries for caches unless overridden.")

cfg.background.RegisterFlags(f)
cfg.memcache.RegisterFlags(f)
cfg.memcacheClient.RegisterFlags(f)
cfg.diskcache.RegisterFlags(f)
cfg.prefix = prefix
}

// New creates a new Cache using Config.
Expand All @@ -43,23 +56,40 @@ func New(cfg Config) (Cache, error) {

caches := []Cache{}

if cfg.EnableFifoCache {
if cfg.fifocache.Validity == 0 && cfg.DefaultValidity != 0 {
cfg.fifocache.Validity = cfg.DefaultValidity
}

cache := NewFifoCache(cfg.prefix+"fifocache", cfg.fifocache)
caches = append(caches, Instrument(cfg.prefix+"fifocache", cache))
}

if cfg.EnableDiskcache {
cache, err := NewDiskcache(cfg.diskcache)
if err != nil {
return nil, err
}
caches = append(caches, NewBackground("diskcache", cfg.background, Instrument("diskcache", cache)))

cacheName := cfg.prefix + "diskcache"
caches = append(caches, NewBackground(cacheName, cfg.background, Instrument(cacheName, cache)))
}

if cfg.memcacheClient.Host != "" {
if cfg.memcache.Expiration == 0 && cfg.DefaultValidity != 0 {
cfg.memcache.Expiration = cfg.DefaultValidity
}

client := NewMemcachedClient(cfg.memcacheClient)
cache := NewMemcached(cfg.memcache, client)
caches = append(caches, NewBackground("memcache", cfg.background, Instrument("memcache", cache)))

cacheName := cfg.prefix + "memcache"
caches = append(caches, NewBackground(cacheName, cfg.background, Instrument(cacheName, cache)))
}

cache := NewTiered(caches)
if len(caches) > 1 {
cache = Instrument("tiered", cache)
cache = Instrument(cfg.prefix+"tiered", cache)
}
return cache, nil
}
2 changes: 1 addition & 1 deletion cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestDiskcache(t *testing.T) {
}

func TestFifoCache(t *testing.T) {
cache := cache.NewFifoCache("test", 1e3, 1*time.Hour)
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{Size: 1e3, Validity: 1 * time.Hour})
testCache(t, cache)
}

Expand Down
9 changes: 7 additions & 2 deletions cache/diskcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,13 @@ type DiskcacheConfig struct {

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *DiskcacheConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.Path, "diskcache.path", "/var/run/chunks", "Path to file used to cache chunks.")
f.IntVar(&cfg.Size, "diskcache.size", 1024*1024*1024, "Size of file (bytes)")
cfg.RegisterFlagsWithPrefix("", "", f)
}

// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *DiskcacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
f.StringVar(&cfg.Path, prefix+"diskcache.path", "/var/run/chunks", description+"Path to file used to cache chunks.")
f.IntVar(&cfg.Size, prefix+"diskcache.size", 1024*1024*1024, description+"Size of file (bytes)")
}

// Diskcache is an on-disk chunk cache.
Expand Down
26 changes: 19 additions & 7 deletions cache/fifo_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"context"
"flag"
"sync"
"time"

Expand Down Expand Up @@ -53,6 +54,18 @@ var (
}, []string{"cache"})
)

// FifoCacheConfig holds config for the FifoCache.
type FifoCacheConfig struct {
Size int
Validity time.Duration
}

// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *FifoCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
f.IntVar(&cfg.Size, prefix+"fifocache.size", 0, description+"The number of entries to cache.")
f.DurationVar(&cfg.Validity, prefix+"fifocache.duration", 0, description+"The expiry duration for the cache.")
}

// FifoCache is a simple string -> interface{} cache which uses a fifo slide to
// manage evictions. O(1) inserts and updates, O(1) gets.
type FifoCache struct {
Expand Down Expand Up @@ -82,12 +95,12 @@ type cacheEntry struct {
}

// NewFifoCache returns a new initialised FifoCache of size.
func NewFifoCache(name string, size int, validity time.Duration) *FifoCache {
func NewFifoCache(name string, cfg FifoCacheConfig) *FifoCache {
return &FifoCache{
size: size,
validity: validity,
entries: make([]cacheEntry, 0, size),
index: make(map[string]int, size),
size: cfg.Size,
validity: cfg.Validity,
entries: make([]cacheEntry, 0, cfg.Size),
index: make(map[string]int, cfg.Size),

name: name,
entriesAdded: cacheEntriesAdded.WithLabelValues(name),
Expand Down Expand Up @@ -216,8 +229,7 @@ func (c *FifoCache) Get(ctx context.Context, key string) (interface{}, bool) {
index, ok := c.index[key]
if ok {
updated := c.entries[index].updated
if time.Now().Sub(updated) < c.validity {

if c.validity == 0 || time.Now().Sub(updated) < c.validity {
return c.entries[index].value, true
}

Expand Down
4 changes: 2 additions & 2 deletions cache/fifo_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const size = 10
const overwrite = 5

func TestFifoCache(t *testing.T) {
c := NewFifoCache("test", size, 1*time.Minute)
c := NewFifoCache("test", FifoCacheConfig{Size: size, Validity: 1 * time.Minute})
ctx := context.Background()

// Check put / get works
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestFifoCache(t *testing.T) {
}

func TestFifoCacheExpiry(t *testing.T) {
c := NewFifoCache("test", size, 5*time.Millisecond)
c := NewFifoCache("test", FifoCacheConfig{Size: size, Validity: 5 * time.Millisecond})
ctx := context.Background()

c.Put(ctx, []string{"0"}, []interface{}{0})
Expand Down
10 changes: 5 additions & 5 deletions cache/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ type MemcachedConfig struct {
Parallelism int
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *MemcachedConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.Expiration, "memcached.expiration", 0, "How long keys stay in the memcache.")
f.IntVar(&cfg.BatchSize, "memcached.batchsize", 0, "How many keys to fetch in each batch.")
f.IntVar(&cfg.Parallelism, "memcached.parallelism", 100, "Maximum active requests to memcache.")
// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *MemcachedConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
f.DurationVar(&cfg.Expiration, prefix+"memcached.expiration", 0, description+"How long keys stay in the memcache.")
f.IntVar(&cfg.BatchSize, prefix+"memcached.batchsize", 0, description+"How many keys to fetch in each batch.")
f.IntVar(&cfg.Parallelism, prefix+"memcached.parallelism", 100, description+"Maximum active requests to memcache.")
}

// Memcached type caches chunks in memcached
Expand Down
23 changes: 5 additions & 18 deletions cache/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,12 @@ type MemcachedClientConfig struct {
UpdateInterval time.Duration
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *MemcachedClientConfig) RegisterFlags(f *flag.FlagSet) {
cfg.registerFlagsWithPrefix("", f)
}

// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *MemcachedClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.registerFlagsWithPrefix(prefix, f)
}

func (cfg *MemcachedClientConfig) registerFlagsWithPrefix(prefix string, f *flag.FlagSet) {
if prefix != "" {
prefix = prefix + "."
}

f.StringVar(&cfg.Host, prefix+"memcached.hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.")
f.StringVar(&cfg.Service, prefix+"memcached.service", "memcached", "SRV service used to discover memcache servers.")
f.DurationVar(&cfg.Timeout, prefix+"memcached.timeout", 100*time.Millisecond, "Maximum time to wait before giving up on memcached requests.")
f.DurationVar(&cfg.UpdateInterval, prefix+"memcached.update-interval", 1*time.Minute, "Period with which to poll DNS for memcache servers.")
func (cfg *MemcachedClientConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
f.StringVar(&cfg.Host, prefix+"memcached.hostname", "", description+"Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.")
f.StringVar(&cfg.Service, prefix+"memcached.service", "memcached", description+"SRV service used to discover memcache servers.")
f.DurationVar(&cfg.Timeout, prefix+"memcached.timeout", 100*time.Millisecond, description+"Maximum time to wait before giving up on memcached requests.")
f.DurationVar(&cfg.UpdateInterval, prefix+"memcached.update-interval", 1*time.Minute, description+"Period with which to poll DNS for memcache servers.")
}

// NewMemcachedClient creates a new MemcacheClient that gets its server list
Expand Down
18 changes: 11 additions & 7 deletions chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
Expand All @@ -22,7 +23,7 @@ import (
)

var (
indexEntriesPerChunk = prometheus.NewHistogram(prometheus.HistogramOpts{
indexEntriesPerChunk = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "chunk_store_index_entries_per_chunk",
Help: "Number of entries written to storage per chunk.",
Expand All @@ -37,33 +38,36 @@ var (
},
HashBuckets: 1024,
})
cacheCorrupt = prometheus.NewCounter(prometheus.CounterOpts{
cacheCorrupt = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "cortex",
Name: "cache_corrupt_chunks_total",
Help: "Total count of corrupt chunks found in cache.",
})
)

func init() {
prometheus.MustRegister(indexEntriesPerChunk)
prometheus.MustRegister(rowWrites)
prometheus.MustRegister(cacheCorrupt)
}

// StoreConfig specifies config for a ChunkStore
type StoreConfig struct {
CacheConfig cache.Config
ChunkCacheConfig cache.Config

MinChunkAge time.Duration
QueryChunkLimit int
CardinalityCacheSize int
CardinalityCacheValidity time.Duration
CardinalityLimit int

WriteDedupeCacheConfig cache.Config
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) {
cfg.CacheConfig.RegisterFlags(f)
cfg.ChunkCacheConfig.RegisterFlagsWithPrefix("", "Cache config for chunks. ", f)

cfg.WriteDedupeCacheConfig.RegisterFlagsWithPrefix("store.index-cache-write.", "Cache config for index entry writing. ", f)

f.DurationVar(&cfg.MinChunkAge, "store.min-chunk-age", 0, "Minimum time between chunk update and being saved to the store.")
f.IntVar(&cfg.QueryChunkLimit, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.")
f.IntVar(&cfg.CardinalityCacheSize, "store.cardinality-cache-size", 0, "Size of in-memory cardinality cache, 0 to disable.")
Expand All @@ -81,7 +85,7 @@ type store struct {
}

func newStore(cfg StoreConfig, schema Schema, storage StorageClient) (Store, error) {
fetcher, err := NewChunkFetcher(cfg.CacheConfig, storage)
fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, storage)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 9f7dc34

Please sign in to comment.