From 135679d2116b114567d2a24b5f5697c376fa71d6 Mon Sep 17 00:00:00 2001 From: Karg Date: Wed, 3 Mar 2021 15:06:19 -0800 Subject: [PATCH] Adding publishRate trigger Signed-off-by: Karg --- pkg/scalers/rabbitmq_scaler.go | 98 ++++++++++++++++++++++------- pkg/scalers/rabbitmq_scaler_test.go | 46 +++++++++++--- 2 files changed, 114 insertions(+), 30 deletions(-) diff --git a/pkg/scalers/rabbitmq_scaler.go b/pkg/scalers/rabbitmq_scaler.go index 43bda488aa4..829fd0c2f91 100644 --- a/pkg/scalers/rabbitmq_scaler.go +++ b/pkg/scalers/rabbitmq_scaler.go @@ -21,9 +21,11 @@ import ( ) const ( - rabbitQueueLengthMetricName = "queueLength" - defaultRabbitMQQueueLength = 20 - rabbitMetricType = "External" + rabbitQueueLengthMetricName = "queueLength" + rabbitPublishedPerSecondMetricName = "publishRate" + defaultRabbitMQQueueLength = 20 + defaultRabbitMQPublishRate = 0 // Default to zero to disable publish rate metering for back compat. + rabbitMetricType = "External" ) const ( @@ -43,15 +45,25 @@ type rabbitMQScaler struct { type rabbitMQMetadata struct { queueName string queueLength int + publishRate float64 // Publish/sec. rate on the queue, requires HTTP protocol host string // connection string for either HTTP or AMQP protocol protocol string // either http or amqp protocol vhostName *string // override the vhost from the connection info } type queueInfo struct { - Messages int `json:"messages"` - MessagesUnacknowledged int `json:"messages_unacknowledged"` - Name string `json:"name"` + Messages int `json:"messages"` + MessagesUnacknowledged int `json:"messages_unacknowledged"` + MessageStat messageStat `json:"message_stats"` + Name string `json:"name"` +} + +type messageStat struct { + PublishDetail publishDetail `json:"publish_details"` +} + +type publishDetail struct { + Rate float64 `json:"rate"` } var rabbitmqLog = logf.Log.WithName("rabbitmq_scaler") @@ -145,18 +157,40 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) { return nil, fmt.Errorf("no queue name given") } - // Resolve queueLength - if val, ok := config.TriggerMetadata[rabbitQueueLengthMetricName]; ok { + // Resolve publishRate + if val, ok := config.TriggerMetadata[rabbitPublishedPerSecondMetricName]; ok { + publishRate, err := strconv.ParseFloat(val, 64) + if err != nil { + return nil, fmt.Errorf("can't parse %s: %s", rabbitPublishedPerSecondMetricName, err) + } + + meta.publishRate = publishRate + } else { + meta.publishRate = defaultRabbitMQPublishRate + } + + val, ok := config.TriggerMetadata[rabbitQueueLengthMetricName] + switch { + case ok: queueLength, err := strconv.Atoi(val) if err != nil { return nil, fmt.Errorf("can't parse %s: %s", rabbitQueueLengthMetricName, err) } - meta.queueLength = queueLength - } else { + case meta.publishRate > 0: + meta.queueLength = 0 + default: meta.queueLength = defaultRabbitMQQueueLength } + if meta.publishRate > 0 && meta.queueLength > 0 { + return nil, fmt.Errorf("only one of queueLength or publishRate can be specified; use two separate triggers if both are desired") + } + + if meta.publishRate > 0 && meta.protocol != httpProtocol { + return nil, fmt.Errorf("protocol %s not supported; must be http to use publishRate", meta.protocol) + } + // Resolve vhostName if val, ok := config.TriggerMetadata["vhostName"]; ok { meta.vhostName = &val @@ -193,31 +227,34 @@ func (s *rabbitMQScaler) Close() error { // IsActive returns true if there are pending messages to be processed func (s *rabbitMQScaler) IsActive(ctx context.Context) (bool, error) { - messages, err := s.getQueueMessages() + messages, publishRate, err := s.getQueueStatus() if err != nil { return false, fmt.Errorf("error inspecting rabbitMQ: %s", err) } - return messages > 0, nil + if s.metadata.queueLength > 0 { + return messages > 0, nil + } + return publishRate > 0, nil } -func (s *rabbitMQScaler) getQueueMessages() (int, error) { +func (s *rabbitMQScaler) getQueueStatus() (int, float64, error) { if s.metadata.protocol == httpProtocol { info, err := s.getQueueInfoViaHTTP() if err != nil { - return -1, err + return -1, -1, err } // messages count includes count of ready and unack-ed - return info.Messages, nil + return info.Messages, info.MessageStat.PublishDetail.Rate, nil } items, err := s.channel.QueueInspect(s.metadata.queueName) if err != nil { - return -1, err + return -1, -1, err } - return items.Messages, nil + return items.Messages, 0, nil } func getJSON(httpClient *http.Client, url string, target interface{}) error { @@ -269,32 +306,49 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) { // GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler func (s *rabbitMQScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { - targetMetricValue := resource.NewQuantity(int64(s.metadata.queueLength), resource.DecimalSI) + var metricName string + var metricValue *resource.Quantity + if s.metadata.queueLength > 0 { + metricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq", s.metadata.queueName)) + metricValue = resource.NewQuantity(int64(s.metadata.queueLength), resource.DecimalSI) + } else { + metricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq-rate", s.metadata.queueName)) + metricValue = resource.NewMilliQuantity(int64(s.metadata.publishRate*1000), resource.DecimalSI) + } + externalMetric := &v2beta2.ExternalMetricSource{ Metric: v2beta2.MetricIdentifier{ - Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq", s.metadata.queueName)), + Name: metricName, }, Target: v2beta2.MetricTarget{ Type: v2beta2.AverageValueMetricType, - AverageValue: targetMetricValue, + AverageValue: metricValue, }, } metricSpec := v2beta2.MetricSpec{ External: externalMetric, Type: rabbitMetricType, } + return []v2beta2.MetricSpec{metricSpec} } // GetMetrics returns value for a supported metric and an error if there is a problem getting the metric func (s *rabbitMQScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { - messages, err := s.getQueueMessages() + messages, publishRate, err := s.getQueueStatus() if err != nil { return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting rabbitMQ: %s", err) } + var metricValue resource.Quantity + if s.metadata.queueLength > 0 { + metricValue = *resource.NewQuantity(int64(messages), resource.DecimalSI) + } else { + metricValue = *resource.NewMilliQuantity(int64(publishRate*1000), resource.DecimalSI) + } + metric := external_metrics.ExternalMetricValue{ MetricName: metricName, - Value: *resource.NewQuantity(int64(messages), resource.DecimalSI), + Value: metricValue, Timestamp: metav1.Now(), } diff --git a/pkg/scalers/rabbitmq_scaler_test.go b/pkg/scalers/rabbitmq_scaler_test.go index 7bb3ec681f2..79b01031879 100644 --- a/pkg/scalers/rabbitmq_scaler_test.go +++ b/pkg/scalers/rabbitmq_scaler_test.go @@ -58,6 +58,18 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{ {map[string]string{"queueName": "sample", "host": "http://"}, false, map[string]string{}}, // auto protocol and an HTTPS URL {map[string]string{"queueName": "sample", "host": "https://"}, false, map[string]string{}}, + // publishRate number + {map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueName": "sample", "host": "https://"}, false, map[string]string{}}, + // publishRate not number + {map[string]string{rabbitPublishedPerSecondMetricName: "AA", "queueName": "sample", "host": "https://"}, true, map[string]string{}}, + // publishRate http + {map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueName": "sample", "host": "http://"}, false, map[string]string{}}, + // publishRate amqp + {map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueName": "sample", "host": "amqp://"}, true, map[string]string{}}, + // publishRate amqps + {map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueName": "sample", "host": "amqps://"}, true, map[string]string{}}, + // publishRate and queueLength + {map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueLength": "10", "queueName": "sample", "host": "https://"}, true, map[string]string{}}, } var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{ @@ -82,6 +94,8 @@ var testDefaultQueueLength = []parseRabbitMQMetadataTestData{ {map[string]string{"queueName": "sample", "hostFromEnv": host}, false, map[string]string{}}, // use default queueLength with includeUnacked {map[string]string{"queueName": "sample", "hostFromEnv": host, "protocol": "http"}, false, map[string]string{}}, + // use default queueLength with includeUnacked + {map[string]string{"queueName": "sample", rabbitPublishedPerSecondMetricName: "100", "hostFromEnv": host, "protocol": "http"}, false, map[string]string{}}, } func TestParseDefaultQueueLength(t *testing.T) { @@ -92,7 +106,9 @@ func TestParseDefaultQueueLength(t *testing.T) { t.Error("Expected success but got error", err) case testData.isError && err == nil: t.Error("Expected error but got success") - case metadata.queueLength != defaultRabbitMQQueueLength: + case metadata.publishRate > 0 && metadata.queueLength != 0: + t.Error("Expected default queueLength = 0 when publishRate is specified") + case metadata.publishRate == 0 && metadata.queueLength != defaultRabbitMQQueueLength: t.Error("Expected default queueLength =", defaultRabbitMQQueueLength, "but got", metadata.queueLength) } } @@ -107,19 +123,33 @@ type getQueueInfoTestData struct { } var testQueueInfoTestData = []getQueueInfoTestData{ - {`{"messages": 4, "messages_unacknowledged": 1, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""}, - {`{"messages": 1, "messages_unacknowledged": 1, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""}, - {`{"messages": 1, "messages_unacknowledged": 0, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""}, - {`{"messages": 0, "messages_unacknowledged": 0, "name": "evaluate_trials"}`, http.StatusOK, false, nil, ""}, + // queueLength + {`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""}, + {`{"messages": 1, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""}, + {`{"messages": 1, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""}, + {`{"messages": 0, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, false, nil, ""}, + {`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""}, + {`{"messages": 1, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""}, + {`{"messages": 1, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""}, + {`{"messages": 0, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, false, nil, ""}, + // publishRate + {`{"messages": 0, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueLength": "0"}, ""}, + {`{"messages": 0, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, false, map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueLength": "0"}, ""}, + {`{"messages": 1, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueLength": "0"}, ""}, + {`{"messages": 1, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, false, map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueLength": "0"}, ""}, + // error response {`Password is incorrect`, http.StatusUnauthorized, false, nil, ""}, } var vhostPathes = []string{"/myhost", "", "/", "//", "/%2F"} var testQueueInfoTestDataSingleVhost = []getQueueInfoTestData{ - {`{"messages": 4, "messages_unacknowledged": 1, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "myhost"}, "/myhost"}, - {`{"messages": 4, "messages_unacknowledged": 1, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "/"}, "/"}, - {`{"messages": 4, "messages_unacknowledged": 1, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": ""}, "/"}, + {`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "myhost"}, "/myhost"}, + {`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "/"}, "/"}, + {`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": ""}, "/"}, + {`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "myhost"}, "/myhost"}, + {`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "/"}, "/"}, + {`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": ""}, "/"}, } func TestGetQueueInfo(t *testing.T) {