From 3ffc11285bd0d3d2ef0201fecdd45fc05c689dbf Mon Sep 17 00:00:00 2001 From: Mike Tunnicliffe <mike.tunnicliffe@filament.uk.com> Date: Mon, 2 Dec 2024 16:23:49 +0000 Subject: [PATCH 1/3] Set super-stream consumer property if needed When a stream consumer attaches to a stream that is part of a super stream it should set the "super-stream" property to the name of the super stream in order to ensure the partition index will be set up, enabling the consumers to be balanced and rebalanced across the partitions. --- src/client.ts | 19 ++++++++++++++++--- src/super_stream_consumer.ts | 7 ++++++- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/src/client.ts b/src/client.ts index 89493fe8..f2d2db6a 100644 --- a/src/client.ts +++ b/src/client.ts @@ -184,7 +184,11 @@ export class Client { return res.ok } - public async declareConsumer(params: DeclareConsumerParams, handle: ConsumerFunc): Promise<Consumer> { + public async declareConsumer( + params: DeclareConsumerParams, + handle: ConsumerFunc, + superStreamConsumer?: SuperStreamConsumer + ): Promise<Consumer> { const connection = await this.getConnection(params.stream, "consumer", params.connectionClosedListener) const consumerId = connection.getNextConsumerId() @@ -211,7 +215,7 @@ export class Client { await this.closeConsumer(consumer.extendedId) }) this.consumers.set(consumer.extendedId, { connection, consumer, params }) - await this.declareConsumerOnConnection(params, consumerId, connection) + await this.declareConsumerOnConnection(params, consumerId, connection, superStreamConsumer?.superStream) this.logger.info( `New consumer created with stream name ${params.stream}, consumer id ${consumerId} and offset ${params.offset.type}` ) @@ -245,6 +249,7 @@ export class Client { ): Promise<SuperStreamConsumer> { const partitions = await this.queryPartitions({ superStream }) return SuperStreamConsumer.create(handle, { + superStream, locator: this, consumerRef: consumerRef || `${superStream}-${randomUUID()}`, offset: 
offset || Offset.first(), @@ -468,7 +473,12 @@ export class Client { } } - private async declareConsumerOnConnection(params: DeclareConsumerParams, consumerId: number, connection: Connection) { + private async declareConsumerOnConnection( + params: DeclareConsumerParams, + consumerId: number, + connection: Connection, + superStream?: string + ) { const properties: Record<string, string> = {} if (params.singleActive && !params.consumerRef) { throw new Error("consumerRef is mandatory when declaring a single active consumer") @@ -477,6 +487,9 @@ export class Client { properties["single-active-consumer"] = "true" properties["name"] = params.consumerRef! } + if (superStream) { + properties["super-stream"] = superStream + } if (params.filter) { for (let i = 0; i < params.filter.values.length; i++) { properties[`filter.${i}`] = params.filter.values[i] diff --git a/src/super_stream_consumer.ts b/src/super_stream_consumer.ts index 088b74b9..a8213739 100644 --- a/src/super_stream_consumer.ts +++ b/src/super_stream_consumer.ts @@ -5,6 +5,7 @@ import { Offset } from "./requests/subscribe_request" export class SuperStreamConsumer { private consumers: Map<string, Consumer> = new Map<string, Consumer>() public consumerRef: string + readonly superStream: string private locator: Client private partitions: string[] private offset: Offset @@ -12,12 +13,14 @@ export class SuperStreamConsumer { private constructor( readonly handle: ConsumerFunc, params: { + superStream: string locator: Client partitions: string[] consumerRef: string offset: Offset } ) { + this.superStream = params.superStream this.consumerRef = params.consumerRef this.locator = params.locator this.partitions = params.partitions @@ -29,7 +32,8 @@ export class SuperStreamConsumer { this.partitions.map(async (p) => { const partitionConsumer = await this.locator.declareConsumer( { stream: p, consumerRef: this.consumerRef, offset: this.offset, singleActive: true }, - this.handle + this.handle, + this ) this.consumers.set(p, 
partitionConsumer) return @@ -40,6 +44,7 @@ export class SuperStreamConsumer { static async create( handle: ConsumerFunc, params: { + superStream: string locator: Client partitions: string[] consumerRef: string From 84725e9c63a4fd26a051579432c35ed4a45869a2 Mon Sep 17 00:00:00 2001 From: Mike Tunnicliffe <mike.tunnicliffe@filament.uk.com> Date: Mon, 2 Dec 2024 19:11:22 +0000 Subject: [PATCH 2/3] Add test for partitioned super stream consumer --- test/e2e/superstream_consumer.test.ts | 233 ++++++++++++++++---------- 1 file changed, 146 insertions(+), 87 deletions(-) diff --git a/test/e2e/superstream_consumer.test.ts b/test/e2e/superstream_consumer.test.ts index 32645e11..ca1d4e6f 100644 --- a/test/e2e/superstream_consumer.test.ts +++ b/test/e2e/superstream_consumer.test.ts @@ -1,6 +1,6 @@ import { expect } from "chai" import { Client, Offset } from "../../src" -import { Message } from "../../src/publisher" +import { Message, MessageOptions } from "../../src/publisher" import { range } from "../../src/util" import { createClient, createStreamName } from "../support/fake_data" import { Rabbit } from "../support/rabbit" @@ -17,8 +17,6 @@ describe("super stream consumer", () => { beforeEach(async () => { client = await createClient(username, password) superStreamName = createStreamName() - noOfPartitions = await rabbit.createSuperStream(superStreamName) - sender = await messageSender(client, superStreamName) }) afterEach(async () => { @@ -30,118 +28,161 @@ describe("super stream consumer", () => { } catch (e) {} }) - it("querying partitions - return the same number of partitions", async () => { - const partitions = await client.queryPartitions({ superStream: superStreamName }) - - expect(partitions.length).to.be.equal(noOfPartitions) - }) - - it("querying partitions - return the name of the streams making up the superstream", async () => { - const partitions = await client.queryPartitions({ superStream: superStreamName }) + describe("random partitioning", () => { + 
beforeEach(async () => { + noOfPartitions = await rabbit.createSuperStream(superStreamName) + sender = await messageSender(client, superStreamName) + }) - expect(range(noOfPartitions).map((i) => `${superStreamName}-${i}`)).to.deep.eq(partitions) - }) + it("querying partitions - return the same number of partitions", async () => { + const partitions = await client.queryPartitions({ superStream: superStreamName }) - it("declaring a super stream consumer on an existing super stream - no error is thrown", async () => { - await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_message: Message) => { - return + expect(partitions.length).to.be.equal(noOfPartitions) }) - }) - it("declaring a super stream consumer on an existing super stream - read a message", async () => { - await sender(1) - const messages: Message[] = [] + it("querying partitions - return the name of the streams making up the superstream", async () => { + const partitions = await client.queryPartitions({ superStream: superStreamName }) - await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => { - messages.push(message) + expect(range(noOfPartitions).map((i) => `${superStreamName}-${i}`)).to.deep.eq(partitions) }) - await eventually(() => { - expect(messages).to.have.length(1) - const [message] = messages - expect(message.content.toString()).to.be.eq(`${testMessageContent}-0`) + it("declaring a super stream consumer on an existing super stream - no error is thrown", async () => { + await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_message: Message) => { + return + }) }) - }) - it("for a consumer the number of connections should be equals to the partitions' number", async () => { - await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => { - return - }) + it("declaring a super stream consumer on an existing super stream - read a message", async () => { + await sender(1) + const messages: Message[] 
= [] - await eventually(() => { - expect(client.consumerCounts()).to.be.eql(noOfPartitions) + await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => { + messages.push(message) + }) + + await eventually(() => { + expect(messages).to.have.length(1) + const [message] = messages + expect(message.content.toString()).to.be.eq(`${testMessageContent}-0`) + }) }) - }) - it("reading multiple messages - each message should be read only once", async () => { - const noOfMessages = 20 - await sender(noOfMessages) - const messages: Message[] = [] + it("for a consumer the number of connections should be equals to the partitions' number", async () => { + await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => { + return + }) - await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => { - messages.push(message) + await eventually(() => { + expect(client.consumerCounts()).to.be.eql(noOfPartitions) + }) }) - await eventually(() => { - expect(messages).to.have.length(noOfMessages) + it("reading multiple messages - each message should be read only once", async () => { + const noOfMessages = 20 + await sender(noOfMessages) + const messages: Message[] = [] + + await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => { + messages.push(message) + }) + + await eventually(() => { + expect(messages).to.have.length(noOfMessages) + }) }) - }) - it("multiple composite consumers with same consumerRef - each message should be read only once", async () => { - const noOfMessages = 20 - const messages: Message[] = [] + it("multiple composite consumers with same consumerRef - each message should be read only once", async () => { + const noOfMessages = 20 + const messages: Message[] = [] - await client.declareSuperStreamConsumer( - { superStream: superStreamName, consumerRef: "counting-messages" }, - (message: Message) => messages.push(message) - ) - await 
client.declareSuperStreamConsumer( - { superStream: superStreamName, consumerRef: "counting-messages" }, - (message: Message) => messages.push(message) - ) + await client.declareSuperStreamConsumer( + { superStream: superStreamName, consumerRef: "counting-messages" }, + (message: Message) => messages.push(message) + ) + await client.declareSuperStreamConsumer( + { superStream: superStreamName, consumerRef: "counting-messages" }, + (message: Message) => messages.push(message) + ) - await sender(noOfMessages) + await sender(noOfMessages) - await eventually(() => { - expect(messages).to.have.length(noOfMessages) + await eventually(() => { + expect(messages).to.have.length(noOfMessages) + }) }) + + it("reading multiple messages - get messages only at a specific consuming point timestamp", async () => { + const noOfMessages = 20 + await sender(5) + const sleepingTime = 5000 + await sleep(sleepingTime) + await sender(noOfMessages) + const messages: Message[] = [] + + await client.declareSuperStreamConsumer( + { + superStream: superStreamName, + offset: Offset.timestamp(new Date(Date.now() - (sleepingTime - 1000))), + }, + (message: Message) => { + messages.push(message) + } + ) + + await eventually(() => { + expect(messages).to.have.length(noOfMessages) + }) + }).timeout(10000) + + it("closing the locator closes all connections", async () => { + await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => { + return + }) + + await client.close() + + await eventually(async () => { + const connections = await rabbit.getConnections() + expect(connections).to.have.length(0) + }, 5000) + }).timeout(5000) }) - it("reading multiple messages - get messages only at a specific consuming point timestamp", async () => { - const noOfMessages = 20 - await sender(5) - const sleepingTime = 5000 - await sleep(sleepingTime) - await sender(noOfMessages) - const messages: Message[] = [] - - await client.declareSuperStreamConsumer( - { - superStream: superStreamName, - 
offset: Offset.timestamp(new Date(Date.now() - (sleepingTime - 1000))), - }, - (message: Message) => { - messages.push(message) + describe("deterministic partitioning", () => { + beforeEach(async () => { + noOfPartitions = await rabbit.createSuperStream(superStreamName, 2) + sender = await roundRobinSender(client, superStreamName, 2) + }) + + it("multiple composite consumers with same consumerRef and deterministic partition key - each consumer should read only messages for its partition", async () => { + const noOfMessagesPerPartition = 10 + const messages: Message[][] = [[], []] + + const allPartitionKeysAreTheSame = (messages) => { + const partitionKeys = messages.map(m => m.applicationProperties['partition-key']) + return partitionKeys.every(k => k === partitionKeys[0]) } - ) - await eventually(() => { - expect(messages).to.have.length(noOfMessages) - }) - }).timeout(10000) + await client.declareSuperStreamConsumer( + { superStream: superStreamName, consumerRef: "message-partitioning" }, + (message: Message) => messages[0].push(message) + ) + await client.declareSuperStreamConsumer( + { superStream: superStreamName, consumerRef: "message-partitioning" }, + (message: Message) => messages[1].push(message) + ) - it("closing the locator closes all connections", async () => { - await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => { - return - }) + await sender(noOfMessagesPerPartition) - await client.close() + await eventually(() => { + expect(messages[0]).to.have.length(noOfMessagesPerPartition) + expect(allPartitionKeysAreTheSame(messages[0])) - await eventually(async () => { - const connections = await rabbit.getConnections() - expect(connections).to.have.length(0) - }, 5000) - }).timeout(5000) + expect(messages[1]).to.have.length(noOfMessagesPerPartition) + expect(allPartitionKeysAreTheSame(messages[1])) + }) + }) + }) }) const testMessageContent = "test message" @@ -158,6 +199,24 @@ const messageSender = async (client: Client, 
superStreamName: string) => { return sendMessages } +const roundRobinSender = async (client: Client, superStreamName: string, partitions: number) => { + const routingKeyExtractor = (_content: string, msgOptions: MessageOptions) => + msgOptions.applicationProperties?.["partition-key"]?.toString() + const publisher = await client.declareSuperStreamPublisher({ superStream: superStreamName }, routingKeyExtractor) + + const sendMessages = async (noOfMessagesPerPartition: number) => { + for (let i = 0; i < noOfMessagesPerPartition; i++) { + for (let p = 0; p < partitions; p++) { + await publisher.send(Buffer.from(`${testMessageContent}-${i * partitions + p}`), { + applicationProperties: { "partition-key": `${p}` }, + }) + } + } + } + + return sendMessages +} + const sleep = (ms: number) => { return new Promise((res) => { setTimeout(() => { From 8cb38d487b5f69df7c98aa2df4661a23d08c339c Mon Sep 17 00:00:00 2001 From: Mike Tunnicliffe <mike.tunnicliffe@filament.uk.com> Date: Tue, 7 Jan 2025 15:44:56 +0000 Subject: [PATCH 3/3] Fix typescript compiler and lint errors --- test/e2e/superstream_consumer.test.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/e2e/superstream_consumer.test.ts b/test/e2e/superstream_consumer.test.ts index ca1d4e6f..5d196123 100644 --- a/test/e2e/superstream_consumer.test.ts +++ b/test/e2e/superstream_consumer.test.ts @@ -158,9 +158,9 @@ describe("super stream consumer", () => { const noOfMessagesPerPartition = 10 const messages: Message[][] = [[], []] - const allPartitionKeysAreTheSame = (messages) => { - const partitionKeys = messages.map(m => m.applicationProperties['partition-key']) - return partitionKeys.every(k => k === partitionKeys[0]) + const allPartitionKeysAreTheSame = (messagesToCheck: Message[]) => { + const partitionKeys = messagesToCheck.map((m) => m.applicationProperties?.["partition-key"]) + return partitionKeys.every((k) => k === partitionKeys[0]) } await client.declareSuperStreamConsumer(