Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

19 - Adds frame max negotiation #125

Merged
merged 5 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import {
PublishErrorListener,
ResponseDecoder,
} from "./response_decoder"
import { removeFrom } from "./util"
import { DEFAULT_FRAME_MAX, DEFAULT_UNLIMITED_FRAME_MAX, removeFrom } from "./util"
import { WaitingResponse } from "./waiting_response"
import { SubscribeResponse } from "./responses/subscribe_response"
import { TuneResponse } from "./responses/tune_response"
Expand Down Expand Up @@ -62,7 +62,7 @@ export class Connection {
private consumers = new Map<number, Consumer>()
private compressions = new Map<CompressionType, Compression>()

constructor(private readonly logger: Logger) {
constructor(private readonly logger: Logger, private frameMax: number = DEFAULT_FRAME_MAX) {
this.heartbeat = new Heartbeat(this, this.logger)
this.compressions.set(CompressionType.None, NoneCompression.create())
this.compressions.set(CompressionType.Gzip, GzipCompression.create())
Expand All @@ -89,7 +89,7 @@ export class Connection {
}

static connect(params: ConnectionParams, logger?: Logger): Promise<Connection> {
return new Connection(logger ?? new NullLogger()).start(params)
return new Connection(logger ?? new NullLogger(), params.frameMax).start(params)
}

public start(params: ConnectionParams): Promise<Connection> {
Expand Down Expand Up @@ -235,6 +235,10 @@ export class Connection {
return this.consumers.size
}

public get currentFrameMax() {
return this.frameMax
}

public send(cmd: Request): Promise<void> {
return new Promise((res, rej) => {
const body = cmd.toBuffer()
Expand Down Expand Up @@ -326,7 +330,8 @@ export class Connection {
const heartbeat = extractHeartbeatInterval(heartbeatInterval, tuneResponse)

return new Promise((res, rej) => {
const request = new TuneRequest({ frameMax: tuneResponse.frameMax, heartbeat })
this.frameMax = this.calculateFrameMaxSizeFrom(tuneResponse.frameMax)
const request = new TuneRequest({ frameMax: this.frameMax, heartbeat })
this.socket.write(request.toBuffer(), (err) => {
this.logger.debug(`Write COMPLETED for cmd TUNE: ${inspect(tuneResponse)} - err: ${err}`)
return err ? rej(err) : res({ heartbeat })
Expand Down Expand Up @@ -463,6 +468,12 @@ export class Connection {
response.messages.map((x) => consumer.handle(x))
})
}

private calculateFrameMaxSizeFrom(tuneResponseFrameMax: number) {
if (this.frameMax === DEFAULT_UNLIMITED_FRAME_MAX) return tuneResponseFrameMax
if (tuneResponseFrameMax === DEFAULT_UNLIMITED_FRAME_MAX) return this.frameMax
return Math.min(this.frameMax, tuneResponseFrameMax)
}
}

export type ListenersParams = {
Expand All @@ -477,7 +488,7 @@ export interface ConnectionParams {
username: string
password: string
vhost: string
frameMax?: number // not used
frameMax?: number
heartbeat?: number
listeners?: ListenersParams
}
Expand Down
3 changes: 3 additions & 0 deletions src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ export function range(count: number): number[] {
}
return ret
}

export const DEFAULT_FRAME_MAX = 1048576
export const DEFAULT_UNLIMITED_FRAME_MAX = 0
38 changes: 38 additions & 0 deletions test/e2e/connect_frame_size_negotiation.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { expect } from "chai"
import { createConnection } from "../support/fake_data"
import { Rabbit } from "../support/rabbit"
import { eventually, username, password } from "../support/util"

describe("connect frame size negotiation", () => {
const rabbit = new Rabbit(username, password)

it("using 65536 as frameMax", async () => {
const frameMax = 65536

const connection = await createConnection(username, password, undefined, frameMax)

await eventually(async () => {
expect(connection.currentFrameMax).lte(frameMax)
expect(await rabbit.getConnections()).lengthOf(1)
}, 5000)
try {
await connection.close()
await rabbit.closeAllConnections()
} catch (e) {}
}).timeout(10000)

it("using 1024 as frameMax", async () => {
const frameMax = 1024

const connection = await createConnection(username, password, undefined, frameMax)

await eventually(async () => {
expect(connection.currentFrameMax).lte(frameMax)
expect(await rabbit.getConnections()).lengthOf(1)
}, 5000)
try {
await connection.close()
await rabbit.closeAllConnections()
} catch (e) {}
}).timeout(10000)
})
4 changes: 2 additions & 2 deletions test/support/fake_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ export async function createPublisher(streamName: string, connection: Connection
return publisher
}

export function createConnection(username: string, password: string, listeners?: ListenersParams) {
export function createConnection(username: string, password: string, listeners?: ListenersParams, frameMax?: number) {
return connect({
hostname: "localhost",
port: 5552,
username,
password,
vhost: "/",
frameMax: 0, // not used
frameMax: frameMax ?? 0,
heartbeat: 0,
listeners: listeners,
})
Expand Down
4 changes: 2 additions & 2 deletions test/unit/delete_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ describe("Delete command", () => {
username,
password,
vhost: "/",
frameMax: 0, // not used
heartbeat: 0, // not user
frameMax: 0,
heartbeat: 0,
})
})

Expand Down