Skip to content

Commit

Permalink
56-ability-to-consume-a-batch-of-messages-with-gzip-compression (#117)
Browse files Browse the repository at this point in the history
* skip erroneous test

* Add consume batch with gzip compression

* Rebase fixes

* Remove unused param

---------

Co-authored-by: Alberto Barrilá <alberto.barrila@gmail.com>
Co-authored-by: magne <magnello@coders51.com>
  • Loading branch information
3 people authored Dec 19, 2023
1 parent 88513e6 commit c08b348
Showing 1 changed file with 60 additions and 3 deletions.
63 changes: 60 additions & 3 deletions test/e2e/sub_entry_consume.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { createConnection, createPublisher, createStreamName } from "../support/
import { Rabbit } from "../support/rabbit"
import { eventually, password, username } from "../support/util"
import { range } from "../../src/util"
import { CompressionType } from "../../src/compression"

describe("consume a batch of messages", () => {
const rabbit = new Rabbit(username, password)
Expand All @@ -32,7 +33,11 @@ describe("consume a batch of messages", () => {
} catch (e) {}
})

it("consuming a batch of messages without compression should not raise error", async () => {
it("consuming a batch of messages without compression - should not raise error", async () => {
const receivedMessages = []
consumer = await connection.declareConsumer({ stream: streamName, offset: Offset.first() }, (m: Message) =>
receivedMessages.push(m)
)
const messages = [
{ content: Buffer.from("Ciao") },
{ content: Buffer.from("Ciao1") },
Expand All @@ -44,7 +49,7 @@ describe("consume a batch of messages", () => {
await publisher.sendSubEntries(messages)
}).timeout(10000)

it("consume a batch of messages - receive the same number of messages", async () => {
it("consume a batch of messages without compression - receive the same number of messages", async () => {
const receivedMessages = []
consumer = await connection.declareConsumer({ stream: streamName, offset: Offset.first() }, (m: Message) =>
receivedMessages.push(m)
Expand All @@ -64,7 +69,7 @@ describe("consume a batch of messages", () => {
}, 10000)
}).timeout(10000)

it("consume a batch of messages - each received message contains the one that was sent", async () => {
it("consume a batch of messages without compression - each received message contains the one that was sent", async () => {
const receivedMessages: Message[] = []
consumer = await connection.declareConsumer({ stream: streamName, offset: Offset.first() }, (m: Message) =>
receivedMessages.push(m)
Expand All @@ -79,4 +84,56 @@ describe("consume a batch of messages", () => {
expect(messageContents).to.eql(receivedContent)
}, 10000)
}).timeout(10000)

it("consuming a batch of messages with compression should not raise error", async () => {
const receivedMessages = []
consumer = await connection.declareConsumer({ stream: streamName, offset: Offset.first() }, (m: Message) =>
receivedMessages.push(m)
)
const messages = [
{ content: Buffer.from("Ciao") },
{ content: Buffer.from("Ciao1") },
{ content: Buffer.from("Ciao2") },
{ content: Buffer.from("Ciao3") },
{ content: Buffer.from("Ciao4") },
]

await publisher.sendSubEntries(messages, CompressionType.Gzip)
}).timeout(10000)

it("consume a batch of messages with compression - receive the same number of messages", async () => {
const receivedMessages = []
consumer = await connection.declareConsumer({ stream: streamName, offset: Offset.first() }, (m: Message) =>
receivedMessages.push(m)
)
const messages = [
{ content: Buffer.from("Ciao") },
{ content: Buffer.from("Ciao1") },
{ content: Buffer.from("Ciao2") },
{ content: Buffer.from("Ciao3") },
{ content: Buffer.from("Ciao4") },
]

await publisher.sendSubEntries(messages, CompressionType.Gzip)

await eventually(async () => {
expect(receivedMessages.length).eql(messages.length)
}, 10000)
}).timeout(10000)

it("consume a batch of messages with compression - each received message contains the one that was sent", async () => {
const receivedMessages: Message[] = []
consumer = await connection.declareConsumer({ stream: streamName, offset: Offset.first() }, (m: Message) =>
receivedMessages.push(m)
)
const messageContents = range(5).map((_, i) => `Ciao${i}`)
const messages = messageContents.map((m) => ({ content: Buffer.from(m) }))

await publisher.sendSubEntries(messages, CompressionType.Gzip)

await eventually(async () => {
const receivedContent = receivedMessages.map((rm) => rm.content.toString("utf-8"))
expect(messageContents).to.eql(receivedContent)
}, 10000)
}).timeout(10000)
})

0 comments on commit c08b348

Please sign in to comment.