Skip to content

Commit

Permalink
[#262] Call Interval Active Request Histogram data
Browse files Browse the repository at this point in the history
  • Loading branch information
feelform committed Feb 6, 2025
1 parent 63880f7 commit ce2999d
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 102 deletions.
8 changes: 3 additions & 5 deletions demo/express/routes/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ router.get('/', async function(req, res, next) {

connection.query('SELECT id, name FROM users', function (error, results, fields) {
if (error) throw error
console.log('The solution is: ', results[0])
// console.log('The solution is: ', results[0])
})

connection.query('SELECT id, name FROM users WHERE id = ? AND name like ?', [1, 'name*'], async function (error, results, fields) {
Expand All @@ -46,11 +46,9 @@ router.get('/', async function(req, res, next) {

const response = await fetch(`http://localhost:3000/api2`)
const json = await response.json()
console.log(json)
// console.log(json)

setTimeout(() => {
res.render('index', { title: 'Express' })
}, 3000)
res.render('index', { title: 'Express' })
})

router.get('/api', function(req, res, next) {
Expand Down
84 changes: 43 additions & 41 deletions lib/client/grpc-data-sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ const CommandType = require('./command/command-type')
const GrpcReadableStream = require('./grpc-readable-stream')
const cmdMessages = require('../data/v1/Cmd_pb')
const wrappers = require('google-protobuf/google/protobuf/wrappers_pb')
const activeRequestRepository = require('../metric/active-request-repository')
const { setInterval } = require('node:timers/promises')

// AgentInfoSender.java
// refresh daily
Expand Down Expand Up @@ -75,9 +77,6 @@ class GrpcDataSender {
if (this.commandStream) {
this.commandStream.end()
}
if (this.activeThreadCountStream) {
this.activeThreadCountStream.end()
}
}

initializeClients() {
Expand Down Expand Up @@ -173,44 +172,54 @@ class GrpcDataSender {
this.sendCommandEcho(cmdEchoResponse, callArguments)
},
// ActiveThreadCountStreamSocket.java
'ACTIVE_THREAD_COUNT': () => {
const commonStreamResponse = new cmdMessages.PCmdStreamResponse()
commonStreamResponse.setResponseid(requestId)
commonStreamResponse.setSequenceid(++activeThreadCountSequenceId)
const stringValue = new wrappers.StringValue()
stringValue.setValue('')
commonStreamResponse.setMessage(stringValue)

const commandActiveThreadCountResponse = new cmdMessages.PCmdActiveThreadCountRes()
commandActiveThreadCountResponse.setCommonstreamresponse(commonStreamResponse)
commandActiveThreadCountResponse.setHistogramschematype(2)
commandActiveThreadCountResponse.addActivethreadcount(1)
commandActiveThreadCountResponse.setTimestamp(Date.now())

this.sendActiveThreadCount(commandActiveThreadCountResponse, callArguments)
'ACTIVE_THREAD_COUNT': async () => {
let breakHistogramInterval = false
const activeThreadCountStream = new GrpcReadableStream(() => {
callArguments = guardCallArguments(callArguments)
const callback = callArguments.getCallback()
return this.profilerClient.commandStreamActiveThreadCount((err, response) => {
if (err) {
log.error('activeThreadCountStream err: ', err)
}
activeThreadCountStream.end()
breakHistogramInterval = true
if (callback) {
callback(err, response)
}
})
})

for await (const responseId of setInterval(1000, requestId)) {
if (breakHistogramInterval) {
break
}

const commonStreamResponse = new cmdMessages.PCmdStreamResponse()
commonStreamResponse.setResponseid(responseId)
commonStreamResponse.setSequenceid(++activeThreadCountSequenceId)
const stringValue = new wrappers.StringValue()
stringValue.setValue('')
commonStreamResponse.setMessage(stringValue)

const commandActiveThreadCountResponse = new cmdMessages.PCmdActiveThreadCountRes()
commandActiveThreadCountResponse.setCommonstreamresponse(commonStreamResponse)
commandActiveThreadCountResponse.setHistogramschematype(2)

const histogram = activeRequestRepository.getCurrentActiveTraceHistogram()
histogram.histogramValues().forEach((value) => {
commandActiveThreadCountResponse.addActivethreadcount(value)
})
commandActiveThreadCountResponse.setTimestamp(Date.now())

activeThreadCountStream.push(commandActiveThreadCountResponse)
}
}
})
})
return writable
})
}

makeActiveThreadCountStream(callArguments) {
this.activeThreadCountStream = new GrpcReadableStream(() => {
callArguments = guardCallArguments(callArguments)
let options = callArguments.getOptions()
const callback = callArguments.getCallback()
return this.profilerClient.commandStreamActiveThreadCount(options, (err, response) => {
if (err) {
log.error('makeActiveThreadCountStream err: ', err)
}
if (callback) {
callback(err, response)
}
})
})
}

agentInfoRefreshInterval() {
return DEFAULT_AGENT_INFO_REFRESH_INTERVAL_MS
}
Expand Down Expand Up @@ -386,13 +395,6 @@ class GrpcDataSender {
})
}

sendActiveThreadCount(commandActiveThreadCountResponse, callArguments) {
if (!this.activeThreadCountStream) {
this.makeActiveThreadCountStream(callArguments)
}
this.activeThreadCountStream.push(commandActiveThreadCountResponse)
}

sendPing() {
const pPing = dataConvertor.convertPing()
if (log.isDebug()) {
Expand Down
88 changes: 32 additions & 56 deletions test/client/grpc-data-sender.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class DataSource extends DataSourceCallCountable {
initializeProfilerClients() { }
}

test.skip('Should send span', function (t) {
test('Should send span', function (t) {
agent.bindHttp()
sendSpanMethodOnDataCallback = null

Expand Down Expand Up @@ -166,7 +166,7 @@ test.skip('Should send span', function (t) {
})
})

test.skip('sendSpanChunk redis.SET.end', function (t) {
test('sendSpanChunk redis.SET.end', function (t) {
agent.bindHttp()
sendSpanMethodOnDataCallback = null
const server = new grpc.Server()
Expand Down Expand Up @@ -244,7 +244,7 @@ test.skip('sendSpanChunk redis.SET.end', function (t) {
})
})

test.skip('sendSpanChunk redis.GET.end', (t) => {
test('sendSpanChunk redis.GET.end', (t) => {
agent.bindHttp()
sendSpanMethodOnDataCallback = null
const server = new grpc.Server()
Expand Down Expand Up @@ -315,7 +315,7 @@ test.skip('sendSpanChunk redis.GET.end', (t) => {
})
})

test.skip('sendSpan', (t) => {
test('sendSpan', (t) => {
agent.bindHttp()
sendSpanMethodOnDataCallback = null
const server = new grpc.Server()
Expand Down Expand Up @@ -471,45 +471,6 @@ test('sendStat', (t) => {
collectorServer.tryShutdown(() => {
})
})
// let expectedStat = {
// 'agentId': 'express-node-sample-id',
// 'agentStartTime': 1593058531421,
// 'timestamp': 1593058537472,
// 'collectInterval': 1000,
// 'memory': {
// 'heapUsed': 37042600,
// 'heapTotal': 62197760
// },
// 'cpu': {
// 'user': 0.0003919068831319893,
// 'system': 0
// },
// 'activeTrace': {
// 'schema': {
// 'typeCode': 2,
// 'fast': 1000,
// 'normal': 3000,
// 'slow': 5000
// },
// 'typeCode': 2,
// 'fastCount': 0,
// 'normalCount': 0,
// 'slowCount': 0,
// 'verySlowCount': 0
// }
// }
// grpcDataSender.sendStat(expectedStat)

// const pStatMessage = grpcDataSender.actualPStatMessage
// const pAgentStat = pStatMessage.getAgentstat()
// t.plan(4)

// t.equal(pAgentStat.getTimestamp(), 1593058537472, 'timestamp')
// t.equal(pAgentStat.getCollectinterval(), 1000, 'collectInterval')

// const pCpuLoad = pAgentStat.getCpuload()
// t.equal(pCpuLoad.getJvmcpuload(), 0.0003919068831319893, 'cpu.user')
// t.equal(pCpuLoad.getSystemcpuload(), 0, 'cpu.system')
})

let requestId = 0
Expand Down Expand Up @@ -579,7 +540,7 @@ class ProfilerDataSource extends DataSourceCallCountable {
initializeAgentInfoScheduler() { }
}

test.skip('sendSupportedServicesCommand and commandEcho', (t) => {
test('sendSupportedServicesCommand and commandEcho', (t) => {
dataCallbackOnServerCall = null
const server = new grpc.Server()
server.addService(services.ProfilerCommandServiceService, {
Expand Down Expand Up @@ -622,16 +583,32 @@ test.skip('sendSupportedServicesCommand and commandEcho', (t) => {
})
})

test.skip('CommandStreamActiveThreadCount', (t) => {


test('CommandStreamActiveThreadCount', (t) => {
const server = new grpc.Server()
let interval
let ended = false
server.addService(services.ProfilerCommandServiceService, {
handleCommandV2: handleCommandV2Service,
handleCommand: handleCommandV2Service,
commandEcho: emptyResponseService,
commandStreamActiveThreadCount: emptyResponseService
commandStreamActiveThreadCount: (call, callback) => {
call.on('data', (data) => {
dataCallbackOnServerCall(data)
})
interval = setInterval(() => {
if (ended) {
callback(null, new Empty())
process.nextTick(() => {
t.end()
})
}
}, 1000)
}
})
let dataSender
server.bindAsync('127.0.0.1:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
dataSender = beforeSpecificOne(port, ProfilerDataSource)

let callCount = 0
Expand All @@ -643,16 +620,9 @@ test.skip('CommandStreamActiveThreadCount', (t) => {
t.equal(commonStreamResponse.getMessage().getValue(), '', 'message is empty')

t.equal(data.getHistogramschematype(), 2, 'histogram schema type')
t.equal(data.getActivethreadcountList()[0], 1, 'active thread count')
t.equal(data.getActivethreadcountList()[0], 0, 'active thread count')

console.log(`dataCallbackOnServerCall callCount: ${callCount}`)
if (callCount == 1) {
dataSender.commandStream.writableStream.on('close', () => {
t.end()
})
dataSender.close()
server.forceShutdown()
}
ended = true
}

const callArguments = new CallArgumentsBuilder(function () {
Expand All @@ -667,4 +637,10 @@ test.skip('CommandStreamActiveThreadCount', (t) => {
}).build()
dataSender.sendSupportedServicesCommand(callArguments)
})

t.teardown(() => {
clearInterval(interval)
dataSender.close()
server.forceShutdown()
})
})

0 comments on commit ce2999d

Please sign in to comment.