Skip to content

Commit

Permalink
[#262] Call Interval Active Request Histogram data
Browse files Browse the repository at this point in the history
  • Loading branch information
feelform committed Feb 6, 2025
1 parent 63880f7 commit b941f14
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 50 deletions.
8 changes: 3 additions & 5 deletions demo/express/routes/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ router.get('/', async function(req, res, next) {

connection.query('SELECT id, name FROM users', function (error, results, fields) {
if (error) throw error
console.log('The solution is: ', results[0])
// console.log('The solution is: ', results[0])
})

connection.query('SELECT id, name FROM users WHERE id = ? AND name like ?', [1, 'name*'], async function (error, results, fields) {
Expand All @@ -46,11 +46,9 @@ router.get('/', async function(req, res, next) {

const response = await fetch(`http://localhost:3000/api2`)
const json = await response.json()
console.log(json)
// console.log(json)

setTimeout(() => {
res.render('index', { title: 'Express' })
}, 3000)
res.render('index', { title: 'Express' })
})

router.get('/api', function(req, res, next) {
Expand Down
109 changes: 71 additions & 38 deletions lib/client/grpc-data-sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ const CommandType = require('./command/command-type')
const GrpcReadableStream = require('./grpc-readable-stream')
const cmdMessages = require('../data/v1/Cmd_pb')
const wrappers = require('google-protobuf/google/protobuf/wrappers_pb')
const activeRequestRepository = require('../metric/active-request-repository')
const { setInterval } = require('node:timers/promises')

// AgentInfoSender.java
// refresh daily
Expand Down Expand Up @@ -173,44 +175,82 @@ class GrpcDataSender {
this.sendCommandEcho(cmdEchoResponse, callArguments)
},
// ActiveThreadCountStreamSocket.java
'ACTIVE_THREAD_COUNT': () => {
const commonStreamResponse = new cmdMessages.PCmdStreamResponse()
commonStreamResponse.setResponseid(requestId)
commonStreamResponse.setSequenceid(++activeThreadCountSequenceId)
const stringValue = new wrappers.StringValue()
stringValue.setValue('')
commonStreamResponse.setMessage(stringValue)

const commandActiveThreadCountResponse = new cmdMessages.PCmdActiveThreadCountRes()
commandActiveThreadCountResponse.setCommonstreamresponse(commonStreamResponse)
commandActiveThreadCountResponse.setHistogramschematype(2)
commandActiveThreadCountResponse.addActivethreadcount(1)
commandActiveThreadCountResponse.setTimestamp(Date.now())

this.sendActiveThreadCount(commandActiveThreadCountResponse, callArguments)
'ACTIVE_THREAD_COUNT': async () => {
let breakHistogramInterval = false
const activeThreadCountStream = new GrpcReadableStream(() => {
callArguments = guardCallArguments(callArguments)
const callback = callArguments.getCallback()
return this.profilerClient.commandStreamActiveThreadCount((err, response) => {
if (err) {
log.error('activeThreadCountStream err: ', err)
activeThreadCountStream.end()
breakHistogramInterval = true
}
if (callback) {
callback(err, response)
}
})
})

const getEnableBreakHistogramInterval = () => {
return breakHistogramInterval
}

activeThreadCountStream.writableStream.on('drain', () => {
log.info('activeThreadCountStream writableStream drain')
})

activeThreadCountStream.writableStream.on('status', (status) => {
log.info('activeThreadCountStream writableStream status', status)
})

activeThreadCountStream.readableStream.on('status', (status) => {
log.info('activeThreadCountStream readableStream status', status)
})

activeThreadCountStream.writableStream.on('error', (error) => {
log.info('activeThreadCountStream writableStream error', error)
})

activeThreadCountStream.writableStream.on('finish', () => {
log.info('activeThreadCountStream writableStream finish')
})

activeThreadCountStream.writableStream.on('close', (error) => {
log.info('activeThreadCountStream writableStream close', error)
})

for await (const responseId of setInterval(1000, requestId)) {
if (getEnableBreakHistogramInterval()) {
break
}

const commonStreamResponse = new cmdMessages.PCmdStreamResponse()
commonStreamResponse.setResponseid(responseId)
commonStreamResponse.setSequenceid(++activeThreadCountSequenceId)
const stringValue = new wrappers.StringValue()
stringValue.setValue('')
commonStreamResponse.setMessage(stringValue)

const commandActiveThreadCountResponse = new cmdMessages.PCmdActiveThreadCountRes()
commandActiveThreadCountResponse.setCommonstreamresponse(commonStreamResponse)
commandActiveThreadCountResponse.setHistogramschematype(2)

const histogram = activeRequestRepository.getCurrentActiveTraceHistogram()
histogram.histogramValues().forEach((value) => {
commandActiveThreadCountResponse.addActivethreadcount(value)
})
commandActiveThreadCountResponse.setTimestamp(Date.now())

activeThreadCountStream.push(commandActiveThreadCountResponse)
}
}
})
})
return writable
})
}

makeActiveThreadCountStream(callArguments) {
this.activeThreadCountStream = new GrpcReadableStream(() => {
callArguments = guardCallArguments(callArguments)
let options = callArguments.getOptions()
const callback = callArguments.getCallback()
return this.profilerClient.commandStreamActiveThreadCount(options, (err, response) => {
if (err) {
log.error('makeActiveThreadCountStream err: ', err)
}
if (callback) {
callback(err, response)
}
})
})
}

agentInfoRefreshInterval() {
return DEFAULT_AGENT_INFO_REFRESH_INTERVAL_MS
}
Expand Down Expand Up @@ -386,13 +426,6 @@ class GrpcDataSender {
})
}

sendActiveThreadCount(commandActiveThreadCountResponse, callArguments) {
if (!this.activeThreadCountStream) {
this.makeActiveThreadCountStream(callArguments)
}
this.activeThreadCountStream.push(commandActiveThreadCountResponse)
}

sendPing() {
const pPing = dataConvertor.convertPing()
if (log.isDebug()) {
Expand Down
14 changes: 7 additions & 7 deletions test/client/grpc-data-sender.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class DataSource extends DataSourceCallCountable {
initializeProfilerClients() { }
}

test.skip('Should send span', function (t) {
test('Should send span', function (t) {
agent.bindHttp()
sendSpanMethodOnDataCallback = null

Expand Down Expand Up @@ -166,7 +166,7 @@ test.skip('Should send span', function (t) {
})
})

test.skip('sendSpanChunk redis.SET.end', function (t) {
test('sendSpanChunk redis.SET.end', function (t) {
agent.bindHttp()
sendSpanMethodOnDataCallback = null
const server = new grpc.Server()
Expand Down Expand Up @@ -244,7 +244,7 @@ test.skip('sendSpanChunk redis.SET.end', function (t) {
})
})

test.skip('sendSpanChunk redis.GET.end', (t) => {
test('sendSpanChunk redis.GET.end', (t) => {
agent.bindHttp()
sendSpanMethodOnDataCallback = null
const server = new grpc.Server()
Expand Down Expand Up @@ -315,7 +315,7 @@ test.skip('sendSpanChunk redis.GET.end', (t) => {
})
})

test.skip('sendSpan', (t) => {
test('sendSpan', (t) => {
agent.bindHttp()
sendSpanMethodOnDataCallback = null
const server = new grpc.Server()
Expand Down Expand Up @@ -579,7 +579,7 @@ class ProfilerDataSource extends DataSourceCallCountable {
initializeAgentInfoScheduler() { }
}

test.skip('sendSupportedServicesCommand and commandEcho', (t) => {
test('sendSupportedServicesCommand and commandEcho', (t) => {
dataCallbackOnServerCall = null
const server = new grpc.Server()
server.addService(services.ProfilerCommandServiceService, {
Expand Down Expand Up @@ -622,7 +622,7 @@ test.skip('sendSupportedServicesCommand and commandEcho', (t) => {
})
})

test.skip('CommandStreamActiveThreadCount', (t) => {
test('CommandStreamActiveThreadCount', (t) => {
const server = new grpc.Server()
server.addService(services.ProfilerCommandServiceService, {
handleCommandV2: handleCommandV2Service,
Expand All @@ -643,7 +643,7 @@ test.skip('CommandStreamActiveThreadCount', (t) => {
t.equal(commonStreamResponse.getMessage().getValue(), '', 'message is empty')

t.equal(data.getHistogramschematype(), 2, 'histogram schema type')
t.equal(data.getActivethreadcountList()[0], 1, 'active thread count')
t.equal(data.getActivethreadcountList()[0], 0, 'active thread count')

console.log(`dataCallbackOnServerCall callCount: ${callCount}`)
if (callCount == 1) {
Expand Down

0 comments on commit b941f14

Please sign in to comment.