Skip to content

Commit

Permalink
19 - Adds frame max negotiation (#125)
Browse files Browse the repository at this point in the history
* Adds frame max negotiation

* Fix unlimited frame max setting

* fix: use get property instead of method to retrieve current frameMax value in connection

* fix: rename constants related to frameMax parameter

* remove useless comments

---------

Co-authored-by: magne <magnello@coders51.com>
Co-authored-by: Igor Cappello <igor.cappello@gmail.com>
Co-authored-by: Alberto Barrilá <alberto.barrila@gmail.com>
  • Loading branch information
4 people authored Dec 19, 2023
1 parent c08b348 commit 4282389
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 9 deletions.
21 changes: 16 additions & 5 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import {
PublishErrorListener,
ResponseDecoder,
} from "./response_decoder"
import { removeFrom } from "./util"
import { DEFAULT_FRAME_MAX, DEFAULT_UNLIMITED_FRAME_MAX, removeFrom } from "./util"
import { WaitingResponse } from "./waiting_response"
import { SubscribeResponse } from "./responses/subscribe_response"
import { TuneResponse } from "./responses/tune_response"
Expand Down Expand Up @@ -62,7 +62,7 @@ export class Connection {
private consumers = new Map<number, Consumer>()
private compressions = new Map<CompressionType, Compression>()

constructor(private readonly logger: Logger) {
constructor(private readonly logger: Logger, private frameMax: number = DEFAULT_FRAME_MAX) {
this.heartbeat = new Heartbeat(this, this.logger)
this.compressions.set(CompressionType.None, NoneCompression.create())
this.compressions.set(CompressionType.Gzip, GzipCompression.create())
Expand All @@ -89,7 +89,7 @@ export class Connection {
}

static connect(params: ConnectionParams, logger?: Logger): Promise<Connection> {
return new Connection(logger ?? new NullLogger()).start(params)
return new Connection(logger ?? new NullLogger(), params.frameMax).start(params)
}

public start(params: ConnectionParams): Promise<Connection> {
Expand Down Expand Up @@ -235,6 +235,10 @@ export class Connection {
return this.consumers.size
}

public get currentFrameMax() {
return this.frameMax
}

public send(cmd: Request): Promise<void> {
return new Promise((res, rej) => {
const body = cmd.toBuffer()
Expand Down Expand Up @@ -326,7 +330,8 @@ export class Connection {
const heartbeat = extractHeartbeatInterval(heartbeatInterval, tuneResponse)

return new Promise((res, rej) => {
const request = new TuneRequest({ frameMax: tuneResponse.frameMax, heartbeat })
this.frameMax = this.calculateFrameMaxSizeFrom(tuneResponse.frameMax)
const request = new TuneRequest({ frameMax: this.frameMax, heartbeat })
this.socket.write(request.toBuffer(), (err) => {
this.logger.debug(`Write COMPLETED for cmd TUNE: ${inspect(tuneResponse)} - err: ${err}`)
return err ? rej(err) : res({ heartbeat })
Expand Down Expand Up @@ -463,6 +468,12 @@ export class Connection {
response.messages.map((x) => consumer.handle(x))
})
}

private calculateFrameMaxSizeFrom(tuneResponseFrameMax: number) {
if (this.frameMax === DEFAULT_UNLIMITED_FRAME_MAX) return tuneResponseFrameMax
if (tuneResponseFrameMax === DEFAULT_UNLIMITED_FRAME_MAX) return this.frameMax
return Math.min(this.frameMax, tuneResponseFrameMax)
}
}

export type ListenersParams = {
Expand All @@ -477,7 +488,7 @@ export interface ConnectionParams {
username: string
password: string
vhost: string
frameMax?: number // not used
frameMax?: number
heartbeat?: number
listeners?: ListenersParams
}
Expand Down
3 changes: 3 additions & 0 deletions src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ export function range(count: number): number[] {
}
return ret
}

export const DEFAULT_FRAME_MAX = 1048576
export const DEFAULT_UNLIMITED_FRAME_MAX = 0
38 changes: 38 additions & 0 deletions test/e2e/connect_frame_size_negotiation.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { expect } from "chai"
import { createConnection } from "../support/fake_data"
import { Rabbit } from "../support/rabbit"
import { eventually, username, password } from "../support/util"

describe("connect frame size negotiation", () => {
const rabbit = new Rabbit(username, password)

it("using 65536 as frameMax", async () => {
const frameMax = 65536

const connection = await createConnection(username, password, undefined, frameMax)

await eventually(async () => {
expect(connection.currentFrameMax).lte(frameMax)
expect(await rabbit.getConnections()).lengthOf(1)
}, 5000)
try {
await connection.close()
await rabbit.closeAllConnections()
} catch (e) {}
}).timeout(10000)

it("using 1024 as frameMax", async () => {
const frameMax = 1024

const connection = await createConnection(username, password, undefined, frameMax)

await eventually(async () => {
expect(connection.currentFrameMax).lte(frameMax)
expect(await rabbit.getConnections()).lengthOf(1)
}, 5000)
try {
await connection.close()
await rabbit.closeAllConnections()
} catch (e) {}
}).timeout(10000)
})
4 changes: 2 additions & 2 deletions test/support/fake_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ export async function createPublisher(streamName: string, connection: Connection
return publisher
}

export function createConnection(username: string, password: string, listeners?: ListenersParams) {
export function createConnection(username: string, password: string, listeners?: ListenersParams, frameMax?: number) {
return connect({
hostname: "localhost",
port: 5552,
username,
password,
vhost: "/",
frameMax: 0, // not used
frameMax: frameMax ?? 0,
heartbeat: 0,
listeners: listeners,
})
Expand Down
4 changes: 2 additions & 2 deletions test/unit/delete_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ describe("Delete command", () => {
username,
password,
vhost: "/",
frameMax: 0, // not used
heartbeat: 0, // not user
frameMax: 0,
heartbeat: 0,
})
})

Expand Down

0 comments on commit 4282389

Please sign in to comment.