Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Cassandra Scaler #1841

Closed
wants to merge 105 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
105 commits
Select commit Hold shift + click to select a range
5eb41d8
Add Cassandra Scaler
nilayasiktoprak May 30, 2021
25f7bf7
Update pull request id
nilayasiktoprak May 30, 2021
cc771c3
correcting naming and links in README (#1842)
Ritikaa96 Jun 1, 2021
f29f780
cron: adding validation for range or hyphenated inputs for start and …
Ritikaa96 Jun 2, 2021
d80447e
Fix: READY and ACTIVE fields of ScaledJob to show status. (#1855)
Shubham82 Jun 2, 2021
68c5e7f
Fix CHANGELOG.md
nilayasiktoprak Jun 2, 2021
7e7e8ef
Update Cassandra scaler files
nilayasiktoprak Oct 23, 2021
6e7688e
Add deleted kustomization files
nilayasiktoprak Oct 23, 2021
b44a003
Update CHANGELOG.md
nilayasiktoprak Oct 23, 2021
5d3d198
Show HashiCorp Vault Address when using 'kubectl get ta' or 'kubectl …
Shubham82 Jun 7, 2021
b550036
Don't panic when HashiCorp Vault path doesn't exist (#1867)
zroubalik Jun 7, 2021
c6fb478
Support non-public cloud environments in the Azure Storage Queue and …
amirschw Jun 8, 2021
40e1ed2
Configure Renovate (#1824)
renovate[bot] Jun 14, 2021
a442bc6
docs: Add container image scanning in security policy (#1882)
tomkerkhove Jun 14, 2021
31c38fa
feat: adding version metadata to be able to specify kafka broker vers…
Dorn- Jun 14, 2021
61571c2
Extend Azure Monitor scaler to support custom metrics (#1883)
amirschw Jun 14, 2021
5a4e5ee
Update Renovate to describe updates in the commmit (#1886)
zroubalik Jun 14, 2021
26c6e7a
Enable PR controls for Renovate (#1889)
zroubalik Jun 14, 2021
55618ea
Update actions/cache action to v2.1.6 (#1884)
renovate[bot] Jun 14, 2021
c88def5
Update actions/checkout action to v2 (#1890)
renovate[bot] Jun 14, 2021
25b19f3
bump to golang 1.15.13 (#1902)
zroubalik Jun 24, 2021
10cf74d
linter: update & use revive instead of golint (#1901)
zroubalik Jun 24, 2021
6094641
Add support for Trigger Authentication for InfluxDB (#1904)
acobaugh Jun 25, 2021
9f888aa
scale handler chores (#1903)
zroubalik Jun 28, 2021
9ea3962
Handle password value correctly; add tests. (#1939)
tdolby-at-uk-ibm-com Jul 7, 2021
11ee26d
Renovate: update only actions (#1947)
zroubalik Jul 12, 2021
e9d1796
bump deps (#1948)
zroubalik Jul 12, 2021
318d319
Add ratelimitting parameters to keda metrics apiserver to allow overr…
Bez625 Jul 12, 2021
5aef970
Renovate: ignore other deps then GH actions (#1954)
zroubalik Jul 12, 2021
19a076d
Kafka scaler: batch GetOffset requests (#1956)
lionelvillard Jul 13, 2021
8196dc2
Add fallback functionality (#1910)
mishamo Jul 14, 2021
7574a1c
chore: change Keda -> KEDA (#1959)
zroubalik Jul 14, 2021
b0b256d
Provide Solace Queue Scaler (#1945)
dennis-brinley Jul 14, 2021
9f3ac2c
Introduce Idle Replica Mode (#1958)
zroubalik Jul 15, 2021
15eec7f
AZ queue - Idle Replicas test fix (#1961)
zroubalik Jul 15, 2021
62658b1
Change ClusterRoleBinding name to align with industry standard (#1616)
ubergesundheit Jul 15, 2021
8e4f0ad
update test dependencies (#1974)
zroubalik Jul 22, 2021
0285e19
Fix timing in azure-queue, redis-cluster-streams, and influxdb tests …
ahmelsayed Jul 25, 2021
5b40e11
Adds support to use regex with HTTP protocol in RabbitMQ Scaler (#1957)
JorTurFer Jul 27, 2021
c232c22
controller test - increasing timeouts (#1968)
zroubalik Jul 28, 2021
2c41523
Add Selenium Grid scaler (#1971)
prashanth-volvocars Jul 31, 2021
786ac2d
influxdb scaler: add support for integer query results (#1977)
philomory Aug 2, 2021
ae23486
add Kafka Version support to Changelog (#2000)
zroubalik Aug 2, 2021
c0309d5
prepare testenv in Makefile (#1960)
zroubalik Aug 2, 2021
088a633
use Bash instead of Dash (#2003)
zroubalik Aug 3, 2021
b521d7c
Improve and update E2E tests (#1980)
ahmelsayed Aug 3, 2021
f90e71a
Add suport custom metricName in RabbitMQ scaler (#1976)
rtnpro Aug 3, 2021
f3c625e
Support pod conditions for pending job count calculation (#1970)
yaronya Aug 3, 2021
789f24d
set timeout on deployment rollout (#2007)
ahmelsayed Aug 4, 2021
57313eb
Adding `pendingPodConditions` field to ScaledJob CRD (#2009)
zroubalik Aug 4, 2021
ede55f9
Support non-public cloud environments in the Azure Service Bus scaler…
amirschw Aug 5, 2021
5188c35
Add Kubernetes Workload scaler (#2010)
JorTurFer Aug 6, 2021
1063116
Use `scaled[object/job].keda.sh/` prefix for KEDA related labels (#2008)
zroubalik Aug 6, 2021
1bd9c6d
Prepare 2.4.0 release (#2012)
zroubalik Aug 6, 2021
d92cc3f
Update Python in dev containers and add .gitattributes (#2021)
JorTurFer Aug 9, 2021
15046c6
validation and check in cron scaler for same start and end case (#2032)
Ritikaa96 Aug 18, 2021
e00e894
chore: KEDA was accepted as CNCF Incubation project (#2039)
tomkerkhove Aug 18, 2021
1092cf9
Add Bearer Auth for Metrics API scaler (#2028)
JorTurFer Aug 18, 2021
80d6098
Update cron scaler to parse the cron with parser instead of searching…
JorTurFer Aug 20, 2021
27952d1
Adapt Prometheus test to #381 (#2051)
fbalicchia Aug 23, 2021
b56c1b2
use Patch to set FallbackCondition on ScaledObject.Status (#2037)
zroubalik Aug 26, 2021
5502157
Escape `queueName` and `vhostName` in RabbitMQ Scaler before use them…
Aug 26, 2021
baf87f8
Anonymize the host in case of HTTP failure (RabbitMQ Scaler) (#2041)
Aug 27, 2021
d3d4480
Update Makefile to ensure that bash is used instead of sh or another …
Aug 30, 2021
8d77301
Adds explicit the option of setting KEDA_HTTP_DEFAULT_TIMEOUT (#2062)
Aug 30, 2021
2e960f0
fix typo in isScalableCache (#2064)
zroubalik Aug 30, 2021
9e8a76a
build-tools image: bump Golang to 1.16.7 (#2067)
zroubalik Sep 1, 2021
31365b4
bump golangci-lint to 1.42.0 (#2066)
zroubalik Sep 1, 2021
4491e3b
bump Golang to 1.16.7 (#2065)
zroubalik Sep 1, 2021
1a9a3a7
Bump bruceadams/get-release from 1.2.2 to 1.2.3 (#2081)
dependabot[bot] Sep 1, 2021
64f0236
Migrate to Kubebuilder v3 (#2082)
zroubalik Sep 1, 2021
830078b
Bump github.com/aws/aws-sdk-go from 1.40.34 to 1.40.35 (#2088)
dependabot[bot] Sep 2, 2021
da8d74f
Bump github.com/tidwall/gjson from 1.8.1 to 1.9.0 (#2089)
dependabot[bot] Sep 2, 2021
f58278e
Adds e2e test for PostgreSQL scaler (#2091)
Sep 9, 2021
64bd86e
Add custom http timeout in RabbitMQ Scaler (#2086)
Sep 10, 2021
80d139e
TriggerAuthentication/Vault: add support for vault namespace (#2085)
chapurlatn Sep 10, 2021
588b950
ScaledJob: introduce `MultipleScalersCalculation` (#2016)
TsuyoshiUshio Sep 13, 2021
0173666
Add Graphite Scaler + e2e tests (#2092)
bpinske Sep 15, 2021
b3a78df
Set the previous behavior in dev container (#2102)
Sep 20, 2021
4ff572b
Raises an error when RabbitMQ regex matches more than 1 page (#2103)
Sep 20, 2021
e30b1f8
Add support to provide the metric name in Azure Log Analytics Scaler …
Oct 4, 2021
9c6ebf0
Add e2e test for cron scaler (#2107)
Oct 4, 2021
9caa338
Add support to get connection data from Trigger Authorization in MSSQ…
Oct 4, 2021
9b16547
Add support to get connection data from Trigger Authorization in MySQ…
Oct 4, 2021
0d87c97
Add support to get connection data from Trigger Authorization in Mong…
Oct 4, 2021
f3dbefa
Add support to get connection data from Trigger Authorization in Post…
Oct 4, 2021
f49d84d
Fix mongdb e2e tests (#2140)
Oct 4, 2021
647c809
Add Makefile mockgen targets (#2137)
mihaitodor Oct 4, 2021
7e2683c
Fix typo in Makefile (#2154)
Oct 5, 2021
b325d7b
Add pageSize (using regex) in RabbitMQ Scaler (#2162)
Oct 7, 2021
cb6cfea
Artemis Scaler parses out broker config parameters in case `restAPITe…
Ritikaa96 Oct 7, 2021
c3715f5
Add `unsafeSsl` parameter on InfluxDB (#2157)
dtsioumas Oct 7, 2021
3b1948d
bump Golang to 1.16.9 (#2186)
zroubalik Oct 12, 2021
9f35a34
bump deps (#2185)
zroubalik Oct 12, 2021
a15e44b
update liiklus protobuf generation (#2184)
zroubalik Oct 12, 2021
24e3e62
Prometheus scaler: omit `serverAddress` from generated metric name (#…
zroubalik Oct 13, 2021
1ce3d88
Improve metric name creation to be unique using scaler index inside t…
Oct 13, 2021
5cc3969
Provide support for configuring authentication through TriggerAuthent…
Ritikaa96 Oct 13, 2021
c5021f9
MySQL Scaler: don't expose connection string in `metricName` (#2171)
zroubalik Oct 13, 2021
744c3c7
chore: Provide configuration for automatically closing inactive issue…
tomkerkhove Oct 13, 2021
694277d
chore: Change `feature` as allowed label for stale issues, instead of…
tomkerkhove Oct 14, 2021
19b4f03
docs: Add step in release process for next release (#2205)
tomkerkhove Oct 20, 2021
a073d6a
Update .gitignore
nilayasiktoprak Oct 23, 2021
4671de2
Update cassandra files
nilayasiktoprak Oct 23, 2021
1a0edc9
Merge branch 'main' into cassandra_scaler
nilayasiktoprak Oct 23, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@ cover.out

# GO debug binary
cmd/manager/debug.test

# pre-commit
.pre-commit-config.yaml
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ default_stages: [commit, push]
minimum_pre_commit_version: "1.20.0"
repos:
- repo: git://github.com/dnephin/pre-commit-golang
rev: v0.3.5
rev: v0.4.0
hooks:
- id: go-fmt
name: Run go fmt against the code
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.4.0
rev: v4.0.1
hooks:
- id: trailing-whitespace
- id: detect-private-key
- id: end-of-file-fixer
- id: check-merge-conflict
- id: mixed-line-ending
- repo: https://github.com/thlorenz/doctoc.git
rev: v2.0.0
rev: v2.1.0
hooks:
- id: doctoc
name: Add TOC for md files
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-sql-driver/mysql v1.6.0
github.com/golang/mock v1.6.0
github.com/gocql/gocql v0.0.0-20210515062232-b7ef815b4556
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.6
github.com/hashicorp/vault/api v1.1.1
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,13 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
github.com/blang/semver v3.5.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdnnjpJbkM4JQ=
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/blendle/zapdriver v1.3.1/go.mod h1:mdXfREi6u5MArG4j9fewC+FGnXaBR+T4Ox4J2u4eHCc=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
github.com/c2h5oh/datasize v0.0.0-20171227191756-4eba002a5eae/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M=
github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c=
Expand Down Expand Up @@ -394,6 +397,8 @@ github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/E
github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gocql/gocql v0.0.0-20210515062232-b7ef815b4556 h1:N/MD/sr6o61X+iZBAT2qEUF023s4KbA8RWfKzl0L6MQ=
github.com/gocql/gocql v0.0.0-20210515062232-b7ef815b4556/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
Expand Down Expand Up @@ -438,6 +443,7 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
Expand Down Expand Up @@ -535,6 +541,9 @@ github.com/grpc-ecosystem/grpc-gateway v1.14.6/go.mod h1:zdiPV4Yse/1gnckTHtghG4G
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand Down
225 changes: 225 additions & 0 deletions pkg/scalers/cassandra_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package scalers

import (
"context"
"fmt"
"strconv"
"strings"

"github.com/gocql/gocql"
kedautil "github.com/kedacore/keda/v2/pkg/util"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

// cassandraScaler exposes a data pointer to CassandraMetadata and gocql.Session connection.
type cassandraScaler struct {
metadata *CassandraMetadata
session *gocql.Session
}

// CassandraMetadata defines metadata used by KEDA to query a Cassandra table.
type CassandraMetadata struct {
username string
password string
clusterIPAddress string
port int
consistency gocql.Consistency
protocolVersion int
keyspace string
query string
targetQueryValue int
metricName string
}

var cassandraLog = logf.Log.WithName("cassandra_scaler")

// NewCassandraScaler creates a new Cassandra scaler.
func NewCassandraScaler(config *ScalerConfig) (Scaler, error) {
meta, err := ParseCassandraMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing cassandra metadata: %s", err)
}

session, err := NewCassandraSession(meta)
if err != nil {
return nil, fmt.Errorf("error establishing cassandra session: %s", err)
}

return &cassandraScaler{
metadata: meta,
session: session,
}, nil
}

// ParseCassandraMetadata parses the metadata and returns a CassandraMetadata or an error if the ScalerConfig is invalid.
func ParseCassandraMetadata(config *ScalerConfig) (*CassandraMetadata, error) {
meta := CassandraMetadata{}

if val, ok := config.TriggerMetadata["query"]; ok {
meta.query = val
} else {
return nil, fmt.Errorf("no query given")
}

if val, ok := config.TriggerMetadata["targetQueryValue"]; ok {
targetQueryValue, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("targetQueryValue parsing error %s", err.Error())
}
meta.targetQueryValue = targetQueryValue
} else {
return nil, fmt.Errorf("no targetQueryValue given")
}

if val, ok := config.TriggerMetadata["username"]; ok {
meta.username = val
} else {
return nil, fmt.Errorf("no username given")
}

if val, ok := config.TriggerMetadata["port"]; ok {
port, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("port parsing error %s", err.Error())
}
meta.port = port
}

if val, ok := config.TriggerMetadata["clusterIPAddress"]; ok {
switch p := meta.port; {
case p > 0:
meta.clusterIPAddress = fmt.Sprintf("%s:%d", val, meta.port)
case strings.Contains(val, ":"):
meta.clusterIPAddress = val
default:
return nil, fmt.Errorf("no port given")
}
} else {
return nil, fmt.Errorf("no cluster IP address given")
}

if val, ok := config.TriggerMetadata["protocolVersion"]; ok {
protocolVersion, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("protocolVersion parsing error %s", err.Error())
}
meta.protocolVersion = protocolVersion
} else {
meta.protocolVersion = 4
}

if val, ok := config.TriggerMetadata["consistency"]; ok {
meta.consistency = gocql.ParseConsistency(val)
} else {
meta.consistency = gocql.One
}

if val, ok := config.TriggerMetadata["keyspace"]; ok {
meta.keyspace = val
} else {
return nil, fmt.Errorf("no keyspace given")
}

if val, ok := config.TriggerMetadata["metricName"]; ok {
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("cassandra-%s", val))
} else {
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("cassandra-%s", meta.keyspace))
}

if val, ok := config.AuthParams["password"]; ok {
meta.password = val
} else {
return nil, fmt.Errorf("no password given")
}

return &meta, nil
}

// NewCassandraSession returns a new Cassandra session for the provided CassandraMetadata.
func NewCassandraSession(meta *CassandraMetadata) (*gocql.Session, error) {
cluster := gocql.NewCluster(meta.clusterIPAddress)
cluster.ProtoVersion = meta.protocolVersion
cluster.Consistency = meta.consistency
cluster.Authenticator = gocql.PasswordAuthenticator{
Username: meta.username,
Password: meta.password,
}

session, err := cluster.CreateSession()
if err != nil {
cassandraLog.Error(err, "found error creating session")
return nil, err
}

return session, nil
}

// IsActive returns true if there are pending events to be processed.
func (s *cassandraScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := s.GetQueryResult()
if err != nil {
return false, fmt.Errorf("error inspecting cassandra: %s", err)
}

return messages > 0, nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler.
func (s *cassandraScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetQueryValue := resource.NewQuantity(int64(s.metadata.targetQueryValue), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: s.metadata.metricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetQueryValue,
},
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}

return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics returns a value for a supported metric or an error if there is a problem getting the metric.
func (s *cassandraScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
num, err := s.GetQueryResult()
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting cassandra: %s", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(num), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// GetQueryResult returns the result of the scaler query.
func (s *cassandraScaler) GetQueryResult() (int, error) {
var value int
if err := s.session.Query(s.metadata.query).Scan(&value); err != nil {
if err != gocql.ErrNotFound {
cassandraLog.Error(err, "query failed")
return 0, err
}
}

return value, nil
}

// Close closes the Cassandra session connection.
func (s *cassandraScaler) Close() error {
s.session.Close()

return nil
}
Loading