From c08b34889be9054879757173a2c7005f5665a7bf Mon Sep 17 00:00:00 2001
From: l4mby <38959260+l4mby@users.noreply.github.com>
Date: Tue, 19 Dec 2023 14:39:25 +0100
Subject: [PATCH] 
 56-ability-to-consume-a-batch-of-messages-with-gzip-compression (#117)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* skip erroneous test

* Add consume batch with gzip compression

* Rebase fixes

* Remove unused param

---------

Co-authored-by: Alberto Barrilá <alberto.barrila@gmail.com>
Co-authored-by: magne <magnello@coders51.com>
---
 test/e2e/sub_entry_consume.test.ts | 63 ++++++++++++++++++++++++++++--
 1 file changed, 60 insertions(+), 3 deletions(-)

diff --git a/test/e2e/sub_entry_consume.test.ts b/test/e2e/sub_entry_consume.test.ts
index 7dc628ac..949db788 100644
--- a/test/e2e/sub_entry_consume.test.ts
+++ b/test/e2e/sub_entry_consume.test.ts
@@ -7,6 +7,7 @@ import { createConnection, createPublisher, createStreamName } from "../support/
 import { Rabbit } from "../support/rabbit"
 import { eventually, password, username } from "../support/util"
 import { range } from "../../src/util"
+import { CompressionType } from "../../src/compression"
 
 describe("consume a batch of messages", () => {
   const rabbit = new Rabbit(username, password)
@@ -32,7 +33,11 @@ describe("consume a batch of messages", () => {
     } catch (e) {}
   })
 
-  it("consuming a batch of messages without compression should not raise error", async () => {
+  it("consuming a batch of messages without compression - should not raise error", async () => {
+    const receivedMessages = []
+    consumer = await connection.declareConsumer({ stream: streamName, offset: Offset.first() }, (m: Message) =>
+      receivedMessages.push(m)
+    )
     const messages = [
       { content: Buffer.from("Ciao") },
       { content: Buffer.from("Ciao1") },
@@ -44,7 +49,7 @@ describe("consume a batch of messages", () => {
     await publisher.sendSubEntries(messages)
   }).timeout(10000)
 
-  it("consume a batch of messages - receive the same number of messages", async () => {
+  it("consume a batch of messages without compression - receive the same number of messages", async () => {
     const receivedMessages = []
     consumer = await connection.declareConsumer({ stream: streamName, offset: Offset.first() }, (m: Message) =>
       receivedMessages.push(m)
@@ -64,7 +69,7 @@ describe("consume a batch of messages", () => {
     }, 10000)
   }).timeout(10000)
 
-  it("consume a batch of messages - each received message contains the one that was sent", async () => {
+  it("consume a batch of messages without compression - each received message contains the one that was sent", async () => {
     const receivedMessages: Message[] = []
     consumer = await connection.declareConsumer({ stream: streamName, offset: Offset.first() }, (m: Message) =>
       receivedMessages.push(m)
@@ -79,4 +84,56 @@ describe("consume a batch of messages", () => {
       expect(messageContents).to.eql(receivedContent)
     }, 10000)
   }).timeout(10000)
+
+  it("consuming a batch of messages with compression should not raise error", async () => {
+    const receivedMessages = []
+    consumer = await connection.declareConsumer({ stream: streamName, offset: Offset.first() }, (m: Message) =>
+      receivedMessages.push(m)
+    )
+    const messages = [
+      { content: Buffer.from("Ciao") },
+      { content: Buffer.from("Ciao1") },
+      { content: Buffer.from("Ciao2") },
+      { content: Buffer.from("Ciao3") },
+      { content: Buffer.from("Ciao4") },
+    ]
+
+    await publisher.sendSubEntries(messages, CompressionType.Gzip)
+  }).timeout(10000)
+
+  it("consume a batch of messages with compression - receive the same number of messages", async () => {
+    const receivedMessages = []
+    consumer = await connection.declareConsumer({ stream: streamName, offset: Offset.first() }, (m: Message) =>
+      receivedMessages.push(m)
+    )
+    const messages = [
+      { content: Buffer.from("Ciao") },
+      { content: Buffer.from("Ciao1") },
+      { content: Buffer.from("Ciao2") },
+      { content: Buffer.from("Ciao3") },
+      { content: Buffer.from("Ciao4") },
+    ]
+
+    await publisher.sendSubEntries(messages, CompressionType.Gzip)
+
+    await eventually(async () => {
+      expect(receivedMessages.length).eql(messages.length)
+    }, 10000)
+  }).timeout(10000)
+
+  it("consume a batch of messages with compression - each received message contains the one that was sent", async () => {
+    const receivedMessages: Message[] = []
+    consumer = await connection.declareConsumer({ stream: streamName, offset: Offset.first() }, (m: Message) =>
+      receivedMessages.push(m)
+    )
+    const messageContents = range(5).map((_, i) => `Ciao${i}`)
+    const messages = messageContents.map((m) => ({ content: Buffer.from(m) }))
+
+    await publisher.sendSubEntries(messages, CompressionType.Gzip)
+
+    await eventually(async () => {
+      const receivedContent = receivedMessages.map((rm) => rm.content.toString("utf-8"))
+      expect(messageContents).to.eql(receivedContent)
+    }, 10000)
+  }).timeout(10000)
 })