From 84725e9c63a4fd26a051579432c35ed4a45869a2 Mon Sep 17 00:00:00 2001 From: Mike Tunnicliffe Date: Mon, 2 Dec 2024 19:11:22 +0000 Subject: [PATCH] Add test for partitioned super stream consumer --- test/e2e/superstream_consumer.test.ts | 233 ++++++++++++++++---------- 1 file changed, 146 insertions(+), 87 deletions(-) diff --git a/test/e2e/superstream_consumer.test.ts b/test/e2e/superstream_consumer.test.ts index 32645e11..ca1d4e6f 100644 --- a/test/e2e/superstream_consumer.test.ts +++ b/test/e2e/superstream_consumer.test.ts @@ -1,6 +1,6 @@ import { expect } from "chai" import { Client, Offset } from "../../src" -import { Message } from "../../src/publisher" +import { Message, MessageOptions } from "../../src/publisher" import { range } from "../../src/util" import { createClient, createStreamName } from "../support/fake_data" import { Rabbit } from "../support/rabbit" @@ -17,8 +17,6 @@ describe("super stream consumer", () => { beforeEach(async () => { client = await createClient(username, password) superStreamName = createStreamName() - noOfPartitions = await rabbit.createSuperStream(superStreamName) - sender = await messageSender(client, superStreamName) }) afterEach(async () => { @@ -30,118 +28,161 @@ describe("super stream consumer", () => { } catch (e) {} }) - it("querying partitions - return the same number of partitions", async () => { - const partitions = await client.queryPartitions({ superStream: superStreamName }) - - expect(partitions.length).to.be.equal(noOfPartitions) - }) - - it("querying partitions - return the name of the streams making up the superstream", async () => { - const partitions = await client.queryPartitions({ superStream: superStreamName }) + describe("random partitioning", () => { + beforeEach(async () => { + noOfPartitions = await rabbit.createSuperStream(superStreamName) + sender = await messageSender(client, superStreamName) + }) - expect(range(noOfPartitions).map((i) => `${superStreamName}-${i}`)).to.deep.eq(partitions) - }) + it("querying partitions - return the same number of partitions", async () => { + const partitions = await client.queryPartitions({ superStream: superStreamName }) - it("declaring a super stream consumer on an existing super stream - no error is thrown", async () => { - await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_message: Message) => { - return + expect(partitions.length).to.be.equal(noOfPartitions) }) - }) - it("declaring a super stream consumer on an existing super stream - read a message", async () => { - await sender(1) - const messages: Message[] = [] + it("querying partitions - return the name of the streams making up the superstream", async () => { + const partitions = await client.queryPartitions({ superStream: superStreamName }) - await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => { - messages.push(message) + expect(range(noOfPartitions).map((i) => `${superStreamName}-${i}`)).to.deep.eq(partitions) }) - await eventually(() => { - expect(messages).to.have.length(1) - const [message] = messages - expect(message.content.toString()).to.be.eq(`${testMessageContent}-0`) + it("declaring a super stream consumer on an existing super stream - no error is thrown", async () => { + await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_message: Message) => { + return + }) }) - }) - it("for a consumer the number of connections should be equals to the partitions' number", async () => { - await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => { - return - }) + it("declaring a super stream consumer on an existing super stream - read a message", async () => { + await sender(1) + const messages: Message[] = [] - await eventually(() => { - expect(client.consumerCounts()).to.be.eql(noOfPartitions) + await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => { + messages.push(message) + }) + + await eventually(() => { + expect(messages).to.have.length(1) + const [message] = messages + expect(message.content.toString()).to.be.eq(`${testMessageContent}-0`) + }) }) - }) - it("reading multiple messages - each message should be read only once", async () => { - const noOfMessages = 20 - await sender(noOfMessages) - const messages: Message[] = [] + it("for a consumer the number of connections should be equals to the partitions' number", async () => { + await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => { + return + }) - await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => { - messages.push(message) + await eventually(() => { + expect(client.consumerCounts()).to.be.eql(noOfPartitions) + }) }) - await eventually(() => { - expect(messages).to.have.length(noOfMessages) + it("reading multiple messages - each message should be read only once", async () => { + const noOfMessages = 20 + await sender(noOfMessages) + const messages: Message[] = [] + + await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => { + messages.push(message) + }) + + await eventually(() => { + expect(messages).to.have.length(noOfMessages) + }) }) - }) - it("multiple composite consumers with same consumerRef - each message should be read only once", async () => { - const noOfMessages = 20 - const messages: Message[] = [] + it("multiple composite consumers with same consumerRef - each message should be read only once", async () => { + const noOfMessages = 20 + const messages: Message[] = [] - await client.declareSuperStreamConsumer( - { superStream: superStreamName, consumerRef: "counting-messages" }, - (message: Message) => messages.push(message) - ) - await client.declareSuperStreamConsumer( - { superStream: superStreamName, consumerRef: "counting-messages" }, - (message: Message) => messages.push(message) - ) + await client.declareSuperStreamConsumer( + { superStream: superStreamName, consumerRef: "counting-messages" }, + (message: Message) => messages.push(message) + ) + await client.declareSuperStreamConsumer( + { superStream: superStreamName, consumerRef: "counting-messages" }, + (message: Message) => messages.push(message) + ) - await sender(noOfMessages) + await sender(noOfMessages) - await eventually(() => { - expect(messages).to.have.length(noOfMessages) + await eventually(() => { + expect(messages).to.have.length(noOfMessages) + }) }) + + it("reading multiple messages - get messages only at a specific consuming point timestamp", async () => { + const noOfMessages = 20 + await sender(5) + const sleepingTime = 5000 + await sleep(sleepingTime) + await sender(noOfMessages) + const messages: Message[] = [] + + await client.declareSuperStreamConsumer( + { + superStream: superStreamName, + offset: Offset.timestamp(new Date(Date.now() - (sleepingTime - 1000))), + }, + (message: Message) => { + messages.push(message) + } + ) + + await eventually(() => { + expect(messages).to.have.length(noOfMessages) + }) + }).timeout(10000) + + it("closing the locator closes all connections", async () => { + await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => { + return + }) + + await client.close() + + await eventually(async () => { + const connections = await rabbit.getConnections() + expect(connections).to.have.length(0) + }, 5000) + }).timeout(5000) }) - it("reading multiple messages - get messages only at a specific consuming point timestamp", async () => { - const noOfMessages = 20 - await sender(5) - const sleepingTime = 5000 - await sleep(sleepingTime) - await sender(noOfMessages) - const messages: Message[] = [] - - await client.declareSuperStreamConsumer( - { - superStream: superStreamName, - offset: Offset.timestamp(new Date(Date.now() - (sleepingTime - 1000))), - }, - (message: Message) => { - messages.push(message) + describe("deterministic partitioning", () => { + beforeEach(async () => { + noOfPartitions = await rabbit.createSuperStream(superStreamName, 2) + sender = await roundRobinSender(client, superStreamName, 2) + }) + + it("multiple composite consumers with same consumerRef and deterministic partition key - each consumer should read only messages for its partition", async () => { + const noOfMessagesPerPartition = 10 + const messages: Message[][] = [[], []] + + const allPartitionKeysAreTheSame = (messages) => { + const partitionKeys = messages.map(m => m.applicationProperties['partition-key']) + return partitionKeys.every(k => k === partitionKeys[0]) } - ) - await eventually(() => { - expect(messages).to.have.length(noOfMessages) - }) - }).timeout(10000) + await client.declareSuperStreamConsumer( + { superStream: superStreamName, consumerRef: "message-partitioning" }, + (message: Message) => messages[0].push(message) + ) + await client.declareSuperStreamConsumer( + { superStream: superStreamName, consumerRef: "message-partitioning" }, + (message: Message) => messages[1].push(message) + ) - it("closing the locator closes all connections", async () => { - await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => { - return - }) + await sender(noOfMessagesPerPartition) - await client.close() + await eventually(() => { + expect(messages[0]).to.have.length(noOfMessagesPerPartition) + expect(allPartitionKeysAreTheSame(messages[0])) - await eventually(async () => { - const connections = await rabbit.getConnections() - expect(connections).to.have.length(0) - }, 5000) - }).timeout(5000) + expect(messages[1]).to.have.length(noOfMessagesPerPartition) + expect(allPartitionKeysAreTheSame(messages[1])) + }) + }) + }) }) const testMessageContent = "test message" @@ -158,6 +199,24 @@ const messageSender = async (client: Client, superStreamName: string) => { return sendMessages } +const roundRobinSender = async (client: Client, superStreamName: string, partitions: number) => { + const routingKeyExtractor = (_content: string, msgOptions: MessageOptions) => + msgOptions.applicationProperties?.["partition-key"]?.toString() + const publisher = await client.declareSuperStreamPublisher({ superStream: superStreamName }, routingKeyExtractor) + + const sendMessages = async (noOfMessagesPerPartition: number) => { + for (let i = 0; i < noOfMessagesPerPartition; i++) { + for (let p = 0; p < partitions; p++) { + await publisher.send(Buffer.from(`${testMessageContent}-${i * partitions + p}`), { + applicationProperties: { "partition-key": `${p}` }, + }) + } + } + } + + return sendMessages +} + const sleep = (ms: number) => { return new Promise((res) => { setTimeout(() => {