Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using cuckoo filter for new metric detection instead of cache #590

Merged
merged 5 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ max-size = 1000000
# "noop" - pick metrics to write in unspecified order,
# requires least CPU and improves cache responsiveness
write-strategy = "max"
# If > 0 use bloom filter to detect new metrics instead of cache
bloom-size = 0

[udp]
listen = ":2003"
Expand Down
55 changes: 49 additions & 6 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"sync/atomic"
"time"

cuckoo "github.com/seiflotfy/cuckoofilter"

"github.com/go-graphite/go-carbon/helper"
"github.com/go-graphite/go-carbon/points"
"github.com/go-graphite/go-carbon/tags"
Expand Down Expand Up @@ -59,6 +61,7 @@ type Cache struct {
}

newMetricsChan chan string
newMetricCf *cuckoo.Filter

throttle func(ps *points.Points, inCache bool) bool
}
Expand Down Expand Up @@ -95,6 +98,7 @@ func New() *Cache {
c.settings.Store(&settings)

c.writeoutQueue = NewWriteoutQueue(c)
c.newMetricCf = nil
return c
}

Expand Down Expand Up @@ -130,6 +134,13 @@ func (c *Cache) SetMaxSize(maxSize uint32) {
c.settings.Store(&newSettings)
}

// SetBloomSize of bloom filter
func (c *Cache) SetBloomSize(bloomSize uint) {
if bloomSize > 0 {
c.newMetricCf = cuckoo.NewFilter(bloomSize)
}
}

func (c *Cache) SetTagsEnabled(value bool) {
s := c.settings.Load().(*cacheSettings)
newSettings := *s
Expand All @@ -149,6 +160,10 @@ func (c *Cache) Stat(send helper.StatCallback) {
send("metrics", float64(c.Len()))
send("maxSize", float64(s.maxSize))
send("notConfirmed", float64(c.NotConfirmedLength()))
// report elements in bloom filter
if c.newMetricCf != nil {
send("cfCount", float64(c.newMetricCf.Count()))
}

helper.SendAndSubstractUint32("queries", &c.stat.queryCnt, send)
helper.SendAndSubstractUint32("tagsNormalizeErrors", &c.stat.tagsNormalizeErrors, send)
Expand Down Expand Up @@ -259,6 +274,15 @@ func (c *Cache) DivertToXlog(w io.Writer) {
c.settings.Store(&newSettings)
}

// send metric to the new metrics channel
func sendMetricToNewMetricChan(c *Cache, metric string) {
select {
case c.newMetricsChan <- metric:
default:
atomic.AddUint32(&c.stat.droppedRealtimeIndex, 1)
}
}

// Sets the given value under the specified key.
func (c *Cache) Add(p *points.Points) {
s := c.settings.Load().(*cacheSettings)
Expand Down Expand Up @@ -301,15 +325,22 @@ func (c *Cache) Add(p *points.Points) {
if shard.adds != nil {
shard.adds[p.Metric] = struct{}{}
}
if c.newMetricsChan != nil {
select {
case c.newMetricsChan <- p.Metric:
default:
atomic.AddUint32(&c.stat.droppedRealtimeIndex, 1)
}
// if no bloom filter - just add metric to new channel
// if missed in cache, as it was before
if c.newMetricsChan != nil && c.newMetricCf == nil {
sendMetricToNewMetricChan(c, p.Metric)
}
}

// if we have both new metric channel and bloom filter
if c.newMetricsChan != nil && c.newMetricCf != nil {
// add metric to new metric channel if missed in bloom
// despite what we have it in cache (new behaviour)
if !c.newMetricCf.Lookup([]byte(p.Metric)) {
sendMetricToNewMetricChan(c, p.Metric)
}
c.newMetricCf.Insert([]byte(p.Metric))
}
atomic.AddInt32(&c.stat.size, int32(count))
}

Expand All @@ -322,6 +353,12 @@ func (c *Cache) Pop(key string) (p *points.Points, exists bool) {
delete(shard.items, key)
shard.Unlock()

// we probably can skip that, but I'm a bit worry
// of effectiveness of bloom filter over time
if c.newMetricsChan != nil && c.newMetricCf != nil {
c.newMetricCf.Delete([]byte(p.Metric))
}

if exists {
atomic.AddInt32(&c.stat.size, -int32(len(p.Data)))
}
Expand All @@ -346,6 +383,12 @@ func (c *Cache) PopNotConfirmed(key string) (p *points.Points, exists bool) {
}
shard.Unlock()

// we probably can skip that, but I'm a bit worry
// of effectiveness of bloom filter over time
if c.newMetricsChan != nil && c.newMetricCf != nil {
c.newMetricCf.Delete([]byte(p.Metric))
}

if exists {
atomic.AddInt32(&c.stat.size, -int32(len(p.Data)))
}
Expand Down
16 changes: 16 additions & 0 deletions cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,28 @@ func TestCache(t *testing.T) {

c := New()

// init new metric channel
ch := make(chan string, 3)
c.SetNewMetricsChan(ch)
// set bloom size
c.SetBloomSize(3)

c.Add(points.OnePoint("hello.world", 42, 10))

if c.Size() != 1 {
t.FailNow()
}

// check if new metric added to bloom filter
if c.newMetricCf.Count() != 1 {
t.FailNow()
}

// check if new metric added to new metric channel
if len(c.newMetricsChan) != 1 {
t.FailNow()
}

c.Add(points.OnePoint("hello.world", 15, 12))

if c.Size() != 2 {
Expand Down
2 changes: 2 additions & 0 deletions carbon/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func (app *App) ReloadConfig() error {
app.Cache.SetMaxSize(app.Config.Cache.MaxSize)
app.Cache.SetWriteStrategy(app.Config.Cache.WriteStrategy)
app.Cache.SetTagsEnabled(app.Config.Tags.Enabled)
app.Cache.SetBloomSize(app.Config.Cache.BloomSize)

if app.Persister != nil {
app.Persister.Stop()
Expand Down Expand Up @@ -349,6 +350,7 @@ func (app *App) Start() (err error) {
core.SetMaxSize(conf.Cache.MaxSize)
core.SetWriteStrategy(conf.Cache.WriteStrategy)
core.SetTagsEnabled(conf.Tags.Enabled)
core.SetBloomSize(conf.Cache.BloomSize)

app.Cache = core

Expand Down
1 change: 1 addition & 0 deletions carbon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type whisperConfig struct {
type cacheConfig struct {
MaxSize uint32 `toml:"max-size"`
WriteStrategy string `toml:"write-strategy"`
BloomSize uint `toml:"bloom-size"`
}

type carbonlinkConfig struct {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ require (
)

require (
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb
golang.org/x/net v0.25.0
google.golang.org/protobuf v1.34.1
)
Expand All @@ -47,6 +48,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140 // indirect
github.com/eapache/go-resiliency v1.6.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-expirecache v0.0.0-20170314133854-743ef98b2adb h1:X9MwMz6mVZEWcbhsri5TwaCm/Q4USFdAAmy1T7RCGjw=
github.com/dgryski/go-expirecache v0.0.0-20170314133854-743ef98b2adb/go.mod h1:pD/+9DfmmQ+xvOI1fxUltHV69BxC1aeTILPQg9Kw1hE=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140 h1:y7y0Oa6UawqTFPCDw9JG6pdKt4F9pAhHv0B7FMGaGD0=
github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/dgryski/go-trigram v0.0.0-20160407183937-79ec494e1ad0 h1:b+7JSiBM+hnLQjP/lXztks5hnLt1PS46hktG9VOJgzo=
github.com/dgryski/go-trigram v0.0.0-20160407183937-79ec494e1ad0/go.mod h1:qzKC/DpcxK67zaSHdCmIv3L9WJViHVinYXN2S7l3RM8=
github.com/dgryski/httputil v0.0.0-20160116060654-189c2918cd08 h1:BGzXzhmOgLHlylvQ27Tcgz235JvonPEgdMtpaZaeZt0=
Expand Down Expand Up @@ -208,6 +211,8 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5X
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb h1:XfLJSPIOUX+osiMraVgIrMR27uMXnRJWGm1+GL8/63U=
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg=
github.com/sevlyar/go-daemon v0.1.6 h1:EUh1MDjEM4BI109Jign0EaknA2izkOyi0LV3ro3QQGs=
github.com/sevlyar/go-daemon v0.1.6/go.mod h1:6dJpPatBT9eUwM5VCw9Bt6CdX9Tk6UWvhW3MebLDRKE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down Expand Up @@ -410,6 +415,7 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
24 changes: 24 additions & 0 deletions vendor/github.com/dgryski/go-metro/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions vendor/github.com/dgryski/go-metro/README

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading