diff --git a/src/amqp10/decoder.ts b/src/amqp10/decoder.ts index 0a7fb84d..3e19c31a 100644 --- a/src/amqp10/decoder.ts +++ b/src/amqp10/decoder.ts @@ -23,6 +23,7 @@ export const FormatCode = { Map32: 0xd1, Null: 0x40, ULong0: 0x44, + Ubyte: 0x50, SmallUlong: 0x53, ULong: 0x80, Uint: 0x70, diff --git a/src/amqp10/encoder.ts b/src/amqp10/encoder.ts index d8f39149..d8bb3283 100644 --- a/src/amqp10/encoder.ts +++ b/src/amqp10/encoder.ts @@ -1,6 +1,13 @@ import { inspect } from "node:util" import { isDate } from "node:util/types" -import { Message, MessageApplicationProperties, MessageProperties } from "../publisher" +import { + AmqpByte, + Message, + MessageAnnotations, + MessageAnnotationsValue, + MessageApplicationProperties, + MessageProperties, +} from "../publisher" import { DataWriter } from "../requests/data_writer" const FormatCodeType = { @@ -23,6 +30,7 @@ const FormatCode = { Null: 0x40, SmallUlong: 0x53, Uint: 0x70, + Ubyte: 0x50, Int: 0x71, Timestamp: 0x83, } as const @@ -35,14 +43,14 @@ const PropertySizeDescription = type MessageApplicationPropertiesList = { key: string; value: string | number }[] -type MessageAnnotationsList = { key: string; value: string | number }[] +type MessageAnnotationsList = { key: string; value: MessageAnnotationsValue }[] export function amqpEncode( writer: DataWriter, { content, messageProperties, applicationProperties, messageAnnotations }: Message ): void { writer.writeUInt32(messageSize({ content, messageProperties, applicationProperties, messageAnnotations })) - writeMessageAnnotations(writer, toList(messageAnnotations)) + writeMessageAnnotations(writer, toAnnotationsList(messageAnnotations)) writeProperties(writer, messageProperties) writeApplicationProperties(writer, toList(applicationProperties)) writeContent(writer, content) @@ -53,7 +61,7 @@ export function messageSize({ content, messageProperties, applicationProperties, lengthOfContent(content) + lengthOfProperties(messageProperties) + lengthOfApplicationProperties(toList(applicationProperties)) + - lengthOfMessageAnnotations(toList(messageAnnotations)) + lengthOfMessageAnnotations(toAnnotationsList(messageAnnotations)) ) } @@ -121,7 +129,11 @@ function writeMessageAnnotations(writer: DataWriter, messageAnnotationsList: Mes .filter((elem) => elem.key) .forEach((elem) => { amqpWriteString(writer, elem.key) - typeof elem.value === "string" ? amqpWriteString(writer, elem.value) : amqpWriteIntNumber(writer, elem.value) + if (elem.value instanceof AmqpByte) { + amqpWriteByte(writer, elem.value) + } else { + typeof elem.value === "string" ? amqpWriteString(writer, elem.value) : amqpWriteIntNumber(writer, elem.value) + } }) } @@ -188,8 +200,12 @@ function getPropertySize(properties: MessageProperties): number { ) } -function getListSize(list: MessageApplicationPropertiesList | MessageAnnotationsList): number { - return list.reduce((sum, elem) => sum + getSizeOf(elem.key) + getSizeOf(elem.value), 0) +function getListSize(list: MessageAnnotationsList): number { + return list.reduce( + (sum: number, elem: { key: string; value: MessageAnnotationsValue }) => + sum + getSizeOf(elem.key) + getSizeOf(elem.value), + 0 + ) } function amqpWriteString(writer: DataWriter, data?: string): void { @@ -242,6 +258,11 @@ function amqpWriteIntNumber(writer: DataWriter, data?: number): void { writer.writeInt32(data) } +function amqpWriteByte(writer: DataWriter, data: AmqpByte): void { + writer.writeByte(FormatCode.Ubyte) + writer.writeByte(data.byteValue) +} + function amqpWriteBuffer(writer: DataWriter, data?: Buffer): void { if (!data || !data.length) { return amqpWriteNull(writer) @@ -269,11 +290,15 @@ function amqpWriteDate(writer: DataWriter, date?: Date): void { writer.writeUInt64(BigInt(date.getTime())) } -function getSizeOf(value?: string | Date | number | Buffer): number { +function getSizeOf(value?: string | Date | number | Buffer | AmqpByte): number { if (!value) { return 1 } + if (value instanceof AmqpByte) { + return 1 + 1 + } + if (typeof value === "string") { const count = Buffer.from(value).length return count <= 255 ? 1 + 1 + count : 1 + 4 + count @@ -300,3 +325,10 @@ function toList(applicationProperties?: MessageApplicationProperties): MessageAp return { key: elem[0], value: elem[1] } }) } + +function toAnnotationsList(annotations?: MessageAnnotations): MessageAnnotationsList { + if (!annotations) return [] + return Object.entries(annotations).map((elem) => { + return { key: elem[0], value: elem[1] } + }) +} diff --git a/src/publisher.ts b/src/publisher.ts index b6f08355..a06e21f8 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -15,7 +15,24 @@ import { PublishRequestV2 } from "./requests/publish_request_v2" export type MessageApplicationProperties = Record -export type MessageAnnotations = Record +export type MessageAnnotations = Record + +export type MessageAnnotationsValue = string | number | AmqpByte + +export class AmqpByte { + private value: number + + constructor(value: number) { + if (value > 255 || value < 0) { + throw new Error("Invalid byte, value must be between 0 and 255") + } + this.value = value + } + + public get byteValue() { + return this.value + } +} export interface MessageProperties { contentType?: string @@ -54,7 +71,7 @@ export interface Message { export interface MessageOptions { messageProperties?: MessageProperties applicationProperties?: Record - messageAnnotations?: Record + messageAnnotations?: Record } export interface Publisher { diff --git a/src/response_decoder.ts b/src/response_decoder.ts index b438b5c4..e4cb6941 100644 --- a/src/response_decoder.ts +++ b/src/response_decoder.ts @@ -411,6 +411,9 @@ export function decodeFormatCode(dataResponse: DataReader, formatCode: number, s return dataResponse.readUInt32() case FormatCode.SmallUlong: return dataResponse.readInt8() // Read a SmallUlong + case FormatCode.Ubyte: + dataResponse.forward(1) + return dataResponse.readUInt8() case FormatCode.ULong: return dataResponse.readUInt64() // Read an ULong case FormatCode.List0: diff --git a/test/e2e/declare_consumer.test.ts b/test/e2e/declare_consumer.test.ts index 64a3e25a..675e768c 100644 --- a/test/e2e/declare_consumer.test.ts +++ b/test/e2e/declare_consumer.test.ts @@ -1,13 +1,19 @@ import { expect } from "chai" +import { randomUUID } from "crypto" +import { readFileSync } from "fs" +import path from "path" import { Client, Publisher } from "../../src" import { + AmqpByte, Message, MessageAnnotations, MessageApplicationProperties, - MessageProperties, MessageHeader, + MessageProperties, } from "../../src/publisher" import { Offset } from "../../src/requests/subscribe_request" +import { BufferDataReader } from "../../src/response_decoder" +import { getMaxSharedConnectionInstances, range } from "../../src/util" import { createClient, createConsumer, @@ -16,20 +22,14 @@ import { createStreamName, } from "../support/fake_data" import { Rabbit, RabbitConnectionResponse } from "../support/rabbit" -import { getMaxSharedConnectionInstances, range } from "../../src/util" -import { BufferDataReader } from "../../src/response_decoder" import { + decodeMessageTesting, eventually, expectToThrowAsync, - username, - password, - createClassicPublisher, - decodeMessageTesting, getTestNodesFromEnv, + password, + username, } from "../support/util" -import { readFileSync } from "fs" -import path from "path" -import { randomUUID } from "crypto" describe("declare consumer", () => { let streamName: string @@ -241,32 +241,23 @@ describe("declare consumer", () => { await eventually(async () => expect(messageAnnotations).eql([annotations])) }).timeout(10000) - it("messageAnnotations are ignored by a classic driver", async () => { + it("messageAnnotations with bytes are read correctly", async () => { const messageAnnotations: MessageAnnotations[] = [] - const annotations = createAnnotations() - const classicPublisher = await createClassicPublisher() - await classicPublisher.ch.assertQueue("testQ", { - exclusive: false, - durable: true, - autoDelete: false, - arguments: { - "x-queue-type": "stream", // Mandatory to define stream queue - }, - }) - classicPublisher.ch.sendToQueue("testQ", Buffer.from("Hello"), { - headers: { - messageAnnotations: annotations, - }, - }) + const annotations = { test: new AmqpByte(123) } + await rabbit.createStream("testQ") + await client.declareConsumer( + { stream: "testQ", offset: Offset.next(), consumerRef: "test" }, + (message: Message) => { + messageAnnotations.push(message.messageAnnotations ?? {}) + } + ) - await client.declareConsumer({ stream: "testQ", offset: Offset.first() }, (message: Message) => { - messageAnnotations.push(message.messageAnnotations || {}) - }) + const testP = await client.declarePublisher({ stream: "testQ" }) + await testP.send(Buffer.from("Hello"), { messageAnnotations: annotations }) await eventually(async () => { - expect(messageAnnotations).not.eql([annotations]) - await classicPublisher.ch.close() - await classicPublisher.conn.close() + const [messageAnnotation] = messageAnnotations + expect(messageAnnotation).to.eql({ test: 123 }) }) }).timeout(10000)