Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

217 fix super stream consumer property #218

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,11 @@ export class Client {
return res.ok
}

public async declareConsumer(params: DeclareConsumerParams, handle: ConsumerFunc): Promise<Consumer> {
public async declareConsumer(
params: DeclareConsumerParams,
handle: ConsumerFunc,
superStreamConsumer?: SuperStreamConsumer
tarzacodes marked this conversation as resolved.
Show resolved Hide resolved
): Promise<Consumer> {
const connection = await this.getConnection(params.stream, "consumer", params.connectionClosedListener)
const consumerId = connection.getNextConsumerId()

Expand All @@ -211,7 +215,7 @@ export class Client {
await this.closeConsumer(consumer.extendedId)
})
this.consumers.set(consumer.extendedId, { connection, consumer, params })
await this.declareConsumerOnConnection(params, consumerId, connection)
await this.declareConsumerOnConnection(params, consumerId, connection, superStreamConsumer?.superStream)
this.logger.info(
`New consumer created with stream name ${params.stream}, consumer id ${consumerId} and offset ${params.offset.type}`
)
Expand Down Expand Up @@ -245,6 +249,7 @@ export class Client {
): Promise<SuperStreamConsumer> {
const partitions = await this.queryPartitions({ superStream })
return SuperStreamConsumer.create(handle, {
superStream,
locator: this,
consumerRef: consumerRef || `${superStream}-${randomUUID()}`,
offset: offset || Offset.first(),
Expand Down Expand Up @@ -468,7 +473,12 @@ export class Client {
}
}

private async declareConsumerOnConnection(params: DeclareConsumerParams, consumerId: number, connection: Connection) {
private async declareConsumerOnConnection(
params: DeclareConsumerParams,
consumerId: number,
connection: Connection,
superStream?: string
) {
const properties: Record<string, string> = {}
if (params.singleActive && !params.consumerRef) {
throw new Error("consumerRef is mandatory when declaring a single active consumer")
Expand All @@ -477,6 +487,9 @@ export class Client {
properties["single-active-consumer"] = "true"
properties["name"] = params.consumerRef!
}
if (superStream) {
properties["super-stream"] = superStream
}
if (params.filter) {
for (let i = 0; i < params.filter.values.length; i++) {
properties[`filter.${i}`] = params.filter.values[i]
Expand Down
7 changes: 6 additions & 1 deletion src/super_stream_consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@ import { Offset } from "./requests/subscribe_request"
export class SuperStreamConsumer {
private consumers: Map<string, Consumer> = new Map<string, Consumer>()
public consumerRef: string
readonly superStream: string
private locator: Client
private partitions: string[]
private offset: Offset

private constructor(
readonly handle: ConsumerFunc,
params: {
superStream: string
locator: Client
partitions: string[]
consumerRef: string
offset: Offset
}
) {
this.superStream = params.superStream
this.consumerRef = params.consumerRef
this.locator = params.locator
this.partitions = params.partitions
Expand All @@ -29,7 +32,8 @@ export class SuperStreamConsumer {
this.partitions.map(async (p) => {
const partitionConsumer = await this.locator.declareConsumer(
{ stream: p, consumerRef: this.consumerRef, offset: this.offset, singleActive: true },
this.handle
this.handle,
this
tarzacodes marked this conversation as resolved.
Show resolved Hide resolved
)
this.consumers.set(p, partitionConsumer)
return
Expand All @@ -40,6 +44,7 @@ export class SuperStreamConsumer {
static async create(
handle: ConsumerFunc,
params: {
superStream: string
locator: Client
partitions: string[]
consumerRef: string
Expand Down
233 changes: 146 additions & 87 deletions test/e2e/superstream_consumer.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from "chai"
import { Client, Offset } from "../../src"
import { Message } from "../../src/publisher"
import { Message, MessageOptions } from "../../src/publisher"
import { range } from "../../src/util"
import { createClient, createStreamName } from "../support/fake_data"
import { Rabbit } from "../support/rabbit"
Expand All @@ -17,8 +17,6 @@ describe("super stream consumer", () => {
beforeEach(async () => {
client = await createClient(username, password)
superStreamName = createStreamName()
noOfPartitions = await rabbit.createSuperStream(superStreamName)
sender = await messageSender(client, superStreamName)
})

afterEach(async () => {
Expand All @@ -30,118 +28,161 @@ describe("super stream consumer", () => {
} catch (e) {}
})

it("querying partitions - return the same number of partitions", async () => {
const partitions = await client.queryPartitions({ superStream: superStreamName })

expect(partitions.length).to.be.equal(noOfPartitions)
})

it("querying partitions - return the name of the streams making up the superstream", async () => {
const partitions = await client.queryPartitions({ superStream: superStreamName })
describe("random partitioning", () => {
beforeEach(async () => {
noOfPartitions = await rabbit.createSuperStream(superStreamName)
sender = await messageSender(client, superStreamName)
})

expect(range(noOfPartitions).map((i) => `${superStreamName}-${i}`)).to.deep.eq(partitions)
})
it("querying partitions - return the same number of partitions", async () => {
const partitions = await client.queryPartitions({ superStream: superStreamName })

it("declaring a super stream consumer on an existing super stream - no error is thrown", async () => {
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_message: Message) => {
return
expect(partitions.length).to.be.equal(noOfPartitions)
})
})

it("declaring a super stream consumer on an existing super stream - read a message", async () => {
await sender(1)
const messages: Message[] = []
it("querying partitions - return the name of the streams making up the superstream", async () => {
const partitions = await client.queryPartitions({ superStream: superStreamName })

await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => {
messages.push(message)
expect(range(noOfPartitions).map((i) => `${superStreamName}-${i}`)).to.deep.eq(partitions)
})

await eventually(() => {
expect(messages).to.have.length(1)
const [message] = messages
expect(message.content.toString()).to.be.eq(`${testMessageContent}-0`)
it("declaring a super stream consumer on an existing super stream - no error is thrown", async () => {
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_message: Message) => {
return
})
})
})

it("for a consumer the number of connections should be equals to the partitions' number", async () => {
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => {
return
})
it("declaring a super stream consumer on an existing super stream - read a message", async () => {
await sender(1)
const messages: Message[] = []

await eventually(() => {
expect(client.consumerCounts()).to.be.eql(noOfPartitions)
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => {
messages.push(message)
})

await eventually(() => {
expect(messages).to.have.length(1)
const [message] = messages
expect(message.content.toString()).to.be.eq(`${testMessageContent}-0`)
})
})
})

it("reading multiple messages - each message should be read only once", async () => {
const noOfMessages = 20
await sender(noOfMessages)
const messages: Message[] = []
it("for a consumer the number of connections should be equals to the partitions' number", async () => {
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => {
return
})

await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => {
messages.push(message)
await eventually(() => {
expect(client.consumerCounts()).to.be.eql(noOfPartitions)
})
})

await eventually(() => {
expect(messages).to.have.length(noOfMessages)
it("reading multiple messages - each message should be read only once", async () => {
const noOfMessages = 20
await sender(noOfMessages)
const messages: Message[] = []

await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => {
messages.push(message)
})

await eventually(() => {
expect(messages).to.have.length(noOfMessages)
})
})
})

it("multiple composite consumers with same consumerRef - each message should be read only once", async () => {
const noOfMessages = 20
const messages: Message[] = []
it("multiple composite consumers with same consumerRef - each message should be read only once", async () => {
const noOfMessages = 20
const messages: Message[] = []

await client.declareSuperStreamConsumer(
{ superStream: superStreamName, consumerRef: "counting-messages" },
(message: Message) => messages.push(message)
)
await client.declareSuperStreamConsumer(
{ superStream: superStreamName, consumerRef: "counting-messages" },
(message: Message) => messages.push(message)
)
await client.declareSuperStreamConsumer(
{ superStream: superStreamName, consumerRef: "counting-messages" },
(message: Message) => messages.push(message)
)
await client.declareSuperStreamConsumer(
{ superStream: superStreamName, consumerRef: "counting-messages" },
(message: Message) => messages.push(message)
)

await sender(noOfMessages)
await sender(noOfMessages)

await eventually(() => {
expect(messages).to.have.length(noOfMessages)
await eventually(() => {
expect(messages).to.have.length(noOfMessages)
})
})

it("reading multiple messages - get messages only at a specific consuming point timestamp", async () => {
const noOfMessages = 20
await sender(5)
const sleepingTime = 5000
await sleep(sleepingTime)
await sender(noOfMessages)
const messages: Message[] = []

await client.declareSuperStreamConsumer(
{
superStream: superStreamName,
offset: Offset.timestamp(new Date(Date.now() - (sleepingTime - 1000))),
},
(message: Message) => {
messages.push(message)
}
)

await eventually(() => {
expect(messages).to.have.length(noOfMessages)
})
}).timeout(10000)

it("closing the locator closes all connections", async () => {
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => {
return
})

await client.close()

await eventually(async () => {
const connections = await rabbit.getConnections()
expect(connections).to.have.length(0)
}, 5000)
}).timeout(5000)
})

it("reading multiple messages - get messages only at a specific consuming point timestamp", async () => {
const noOfMessages = 20
await sender(5)
const sleepingTime = 5000
await sleep(sleepingTime)
await sender(noOfMessages)
const messages: Message[] = []

await client.declareSuperStreamConsumer(
{
superStream: superStreamName,
offset: Offset.timestamp(new Date(Date.now() - (sleepingTime - 1000))),
},
(message: Message) => {
messages.push(message)
describe("deterministic partitioning", () => {
beforeEach(async () => {
noOfPartitions = await rabbit.createSuperStream(superStreamName, 2)
sender = await roundRobinSender(client, superStreamName, 2)
})

it("multiple composite consumers with same consumerRef and deterministic partition key - each consumer should read only messages for its partition", async () => {
const noOfMessagesPerPartition = 10
const messages: Message[][] = [[], []]

const allPartitionKeysAreTheSame = (messagesToCheck: Message[]) => {
const partitionKeys = messagesToCheck.map((m) => m.applicationProperties?.["partition-key"])
return partitionKeys.every((k) => k === partitionKeys[0])
}
)

await eventually(() => {
expect(messages).to.have.length(noOfMessages)
})
}).timeout(10000)
await client.declareSuperStreamConsumer(
{ superStream: superStreamName, consumerRef: "message-partitioning" },
(message: Message) => messages[0].push(message)
)
await client.declareSuperStreamConsumer(
{ superStream: superStreamName, consumerRef: "message-partitioning" },
(message: Message) => messages[1].push(message)
)

it("closing the locator closes all connections", async () => {
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => {
return
})
await sender(noOfMessagesPerPartition)

await client.close()
await eventually(() => {
expect(messages[0]).to.have.length(noOfMessagesPerPartition)
expect(allPartitionKeysAreTheSame(messages[0]))

await eventually(async () => {
const connections = await rabbit.getConnections()
expect(connections).to.have.length(0)
}, 5000)
}).timeout(5000)
expect(messages[1]).to.have.length(noOfMessagesPerPartition)
expect(allPartitionKeysAreTheSame(messages[1]))
})
})
})
})

const testMessageContent = "test message"
Expand All @@ -158,6 +199,24 @@ const messageSender = async (client: Client, superStreamName: string) => {
return sendMessages
}

const roundRobinSender = async (client: Client, superStreamName: string, partitions: number) => {
const routingKeyExtractor = (_content: string, msgOptions: MessageOptions) =>
msgOptions.applicationProperties?.["partition-key"]?.toString()
const publisher = await client.declareSuperStreamPublisher({ superStream: superStreamName }, routingKeyExtractor)

const sendMessages = async (noOfMessagesPerPartition: number) => {
for (let i = 0; i < noOfMessagesPerPartition; i++) {
for (let p = 0; p < partitions; p++) {
await publisher.send(Buffer.from(`${testMessageContent}-${i * partitions + p}`), {
applicationProperties: { "partition-key": `${p}` },
})
}
}
}

return sendMessages
}

const sleep = (ms: number) => {
return new Promise((res) => {
setTimeout(() => {
Expand Down
Loading