Skip to content

Commit

Permalink
[bugfix] Reading of message annotations (#182)
Browse files Browse the repository at this point in the history
* Fix on reading of message annotations

* Remove unused test

* Fix broken test

---------

Co-authored-by: magne <magnello@coders51.com>
  • Loading branch information
l4mby and magne authored Feb 5, 2024
1 parent 55631e7 commit 4235b92
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 42 deletions.
1 change: 1 addition & 0 deletions src/amqp10/decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export const FormatCode = {
Map32: 0xd1,
Null: 0x40,
ULong0: 0x44,
Ubyte: 0x50,
SmallUlong: 0x53,
ULong: 0x80,
Uint: 0x70,
Expand Down
48 changes: 40 additions & 8 deletions src/amqp10/encoder.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import { inspect } from "node:util"
import { isDate } from "node:util/types"
import { Message, MessageApplicationProperties, MessageProperties } from "../publisher"
import {
AmqpByte,
Message,
MessageAnnotations,
MessageAnnotationsValue,
MessageApplicationProperties,
MessageProperties,
} from "../publisher"
import { DataWriter } from "../requests/data_writer"

const FormatCodeType = {
Expand All @@ -23,6 +30,7 @@ const FormatCode = {
Null: 0x40,
SmallUlong: 0x53,
Uint: 0x70,
Ubyte: 0x50,
Int: 0x71,
Timestamp: 0x83,
} as const
Expand All @@ -35,14 +43,14 @@ const PropertySizeDescription =

type MessageApplicationPropertiesList = { key: string; value: string | number }[]

type MessageAnnotationsList = { key: string; value: string | number }[]
type MessageAnnotationsList = { key: string; value: MessageAnnotationsValue }[]

export function amqpEncode(
writer: DataWriter,
{ content, messageProperties, applicationProperties, messageAnnotations }: Message
): void {
writer.writeUInt32(messageSize({ content, messageProperties, applicationProperties, messageAnnotations }))
writeMessageAnnotations(writer, toList(messageAnnotations))
writeMessageAnnotations(writer, toAnnotationsList(messageAnnotations))
writeProperties(writer, messageProperties)
writeApplicationProperties(writer, toList(applicationProperties))
writeContent(writer, content)
Expand All @@ -53,7 +61,7 @@ export function messageSize({ content, messageProperties, applicationProperties,
lengthOfContent(content) +
lengthOfProperties(messageProperties) +
lengthOfApplicationProperties(toList(applicationProperties)) +
lengthOfMessageAnnotations(toList(messageAnnotations))
lengthOfMessageAnnotations(toAnnotationsList(messageAnnotations))
)
}

Expand Down Expand Up @@ -121,7 +129,11 @@ function writeMessageAnnotations(writer: DataWriter, messageAnnotationsList: Mes
.filter((elem) => elem.key)
.forEach((elem) => {
amqpWriteString(writer, elem.key)
typeof elem.value === "string" ? amqpWriteString(writer, elem.value) : amqpWriteIntNumber(writer, elem.value)
if (elem.value instanceof AmqpByte) {
amqpWriteByte(writer, elem.value)
} else {
typeof elem.value === "string" ? amqpWriteString(writer, elem.value) : amqpWriteIntNumber(writer, elem.value)
}
})
}

Expand Down Expand Up @@ -188,8 +200,12 @@ function getPropertySize(properties: MessageProperties): number {
)
}

function getListSize(list: MessageApplicationPropertiesList | MessageAnnotationsList): number {
return list.reduce((sum, elem) => sum + getSizeOf(elem.key) + getSizeOf(elem.value), 0)
function getListSize(list: MessageAnnotationsList): number {
return list.reduce(
(sum: number, elem: { key: string; value: MessageAnnotationsValue }) =>
sum + getSizeOf(elem.key) + getSizeOf(elem.value),
0
)
}

function amqpWriteString(writer: DataWriter, data?: string): void {
Expand Down Expand Up @@ -242,6 +258,11 @@ function amqpWriteIntNumber(writer: DataWriter, data?: number): void {
writer.writeInt32(data)
}

function amqpWriteByte(writer: DataWriter, data: AmqpByte): void {
writer.writeByte(FormatCode.Ubyte)
writer.writeByte(data.byteValue)
}

function amqpWriteBuffer(writer: DataWriter, data?: Buffer): void {
if (!data || !data.length) {
return amqpWriteNull(writer)
Expand Down Expand Up @@ -269,11 +290,15 @@ function amqpWriteDate(writer: DataWriter, date?: Date): void {
writer.writeUInt64(BigInt(date.getTime()))
}

function getSizeOf(value?: string | Date | number | Buffer): number {
function getSizeOf(value?: string | Date | number | Buffer | AmqpByte): number {
if (!value) {
return 1
}

if (value instanceof AmqpByte) {
return 1 + 1
}

if (typeof value === "string") {
const count = Buffer.from(value).length
return count <= 255 ? 1 + 1 + count : 1 + 4 + count
Expand All @@ -300,3 +325,10 @@ function toList(applicationProperties?: MessageApplicationProperties): MessageAp
return { key: elem[0], value: elem[1] }
})
}

function toAnnotationsList(annotations?: MessageAnnotations): MessageAnnotationsList {
if (!annotations) return []
return Object.entries(annotations).map((elem) => {
return { key: elem[0], value: elem[1] }
})
}
21 changes: 19 additions & 2 deletions src/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,24 @@ import { PublishRequestV2 } from "./requests/publish_request_v2"

export type MessageApplicationProperties = Record<string, string | number>

export type MessageAnnotations = Record<string, string | number>
export type MessageAnnotations = Record<string, MessageAnnotationsValue>

export type MessageAnnotationsValue = string | number | AmqpByte

export class AmqpByte {
private value: number

constructor(value: number) {
if (value > 255 || value < 0) {
throw new Error("Invalid byte, value must be between 0 and 255")
}
this.value = value
}

public get byteValue() {
return this.value
}
}

export interface MessageProperties {
contentType?: string
Expand Down Expand Up @@ -54,7 +71,7 @@ export interface Message {
export interface MessageOptions {
messageProperties?: MessageProperties
applicationProperties?: Record<string, string | number>
messageAnnotations?: Record<string, string | number>
messageAnnotations?: Record<string, MessageAnnotationsValue>
}

export interface Publisher {
Expand Down
3 changes: 3 additions & 0 deletions src/response_decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,9 @@ export function decodeFormatCode(dataResponse: DataReader, formatCode: number, s
return dataResponse.readUInt32()
case FormatCode.SmallUlong:
return dataResponse.readInt8() // Read a SmallUlong
case FormatCode.Ubyte:
dataResponse.forward(1)
return dataResponse.readUInt8()
case FormatCode.ULong:
return dataResponse.readUInt64() // Read an ULong
case FormatCode.List0:
Expand Down
55 changes: 23 additions & 32 deletions test/e2e/declare_consumer.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
import { expect } from "chai"
import { randomUUID } from "crypto"
import { readFileSync } from "fs"
import path from "path"
import { Client, Publisher } from "../../src"
import {
AmqpByte,
Message,
MessageAnnotations,
MessageApplicationProperties,
MessageProperties,
MessageHeader,
MessageProperties,
} from "../../src/publisher"
import { Offset } from "../../src/requests/subscribe_request"
import { BufferDataReader } from "../../src/response_decoder"
import { getMaxSharedConnectionInstances, range } from "../../src/util"
import {
createClient,
createConsumer,
Expand All @@ -16,20 +22,14 @@ import {
createStreamName,
} from "../support/fake_data"
import { Rabbit, RabbitConnectionResponse } from "../support/rabbit"
import { getMaxSharedConnectionInstances, range } from "../../src/util"
import { BufferDataReader } from "../../src/response_decoder"
import {
decodeMessageTesting,
eventually,
expectToThrowAsync,
username,
password,
createClassicPublisher,
decodeMessageTesting,
getTestNodesFromEnv,
password,
username,
} from "../support/util"
import { readFileSync } from "fs"
import path from "path"
import { randomUUID } from "crypto"

describe("declare consumer", () => {
let streamName: string
Expand Down Expand Up @@ -241,32 +241,23 @@ describe("declare consumer", () => {
await eventually(async () => expect(messageAnnotations).eql([annotations]))
}).timeout(10000)

it("messageAnnotations are ignored by a classic driver", async () => {
it("messageAnnotations with bytes are read correctly", async () => {
const messageAnnotations: MessageAnnotations[] = []
const annotations = createAnnotations()
const classicPublisher = await createClassicPublisher()
await classicPublisher.ch.assertQueue("testQ", {
exclusive: false,
durable: true,
autoDelete: false,
arguments: {
"x-queue-type": "stream", // Mandatory to define stream queue
},
})
classicPublisher.ch.sendToQueue("testQ", Buffer.from("Hello"), {
headers: {
messageAnnotations: annotations,
},
})
const annotations = { test: new AmqpByte(123) }
await rabbit.createStream("testQ")
await client.declareConsumer(
{ stream: "testQ", offset: Offset.next(), consumerRef: "test" },
(message: Message) => {
messageAnnotations.push(message.messageAnnotations ?? {})
}
)

await client.declareConsumer({ stream: "testQ", offset: Offset.first() }, (message: Message) => {
messageAnnotations.push(message.messageAnnotations || {})
})
const testP = await client.declarePublisher({ stream: "testQ" })
await testP.send(Buffer.from("Hello"), { messageAnnotations: annotations })

await eventually(async () => {
expect(messageAnnotations).not.eql([annotations])
await classicPublisher.ch.close()
await classicPublisher.conn.close()
const [messageAnnotation] = messageAnnotations
expect(messageAnnotation).to.eql({ test: 123 })
})
}).timeout(10000)

Expand Down

0 comments on commit 4235b92

Please sign in to comment.