Skip to content

Commit

Permalink
Add test for partitioned super stream consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
Mike Tunnicliffe committed Dec 2, 2024
1 parent 88b9c66 commit 94fbc3c
Showing 1 changed file with 146 additions and 87 deletions.
233 changes: 146 additions & 87 deletions test/e2e/superstream_consumer.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from "chai"
import { Client, Offset } from "../../src"
import { Message } from "../../src/publisher"
import { Message, MessageOptions } from "../../src/publisher"
import { range } from "../../src/util"
import { createClient, createStreamName } from "../support/fake_data"
import { Rabbit } from "../support/rabbit"
Expand All @@ -17,8 +17,6 @@ describe("super stream consumer", () => {
beforeEach(async () => {
client = await createClient(username, password)
superStreamName = createStreamName()
noOfPartitions = await rabbit.createSuperStream(superStreamName)
sender = await messageSender(client, superStreamName)
})

afterEach(async () => {
Expand All @@ -30,118 +28,161 @@ describe("super stream consumer", () => {
} catch (e) {}
})

it("querying partitions - return the same number of partitions", async () => {
const partitions = await client.queryPartitions({ superStream: superStreamName })

expect(partitions.length).to.be.equal(noOfPartitions)
})

it("querying partitions - return the name of the streams making up the superstream", async () => {
const partitions = await client.queryPartitions({ superStream: superStreamName })
describe("random partitioning", () => {
beforeEach(async () => {
noOfPartitions = await rabbit.createSuperStream(superStreamName)
sender = await messageSender(client, superStreamName)
})

expect(range(noOfPartitions).map((i) => `${superStreamName}-${i}`)).to.deep.eq(partitions)
})
it("querying partitions - return the same number of partitions", async () => {
const partitions = await client.queryPartitions({ superStream: superStreamName })

it("declaring a super stream consumer on an existing super stream - no error is thrown", async () => {
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_message: Message) => {
return
expect(partitions.length).to.be.equal(noOfPartitions)
})
})

it("declaring a super stream consumer on an existing super stream - read a message", async () => {
await sender(1)
const messages: Message[] = []
it("querying partitions - return the name of the streams making up the superstream", async () => {
const partitions = await client.queryPartitions({ superStream: superStreamName })

await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => {
messages.push(message)
expect(range(noOfPartitions).map((i) => `${superStreamName}-${i}`)).to.deep.eq(partitions)
})

await eventually(() => {
expect(messages).to.have.length(1)
const [message] = messages
expect(message.content.toString()).to.be.eq(`${testMessageContent}-0`)
it("declaring a super stream consumer on an existing super stream - no error is thrown", async () => {
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_message: Message) => {
return
})
})
})

it("for a consumer the number of connections should be equals to the partitions' number", async () => {
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => {
return
})
it("declaring a super stream consumer on an existing super stream - read a message", async () => {
await sender(1)
const messages: Message[] = []

await eventually(() => {
expect(client.consumerCounts()).to.be.eql(noOfPartitions)
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => {
messages.push(message)
})

await eventually(() => {
expect(messages).to.have.length(1)
const [message] = messages
expect(message.content.toString()).to.be.eq(`${testMessageContent}-0`)
})
})
})

it("reading multiple messages - each message should be read only once", async () => {
const noOfMessages = 20
await sender(noOfMessages)
const messages: Message[] = []
it("for a consumer the number of connections should be equals to the partitions' number", async () => {
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => {
return
})

await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => {
messages.push(message)
await eventually(() => {
expect(client.consumerCounts()).to.be.eql(noOfPartitions)
})
})

await eventually(() => {
expect(messages).to.have.length(noOfMessages)
it("reading multiple messages - each message should be read only once", async () => {
const noOfMessages = 20
await sender(noOfMessages)
const messages: Message[] = []

await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => {
messages.push(message)
})

await eventually(() => {
expect(messages).to.have.length(noOfMessages)
})
})
})

it("multiple composite consumers with same consumerRef - each message should be read only once", async () => {
const noOfMessages = 20
const messages: Message[] = []
it("multiple composite consumers with same consumerRef - each message should be read only once", async () => {
const noOfMessages = 20
const messages: Message[] = []

await client.declareSuperStreamConsumer(
{ superStream: superStreamName, consumerRef: "counting-messages" },
(message: Message) => messages.push(message)
)
await client.declareSuperStreamConsumer(
{ superStream: superStreamName, consumerRef: "counting-messages" },
(message: Message) => messages.push(message)
)
await client.declareSuperStreamConsumer(
{ superStream: superStreamName, consumerRef: "counting-messages" },
(message: Message) => messages.push(message)
)
await client.declareSuperStreamConsumer(
{ superStream: superStreamName, consumerRef: "counting-messages" },
(message: Message) => messages.push(message)
)

await sender(noOfMessages)
await sender(noOfMessages)

await eventually(() => {
expect(messages).to.have.length(noOfMessages)
await eventually(() => {
expect(messages).to.have.length(noOfMessages)
})
})

it("reading multiple messages - get messages only at a specific consuming point timestamp", async () => {
const noOfMessages = 20
await sender(5)
const sleepingTime = 5000
await sleep(sleepingTime)
await sender(noOfMessages)
const messages: Message[] = []

await client.declareSuperStreamConsumer(
{
superStream: superStreamName,
offset: Offset.timestamp(new Date(Date.now() - (sleepingTime - 1000))),
},
(message: Message) => {
messages.push(message)
}
)

await eventually(() => {
expect(messages).to.have.length(noOfMessages)
})
}).timeout(10000)

it("closing the locator closes all connections", async () => {
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => {
return
})

await client.close()

await eventually(async () => {
const connections = await rabbit.getConnections()
expect(connections).to.have.length(0)
}, 5000)
}).timeout(5000)
})

it("reading multiple messages - get messages only at a specific consuming point timestamp", async () => {
const noOfMessages = 20
await sender(5)
const sleepingTime = 5000
await sleep(sleepingTime)
await sender(noOfMessages)
const messages: Message[] = []

await client.declareSuperStreamConsumer(
{
superStream: superStreamName,
offset: Offset.timestamp(new Date(Date.now() - (sleepingTime - 1000))),
},
(message: Message) => {
messages.push(message)
describe("deterministic partitioning", () => {
beforeEach(async () => {
noOfPartitions = await rabbit.createSuperStream(superStreamName, 2)
sender = await roundRobinSender(client, superStreamName, 2)
})

it("multiple composite consumers with same consumerRef and deterministic partition key - each consumer should read only messages for its partition", async () => {
const noOfMessagesPerPartition = 10
const messages: Message[][] = [[], []]

const allPartitionKeysAreTheSame = (messages) => {

Check failure on line 161 in test/e2e/superstream_consumer.test.ts

View workflow job for this annotation

GitHub Actions / build (18.x)

Parameter 'messages' implicitly has an 'any' type.

Check failure on line 161 in test/e2e/superstream_consumer.test.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

Parameter 'messages' implicitly has an 'any' type.
const partitionKeys = messages.map(m => m.applicationProperties['partition-key'])

Check failure on line 162 in test/e2e/superstream_consumer.test.ts

View workflow job for this annotation

GitHub Actions / build (18.x)

Parameter 'm' implicitly has an 'any' type.

Check failure on line 162 in test/e2e/superstream_consumer.test.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

Parameter 'm' implicitly has an 'any' type.
return partitionKeys.every(k => k === partitionKeys[0])

Check failure on line 163 in test/e2e/superstream_consumer.test.ts

View workflow job for this annotation

GitHub Actions / build (18.x)

Parameter 'k' implicitly has an 'any' type.

Check failure on line 163 in test/e2e/superstream_consumer.test.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

Parameter 'k' implicitly has an 'any' type.
}
)

await eventually(() => {
expect(messages).to.have.length(noOfMessages)
})
}).timeout(10000)
await client.declareSuperStreamConsumer(
{ superStream: superStreamName, consumerRef: "message-partitioning" },
(message: Message) => messages[0].push(message)
)
await client.declareSuperStreamConsumer(
{ superStream: superStreamName, consumerRef: "message-partitioning" },
(message: Message) => messages[1].push(message)
)

it("closing the locator closes all connections", async () => {
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => {
return
})
await sender(noOfMessagesPerPartition)

await client.close()
await eventually(() => {
expect(messages[0]).to.have.length(noOfMessagesPerPartition)
expect(allPartitionKeysAreTheSame(messages[0]))

await eventually(async () => {
const connections = await rabbit.getConnections()
expect(connections).to.have.length(0)
}, 5000)
}).timeout(5000)
expect(messages[1]).to.have.length(noOfMessagesPerPartition)
expect(allPartitionKeysAreTheSame(messages[1]))
})
})
})
})

const testMessageContent = "test message"
Expand All @@ -158,6 +199,24 @@ const messageSender = async (client: Client, superStreamName: string) => {
return sendMessages
}

const roundRobinSender = async (client: Client, superStreamName: string, partitions: number) => {
const routingKeyExtractor = (_content: string, msgOptions: MessageOptions) =>
msgOptions.applicationProperties?.["partition-key"]?.toString()
const publisher = await client.declareSuperStreamPublisher({ superStream: superStreamName }, routingKeyExtractor)

const sendMessages = async (noOfMessagesPerPartition: number) => {
for (let i = 0; i < noOfMessagesPerPartition; i++) {
for (let p = 0; p < partitions; p++) {
await publisher.send(Buffer.from(`${testMessageContent}-${i * partitions + p}`), {
applicationProperties: { "partition-key": `${p}` },
})
}
}
}

return sendMessages
}

const sleep = (ms: number) => {
return new Promise((res) => {
setTimeout(() => {
Expand Down

0 comments on commit 94fbc3c

Please sign in to comment.