Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2] Add SSL support to RabbitMQ Scaler #1073

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 48 additions & 4 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package scalers

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/streadway/amqp"
Expand All @@ -25,6 +28,7 @@ const (
rabbitMetricType = "External"
rabbitIncludeUnacked = "includeUnacked"
defaultIncludeUnacked = false
amqps = "amqps"
)

type rabbitMQScaler struct {
Expand All @@ -38,7 +42,10 @@ type rabbitMQMetadata struct {
host string // connection string for AMQP protocol
apiHost string // connection string for management API requests
queueLength int
includeUnacked bool // if true uses HTTP API and requires apiHost, if false uses AMQP and requires host
includeUnacked bool // if true uses HTTP API and requires apiHost, if false uses AMQP and requires host
ca string //Certificate authority file for TLS client authentication. Optional. If authmode is sasl_ssl, this is required.
cert string //Certificate for client authentication. Optional. If authmode is sasl_ssl, this is required.
key string //Key for client authentication. Optional. If authmode is sasl_ssl, this is required.
}

type queueInfo struct {
Expand All @@ -59,7 +66,7 @@ func NewRabbitMQScaler(resolvedEnv, metadata, authParams map[string]string) (Sca
if meta.includeUnacked {
return &rabbitMQScaler{metadata: meta}, nil
} else {
conn, ch, err := getConnectionAndChannel(meta.host)
conn, ch, err := getConnectionAndChannel(meta.host, meta.ca, meta.cert, meta.key)
if err != nil {
return nil, fmt.Errorf("error establishing rabbitmq connection: %s", err)
}
Expand Down Expand Up @@ -110,6 +117,24 @@ func parseRabbitMQMetadata(resolvedEnv, metadata, authParams map[string]string)
}
}

if strings.HasPrefix(meta.host, amqps) {
if val, ok := authParams["ca"]; ok {
meta.ca = val
} else {
return nil, fmt.Errorf("rabbitmq host is using amqps://, but no ca given")
}
if val, ok := authParams["cert"]; ok {
meta.cert = val
} else {
return nil, fmt.Errorf("rabbitmq host is using amqps://, no cert given")
}
if val, ok := authParams["key"]; ok {
meta.key = val
} else {
return nil, fmt.Errorf("rabbitmq host is using amqps://, no key given")
}
}

if val, ok := metadata["queueName"]; ok {
meta.queueName = val
} else {
Expand All @@ -130,8 +155,27 @@ func parseRabbitMQMetadata(resolvedEnv, metadata, authParams map[string]string)
return &meta, nil
}

func getConnectionAndChannel(host string) (*amqp.Connection, *amqp.Channel, error) {
conn, err := amqp.Dial(host)
func getConnectionAndChannel(host string, caFile string, certFile string, keyFile string) (*amqp.Connection, *amqp.Channel, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the recent change, we have added tls_config.go in pkg/util. That is supposed to be one-stop destination to enable tls encryption. Can you please move this to tls_config.go

var conn *amqp.Connection
var err error

if strings.HasPrefix(host, amqps) {
cfg := new(tls.Config)

cfg.RootCAs = x509.NewCertPool()

if ca, err := ioutil.ReadFile(caFile); err == nil {
cfg.RootCAs.AppendCertsFromPEM(ca)
}

if cert, err := tls.LoadX509KeyPair(certFile, keyFile); err == nil {
Copy link
Contributor

@aman-bansal aman-bansal Oct 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion, this should not be the required parameter. client cert and the key are only required when client authentication at the server is set to REQUIREANDVERIFY. In other cases, it could be empty.

Until and unless I am missing something completely in regards to rabbitmq tls.

cfg.Certificates = append(cfg.Certificates, cert)
}
conn, err = amqp.DialTLS(host, cfg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could fail in some cases. So DialTLS has this defined:
if config.TLSClientConfig.ServerName == "" { config.TLSClientConfig.ServerName = uri.Host }

Either specify InsecureSkipVerify or take servername as parameter. @ahmelsayed this is same what I have mentioned in this #1263

} else {
conn, err = amqp.Dial(host)
}

if err != nil {
return nil, nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/scalers/rabbitmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{
{map[string]string{"queueLength": "10"}, true, map[string]string{"host": host}},
// properly formed metadata with includeUnacked
{map[string]string{"queueLength": "10", "queueName": "sample", "apiHostFromEnv": apiHost, "includeUnacked": "true"}, false, map[string]string{}},
// properly formed metadata with amqps but no cert/ca/key
{map[string]string{"queueLength": "10", "queueName": "sample", "hostFromEnv": host}, true, map[string]string{"host": "amqps://user:sercet@somehost.com:5236/vhost"}},
// properly formed metadata with amqps
{map[string]string{"queueLength": "10", "queueName": "sample", "hostFromEnv": host}, false, map[string]string{"host": "amqps://user:sercet@somehost.com:5236/vhost", "ca": "ca", "cert": "cert", "key": "key"}},
}

var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{
Expand Down