diff --git a/docs/source/contents/metrics/operational.md b/docs/source/contents/metrics/operational.md index ec37a3b442..377a73677f 100644 --- a/docs/source/contents/metrics/operational.md +++ b/docs/source/contents/metrics/operational.md @@ -8,15 +8,25 @@ There is a Grafana dashboard (referenced below) that provides an overview of the ## List of SCv2 metrics -The list of SCv2 metrics that we are compiling is: +The list of SCv2 metrics that we are compiling is as follows. - ```{literalinclude} ../../../../scheduler/pkg/metrics/prometheus.go +For the agent that runs alongside the inference servers: + + ```{literalinclude} ../../../../scheduler/pkg/metrics/agent.go + :language: golang + :start-after: // start list of metrics + :end-before: // end list of metrics + ``` + +For the pipeline gateway that handles requests to pipelines: + + ```{literalinclude} ../../../../scheduler/pkg/metrics/gateway.go :language: golang :start-after: // start list of metrics :end-before: // end list of metrics ``` -Many of these metrics are model level counters and gauges. +Many of these metrics are model- and pipeline-level counters and gauges. We also aggregate some of these metrics to speed up the display of graphs. This is experimental and these metrics are bound to change to reflect the trends we want to capture as we get more information about the usage of the system. @@ -35,7 +45,7 @@ Prometheus will be available at `http://localhost:9090`. ### Kubernetes Installation -Download the dashboard from [SCv2 dashboard](https://github.com/SeldonIO/seldon-core-v2/blob/master/prometheus/dashboards/Seldon%20Core%20Model%20Mesh%20Monitoring.json) and import it in Grafana, making sure that the data source is pointing to the correct Prometheus store. +Download the dashboard from [SCv2 dashboard](https://github.com/SeldonIO/seldon-core-v2/blob/master/prometheus/dashboards/seldon.json) and import it into Grafana, making sure that the data source points to the correct Prometheus store. Find more information on how to import the dashboard [here](https://grafana.com/docs/grafana/latest/dashboards/export-import/). 
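The docs above distinguish per-model metrics from the aggregate series used to keep dashboards fast. A minimal sketch of that pattern with the Prometheus Go client (a hypothetical illustration, not the SCv2 code itself; the real counters also carry server and replica labels):

```go
package main

import "github.com/prometheus/client_golang/prometheus"

// Hypothetical counters mirroring the model/aggregate split: the per-model
// counter carries model-level labels, while the aggregate counter drops them
// so Prometheus stores far fewer series and graphs render quickly.
var (
	inferTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "seldon_model_infer_total", Help: "Per-model inference count."},
		[]string{"model", "method_type", "code"},
	)
	aggregateInferTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "seldon_model_aggregate_infer_total", Help: "Aggregate inference count."},
		[]string{"method_type"},
	)
)

// recordInfer updates both series for a single inference request.
func recordInfer(model, methodType, code string) {
	inferTotal.WithLabelValues(model, methodType, code).Inc()
	aggregateInferTotal.WithLabelValues(methodType).Inc()
}

func main() {
	prometheus.MustRegister(inferTotal, aggregateInferTotal)
	recordInfer("iris", "rest", "200")
}
```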
diff --git a/prometheus/dashboards/Seldon Core Model Mesh Monitoring.json b/prometheus/dashboards/seldon.json similarity index 86% rename from prometheus/dashboards/Seldon Core Model Mesh Monitoring.json rename to prometheus/dashboards/seldon.json index c15af12818..644b703ffc 100644 --- a/prometheus/dashboards/Seldon Core Model Mesh Monitoring.json +++ b/prometheus/dashboards/seldon.json @@ -1,4 +1,46 @@ { + "__inputs": [ + { + "name": "DS_DS_PROMETHEUS", + "label": "DS_PROMETHEUS", + "description": "", + "type": "datasource", + "pluginId": "prometheus", + "pluginName": "Prometheus" + } + ], + "__requires": [ + { + "type": "panel", + "id": "gauge", + "name": "Gauge", + "version": "" + }, + { + "type": "grafana", + "id": "grafana", + "name": "Grafana", + "version": "8.0.5" + }, + { + "type": "datasource", + "id": "prometheus", + "name": "Prometheus", + "version": "1.0.0" + }, + { + "type": "panel", + "id": "stat", + "name": "Stat", + "version": "" + }, + { + "type": "panel", + "id": "timeseries", + "name": "Time series", + "version": "" + } + ], "annotations": { "list": [ { @@ -19,17 +61,13 @@ ] }, "editable": true, - "fiscalYearStartMonth": 0, + "gnetId": null, "graphTooltip": 0, - "id": 26, + "id": null, "links": [], - "liveNow": false, "panels": [ { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "description": "", "fieldConfig": { "defaults": { @@ -72,13 +110,10 @@ "text": {}, "textMode": "value_and_name" }, - "pluginVersion": "8.4.6", + "pluginVersion": "8.0.5", "targets": [ { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "count (seldon_mesh_seldon_loaded_model_memory_bytes_gauge >0 )", "hide": false, @@ -87,10 +122,7 @@ "refId": "B" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "sum (seldon_mesh_seldon_loaded_model_gauge)", "hide": false, @@ -104,10 +136,7 @@ "type": "stat" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "description": "", "fieldConfig": { "defaults": { @@ -146,19 +175,17 @@ "fields": "", "values": true }, + "text": {}, "textMode": "auto" }, - "pluginVersion": "8.4.6", + "pluginVersion": "8.0.5", "targets": [ { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "sum by(server) (seldon_mesh_seldon_loaded_model_gauge)", "format": "table", - "instant": false, + "instant": true, "interval": "", "intervalFactor": 1, "legendFormat": "", @@ -192,10 +219,7 @@ "type": "stat" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "description": "", "fieldConfig": { "defaults": { @@ -237,17 +261,14 @@ "text": {}, "textMode": "auto" }, - "pluginVersion": "8.4.6", + "pluginVersion": "8.0.5", "targets": [ { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "count by(server) (seldon_mesh_seldon_loaded_model_memory_bytes_gauge > 0)", "format": "table", - "instant": false, + "instant": true, "interval": "", "intervalFactor": 1, "legendFormat": "", @@ -281,10 +302,7 @@ "type": "stat" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "description": 
"", "fieldConfig": { "defaults": { @@ -336,13 +354,10 @@ "showThresholdMarkers": true, "text": {} }, - "pluginVersion": "8.4.6", + "pluginVersion": "8.0.5", "targets": [ { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "sum by(server) (seldon_mesh_seldon_loaded_model_memory_bytes_gauge) / sum by(server) (seldon_mesh_seldon_server_replica_memory_capacity_overcommit_bytes_gauge)", "format": "table", @@ -434,10 +449,7 @@ "type": "gauge" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "fieldConfig": { "defaults": { "color": { @@ -508,10 +520,7 @@ }, "targets": [ { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "sum(rate(seldon_mesh_seldon_cache_evict_count[1m]))", "format": "time_series", @@ -522,10 +531,7 @@ "refId": "A" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "sum(rate(seldon_mesh_seldon_cache_miss_count[1m]))", "hide": false, @@ -538,10 +544,7 @@ "type": "timeseries" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "fieldConfig": { "defaults": { "color": { @@ -612,10 +615,7 @@ }, "targets": [ { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "sum by (server) (rate(seldon_mesh_seldon_load_model_counter[1m]))", "format": "time_series", @@ -626,10 +626,7 @@ "refId": "A" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "sum by (server) (rate(seldon_mesh_seldon_unload_model_counter[1m]))", "format": "time_series", @@ -645,10 +642,7 @@ "type": "timeseries" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "description": "", "fieldConfig": { "defaults": { @@ -723,10 +717,7 @@ "pluginVersion": "8.4.6", "targets": [ { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "sum(seldon_mesh_seldon_server_replica_memory_capacity_bytes_gauge{server=\"triton\"})", "hide": false, @@ -735,10 +726,7 @@ "refId": "B" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "sum(seldon_mesh_seldon_loaded_model_memory_bytes_gauge{server=\"triton\"})", "hide": false, @@ -747,10 +735,7 @@ "refId": "C" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "sum(seldon_mesh_seldon_server_replica_memory_capacity_overcommit_bytes_gauge{server=\"triton\"})", "hide": false, @@ -759,10 +744,7 @@ "refId": "A" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "sum(seldon_mesh_seldon_loaded_model_memory_bytes_gauge{server=\"triton\"}) + sum(seldon_mesh_seldon_evicted_model_memory_bytes_gauge{server=\"triton\"})", "hide": false, @@ -776,10 +758,7 @@ "type": "timeseries" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": 
"${DS_DS_PROMETHEUS}", "description": "", "fieldConfig": { "defaults": { @@ -854,10 +833,7 @@ "pluginVersion": "8.4.6", "targets": [ { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "sum(seldon_mesh_seldon_server_replica_memory_capacity_bytes_gauge{server=\"mlserver\"})", "hide": false, @@ -866,10 +842,7 @@ "refId": "B" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "sum(seldon_mesh_seldon_loaded_model_memory_bytes_gauge{server=\"mlserver\"})", "hide": false, @@ -878,10 +851,7 @@ "refId": "C" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "sum(seldon_mesh_seldon_server_replica_memory_capacity_overcommit_bytes_gauge{server=\"mlserver\"})", "hide": false, @@ -890,10 +860,7 @@ "refId": "A" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "sum(seldon_mesh_seldon_loaded_model_memory_bytes_gauge{server=\"mlserver\"}) + sum(seldon_mesh_seldon_evicted_model_memory_bytes_gauge{server=\"mlserver\"})", "hide": false, @@ -907,10 +874,7 @@ "type": "timeseries" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "fieldConfig": { "defaults": { "color": { @@ -982,10 +946,7 @@ }, "targets": [ { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "sum(container_memory_working_set_bytes{container=\"mlserver\"}) ", "interval": "10s", @@ -993,10 +954,7 @@ "refId": "A" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "sum(container_memory_working_set_bytes{container=\"triton\"})", "hide": false, @@ -1009,10 +967,7 @@ "type": "timeseries" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "description": "", "fieldConfig": { "defaults": { @@ -1089,12 +1044,9 @@ "pluginVersion": "8.4.6", "targets": [ { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, - "expr": "avg((rate(seldon_mesh_seldon_aggregate_infer_seconds_total{container=\"agent\"}[1m]) / rate(seldon_mesh_seldon_aggregate_infer_total{container=\"agent\"}[1m])) > 0 ) by (server, method_type)", + "expr": "avg((rate(seldon_mesh_seldon_model_aggregate_infer_seconds_total{container=\"agent\"}[1m]) / rate(seldon_mesh_seldon_model_aggregate_infer_total{container=\"agent\"}[1m])) > 0 ) by (server, method_type)", "hide": false, "interval": "", "legendFormat": "{{server}}_{{method_type}}_avg", @@ -1106,10 +1058,7 @@ "type": "timeseries" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "fieldConfig": { "defaults": { "color": { @@ -1180,10 +1129,7 @@ }, "targets": [ { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + "datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "rate (container_cpu_usage_seconds_total{container=\"mlserver\"}[1m])", "interval": "10s", @@ -1191,10 +1137,7 @@ "refId": "A" }, { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, + 
"datasource": "${DS_DS_PROMETHEUS}", "exemplar": true, "expr": "rate (container_cpu_usage_seconds_total{container=\"triton\"}[1m])", "hide": false, @@ -1208,7 +1151,7 @@ } ], "refresh": "30s", - "schemaVersion": 35, + "schemaVersion": 30, "style": "dark", "tags": [], "templating": { @@ -1221,7 +1164,6 @@ "timepicker": {}, "timezone": "", "title": "Seldon Core Model Mesh Monitoring", - "uid": "MHloCP_7z", - "version": 7, - "weekStart": "" + "uid": "y5MkDIkVz", + "version": 1 } \ No newline at end of file diff --git a/samples/models/models.yaml b/samples/models/models.yaml deleted file mode 100644 index e8eda7f213..0000000000 --- a/samples/models/models.yaml +++ /dev/null @@ -1,87 +0,0 @@ ---- -# Simple. Will be placed on the first available server. -# Eventually will be automated by finding appropriate server. - -apiVersion: mlops.seldon.io/v1alpha1 -kind: Model -metadata: - name: model1 -spec: - storageUri: "gs://seldon-models/sklearn/iris-0.23.2/lr_model" - ---- -# explicit model type - -apiVersion: mlops.seldon.io/v1alpha1 -kind: Model -metadata: - name: model2 -spec: - storageUri: "gs://seldon-models/sklearn/iris-0.23.2/lr_model" - modelType: sklearn - ---- -# explicit requirements - -apiVersion: mlops.seldon.io/v1alpha1 -kind: Model -metadata: - name: model2 -spec: - storageUri: "gs://seldon-models/sklearn/iris-0.23.2/lr_model" - requirements: - - "production" - ---- -# Memory required specified - -apiVersion: mlops.seldon.io/v1alpha1 -kind: Model -metadata: - name: model3 -spec: - storageUri: "gs://seldon-models/sklearn/iris-0.23.2/lr_model" - requirements: - - "sklearn-1.0" - memory: "1G" - ---- -# Server specified - -apiVersion: mlops.seldon.io/v1alpha1 -kind: Model -metadata: - name: model4 -spec: - storageUri: "gs://seldon-models/sklearn/iris-0.23.2/lr_model" - requirements: - - "sklearn-1.0" - memory: "1G" - server: "myserver" - ---- -# replicas - -apiVersion: mlops.seldon.io/v1alpha1 -kind: Model -metadata: - name: model3 -spec: - storageUri: "gs://seldon-models/sklearn/iris-0.23.2/lr_model" - requirements: - - "sklearn-1.0" - memory: "1G" - replicas: 10 - ---- -# logger - -apiVersion: mlops.seldon.io/v1alpha1 -kind: Model -metadata: - name: model3 -spec: - storageUri: "gs://seldon-models/sklearn/iris-0.23.2/lr_model" - logger: - uri: http://logger.seldon - percent: 25 \ No newline at end of file diff --git a/samples/models/sklearn-iris-gs.yaml b/samples/models/sklearn-iris-gs.yaml index 13086b706d..cba127bc91 100644 --- a/samples/models/sklearn-iris-gs.yaml +++ b/samples/models/sklearn-iris-gs.yaml @@ -7,3 +7,4 @@ spec: storageUri: "gs://seldon-models/mlserver/iris" requirements: - sklearn + memory: 100Ki diff --git a/samples/models/tfsimple1.yaml b/samples/models/tfsimple1.yaml index 0dcbb878c4..54888a49d6 100644 --- a/samples/models/tfsimple1.yaml +++ b/samples/models/tfsimple1.yaml @@ -7,3 +7,4 @@ spec: storageUri: "gs://seldon-models/triton/simple" requirements: - tensorflow + memory: 100Ki diff --git a/samples/models/tfsimple2.yaml b/samples/models/tfsimple2.yaml index f16939f798..b3a35e41d5 100644 --- a/samples/models/tfsimple2.yaml +++ b/samples/models/tfsimple2.yaml @@ -7,3 +7,4 @@ spec: storageUri: "gs://seldon-models/triton/simple" requirements: - tensorflow + memory: 100Ki diff --git a/samples/models/tfsimple3.yaml b/samples/models/tfsimple3.yaml index 09b6b642e4..91c3dd3296 100644 --- a/samples/models/tfsimple3.yaml +++ b/samples/models/tfsimple3.yaml @@ -7,3 +7,4 @@ spec: storageUri: "gs://seldon-models/triton/simple" requirements: - tensorflow + memory: 100Ki diff 
--git a/scheduler/Makefile b/scheduler/Makefile index c4204328f7..6c087c2c53 100644 --- a/scheduler/Makefile +++ b/scheduler/Makefile @@ -197,8 +197,16 @@ kind-image-install-pipelinegateway: kind-image-install-dataflow: kind load -v 3 docker-image ${DATAFLOW_IMG} --name ${KIND_NAME} +.PHONY: kind-image-install-mlserver +kind-image-install-mlserver: + kind load -v 3 docker-image ${MLSERVER_IMG} --name ${KIND_NAME} + +.PHONY: kind-image-install-triton +kind-image-install-triton: + kind load -v 3 docker-image ${TRITON_IMG} --name ${KIND_NAME} + .PHONY: kind-image-install-all -kind-image-install-all: kind-image-install-scheduler kind-image-install-envoy kind-image-install-agent kind-image-install-rclone kind-image-install-modelgateway kind-image-install-pipelinegateway kind-image-install-dataflow +kind-image-install-all: kind-image-install-scheduler kind-image-install-envoy kind-image-install-agent kind-image-install-rclone kind-image-install-modelgateway kind-image-install-pipelinegateway kind-image-install-dataflow kind-image-install-mlserver kind-image-install-triton ##################################### # Start with Docker Compose @@ -494,6 +502,10 @@ stop-prometheus: start-prometheus-host: ${DOCKER_COMPOSE_SERVICE_HOST_COMMAND} up -d prometheus +.PHONY: stop-prometheus-host +stop-prometheus-host: + ${DOCKER_COMPOSE_SERVICE_HOST_COMMAND} rm --stop --force prometheus + .PHONY: start-grafana start-grafana: ${DOCKER_COMPOSE_SERVICE_COMMAND} up -d grafana diff --git a/scheduler/all-base.yaml b/scheduler/all-base.yaml index 11d2ef960a..a69a8194d5 100644 --- a/scheduler/all-base.yaml +++ b/scheduler/all-base.yaml @@ -141,8 +141,6 @@ services: prometheus: image: prom/prometheus:latest - ports: - - "9090:9090" rclone-mlserver: build: diff --git a/scheduler/all-host-network.yaml b/scheduler/all-host-network.yaml index 6acdf5b5c6..69919eb083 100644 --- a/scheduler/all-host-network.yaml +++ b/scheduler/all-host-network.yaml @@ -134,6 +134,7 @@ services: - "/mnt/config/tracing-host.json" prometheus: + network_mode: "host" command: - --config.file=/etc/prometheus/prometheus-host.yml volumes: diff --git a/scheduler/all-internal.yaml b/scheduler/all-internal.yaml index 12f1ef62a6..2c290b49bf 100644 --- a/scheduler/all-internal.yaml +++ b/scheduler/all-internal.yaml @@ -159,6 +159,8 @@ services: - ${PIPELINEGATEWAY_HTTP_PORT} - "--grpc-port" - ${PIPELINEGATEWAY_GRPC_PORT} + - "--metrics-port" + - ${PIPELINEGATEWAY_METRICS_PORT} - "--tracing-config-path" - "/mnt/config/tracing-internal.json" volumes: @@ -172,6 +174,8 @@ services: prometheus: command: - --config.file=/etc/prometheus/prometheus-internal.yml + ports: + - "9090:9090" volumes: - type: bind source: ./config diff --git a/scheduler/cmd/agent/main.go b/scheduler/cmd/agent/main.go index c11f1b46a2..93091f7d0b 100644 --- a/scheduler/cmd/agent/main.go +++ b/scheduler/cmd/agent/main.go @@ -164,7 +164,7 @@ func main() { // Create V2 Protocol Handler v2Client := agent.NewV2Client(cli.InferenceHost, cli.InferenceGrpcPort, logger, true) - promMetrics, err := metrics.NewPrometheusMetrics(cli.ServerName, cli.ReplicaIdx, cli.Namespace, logger) + promMetrics, err := metrics.NewPrometheusModelMetrics(cli.ServerName, cli.ReplicaIdx, cli.Namespace, logger) if err != nil { logger.WithError(err).Fatalf("Can't create prometheus metrics") } diff --git a/scheduler/cmd/pipelinegateway/main.go b/scheduler/cmd/pipelinegateway/main.go index d3cc9e5191..879ee82f7e 100644 --- a/scheduler/cmd/pipelinegateway/main.go +++ b/scheduler/cmd/pipelinegateway/main.go @@ -118,7 
+118,7 @@ func main() { } defer km.Stop() - promMetrics, err := metrics.NewPrometheusMetrics(serviceTag, 0, namespace, logger) + promMetrics, err := metrics.NewPrometheusPipelineMetrics(namespace, logger) if err != nil { logger.WithError(err).Fatalf("Can't create prometheus metrics") } diff --git a/scheduler/config/grafana/dashboards/seldon_overview/seldon.json b/scheduler/config/grafana/dashboards/seldon_overview/seldon.json index 4ce33689a6..31ca5a16ca 100644 --- a/scheduler/config/grafana/dashboards/seldon_overview/seldon.json +++ b/scheduler/config/grafana/dashboards/seldon_overview/seldon.json @@ -795,7 +795,7 @@ { "datasource": "prometheus", "exemplar": true, - "expr": "avg((rate(seldon_aggregate_infer_seconds_total[1m]) / rate(seldon_aggregate_infer_total[1m])) > 0 ) by (server, method_type)", + "expr": "avg((rate(seldon_model_aggregate_infer_seconds_total[1m]) / rate(seldon_model_aggregate_infer_total[1m])) > 0 ) by (server, method_type)", "hide": false, "interval": "", "legendFormat": "{{server}}_{{method_type}}_avg", diff --git a/scheduler/config/prometheus-host.yml b/scheduler/config/prometheus-host.yml index f1b60db508..a91f81d43b 100644 --- a/scheduler/config/prometheus-host.yml +++ b/scheduler/config/prometheus-host.yml @@ -3,4 +3,5 @@ scrape_configs: scrape_interval: 10s static_configs: - targets: ['0.0.0.0:9006'] - - targets: ['0.0.0.0:9007'] + - targets: ['0.0.0.0:9007'] + - targets: ['0.0.0.0:9009'] diff --git a/scheduler/config/prometheus-internal.yml b/scheduler/config/prometheus-internal.yml index 7f8f74da19..c612b84c41 100644 --- a/scheduler/config/prometheus-internal.yml +++ b/scheduler/config/prometheus-internal.yml @@ -3,4 +3,5 @@ scrape_configs: scrape_interval: 10s static_configs: - targets: ['agent-mlserver:9006'] - - targets: ['agent-triton:9006'] + - targets: ['agent-triton:9006'] + - targets: ['pipelinegateway:9009'] diff --git a/scheduler/pkg/agent/client.go b/scheduler/pkg/agent/client.go index f2c61c4cb7..3eaa4b8c91 100644 --- a/scheduler/pkg/agent/client.go +++ b/scheduler/pkg/agent/client.go @@ -44,7 +44,7 @@ type Client struct { rpHTTP ClientServiceInterface rpGRPC ClientServiceInterface clientDebugService ClientServiceInterface - metrics metrics.MetricsHandler + metrics metrics.AgentMetricsHandler ClientServices SchedulerGrpcClientOptions KubernetesOptions @@ -90,7 +90,7 @@ func NewClient(serverName string, reverseProxyHTTP ClientServiceInterface, reverseProxyGRPC ClientServiceInterface, clientDebugService ClientServiceInterface, - metrics metrics.MetricsHandler, + metrics metrics.AgentMetricsHandler, ) *Client { opts := []grpc.CallOption{ diff --git a/scheduler/pkg/agent/rproxy.go b/scheduler/pkg/agent/rproxy.go index 5e096106a1..e7748f0ef7 100644 --- a/scheduler/pkg/agent/rproxy.go +++ b/scheduler/pkg/agent/rproxy.go @@ -40,13 +40,14 @@ type reverseHTTPProxy struct { backendHTTPServerPort uint servicePort uint mu sync.RWMutex - metrics metrics.MetricsHandler + metrics metrics.AgentMetricsHandler } // in the case the model is not loaded on server (return 404), we attempt to load it and then retry request type lazyModelLoadTransport struct { loader func(string) *V2Err http.RoundTripper + metrics metrics.AgentMetricsHandler } // RoundTrip implements http.RoundTripper for the Transport type. 
@@ -56,6 +57,9 @@ func (t *lazyModelLoadTransport) RoundTrip(req *http.Request) (*http.Response, e var originalBody []byte var err error + externalModelName := req.Header.Get(resources.SeldonModelHeader) + internalModelName := req.Header.Get(resources.SeldonInternalModelHeader) + startTime := time.Now() if req.Body != nil { originalBody, err = ioutil.ReadAll(req.Body) } @@ -77,8 +81,12 @@ func (t *lazyModelLoadTransport) RoundTrip(req *http.Request) (*http.Response, e req2 := req.Clone(req.Context()) req2.Body = ioutil.NopCloser(bytes.NewBuffer(originalBody)) - return t.RoundTripper.RoundTrip(req2) + res, err = t.RoundTripper.RoundTrip(req2) } + + elapsedTime := time.Since(startTime).Seconds() + go t.metrics.AddModelInferMetrics(externalModelName, internalModelName, metrics.MethodTypeRest, elapsedTime, metrics.HttpCodeToString(res.StatusCode)) + return res, err } @@ -90,7 +98,8 @@ func (rp *reverseHTTPProxy) rewriteHostHandler(r *http.Request) { } func (rp *reverseHTTPProxy) addHandlers(proxy http.Handler) http.Handler { - return otelhttp.NewHandler(rp.metrics.AddHistogramMetricsHandler(func(w http.ResponseWriter, r *http.Request) { + return otelhttp.NewHandler(rp.metrics.AddModelHistogramMetricsHandler(func(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() rp.rewriteHostHandler(r) externalModelName := r.Header.Get(resources.SeldonModelHeader) @@ -104,18 +113,16 @@ func (rp *reverseHTTPProxy) addHandlers(proxy http.Handler) http.Handler { rp.logger.Debugf("Extracted model name %s:%s %s:%s", resources.SeldonInternalModelHeader, internalModelName, resources.SeldonModelHeader, externalModelName) } - startTime := time.Now() if err := rp.stateManager.EnsureLoadModel(internalModelName); err != nil { rp.logger.Errorf("Cannot load model in agent %s", internalModelName) + elapsedTime := time.Since(startTime).Seconds() + go rp.metrics.AddModelInferMetrics(externalModelName, internalModelName, metrics.MethodTypeRest, elapsedTime, metrics.HttpCodeToString(http.StatusNotFound)) http.NotFound(w, r) } else { r.URL.Path = rewritePath(r.URL.Path, internalModelName) rp.logger.Debugf("Calling %s", r.URL.Path) proxy.ServeHTTP(w, r) - - elapsedTime := time.Since(startTime).Seconds() - go rp.metrics.AddInferMetrics(externalModelName, internalModelName, metrics.MethodTypeRest, elapsedTime) } }), "seldon-rproxy") } @@ -135,7 +142,7 @@ func (rp *reverseHTTPProxy) Start() error { MaxConnsPerHost: maxConnsPerHostHTTP, IdleConnTimeout: idleConnTimeoutSeconds * time.Second, } - proxy.Transport = &lazyModelLoadTransport{rp.stateManager.v2Client.LoadModel, t} + proxy.Transport = &lazyModelLoadTransport{rp.stateManager.v2Client.LoadModel, t, rp.metrics} rp.logger.Infof("Start reverse proxy on port %d for %s", rp.servicePort, backend) rp.server = &http.Server{Addr: ":" + strconv.Itoa(int(rp.servicePort)), Handler: rp.addHandlers(proxy)} // TODO: check for errors? 
we rely for now on Ready @@ -188,7 +195,7 @@ func NewReverseHTTPProxy( backendHTTPServerHost string, backendHTTPServerPort uint, servicePort uint, - metrics metrics.MetricsHandler, + metrics metrics.AgentMetricsHandler, ) *reverseHTTPProxy { rp := reverseHTTPProxy{ diff --git a/scheduler/pkg/agent/rproxy_grpc.go b/scheduler/pkg/agent/rproxy_grpc.go index 9ac586317c..ebec4bca25 100644 --- a/scheduler/pkg/agent/rproxy_grpc.go +++ b/scheduler/pkg/agent/rproxy_grpc.go @@ -43,11 +43,11 @@ type reverseGRPCProxy struct { v2GRPCClientPool []v2.GRPCInferenceServiceClient port uint // service port mu sync.RWMutex - metrics metrics.MetricsHandler + metrics metrics.AgentMetricsHandler callOptions []grpc.CallOption } -func NewReverseGRPCProxy(metricsHandler metrics.MetricsHandler, logger log.FieldLogger, backendGRPCServerHost string, backendGRPCServerPort uint, servicePort uint) *reverseGRPCProxy { +func NewReverseGRPCProxy(metricsHandler metrics.AgentMetricsHandler, logger log.FieldLogger, backendGRPCServerHost string, backendGRPCServerPort uint, servicePort uint) *reverseGRPCProxy { opts := []grpc.CallOption{ grpc.MaxCallSendMsgSize(math.MaxInt32), grpc.MaxCallRecvMsgSize(math.MaxInt32), @@ -156,6 +156,8 @@ func (rp *reverseGRPCProxy) ModelInfer(ctx context.Context, r *v2.ModelInferRequ startTime := time.Now() err = rp.ensureLoadModel(r.ModelName) if err != nil { + elapsedTime := time.Since(startTime).Seconds() + go rp.metrics.AddModelInferMetrics(externalModelName, internalModelName, metrics.MethodTypeGrpc, elapsedTime, codes.NotFound.String()) return nil, status.Error(codes.NotFound, fmt.Sprintf("Model %s not found (err: %s)", r.ModelName, err)) } var trailer metadata.MD @@ -167,8 +169,9 @@ func (rp *reverseGRPCProxy) ModelInfer(ctx context.Context, r *v2.ModelInferRequ resp, err = rp.getV2GRPCClient().ModelInfer(ctx, r, opts...) 
} + grpcStatus, _ := status.FromError(err) elapsedTime := time.Since(startTime).Seconds() - go rp.metrics.AddInferMetrics(internalModelName, externalModelName, metrics.MethodTypeGrpc, elapsedTime) + go rp.metrics.AddModelInferMetrics(externalModelName, internalModelName, metrics.MethodTypeGrpc, elapsedTime, grpcStatus.Code().String()) errTrailer := grpc.SetTrailer(ctx, trailer) // pass on any trailers set by inference server such as MLServer if errTrailer != nil { logger.WithError(errTrailer).Error("Failed to set trailers") diff --git a/scheduler/pkg/agent/rproxy_test.go b/scheduler/pkg/agent/rproxy_test.go index f09f7d8b29..459898c7bf 100644 --- a/scheduler/pkg/agent/rproxy_test.go +++ b/scheduler/pkg/agent/rproxy_test.go @@ -3,6 +3,7 @@ package agent import ( "bytes" "context" + "fmt" "io" "net" "net/http" @@ -104,11 +105,15 @@ type fakeMetricsHandler struct { mu *sync.Mutex } -func (f fakeMetricsHandler) AddHistogramMetricsHandler(baseHandler http.HandlerFunc) http.HandlerFunc { +func (f fakeMetricsHandler) AddModelHistogramMetricsHandler(baseHandler http.HandlerFunc) http.HandlerFunc { return baseHandler } -func (f fakeMetricsHandler) AddInferMetrics(externalModelName string, internalModelName string, method string, elapsedTime float64) { +func (f fakeMetricsHandler) HttpCodeToString(code int) string { + return fmt.Sprintf("%d", code) +} + +func (f fakeMetricsHandler) AddModelInferMetrics(externalModelName string, internalModelName string, method string, elapsedTime float64, code string) { } func (f fakeMetricsHandler) AddLoadedModelMetrics(internalModelName string, memory uint64, isLoad, isSoft bool) { @@ -382,7 +387,8 @@ func TestLazyLoadRoundTripper(t *testing.T) { g.Expect(err).To(BeNil()) req.Header.Set("contentType", "application/json") httpClient := http.DefaultClient - httpClient.Transport = &lazyModelLoadTransport{loader, http.DefaultTransport} + metricsHandler := newFakeMetricsHandler() + httpClient.Transport = &lazyModelLoadTransport{loader, http.DefaultTransport, metricsHandler} mockMLServerState.setModelServerUnloaded(dummyModel) req.Header.Set(resources.SeldonInternalModelHeader, dummyModel) resp, err := httpClient.Do(req) diff --git a/scheduler/pkg/agent/state_manager.go b/scheduler/pkg/agent/state_manager.go index 222f7b97d2..2ace5a4a9b 100644 --- a/scheduler/pkg/agent/state_manager.go +++ b/scheduler/pkg/agent/state_manager.go @@ -26,7 +26,7 @@ type LocalStateManager struct { availableMainMemoryBytes int64 // lock for `availableMainMemoryBytes` mu sync.RWMutex - metrics metrics.MetricsHandler + metrics metrics.AgentMetricsHandler } // this should be called from control plane (if directly) @@ -356,7 +356,7 @@ func NewLocalStateManager( v2Client *V2Client, totalMainMemoryBytes uint64, overCommitPercentage uint32, - metrics metrics.MetricsHandler, + metrics metrics.AgentMetricsHandler, ) *LocalStateManager { // if we are here it means that it is a fresh instance with no state yet // i.e. should not have any models loaded / cache is empty etc. 
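The agent-side changes above repeat one instrumentation pattern: capture a start time before the call, derive a status-code string from the outcome (gRPC status or HTTP code), and record latency and code together. A standalone sketch of that pattern (the helper names here are illustrative, not part of the SCv2 codebase):

```go
package main

import (
	"fmt"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// recordGrpcCall times a call and reports its gRPC status code, mirroring the
// pattern used in the reverse proxies above.
func recordGrpcCall(call func() error, record func(elapsed float64, code string)) error {
	startTime := time.Now()
	err := call()
	// status.FromError returns an OK status for a nil error, so the code
	// label is always populated, including on the error paths.
	grpcStatus, _ := status.FromError(err)
	record(time.Since(startTime).Seconds(), grpcStatus.Code().String())
	return err
}

func main() {
	_ = recordGrpcCall(
		func() error { return status.Error(codes.NotFound, "model not found") },
		func(elapsed float64, code string) {
			fmt.Printf("elapsed=%.6fs code=%s\n", elapsed, code) // prints code=NotFound
		},
	)
}
```

Recording on every exit path (including the early NotFound returns) is what lets the new `code` label separate failed from successful inferences in the counters.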
diff --git a/scheduler/pkg/kafka/pipeline/grpcserver.go b/scheduler/pkg/kafka/pipeline/grpcserver.go index 3a5a415cc4..f4e922340e 100644 --- a/scheduler/pkg/kafka/pipeline/grpcserver.go +++ b/scheduler/pkg/kafka/pipeline/grpcserver.go @@ -27,10 +27,10 @@ type GatewayGrpcServer struct { grpcServer *grpc.Server gateway PipelineInferer logger log.FieldLogger - metrics metrics.MetricsHandler + metrics metrics.PipelineMetricsHandler } -func NewGatewayGrpcServer(port int, logger log.FieldLogger, gateway PipelineInferer, metricsHandler metrics.MetricsHandler) *GatewayGrpcServer { +func NewGatewayGrpcServer(port int, logger log.FieldLogger, gateway PipelineInferer, metricsHandler metrics.PipelineMetricsHandler) *GatewayGrpcServer { return &GatewayGrpcServer{ port: port, gateway: gateway, @@ -57,7 +57,7 @@ func (g *GatewayGrpcServer) Start() error { logger.Infof("Starting grpc server on port %d", g.port) opts := []grpc.ServerOption{} opts = append(opts, grpc.MaxConcurrentStreams(maxConcurrentStreams)) - opts = append(opts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(otelgrpc.UnaryServerInterceptor(), g.metrics.UnaryServerInterceptor()))) + opts = append(opts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(otelgrpc.UnaryServerInterceptor(), g.metrics.PipelineUnaryServerInterceptor()))) g.grpcServer = grpc.NewServer(opts...) v2.RegisterGRPCInferenceServiceServer(g.grpcServer, g) return g.grpcServer.Serve(l) @@ -95,22 +95,25 @@ func (g *GatewayGrpcServer) ModelInfer(ctx context.Context, r *v2.ModelInferRequ return nil, status.Errorf(codes.FailedPrecondition, err.Error()) } kafkaRequest, err := g.gateway.Infer(ctx, resourceName, isModel, b, convertGrpcMetadataToKafkaHeaders(md)) + elapsedTime := time.Since(startTime).Seconds() if err != nil { + go g.metrics.AddPipelineInferMetrics(resourceName, metrics.MethodTypeGrpc, elapsedTime, codes.FailedPrecondition.String()) return nil, status.Errorf(codes.FailedPrecondition, err.Error()) } meta := convertKafkaHeadersToGrpcMetadata(kafkaRequest.headers) meta[RequestIdHeader] = []string{kafkaRequest.key} err = grpc.SendHeader(ctx, meta) if err != nil { + go g.metrics.AddPipelineInferMetrics(resourceName, metrics.MethodTypeGrpc, elapsedTime, codes.Internal.String()) return nil, status.Errorf(codes.Internal, err.Error()) } resProto := &v2.ModelInferResponse{} err = proto.Unmarshal(kafkaRequest.response, resProto) if err != nil { + go g.metrics.AddPipelineInferMetrics(resourceName, metrics.MethodTypeGrpc, elapsedTime, codes.FailedPrecondition.String()) return nil, status.Errorf(codes.FailedPrecondition, err.Error()) } - elapsedTime := time.Since(startTime).Seconds() - go g.metrics.AddInferMetrics(header, "", metrics.MethodTypeGrpc, elapsedTime) + go g.metrics.AddPipelineInferMetrics(resourceName, metrics.MethodTypeGrpc, elapsedTime, codes.OK.String()) return resProto, nil } diff --git a/scheduler/pkg/kafka/pipeline/grpcserver_test.go b/scheduler/pkg/kafka/pipeline/grpcserver_test.go index 609b1df5fe..574b206962 100644 --- a/scheduler/pkg/kafka/pipeline/grpcserver_test.go +++ b/scheduler/pkg/kafka/pipeline/grpcserver_test.go @@ -17,22 +17,16 @@ import ( "google.golang.org/protobuf/proto" ) -type fakeMetricsHandler struct{} +type fakePipelineMetricsHandler struct{} -func (f fakeMetricsHandler) AddHistogramMetricsHandler(baseHandler http.HandlerFunc) http.HandlerFunc { +func (f fakePipelineMetricsHandler) AddPipelineHistogramMetricsHandler(baseHandler http.HandlerFunc) http.HandlerFunc { return baseHandler } -func (f fakeMetricsHandler)
AddInferMetrics(externalModelName string, internalModelName string, method string, elapsedTime float64) { +func (f fakePipelineMetricsHandler) AddPipelineInferMetrics(pipelineName string, method string, elapsedTime float64, code string) { } -func (f fakeMetricsHandler) AddLoadedModelMetrics(internalModelName string, memory uint64, isLoad, isSoft bool) { -} - -func (f fakeMetricsHandler) AddServerReplicaMetrics(memory uint64, memoryWithOvercommit float32) { -} - -func (f fakeMetricsHandler) UnaryServerInterceptor() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { +func (f fakePipelineMetricsHandler) PipelineUnaryServerInterceptor() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { return handler(ctx, req) } @@ -104,7 +98,7 @@ func TestGrpcServer(t *testing.T) { data: []byte("result"), key: testRequestId, } - grpcServer := NewGatewayGrpcServer(port, logrus.New(), mockInferer, fakeMetricsHandler{}) + grpcServer := NewGatewayGrpcServer(port, logrus.New(), mockInferer, fakePipelineMetricsHandler{}) go func() { err := grpcServer.Start() g.Expect(err).To(BeNil()) diff --git a/scheduler/pkg/kafka/pipeline/httpserver.go b/scheduler/pkg/kafka/pipeline/httpserver.go index b15b241eeb..5b00168e52 100644 --- a/scheduler/pkg/kafka/pipeline/httpserver.go +++ b/scheduler/pkg/kafka/pipeline/httpserver.go @@ -28,7 +28,7 @@ type GatewayHttpServer struct { logger log.FieldLogger ssl *TLSDetails gateway PipelineInferer - metrics metrics.MetricsHandler + metrics metrics.PipelineMetricsHandler } type TLSDetails struct { @@ -37,7 +37,7 @@ type TLSDetails struct { KeyFilename string } -func NewGatewayHttpServer(port int, logger log.FieldLogger, ssl *TLSDetails, gateway PipelineInferer, metrics metrics.MetricsHandler) *GatewayHttpServer { +func NewGatewayHttpServer(port int, logger log.FieldLogger, ssl *TLSDetails, gateway PipelineInferer, metrics metrics.PipelineMetricsHandler) *GatewayHttpServer { return &GatewayHttpServer{ port: port, router: mux.NewRouter(), @@ -97,10 +97,10 @@ func (g *GatewayHttpServer) setupRoutes() { g.router.Use(otelmux.Middleware("pipelinegateway")) g.router.NewRoute().Path( "/v2/models/{" + ResourceNameVariable + "}/infer").HandlerFunc( - g.metrics.AddHistogramMetricsHandler(g.inferModel)) + g.metrics.AddPipelineHistogramMetricsHandler(g.inferModel)) g.router.NewRoute().Path( "/v2/pipelines/{" + ResourceNameVariable + "}/infer").HandlerFunc( - g.metrics.AddHistogramMetricsHandler(g.inferModel)) + g.metrics.AddPipelineHistogramMetricsHandler(g.inferModel)) } func (g *GatewayHttpServer) infer(w http.ResponseWriter, req *http.Request, resourceName string, isModel bool) { @@ -119,6 +119,7 @@ func (g *GatewayHttpServer) infer(w http.ResponseWriter, req *http.Request, reso return } kafkaRequest, err := g.gateway.Infer(req.Context(), resourceName, isModel, dataProto, convertHttpHeadersToKafkaHeaders(req.Header)) + elapsedTime := time.Since(startTime).Seconds() for k, vals := range convertKafkaHeadersToHttpHeaders(kafkaRequest.headers) { for _, val := range vals { w.Header().Set(k, val) @@ -132,6 +133,7 @@ func (g *GatewayHttpServer) infer(w http.ResponseWriter, req *http.Request, reso resJson, err := ConvertV2ResponseBytesToJson(kafkaRequest.response) if err != nil { logger.WithError(err).Errorf("Failed to convert v2 response to 
json for resource %s", resourceName) + go g.metrics.AddPipelineInferMetrics(resourceName, metrics.MethodTypeRest, elapsedTime, metrics.HttpCodeToString(http.StatusInternalServerError)) w.WriteHeader(http.StatusInternalServerError) return } @@ -140,9 +142,7 @@ func (g *GatewayHttpServer) infer(w http.ResponseWriter, req *http.Request, reso w.WriteHeader(http.StatusInternalServerError) } else { w.WriteHeader(http.StatusOK) - elapsedTime := time.Since(startTime).Seconds() - go g.metrics.AddInferMetrics(resourceName, "", metrics.MethodTypeRest, elapsedTime) - + go g.metrics.AddPipelineInferMetrics(resourceName, metrics.MethodTypeRest, elapsedTime, metrics.HttpCodeToString(http.StatusOK)) } } } diff --git a/scheduler/pkg/kafka/pipeline/httpserver_test.go b/scheduler/pkg/kafka/pipeline/httpserver_test.go index c8c52c60a6..eb554f03df 100644 --- a/scheduler/pkg/kafka/pipeline/httpserver_test.go +++ b/scheduler/pkg/kafka/pipeline/httpserver_test.go @@ -131,7 +131,7 @@ func TestHttpServer(t *testing.T) { data: []byte("result"), key: testRequestId, } - httpServer := NewGatewayHttpServer(port, logrus.New(), nil, mockInferer, fakeMetricsHandler{}) + httpServer := NewGatewayHttpServer(port, logrus.New(), nil, mockInferer, fakePipelineMetricsHandler{}) go func() { err := httpServer.Start() g.Expect(err).To(Equal(http.ErrServerClosed)) diff --git a/scheduler/pkg/metrics/prometheus.go b/scheduler/pkg/metrics/agent.go similarity index 73% rename from scheduler/pkg/metrics/prometheus.go rename to scheduler/pkg/metrics/agent.go index 824b61c026..d36d43fc47 100644 --- a/scheduler/pkg/metrics/prometheus.go +++ b/scheduler/pkg/metrics/agent.go @@ -18,11 +18,13 @@ import ( const ( // start list of metrics - HistogramName = "seldon_infer_api_seconds" - InferCounterName = "seldon_infer_total" - InferLatencyCounterName = "seldon_infer_seconds_total" - AggregateInferCounterName = "seldon_aggregate_infer_total" - AggregateInferLatencyCounterName = "seldon_aggregate_infer_seconds_total" + // Model metrics + ModelHistogramName = "seldon_model_infer_api_seconds" + ModelInferCounterName = "seldon_model_infer_total" + ModelInferLatencyCounterName = "seldon_model_infer_seconds_total" + ModelAggregateInferCounterName = "seldon_model_aggregate_infer_total" + ModelAggregateInferLatencyCounterName = "seldon_model_aggregate_infer_seconds_total" + // Agent metrics CacheEvictCounterName = "seldon_cache_evict_count" CacheMissCounterName = "seldon_cache_miss_count" LoadModelCounterName = "seldon_load_model_counter" @@ -46,10 +48,10 @@ const ( ) //TODO Revisit splitting this interface as metric handling matures -type MetricsHandler interface { - AddHistogramMetricsHandler(baseHandler http.HandlerFunc) http.HandlerFunc +type AgentMetricsHandler interface { + AddModelHistogramMetricsHandler(baseHandler http.HandlerFunc) http.HandlerFunc UnaryServerInterceptor() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) - AddInferMetrics(externalModelName string, internalModelName string, method string, elapsedTime float64) + AddModelInferMetrics(externalModelName string, internalModelName string, method string, elapsedTime float64, code string) AddLoadedModelMetrics(internalModelName string, memory uint64, isLoad, isSoft bool) AddServerReplicaMetrics(memory uint64, memoryWithOvercommit float32) } @@ -59,15 +61,16 @@ var ( ) type PrometheusMetrics struct { - serverName string - serverReplicaIdx string - namespace string - logger log.FieldLogger - histogram 
*prometheus.HistogramVec - inferCounter *prometheus.CounterVec - inferLatencyCounter *prometheus.CounterVec - aggregateInferCounter *prometheus.CounterVec - aggregateInferLatencyCounter *prometheus.CounterVec + serverName string + serverReplicaIdx string + namespace string + logger log.FieldLogger + // Model metrics + modelHistogram *prometheus.HistogramVec + modelInferCounter *prometheus.CounterVec + modelInferLatencyCounter *prometheus.CounterVec + modelAggregateInferCounter *prometheus.CounterVec + modelAggregateInferLatencyCounter *prometheus.CounterVec cacheEvictCounter *prometheus.CounterVec cacheMissCounter *prometheus.CounterVec loadModelCounter *prometheus.CounterVec @@ -80,30 +83,30 @@ type PrometheusMetrics struct { server *http.Server } -func NewPrometheusMetrics(serverName string, serverReplicaIdx uint, namespace string, logger log.FieldLogger) (*PrometheusMetrics, error) { +func NewPrometheusModelMetrics(serverName string, serverReplicaIdx uint, namespace string, logger log.FieldLogger) (*PrometheusMetrics, error) { namespace = safeNamespaceName(namespace) - histogram, err := createHistogram(namespace) + histogram, err := createModelHistogram(namespace) if err != nil { return nil, err } - inferCounter, err := createInferCounter(namespace) + inferCounter, err := createModelInferCounter(namespace) if err != nil { return nil, err } - inferLatencyCounter, err := createInferLatencyCounter(namespace) + inferLatencyCounter, err := createModelInferLatencyCounter(namespace) if err != nil { return nil, err } - aggregateInferCounter, err := createAggregateInferCounter(namespace) + aggregateInferCounter, err := createModelAggregateInferCounter(namespace) if err != nil { return nil, err } - aggregateInferLatencyCounter, err := createAggregateInferLatencyCounter(namespace) + aggregateInferLatencyCounter, err := createModelAggregateInferLatencyCounter(namespace) if err != nil { return nil, err } @@ -154,23 +157,23 @@ func NewPrometheusMetrics(serverName string, serverReplicaIdx uint, namespace st } return &PrometheusMetrics{ - serverName: serverName, - serverReplicaIdx: fmt.Sprintf("%d", serverReplicaIdx), - namespace: namespace, - logger: logger.WithField("source", "PrometheusMetrics"), - histogram: histogram, - inferCounter: inferCounter, - inferLatencyCounter: inferLatencyCounter, - aggregateInferCounter: aggregateInferCounter, - aggregateInferLatencyCounter: aggregateInferLatencyCounter, - cacheEvictCounter: cacheEvictCounter, - cacheMissCounter: cacheMissCounter, - loadModelCounter: loadModelCounter, - unloadModelCounter: unloadModelCounter, - loadedModelGauge: loadedModelGauge, - loadedModelMemoryGauge: loadedModelMemoryGauge, - evictedModelMemoryGauge: evictedModelMemoryGauge, - serverReplicaMemoryCapacityGauge: serverReplicaMemoryCapacityGauge, + serverName: serverName, + serverReplicaIdx: fmt.Sprintf("%d", serverReplicaIdx), + namespace: namespace, + logger: logger.WithField("source", "PrometheusMetrics"), + modelHistogram: histogram, + modelInferCounter: inferCounter, + modelInferLatencyCounter: inferLatencyCounter, + modelAggregateInferCounter: aggregateInferCounter, + modelAggregateInferLatencyCounter: aggregateInferLatencyCounter, + cacheEvictCounter: cacheEvictCounter, + cacheMissCounter: cacheMissCounter, + loadModelCounter: loadModelCounter, + unloadModelCounter: unloadModelCounter, + loadedModelGauge: loadedModelGauge, + loadedModelMemoryGauge: loadedModelMemoryGauge, + evictedModelMemoryGauge: evictedModelMemoryGauge, + serverReplicaMemoryCapacityGauge: 
serverReplicaMemoryCapacityGauge, serverReplicaMemoryCapacityWithOvercommitGauge: serverReplicaMemoryCapacityWithOvercommitGauge, }, nil } @@ -179,13 +182,13 @@ func safeNamespaceName(namespace string) string { return strings.ReplaceAll(namespace, "-", "_") } -func createHistogram(namespace string) (*prometheus.HistogramVec, error) { +func createModelHistogram(namespace string) (*prometheus.HistogramVec, error) { //TODO add method for rest/grpc labelNames := []string{SeldonServerMetric, SeldonServerReplicaMetric, MethodMetric, CodeMetric} histogram := prometheus.NewHistogramVec( prometheus.HistogramOpts{ - Name: HistogramName, + Name: ModelHistogramName, Namespace: namespace, Help: "A histogram of latencies for inference server", Buckets: DefaultHistogramBuckets, @@ -203,31 +206,31 @@ func createHistogram(namespace string) (*prometheus.HistogramVec, error) { return histogram, nil } -func createInferCounter(namespace string) (*prometheus.CounterVec, error) { - labelNames := []string{SeldonServerMetric, SeldonServerReplicaMetric, SeldonModelMetric, SeldonInternalModelMetric, MethodTypeMetric} +func createModelInferCounter(namespace string) (*prometheus.CounterVec, error) { + labelNames := []string{SeldonServerMetric, SeldonServerReplicaMetric, SeldonModelMetric, SeldonInternalModelMetric, MethodTypeMetric, CodeMetric} return createCounterVec( - InferCounterName, "A count of server inference calls", + ModelInferCounterName, "A count of server inference calls", namespace, labelNames) } -func createInferLatencyCounter(namespace string) (*prometheus.CounterVec, error) { - labelNames := []string{SeldonServerMetric, SeldonServerReplicaMetric, SeldonModelMetric, SeldonInternalModelMetric, MethodTypeMetric} +func createModelInferLatencyCounter(namespace string) (*prometheus.CounterVec, error) { + labelNames := []string{SeldonServerMetric, SeldonServerReplicaMetric, SeldonModelMetric, SeldonInternalModelMetric, MethodTypeMetric, CodeMetric} return createCounterVec( - InferLatencyCounterName, "A sum of server inference call latencies", + ModelInferLatencyCounterName, "A sum of server inference call latencies", namespace, labelNames) } -func createAggregateInferCounter(namespace string) (*prometheus.CounterVec, error) { +func createModelAggregateInferCounter(namespace string) (*prometheus.CounterVec, error) { labelNames := []string{SeldonServerMetric, SeldonServerReplicaMetric, MethodTypeMetric} return createCounterVec( - AggregateInferCounterName, "A count of server inference calls (aggregate)", + ModelAggregateInferCounterName, "A count of server inference calls (aggregate)", namespace, labelNames) } -func createAggregateInferLatencyCounter(namespace string) (*prometheus.CounterVec, error) { +func createModelAggregateInferLatencyCounter(namespace string) (*prometheus.CounterVec, error) { labelNames := []string{SeldonServerMetric, SeldonServerReplicaMetric, MethodTypeMetric} return createCounterVec( - AggregateInferLatencyCounterName, "A sum of server inference call latencies (aggregate)", + ModelAggregateInferLatencyCounterName, "A sum of server inference call latencies (aggregate)", namespace, labelNames) } @@ -294,49 +297,9 @@ func createServerReplicaMemoryCapacityWithOvercommitGauge(namespace string) (*pr namespace, labelNames) } -func createCounterVec(counterName, helperName, namespace string, labelNames []string) (*prometheus.CounterVec, error) { - counter := prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: counterName, - Namespace: namespace, - Help: helperName, - }, - labelNames, - 
) - err := prometheus.Register(counter) - if err != nil { - if e, ok := err.(prometheus.AlreadyRegisteredError); ok { - counter = e.ExistingCollector.(*prometheus.CounterVec) - } else { - return nil, err - } - } - return counter, nil -} - -func createGaugeVec(gaugeName, helperName, namespace string, labelNames []string) (*prometheus.GaugeVec, error) { - gauge := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: gaugeName, - Namespace: namespace, - Help: helperName, - }, - labelNames, - ) - err := prometheus.Register(gauge) - if err != nil { - if e, ok := err.(prometheus.AlreadyRegisteredError); ok { - gauge = e.ExistingCollector.(*prometheus.GaugeVec) - } else { - return nil, err - } - } - return gauge, nil -} - -func (pm *PrometheusMetrics) AddHistogramMetricsHandler(baseHandler http.HandlerFunc) http.HandlerFunc { +func (pm *PrometheusMetrics) AddModelHistogramMetricsHandler(baseHandler http.HandlerFunc) http.HandlerFunc { handler := promhttp.InstrumentHandlerDuration( - pm.histogram.MustCurryWith(prometheus.Labels{ + pm.modelHistogram.MustCurryWith(prometheus.Labels{ SeldonServerMetric: pm.serverName, SeldonServerReplicaMetric: pm.serverReplicaIdx, }), @@ -351,14 +314,14 @@ func (pm *PrometheusMetrics) UnaryServerInterceptor() func(ctx context.Context, resp, err := handler(ctx, req) st, _ := status.FromError(err) elapsedTime := time.Since(startTime).Seconds() - pm.histogram.WithLabelValues(pm.serverName, pm.serverReplicaIdx, "grpc", st.Code().String()).Observe(elapsedTime) + pm.modelHistogram.WithLabelValues(pm.serverName, pm.serverReplicaIdx, "grpc", st.Code().String()).Observe(elapsedTime) return resp, err } } -func (pm *PrometheusMetrics) AddInferMetrics(externalModelName string, internalModelName string, method string, latency float64) { - pm.addInferLatency(externalModelName, internalModelName, method, latency) - pm.addInferCount(externalModelName, internalModelName, method) +func (pm *PrometheusMetrics) AddModelInferMetrics(externalModelName string, internalModelName string, method string, latency float64, code string) { + pm.addInferLatency(externalModelName, internalModelName, method, latency, code) + pm.addInferCount(externalModelName, internalModelName, method, code) } func (pm *PrometheusMetrics) AddLoadedModelMetrics(internalModelName string, memBytes uint64, isLoad, isSoft bool) { @@ -476,30 +439,32 @@ func (pm *PrometheusMetrics) addCacheEvictCount() { }).Inc() } -func (pm *PrometheusMetrics) addInferCount(externalModelName, internalModelName, method string) { - pm.inferCounter.With(prometheus.Labels{ +func (pm *PrometheusMetrics) addInferCount(externalModelName, internalModelName, method string, code string) { + pm.modelInferCounter.With(prometheus.Labels{ SeldonModelMetric: externalModelName, SeldonInternalModelMetric: internalModelName, SeldonServerMetric: pm.serverName, SeldonServerReplicaMetric: pm.serverReplicaIdx, MethodTypeMetric: method, + CodeMetric: code, }).Inc() - pm.aggregateInferCounter.With(prometheus.Labels{ + pm.modelAggregateInferCounter.With(prometheus.Labels{ SeldonServerMetric: pm.serverName, SeldonServerReplicaMetric: pm.serverReplicaIdx, MethodTypeMetric: method, }).Inc() } -func (pm *PrometheusMetrics) addInferLatency(externalModelName, internalModelName, method string, latency float64) { - pm.inferLatencyCounter.With(prometheus.Labels{ +func (pm *PrometheusMetrics) addInferLatency(externalModelName, internalModelName, method string, latency float64, code string) { + pm.modelInferLatencyCounter.With(prometheus.Labels{ SeldonModelMetric: 
externalModelName, SeldonInternalModelMetric: internalModelName, SeldonServerMetric: pm.serverName, SeldonServerReplicaMetric: pm.serverReplicaIdx, MethodTypeMetric: method, + CodeMetric: code, }).Add(latency) - pm.aggregateInferLatencyCounter.With(prometheus.Labels{ + pm.modelAggregateInferLatencyCounter.With(prometheus.Labels{ SeldonServerMetric: pm.serverName, SeldonServerReplicaMetric: pm.serverReplicaIdx, MethodTypeMetric: method, diff --git a/scheduler/pkg/metrics/prometheus_test.go b/scheduler/pkg/metrics/agent_test.go similarity index 89% rename from scheduler/pkg/metrics/prometheus_test.go rename to scheduler/pkg/metrics/agent_test.go index 8904c508c4..088d675d2d 100644 --- a/scheduler/pkg/metrics/prometheus_test.go +++ b/scheduler/pkg/metrics/agent_test.go @@ -20,7 +20,7 @@ const ( ) func createTestPrometheusMetrics() (*PrometheusMetrics, error) { - return NewPrometheusMetrics(serverName, serverIdx, namesapce, log.New()) + return NewPrometheusModelMetrics(serverName, serverIdx, namesapce, log.New()) } func TestLoadedModelMetrics(t *testing.T) { @@ -166,30 +166,32 @@ func TestInferModelMetrics(t *testing.T) { modelName := modelNamePrefix + "0" latency := float64(20) method := "rest" - promMetrics.AddInferMetrics(modelName, modelName, method, latency) + promMetrics.AddModelInferMetrics(modelName, modelName, method, latency, "200") actualVal := testutil.ToFloat64( - promMetrics.inferLatencyCounter.With(prometheus.Labels{ + promMetrics.modelInferLatencyCounter.With(prometheus.Labels{ SeldonModelMetric: modelName, SeldonInternalModelMetric: modelName, MethodTypeMetric: method, SeldonServerMetric: serverName, SeldonServerReplicaMetric: strconv.Itoa(serverIdx), + CodeMetric: "200", })) g.Expect(latency).To(Equal(actualVal)) actualVal = testutil.ToFloat64( - promMetrics.inferCounter.With(prometheus.Labels{ + promMetrics.modelInferCounter.With(prometheus.Labels{ SeldonModelMetric: modelName, SeldonInternalModelMetric: modelName, MethodTypeMetric: method, SeldonServerMetric: serverName, SeldonServerReplicaMetric: strconv.Itoa(serverIdx), + CodeMetric: "200", })) g.Expect(float64(1)).To(Equal(actualVal)) actualVal = testutil.ToFloat64( - promMetrics.aggregateInferLatencyCounter.With(prometheus.Labels{ + promMetrics.modelAggregateInferLatencyCounter.With(prometheus.Labels{ MethodTypeMetric: method, SeldonServerMetric: serverName, SeldonServerReplicaMetric: strconv.Itoa(serverIdx), @@ -197,15 +199,15 @@ func TestInferModelMetrics(t *testing.T) { g.Expect(latency).To(Equal(actualVal)) actualVal = testutil.ToFloat64( - promMetrics.aggregateInferCounter.With(prometheus.Labels{ + promMetrics.modelAggregateInferCounter.With(prometheus.Labels{ MethodTypeMetric: method, SeldonServerMetric: serverName, SeldonServerReplicaMetric: strconv.Itoa(serverIdx), })) g.Expect(float64(1)).To(Equal(actualVal)) - promMetrics.inferLatencyCounter.Reset() - promMetrics.inferCounter.Reset() - promMetrics.aggregateInferLatencyCounter.Reset() - promMetrics.aggregateInferCounter.Reset() + promMetrics.modelInferLatencyCounter.Reset() + promMetrics.modelInferCounter.Reset() + promMetrics.modelAggregateInferLatencyCounter.Reset() + promMetrics.modelAggregateInferCounter.Reset() } diff --git a/scheduler/pkg/metrics/gateway.go b/scheduler/pkg/metrics/gateway.go new file mode 100644 index 0000000000..264735ffdd --- /dev/null +++ b/scheduler/pkg/metrics/gateway.go @@ -0,0 +1,213 @@ +package metrics + +import ( + "context" + "fmt" + "net/http" + "strconv" + "time" + + "github.com/prometheus/client_golang/prometheus" + 
"github.com/prometheus/client_golang/prometheus/promhttp" + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "google.golang.org/grpc/status" +) + +const ( + + // start list of metrics + // Pipeline metrics + PipelineHistogramName = "seldon_pipeline_infer_api_seconds" + PipelineInferCounterName = "seldon_pipeline_infer_total" + PipelineInferLatencyCounterName = "seldon_pipeline_infer_seconds_total" + PipelineAggregateInferCounterName = "seldon_pipeline_aggregate_infer_total" + PipelineAggregateInferLatencyCounterName = "seldon_pipeline_aggregate_infer_seconds_total" + // end list of metrics + + SeldonPipelineMetric = "pipeline" +) + +//TODO Revisit splitting this interface as metric handling matures +type PipelineMetricsHandler interface { + AddPipelineHistogramMetricsHandler(baseHandler http.HandlerFunc) http.HandlerFunc + PipelineUnaryServerInterceptor() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) + AddPipelineInferMetrics(pipelineName string, method string, elapsedTime float64, code string) +} + +type PrometheusPipelineMetrics struct { + serverName string + namespace string + logger log.FieldLogger + // Model metrics + pipelineHistogram *prometheus.HistogramVec + pipelineInferCounter *prometheus.CounterVec + pipelineInferLatencyCounter *prometheus.CounterVec + pipelineAggregateInferCounter *prometheus.CounterVec + pipelineAggregateInferLatencyCounter *prometheus.CounterVec + + server *http.Server +} + +func NewPrometheusPipelineMetrics(namespace string, logger log.FieldLogger) (*PrometheusPipelineMetrics, error) { + namespace = safeNamespaceName(namespace) + histogram, err := createPipelineHistogram(namespace) + if err != nil { + return nil, err + } + + inferCounter, err := createPipelineInferCounter(namespace) + if err != nil { + return nil, err + } + + inferLatencyCounter, err := createPipelineInferLatencyCounter(namespace) + if err != nil { + return nil, err + } + + aggregateInferCounter, err := createPipelineAggregateInferCounter(namespace) + if err != nil { + + return nil, err + } + + aggregateInferLatencyCounter, err := createPipelineAggregateInferLatencyCounter(namespace) + if err != nil { + return nil, err + } + + return &PrometheusPipelineMetrics{ + serverName: "pipeline-gateway", + namespace: namespace, + logger: logger.WithField("source", "PrometheusMetrics"), + pipelineHistogram: histogram, + pipelineInferCounter: inferCounter, + pipelineInferLatencyCounter: inferLatencyCounter, + pipelineAggregateInferCounter: aggregateInferCounter, + pipelineAggregateInferLatencyCounter: aggregateInferLatencyCounter, + }, nil +} + +func createPipelineHistogram(namespace string) (*prometheus.HistogramVec, error) { + //TODO add method for rest/grpc + labelNames := []string{SeldonServerMetric, MethodMetric, CodeMetric} + + histogram := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: PipelineHistogramName, + Namespace: namespace, + Help: "A histogram of latencies for pipeline gateway", + Buckets: DefaultHistogramBuckets, + }, + labelNames, + ) + err := prometheus.Register(histogram) + if err != nil { + if e, ok := err.(prometheus.AlreadyRegisteredError); ok { + histogram = e.ExistingCollector.(*prometheus.HistogramVec) + } else { + return nil, err + } + } + return histogram, nil +} + +func createPipelineInferCounter(namespace string) (*prometheus.CounterVec, error) { + labelNames := []string{SeldonServerMetric, SeldonPipelineMetric, MethodTypeMetric, CodeMetric} + return createCounterVec( + 
PipelineInferCounterName, "A count of pipeline gateway calls", + namespace, labelNames) +} + +func createPipelineInferLatencyCounter(namespace string) (*prometheus.CounterVec, error) { + labelNames := []string{SeldonServerMetric, SeldonPipelineMetric, MethodTypeMetric, CodeMetric} + return createCounterVec( + PipelineInferLatencyCounterName, "A sum of pipeline gateway call latencies", + namespace, labelNames) +} + +func createPipelineAggregateInferCounter(namespace string) (*prometheus.CounterVec, error) { + labelNames := []string{SeldonServerMetric, MethodTypeMetric} + return createCounterVec( + PipelineAggregateInferCounterName, "A count of pipeline gateway calls (aggregate)", + namespace, labelNames) +} + +func createPipelineAggregateInferLatencyCounter(namespace string) (*prometheus.CounterVec, error) { + labelNames := []string{SeldonServerMetric, MethodTypeMetric} + return createCounterVec( + PipelineAggregateInferLatencyCounterName, "A sum of pipeline gateway call latencies (aggregate)", + namespace, labelNames) +} + +func (pm *PrometheusPipelineMetrics) HttpCodeToString(code int) string { + return fmt.Sprintf("%d", code) +} + +func (pm *PrometheusPipelineMetrics) AddPipelineHistogramMetricsHandler(baseHandler http.HandlerFunc) http.HandlerFunc { + handler := promhttp.InstrumentHandlerDuration( + pm.pipelineHistogram.MustCurryWith(prometheus.Labels{ + SeldonServerMetric: pm.serverName, + }), + baseHandler, + ) + return handler +} + +func (pm *PrometheusPipelineMetrics) PipelineUnaryServerInterceptor() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + startTime := time.Now() + resp, err := handler(ctx, req) + st, _ := status.FromError(err) + elapsedTime := time.Since(startTime).Seconds() + pm.pipelineHistogram.WithLabelValues(pm.serverName, "grpc", st.Code().String()).Observe(elapsedTime) + return resp, err + } +} + +func (pm *PrometheusPipelineMetrics) AddPipelineInferMetrics(pipelineName string, method string, latency float64, code string) { + pm.addInferLatency(pipelineName, method, latency, code) + pm.addInferCount(pipelineName, method, code) +} + +func (pm *PrometheusPipelineMetrics) addInferCount(pipelineName, method string, code string) { + pm.pipelineInferCounter.With(prometheus.Labels{ + SeldonPipelineMetric: pipelineName, + SeldonServerMetric: pm.serverName, + MethodTypeMetric: method, + CodeMetric: code, + }).Inc() + pm.pipelineAggregateInferCounter.With(prometheus.Labels{ + SeldonServerMetric: pm.serverName, + MethodTypeMetric: method, + }).Inc() +} + +func (pm *PrometheusPipelineMetrics) addInferLatency(pipelineName, method string, latency float64, code string) { + pm.pipelineInferLatencyCounter.With(prometheus.Labels{ + SeldonPipelineMetric: pipelineName, + SeldonServerMetric: pm.serverName, + MethodTypeMetric: method, + CodeMetric: code, + }).Add(latency) + pm.pipelineAggregateInferLatencyCounter.With(prometheus.Labels{ + SeldonServerMetric: pm.serverName, + MethodTypeMetric: method, + }).Add(latency) +} + +func (pm *PrometheusPipelineMetrics) Start(port int) error { + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + pm.server = &http.Server{ + Addr: ":" + strconv.Itoa(port), + Handler: mux, + } + pm.logger.Infof("Starting metrics server on port %d", port) + return pm.server.ListenAndServe() +} + +func (pm *PrometheusPipelineMetrics) Stop() error 
diff --git a/scheduler/pkg/metrics/utils.go b/scheduler/pkg/metrics/utils.go
new file mode 100644
index 0000000000..95875812aa
--- /dev/null
+++ b/scheduler/pkg/metrics/utils.go
@@ -0,0 +1,51 @@
+package metrics
+
+import (
+	"fmt"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+func HttpCodeToString(code int) string {
+	return fmt.Sprintf("%d", code)
+}
+
+func createCounterVec(counterName, helperName, namespace string, labelNames []string) (*prometheus.CounterVec, error) {
+	counter := prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name:      counterName,
+			Namespace: namespace,
+			Help:      helperName,
+		},
+		labelNames,
+	)
+	err := prometheus.Register(counter)
+	if err != nil {
+		if e, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			counter = e.ExistingCollector.(*prometheus.CounterVec)
+		} else {
+			return nil, err
+		}
+	}
+	return counter, nil
+}
+
+func createGaugeVec(gaugeName, helperName, namespace string, labelNames []string) (*prometheus.GaugeVec, error) {
+	gauge := prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name:      gaugeName,
+			Namespace: namespace,
+			Help:      helperName,
+		},
+		labelNames,
+	)
+	err := prometheus.Register(gauge)
+	if err != nil {
+		if e, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			gauge = e.ExistingCollector.(*prometheus.GaugeVec)
+		} else {
+			return nil, err
+		}
+	}
+	return gauge, nil
+}
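+
+// Sketch of why registration tolerates prometheus.AlreadyRegisteredError:
+// two callers in the same process can request the same collector and end up
+// sharing it. The metric name and namespace below are hypothetical:
+//
+//	c1, _ := createCounterVec("example_total", "An example counter", "test", []string{"code"})
+//	c2, _ := createCounterVec("example_total", "An example counter", "test", []string{"code"})
+//	// c1 == c2: the second call returns the already-registered collector.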