Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
FedericoDiTosto committed May 5, 2023
1 parent 31d20c3 commit e4e8b77
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 17 deletions.
8 changes: 3 additions & 5 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,23 +149,21 @@ export class Connection {
}

public async closeConsumer(consumerId: number) {
const targetConsumer = this.consumers.get(consumerId)
if (!targetConsumer) {
const consumer = this.consumers.get(consumerId)
if (!consumer) {
this.logger.error("Consumer does not exist")
throw new Error(`Consumer with id: ${consumerId} does not exist`)
}
const res = await this.sendAndWait<UnsubscribeResponse>(new UnsubscribeRequest(consumerId))

if (!res.ok) {
throw new Error(`Unsubscribe command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
}

this.consumers.delete(consumerId)
this.logger.info(`Closed consumer with id: ${consumerId}`)
return res.ok
}

public getConsumersNumber() {
public consumerCounts() {
return this.consumers.size
}

Expand Down
18 changes: 6 additions & 12 deletions test/e2e/close_consumer.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { expect } from "chai"
import { Connection, connect } from "../../src"
import { Message } from "../../src/producer"
import { Offset } from "../../src/requests/subscribe_request"
import { Rabbit } from "../support/rabbit"
import { eventually, expectToThrowAsync } from "../support/util"
import { expectToThrowAsync } from "../support/util"

describe("close consumer", () => {
const rabbit = new Rabbit()
Expand All @@ -12,7 +11,6 @@ describe("close consumer", () => {

beforeEach(async () => {
await rabbit.createStream(testStreamName)

connection = await connect({
hostname: "localhost",
port: 5552,
Expand All @@ -30,23 +28,19 @@ describe("close consumer", () => {
})

it("closing a consumer in an existing stream", async () => {
const messages: Buffer[] = []
await connection.declarePublisher({ stream: testStreamName })
const consumer = await connection.declareConsumer(
{ stream: testStreamName, offset: Offset.first() },
(message: Message) => messages.push(message.content)
)
const consumer = await connection.declareConsumer({ stream: testStreamName, offset: Offset.first() }, console.log)

const response = await connection.closeConsumer(consumer.consumerId)

await eventually(() => expect(response).eql(true))
await eventually(() => expect(connection.getConsumersNumber()).eql(0))
}).timeout(10000)
expect(response).eql(true)
expect(connection.consumerCounts()).eql(0)
}).timeout(5000)

it("closing a non-existing consumer should rise an error", async () => {
const nonExistingConsumerId = 123456
await connection.declarePublisher({ stream: testStreamName })

await expectToThrowAsync(() => connection.closeConsumer(nonExistingConsumerId), Error)
}).timeout(10000)
})
})

0 comments on commit e4e8b77

Please sign in to comment.