Skip to content

Commit

Permalink
StreamStats Command
Browse files Browse the repository at this point in the history
Add's streamStatsRequest function to connection. This returns an error
if one occurs, or a map of stream stats
  • Loading branch information
ablease committed May 22, 2023
1 parent 13b21fb commit 403a6ec
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 0 deletions.
11 changes: 11 additions & 0 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import { Consumer, ConsumerFunc } from "./consumer"
import { UnsubscribeResponse } from "./responses/unsubscribe_response"
import { UnsubscribeRequest } from "./requests/unsubscribe_request"
import { CreditRequest, CreditRequestParams } from "./requests/credit_request"
import { StreamStatsRequest } from "./requests/stream_stats_request"
import { StreamStatsResponse } from "./responses/stream_stats_response"

export class Connection {
private readonly socket = new Socket()
Expand Down Expand Up @@ -258,6 +260,15 @@ export class Connection {
return res.sequence
}

public async streamStatsRequest(streamName: string) {
const res = await this.sendAndWait<StreamStatsResponse>(new StreamStatsRequest(streamName))
if (!res.ok) {
throw new Error(`Stream Stats command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
}
this.logger.info(`Statistics for stream name ${streamName}, ${res.statistics}`)
return res.statistics
}

private responseReceived<T extends Response>(response: T) {
const wr = removeFrom(this.waitingResponses as WaitingResponse<T>[], (x) => x.waitingFor(response))
return wr ? wr.resolve(response) : this.receivedResponses.push(response)
Expand Down
16 changes: 16 additions & 0 deletions src/requests/stream_stats_request.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { StreamStatsResponse } from "../responses/stream_stats_response"
import { AbstractRequest } from "./abstract_request"
import { DataWriter } from "./data_writer"

export class StreamStatsRequest extends AbstractRequest {
readonly responseKey = StreamStatsResponse.key
readonly key = 0x001c

constructor(private streamName: string) {
super()
}

writeContent(writer: DataWriter) {
writer.writeString(this.streamName)
}
}
2 changes: 2 additions & 0 deletions src/response_decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import { Message, MessageApplicationProperties, MessageProperties } from "./prod
import { ApplicationProperties } from "./amqp10/applicationProperties"
import { TuneResponse } from "./responses/tune_response"
import { PublishErrorResponse } from "./responses/publish_error_response"
import { StreamStatsResponse } from "./responses/stream_stats_response"

// Frame => Size (Request | Response | Command)
// Size => uint32 (size without the 4 bytes of the size element)
Expand Down Expand Up @@ -471,6 +472,7 @@ export class ResponseDecoder {
this.addFactoryFor(QueryPublisherResponse)
this.addFactoryFor(SubscribeResponse)
this.addFactoryFor(UnsubscribeResponse)
this.addFactoryFor(StreamStatsResponse)
}

add(data: Buffer) {
Expand Down
19 changes: 19 additions & 0 deletions src/responses/stream_stats_response.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { AbstractResponse } from "./abstract_response"
import { RawResponse } from "./raw_response"

export class StreamStatsResponse extends AbstractResponse {
static key = 0x801c
readonly statistics: Record<string, bigint> = {}

constructor(response: RawResponse) {
super(response)
this.verifyKey(StreamStatsResponse)

const stats = this.response.payload.readInt32()
for (let i = 0; i < stats; i++) {
const statKey = this.response.payload.readString()
const statVal = this.response.payload.readInt64()
this.statistics[statKey] = statVal
}
}
}
45 changes: 45 additions & 0 deletions test/unit/stream_stats.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { Connection } from "../../src"
import { expect } from "chai"
import { Rabbit } from "../support/rabbit"
import { randomUUID } from "crypto"
import { expectToThrowAsync, username, password } from "../support/util"
import { createConnection } from "../support/fake_data"

describe("StreamStats", () => {
const rabbit = new Rabbit(username, password)
const testStreamName = "test-stream"
let connection: Connection
let publisherRef: string

beforeEach(async () => {
publisherRef = randomUUID()
await rabbit.createStream(testStreamName)
connection = await createConnection(username, password)
})

afterEach(async () => {
await connection.close()
await rabbit.deleteStream(testStreamName)
})

it("gets statistics for a stream", async () => {
const publisher = await connection.declarePublisher({ stream: testStreamName, publisherRef })
for (let i = 0; i < 5; i++) {
await publisher.send(Buffer.from(`test${randomUUID()}`))
}

const stats = await connection.streamStatsRequest(testStreamName)

expect(stats.committed_chunk_id).to.be.a("BigInt")
expect(stats.first_chunk_id).to.be.a("BigInt")
expect(stats.last_chunk_id).to.be.a("BigInt")
}).timeout(10000)

it("returns an error when the stream does not exist", async () => {
await expectToThrowAsync(
() => connection.streamStatsRequest("stream-does-not-exist"),
Error,
"Stream Stats command returned error with code 2 - Stream does not exist"
)
}).timeout(10000)
})

0 comments on commit 403a6ec

Please sign in to comment.