Skip to content

Commit

Permalink
Fix tests after SDK update.
Browse files Browse the repository at this point in the history
Signed-off-by: Vighnesh Shenoy <vshenoy@microsoft.com>
  • Loading branch information
v-shenoy committed Sep 29, 2022
1 parent f638fda commit 550e364
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 104 deletions.
7 changes: 4 additions & 3 deletions pkg/scalers/azure_servicebus_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ type azureServiceBusMetadata struct {
entityType entityType
fullyQualifiedNamespace string
useRegex bool
regexPattern *regexp.Regexp
entityNameRegex *regexp.Regexp
operation string
scalerIndex int
Expand Down Expand Up @@ -381,7 +380,7 @@ func getQueueLength(ctx context.Context, adminClient *admin.Client, meta *azureS
}

for _, queue := range page.QueueRuntimeProperties {
if meta.regexPattern.FindString(queue.QueueName) == queue.QueueName {
if meta.entityNameRegex.FindString(queue.QueueName) == queue.QueueName {
messageCounts = append(messageCounts, int64(queue.ActiveMessageCount))
}
}
Expand All @@ -400,6 +399,8 @@ func getSubscriptionLength(ctx context.Context, adminClient *admin.Client, meta
if subscriptionEntity == nil {
return -1, fmt.Errorf("subscription %s doesn't exist in topic %s", meta.subscriptionName, meta.topicName)
}

return int64(subscriptionEntity.ActiveMessageCount), nil
}

messageCounts := make([]int64, 0)
Expand All @@ -412,7 +413,7 @@ func getSubscriptionLength(ctx context.Context, adminClient *admin.Client, meta
}

for _, subscription := range page.SubscriptionRuntimeProperties {
if meta.regexPattern.FindString(subscription.SubscriptionName) == subscription.SubscriptionName {
if meta.entityNameRegex.FindString(subscription.SubscriptionName) == subscription.SubscriptionName {
messageCounts = append(messageCounts, int64(subscription.ActiveMessageCount))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
"os"
"testing"

servicebus "github.com/Azure/azure-service-bus-go"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
"github.com/joho/godotenv"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -131,10 +132,10 @@ func TestScaler(t *testing.T) {
require.NotEmpty(t, connectionString, "AZURE_SERVICE_BUS_CONNECTION_STRING env variable is required for service bus tests")

queueName1 := fmt.Sprintf("%s-1", queuePrefix)
sbQueueManager1, sbQueue1 := setupServiceBusQueue(t, queueName1)
setupServiceBusQueue(t, queueName1)

queueName2 := fmt.Sprintf("%s-2", queuePrefix)
sbQueueManager2, sbQueue2 := setupServiceBusQueue(t, queueName2)
client, adminClient := setupServiceBusQueue(t, queueName2)

kc := GetKubernetesClient(t)
data, templates := getTemplateData()
Expand All @@ -145,45 +146,28 @@ func TestScaler(t *testing.T) {
"replica count should be 0 after 1 minute")

// test scaling
testScale(t, kc, sbQueue1, sbQueue2, data)
testScale(t, kc, client, queueName1, queueName2, data)

// cleanup
DeleteKubernetesResources(t, kc, testNamespace, data, templates)
cleanupServiceBusQueue(t, sbQueueManager1, queueName1)
cleanupServiceBusQueue(t, sbQueueManager2, queueName2)
cleanupServiceBusQueue(t, adminClient, queueName1)
cleanupServiceBusQueue(t, adminClient, queueName2)
}

func setupServiceBusQueue(t *testing.T, queueName string) (*servicebus.QueueManager, *servicebus.Queue) {
// Connect to service bus namespace.
sbNamespace, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connectionString))
func setupServiceBusQueue(t *testing.T, queueName string) (*azservicebus.Client, *admin.Client) {
adminClient, err := admin.NewClientFromConnectionString(connectionString, nil)
assert.NoErrorf(t, err, "cannot connect to service bus namespace - %s", err)

sbQueueManager := sbNamespace.NewQueueManager()
// Delete the queue if already exists
_, _ = adminClient.DeleteQueue(context.Background(), queueName, nil)

createQueue(t, sbQueueManager, queueName)
_, err = adminClient.CreateQueue(context.Background(), queueName, nil)
assert.NoErrorf(t, err, "cannot create the queue - %s", err)

sbQueue, err := sbNamespace.NewQueue(queueName)
assert.NoErrorf(t, err, "cannot create client for queue - %s", err)

return sbQueueManager, sbQueue
}

func createQueue(t *testing.T, sbQueueManager *servicebus.QueueManager, queueName string) {
// delete queue if already exists
sbQueues, err := sbQueueManager.List(context.Background())
assert.NoErrorf(t, err, "cannot fetch queue list for service bus namespace - %s", err)

for _, queue := range sbQueues {
if queue.Name == queueName {
t.Log("Service Bus Queue already exists. Deleting.")
err := sbQueueManager.Delete(context.Background(), queueName)
assert.NoErrorf(t, err, "cannot delete existing service bus queue - %s", err)
}
}
client, err := azservicebus.NewClientFromConnectionString(connectionString, nil)
assert.NoErrorf(t, err, "cannot connect to service bus namespace - %s", err)

// create queue
_, err = sbQueueManager.Put(context.Background(), queueName)
assert.NoErrorf(t, err, "cannot create service bus queue - %s", err)
return client, adminClient
}

func getTemplateData() (templateData, []Template) {
Expand All @@ -206,10 +190,11 @@ func getTemplateData() (templateData, []Template) {
}
}

func testScale(t *testing.T, kc *kubernetes.Clientset, sbQueue1, sbQueue2 *servicebus.Queue, data templateData) {
t.Log("--- testing scale up ---")
addMessages(sbQueue1, 2)
addMessages(sbQueue2, 4)
func testScale(t *testing.T, kc *kubernetes.Clientset, client *azservicebus.Client,
queueName1, queueName2 string, data templateData) {
t.Log("--- testing scale ---")
addMessages(t, client, queueName1, 2)
addMessages(t, client, queueName2, 4)

assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 6, 60, 1),
"replica count should be 6 after 1 minute")
Expand All @@ -226,15 +211,19 @@ func testScale(t *testing.T, kc *kubernetes.Clientset, sbQueue1, sbQueue2 *servi
"replica count should be 3 after 1 minute")
}

func addMessages(sbQueue *servicebus.Queue, count int) {
func addMessages(t *testing.T, client *azservicebus.Client, queueName string, count int) {
sender, err := client.NewSender(queueName, nil)
assert.NoErrorf(t, err, "cannot create the sender - %s", err)
for i := 0; i < count; i++ {
msg := fmt.Sprintf("Message - %d", i)
_ = sbQueue.Send(context.Background(), servicebus.NewMessageFromString(msg))
_ = sender.SendMessage(context.Background(), &azservicebus.Message{
Body: []byte(msg),
}, nil)
}
}

func cleanupServiceBusQueue(t *testing.T, sbQueueManager *servicebus.QueueManager, queueName string) {
func cleanupServiceBusQueue(t *testing.T, adminClient *admin.Client, queueName string) {
t.Log("--- cleaning up ---")
err := sbQueueManager.Delete(context.Background(), queueName)
_, err := adminClient.DeleteQueue(context.Background(), queueName, nil)
assert.NoErrorf(t, err, "cannot delete service bus queue - %s", err)
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
"os"
"testing"

servicebus "github.com/Azure/azure-service-bus-go"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
"github.com/joho/godotenv"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -133,7 +134,7 @@ func TestScaler(t *testing.T) {
t.Log("--- setting up ---")
require.NotEmpty(t, connectionString, "AZURE_SERVICE_BUS_CONNECTION_STRING env variable is required for service bus tests")

sbTopicManager, sbTopic := setupServiceBusTopicAndSubscription(t)
client, adminClient := setupServiceBusTopicAndSubscription(t)

kc := GetKubernetesClient(t)
data, templates := getTemplateData()
Expand All @@ -144,68 +145,48 @@ func TestScaler(t *testing.T) {
"replica count should be 0 after 1 minute")

// test scaling
testScaleUp(t, kc, sbTopic, data)
testScale(t, kc, client, data)

// cleanup
DeleteKubernetesResources(t, kc, testNamespace, data, templates)
cleanupServiceBusTopic(t, sbTopicManager)
cleanupServiceBusTopic(t, adminClient, topicName)
}

func setupServiceBusTopicAndSubscription(t *testing.T) (*servicebus.TopicManager, *servicebus.Topic) {
// Connect to service bus namespace.
sbNamespace, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connectionString))
func setupServiceBusTopicAndSubscription(t *testing.T) (*azservicebus.Client, *admin.Client) {
adminClient, err := admin.NewClientFromConnectionString(connectionString, nil)
assert.NoErrorf(t, err, "cannot connect to service bus namespace - %s", err)

sbTopicManager := sbNamespace.NewTopicManager()
// Delete the topic if already exists
_, _ = adminClient.DeleteTopic(context.Background(), topicName, nil)

createTopicAndSubscriptions(t, sbNamespace, sbTopicManager)
_, err = adminClient.CreateTopic(context.Background(), topicName, nil)
assert.NoErrorf(t, err, "cannot create the topic - %s", err)

sbTopic, err := sbNamespace.NewTopic(topicName)
assert.NoErrorf(t, err, "cannot create client for topic - %s", err)
subscriptionName1 := fmt.Sprintf("%s-1", subscriptionPrefix)
setupSubscription(t, adminClient, topicName, subscriptionName1)
subscriptionName2 := fmt.Sprintf("%s-2", subscriptionPrefix)
setupSubscription(t, adminClient, topicName, subscriptionName2)

return sbTopicManager, sbTopic
}

func createTopicAndSubscriptions(t *testing.T, sbNamespace *servicebus.Namespace, sbTopicManager *servicebus.TopicManager) {
// Delete service bus topic if already exists.
sbTopics, err := sbTopicManager.List(context.Background())
assert.NoErrorf(t, err, "cannot fetch topic list for service bus namespace - %s", err)

// Delete service bus topic if already exists.
for _, topic := range sbTopics {
if topic.Name == topicName {
t.Log("Service Bus Topic already exists. Deleting.")
err := sbTopicManager.Delete(context.Background(), topicName)
assert.NoErrorf(t, err, "cannot delete existing service bus topic - %s", err)
}
}

// Create service bus topic.
_, err = sbTopicManager.Put(context.Background(), topicName)
assert.NoErrorf(t, err, "cannot create service bus topic - %s", err)

// Create subscription within topic
sbSubscriptionManager, err := sbNamespace.NewSubscriptionManager(topicName)
assert.NoErrorf(t, err, "cannot create subscription manager for topic - %s", err)

sbSub1, err := sbSubscriptionManager.Put(context.Background(), fmt.Sprintf("%s-1", subscriptionPrefix))
assert.NoErrorf(t, err, "cannot create subscription 1 for topic - %s", err)

err = sbSubscriptionManager.DeleteRule(context.Background(), sbSub1.Name, "$Default")
assert.NoErrorf(t, err, "cannot delete default rule for subscription 1 - %s", err)
client, err := azservicebus.NewClientFromConnectionString(connectionString, nil)
assert.NoErrorf(t, err, "cannot connect to service bus namespace - %s", err)

label1 := "SUB1"
_, err = sbSubscriptionManager.PutRule(context.Background(), sbSub1.Name, "testRule", servicebus.CorrelationFilter{Label: &label1})
assert.NoErrorf(t, err, "cannot create filter rule for subscription 1 - %s", err)
return client, adminClient
}

sbSub2, err := sbSubscriptionManager.Put(context.Background(), fmt.Sprintf("%s-2", subscriptionPrefix))
assert.NoErrorf(t, err, "cannot create subscription 2 for topic - %s", err)
func setupSubscription(t *testing.T, adminClient *admin.Client, topicName, subscriptionName string) {
_, err := adminClient.CreateSubscription(context.Background(), topicName, subscriptionName, nil)
assert.NoErrorf(t, err, "cannot create the subscription 1 - %s", err)

err = sbSubscriptionManager.DeleteRule(context.Background(), sbSub2.Name, "$Default")
assert.NoErrorf(t, err, "cannot delete default rule for subscription 2 - %s", err)
_, err = adminClient.DeleteRule(context.Background(), topicName, subscriptionName, "$Default", &admin.DeleteRuleOptions{})
assert.NoErrorf(t, err, "cannot delete default filter rule for subscription - %s", err)

label2 := "SUB2"
_, err = sbSubscriptionManager.PutRule(context.Background(), sbSub2.Name, "testRule", servicebus.CorrelationFilter{Label: &label2})
ruleName := "filterRule"
_, err = adminClient.CreateRule(context.Background(), topicName, subscriptionName, &admin.CreateRuleOptions{
Name: &ruleName,
Filter: &admin.CorrelationFilter{
To: &subscriptionName,
},
})
assert.NoErrorf(t, err, "cannot create filter rule for subscription - %s", err)
}

Expand All @@ -230,13 +211,13 @@ func getTemplateData() (templateData, []Template) {
}
}

func testScaleUp(t *testing.T, kc *kubernetes.Clientset, sbTopic *servicebus.Topic, data templateData) {
func testScale(t *testing.T, kc *kubernetes.Clientset, client *azservicebus.Client, data templateData) {
t.Log("--- testing scale up ---")

// send messages to subscription 1
addMessages(sbTopic, 2, "SUB1")
addMessages(t, client, fmt.Sprintf("%s-1", subscriptionPrefix), 2)
// send messages to subscription 2
addMessages(sbTopic, 4, "SUB2")
addMessages(t, client, fmt.Sprintf("%s-2", subscriptionPrefix), 4)

assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 6, 60, 1),
"replica count should be 1 after 1 minute")
Expand All @@ -253,18 +234,20 @@ func testScaleUp(t *testing.T, kc *kubernetes.Clientset, sbTopic *servicebus.Top
"replica count should be 3 after 1 minute")
}

func addMessages(sbTopic *servicebus.Topic, count int, label string) {
func addMessages(t *testing.T, client *azservicebus.Client, subscriptionName string, count int) {
sender, err := client.NewSender(topicName, nil)
assert.NoErrorf(t, err, "cannot create the sender - %s", err)
for i := 0; i < count; i++ {
msg := servicebus.NewMessageFromString(fmt.Sprintf("Message - %d", i))
if label != "" {
msg.Label = label
}
_ = sbTopic.Send(context.Background(), msg)
msg := fmt.Sprintf("Message - %d", i)
_ = sender.SendMessage(context.Background(), &azservicebus.Message{
Body: []byte(msg),
To: &subscriptionName,
}, nil)
}
}

func cleanupServiceBusTopic(t *testing.T, sbTopicManager *servicebus.TopicManager) {
func cleanupServiceBusTopic(t *testing.T, adminClient *admin.Client, topicName string) {
t.Log("--- cleaning up ---")
err := sbTopicManager.Delete(context.Background(), topicName)
_, err := adminClient.DeleteTopic(context.Background(), topicName, nil)
assert.NoErrorf(t, err, "cannot delete service bus topic - %s", err)
}

0 comments on commit 550e364

Please sign in to comment.