diff --git a/metric/common.go b/metric/common.go index 3dcaf1971..c370f7b52 100644 --- a/metric/common.go +++ b/metric/common.go @@ -24,7 +24,7 @@ import ( // baseMetric is common representation for gauge and cumulative metrics. // -// baseMetric maintains a value for each combination of of label values passed to +// baseMetric maintains a value for each combination of label values passed to // Set, Add, or Inc method. // // baseMetric should not be used directly, use metric specific type such as @@ -54,9 +54,23 @@ type baseEntry interface { read(t time.Time) metricdata.Point } +func (bm *baseMetric) startTime() *time.Time { + switch bm.bmType { + case cumulativeInt64, cumulativeFloat64, derivedCumulativeInt64, derivedCumulativeFloat64: + return &bm.start + default: + // gauges don't have start time. + return nil + } +} + // Read returns the current values of the baseMetric as a metric for export. func (bm *baseMetric) read() *metricdata.Metric { now := time.Now() + startTime := bm.startTime() + if startTime == nil { + startTime = &now + } m := &metricdata.Metric{ Descriptor: bm.desc, } @@ -65,7 +79,7 @@ func (bm *baseMetric) read() *metricdata.Metric { key := k.(string) labelVals := bm.decodeLabelVals(key) m.TimeSeries = append(m.TimeSeries, &metricdata.TimeSeries{ - StartTime: now, // Gauge value is instantaneous. + StartTime: *startTime, LabelValues: labelVals, Points: []metricdata.Point{ entry.read(now), diff --git a/metric/cumulative.go b/metric/cumulative.go new file mode 100644 index 000000000..6d3be3f88 --- /dev/null +++ b/metric/cumulative.go @@ -0,0 +1,224 @@ +// Copyright 2019, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "math" + "sync/atomic" + "time" + + "go.opencensus.io/metric/metricdata" +) + +// Float64Cumulative represents a float64 value that can only go up. +// +// Float64Cumulative maintains a float64 value for each combination of label values +// passed to the Set or Inc methods. +type Float64Cumulative struct { + bm baseMetric +} + +// Float64CumulativeEntry represents a single value of the cumulative corresponding to a set +// of label values. +type Float64CumulativeEntry struct { + val uint64 // needs to be uint64 for atomic access, interpret with math.Float64frombits +} + +func (e *Float64CumulativeEntry) read(t time.Time) metricdata.Point { + v := math.Float64frombits(atomic.LoadUint64(&e.val)) + if v < 0 { + v = 0 + } + return metricdata.NewFloat64Point(t, v) +} + +// GetEntry returns a cumulative entry where each key for this cumulative has the value +// given. +// +// The number of label values supplied must be exactly the same as the number +// of keys supplied when this cumulative was created. +func (c *Float64Cumulative) GetEntry(labelVals ...metricdata.LabelValue) (*Float64CumulativeEntry, error) { + entry, err := c.bm.entryForValues(labelVals, func() baseEntry { + return &Float64CumulativeEntry{} + }) + if err != nil { + return nil, err + } + return entry.(*Float64CumulativeEntry), nil +} + +// Set sets the cumulative entry value to provided val. It returns without updating if the value is +// negative or lower than previously stored value. +func (e *Float64CumulativeEntry) Set(val float64) { + var swapped, equalOrLess bool + if val <= 0.0 { + return + } + for !swapped && !equalOrLess { + oldBits := atomic.LoadUint64(&e.val) + oldVal := math.Float64frombits(oldBits) + if val > oldVal { + valBits := math.Float64bits(val) + swapped = atomic.CompareAndSwapUint64(&e.val, oldBits, valBits) + } else { + equalOrLess = true + } + } +} + +// Inc increments the cumulative entry value by val. It returns without incrementing if the val +// is negative. +func (e *Float64CumulativeEntry) Inc(val float64) { + var swapped bool + if val <= 0.0 { + return + } + for !swapped { + oldVal := atomic.LoadUint64(&e.val) + newVal := math.Float64bits(math.Float64frombits(oldVal) + val) + swapped = atomic.CompareAndSwapUint64(&e.val, oldVal, newVal) + } +} + +// Int64Cumulative represents a int64 cumulative value that can only go up. +// +// Int64Cumulative maintains an int64 value for each combination of label values passed to the +// Set or Inc methods. +type Int64Cumulative struct { + bm baseMetric +} + +// Int64CumulativeEntry represents a single value of the cumulative corresponding to a set +// of label values. +type Int64CumulativeEntry struct { + val int64 +} + +func (e *Int64CumulativeEntry) read(t time.Time) metricdata.Point { + v := atomic.LoadInt64(&e.val) + if v < 0 { + v = 0.0 + } + return metricdata.NewInt64Point(t, v) +} + +// GetEntry returns a cumulative entry where each key for this cumulative has the value +// given. +// +// The number of label values supplied must be exactly the same as the number +// of keys supplied when this cumulative was created. +func (c *Int64Cumulative) GetEntry(labelVals ...metricdata.LabelValue) (*Int64CumulativeEntry, error) { + entry, err := c.bm.entryForValues(labelVals, func() baseEntry { + return &Int64CumulativeEntry{} + }) + if err != nil { + return nil, err + } + return entry.(*Int64CumulativeEntry), nil +} + +// Set sets the value of the cumulative entry to the provided value. It returns without updating +// if the val is negative or if the val is lower than previously stored value. +func (e *Int64CumulativeEntry) Set(val int64) { + var swapped, equalOrLess bool + if val <= 0 { + return + } + for !swapped && !equalOrLess { + old := atomic.LoadInt64(&e.val) + if val > old { + swapped = atomic.CompareAndSwapInt64(&e.val, old, val) + } else { + equalOrLess = true + } + } +} + +// Inc increments the current cumulative entry value by val. It returns without incrementing if +// the val is negative. +func (e *Int64CumulativeEntry) Inc(val int64) { + if val <= 0 { + return + } + atomic.AddInt64(&e.val, int64(val)) +} + +// Int64DerivedCumulative represents int64 cumulative value that is derived from an object. +// +// Int64DerivedCumulative maintains objects for each combination of label values. +// These objects implement Int64DerivedCumulativeInterface to read instantaneous value +// representing the object. +type Int64DerivedCumulative struct { + bm baseMetric +} + +type int64DerivedCumulativeEntry struct { + fn func() int64 +} + +func (e *int64DerivedCumulativeEntry) read(t time.Time) metricdata.Point { + // TODO: [rghetia] handle a condition where new value return by fn is lower than previous call. + // It requires that we maintain the old values. + return metricdata.NewInt64Point(t, e.fn()) +} + +// UpsertEntry inserts or updates a derived cumulative entry for the given set of label values. +// The object for which this cumulative entry is inserted or updated, must implement func() int64 +// +// It returns an error if +// 1. The number of label values supplied are not the same as the number +// of keys supplied when this cumulative was created. +// 2. fn func() int64 is nil. +func (c *Int64DerivedCumulative) UpsertEntry(fn func() int64, labelVals ...metricdata.LabelValue) error { + if fn == nil { + return errInvalidParam + } + return c.bm.upsertEntry(labelVals, func() baseEntry { + return &int64DerivedCumulativeEntry{fn} + }) +} + +// Float64DerivedCumulative represents float64 cumulative value that is derived from an object. +// +// Float64DerivedCumulative maintains objects for each combination of label values. +// These objects implement Float64DerivedCumulativeInterface to read instantaneous value +// representing the object. +type Float64DerivedCumulative struct { + bm baseMetric +} + +type float64DerivedCumulativeEntry struct { + fn func() float64 +} + +func (e *float64DerivedCumulativeEntry) read(t time.Time) metricdata.Point { + return metricdata.NewFloat64Point(t, e.fn()) +} + +// UpsertEntry inserts or updates a derived cumulative entry for the given set of label values. +// The object for which this cumulative entry is inserted or updated, must implement func() float64 +// +// It returns an error if +// 1. The number of label values supplied are not the same as the number +// of keys supplied when this cumulative was created. +// 2. fn func() float64 is nil. +func (c *Float64DerivedCumulative) UpsertEntry(fn func() float64, labelVals ...metricdata.LabelValue) error { + if fn == nil { + return errInvalidParam + } + return c.bm.upsertEntry(labelVals, func() baseEntry { + return &float64DerivedCumulativeEntry{fn} + }) +} diff --git a/metric/cumulative_test.go b/metric/cumulative_test.go new file mode 100644 index 000000000..2e0d3c249 --- /dev/null +++ b/metric/cumulative_test.go @@ -0,0 +1,290 @@ +// Copyright 2019, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "go.opencensus.io/metric/metricdata" +) + +func TestCumulative(t *testing.T) { + r := NewRegistry() + + f, _ := r.AddFloat64Cumulative("TestCumulative", + WithLabelKeys("k1", "k2")) + e, _ := f.GetEntry(metricdata.LabelValue{}, metricdata.LabelValue{}) + e.Set(5) + e, _ = f.GetEntry(metricdata.NewLabelValue("k1v1"), metricdata.LabelValue{}) + e.Inc(1) + e, _ = f.GetEntry(metricdata.NewLabelValue("k1v1"), metricdata.LabelValue{}) + e.Inc(1) + e, _ = f.GetEntry(metricdata.NewLabelValue("k1v2"), metricdata.NewLabelValue("k2v2")) + e.Inc(1) + m := r.Read() + want := []*metricdata.Metric{ + { + Descriptor: metricdata.Descriptor{ + Name: "TestCumulative", + LabelKeys: []string{"k1", "k2"}, + Type: metricdata.TypeCumulativeFloat64, + }, + TimeSeries: []*metricdata.TimeSeries{ + { + LabelValues: []metricdata.LabelValue{ + {}, {}, + }, + Points: []metricdata.Point{ + metricdata.NewFloat64Point(time.Time{}, 5), + }, + }, + { + LabelValues: []metricdata.LabelValue{ + metricdata.NewLabelValue("k1v1"), + {}, + }, + Points: []metricdata.Point{ + metricdata.NewFloat64Point(time.Time{}, 2), + }, + }, + { + LabelValues: []metricdata.LabelValue{ + metricdata.NewLabelValue("k1v2"), + metricdata.NewLabelValue("k2v2"), + }, + Points: []metricdata.Point{ + metricdata.NewFloat64Point(time.Time{}, 1), + }, + }, + }, + }, + } + canonicalize(m) + canonicalize(want) + if diff := cmp.Diff(m, want, cmp.Comparer(ignoreTimes)); diff != "" { + t.Errorf("-got +want: %s", diff) + } +} + +func TestCumulativeMetricDescriptor(t *testing.T) { + r := NewRegistry() + + gf, _ := r.AddFloat64Cumulative("float64_gauge") + compareType(gf.bm.desc.Type, metricdata.TypeCumulativeFloat64, t) + gi, _ := r.AddInt64Cumulative("int64_gauge") + compareType(gi.bm.desc.Type, metricdata.TypeCumulativeInt64, t) + dgf, _ := r.AddFloat64DerivedCumulative("derived_float64_gauge") + compareType(dgf.bm.desc.Type, metricdata.TypeCumulativeFloat64, t) + dgi, _ := r.AddInt64DerivedCumulative("derived_int64_gauge") + compareType(dgi.bm.desc.Type, metricdata.TypeCumulativeInt64, t) +} + +func readAndCompareInt64Val(testname string, r *Registry, want int64, t *testing.T) { + ms := r.Read() + if got := ms[0].TimeSeries[0].Points[0].Value.(int64); got != want { + t.Errorf("testname: %s, got = %v, want %v\n", testname, got, want) + } +} + +func TestInt64CumulativeEntry_IncAndSet(t *testing.T) { + r := NewRegistry() + g, _ := r.AddInt64Cumulative("bm") + e, _ := g.GetEntry() + e.Inc(5) + readAndCompareInt64Val("inc", r, 5, t) + e.Inc(-2) + readAndCompareInt64Val("inc negative", r, 5, t) + e.Set(-2) + readAndCompareInt64Val("set negative", r, 5, t) + e.Set(4) + readAndCompareInt64Val("set lower", r, 5, t) + e.Set(9) + readAndCompareInt64Val("set higher", r, 9, t) +} + +func readAndCompareFloat64Val(testname string, r *Registry, want float64, t *testing.T) { + ms := r.Read() + if got := ms[0].TimeSeries[0].Points[0].Value.(float64); got != want { + t.Errorf("testname: %s, got = %v, want %v\n", testname, got, want) + } +} + +func TestFloat64CumulativeEntry_IncAndSet(t *testing.T) { + r := NewRegistry() + g, _ := r.AddFloat64Cumulative("bm") + e, _ := g.GetEntry() + e.Inc(5.0) + readAndCompareFloat64Val("inc", r, 5.0, t) + e.Inc(-2.0) + readAndCompareFloat64Val("inc negative", r, 5.0, t) + e.Set(-2.0) + readAndCompareFloat64Val("set negative", r, 5.0, t) + e.Set(4.0) + readAndCompareFloat64Val("set lower", r, 5.0, t) + e.Set(9.9) + readAndCompareFloat64Val("set higher", r, 9.9, t) +} + +func TestCumulativeWithSameNameDiffType(t *testing.T) { + r := NewRegistry() + r.AddInt64Cumulative("bm") + _, gotErr := r.AddFloat64Cumulative("bm") + if gotErr == nil { + t.Errorf("got: nil, want error: %v", errMetricExistsWithDiffType) + } + _, gotErr = r.AddInt64DerivedCumulative("bm") + if gotErr == nil { + t.Errorf("got: nil, want error: %v", errMetricExistsWithDiffType) + } + _, gotErr = r.AddFloat64DerivedCumulative("bm") + if gotErr == nil { + t.Errorf("got: nil, want error: %v", errMetricExistsWithDiffType) + } +} + +func TestCumulativeWithLabelMismatch(t *testing.T) { + r := NewRegistry() + g, _ := r.AddInt64Cumulative("bm", WithLabelKeys("k1")) + _, gotErr := g.GetEntry(metricdata.NewLabelValue("k1v2"), metricdata.NewLabelValue("k2v2")) + if gotErr == nil { + t.Errorf("got: nil, want error: %v", errKeyValueMismatch) + } +} + +type sysUpTimeInNanoSecs struct { + size int64 +} + +func (q *sysUpTimeInNanoSecs) ToInt64() int64 { + return q.size +} + +func TestInt64DerivedCumulativeEntry_Inc(t *testing.T) { + r := NewRegistry() + q := &sysUpTimeInNanoSecs{3} + g, _ := r.AddInt64DerivedCumulative("bm", WithLabelKeys("k1", "k2")) + err := g.UpsertEntry(q.ToInt64, metricdata.NewLabelValue("k1v1"), metricdata.LabelValue{}) + if err != nil { + t.Errorf("want: nil, got: %v", err) + } + ms := r.Read() + if got, want := ms[0].TimeSeries[0].Points[0].Value.(int64), int64(3); got != want { + t.Errorf("value = %v, want %v", got, want) + } + q.size = 5 + ms = r.Read() + if got, want := ms[0].TimeSeries[0].Points[0].Value.(int64), int64(5); got != want { + t.Errorf("value = %v, want %v", got, want) + } +} + +func TestInt64DerivedCumulativeEntry_IncWithNilObj(t *testing.T) { + r := NewRegistry() + g, _ := r.AddInt64DerivedCumulative("bm", WithLabelKeys("k1", "k2")) + gotErr := g.UpsertEntry(nil, metricdata.NewLabelValue("k1v1"), metricdata.LabelValue{}) + if gotErr == nil { + t.Errorf("expected error but got nil") + } +} + +func TestInt64DerivedCumulativeEntry_IncWithInvalidLabels(t *testing.T) { + r := NewRegistry() + q := &sysUpTimeInNanoSecs{3} + g, _ := r.AddInt64DerivedCumulative("bm", WithLabelKeys("k1", "k2")) + gotErr := g.UpsertEntry(q.ToInt64, metricdata.NewLabelValue("k1v1")) + if gotErr == nil { + t.Errorf("expected error but got nil") + } +} + +func TestInt64DerivedCumulativeEntry_Update(t *testing.T) { + r := NewRegistry() + q := &sysUpTimeInNanoSecs{3} + q2 := &sysUpTimeInNanoSecs{5} + g, _ := r.AddInt64DerivedCumulative("bm", WithLabelKeys("k1", "k2")) + g.UpsertEntry(q.ToInt64, metricdata.NewLabelValue("k1v1"), metricdata.LabelValue{}) + gotErr := g.UpsertEntry(q2.ToInt64, metricdata.NewLabelValue("k1v1"), metricdata.LabelValue{}) + if gotErr != nil { + t.Errorf("got: %v, want: nil", gotErr) + } + ms := r.Read() + if got, want := ms[0].TimeSeries[0].Points[0].Value.(int64), int64(5); got != want { + t.Errorf("value = %v, want %v", got, want) + } +} + +type sysUpTimeInSeconds struct { + size float64 +} + +func (q *sysUpTimeInSeconds) ToFloat64() float64 { + return q.size +} + +func TestFloat64DerivedCumulativeEntry_Inc(t *testing.T) { + r := NewRegistry() + q := &sysUpTimeInSeconds{5.0} + g, _ := r.AddFloat64DerivedCumulative("bm", WithLabelKeys("k1", "k2")) + err := g.UpsertEntry(q.ToFloat64, metricdata.NewLabelValue("k1v1"), metricdata.LabelValue{}) + if err != nil { + t.Errorf("want: nil, got: %v", err) + } + ms := r.Read() + if got, want := ms[0].TimeSeries[0].Points[0].Value.(float64), float64(5.0); got != want { + t.Errorf("value = %v, want %v", got, want) + } + q.size = 7 + ms = r.Read() + if got, want := ms[0].TimeSeries[0].Points[0].Value.(float64), float64(7.0); got != want { + t.Errorf("value = %v, want %v", got, want) + } +} + +func TestFloat64DerivedCumulativeEntry_IncWithNilObj(t *testing.T) { + r := NewRegistry() + g, _ := r.AddFloat64DerivedCumulative("bm", WithLabelKeys("k1", "k2")) + gotErr := g.UpsertEntry(nil, metricdata.NewLabelValue("k1v1"), metricdata.LabelValue{}) + if gotErr == nil { + t.Errorf("expected error but got nil") + } +} + +func TestFloat64DerivedCumulativeEntry_IncWithInvalidLabels(t *testing.T) { + r := NewRegistry() + q := &sysUpTimeInSeconds{3} + g, _ := r.AddFloat64DerivedCumulative("bm", WithLabelKeys("k1", "k2")) + gotErr := g.UpsertEntry(q.ToFloat64, metricdata.NewLabelValue("k1v1")) + if gotErr == nil { + t.Errorf("expected error but got nil") + } +} + +func TestFloat64DerivedCumulativeEntry_Update(t *testing.T) { + r := NewRegistry() + q := &sysUpTimeInSeconds{3.0} + q2 := &sysUpTimeInSeconds{5.0} + g, _ := r.AddFloat64DerivedCumulative("bm", WithLabelKeys("k1", "k2")) + g.UpsertEntry(q.ToFloat64, metricdata.NewLabelValue("k1v1"), metricdata.LabelValue{}) + gotErr := g.UpsertEntry(q2.ToFloat64, metricdata.NewLabelValue("k1v1"), metricdata.LabelValue{}) + if gotErr != nil { + t.Errorf("got: %v, want: nil", gotErr) + } + ms := r.Read() + if got, want := ms[0].TimeSeries[0].Points[0].Value.(float64), float64(5.0); got != want { + t.Errorf("value = %v, want %v", got, want) + } +} diff --git a/metric/registry.go b/metric/registry.go index 6b1ff323f..5dc41631d 100644 --- a/metric/registry.go +++ b/metric/registry.go @@ -134,11 +134,79 @@ func bmTypeToMetricType(bm *baseMetric) metricdata.Type { return metricdata.TypeGaugeFloat64 case gaugeInt64: return metricdata.TypeGaugeInt64 + case derivedCumulativeFloat64: + return metricdata.TypeCumulativeFloat64 + case derivedCumulativeInt64: + return metricdata.TypeCumulativeInt64 + case cumulativeFloat64: + return metricdata.TypeCumulativeFloat64 + case cumulativeInt64: + return metricdata.TypeCumulativeInt64 default: panic("unsupported metric type") } } +// AddFloat64Cumulative creates and adds a new float64-valued cumulative to this registry. +func (r *Registry) AddFloat64Cumulative(name string, mos ...Options) (*Float64Cumulative, error) { + f := &Float64Cumulative{ + bm: baseMetric{ + bmType: cumulativeFloat64, + }, + } + _, err := r.initBaseMetric(&f.bm, name, mos...) + if err != nil { + return nil, err + } + return f, nil +} + +// AddInt64Cumulative creates and adds a new int64-valued cumulative to this registry. +func (r *Registry) AddInt64Cumulative(name string, mos ...Options) (*Int64Cumulative, error) { + i := &Int64Cumulative{ + bm: baseMetric{ + bmType: cumulativeInt64, + }, + } + _, err := r.initBaseMetric(&i.bm, name, mos...) + if err != nil { + return nil, err + } + return i, nil +} + +// AddInt64DerivedCumulative creates and adds a new derived int64-valued cumulative to this registry. +// A derived cumulative is convenient form of cumulative where the object associated with the cumulative +// provides its value by implementing func() int64. +func (r *Registry) AddInt64DerivedCumulative(name string, mos ...Options) (*Int64DerivedCumulative, error) { + i := &Int64DerivedCumulative{ + bm: baseMetric{ + bmType: derivedCumulativeInt64, + }, + } + _, err := r.initBaseMetric(&i.bm, name, mos...) + if err != nil { + return nil, err + } + return i, nil +} + +// AddFloat64DerivedCumulative creates and adds a new derived float64-valued gauge to this registry. +// A derived cumulative is convenient form of cumulative where the object associated with the cumulative +// provides its value by implementing func() float64. +func (r *Registry) AddFloat64DerivedCumulative(name string, mos ...Options) (*Float64DerivedCumulative, error) { + f := &Float64DerivedCumulative{ + bm: baseMetric{ + bmType: derivedCumulativeFloat64, + }, + } + _, err := r.initBaseMetric(&f.bm, name, mos...) + if err != nil { + return nil, err + } + return f, nil +} + func createMetricOption(mos ...Options) *metricOptions { o := &metricOptions{} for _, mo := range mos { @@ -169,7 +237,7 @@ func (r *Registry) initBaseMetric(bm *baseMetric, name string, mos ...Options) ( return bm, nil } -// Read reads all gauges in this registry and returns their values as metrics. +// Read reads all gauges and cumulatives in this registry and returns their values as metrics. func (r *Registry) Read() []*metricdata.Metric { ms := []*metricdata.Metric{} r.baseMetrics.Range(func(k, v interface{}) bool {