Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

77-Unsubscribe #89

Merged
merged 6 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions src/connection.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import { Socket } from "net"
import { inspect } from "util"
import { Consumer, ConsumerFunc } from "./consumer"
import { STREAM_ALREADY_EXISTS_ERROR_CODE } from "./error_codes"
import { Heartbeat } from "./heartbeat"
import { Producer } from "./producer"
import { CloseRequest } from "./requests/close_request"
import { CreateStreamArguments, CreateStreamRequest } from "./requests/create_stream_request"
import { CreditRequest, CreditRequestParams } from "./requests/credit_request"
import { DeclarePublisherRequest } from "./requests/declare_publisher_request"
import { DeleteStreamRequest } from "./requests/delete_stream_request"
import { OpenRequest } from "./requests/open_request"
Expand All @@ -15,7 +13,6 @@ import { QueryPublisherRequest } from "./requests/query_publisher_request"
import { Request } from "./requests/request"
import { SaslAuthenticateRequest } from "./requests/sasl_authenticate_request"
import { SaslHandshakeRequest } from "./requests/sasl_handshake_request"
import { Offset, SubscribeRequest } from "./requests/subscribe_request"
import { TuneRequest } from "./requests/tune_request"
import { MetadataUpdateListener, ResponseDecoder } from "./response_decoder"
import { CloseResponse } from "./responses/close_response"
Expand All @@ -33,6 +30,11 @@ import { SubscribeResponse } from "./responses/subscribe_response"
import { TuneResponse } from "./responses/tune_response"
import { SaslHandshakeResponse } from "./responses/sasl_handshake_response"
import { SaslAuthenticateResponse } from "./responses/sasl_authenticate_response"
import { Offset, SubscribeRequest } from "./requests/subscribe_request"
import { Consumer, ConsumerFunc } from "./consumer"
import { CreditRequest, CreditRequestParams } from "./requests/credit_request"
import { UnsubscribeResponse } from "./responses/unsubscribe_response"
import { UnsubscribeRequest } from "./requests/unsubscribe_request"

export class Connection {
private readonly socket = new Socket()
Expand Down Expand Up @@ -127,7 +129,7 @@ export class Connection {

public async declareConsumer(params: DeclareConsumerParams, handle: ConsumerFunc): Promise<Consumer> {
const consumerId = this.incConsumerId()
const consumer = new Consumer(handle)
const consumer = new Consumer(handle, consumerId)
this.consumers.set(consumerId, consumer)

const res = await this.sendAndWait<SubscribeResponse>(
Expand All @@ -146,6 +148,25 @@ export class Connection {
return consumer
}

public async closeConsumer(consumerId: number) {
const consumer = this.consumers.get(consumerId)
if (!consumer) {
this.logger.error("Consumer does not exist")
throw new Error(`Consumer with id: ${consumerId} does not exist`)
}
const res = await this.sendAndWait<UnsubscribeResponse>(new UnsubscribeRequest(consumerId))
if (!res.ok) {
throw new Error(`Unsubscribe command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
}
this.consumers.delete(consumerId)
this.logger.info(`Closed consumer with id: ${consumerId}`)
return res.ok
}

public consumerCounts() {
return this.consumers.size
}

public send(cmd: Request): Promise<void> {
return new Promise((res, rej) => {
const body = cmd.toBuffer()
Expand Down
2 changes: 1 addition & 1 deletion src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Message } from "./producer"
export type ConsumerFunc = (message: Message) => void

export class Consumer {
constructor(readonly handle: ConsumerFunc) {}
constructor(readonly handle: ConsumerFunc, readonly consumerId: number) {}

async close(): Promise<void> {
throw new Error("Method not implemented.")
Expand Down
16 changes: 16 additions & 0 deletions src/requests/unsubscribe_request.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { UnsubscribeResponse } from "../responses/unsubscribe_response"
import { AbstractRequest } from "./abstract_request"
import { DataWriter } from "./data_writer"

export class UnsubscribeRequest extends AbstractRequest {
readonly key = 0x000c
readonly responseKey = UnsubscribeResponse.key

constructor(private subscriptionId: number) {
super()
}

protected writeContent(writer: DataWriter): void {
writer.writeUInt8(this.subscriptionId)
}
}
2 changes: 2 additions & 0 deletions src/response_decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { SubscribeResponse } from "./responses/subscribe_response"
import { DeliverResponse } from "./responses/deliver_response"
import { FormatCodeType, FormatCode } from "./amqp10/decoder"
import { CreditResponse } from "./responses/credit_response"
import { UnsubscribeResponse } from "./responses/unsubscribe_response"

// Frame => Size (Request | Response | Command)
// Size => uint32 (size without the 4 bytes of the size element)
Expand Down Expand Up @@ -315,6 +316,7 @@ export class ResponseDecoder {
this.addFactoryFor(DeleteStreamResponse)
this.addFactoryFor(QueryPublisherResponse)
this.addFactoryFor(SubscribeResponse)
this.addFactoryFor(UnsubscribeResponse)
}

add(data: Buffer) {
Expand Down
5 changes: 5 additions & 0 deletions src/responses/unsubscribe_response.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { AbstractResponse } from "./abstract_response"

export class UnsubscribeResponse extends AbstractResponse {
static key = 0x800c
}
46 changes: 46 additions & 0 deletions test/e2e/close_consumer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { expect } from "chai"
import { Connection, connect } from "../../src"
import { Offset } from "../../src/requests/subscribe_request"
import { Rabbit } from "../support/rabbit"
import { expectToThrowAsync } from "../support/util"

describe("close consumer", () => {
const rabbit = new Rabbit()
const testStreamName = "test-stream"
let connection: Connection

beforeEach(async () => {
await rabbit.createStream(testStreamName)
connection = await connect({
hostname: "localhost",
port: 5552,
username: "rabbit",
password: "rabbit",
vhost: "/",
frameMax: 0,
heartbeat: 0,
})
})

afterEach(async () => {
await connection.close()
await rabbit.deleteStream(testStreamName)
})

it("closing a consumer in an existing stream", async () => {
await connection.declarePublisher({ stream: testStreamName })
const consumer = await connection.declareConsumer({ stream: testStreamName, offset: Offset.first() }, console.log)

const response = await connection.closeConsumer(consumer.consumerId)

expect(response).eql(true)
expect(connection.consumerCounts()).eql(0)
}).timeout(5000)

it("closing a non-existing consumer should rise an error", async () => {
const nonExistingConsumerId = 123456
await connection.declarePublisher({ stream: testStreamName })

await expectToThrowAsync(() => connection.closeConsumer(nonExistingConsumerId), Error)
})
})