From 3ffc11285bd0d3d2ef0201fecdd45fc05c689dbf Mon Sep 17 00:00:00 2001
From: Mike Tunnicliffe <mike.tunnicliffe@filament.uk.com>
Date: Mon, 2 Dec 2024 16:23:49 +0000
Subject: [PATCH 1/3] Set super-stream consumer property if needed

When a stream consumer attaches to a stream that is part of a super stream
it should set the "super-stream" property to the name of the super stream in
order to ensure the partition index will be setup enabling the consumers to
be balanced and rebalanced across the partitions.
---
 src/client.ts                | 19 ++++++++++++++++---
 src/super_stream_consumer.ts |  7 ++++++-
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/src/client.ts b/src/client.ts
index 89493fe8..f2d2db6a 100644
--- a/src/client.ts
+++ b/src/client.ts
@@ -184,7 +184,11 @@ export class Client {
     return res.ok
   }
 
-  public async declareConsumer(params: DeclareConsumerParams, handle: ConsumerFunc): Promise<Consumer> {
+  public async declareConsumer(
+    params: DeclareConsumerParams,
+    handle: ConsumerFunc,
+    superStreamConsumer?: SuperStreamConsumer
+  ): Promise<Consumer> {
     const connection = await this.getConnection(params.stream, "consumer", params.connectionClosedListener)
     const consumerId = connection.getNextConsumerId()
 
@@ -211,7 +215,7 @@ export class Client {
       await this.closeConsumer(consumer.extendedId)
     })
     this.consumers.set(consumer.extendedId, { connection, consumer, params })
-    await this.declareConsumerOnConnection(params, consumerId, connection)
+    await this.declareConsumerOnConnection(params, consumerId, connection, superStreamConsumer?.superStream)
     this.logger.info(
       `New consumer created with stream name ${params.stream}, consumer id ${consumerId} and offset ${params.offset.type}`
     )
@@ -245,6 +249,7 @@ export class Client {
   ): Promise<SuperStreamConsumer> {
     const partitions = await this.queryPartitions({ superStream })
     return SuperStreamConsumer.create(handle, {
+      superStream,
       locator: this,
       consumerRef: consumerRef || `${superStream}-${randomUUID()}`,
       offset: offset || Offset.first(),
@@ -468,7 +473,12 @@ export class Client {
     }
   }
 
-  private async declareConsumerOnConnection(params: DeclareConsumerParams, consumerId: number, connection: Connection) {
+  private async declareConsumerOnConnection(
+    params: DeclareConsumerParams,
+    consumerId: number,
+    connection: Connection,
+    superStream?: string
+  ) {
     const properties: Record<string, string> = {}
     if (params.singleActive && !params.consumerRef) {
       throw new Error("consumerRef is mandatory when declaring a single active consumer")
@@ -477,6 +487,9 @@ export class Client {
       properties["single-active-consumer"] = "true"
       properties["name"] = params.consumerRef!
     }
+    if (superStream) {
+      properties["super-stream"] = superStream
+    }
     if (params.filter) {
       for (let i = 0; i < params.filter.values.length; i++) {
         properties[`filter.${i}`] = params.filter.values[i]
diff --git a/src/super_stream_consumer.ts b/src/super_stream_consumer.ts
index 088b74b9..a8213739 100644
--- a/src/super_stream_consumer.ts
+++ b/src/super_stream_consumer.ts
@@ -5,6 +5,7 @@ import { Offset } from "./requests/subscribe_request"
 export class SuperStreamConsumer {
   private consumers: Map<string, Consumer> = new Map<string, Consumer>()
   public consumerRef: string
+  readonly superStream: string
   private locator: Client
   private partitions: string[]
   private offset: Offset
@@ -12,12 +13,14 @@ export class SuperStreamConsumer {
   private constructor(
     readonly handle: ConsumerFunc,
     params: {
+      superStream: string
       locator: Client
       partitions: string[]
       consumerRef: string
       offset: Offset
     }
   ) {
+    this.superStream = params.superStream
     this.consumerRef = params.consumerRef
     this.locator = params.locator
     this.partitions = params.partitions
@@ -29,7 +32,8 @@ export class SuperStreamConsumer {
       this.partitions.map(async (p) => {
         const partitionConsumer = await this.locator.declareConsumer(
           { stream: p, consumerRef: this.consumerRef, offset: this.offset, singleActive: true },
-          this.handle
+          this.handle,
+          this
         )
         this.consumers.set(p, partitionConsumer)
         return
@@ -40,6 +44,7 @@ export class SuperStreamConsumer {
   static async create(
     handle: ConsumerFunc,
     params: {
+      superStream: string
       locator: Client
       partitions: string[]
       consumerRef: string

From 84725e9c63a4fd26a051579432c35ed4a45869a2 Mon Sep 17 00:00:00 2001
From: Mike Tunnicliffe <mike.tunnicliffe@filament.uk.com>
Date: Mon, 2 Dec 2024 19:11:22 +0000
Subject: [PATCH 2/3] Add test for partitioned super stream consumer

---
 test/e2e/superstream_consumer.test.ts | 233 ++++++++++++++++----------
 1 file changed, 146 insertions(+), 87 deletions(-)

diff --git a/test/e2e/superstream_consumer.test.ts b/test/e2e/superstream_consumer.test.ts
index 32645e11..ca1d4e6f 100644
--- a/test/e2e/superstream_consumer.test.ts
+++ b/test/e2e/superstream_consumer.test.ts
@@ -1,6 +1,6 @@
 import { expect } from "chai"
 import { Client, Offset } from "../../src"
-import { Message } from "../../src/publisher"
+import { Message, MessageOptions } from "../../src/publisher"
 import { range } from "../../src/util"
 import { createClient, createStreamName } from "../support/fake_data"
 import { Rabbit } from "../support/rabbit"
@@ -17,8 +17,6 @@ describe("super stream consumer", () => {
   beforeEach(async () => {
     client = await createClient(username, password)
     superStreamName = createStreamName()
-    noOfPartitions = await rabbit.createSuperStream(superStreamName)
-    sender = await messageSender(client, superStreamName)
   })
 
   afterEach(async () => {
@@ -30,118 +28,161 @@ describe("super stream consumer", () => {
     } catch (e) {}
   })
 
-  it("querying partitions - return the same number of partitions", async () => {
-    const partitions = await client.queryPartitions({ superStream: superStreamName })
-
-    expect(partitions.length).to.be.equal(noOfPartitions)
-  })
-
-  it("querying partitions - return the name of the streams making up the superstream", async () => {
-    const partitions = await client.queryPartitions({ superStream: superStreamName })
+  describe("random partitioning", () => {
+    beforeEach(async () => {
+      noOfPartitions = await rabbit.createSuperStream(superStreamName)
+      sender = await messageSender(client, superStreamName)
+    })
 
-    expect(range(noOfPartitions).map((i) => `${superStreamName}-${i}`)).to.deep.eq(partitions)
-  })
+    it("querying partitions - return the same number of partitions", async () => {
+      const partitions = await client.queryPartitions({ superStream: superStreamName })
 
-  it("declaring a super stream consumer on an existing super stream - no error is thrown", async () => {
-    await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_message: Message) => {
-      return
+      expect(partitions.length).to.be.equal(noOfPartitions)
     })
-  })
 
-  it("declaring a super stream consumer on an existing super stream - read a message", async () => {
-    await sender(1)
-    const messages: Message[] = []
+    it("querying partitions - return the name of the streams making up the superstream", async () => {
+      const partitions = await client.queryPartitions({ superStream: superStreamName })
 
-    await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => {
-      messages.push(message)
+      expect(range(noOfPartitions).map((i) => `${superStreamName}-${i}`)).to.deep.eq(partitions)
     })
 
-    await eventually(() => {
-      expect(messages).to.have.length(1)
-      const [message] = messages
-      expect(message.content.toString()).to.be.eq(`${testMessageContent}-0`)
+    it("declaring a super stream consumer on an existing super stream - no error is thrown", async () => {
+      await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_message: Message) => {
+        return
+      })
     })
-  })
 
-  it("for a consumer the number of connections should be equals to the partitions' number", async () => {
-    await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => {
-      return
-    })
+    it("declaring a super stream consumer on an existing super stream - read a message", async () => {
+      await sender(1)
+      const messages: Message[] = []
 
-    await eventually(() => {
-      expect(client.consumerCounts()).to.be.eql(noOfPartitions)
+      await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => {
+        messages.push(message)
+      })
+
+      await eventually(() => {
+        expect(messages).to.have.length(1)
+        const [message] = messages
+        expect(message.content.toString()).to.be.eq(`${testMessageContent}-0`)
+      })
     })
-  })
 
-  it("reading multiple messages - each message should be read only once", async () => {
-    const noOfMessages = 20
-    await sender(noOfMessages)
-    const messages: Message[] = []
+    it("for a consumer the number of connections should be equals to the partitions' number", async () => {
+      await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => {
+        return
+      })
 
-    await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => {
-      messages.push(message)
+      await eventually(() => {
+        expect(client.consumerCounts()).to.be.eql(noOfPartitions)
+      })
     })
 
-    await eventually(() => {
-      expect(messages).to.have.length(noOfMessages)
+    it("reading multiple messages - each message should be read only once", async () => {
+      const noOfMessages = 20
+      await sender(noOfMessages)
+      const messages: Message[] = []
+
+      await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => {
+        messages.push(message)
+      })
+
+      await eventually(() => {
+        expect(messages).to.have.length(noOfMessages)
+      })
     })
-  })
 
-  it("multiple composite consumers with same consumerRef - each message should be read only once", async () => {
-    const noOfMessages = 20
-    const messages: Message[] = []
+    it("multiple composite consumers with same consumerRef - each message should be read only once", async () => {
+      const noOfMessages = 20
+      const messages: Message[] = []
 
-    await client.declareSuperStreamConsumer(
-      { superStream: superStreamName, consumerRef: "counting-messages" },
-      (message: Message) => messages.push(message)
-    )
-    await client.declareSuperStreamConsumer(
-      { superStream: superStreamName, consumerRef: "counting-messages" },
-      (message: Message) => messages.push(message)
-    )
+      await client.declareSuperStreamConsumer(
+        { superStream: superStreamName, consumerRef: "counting-messages" },
+        (message: Message) => messages.push(message)
+      )
+      await client.declareSuperStreamConsumer(
+        { superStream: superStreamName, consumerRef: "counting-messages" },
+        (message: Message) => messages.push(message)
+      )
 
-    await sender(noOfMessages)
+      await sender(noOfMessages)
 
-    await eventually(() => {
-      expect(messages).to.have.length(noOfMessages)
+      await eventually(() => {
+        expect(messages).to.have.length(noOfMessages)
+      })
     })
+
+    it("reading multiple messages - get messages only at a specific consuming point timestamp", async () => {
+      const noOfMessages = 20
+      await sender(5)
+      const sleepingTime = 5000
+      await sleep(sleepingTime)
+      await sender(noOfMessages)
+      const messages: Message[] = []
+
+      await client.declareSuperStreamConsumer(
+        {
+          superStream: superStreamName,
+          offset: Offset.timestamp(new Date(Date.now() - (sleepingTime - 1000))),
+        },
+        (message: Message) => {
+          messages.push(message)
+        }
+      )
+
+      await eventually(() => {
+        expect(messages).to.have.length(noOfMessages)
+      })
+    }).timeout(10000)
+
+    it("closing the locator closes all connections", async () => {
+      await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => {
+        return
+      })
+
+      await client.close()
+
+      await eventually(async () => {
+        const connections = await rabbit.getConnections()
+        expect(connections).to.have.length(0)
+      }, 5000)
+    }).timeout(5000)
   })
 
-  it("reading multiple messages - get messages only at a specific consuming point timestamp", async () => {
-    const noOfMessages = 20
-    await sender(5)
-    const sleepingTime = 5000
-    await sleep(sleepingTime)
-    await sender(noOfMessages)
-    const messages: Message[] = []
-
-    await client.declareSuperStreamConsumer(
-      {
-        superStream: superStreamName,
-        offset: Offset.timestamp(new Date(Date.now() - (sleepingTime - 1000))),
-      },
-      (message: Message) => {
-        messages.push(message)
+  describe("deterministic partitioning", () => {
+    beforeEach(async () => {
+      noOfPartitions = await rabbit.createSuperStream(superStreamName, 2)
+      sender = await roundRobinSender(client, superStreamName, 2)
+    })
+
+    it("multiple composite consumers with same consumerRef and deterministic partition key - each consumer should read only messages for its partition", async () => {
+      const noOfMessagesPerPartition = 10
+      const messages: Message[][] = [[], []]
+
+      const allPartitionKeysAreTheSame = (messages) => {
+        const partitionKeys = messages.map(m => m.applicationProperties['partition-key'])
+        return partitionKeys.every(k => k === partitionKeys[0])
       }
-    )
 
-    await eventually(() => {
-      expect(messages).to.have.length(noOfMessages)
-    })
-  }).timeout(10000)
+      await client.declareSuperStreamConsumer(
+        { superStream: superStreamName, consumerRef: "message-partitioning" },
+        (message: Message) => messages[0].push(message)
+      )
+      await client.declareSuperStreamConsumer(
+        { superStream: superStreamName, consumerRef: "message-partitioning" },
+        (message: Message) => messages[1].push(message)
+      )
 
-  it("closing the locator closes all connections", async () => {
-    await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => {
-      return
-    })
+      await sender(noOfMessagesPerPartition)
 
-    await client.close()
+      await eventually(() => {
+        expect(messages[0]).to.have.length(noOfMessagesPerPartition)
+        expect(allPartitionKeysAreTheSame(messages[0]))
 
-    await eventually(async () => {
-      const connections = await rabbit.getConnections()
-      expect(connections).to.have.length(0)
-    }, 5000)
-  }).timeout(5000)
+        expect(messages[1]).to.have.length(noOfMessagesPerPartition)
+        expect(allPartitionKeysAreTheSame(messages[1]))
+      })
+    })
+  })
 })
 
 const testMessageContent = "test message"
@@ -158,6 +199,24 @@ const messageSender = async (client: Client, superStreamName: string) => {
   return sendMessages
 }
 
+const roundRobinSender = async (client: Client, superStreamName: string, partitions: number) => {
+  const routingKeyExtractor = (_content: string, msgOptions: MessageOptions) =>
+    msgOptions.applicationProperties?.["partition-key"]?.toString()
+  const publisher = await client.declareSuperStreamPublisher({ superStream: superStreamName }, routingKeyExtractor)
+
+  const sendMessages = async (noOfMessagesPerPartition: number) => {
+    for (let i = 0; i < noOfMessagesPerPartition; i++) {
+      for (let p = 0; p < partitions; p++) {
+        await publisher.send(Buffer.from(`${testMessageContent}-${i * partitions + p}`), {
+          applicationProperties: { "partition-key": `${p}` },
+        })
+      }
+    }
+  }
+
+  return sendMessages
+}
+
 const sleep = (ms: number) => {
   return new Promise((res) => {
     setTimeout(() => {

From 8cb38d487b5f69df7c98aa2df4661a23d08c339c Mon Sep 17 00:00:00 2001
From: Mike Tunnicliffe <mike.tunnicliffe@filament.uk.com>
Date: Tue, 7 Jan 2025 15:44:56 +0000
Subject: [PATCH 3/3] Fix typescript compiler and lint errors

---
 test/e2e/superstream_consumer.test.ts | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/test/e2e/superstream_consumer.test.ts b/test/e2e/superstream_consumer.test.ts
index ca1d4e6f..5d196123 100644
--- a/test/e2e/superstream_consumer.test.ts
+++ b/test/e2e/superstream_consumer.test.ts
@@ -158,9 +158,9 @@ describe("super stream consumer", () => {
       const noOfMessagesPerPartition = 10
       const messages: Message[][] = [[], []]
 
-      const allPartitionKeysAreTheSame = (messages) => {
-        const partitionKeys = messages.map(m => m.applicationProperties['partition-key'])
-        return partitionKeys.every(k => k === partitionKeys[0])
+      const allPartitionKeysAreTheSame = (messagesToCheck: Message[]) => {
+        const partitionKeys = messagesToCheck.map((m) => m.applicationProperties?.["partition-key"])
+        return partitionKeys.every((k) => k === partitionKeys[0])
       }
 
       await client.declareSuperStreamConsumer(