Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bug] creation of stream with arguments #176

Merged
merged 4 commits into from
Feb 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -276,7 +276,49 @@ await client.close()

### Filtering

Work in progress ⚠️
It is possible to tag messages while publishing and filter them on both the broker side and client side

```typescript
const client = await connect({
hostname: "localhost",
port: 5552,
username: "rabbit",
password: "rabbit",
vhost: "/",
})

const publisher = await client.declarePublisher(
{ stream: streamName, publisherRef: `my-publisher-${randomUUID()}` },
(msg) => msg.applicationProperties!["test"].toString() // Tags the message
)
const message1 = "test1"
const message2 = "test2"
const message3 = "test3"
const applicationProperties1 = { test: "A" }
const applicationProperties2 = { test: "B" }

await publisher.send(Buffer.from(message1), { applicationProperties: applicationProperties1 })
await publisher.send(Buffer.from(message2), { applicationProperties: applicationProperties1 })
await publisher.send(Buffer.from(message3), { applicationProperties: applicationProperties2 })

await client.declareConsumer(
{
stream: streamName,
offset: Offset.first(),
// Filter option for the consumer
filter: {
values: ["A", "B"],
postFilterFunc: (msg) => msg.applicationProperties!["test"] === "A",
matchUnfiltered: true,
},
},
(msg) => filteredMsg.push(msg.content.toString("utf-8"))
)

await sleep(2000)

await client.close()
```

## Running Examples

10 changes: 5 additions & 5 deletions src/requests/create_stream_request.ts
Original file line number Diff line number Diff line change
@@ -3,11 +3,11 @@ import { AbstractRequest } from "./abstract_request"
import { DataWriter } from "./data_writer"

export interface CreateStreamArguments {
"x-queue-leader-locator"?: string
"x-max-age"?: string
"x-stream-max-segment-size-bytes"?: number
"x-initial-cluster-size"?: number
"x-max-length-bytes"?: number
"queue-leader-locator"?: "random" | "client-local" | "least-leaders"
"max-age"?: string
"stream-max-segment-size-bytes"?: number
"initial-cluster-size"?: number
"max-length-bytes"?: number
}

export class CreateStreamRequest extends AbstractRequest {
3 changes: 3 additions & 0 deletions test/support/rabbit.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import got from "got"
import { getTestNodesFromEnv } from "./util"
import { range } from "../../src/util"
import { CreateStreamArguments } from "../../src/requests/create_stream_request"

export interface RabbitConnectionResponse {
name: string
@@ -26,6 +27,7 @@ interface MessageInfoResponse {
messages_unacknowledged: number
types: "stream" | "quorum" | "classic"
node: string
arguments?: CreateStreamArguments
}

interface RabbitPublishersResponse {
@@ -96,6 +98,7 @@ export class Rabbit {
responseType: "json",
}
)

return ret.body
}

25 changes: 20 additions & 5 deletions test/unit/create_stream.test.ts
Original file line number Diff line number Diff line change
@@ -9,11 +9,11 @@ describe("Stream", () => {
const rabbit = new Rabbit(username, password)
const streamName = `test-stream-${randomUUID()}`
const payload = {
"x-queue-leader-locator": "test",
"x-max-age": "test",
"x-stream-max-segment-size-bytes": 42,
"x-initial-cluster-size": 42,
"x-max-length-bytes": 42,
"queue-leader-locator": "random" as const,
"max-age": "120s",
"stream-max-segment-size-bytes": 1000,
"initial-cluster-size": 5,
"max-length-bytes": 20000,
}
let client: Client

@@ -43,6 +43,21 @@ describe("Stream", () => {
expect(result.name).to.be.eql(streamName)
})

it("Should create a new Stream with the given arguments", async () => {
const resp = await client.createStream({ stream: streamName, arguments: payload })

expect(resp).to.be.true
const result = await rabbit.getQueueInfo(streamName)
expect(result.arguments).to.be.eql({
"x-queue-type": "stream",
"x-queue-leader-locator": payload["queue-leader-locator"],
"x-max-age": payload["max-age"],
"x-stream-max-segment-size-bytes": payload["stream-max-segment-size-bytes"],
"x-initial-cluster-size": payload["initial-cluster-size"],
"x-max-length-bytes": payload["max-length-bytes"],
})
})

it("Should be idempotent and ignore a duplicate Stream error", async () => {
await client.createStream({ stream: streamName, arguments: payload })
const resp = await client.createStream({ stream: streamName, arguments: payload })
31 changes: 26 additions & 5 deletions test/unit/create_super_stream.test.ts
Original file line number Diff line number Diff line change
@@ -10,11 +10,11 @@ describe("Super Stream", () => {
const rabbit = new Rabbit(username, password)
const streamName = `test-stream-${randomUUID()}`
const payload = {
"x-queue-leader-locator": "test",
"x-max-age": "test",
"x-stream-max-segment-size-bytes": 42,
"x-initial-cluster-size": 42,
"x-max-length-bytes": 42,
"queue-leader-locator": "random" as const,
"max-age": "120s",
"stream-max-segment-size-bytes": 1000,
"initial-cluster-size": 5,
"max-length-bytes": 20000,
}
let client: Client

@@ -47,6 +47,27 @@ describe("Super Stream", () => {
expect(result.map((r) => r.name)).to.have.members(Array.from(Array(3).keys()).map((n) => `${streamName}-${n}`))
})

it("Should create a new Super Stream with 3 partitions by default with the given arguments", async () => {
const resp = await client.createSuperStream({ streamName, arguments: payload })

expect(resp).to.be.true
const result = await rabbit.getSuperStreamQueues("%2F", streamName)
expect(result.map((r) => r.name)).to.have.members(Array.from(Array(3).keys()).map((n) => `${streamName}-${n}`))
await Promise.all(
Array.from(Array(3).keys()).map(async (n) => {
const queue = await rabbit.getQueueInfo(`${streamName}-${n}`)
expect(queue.arguments).to.be.eql({
"x-queue-type": "stream",
"x-queue-leader-locator": payload["queue-leader-locator"],
"x-max-age": payload["max-age"],
"x-stream-max-segment-size-bytes": payload["stream-max-segment-size-bytes"],
"x-initial-cluster-size": payload["initial-cluster-size"],
"x-max-length-bytes": payload["max-length-bytes"],
})
})
)
})

it("Should create a new Super Stream with 2 partitions", async () => {
const resp = await client.createSuperStream({ streamName, arguments: payload }, undefined, 2)