Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] Feat/go metrics #2

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/dustin/go-humanize v1.0.0
github.com/go-redis/redis/v7 v7.2.0
github.com/prometheus/client_golang v1.5.1
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
go.uber.org/zap v1.14.1
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
Expand Down
10 changes: 10 additions & 0 deletions runtime/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,13 @@ func (l *logger) SLogger() *zap.SugaredLogger {
func (l *logger) Loggers() (*zap.Logger, *zap.SugaredLogger) {
return l.logger, l.slogger
}

type RunenvLoggerWriter struct {
runenv *RunEnv
namespace string
}

func (r *RunenvLoggerWriter) Write(p []byte) (n int, err error) {
r.runenv.RecordJsonMetric(r.namespace, p)
return len(p), nil
}
177 changes: 39 additions & 138 deletions runtime/metrics.go
Original file line number Diff line number Diff line change
@@ -1,171 +1,72 @@
package runtime

import (
"context"
"io"
"net/http"
"os"
"path"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/rcrowley/go-metrics"
)

// Type aliases to hide implementation details in the APIs.
type (
Counter = prometheus.Counter
Gauge = prometheus.Gauge
Histogram = prometheus.Histogram
Summary = prometheus.Summary

CounterOpts = prometheus.CounterOpts
GaugeOpts = prometheus.GaugeOpts
HistogramOpts = prometheus.HistogramOpts
SummaryOpts = prometheus.SummaryOpts

CounterVec = prometheus.CounterVec
GaugeVec = prometheus.GaugeVec
HistogramVec = prometheus.HistogramVec
SummaryVec = prometheus.SummaryVec
Counter = metrics.Counter
EWMA = metrics.EWMA
Gauge = metrics.Gauge
GaugeFloat64 = metrics.GaugeFloat64
Histogram = metrics.Histogram
Meter = metrics.Meter
Sample = metrics.Sample
Timer = metrics.Timer
)

type Metrics struct {
runenv *RunEnv
}

func (*Metrics) NewCounter(o CounterOpts) Counter {
m := prometheus.NewCounter(o)
switch err := prometheus.Register(m); err.(type) {
case nil, prometheus.AlreadyRegisteredError:
default:
panic(err)
}
return m
func (*Metrics) NewCounter(name string) Counter {
return metrics.GetOrRegister(name, metrics.NewCounter()).(metrics.Counter)
}

func (*Metrics) NewGauge(o GaugeOpts) Gauge {
m := prometheus.NewGauge(o)
switch err := prometheus.Register(m); err.(type) {
case nil, prometheus.AlreadyRegisteredError:
default:
panic(err)
}
return m
func (*Metrics) NewEWMA(name string, alpha float64) EWMA {
return metrics.GetOrRegister(name, metrics.NewEWMA(alpha)).(metrics.EWMA)
}

func (*Metrics) NewHistogram(o HistogramOpts) Histogram {
m := prometheus.NewHistogram(o)
switch err := prometheus.Register(m); err.(type) {
case nil, prometheus.AlreadyRegisteredError:
default:
panic(err)
}
return m
func (*Metrics) NewGauge(name string) Gauge {
return metrics.GetOrRegister(name, metrics.NewGauge()).(metrics.Gauge)
}

func (*Metrics) NewSummary(o SummaryOpts) Summary {
m := prometheus.NewSummary(o)
switch err := prometheus.Register(m); err.(type) {
case nil, prometheus.AlreadyRegisteredError:
default:
panic(err)
}
return m
func (*Metrics) NewGaugeFloat64(name string) GaugeFloat64 {
return metrics.GetOrRegister(name, metrics.NewGaugeFloat64()).(metrics.GaugeFloat64)
}

func (*Metrics) NewCounterVec(o CounterOpts, labels ...string) *CounterVec {
m := prometheus.NewCounterVec(o, labels)
switch err := prometheus.Register(m); err.(type) {
case nil, prometheus.AlreadyRegisteredError:
default:
panic(err)
}
return m
func (*Metrics) NewFunctionalGauge(f func() int64) Gauge {
return metrics.NewFunctionalGauge(f)
}

func (*Metrics) NewGaugeVec(o GaugeOpts, labels ...string) *GaugeVec {
m := prometheus.NewGaugeVec(o, labels)
switch err := prometheus.Register(m); err.(type) {
case nil, prometheus.AlreadyRegisteredError:
default:
panic(err)
}
return m
func (*Metrics) NewFunctionalGaugeFloat64(f func() float64) GaugeFloat64 {
return metrics.NewFunctionalGaugeFloat64(f)
}

func (*Metrics) NewHistogramVec(o HistogramOpts, labels ...string) *HistogramVec {
m := prometheus.NewHistogramVec(o, labels)
switch err := prometheus.Register(m); err.(type) {
case nil, prometheus.AlreadyRegisteredError:
default:
panic(err)
}
return m
func (*Metrics) NewHistogram(name string, s metrics.Sample) Histogram {
return metrics.GetOrRegister(name, metrics.NewHistogram(s)).(metrics.Histogram)
}

func (*Metrics) NewSummaryVec(o SummaryOpts, labels ...string) *SummaryVec {
m := prometheus.NewSummaryVec(o, labels)
switch err := prometheus.Register(m); err.(type) {
case nil, prometheus.AlreadyRegisteredError:
default:
panic(err)
}
return m
func (*Metrics) NewMeter(name string) Meter {
return metrics.GetOrRegister(name, metrics.NewMeter()).(metrics.Meter)
}

// HTTPPeriodicSnapshots periodically fetches the snapshots from the given address
// and outputs them to the out directory. Every file will be in the format timestamp.out.
func (re *RunEnv) HTTPPeriodicSnapshots(ctx context.Context, addr string, dur time.Duration, outDir string) error {
err := os.MkdirAll(path.Join(re.TestOutputsPath, outDir), 0777)
if err != nil {
return err
}

nextFile := func() (*os.File, error) {
timestamp := strconv.FormatInt(time.Now().Unix(), 10)
return os.Create(path.Join(re.TestOutputsPath, outDir, timestamp+".out"))
}

go func() {
ticker := time.NewTicker(dur)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
func() {
req, err := http.NewRequestWithContext(ctx, "GET", addr, nil)
if err != nil {
re.RecordMessage("error while creating http request: %v", err)
return
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
re.RecordMessage("error while scraping http endpoint: %v", err)
return
}
defer resp.Body.Close()

file, err := nextFile()
if err != nil {
re.RecordMessage("error while getting metrics output file: %v", err)
return
}
defer file.Close()

_, err = io.Copy(file, resp.Body)
if err != nil {
re.RecordMessage("error while copying data to file: %v", err)
return
}
}()
}
}
}()

return nil
func (*Metrics) NewExpDecaySample(name string, reservoirSize int, alpha float64) Sample {
return metrics.NewExpDecaySample(reservoirSize, alpha)
}

func (*Metrics) NewUniformSample(reservoirSize int) Sample {
return metrics.NewUniformSample(reservoirSize)
}

func (*Metrics) NewTimer(name string) Timer {
return metrics.GetOrRegister(name, metrics.NewTimer()).(metrics.Timer)
}

func (*Metrics) WriteJson(duration time.Duration, w io.Writer) {
metrics.WriteJSON(metrics.DefaultRegistry, duration, w)
}
37 changes: 37 additions & 0 deletions runtime/output.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package runtime

import (
"encoding/json"
"fmt"
"runtime/debug"

Expand Down Expand Up @@ -184,6 +185,42 @@ func (l *logger) RecordMetric(metric *MetricDefinition, value float64) {
l.logger.Info("", zap.Object("event", evt))
}

// RecordNamespaced records binary data into the log, keyed by the provided namespace
func (l *logger) RecordJsonMetric(namespace string, msg []byte) {
// The schema is a map[string]map[string]<number_type>
// The number type will have to be converted into float64.
unk := make(map[string]interface{})
json.Unmarshal(msg, &unk)
var evt Event
for metric_name, v := range unk {
vmap := v.(map[string]interface{})
for unit, number := range vmap {
var value float64
switch number.(type) {
case float64:
value = number.(float64)
case int64:
value = float64(number.(int64))
}
// I don't know what the positive direction should be
metricDef := MetricDefinition{
Name: metric_name,
Unit: unit,
}
evt = Event{
Type: EventTypeMetric,
Metric: &MetricValue{
MetricDefinition: metricDef,
Value: value,
},
}
}
l.logger.Info("", zap.Object(namespace, evt))
}
// Another way to do this, just output in json-log format
// l.logger.Info("", zap.Binary(namespace, msg))
}

// Message prints out an informational message.
//
// Deprecated: use RecordMessage.
Expand Down
20 changes: 20 additions & 0 deletions runtime/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"runtime/debug"
"strings"
"time"

"net/http"
_ "net/http/pprof"
Expand Down Expand Up @@ -48,6 +49,7 @@ func Invoke(tc TestCaseFn) {
defer runenv.Close()

setupHTTPListener(runenv)
setupMetrics(runenv)

runenv.RecordStart()

Expand Down Expand Up @@ -129,3 +131,21 @@ func setupHTTPListener(runenv *RunEnv) {
_ = http.Serve(l, nil)
}()
}

func setupMetrics(runenv *RunEnv) (err error) {
// A raw file for writing json data
mfile, err := runenv.CreateRawAsset("metrics.out")
if err != nil {
runenv.RecordCrash(err)
return
}

lw := &RunenvLoggerWriter{runenv, "metrics"}

w := io.MultiWriter(mfile, lw)

duration := time.Second

go runenv.M().WriteJson(duration, w)
return nil
}