Skip to content

Commit

Permalink
Merge pull request #363 from glightfoot/cache-interface
Browse files Browse the repository at this point in the history
Better mapper cache interface
  • Loading branch information
matthiasr authored Mar 26, 2021
2 parents 8b496df + a197834 commit 12281a9
Show file tree
Hide file tree
Showing 13 changed files with 395 additions and 274 deletions.
4 changes: 2 additions & 2 deletions bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ func TestHandlePacket(t *testing.T) {
le := len(events)
// Flatten actual events.
actual := event.Events{}
for i := 0; i < le; i++ {
for j := 0; j < le; j++ {
actual = append(actual, <-events...)
}

Expand Down Expand Up @@ -650,7 +650,7 @@ mappings:
`
// Create mapper from config and start an Exporter with a synchronous channel
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config, 0)
err := testMapper.InitFromYAMLString(config)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}
Expand Down
3 changes: 2 additions & 1 deletion exporter_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/go-kit/kit/log"

"github.com/prometheus/client_golang/prometheus"

"github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/exporter"
"github.com/prometheus/statsd_exporter/pkg/line"
Expand Down Expand Up @@ -167,7 +168,7 @@ mappings:
`

testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config, 0)
err := testMapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
Expand Down
58 changes: 43 additions & 15 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
"github.com/prometheus/statsd_exporter/pkg/line"
"github.com/prometheus/statsd_exporter/pkg/listener"
"github.com/prometheus/statsd_exporter/pkg/mapper"
"github.com/prometheus/statsd_exporter/pkg/mappercache/lru"
"github.com/prometheus/statsd_exporter/pkg/mappercache/randomreplacement"
)

const (
Expand Down Expand Up @@ -206,7 +208,7 @@ func serveHTTP(mux http.Handler, listenAddress string, logger log.Logger) {
os.Exit(1)
}

func sighupConfigReloader(fileName string, mapper *mapper.MetricMapper, cacheSize int, logger log.Logger, option mapper.CacheOption) {
func sighupConfigReloader(fileName string, mapper *mapper.MetricMapper, logger log.Logger) {
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGHUP)

Expand All @@ -218,12 +220,12 @@ func sighupConfigReloader(fileName string, mapper *mapper.MetricMapper, cacheSiz

level.Info(logger).Log("msg", "Received signal, attempting reload", "signal", s)

reloadConfig(fileName, mapper, cacheSize, logger, option)
reloadConfig(fileName, mapper, logger)
}
}

func reloadConfig(fileName string, mapper *mapper.MetricMapper, cacheSize int, logger log.Logger, option mapper.CacheOption) {
err := mapper.InitFromFile(fileName, cacheSize, option)
func reloadConfig(fileName string, mapper *mapper.MetricMapper, logger log.Logger) {
err := mapper.InitFromFile(fileName)
if err != nil {
level.Info(logger).Log("msg", "Error reloading config", "error", err)
configLoads.WithLabelValues("failure").Inc()
Expand All @@ -247,6 +249,29 @@ func dumpFSM(mapper *mapper.MetricMapper, dumpFilename string, logger log.Logger
return nil
}

func getCache(cacheSize int, cacheType string, registerer prometheus.Registerer) (mapper.MetricMapperCache, error) {
var cache mapper.MetricMapperCache
var err error
if cacheSize == 0 {
return nil, nil
} else {
switch cacheType {
case "lru":
cache, err = lru.NewMetricMapperLRUCache(registerer, cacheSize)
case "random":
cache, err = randomreplacement.NewMetricMapperRRCache(registerer, cacheSize)
default:
err = fmt.Errorf("unsupported cache type %q", cacheType)
}

if err != nil {
return nil, err
}
}

return cache, nil
}

func main() {
var (
listenAddress = kingpin.Flag("web.listen-address", "The address on which to expose the web interface and generated Prometheus metrics.").Default(":9102").String()
Expand Down Expand Up @@ -293,36 +318,40 @@ func main() {
parser.EnableSignalFXParsing()
}

cacheOption := mapper.WithCacheType(*cacheType)

level.Info(logger).Log("msg", "Starting StatsD -> Prometheus Exporter", "version", version.Info())
level.Info(logger).Log("msg", "Build context", "context", version.BuildContext())

events := make(chan event.Events, *eventQueueSize)
defer close(events)
eventQueue := event.NewEventQueue(events, *eventFlushThreshold, *eventFlushInterval, eventsFlushed)

mapper := &mapper.MetricMapper{Registerer: prometheus.DefaultRegisterer, MappingsCount: mappingsCount}
thisMapper := &mapper.MetricMapper{Registerer: prometheus.DefaultRegisterer, MappingsCount: mappingsCount}

cache, err := getCache(*cacheSize, *cacheType, thisMapper.Registerer)
if err != nil {
level.Error(logger).Log("msg", "Unable to setup metric mapper cache", "error", err)
os.Exit(1)
}
thisMapper.UseCache(cache)

if *mappingConfig != "" {
err := mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption)
err := thisMapper.InitFromFile(*mappingConfig)
if err != nil {
level.Error(logger).Log("msg", "error loading config", "error", err)
os.Exit(1)
}
if *dumpFSMPath != "" {
err := dumpFSM(mapper, *dumpFSMPath, logger)
err := dumpFSM(thisMapper, *dumpFSMPath, logger)
if err != nil {
level.Error(logger).Log("msg", "error dumping FSM", "error", err)
// Failure to dump the FSM is an error (the user asked for it and it
// didn't happen) but not fatal (the exporter is fully functional
// afterwards).
}
}
} else {
mapper.InitCache(*cacheSize, cacheOption)
}

exporter := exporter.NewExporter(prometheus.DefaultRegisterer, mapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
exporter := exporter.NewExporter(prometheus.DefaultRegisterer, thisMapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)

if *checkConfig {
level.Info(logger).Log("msg", "Configuration check successful, exiting")
Expand Down Expand Up @@ -463,7 +492,6 @@ func main() {
}
}
}

}

mux := http.NewServeMux()
Expand All @@ -489,7 +517,7 @@ func main() {
return
}
level.Info(logger).Log("msg", "Received lifecycle api reload, attempting reload")
reloadConfig(*mappingConfig, mapper, *cacheSize, logger, cacheOption)
reloadConfig(*mappingConfig, thisMapper, logger)
}
})
mux.HandleFunc("/-/quit", func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -518,7 +546,7 @@ func main() {

go serveHTTP(mux, *listenAddress, logger)

go sighupConfigReloader(*mappingConfig, mapper, *cacheSize, logger, cacheOption)
go sighupConfigReloader(*mappingConfig, thisMapper, logger)
go exporter.Listen(events)

signals := make(chan os.Signal, 1)
Expand Down
1 change: 0 additions & 1 deletion pkg/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,4 @@ func TestEventIntervalFlush(t *testing.T) {
if len(events) != 10 {
t.Fatal("Expected 10 events in the event channel, but got", len(events))
}

}
2 changes: 0 additions & 2 deletions pkg/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ type Exporter struct {
// Listen handles all events sent to the given channel sequentially. It
// terminates when the channel is closed.
func (b *Exporter) Listen(e <-chan event.Events) {

removeStaleMetricsTicker := clock.NewTicker(time.Second)

for {
Expand All @@ -77,7 +76,6 @@ func (b *Exporter) Listen(e <-chan event.Events) {

// handleEvent processes a single Event according to the configured mapping.
func (b *Exporter) handleEvent(thisEvent event.Event) {

mapping, labels, present := b.Mapper.GetMapping(thisEvent.MetricName(), thisEvent.MetricType())
if mapping == nil {
mapping = &mapper.MetricMapping{}
Expand Down
15 changes: 5 additions & 10 deletions pkg/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ func TestNegativeCounter(t *testing.T) {
prev := getTelemetryCounterValue(errorCounter)

testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)

ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
Expand Down Expand Up @@ -260,7 +259,7 @@ mappings:
name: "histogram_test"
`
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config, 0)
err := testMapper.InitFromYAMLString(config)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}
Expand Down Expand Up @@ -318,7 +317,7 @@ mappings:
`

testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config, 0)
err := testMapper.InitFromYAMLString(config)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}
Expand Down Expand Up @@ -528,7 +527,7 @@ mappings:
for _, s := range scenarios {
t.Run(s.name, func(t *testing.T) {
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config, 0)
err := testMapper.InitFromYAMLString(config)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}
Expand Down Expand Up @@ -585,7 +584,7 @@ mappings:
name: "${1}"
`
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config, 0)
err := testMapper.InitFromYAMLString(config)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}
Expand Down Expand Up @@ -658,7 +657,6 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
}()

testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)

ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
Expand All @@ -672,7 +670,6 @@ func TestSummaryWithQuantilesEmptyMapping(t *testing.T) {
events := make(chan event.Events)
go func() {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)

ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
Expand Down Expand Up @@ -717,7 +714,6 @@ func TestHistogramUnits(t *testing.T) {
events := make(chan event.Events)
go func() {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Mapper.Defaults.ObserverType = mapper.ObserverTypeHistogram
ex.Listen(events)
Expand Down Expand Up @@ -754,7 +750,6 @@ func TestCounterIncrement(t *testing.T) {
events := make(chan event.Events)
go func() {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()
Expand Down Expand Up @@ -857,7 +852,7 @@ mappings:
`
// Create mapper from config and start an Exporter with a synchronous channel
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config, 0)
err := testMapper.InitFromYAMLString(config)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}
Expand Down
Loading

0 comments on commit 12281a9

Please sign in to comment.