diff --git a/test/e2e/sub_entry_consume.test.ts b/test/e2e/sub_entry_consume.test.ts index 7dc628ac..949db788 100644 --- a/test/e2e/sub_entry_consume.test.ts +++ b/test/e2e/sub_entry_consume.test.ts @@ -7,6 +7,7 @@ import { createConnection, createPublisher, createStreamName } from "../support/ import { Rabbit } from "../support/rabbit" import { eventually, password, username } from "../support/util" import { range } from "../../src/util" +import { CompressionType } from "../../src/compression" describe("consume a batch of messages", () => { const rabbit = new Rabbit(username, password) @@ -32,7 +33,11 @@ describe("consume a batch of messages", () => { } catch (e) {} }) - it("consuming a batch of messages without compression should not raise error", async () => { + it("consuming a batch of messages without compression - should not raise error", async () => { + const receivedMessages = [] + consumer = await connection.declareConsumer({ stream: streamName, offset: Offset.first() }, (m: Message) => + receivedMessages.push(m) + ) const messages = [ { content: Buffer.from("Ciao") }, { content: Buffer.from("Ciao1") }, @@ -44,7 +49,7 @@ describe("consume a batch of messages", () => { await publisher.sendSubEntries(messages) }).timeout(10000) - it("consume a batch of messages - receive the same number of messages", async () => { + it("consume a batch of messages without compression - receive the same number of messages", async () => { const receivedMessages = [] consumer = await connection.declareConsumer({ stream: streamName, offset: Offset.first() }, (m: Message) => receivedMessages.push(m) @@ -64,7 +69,7 @@ describe("consume a batch of messages", () => { }, 10000) }).timeout(10000) - it("consume a batch of messages - each received message contains the one that was sent", async () => { + it("consume a batch of messages without compression - each received message contains the one that was sent", async () => { const receivedMessages: Message[] = [] consumer = await connection.declareConsumer({ stream: streamName, offset: Offset.first() }, (m: Message) => receivedMessages.push(m) @@ -79,4 +84,56 @@ describe("consume a batch of messages", () => { expect(messageContents).to.eql(receivedContent) }, 10000) }).timeout(10000) + + it("consuming a batch of messages with compression should not raise error", async () => { + const receivedMessages = [] + consumer = await connection.declareConsumer({ stream: streamName, offset: Offset.first() }, (m: Message) => + receivedMessages.push(m) + ) + const messages = [ + { content: Buffer.from("Ciao") }, + { content: Buffer.from("Ciao1") }, + { content: Buffer.from("Ciao2") }, + { content: Buffer.from("Ciao3") }, + { content: Buffer.from("Ciao4") }, + ] + + await publisher.sendSubEntries(messages, CompressionType.Gzip) + }).timeout(10000) + + it("consume a batch of messages with compression - receive the same number of messages", async () => { + const receivedMessages = [] + consumer = await connection.declareConsumer({ stream: streamName, offset: Offset.first() }, (m: Message) => + receivedMessages.push(m) + ) + const messages = [ + { content: Buffer.from("Ciao") }, + { content: Buffer.from("Ciao1") }, + { content: Buffer.from("Ciao2") }, + { content: Buffer.from("Ciao3") }, + { content: Buffer.from("Ciao4") }, + ] + + await publisher.sendSubEntries(messages, CompressionType.Gzip) + + await eventually(async () => { + expect(receivedMessages.length).eql(messages.length) + }, 10000) + }).timeout(10000) + + it("consume a batch of messages with compression - each received message contains the one that was sent", async () => { + const receivedMessages: Message[] = [] + consumer = await connection.declareConsumer({ stream: streamName, offset: Offset.first() }, (m: Message) => + receivedMessages.push(m) + ) + const messageContents = range(5).map((_, i) => `Ciao${i}`) + const messages = messageContents.map((m) => ({ content: Buffer.from(m) })) + + await publisher.sendSubEntries(messages, CompressionType.Gzip) + + await eventually(async () => { + const receivedContent = receivedMessages.map((rm) => rm.content.toString("utf-8")) + expect(messageContents).to.eql(receivedContent) + }, 10000) + }).timeout(10000) })