Skip to content

Commit

Permalink
Streaming Receiver Tests (Azure#156)
Browse files Browse the repository at this point in the history
  • Loading branch information
HarshaNalluru authored and ramya-rao-a committed Jan 11, 2019
1 parent 19df395 commit aadf0a0
Showing 1 changed file with 157 additions and 3 deletions.
160 changes: 157 additions & 3 deletions test/streamingReceiver.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import {
const testMessages: SendableMessageInfo[] = [
{
body: "hello1",
messageId: `test message ${generateUuid}`
messageId: `test message ${generateUuid()}`
},
{
body: "hello2",
messageId: `test message ${generateUuid}`
messageId: `test message ${generateUuid()}`
}
];

Expand All @@ -46,7 +46,9 @@ async function testPeekMsgsLength(
should.equal(peekedMsgs.length, expectedPeekLength);
}

describe("ReceiveBatch from Queue/Subscription", function(): void {
const maxDeliveryCount = 10;

describe("Streaming Receiver from Queue/Subscription", function(): void {
let namespace: Namespace;
let queueClient: QueueClient;
let topicClient: TopicClient;
Expand Down Expand Up @@ -247,4 +249,156 @@ describe("ReceiveBatch from Queue/Subscription", function(): void {

await receiveListener.stop();
});

it("Abandoned message is retained in the Queue with incremented deliveryCount. After 10 times, you can only get it from the dead letter queue.", async function(): Promise<
void
> {
await queueClient.sendBatch(testMessages);

let checkDeliveryCount0 = 0;
let checkDeliveryCount1 = 0;

const receiveListener = await queueClient.receive(
(msg: ServiceBusMessage) => {
if (msg.messageId === testMessages[0].messageId) {
should.equal(msg.deliveryCount, checkDeliveryCount0);
checkDeliveryCount0++;
} else if (msg.messageId === testMessages[1].messageId) {
should.equal(msg.deliveryCount, checkDeliveryCount1);
checkDeliveryCount1++;
}
return msg.abandon();
},
(err: Error) => {
should.not.exist(err);
},
{ autoComplete: false }
);

await delay(4000);

await receiveListener.stop();

should.equal(checkDeliveryCount0, maxDeliveryCount);
should.equal(checkDeliveryCount1, maxDeliveryCount);

await testPeekMsgsLength(queueClient, 0); // No messages in the queue

const deadLetterQueuePath = Namespace.getDeadLetterQueuePathForQueue(queueClient.name);
const deadletterQueueClient = namespace.createQueueClient(deadLetterQueuePath);

const deadLetterMsgs = await deadletterQueueClient.receiveBatch(2);
should.equal(Array.isArray(deadLetterMsgs), true);
should.equal(deadLetterMsgs.length, 2);
should.equal(deadLetterMsgs[0].deliveryCount, maxDeliveryCount);
should.equal(deadLetterMsgs[1].deliveryCount, maxDeliveryCount);
should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId);
should.equal(deadLetterMsgs[1].messageId, testMessages[1].messageId);

await deadLetterMsgs[0].complete();
await deadLetterMsgs[1].complete();

await testPeekMsgsLength(deadletterQueueClient, 0);
});

it("Abandoned message is retained in the Subsrciption with incremented deliveryCount. After 10 times, you can only get it from the dead letter.", async function(): Promise<
void
> {
await topicClient.sendBatch(testMessages);

let checkDeliveryCount0 = 0;
let checkDeliveryCount1 = 0;
const receiveListener = await subscriptionClient.receive(
(msg: ServiceBusMessage) => {
if (msg.messageId === testMessages[0].messageId) {
should.equal(msg.deliveryCount, checkDeliveryCount0);
checkDeliveryCount0++;
} else if (msg.messageId === testMessages[1].messageId) {
should.equal(msg.deliveryCount, checkDeliveryCount1);
checkDeliveryCount1++;
}
return msg.abandon();
},
(err: Error) => {
should.not.exist(err);
},
{ autoComplete: false }
);

await delay(4000);

await receiveListener.stop();

should.equal(checkDeliveryCount0, maxDeliveryCount);
should.equal(checkDeliveryCount1, maxDeliveryCount);

const peekedMsgs = await subscriptionClient.peek(2);
should.equal(peekedMsgs.length, 0);

const deadLetterSubscriptionPath = Namespace.getDeadLetterSubcriptionPathForSubcription(
topicClient.name,
subscriptionClient.subscriptionName
);

const deadletterSubscriptionClient = namespace.createSubscriptionClient(
deadLetterSubscriptionPath ? deadLetterSubscriptionPath : "",
subscriptionClient.subscriptionName
);

await testPeekMsgsLength(deadletterSubscriptionClient, 2); // Two messages in the DL

const deadLetterMsgs = await deadletterSubscriptionClient.receiveBatch(2);
should.equal(Array.isArray(deadLetterMsgs), true);
should.equal(deadLetterMsgs.length, 2);
should.equal(deadLetterMsgs[0].deliveryCount, maxDeliveryCount);
should.equal(deadLetterMsgs[1].deliveryCount, maxDeliveryCount);
should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId);
should.equal(deadLetterMsgs[1].messageId, testMessages[1].messageId);

await deadLetterMsgs[0].complete();
await deadLetterMsgs[1].complete();

await testPeekMsgsLength(deadletterSubscriptionClient, 0);
});

it("With auto-complete enabled, manual completion in the Queue by the user should not result in errors", async function(): Promise<
void
> {
await queueClient.sendBatch(testMessages);
await testPeekMsgsLength(queueClient, 2);
const receiveListener = await queueClient.receive(
(msg: ServiceBusMessage) => {
return msg.complete();
},
(err: Error) => {
should.not.exist(err);
}
);

await delay(4000);
await receiveListener.stop();

await testPeekMsgsLength(queueClient, 0);
});

it("With auto-complete enabled, manual completion in the Subscription by the user should not result in errors", async function(): Promise<
void
> {
await topicClient.sendBatch(testMessages);

const receiveListener = await subscriptionClient.receive(
(msg: ServiceBusMessage) => {
return msg.complete();
},
(err: Error) => {
should.not.exist(err);
}
);

await delay(4000);

await receiveListener.stop();

await testPeekMsgsLength(subscriptionClient, 0);
});
});

0 comments on commit aadf0a0

Please sign in to comment.