Skip to content

Commit

Permalink
address naming changes for postgresql scaler (#593)
Browse files Browse the repository at this point in the history
  • Loading branch information
dimberman authored Feb 3, 2020
1 parent e9b8e1f commit b068c64
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 45 deletions.
4 changes: 2 additions & 2 deletions pkg/handler/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@ func (h *ScaleHandler) getScaler(name, namespace, triggerType string, resolvedEn
return scalers.NewHuaweiCloudeyeScaler(triggerMetadata, authParams)
case "azure-blob":
return scalers.NewAzureBlobScaler(resolvedEnv, triggerMetadata, authParams, podIdentity)
case "postgres":
return scalers.NewPostgresScaler(resolvedEnv, triggerMetadata, authParams)
case "postgresql":
return scalers.NewPostgreSQLScaler(resolvedEnv, triggerMetadata, authParams)
case "mysql":
return scalers.NewMySQLScaler(resolvedEnv, triggerMetadata, authParams)
default:
Expand Down
86 changes: 43 additions & 43 deletions pkg/scalers/postgres_scaler.go → pkg/scalers/postgresql_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,64 +14,64 @@ import (
)

const (
pgMetricName = "num"
defaultPostgresPassword = ""
pgMetricName = "num"
defaultPostgreSQLPassword = ""
)

type postGRESScaler struct {
metadata *postGRESMetadata
type postgreSQLScaler struct {
metadata *postgreSQLMetadata
connection *sql.DB
}

type postGRESMetadata struct {
connStr string
userName string
password string
host string
port string
query string
dbName string
sslmode string
type postgreSQLMetadata struct {
connection string
userName string
password string
host string
port string
query string
dbName string
sslmode string
}

var postgresLog = logf.Log.WithName("postgres_scaler")
var postgreSQLLog = logf.Log.WithName("postgreSQL_scaler")

// NewPostgresScaler creates a new postgres scaler
func NewPostgresScaler(resolvedEnv, metadata, authParams map[string]string) (Scaler, error) {
meta, err := parsePostgresMetadata(resolvedEnv, metadata, authParams)
// NewPostgreSQLScaler creates a new postgreSQL scaler
func NewPostgreSQLScaler(resolvedEnv, metadata, authParams map[string]string) (Scaler, error) {
meta, err := parsePostgreSQLMetadata(resolvedEnv, metadata, authParams)
if err != nil {
return nil, fmt.Errorf("error parsing postgres metadata: %s", err)
return nil, fmt.Errorf("error parsing postgreSQL metadata: %s", err)
}

conn, err := getConnection(meta)
if err != nil {
return nil, fmt.Errorf("error establishing postgres connection: %s", err)
return nil, fmt.Errorf("error establishing postgreSQL connection: %s", err)
}
return &postGRESScaler{
return &postgreSQLScaler{
metadata: meta,
connection: conn,
}, nil
}

func parsePostgresMetadata(resolvedEnv, metadata, authParams map[string]string) (*postGRESMetadata, error) {
meta := postGRESMetadata{}
func parsePostgreSQLMetadata(resolvedEnv, metadata, authParams map[string]string) (*postgreSQLMetadata, error) {
meta := postgreSQLMetadata{}

if val, ok := metadata["query"]; ok {
meta.query = val
} else {
return nil, fmt.Errorf("no query given")
}

if val, ok := authParams["connStr"]; ok {
meta.connStr = val
} else if val, ok := metadata["connStr"]; ok {
if val, ok := authParams["connection"]; ok {
meta.connection = val
} else if val, ok := metadata["connection"]; ok {
hostSetting := val

if val, ok := resolvedEnv[hostSetting]; ok {
meta.connStr = val
meta.connection = val
}
} else {
meta.connStr = ""
meta.connection = ""
if val, ok := metadata["host"]; ok {
meta.host = val
} else {
Expand All @@ -98,7 +98,7 @@ func parsePostgresMetadata(resolvedEnv, metadata, authParams map[string]string)
} else {
return nil, fmt.Errorf("no sslmode name given")
}
meta.password = defaultPostgresPassword
meta.password = defaultPostgreSQLPassword
if val, ok := authParams["password"]; ok {
meta.password = val
} else if val, ok := metadata["password"]; ok && val != "" {
Expand All @@ -111,10 +111,10 @@ func parsePostgresMetadata(resolvedEnv, metadata, authParams map[string]string)
return &meta, nil
}

func getConnection(meta *postGRESMetadata) (*sql.DB, error) {
func getConnection(meta *postgreSQLMetadata) (*sql.DB, error) {
var connStr string
if meta.connStr != "" {
connStr = meta.connStr
if meta.connection != "" {
connStr = meta.connection
} else {
connStr = fmt.Sprintf(
"host=%s port=%s user=%s dbname=%s sslmode=%s password=%s",
Expand All @@ -128,49 +128,49 @@ func getConnection(meta *postGRESMetadata) (*sql.DB, error) {
}
db, err := sql.Open("postgres", connStr)
if err != nil {
postgresLog.Error(err, fmt.Sprintf("Found error opening: %s", err))
postgreSQLLog.Error(err, fmt.Sprintf("Found error opening postgreSQL: %s", err))
return nil, err
}
err = db.Ping()
if err != nil {
postgresLog.Error(err, fmt.Sprintf("Found error pinging: %s", err))
postgreSQLLog.Error(err, fmt.Sprintf("Found error pinging postgreSQL: %s", err))
return nil, err
}
return db, nil
}

// Close disposes of postgres connections
func (s *postGRESScaler) Close() error {
func (s *postgreSQLScaler) Close() error {
err := s.connection.Close()
if err != nil {
postgresLog.Error(err, "Error closing postgres connection")
postgreSQLLog.Error(err, "Error closing postgreSQL connection")
return err
}
return nil
}

// IsActive returns true if there are pending messages to be processed
func (s *postGRESScaler) IsActive(ctx context.Context) (bool, error) {
func (s *postgreSQLScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := s.getActiveNumber()
if err != nil {
return false, fmt.Errorf("error inspecting postgres: %s", err)
return false, fmt.Errorf("error inspecting postgreSQL: %s", err)
}

return messages > 0, nil
}

func (s *postGRESScaler) getActiveNumber() (int, error) {
func (s *postgreSQLScaler) getActiveNumber() (int, error) {
var id int
err := s.connection.QueryRow(s.metadata.query).Scan(&id)
if err != nil {
postgresLog.Error(err, fmt.Sprintf("could not query PG: %s", err))
return 0, fmt.Errorf("could not query PG: %s", err)
postgreSQLLog.Error(err, fmt.Sprintf("could not query postgreSQL: %s", err))
return 0, fmt.Errorf("could not query postgreSQL: %s", err)
}
return id, nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *postGRESScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec {
func (s *postgreSQLScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec {
targetListLengthQty := resource.NewQuantity(1, resource.DecimalSI)
externalMetric := &v2beta1.ExternalMetricSource{
MetricName: pgMetricName,
Expand All @@ -183,10 +183,10 @@ func (s *postGRESScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec {
}

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *postGRESScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
func (s *postgreSQLScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
num, err := s.getActiveNumber()
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting postgres: %s", err)
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting postgreSQL: %s", err)
}

metric := external_metrics.ExternalMetricValue{
Expand Down

0 comments on commit b068c64

Please sign in to comment.