Skip to content

Commit

Permalink
Rebase fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
magne committed Dec 6, 2023
1 parent fb67fdc commit 42a403c
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 13 deletions.
31 changes: 23 additions & 8 deletions src/response_decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import { StoreOffsetResponse } from "./responses/store_offset_response"
import { QueryOffsetResponse } from "./responses/query_offset_response"
import { Annotations } from "./amqp10/messageAnnotations"
import { Header } from "./amqp10/messageHeader"
import { CompressionType, NoneCompression } from "./compression"

// Frame => Size (Request | Response | Command)
// Size => uint32 (size without the 4 bytes of the size element)
Expand Down Expand Up @@ -193,7 +194,8 @@ function decodeDeliverResponse(dataResponse: DataReader, logger: Logger): Delive
messages.push(decodeMessage(dataResponse, chunkFirstOffset + BigInt(i)))
}
} else {
messages.push(...decodeSubEntries(dataResponse, logger))
const compressionType = (messageType & 0x70) >> 4
messages.push(...decodeSubEntries(dataResponse, compressionType, logger))
}

return { subscriptionId, messages }
Expand Down Expand Up @@ -240,17 +242,21 @@ function decodeMessage(dataResponse: DataReader, offset: bigint): Message {
return { content, messageProperties, messageHeader, applicationProperties, amqpValue, messageAnnotations, offset }
}

function decodeSubEntries(dataResponse: DataReader, logger: Logger): Message[] {
const retVal: Message[] = []
function decodeSubEntries(dataResponse: DataReader, compressionType: number, logger: Logger): Message[] {
const decodedMessages: Message[] = []
const compression = createCompression(compressionType)
const noOfRecords = dataResponse.readUInt16()
const uncompressedLength = dataResponse.readUInt32()
const length = dataResponse.readUInt32()
logger.debug(`Decoding sub entries, uncompressed length is ${uncompressedLength} while actual length is ${length}`)
const compressedLength = dataResponse.readUInt32()
const decompressedData = new BufferDataReader(compression.decompress(dataResponse.readBufferOf(compressedLength)))
logger.debug(
`Decoding sub entries, uncompressed length is ${uncompressedLength} while actual length is ${compressedLength}`
)
for (let i = 0; i < noOfRecords; i++) {
const entry: Message = decodeMessage(dataResponse, BigInt(i))
retVal.push(entry)
const entry: Message = decodeMessage(decompressedData, BigInt(i))
decodedMessages.push(entry)
}
return retVal
return decodedMessages
}

function decodeApplicationProperties(dataResponse: DataReader) {
Expand Down Expand Up @@ -407,6 +413,15 @@ export function decodeFormatCode(dataResponse: DataReader, formatCode: number, s
}
}

function createCompression(compressionType: number) {
switch (compressionType) {
case CompressionType.None:
return NoneCompression.create()
default:
throw new Error(`invalid compressionType or compression not yet implemented %#02x: ${compressionType}`)
}
}

export class BufferDataReader implements DataReader {
private offset = 0

Expand Down
7 changes: 3 additions & 4 deletions test/e2e/sub_entry_consume.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { expect } from "chai"
import { Connection } from "../../src"
import { NoneCompression } from "../../src/compression"
import { Consumer } from "../../src/consumer"
import { Message, Producer } from "../../src/producer"
import { Offset } from "../../src/requests/subscribe_request"
Expand Down Expand Up @@ -42,7 +41,7 @@ describe("consume a batch of messages", () => {
{ content: Buffer.from("Ciao4") },
]

await publisher.sendSubEntries(messages, NoneCompression.create())
await publisher.sendSubEntries(messages)
}).timeout(10000)

it("consume a batch of messages - receive the same number of messages", async () => {
Expand All @@ -58,7 +57,7 @@ describe("consume a batch of messages", () => {
{ content: Buffer.from("Ciao4") },
]

await publisher.sendSubEntries(messages, NoneCompression.create())
await publisher.sendSubEntries(messages)

await eventually(async () => {
expect(receivedMessages.length).eql(messages.length)
Expand All @@ -73,7 +72,7 @@ describe("consume a batch of messages", () => {
const messageContents = range(5).map((_, i) => `Ciao${i}`)
const messages = messageContents.map((m) => ({ content: Buffer.from(m) }))

await publisher.sendSubEntries(messages, NoneCompression.create())
await publisher.sendSubEntries(messages)

await eventually(async () => {
const receivedContent = receivedMessages.map((rm) => rm.content.toString("utf-8"))
Expand Down
1 change: 0 additions & 1 deletion test/unit/heartbeat.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { Heartbeat, HeartbeatConnection } from "../../src/heartbeat"
import { NullLogger } from "../../src/logger"
import { Request } from "../../src/requests/request"
import { eventually, wait } from "../support/util"
import { NullLogger } from "../../src/logger"

class ConnectionMock implements HeartbeatConnection {
private sendCount = 0
Expand Down

0 comments on commit 42a403c

Please sign in to comment.