Skip to content

Commit

Permalink
Upgraded Azure Pipelines to support demands (kedacore#2795)
Browse files Browse the repository at this point in the history
Signed-off-by: mortx <mortxbox@live.com>
  • Loading branch information
Eldarrin authored and goku321 committed Aug 8, 2022
1 parent 119e868 commit 8b4e563
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md
- **General:** Use `mili` scale for the returned metrics ([#3135](https://github.com/kedacore/keda/issues/3135))
- **General:** Use more readable timestamps in KEDA Operator logs ([#3066](https://github.com/kedacore/keda/issues/3066))
- **AWS SQS Queue Scaler:** Support for scaling to include in-flight messages. ([#3133](https://github.com/kedacore/keda/issues/3133))
- **Azure Pipelines Scaler:** Add support for Azure Pipelines to support demands (capabilities) ([#2328](https://github.com/kedacore/keda/issues/2328))
- **GCP Stackdriver Scaler:** Added aggregation parameters ([#3008](https://github.com/kedacore/keda/issues/3008))
- **Kafka Scaler:** Handle Sarama errors properly ([#3056](https://github.com/kedacore/keda/issues/3056))
- **Kafka Scaler:** Support of passphrase encrypted PKCS #8 private key ([#3449](https://github.com/kedacore/keda/issues/3449))
Expand Down
69 changes: 67 additions & 2 deletions pkg/scalers/azure_pipelines_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type azurePipelinesMetadata struct {
organizationURL string
organizationName string
personalAccessToken string
parent string
demands string
poolID int
targetPipelinesQueueLength int64
activationTargetPipelinesQueueLength int64
Expand Down Expand Up @@ -117,6 +119,18 @@ func parseAzurePipelinesMetadata(ctx context.Context, config *ScalerConfig, http
return nil, fmt.Errorf("no personalAccessToken given")
}

if val, ok := config.TriggerMetadata["parent"]; ok && val != "" {
meta.parent = config.TriggerMetadata["parent"]
} else {
meta.parent = ""
}

if val, ok := config.TriggerMetadata["demands"]; ok && val != "" {
meta.demands = config.TriggerMetadata["demands"]
} else {
meta.demands = ""
}

if val, ok := config.TriggerMetadata["poolName"]; ok && val != "" {
var err error
meta.poolID, err = getPoolIDFromName(ctx, val, &meta, httpClient)
Expand Down Expand Up @@ -242,16 +256,67 @@ func (s *azurePipelinesScaler) GetAzurePipelinesQueueLength(ctx context.Context)
return -1, fmt.Errorf("the Azure DevOps REST API result returned no value data despite successful code. url: %s", url)
}

// for each job check if it parent fulfilled, then demand fulfilled, then finally pool fulfilled
for _, value := range jobs {
v := value.(map[string]interface{})
if v["result"] == nil {
count++
if s.metadata.parent == "" && s.metadata.demands == "" {
// no plan defined, just add a count
count++
} else {
if s.metadata.parent == "" {
// doesn't use parent, switch to demand
if getCanAgentDemandFulfilJob(v, s.metadata) {
count++
}
} else {
// does use parent
if getCanAgentParentFulfilJob(v, s.metadata) {
count++
}
}
}
}
}

return count, err
}

// Determine if the scaledjob has the right demands to spin up.
// A job is fulfillable when every demand it requires is present in the
// comma-separated metadata.demands list, ignoring the "Agent.Version"
// demand that Azure DevOps attaches to jobs automatically.
func getCanAgentDemandFulfilJob(v map[string]interface{}, metadata *azurePipelinesMetadata) bool {
	var demandsReq = v["demands"].([]interface{})
	var demandsAvail = strings.Split(metadata.demands, ",")
	for _, dr := range demandsReq {
		strDr := fmt.Sprintf("%v", dr)
		// "Agent.Version ..." is ADO-generated, not user-configurable; skip it.
		if strings.HasPrefix(strDr, "Agent.Version") {
			continue
		}
		matched := false
		for _, da := range demandsAvail {
			if strDr == da {
				matched = true
				break
			}
		}
		// Every user-visible demand must be satisfied; previously the code
		// compared a match count against len(demandsReq)-1, which silently
		// assumed exactly one Agent.Version entry and rejected jobs without one.
		if !matched {
			return false
		}
	}
	return true
}

// Determine if the Job and Parent Agent Template have matching capabilities.
// Returns true when any agent in the job's matchedAgents list carries the
// configured parent template name.
func getCanAgentParentFulfilJob(v map[string]interface{}, metadata *azurePipelinesMetadata) bool {
	agents, ok := v["matchedAgents"].([]interface{})
	if !ok {
		// ADO is already processing
		return false
	}

	for _, agent := range agents {
		details := agent.(map[string]interface{})
		if details["name"].(string) == metadata.parent {
			return true
		}
	}
	return false
}

func (s *azurePipelinesScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Expand Down
106 changes: 106 additions & 0 deletions pkg/scalers/azure_pipelines_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,109 @@ func TestAzurePipelinesGetMetricSpecForScaling(t *testing.T) {
}
}
}

// getMatchedAgentMetaData builds scaler metadata pointing at the stub server
// at url, configured to match jobs via a parent agent template name.
func getMatchedAgentMetaData(url string) *azurePipelinesMetadata {
	return &azurePipelinesMetadata{
		organizationName:           "testOrg",
		organizationURL:            url,
		parent:                     "test-keda-template",
		personalAccessToken:        "testPAT",
		poolID:                     1,
		targetPipelinesQueueLength: 1,
	}
}

// TestAzurePipelinesMatchedAgent verifies that a queued job whose
// matchedAgents include the configured parent template counts toward the
// reported queue length.
func TestAzurePipelinesMatchedAgent(t *testing.T) {
	var response = `{"count":1,"value":[{"demands":["Agent.Version -gtVersion 2.144.0"],"matchedAgents":[{"id":1,"name":"test-keda-template"}]}]}`

	var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(response))
	}))
	// Close the stub server so its listener and goroutines are released.
	defer apiStub.Close()

	meta := getMatchedAgentMetaData(apiStub.URL)

	mockAzurePipelinesScaler := azurePipelinesScaler{
		metadata:   meta,
		httpClient: http.DefaultClient,
	}

	queuelen, err := mockAzurePipelinesScaler.GetAzurePipelinesQueueLength(context.TODO())
	if err != nil {
		// Stop here: queuelen is meaningless when err is non-nil.
		t.Fatalf("getting queue length: %v", err)
	}

	if queuelen < 1 {
		t.Errorf("expected queue length >= 1, got %d", queuelen)
	}
}

// getDemandJobMetaData builds demand-based scaler metadata (no parent) whose
// demands list satisfies the stubbed job's demands.
func getDemandJobMetaData(url string) *azurePipelinesMetadata {
	m := getMatchedAgentMetaData(url)
	m.parent = ""
	m.demands = "testDemand,kubernetes"
	return m
}

// getMismatchDemandJobMetaData builds demand-based scaler metadata (no parent)
// whose demands list does NOT satisfy the stubbed job's demands.
func getMismatchDemandJobMetaData(url string) *azurePipelinesMetadata {
	m := getMatchedAgentMetaData(url)
	m.parent = ""
	m.demands = "testDemand,iamnotademand"
	return m
}

// TestAzurePipelinesMatchedDemandAgent verifies that a queued job is counted
// when the scaler's configured demands cover all of the job's demands
// (ignoring the ADO-generated Agent.Version demand).
func TestAzurePipelinesMatchedDemandAgent(t *testing.T) {
	var response = `{"count":1,"value":[{"demands":["Agent.Version -gtVersion 2.144.0", "testDemand", "kubernetes"],"matchedAgents":[{"id":1,"name":"test-keda-template"}]}]}`

	var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(response))
	}))
	// Close the stub server so its listener and goroutines are released.
	defer apiStub.Close()

	meta := getDemandJobMetaData(apiStub.URL)

	mockAzurePipelinesScaler := azurePipelinesScaler{
		metadata:   meta,
		httpClient: http.DefaultClient,
	}

	queuelen, err := mockAzurePipelinesScaler.GetAzurePipelinesQueueLength(context.TODO())
	if err != nil {
		// Stop here: queuelen is meaningless when err is non-nil.
		t.Fatalf("getting queue length: %v", err)
	}

	if queuelen < 1 {
		t.Errorf("expected queue length >= 1, got %d", queuelen)
	}
}

// TestAzurePipelinesNonMatchedDemandAgent verifies that a queued job is NOT
// counted when the scaler's configured demands fail to cover the job's
// demands ("kubernetes" is required but not offered).
func TestAzurePipelinesNonMatchedDemandAgent(t *testing.T) {
	var response = `{"count":1,"value":[{"demands":["Agent.Version -gtVersion 2.144.0", "testDemand", "kubernetes"],"matchedAgents":[{"id":1,"name":"test-keda-template"}]}]}`

	var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(response))
	}))
	// Close the stub server so its listener and goroutines are released.
	defer apiStub.Close()

	meta := getMismatchDemandJobMetaData(apiStub.URL)

	mockAzurePipelinesScaler := azurePipelinesScaler{
		metadata:   meta,
		httpClient: http.DefaultClient,
	}

	queuelen, err := mockAzurePipelinesScaler.GetAzurePipelinesQueueLength(context.TODO())
	if err != nil {
		// Stop here: queuelen is meaningless when err is non-nil.
		t.Fatalf("getting queue length: %v", err)
	}

	if queuelen > 0 {
		t.Errorf("expected queue length 0, got %d", queuelen)
	}
}

0 comments on commit 8b4e563

Please sign in to comment.