From b74fdf0aaa171f6d6181fa44179449fe2124c24a Mon Sep 17 00:00:00 2001 From: Michael MacDonald Date: Fri, 12 Apr 2024 14:36:47 +0000 Subject: [PATCH] DAOS-7203 control: Add histogram support to Prometheus exporter Switch the object I/O counters to histograms in order to capture per-I/O size distributions in addition to total amounts. Update the Prometheus exporter to support passthrough histograms from native DAOS telemetry format. Features: telemetry Required-githooks: true Change-Id: I7842cc48a107ec0ba0ec93472fb6684db7394d30 Signed-off-by: Michael MacDonald --- src/control/cmd/dmg/telemetry.go | 10 +- src/control/lib/control/telemetry.go | 66 ++++++ src/control/lib/control/telemetry_test.go | 69 ++++++ src/control/lib/telemetry/counter.go | 3 + src/control/lib/telemetry/duration.go | 4 + src/control/lib/telemetry/gauge.go | 6 + src/control/lib/telemetry/histogram.go | 170 ++++++++++++++ src/control/lib/telemetry/promexp/client.go | 2 +- .../lib/telemetry/promexp/client_test.go | 24 +- .../lib/telemetry/promexp/collector.go | 12 + .../lib/telemetry/promexp/engine_test.go | 169 +++++++++++-- .../lib/telemetry/promexp/histogram.go | 222 ++++++++++++++++++ src/control/lib/telemetry/promexp/source.go | 22 ++ src/control/lib/telemetry/promexp/util.go | 22 ++ src/control/lib/telemetry/snapshot.go | 2 + src/control/lib/telemetry/timestamp.go | 2 + src/gurt/telemetry.c | 13 +- src/gurt/tests/test_gurt_telem_producer.c | 61 ++--- src/object/cli_shard.c | 6 +- src/object/obj_utils.c | 29 ++- src/object/srv_mod.c | 1 - src/object/srv_obj.c | 6 +- 22 files changed, 834 insertions(+), 87 deletions(-) create mode 100644 src/control/lib/telemetry/histogram.go create mode 100644 src/control/lib/telemetry/promexp/histogram.go diff --git a/src/control/cmd/dmg/telemetry.go b/src/control/cmd/dmg/telemetry.go index fd63164b438..d4cdf50fe63 100644 --- a/src/control/cmd/dmg/telemetry.go +++ b/src/control/cmd/dmg/telemetry.go @@ -254,13 +254,11 @@ func (cmd *telemConfigCmd) configurePrometheus() (*installInfo, error) { } sc := &staticConfig{} - for _, h := range cmd.config.HostList { - host, _, err := common.SplitPort(h, 0) - if err != nil { - return nil, err - } - sc.Targets = append(sc.Targets, host+":9191") + sc.Targets, err = common.ParseHostList(cmd.config.HostList, 9191) + if err != nil { + return nil, err } + cfg.ScrapeConfigs = []*scrapeConfig{ { JobName: "daos", diff --git a/src/control/lib/control/telemetry.go b/src/control/lib/control/telemetry.go index 62854438df4..d1780ded3c7 100644 --- a/src/control/lib/control/telemetry.go +++ b/src/control/lib/control/telemetry.go @@ -10,6 +10,7 @@ import ( "context" "encoding/json" "fmt" + "math" "net/url" "sort" "strconv" @@ -281,6 +282,71 @@ func (ms *MetricSet) MarshalJSON() ([]byte, error) { }) } +// jsonFloat is a terrible hack to deal with the stdlib's inabilility +// to deal with -Inf/+Inf/NaN: https://github.com/golang/go/issues/59627 +type jsonFloat float64 + +func (jf jsonFloat) MarshalJSON() ([]byte, error) { + switch { + case math.IsInf(float64(jf), 1): + return []byte(`"+Inf"`), nil + case math.IsInf(float64(jf), -1): + return []byte(`"-Inf"`), nil + case math.IsNaN(float64(jf)): + return []byte(`"NaN"`), nil + } + return json.Marshal(float64(jf)) +} + +func (jf *jsonFloat) UnmarshalJSON(data []byte) error { + if err := json.Unmarshal(data, (*float64)(jf)); err == nil { + return nil + } + + var stringVal string + if err := json.Unmarshal(data, &stringVal); err != nil { + return err + } + + val, err := strconv.ParseFloat(stringVal, 64) + if err != nil { + return err + } + + *jf = jsonFloat(val) + + return nil +} + +func (mb *MetricBucket) MarshalJSON() ([]byte, error) { + type toJSON MetricBucket + return json.Marshal(&struct { + UpperBound jsonFloat `json:"upper_bound"` + *toJSON + }{ + UpperBound: jsonFloat(mb.UpperBound), + toJSON: (*toJSON)(mb), + }) +} + +func (mb *MetricBucket) UnmarshalJSON(data []byte) error { + type fromJSON MetricBucket + + from := &struct { + UpperBound jsonFloat `json:"upper_bound"` + *fromJSON + }{ + fromJSON: (*fromJSON)(mb), + } + if err := json.Unmarshal(data, from); err != nil { + return err + } + + mb.UpperBound = float64(from.UpperBound) + + return nil +} + // jsonMetric serves as a universal metric representation for unmarshaling from // JSON. It covers all possible fields of Metric types. type jsonMetric struct { diff --git a/src/control/lib/control/telemetry_test.go b/src/control/lib/control/telemetry_test.go index f5d8cb701ec..bcba7bef689 100644 --- a/src/control/lib/control/telemetry_test.go +++ b/src/control/lib/control/telemetry_test.go @@ -582,6 +582,71 @@ func TestControl_MetricsQuery(t *testing.T) { } } +func TestControl_MetricBucket_JSON(t *testing.T) { + for name, tc := range map[string]struct { + bucket *MetricBucket + expUpperBound float64 + expMarshalErr error + expUnmarshalErr error + }{ + "+Inf": { + bucket: &MetricBucket{ + UpperBound: math.Inf(1), + }, + expUpperBound: math.Inf(1), + }, + "-Inf": { + bucket: &MetricBucket{ + UpperBound: math.Inf(-1), + }, + expUpperBound: math.Inf(-1), + }, + "NaN": { + bucket: &MetricBucket{ + UpperBound: math.NaN(), + }, + expUpperBound: math.NaN(), + }, + "42.42": { + bucket: &MetricBucket{ + UpperBound: 42.42, + }, + expUpperBound: 42.42, + }, + "0.000": { + bucket: &MetricBucket{ + UpperBound: 0.000, + }, + expUpperBound: 0.000, + }, + } { + t.Run(name, func(t *testing.T) { + data, gotErr := json.Marshal(tc.bucket) + test.CmpErr(t, tc.expMarshalErr, gotErr) + if tc.expMarshalErr != nil { + return + } + + var gotBucket MetricBucket + gotErr = json.Unmarshal(data, &gotBucket) + test.CmpErr(t, tc.expUnmarshalErr, gotErr) + if tc.expUnmarshalErr != nil { + return + } + + if math.IsNaN(tc.expUpperBound) { + if !math.IsNaN(gotBucket.UpperBound) { + t.Fatalf("UpperBound NaN value did not survive Marshal/Unmarshal (got %f)", gotBucket.UpperBound) + } + } else { + if diff := cmp.Diff(tc.expUpperBound, gotBucket.UpperBound); diff != "" { + t.Fatalf("Bucket UpperBound value did not survive Marshal/Unmarshal (-want, +got): %s", diff) + } + } + }) + } +} + func TestControl_Metric_JSON(t *testing.T) { testLabelMap := map[string]string{ "label1": "val1", @@ -616,6 +681,10 @@ func TestControl_Metric_JSON(t *testing.T) { CumulativeCount: 55, UpperBound: 500, }, + { + CumulativeCount: 4242, + UpperBound: math.Inf(1), + }, }, }, }, diff --git a/src/control/lib/telemetry/counter.go b/src/control/lib/telemetry/counter.go index 81549a32daf..3350b5ef6b6 100644 --- a/src/control/lib/telemetry/counter.go +++ b/src/control/lib/telemetry/counter.go @@ -24,6 +24,9 @@ import ( "fmt" ) +var _ Metric = (*Counter)(nil) + +// Counter is a counter metric. type Counter struct { metricBase } diff --git a/src/control/lib/telemetry/duration.go b/src/control/lib/telemetry/duration.go index 1f32125bc90..e87b374cc49 100644 --- a/src/control/lib/telemetry/duration.go +++ b/src/control/lib/telemetry/duration.go @@ -25,8 +25,11 @@ import ( "time" ) +var _ StatsMetric = (*Duration)(nil) + type Duration struct { statsMetric + hist *Histogram // optional histogram data } func (d *Duration) Type() MetricType { @@ -63,6 +66,7 @@ func newDuration(hdl *handle, path string, name *string, node *C.struct_d_tm_nod }, }, } + d.hist = newHistogram(&d.statsMetric) // Load up statistics _ = d.Value() diff --git a/src/control/lib/telemetry/gauge.go b/src/control/lib/telemetry/gauge.go index ea84ff90504..47e460ad2e4 100644 --- a/src/control/lib/telemetry/gauge.go +++ b/src/control/lib/telemetry/gauge.go @@ -24,6 +24,9 @@ import ( "fmt" ) +var _ Metric = (*Gauge)(nil) +var _ StatsMetric = (*StatsGauge)(nil) + // Gauge is a metric that consists of a single value that may increase or decrease. type Gauge struct { metricBase @@ -89,6 +92,7 @@ func GetGauge(ctx context.Context, name string) (*Gauge, error) { // StatsGauge is a gauge with statistics gathered. type StatsGauge struct { statsMetric + hist *Histogram // optional histogram data } // Type returns the type of the gauge with stats. @@ -128,9 +132,11 @@ func newStatsGauge(hdl *handle, path string, name *string, node *C.struct_d_tm_n }, }, } + g.hist = newHistogram(&g.statsMetric) // Load up the stats _ = g.Value() + return g } diff --git a/src/control/lib/telemetry/histogram.go b/src/control/lib/telemetry/histogram.go new file mode 100644 index 00000000000..5f42f84bb5a --- /dev/null +++ b/src/control/lib/telemetry/histogram.go @@ -0,0 +1,170 @@ +// +// (C) Copyright 2024 Intel Corporation. +// +// SPDX-License-Identifier: BSD-2-Clause-Patent +// +//go:build linux && (amd64 || arm64) +// +build linux +// +build amd64 arm64 + +package telemetry + +/* +#cgo LDFLAGS: -lgurt + +#include "gurt/telemetry_common.h" +#include "gurt/telemetry_consumer.h" +*/ +import "C" + +import ( + "github.com/pkg/errors" +) + +// HistogramBucket contains the min/max values and count for a single bucket. +type HistogramBucket struct { + Min uint64 + Max uint64 + Count uint64 +} + +// HistogramSample contains the results of a histogram sample. +type HistogramSample struct { + Count uint64 + Sum uint64 + Buckets []HistogramBucket + Values []uint64 +} + +// Histogram provides access to histogram data associated with +// a parent metric. +type Histogram struct { + parent *statsMetric + cHist C.struct_d_tm_histogram_t +} + +func (h *Histogram) sample() (uint64, []uint64, []HistogramBucket) { + if h.parent.handle == nil || h.parent.node == nil || h.cHist.dth_num_buckets == 0 { + return 0, nil, nil + } + + if h.NumBuckets() == 0 { + return 0, nil, nil + } + + var cBucket C.struct_d_tm_bucket_t + var cVal C.uint64_t + var sum uint64 + // NB: We don't need to include the final bucket as its max is +Inf, + // and its min value can be derived from the max of the previous bucket. + // This plays well with Prometheus Histograms... May need to be adjusted + // if other implementations don't like this. + buckets := make([]HistogramBucket, h.cHist.dth_num_buckets-1) + vals := make([]uint64, len(buckets)) + for i := 0; i < len(buckets); i++ { + res := C.d_tm_get_bucket_range(h.parent.handle.ctx, &cBucket, C.int(i), h.parent.node) + if res != C.DER_SUCCESS { + return 0, nil, nil + } + + res = C.d_tm_get_counter(h.parent.handle.ctx, &cVal, cBucket.dtb_bucket) + if res != C.DER_SUCCESS { + return 0, nil, nil + } + + // NB: Histogram bucket values are cumulative! + // https://en.wikipedia.org/wiki/Histogram#Cumulative_histogram + sum += uint64(cVal) + buckets[i] = HistogramBucket{ + Min: uint64(cBucket.dtb_min), + Max: uint64(cBucket.dtb_max), + Count: sum, + } + vals[i] = buckets[i].Count + } + + return h.parent.Sum(), vals, buckets +} + +// NumBuckets returns a count of buckets in the histogram. +func (h *Histogram) NumBuckets() uint64 { + res := C.d_tm_get_num_buckets(h.parent.handle.ctx, &h.cHist, h.parent.node) + if res != C.DER_SUCCESS { + return 0 + } + + return uint64(h.cHist.dth_num_buckets - 1) +} + +// Buckets returns the histogram buckets. +func (h *Histogram) Buckets() []HistogramBucket { + _, _, buckets := h.sample() + return buckets +} + +// Sample returns a point-in-time sample of the histogram. +func (h *Histogram) Sample(cur float64) *HistogramSample { + _, vals, buckets := h.sample() + + return &HistogramSample{ + Count: h.parent.SampleSize(), + Sum: uint64(cur), + Buckets: buckets, + Values: vals, + } +} + +func newHistogram(parent *statsMetric) *Histogram { + h := &Histogram{ + parent: parent, + } + + if h.NumBuckets() == 0 { + return nil + } + + return h +} + +func getHistogram(m Metric) *Histogram { + if m == nil { + return nil + } + + switch o := m.(type) { + case *StatsGauge: + return o.hist + case *Duration: + return o.hist + default: + return nil + } +} + +// HasBuckets returns true if the metric has histogram data. +func HasBuckets(m Metric) bool { + if h := getHistogram(m); h != nil { + return h.NumBuckets() > 0 + } + + return false +} + +// GetBuckets returns the histogram buckets for the metric, if available. +func GetBuckets(m Metric) ([]HistogramBucket, error) { + if h := getHistogram(m); h != nil { + return h.Buckets(), nil + } + + return nil, errors.Errorf("[%s]: no histogram data", m.FullPath()) +} + +// SampleHistogram returns a point-in-time sample of the histogram for +// the metric, if available. +func SampleHistogram(m Metric) (*HistogramSample, error) { + if h := getHistogram(m); h != nil { + return h.Sample(m.FloatValue()), nil + } + + return nil, errors.Errorf("[%s]: no histogram data", m.FullPath()) +} diff --git a/src/control/lib/telemetry/promexp/client.go b/src/control/lib/telemetry/promexp/client.go index e6eefeaf396..7cc216ee719 100644 --- a/src/control/lib/telemetry/promexp/client.go +++ b/src/control/lib/telemetry/promexp/client.go @@ -60,7 +60,7 @@ func extractClientLabels(log logging.Logger, in string) (labels labelMap, name s compsIdx++ } - for i, label := range []string{"job", "pid", "tid"} { + for i, label := range []string{"jobid", "pid", "tid"} { if i > 0 { // After jobid, we should have a pid and/or tid, and // then move on to the engine labels. diff --git a/src/control/lib/telemetry/promexp/client_test.go b/src/control/lib/telemetry/promexp/client_test.go index d0274f157b5..c21a7255b3b 100644 --- a/src/control/lib/telemetry/promexp/client_test.go +++ b/src/control/lib/telemetry/promexp/client_test.go @@ -50,36 +50,36 @@ func TestPromExp_extractClientLabels(t *testing.T) { input: testPath("io/ops/update/active"), expName: "io_ops_update_active", expLabels: labelMap{ - "job": jobID, - "pid": pid, - "tid": tid, + "jobid": jobID, + "pid": pid, + "tid": tid, }, }, "fetch latency 1MB": { input: testPath("io/latency/fetch/1MB"), expName: "io_latency_fetch", expLabels: labelMap{ - "job": jobID, - "pid": pid, - "tid": tid, - "size": "1MB", + "jobid": jobID, + "pid": pid, + "tid": tid, + "size": "1MB", }, }, "started_at": { input: fmt.Sprintf("ID: %d/%s/%s/started_at", shmID, jobID, pid), expName: "started_at", expLabels: labelMap{ - "job": jobID, - "pid": pid, + "jobid": jobID, + "pid": pid, }, }, "pool ops": { input: fmt.Sprintf("ID: %d/%s/%s/pool/%s/ops/foo", shmID, jobID, pid, test.MockPoolUUID(1)), expName: "pool_ops_foo", expLabels: labelMap{ - "job": jobID, - "pid": pid, - "pool": test.MockPoolUUID(1).String(), + "jobid": jobID, + "pid": pid, + "pool": test.MockPoolUUID(1).String(), }, }, } { diff --git a/src/control/lib/telemetry/promexp/collector.go b/src/control/lib/telemetry/promexp/collector.go index ec70c0e8fbd..0b533874151 100644 --- a/src/control/lib/telemetry/promexp/collector.go +++ b/src/control/lib/telemetry/promexp/collector.go @@ -77,6 +77,18 @@ func (c *metricsCollector) Collect(ch chan<- prometheus.Metric) { telemetry.MetricTypeSnapshot: err = sm.gvm.set(sm.baseName, sm.metric.FloatValue(), sm.labels) case telemetry.MetricTypeStatsGauge, telemetry.MetricTypeDuration: + if telemetry.HasBuckets(sm.metric) { + sample, err := telemetry.SampleHistogram(sm.metric) + if err != nil { + c.log.Errorf("[%s]: failed to get histogram sample", sm.baseName) + break + } + if err := sm.hvm.set(sm.baseName, sm.labels, sample.Count, sample.Sum, sample.Values); err != nil { + c.log.Errorf("[%s]: failed to set bucket values: %+v", sm.baseName, err) + } + break + } + if err = sm.gvm.set(sm.baseName, sm.metric.FloatValue(), sm.labels); err != nil { break } diff --git a/src/control/lib/telemetry/promexp/engine_test.go b/src/control/lib/telemetry/promexp/engine_test.go index b21839b7ba0..6f5278cd1d3 100644 --- a/src/control/lib/telemetry/promexp/engine_test.go +++ b/src/control/lib/telemetry/promexp/engine_test.go @@ -19,6 +19,7 @@ import ( "github.com/google/uuid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" "github.com/daos-stack/daos/src/control/common/test" "github.com/daos-stack/daos/src/control/lib/telemetry" @@ -441,7 +442,8 @@ func TestPromExp_Collector_Collect(t *testing.T) { expMetricNames []string }{ "nil channel": { - collector: defaultCollector, + collector: defaultCollector, + expMetricNames: []string{}, }, "success": { collector: defaultCollector, @@ -454,9 +456,9 @@ func TestPromExp_Collector_Collect(t *testing.T) { "engine_stats_gauge2_max", "engine_stats_gauge2_mean", "engine_stats_gauge2_sum", + "engine_stats_gauge2_samples", "engine_stats_gauge2_stddev", "engine_stats_gauge2_sumsquares", - "engine_stats_gauge2_samples", "engine_timer_stamp", "engine_timer_snapshot", "engine_timer_duration", @@ -464,9 +466,9 @@ func TestPromExp_Collector_Collect(t *testing.T) { "engine_timer_duration_max", "engine_timer_duration_mean", "engine_timer_duration_sum", + "engine_timer_duration_samples", "engine_timer_duration_stddev", "engine_timer_duration_sumsquares", - "engine_timer_duration_samples", }, }, "ignore some metrics": { @@ -492,19 +494,18 @@ func TestPromExp_Collector_Collect(t *testing.T) { } } - test.AssertEqual(t, len(tc.expMetricNames), len(gotMetrics), "wrong number of metrics returned") - for _, exp := range tc.expMetricNames { - found := false - for _, got := range gotMetrics { - if strings.Contains(got.Desc().String(), exp) { - found = true - break - } - } - - if !found { - t.Errorf("expected metric %q not found", exp) - } + fqNameRe := regexp.MustCompile(`fqName: "(\w*)"`) + gotMetricNames := make([]string, len(gotMetrics)) + for i, m := range gotMetrics { + gotMetricNames[i] = fqNameRe.FindStringSubmatch(m.Desc().String())[1] + } + cmpOpts := cmp.Options{ + cmpopts.SortSlices(func(a, b string) bool { + return a < b + }), + } + if diff := cmp.Diff(tc.expMetricNames, gotMetricNames, cmpOpts...); diff != "" { + t.Fatalf("unexpected set of metrics (-want,+got: %s)", diff) } }) } @@ -615,6 +616,14 @@ func TestPromExp_extractEngineLabels(t *testing.T) { "target": "5", }, }, + "pool_xferred_update": { + input: "ID: 1/pool/86eacd2c-eceb-4054-8621-017f4f661fe2/xferred/update/tgt_5", + expName: "pool_xferred_update", + expLabels: labelMap{ + "pool": "86eacd2c-eceb-4054-8621-017f4f661fe2", + "target": "5", + }, + }, "nvme_vendor_host_reads_raw": { input: "ID: 1/nvme/d70505:05:00.0/vendor/host_reads_raw", expName: "nvme_vendor_host_reads_raw", @@ -820,3 +829,131 @@ func TestPromExp_Collector_RemoveSource(t *testing.T) { }) } } + +func TestPromExp_Collector_DaosHistograms(t *testing.T) { + var ( + poolID string = "86eacd2c-eceb-4054-8621-017f4f661fe2" + ) + + type sample struct { + count uint64 + sum uint64 + values []uint64 + } + type metricData struct { + name string + labels labelMap + samples []sample + buckets []float64 + } + testData := map[string]*metricData{ + "rank_3/tgt_5": { + name: "pool_xferred_update", + labels: labelMap{ + "rank": "3", + "pool": poolID, + "target": "5", + }, + samples: []sample{ + { + count: 1, + sum: 2, + values: []uint64{0, 2}, + }, + { + count: 2, + sum: 3, + values: []uint64{1, 2}, + }, + { + count: 3, + sum: 3, + values: []uint64{1, 2}, + }, + }, + buckets: []float64{1, 2}, + }, + "rank_0/tgt_1": { + name: "pool_xferred_update", + labels: labelMap{ + "rank": "0", + "pool": poolID, + "target": "1", + }, + samples: []sample{ + { + count: 0, + sum: 0, + values: []uint64{0, 0}, + }, + { + count: 0, + sum: 0, + values: []uint64{0, 0}, + }, + { + count: 0, + sum: 0, + values: []uint64{0, 0}, + }, + }, + buckets: []float64{1, 2}, + }, + } + + hvm := make(hvMap) + + for _, v := range testData { + hvm.add(v.name, "help", v.labels, v.buckets) + for _, samp := range v.samples { + if err := hvm.set(v.name, v.labels, samp.count, samp.sum, samp.values); err != nil { + t.Fatal(err) + } + } + } + + ch := make(chan prometheus.Metric) + go func() { + for _, hv := range hvm { + hv.Collect(ch) + } + close(ch) + }() + + for m := range ch { + var mp dto.Metric + if err := m.Write(&mp); err != nil { + t.Fatal(err) + } + + hist := mp.GetHistogram() + if hist == nil { + t.Fatalf("received metric %+v did not contain Histogram", mp) + } + + var poolLabel string + var rankLabel string + for _, pair := range mp.Label { + switch pair.GetName() { + case "pool": + poolLabel = pair.GetValue() + case "rank": + rankLabel = pair.GetValue() + } + } + + t.Logf("mp: %+v", mp) + test.AssertEqual(t, poolLabel, poolID, "unexpected pool label value") + + switch rankLabel { + case "3": + test.AssertEqual(t, hist.GetSampleCount(), uint64(3), "unexpected sample count for rank 3") + test.AssertEqual(t, hist.GetSampleSum(), float64(3), "unexpected sample sum for rank 3") + case "0": + test.AssertEqual(t, hist.GetSampleCount(), uint64(0), "unexpected sample count for rank 0") + test.AssertEqual(t, hist.GetSampleSum(), float64(0), "unexpected sample sum for rank 0") + default: + t.Fatalf("unexpected rank %s in results", rankLabel) + } + } +} diff --git a/src/control/lib/telemetry/promexp/histogram.go b/src/control/lib/telemetry/promexp/histogram.go new file mode 100644 index 00000000000..66648e97f0d --- /dev/null +++ b/src/control/lib/telemetry/promexp/histogram.go @@ -0,0 +1,222 @@ +// +// (C) Copyright 2024 Intel Corporation. +// +// SPDX-License-Identifier: BSD-2-Clause-Patent +// + +package promexp + +import ( + "sort" + "sync" + + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/model" +) + +var ( + _ prometheus.Metric = &daosHistogram{} + _ prometheus.Collector = &daosHistogramVec{} +) + +// daosHistogram is a pass-through structure for pre-bucketed histogram +// data retrieved from the DAOS telemetry library. +type daosHistogram struct { + desc *prometheus.Desc + name string + help string + sum float64 + count uint64 + labelVals []string + buckets []float64 + bucketMap map[float64]uint64 +} + +func newDaosHistogram(name, help string, desc *prometheus.Desc, buckets []float64, labelVals []string) *daosHistogram { + dh := &daosHistogram{ + name: name, + help: help, + desc: desc, + labelVals: labelVals, + buckets: buckets, + bucketMap: make(map[float64]uint64), + } + + for _, b := range buckets { + dh.bucketMap[b] = 0 + } + + return dh +} + +func (dh *daosHistogram) Desc() *prometheus.Desc { + return dh.desc +} + +func (dh *daosHistogram) Write(out *dto.Metric) error { + ch, err := prometheus.NewConstHistogram(dh.desc, dh.count, dh.sum, dh.bucketMap, dh.labelVals...) + if err != nil { + return err + } + + return ch.Write(out) +} + +func (dh *daosHistogram) AddSample(sampleCount uint64, sum float64, values []uint64) error { + if len(values) != len(dh.buckets) { + return errors.Errorf("expected %d values, got %d", len(dh.buckets), len(values)) + } + + for i, b := range dh.buckets { + dh.bucketMap[b] = values[i] + } + + dh.count = sampleCount + dh.sum = sum + + return nil +} + +func (dh *daosHistogram) MustAddSample(sampleCount uint64, sum float64, values []uint64) { + if err := dh.AddSample(sampleCount, sum, values); err != nil { + panic(err) + } +} + +type hashedMetricValue struct { + metric prometheus.Metric + labelVals []string +} + +type hashedMetrics map[uint64][]*hashedMetricValue + +// daosHistogramVec is a simplified custom implementation of prometheus.HistogramVec. +// It is not designed for concurrency or currying. +type daosHistogramVec struct { + opts prometheus.HistogramOpts + desc *prometheus.Desc + labelKeys []string // stored here because prometheus.Desc is opaque to us + histograms hashedMetrics + mu sync.RWMutex +} + +func (dhv *daosHistogramVec) Describe(ch chan<- *prometheus.Desc) { + ch <- dhv.desc +} + +func (dhv *daosHistogramVec) Collect(ch chan<- prometheus.Metric) { + dhv.mu.RLock() + defer dhv.mu.RUnlock() + + for _, histograms := range dhv.histograms { + for _, histogram := range histograms { + ch <- histogram.metric + } + } +} + +func labelVals(labels prometheus.Labels) []string { + keys := make([]string, 0, len(labels)) + for key := range labels { + keys = append(keys, key) + } + sort.Strings(keys) + + vals := make([]string, 0, len(labels)) + for _, key := range keys { + vals = append(vals, labels[key]) + } + return vals +} + +func cmpLabelVals(a, b []string) bool { + if len(a) != len(b) { + return false + } + + for i := range a { + if a[i] != b[i] { + return false + } + } + + return true +} + +func (dhv *daosHistogramVec) addWithLabelValues(hashKey uint64, lvs []string) *hashedMetricValue { + dh := newDaosHistogram(dhv.opts.Name, dhv.opts.Help, dhv.desc, dhv.opts.Buckets, lvs) + hmv := &hashedMetricValue{ + metric: dh, + labelVals: lvs, + } + // NB: must be done under lock to be thread-safe + dhv.histograms[hashKey] = append(dhv.histograms[hashKey], hmv) + + return hmv +} + +func (dhv *daosHistogramVec) GetWith(labels prometheus.Labels) (*daosHistogram, error) { + hashKey := model.LabelsToSignature(labels) + + var hmv *hashedMetricValue + lvs := labelVals(labels) + + dhv.mu.Lock() + _, found := dhv.histograms[hashKey] + if !found { + hmv = dhv.addWithLabelValues(hashKey, lvs) + } + + if hmv == nil { + for _, h := range dhv.histograms[hashKey] { + if cmpLabelVals(h.labelVals, lvs) { + hmv = h + break + } + } + + if hmv == nil { + hmv = dhv.addWithLabelValues(hashKey, lvs) + } + } + dhv.mu.Unlock() + + dh, ok := hmv.metric.(*daosHistogram) + if !ok { + return nil, errors.New("stored something other than *daosHistogram") + } + return dh, nil +} + +func (dhv *daosHistogramVec) With(labels prometheus.Labels) *daosHistogram { + dh, err := dhv.GetWith(labels) + if err != nil { + panic(err) + } + return dh +} + +func (dhv *daosHistogramVec) Reset() { + dhv.mu.Lock() + defer dhv.mu.Unlock() + + for k := range dhv.histograms { + delete(dhv.histograms, k) + } +} + +func newDaosHistogramVec(opts prometheus.HistogramOpts, labelNames []string) *daosHistogramVec { + return &daosHistogramVec{ + desc: prometheus.NewDesc( + prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name), + opts.Help, + labelNames, + opts.ConstLabels, + ), + opts: opts, + labelKeys: labelNames, + histograms: make(hashedMetrics), + } +} diff --git a/src/control/lib/telemetry/promexp/source.go b/src/control/lib/telemetry/promexp/source.go index 2212b319ff7..8b3b278da47 100644 --- a/src/control/lib/telemetry/promexp/source.go +++ b/src/control/lib/telemetry/promexp/source.go @@ -90,6 +90,7 @@ type sourceMetric struct { labels labelMap gvm gvMap cvm cvMap + hvm hvMap } // collect sends the metrics vectors in the sourceMetric struct to the provided channel. @@ -100,6 +101,9 @@ func (bm *sourceMetric) collect(ch chan<- prometheus.Metric) { for _, cv := range bm.cvm { cv.Collect(ch) } + for _, hv := range bm.hvm { + hv.Collect(ch) + } } // resetVecs resets all the metrics vectors in the sourceMetric struct. @@ -110,6 +114,9 @@ func (bm *sourceMetric) resetVecs() { for _, cv := range bm.cvm { cv.Reset() } + for _, hv := range bm.hvm { + hv.Reset() + } } // newSourceMetric initializes a new sourceMetric struct. @@ -120,6 +127,7 @@ func newSourceMetric(log logging.Logger, m telemetry.Metric, baseName string, la labels: labels, gvm: make(gvMap), cvm: make(cvMap), + hvm: make(hvMap), } desc := m.Desc() @@ -129,6 +137,20 @@ func newSourceMetric(log logging.Logger, m telemetry.Metric, baseName string, la telemetry.MetricTypeSnapshot: sm.gvm.add(sm.baseName, desc, sm.labels) case telemetry.MetricTypeStatsGauge, telemetry.MetricTypeDuration: + if telemetry.HasBuckets(sm.metric) { + buckets, err := telemetry.GetBuckets(sm.metric) + if err != nil { + log.Errorf("[%s]: failed to get histogram buckets", baseName) + break + } + cfgBuckets := make([]float64, 0, len(buckets)) + for _, b := range buckets { + cfgBuckets = append(cfgBuckets, float64(b.Max)) + } + sm.hvm.add(sm.baseName, desc, sm.labels, cfgBuckets) + break // We don't need the parent metric, as its sum value is included in the histogram + } + sm.gvm.add(sm.baseName, desc, sm.labels) for _, ms := range getMetricStats(sm.baseName, sm.metric) { if ms.isCounter { diff --git a/src/control/lib/telemetry/promexp/util.go b/src/control/lib/telemetry/promexp/util.go index 6ddc46623d3..435ccfd65dd 100644 --- a/src/control/lib/telemetry/promexp/util.go +++ b/src/control/lib/telemetry/promexp/util.go @@ -110,6 +110,28 @@ func (m cvMap) set(name string, value float64, labels labelMap) error { return nil } +type hvMap map[string]*daosHistogramVec + +func (m hvMap) add(name, help string, labels labelMap, buckets []float64) { + if _, found := m[name]; !found { + hv := newDaosHistogramVec(prometheus.HistogramOpts{ + Name: name, + Help: help, + Buckets: buckets, + }, labels.keys()) + m[name] = hv + } +} + +func (m hvMap) set(name string, labels labelMap, samples, sum uint64, values []uint64) error { + hv, found := m[name] + if !found { + return errors.Errorf("histogram vector %s not found", name) + } + + return hv.With(prometheus.Labels(labels)).AddSample(samples, float64(sum), values) +} + type metricStat struct { name string desc string diff --git a/src/control/lib/telemetry/snapshot.go b/src/control/lib/telemetry/snapshot.go index 2ffa23296c3..940733f5659 100644 --- a/src/control/lib/telemetry/snapshot.go +++ b/src/control/lib/telemetry/snapshot.go @@ -25,6 +25,8 @@ import ( "time" ) +var _ Metric = (*Snapshot)(nil) + type Snapshot struct { metricBase } diff --git a/src/control/lib/telemetry/timestamp.go b/src/control/lib/telemetry/timestamp.go index 97ef5bb1ed9..ef8be196051 100644 --- a/src/control/lib/telemetry/timestamp.go +++ b/src/control/lib/telemetry/timestamp.go @@ -25,6 +25,8 @@ import ( "time" ) +var _ Metric = (*Timestamp)(nil) + type Timestamp struct { metricBase } diff --git a/src/gurt/telemetry.c b/src/gurt/telemetry.c index 8f08192d072..7ec8fe1057f 100644 --- a/src/gurt/telemetry.c +++ b/src/gurt/telemetry.c @@ -3133,8 +3133,7 @@ d_tm_init_histogram(struct d_tm_node_t *node, char *path, int num_buckets, struct d_tm_bucket_t *dth_buckets; struct d_tm_shmem_hdr *shmem; uint64_t min = 0; - uint64_t max = 0; - uint64_t prev_width = 0; + uint64_t max = 0; int rc = DER_SUCCESS; int i; char *meta_data; @@ -3198,10 +3197,11 @@ d_tm_init_histogram(struct d_tm_node_t *node, char *path, int num_buckets, min = 0; max = initial_width - 1; - prev_width = initial_width; for (i = 0; i < num_buckets; i++) { - D_ASPRINTF(meta_data, "histogram bucket %d [%lu .. %lu]", - i, min, max); + if (max == UINT64_MAX && i == (num_buckets - 1)) + D_ASPRINTF(meta_data, "histogram bucket %d [%lu .. inf]", i, min); + else + D_ASPRINTF(meta_data, "histogram bucket %d [%lu .. %lu]", i, min, max); if (meta_data == NULL) { rc = -DER_NOMEM; goto failure; @@ -3230,9 +3230,8 @@ d_tm_init_histogram(struct d_tm_node_t *node, char *path, int num_buckets, } else if (multiplier == 1) { max += initial_width; } else { - max = min + (prev_width * multiplier) - 1; + max = (min * multiplier) - 1; } - prev_width = (max - min) + 1; } D_DEBUG(DB_TRACE, "Successfully added histogram for: [%s]\n", path); diff --git a/src/gurt/tests/test_gurt_telem_producer.c b/src/gurt/tests/test_gurt_telem_producer.c index 83e172d1afb..172ce138edb 100644 --- a/src/gurt/tests/test_gurt_telem_producer.c +++ b/src/gurt/tests/test_gurt_telem_producer.c @@ -377,6 +377,7 @@ check_bucket_metadata(struct d_tm_node_t *node, int bucket_id) char *desc; char *units; int rc; + char max_str[21]; char exp_desc[D_TM_MAX_DESC_LEN]; printf("Checking bucket %d\n", bucket_id); @@ -385,9 +386,13 @@ check_bucket_metadata(struct d_tm_node_t *node, int bucket_id) assert_rc_equal(rc, DER_SUCCESS); assert_non_null(bucket.dtb_bucket); - snprintf(exp_desc, sizeof(exp_desc), - "histogram bucket %d [%lu .. %lu]", - bucket_id, bucket.dtb_min, bucket.dtb_max); + if (bucket.dtb_max == UINT64_MAX) + snprintf(max_str, sizeof(max_str), "inf"); + else + snprintf(max_str, sizeof(max_str), "%lu", bucket.dtb_max); + + snprintf(exp_desc, sizeof(exp_desc), "histogram bucket %d [%lu .. %s]", bucket_id, + bucket.dtb_min, max_str); rc = d_tm_get_metadata(cli_ctx, &desc, &units, bucket.dtb_bucket); assert_rc_equal(rc, DER_SUCCESS); @@ -533,32 +538,32 @@ check_histogram_m2_data(char *path) assert_rc_equal(rc, DER_SUCCESS); assert_int_equal(histogram.dth_num_buckets, 5); - assert_int_equal(histogram.dth_initial_width, 2048); + assert_int_equal(histogram.dth_initial_width, 256); assert_int_equal(histogram.dth_value_multiplier, 2); rc = d_tm_get_bucket_range(cli_ctx, &bucket, 0, gauge); assert_rc_equal(rc, DER_SUCCESS); assert_int_equal(bucket.dtb_min, 0); - assert_int_equal(bucket.dtb_max, 2047); + assert_int_equal(bucket.dtb_max, 255); rc = d_tm_get_bucket_range(cli_ctx, &bucket, 1, gauge); assert_rc_equal(rc, DER_SUCCESS); - assert_int_equal(bucket.dtb_min, 2048); - assert_int_equal(bucket.dtb_max, 6143); + assert_int_equal(bucket.dtb_min, 256); + assert_int_equal(bucket.dtb_max, 511); rc = d_tm_get_bucket_range(cli_ctx, &bucket, 2, gauge); assert_rc_equal(rc, DER_SUCCESS); - assert_int_equal(bucket.dtb_min, 6144); - assert_int_equal(bucket.dtb_max, 14335); + assert_int_equal(bucket.dtb_min, 512); + assert_int_equal(bucket.dtb_max, 1023); rc = d_tm_get_bucket_range(cli_ctx, &bucket, 3, gauge); assert_rc_equal(rc, DER_SUCCESS); - assert_int_equal(bucket.dtb_min, 14336); - assert_int_equal(bucket.dtb_max, 30719); + assert_int_equal(bucket.dtb_min, 1024); + assert_int_equal(bucket.dtb_max, 2047); rc = d_tm_get_bucket_range(cli_ctx, &bucket, 4, gauge); assert_rc_equal(rc, DER_SUCCESS); - assert_int_equal(bucket.dtb_min, 30720); + assert_int_equal(bucket.dtb_min, 2048); assert_true(bucket.dtb_max == UINT64_MAX); rc = d_tm_get_bucket_range(cli_ctx, &bucket, 5, gauge); @@ -583,7 +588,7 @@ test_gauge_with_histogram_multiplier_2(void **state) assert_rc_equal(rc, DER_SUCCESS); num_buckets = 5; - initial_width = 2048; + initial_width = 256; multiplier = 2; rc = d_tm_init_histogram(gauge, path, num_buckets, @@ -592,29 +597,29 @@ test_gauge_with_histogram_multiplier_2(void **state) /* bucket 0 - gets 3 values */ d_tm_set_gauge(gauge, 0); - d_tm_set_gauge(gauge, 512); - d_tm_set_gauge(gauge, 2047); + d_tm_set_gauge(gauge, 128); + d_tm_set_gauge(gauge, 255); /* bucket 1 - gets 4 values */ - d_tm_set_gauge(gauge, 2048); - d_tm_set_gauge(gauge, 2049); - d_tm_set_gauge(gauge, 3000); - d_tm_set_gauge(gauge, 6143); + d_tm_set_gauge(gauge, 256); + d_tm_set_gauge(gauge, 312); + d_tm_set_gauge(gauge, 480); + d_tm_set_gauge(gauge, 511); /* bucket 2 - gets 2 values */ - d_tm_set_gauge(gauge, 6144); - d_tm_set_gauge(gauge, 14335); + d_tm_set_gauge(gauge, 512); + d_tm_set_gauge(gauge, 1023); /* bucket 3 - gets 3 values */ - d_tm_set_gauge(gauge, 14336); - d_tm_set_gauge(gauge, 16383); - d_tm_set_gauge(gauge, 30719); + d_tm_set_gauge(gauge, 1024); + d_tm_set_gauge(gauge, 2000); + d_tm_set_gauge(gauge, 2047); /* bucket 4 - gets 4 values */ - d_tm_set_gauge(gauge, 30720); - d_tm_set_gauge(gauge, 35000); - d_tm_set_gauge(gauge, 40000); - d_tm_set_gauge(gauge, 65000); + d_tm_set_gauge(gauge, 2048); + d_tm_set_gauge(gauge, 3000); + d_tm_set_gauge(gauge, 4000); + d_tm_set_gauge(gauge, 8193); /* Verify result data */ check_histogram_m2_data(path); diff --git a/src/object/cli_shard.c b/src/object/cli_shard.c index c7f4fa11a3c..0214e8a5039 100644 --- a/src/object/cli_shard.c +++ b/src/object/cli_shard.c @@ -716,12 +716,12 @@ obj_shard_update_metrics_end(crt_rpc_t *rpc, uint64_t send_time, void *arg, int D_ASSERTF(opm != NULL, "pool %p\n", pool); if (opc == DAOS_OBJ_RPC_UPDATE) { size = daos_sgls_packed_size(rw_args->rwaa_sgls, orw->orw_nr, NULL); - d_tm_inc_counter(opm->opm_update_bytes, size); - lat = tls->cot_update_lat[lat_bucket(size)]; + lat = tls->cot_update_lat[lat_bucket(size)]; + d_tm_inc_gauge(opm->opm_update_bytes, size); } else { size = obj_get_fetch_size(rw_args); lat = tls->cot_fetch_lat[lat_bucket(size)]; - d_tm_inc_counter(opm->opm_fetch_bytes, size); + d_tm_inc_gauge(opm->opm_fetch_bytes, size); } break; default: diff --git a/src/object/obj_utils.c b/src/object/obj_utils.c index 631bcea696f..a2dbc7758a3 100644 --- a/src/object/obj_utils.c +++ b/src/object/obj_utils.c @@ -157,6 +157,7 @@ obj_metrics_alloc_internal(const char *path, int tgt_id, bool server) { struct obj_pool_metrics *metrics; char tgt_path[32]; + char tmp_path[D_TM_MAX_NAME_LEN]; uint32_t opc; int rc; @@ -203,19 +204,27 @@ obj_metrics_alloc_internal(const char *path, int tgt_id, bool server) if (rc) D_WARN("Failed to create retry cnt sensor: " DF_RC "\n", DP_RC(rc)); - /** Total bytes read */ - rc = d_tm_add_metric(&metrics->opm_fetch_bytes, D_TM_COUNTER, - "total number of bytes fetched/read", "bytes", "%s/xferred/fetch%s", - path, tgt_path); + /** Total bytes read, with I/Os bucketed by size */ + snprintf(tmp_path, sizeof(tmp_path), "%s/xferred/fetch%s", path, tgt_path); + rc = d_tm_add_metric(&metrics->opm_fetch_bytes, D_TM_STATS_GAUGE, + "total number of bytes fetched/read", "bytes", tmp_path); + if (rc) + DL_WARN(rc, "Failed to create bytes fetch gauge"); + + rc = d_tm_init_histogram(metrics->opm_fetch_bytes, tmp_path, NR_LATENCY_BUCKETS, 256, 2); + if (rc) + DL_WARN(rc, "Failed to init per-I/O fetch histogram"); + + /** Total bytes written, with I/Os bucketed by size */ + snprintf(tmp_path, sizeof(tmp_path), "%s/xferred/update%s", path, tgt_path); + rc = d_tm_add_metric(&metrics->opm_update_bytes, D_TM_STATS_GAUGE, + "total number of bytes updated/written", "bytes", tmp_path); if (rc) - D_WARN("Failed to create bytes fetch counter: " DF_RC "\n", DP_RC(rc)); + DL_WARN(rc, "Failed to create bytes update gauge"); - /** Total bytes written */ - rc = d_tm_add_metric(&metrics->opm_update_bytes, D_TM_COUNTER, - "total number of bytes updated/written", "bytes", - "%s/xferred/update%s", path, tgt_path); + rc = d_tm_init_histogram(metrics->opm_update_bytes, tmp_path, NR_LATENCY_BUCKETS, 256, 2); if (rc) - D_WARN("Failed to create bytes update counter: " DF_RC "\n", DP_RC(rc)); + DL_WARN(rc, "Failed to init per-I/O update histogram"); /** Total number of EC full-stripe update operations, of type counter */ rc = d_tm_add_metric(&metrics->opm_update_ec_full, D_TM_COUNTER, diff --git a/src/object/srv_mod.c b/src/object/srv_mod.c index 9ca9d0b1f36..8c204b6c388 100644 --- a/src/object/srv_mod.c +++ b/src/object/srv_mod.c @@ -126,7 +126,6 @@ obj_tls_init(int tags, int xs_id, int tgt_id) * Maintain per-I/O size latency for update & fetch RPCs * of type gauge */ - obj_latency_tm_init(DAOS_OBJ_RPC_UPDATE, tgt_id, tls->ot_update_lat, obj_opc_to_str(DAOS_OBJ_RPC_UPDATE), "update RPC processing time", true); diff --git a/src/object/srv_obj.c b/src/object/srv_obj.c index bce108295d2..ebf53a4298a 100644 --- a/src/object/srv_obj.c +++ b/src/object/srv_obj.c @@ -2227,7 +2227,7 @@ obj_update_sensors(struct obj_io_context *ioc, int err) switch (opc) { case DAOS_OBJ_RPC_UPDATE: - d_tm_inc_counter(opm->opm_update_bytes, ioc->ioc_io_size); + d_tm_inc_gauge(opm->opm_update_bytes, ioc->ioc_io_size); lat = tls->ot_update_lat[lat_bucket(ioc->ioc_io_size)]; orw = crt_req_get(ioc->ioc_rpc); if (orw->orw_iod_array.oia_iods != NULL) @@ -2235,11 +2235,11 @@ obj_update_sensors(struct obj_io_context *ioc, int err) break; case DAOS_OBJ_RPC_TGT_UPDATE: - d_tm_inc_counter(opm->opm_update_bytes, ioc->ioc_io_size); + d_tm_inc_gauge(opm->opm_update_bytes, ioc->ioc_io_size); lat = tls->ot_tgt_update_lat[lat_bucket(ioc->ioc_io_size)]; break; case DAOS_OBJ_RPC_FETCH: - d_tm_inc_counter(opm->opm_fetch_bytes, ioc->ioc_io_size); + d_tm_inc_gauge(opm->opm_fetch_bytes, ioc->ioc_io_size); lat = tls->ot_fetch_lat[lat_bucket(ioc->ioc_io_size)]; break; default: