Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2] Add SSL support to RabbitMQ Scaler #1073

Closed
wants to merge 4 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 48 additions & 4 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package scalers

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/streadway/amqp"
Expand All @@ -25,6 +28,7 @@ const (
rabbitMetricType = "External"
rabbitIncludeUnacked = "includeUnacked"
defaultIncludeUnacked = false
amqps = "amqps"
)

type rabbitMQScaler struct {
Expand All @@ -38,7 +42,10 @@ type rabbitMQMetadata struct {
host string // connection string for AMQP protocol
apiHost string // connection string for management API requests
queueLength int
includeUnacked bool // if true uses HTTP API and requires apiHost, if false uses AMQP and requires host
includeUnacked bool // if true uses HTTP API and requires apiHost, if false uses AMQP and requires host
ca string //Certificate authority file for TLS client authentication. Optional. If authmode is sasl_ssl, this is required.
cert string //Certificate for client authentication. Optional. If authmode is sasl_ssl, this is required.
key string //Key for client authentication. Optional. If authmode is sasl_ssl, this is required.
}

type queueInfo struct {
Expand All @@ -59,7 +66,7 @@ func NewRabbitMQScaler(resolvedEnv, metadata, authParams map[string]string) (Sca
if meta.includeUnacked {
return &rabbitMQScaler{metadata: meta}, nil
} else {
conn, ch, err := getConnectionAndChannel(meta.host)
conn, ch, err := getConnectionAndChannel(meta.host, meta.ca, meta.cert, meta.key)
if err != nil {
return nil, fmt.Errorf("error establishing rabbitmq connection: %s", err)
}
Expand Down Expand Up @@ -114,6 +121,24 @@ func parseRabbitMQMetadata(resolvedEnv, metadata, authParams map[string]string)
}
}

if strings.HasPrefix(meta.host, amqps) {
if val, ok := authParams["ca"]; ok {
meta.ca = val
} else {
return nil, fmt.Errorf("no ca given")
}
if val, ok := authParams["cert"]; ok {
meta.cert = val
} else {
return nil, fmt.Errorf("no cert given")
}
if val, ok := authParams["key"]; ok {
meta.key = val
} else {
return nil, fmt.Errorf("no key given")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from the usage below, I'm assuming those are expected to have file paths, not actual certificate values, right? I think you would want to use metadata map instead of the authParams map, as the latter contains values resolved from a TriggerAuthentication CRD if present for a particular trigger.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also a bit of nitpicking, but perhaps rabbitmq host is using amqps://, but no key/cert/ca provided would be a bit more descriptive error message.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also a bit of nitpicking, but perhaps rabbitmq host is using amqps://, but no key/cert/ca provided would be a bit more descriptive error message.

Done

}
}

if val, ok := metadata["queueName"]; ok {
meta.queueName = val
} else {
Expand All @@ -134,8 +159,27 @@ func parseRabbitMQMetadata(resolvedEnv, metadata, authParams map[string]string)
return &meta, nil
}

func getConnectionAndChannel(host string) (*amqp.Connection, *amqp.Channel, error) {
conn, err := amqp.Dial(host)
func getConnectionAndChannel(host string, caFile string, certFile string, keyFile string) (*amqp.Connection, *amqp.Channel, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the recent change, we have added tls_config.go in pkg/util. That is supposed to be one-stop destination to enable tls encryption. Can you please move this to tls_config.go

var conn *amqp.Connection
var err error

if strings.HasPrefix(host, amqps) {
cfg := new(tls.Config)

cfg.RootCAs = x509.NewCertPool()

if ca, err := ioutil.ReadFile(caFile); err == nil {
cfg.RootCAs.AppendCertsFromPEM(ca)
}

if cert, err := tls.LoadX509KeyPair(certFile, keyFile); err == nil {
Copy link
Contributor

@aman-bansal aman-bansal Oct 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion, this should not be the required parameter. client cert and the key are only required when client authentication at the server is set to REQUIREANDVERIFY. In other cases, it could be empty.

Until and unless I am missing something completely in regards to rabbitmq tls.

cfg.Certificates = append(cfg.Certificates, cert)
}
conn, err = amqp.DialTLS(host, cfg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could fail in some cases. So DialTLS has this defined:
if config.TLSClientConfig.ServerName == "" { config.TLSClientConfig.ServerName = uri.Host }

Either specify InsecureSkipVerify or take servername as parameter. @ahmelsayed this is same what I have mentioned in this #1263

} else {
conn, err = amqp.Dial(host)
}

if err != nil {
return nil, nil, err
}
Expand Down