Skip to content

Commit

Permalink
Adding publishRate trigger
Browse files Browse the repository at this point in the history
Signed-off-by: Karg <rkarg@blizzard.com>
  • Loading branch information
rkarg-blizz committed Mar 3, 2021
1 parent 9a55f9e commit 135679d
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 30 deletions.
98 changes: 76 additions & 22 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
)

const (
rabbitQueueLengthMetricName = "queueLength"
defaultRabbitMQQueueLength = 20
rabbitMetricType = "External"
rabbitQueueLengthMetricName = "queueLength"
rabbitPublishedPerSecondMetricName = "publishRate"
defaultRabbitMQQueueLength = 20
defaultRabbitMQPublishRate = 0 // Default to zero to disable publish rate metering for back compat.
rabbitMetricType = "External"
)

const (
Expand All @@ -43,15 +45,25 @@ type rabbitMQScaler struct {
type rabbitMQMetadata struct {
queueName string
queueLength int
publishRate float64 // Publish/sec. rate on the queue, requires HTTP protocol
host string // connection string for either HTTP or AMQP protocol
protocol string // either http or amqp protocol
vhostName *string // override the vhost from the connection info
}

type queueInfo struct {
Messages int `json:"messages"`
MessagesUnacknowledged int `json:"messages_unacknowledged"`
Name string `json:"name"`
Messages int `json:"messages"`
MessagesUnacknowledged int `json:"messages_unacknowledged"`
MessageStat messageStat `json:"message_stats"`
Name string `json:"name"`
}

type messageStat struct {
PublishDetail publishDetail `json:"publish_details"`
}

type publishDetail struct {
Rate float64 `json:"rate"`
}

var rabbitmqLog = logf.Log.WithName("rabbitmq_scaler")
Expand Down Expand Up @@ -145,18 +157,40 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) {
return nil, fmt.Errorf("no queue name given")
}

// Resolve queueLength
if val, ok := config.TriggerMetadata[rabbitQueueLengthMetricName]; ok {
// Resolve publishRate
if val, ok := config.TriggerMetadata[rabbitPublishedPerSecondMetricName]; ok {
publishRate, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("can't parse %s: %s", rabbitPublishedPerSecondMetricName, err)
}

meta.publishRate = publishRate
} else {
meta.publishRate = defaultRabbitMQPublishRate
}

val, ok := config.TriggerMetadata[rabbitQueueLengthMetricName]
switch {
case ok:
queueLength, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("can't parse %s: %s", rabbitQueueLengthMetricName, err)
}

meta.queueLength = queueLength
} else {
case meta.publishRate > 0:
meta.queueLength = 0
default:
meta.queueLength = defaultRabbitMQQueueLength
}

if meta.publishRate > 0 && meta.queueLength > 0 {
return nil, fmt.Errorf("only one of queueLength or publishRate can be specified; use two separate triggers if both are desired")
}

if meta.publishRate > 0 && meta.protocol != httpProtocol {
return nil, fmt.Errorf("protocol %s not supported; must be http to use publishRate", meta.protocol)
}

// Resolve vhostName
if val, ok := config.TriggerMetadata["vhostName"]; ok {
meta.vhostName = &val
Expand Down Expand Up @@ -193,31 +227,34 @@ func (s *rabbitMQScaler) Close() error {

// IsActive returns true if there are pending messages to be processed
func (s *rabbitMQScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := s.getQueueMessages()
messages, publishRate, err := s.getQueueStatus()
if err != nil {
return false, fmt.Errorf("error inspecting rabbitMQ: %s", err)
}

return messages > 0, nil
if s.metadata.queueLength > 0 {
return messages > 0, nil
}
return publishRate > 0, nil
}

func (s *rabbitMQScaler) getQueueMessages() (int, error) {
func (s *rabbitMQScaler) getQueueStatus() (int, float64, error) {
if s.metadata.protocol == httpProtocol {
info, err := s.getQueueInfoViaHTTP()
if err != nil {
return -1, err
return -1, -1, err
}

// messages count includes count of ready and unack-ed
return info.Messages, nil
return info.Messages, info.MessageStat.PublishDetail.Rate, nil
}

items, err := s.channel.QueueInspect(s.metadata.queueName)
if err != nil {
return -1, err
return -1, -1, err
}

return items.Messages, nil
return items.Messages, 0, nil
}

func getJSON(httpClient *http.Client, url string, target interface{}) error {
Expand Down Expand Up @@ -269,32 +306,49 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *rabbitMQScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(int64(s.metadata.queueLength), resource.DecimalSI)
var metricName string
var metricValue *resource.Quantity
if s.metadata.queueLength > 0 {
metricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq", s.metadata.queueName))
metricValue = resource.NewQuantity(int64(s.metadata.queueLength), resource.DecimalSI)
} else {
metricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq-rate", s.metadata.queueName))
metricValue = resource.NewMilliQuantity(int64(s.metadata.publishRate*1000), resource.DecimalSI)
}

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq", s.metadata.queueName)),
Name: metricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetMetricValue,
AverageValue: metricValue,
},
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: rabbitMetricType,
}

return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *rabbitMQScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
messages, err := s.getQueueMessages()
messages, publishRate, err := s.getQueueStatus()
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting rabbitMQ: %s", err)
}

var metricValue resource.Quantity
if s.metadata.queueLength > 0 {
metricValue = *resource.NewQuantity(int64(messages), resource.DecimalSI)
} else {
metricValue = *resource.NewMilliQuantity(int64(publishRate*1000), resource.DecimalSI)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(messages), resource.DecimalSI),
Value: metricValue,
Timestamp: metav1.Now(),
}

Expand Down
46 changes: 38 additions & 8 deletions pkg/scalers/rabbitmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{
{map[string]string{"queueName": "sample", "host": "http://"}, false, map[string]string{}},
// auto protocol and an HTTPS URL
{map[string]string{"queueName": "sample", "host": "https://"}, false, map[string]string{}},
// publishRate number
{map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueName": "sample", "host": "https://"}, false, map[string]string{}},
// publishRate not number
{map[string]string{rabbitPublishedPerSecondMetricName: "AA", "queueName": "sample", "host": "https://"}, true, map[string]string{}},
// publishRate http
{map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueName": "sample", "host": "http://"}, false, map[string]string{}},
// publishRate amqp
{map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueName": "sample", "host": "amqp://"}, true, map[string]string{}},
// publishRate amqps
{map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueName": "sample", "host": "amqps://"}, true, map[string]string{}},
// publishRate and queueLength
{map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueLength": "10", "queueName": "sample", "host": "https://"}, true, map[string]string{}},
}

var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{
Expand All @@ -82,6 +94,8 @@ var testDefaultQueueLength = []parseRabbitMQMetadataTestData{
{map[string]string{"queueName": "sample", "hostFromEnv": host}, false, map[string]string{}},
// use default queueLength with includeUnacked
{map[string]string{"queueName": "sample", "hostFromEnv": host, "protocol": "http"}, false, map[string]string{}},
// use default queueLength with includeUnacked
{map[string]string{"queueName": "sample", rabbitPublishedPerSecondMetricName: "100", "hostFromEnv": host, "protocol": "http"}, false, map[string]string{}},
}

func TestParseDefaultQueueLength(t *testing.T) {
Expand All @@ -92,7 +106,9 @@ func TestParseDefaultQueueLength(t *testing.T) {
t.Error("Expected success but got error", err)
case testData.isError && err == nil:
t.Error("Expected error but got success")
case metadata.queueLength != defaultRabbitMQQueueLength:
case metadata.publishRate > 0 && metadata.queueLength != 0:
t.Error("Expected default queueLength = 0 when publishRate is specified")
case metadata.publishRate == 0 && metadata.queueLength != defaultRabbitMQQueueLength:
t.Error("Expected default queueLength =", defaultRabbitMQQueueLength, "but got", metadata.queueLength)
}
}
Expand All @@ -107,19 +123,33 @@ type getQueueInfoTestData struct {
}

var testQueueInfoTestData = []getQueueInfoTestData{
{`{"messages": 4, "messages_unacknowledged": 1, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 1, "messages_unacknowledged": 1, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 1, "messages_unacknowledged": 0, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 0, "messages_unacknowledged": 0, "name": "evaluate_trials"}`, http.StatusOK, false, nil, ""},
// queueLength
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 1, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 1, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 0, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, false, nil, ""},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 1, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 1, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 0, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, false, nil, ""},
// publishRate
{`{"messages": 0, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueLength": "0"}, ""},
{`{"messages": 0, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, false, map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueLength": "0"}, ""},
{`{"messages": 1, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueLength": "0"}, ""},
{`{"messages": 1, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, false, map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueLength": "0"}, ""},
// error response
{`Password is incorrect`, http.StatusUnauthorized, false, nil, ""},
}

var vhostPathes = []string{"/myhost", "", "/", "//", "/%2F"}

var testQueueInfoTestDataSingleVhost = []getQueueInfoTestData{
{`{"messages": 4, "messages_unacknowledged": 1, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "myhost"}, "/myhost"},
{`{"messages": 4, "messages_unacknowledged": 1, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "/"}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": ""}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "myhost"}, "/myhost"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "/"}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": ""}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "myhost"}, "/myhost"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "/"}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": ""}, "/"},
}

func TestGetQueueInfo(t *testing.T) {
Expand Down

0 comments on commit 135679d

Please sign in to comment.