Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support to use regex with HTTP protocol in RabbitMQ Scaler #1957

Merged
merged 12 commits into from
Jul 27, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- Add Solace PubSub+ Event Broker Scaler ([#1945](https://github.com/kedacore/keda/pull/1945))
- Add fallback functionality ([#1872](https://github.com/kedacore/keda/issues/1872))
- Introduce Idle Replica Mode ([#1958](https://github.com/kedacore/keda/pull/1958))
- Support using regex to select the queues in RabbitMQ Scaler ([#1957](https://github.com/kedacore/keda/pull/1957))

### Improvements

Expand Down
121 changes: 112 additions & 9 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ const (
defaultProtocol = autoProtocol
)

const (
sumOperation = "sum"
avgOperation = "avg"
maxOperation = "max"
defaultOperation = sumOperation
)

type rabbitMQScaler struct {
metadata *rabbitMQMetadata
connection *amqp.Connection
Expand All @@ -51,6 +58,8 @@ type rabbitMQMetadata struct {
host string // connection string for either HTTP or AMQP protocol
protocol string // either http or amqp protocol
vhostName *string // override the vhost from the connection info
useRegex bool // specify if the queueName contains a rexeg
operation string // specify the operation to apply in case of multiples queues
}

type queueInfo struct {
Expand Down Expand Up @@ -164,6 +173,25 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) {
meta.vhostName = &val
}

// Resolve useRegex
if val, ok := config.TriggerMetadata["useRegex"]; ok {
useRegex, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("useRegex has invalid value")
}
meta.useRegex = useRegex
}

// Resolve operation
meta.operation = defaultOperation
if val, ok := config.TriggerMetadata["operation"]; ok {
meta.operation = val
}

if meta.useRegex && meta.protocol == amqpProtocol {
return nil, fmt.Errorf("configure only useRegex with http protocol")
}

_, err := parseTrigger(&meta, config)
if err != nil {
return nil, fmt.Errorf("unable to parse trigger: %s", err)
Expand Down Expand Up @@ -290,19 +318,31 @@ func (s *rabbitMQScaler) getQueueStatus() (int, float64, error) {
return items.Messages, 0, nil
}

func getJSON(httpClient *http.Client, url string, target interface{}) error {
r, err := httpClient.Get(url)
func getJSON(s *rabbitMQScaler, url string) (queueInfo, error) {
var result queueInfo
r, err := s.httpClient.Get(url)
if err != nil {
return err
return result, err
}
defer r.Body.Close()

if r.StatusCode == 200 {
return json.NewDecoder(r.Body).Decode(target)
if s.metadata.useRegex {
var results []queueInfo
err = json.NewDecoder(r.Body).Decode(&results)
if err != nil {
return result, err
}
result, err := getComposedQueue(s, results)
return result, err
}

err = json.NewDecoder(r.Body).Decode(&result)
return result, err
}

body, _ := ioutil.ReadAll(r.Body)
return fmt.Errorf("error requesting rabbitMQ API status: %s, response: %s, from: %s", r.Status, body, url)
return result, fmt.Errorf("error requesting rabbitMQ API status: %s, response: %s, from: %s", r.Status, body, url)
}

func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {
Expand All @@ -324,11 +364,15 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {
}

parsedURL.Path = ""
var getQueueInfoManagementURI string
if s.metadata.useRegex {
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s", parsedURL.String(), "api/queues?use_regex=true&pagination=false&name=", s.metadata.queueName)
} else {
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s/%s", parsedURL.String(), "api/queues", vhost, s.metadata.queueName)
}

getQueueInfoManagementURI := fmt.Sprintf("%s/%s%s/%s", parsedURL.String(), "api/queues", vhost, s.metadata.queueName)

info := queueInfo{}
err = getJSON(s.httpClient, getQueueInfoManagementURI, &info)
var info queueInfo
info, err = getJSON(s, getQueueInfoManagementURI)

if err != nil {
return nil, err
Expand Down Expand Up @@ -386,3 +430,62 @@ func (s *rabbitMQScaler) GetMetrics(ctx context.Context, metricName string, metr

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func getComposedQueue(s *rabbitMQScaler, q []queueInfo) (queueInfo, error) {
var queue = queueInfo{}
queue.Name = "composed-queue"
queue.MessagesUnacknowledged = 0
if len(q) > 0 {
switch s.metadata.operation {
case sumOperation:
sumMessages, sumRate := getSum(q)
queue.Messages = sumMessages
queue.MessageStat.PublishDetail.Rate = sumRate
case avgOperation:
avgMessages, avgRate := getAverage(q)
queue.Messages = avgMessages
queue.MessageStat.PublishDetail.Rate = avgRate
case maxOperation:
maxMessages, maxRate := getMaximum(q)
queue.Messages = maxMessages
queue.MessageStat.PublishDetail.Rate = maxRate
default:
return queue, fmt.Errorf("operation mode %s must be one of %s, %s, %s", s.metadata.operation, sumOperation, avgOperation, maxOperation)
}
} else {
queue.Messages = 0
queue.MessageStat.PublishDetail.Rate = 0
}

return queue, nil
}

func getSum(q []queueInfo) (int, float64) {
var sumMessages int
var sumRate float64
for _, value := range q {
sumMessages += value.Messages
sumRate += value.MessageStat.PublishDetail.Rate
}
return sumMessages, sumRate
}

func getAverage(q []queueInfo) (int, float64) {
sumMessages, sumRate := getSum(q)
len := len(q)
return sumMessages / len, sumRate / float64(len)
}

func getMaximum(q []queueInfo) (int, float64) {
var maxMessages int
var maxRate float64
for _, value := range q {
if value.Messages > maxMessages {
maxMessages = value.Messages
}
if value.MessageStat.PublishDetail.Rate > maxRate {
maxRate = value.MessageStat.PublishDetail.Rate
}
}
return maxMessages, maxRate
}
113 changes: 113 additions & 0 deletions pkg/scalers/rabbitmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://"}, false, map[string]string{}},
// message rate amqp
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "https://"}, false, map[string]string{}},
// amqp host and useRegex
{map[string]string{"queueName": "sample", "host": "amqps://", "useRegex": "true"}, true, map[string]string{}},
// http host and useRegex
{map[string]string{"queueName": "sample", "host": "http://", "useRegex": "true"}, false, map[string]string{}},
// message rate and useRegex
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true"}, false, map[string]string{}},
// queue length and useRegex
{map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true"}, false, map[string]string{}},
}

var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{
Expand Down Expand Up @@ -252,6 +260,111 @@ func TestGetQueueInfo(t *testing.T) {
}
}

var testRegexQueueInfoTestData = []getQueueInfoTestData{
// sum queue length
{`[{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "sum"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "sum"}, ""},
{`[{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "sum"}, ""},
{`[{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "sum"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, false, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "sum"}, ""},
{`[]`, http.StatusOK, false, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "sum"}, ""},
// max queue length
{`[{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "max"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "max"}, ""},
{`[{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "max"}, ""},
{`[{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "max"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, false, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "max"}, ""},
{`[]`, http.StatusOK, false, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "max"}, ""},
// avg queue length
{`[{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "avg"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "avg"}, ""},
{`[{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "avg"}, ""},
{`[{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "avg"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, false, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "avg"}, ""},
{`[]`, http.StatusOK, false, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "avg"}, ""},
// sum message rate
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "sum"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "sum"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "sum"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "sum"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, false, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "sum"}, ""},
{`[]`, http.StatusOK, false, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "sum"}, ""},
// max message rate
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "max"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "max"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "max"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "max"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, false, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "max"}, ""},
{`[]`, http.StatusOK, false, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "max"}, ""},
// avg message rate
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "avg"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "avg"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "avg"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "avg"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, false, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "avg"}, ""},
{`[]`, http.StatusOK, false, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "avg"}, ""},
}

func TestGetQueueInfoWithRegex(t *testing.T) {
for _, testData := range testRegexQueueInfoTestData {
var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
expectedPath := "/api/queues?use_regex=true&pagination=false&name=evaluate_trials"
if r.RequestURI != expectedPath {
t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
}

w.WriteHeader(testData.responseStatus)
_, err := w.Write([]byte(testData.response))
if err != nil {
t.Error("Expect request path to =", testData.response, "but it is", err)
}
}))

resolvedEnv := map[string]string{host: fmt.Sprintf("%s%s", apiStub.URL, testData.vhostPath), "plainHost": apiStub.URL}

metadata := map[string]string{
"queueName": "evaluate_trials",
"hostFromEnv": host,
"protocol": "http",
}
for k, v := range testData.extraMetadata {
metadata[k] = v
}

s, err := NewRabbitMQScaler(
&ScalerConfig{
ResolvedEnv: resolvedEnv,
TriggerMetadata: metadata,
AuthParams: map[string]string{},
GlobalHTTPTimeout: 1000 * time.Millisecond,
},
)

if err != nil {
t.Error("Expect success", err)
}

ctx := context.TODO()
active, err := s.IsActive(ctx)

if testData.responseStatus == http.StatusOK {
if err != nil {
t.Error("Expect success", err)
}

if active != testData.isActive {
if testData.isActive {
t.Error("Expect to be active")
} else {
t.Error("Expect to not be active")
}
}
} else if !strings.Contains(err.Error(), testData.response) {
t.Error("Expect error to be like '", testData.response, "' but it's '", err, "'")
}
}
}

func TestRabbitMQGetMetricSpecForScaling(t *testing.T) {
for _, testData := range rabbitMQMetricIdentifiers {
meta, err := parseRabbitMQMetadata(&ScalerConfig{ResolvedEnv: sampleRabbitMqResolvedEnv, TriggerMetadata: testData.metadataTestData.metadata, AuthParams: nil})
Expand Down
Loading