Skip to content

Commit

Permalink
fix: issue open-amt-cloud-toolkit#729 and use Buffer rather than stri…
Browse files Browse the repository at this point in the history
…ng for CIRAChannel's sendBuffer

Added test for CIRAChannel writeData() binary path
  • Loading branch information
orinem authored and matt-primrose committed Nov 22, 2022
1 parent 2be2aed commit de565d1
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 68 deletions.
26 changes: 14 additions & 12 deletions src/amt/APFProcessor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
**********************************************************************/

import { Buffer } from 'node:buffer'
import Common from '../utils/common'
import { logger } from '../logging'
import APFProcessor, { APFProtocol } from './APFProcessor'
Expand Down Expand Up @@ -259,9 +260,7 @@ describe('APFProcessor Tests', () => {

it('should return 9 if sending entire pending buffer', () => {
const fakeCiraChannel: CIRAChannel = {
sendBuffer: {
length: 1000
},
sendBuffer: Buffer.alloc(1000),
sendcredits: 1000,
socket: {
write: jest.fn()
Expand All @@ -287,7 +286,7 @@ describe('APFProcessor Tests', () => {

it('should return 9 if sending partial pending buffer', () => {
const fakeCiraChannel: CIRAChannel = {
sendBuffer: 'my fake buffer',
sendBuffer: Buffer.from('my fake buffer'),
sendcredits: 5,
socket: {
write: jest.fn()
Expand Down Expand Up @@ -461,7 +460,7 @@ describe('APFProcessor Tests', () => {
const length = 17
const data = ''
fakeCiraChannel.closing = 0
fakeCiraChannel.sendBuffer = 'fake buffer'
fakeCiraChannel.sendBuffer = Buffer.from('fake buffer')
fakeCiraChannel.onStateChange = new EventEmitter()
const result = APFProcessor.channelOpenConfirmation(fakeCiraSocket, length, data)
expect(result).toEqual(17)
Expand All @@ -475,7 +474,7 @@ describe('APFProcessor Tests', () => {
const data = ''
const readIntSpy = jest.spyOn(Common, 'ReadInt').mockReturnValue(1)
fakeCiraChannel.closing = 0
fakeCiraChannel.sendBuffer = 'fake buffer'
fakeCiraChannel.sendBuffer = Buffer.from('fake buffer')
fakeCiraChannel.onStateChange = new EventEmitter()
const result = APFProcessor.channelOpenConfirmation(fakeCiraSocket, length, data)
expect(result).toEqual(17)
Expand Down Expand Up @@ -1139,13 +1138,16 @@ describe('APFProcessor Tests', () => {
})

it('should SendChannelData', () => {
APFProcessor.SendChannelData(fakeCiraSocket, channelid, data)
const dataExpected =
writeSpy = jest.spyOn(fakeCiraSocket, 'write')
APFProcessor.SendChannelData(fakeCiraSocket, channelid, Buffer.from(data))
const dataExpected = Buffer.from(
String.fromCharCode(APFProtocol.CHANNEL_DATA) +
Common.IntToStr(channelid) +
Common.IntToStr(data.length) +
data
expect(writeSpy).toHaveBeenCalledWith(fakeCiraSocket, dataExpected)
Common.IntToStr(channelid) +
Common.IntToStr(data.length) +
data,
'binary'
)
expect(writeSpy).toHaveBeenCalledWith(dataExpected)
})

it('should SendChannelWindowAdjust', () => {
Expand Down
57 changes: 30 additions & 27 deletions src/amt/APFProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
**********************************************************************/

import { Buffer } from 'node:buffer'
import { logger, messages } from '../logging'
import Common from '../utils/common'
import { CIRASocket } from '../models/models'
import { CIRAChannel } from './CIRAChannel'
import { EventEmitter } from 'stream'
const KEEPALIVE_INTERVAL = 30 // 30 seconds is typical keepalive interval for AMT CIRA connection

Expand Down Expand Up @@ -155,19 +157,8 @@ const APFProcessor = {
}
cirachannel.sendcredits += ByteToAdd
logger.silly(`${messages.MPS_WINDOW_ADJUST}, ${RecipientChannel.toString()}, ${ByteToAdd.toString()}, ${cirachannel.sendcredits}`)
if (cirachannel.state === 2 && cirachannel.sendBuffer != null) {
// Compute how much data we can send
if (cirachannel.sendBuffer.length <= cirachannel.sendcredits) {
// Send the entire pending buffer
APFProcessor.SendChannelData(cirachannel.socket, cirachannel.amtchannelid, cirachannel.sendBuffer)
cirachannel.sendcredits -= cirachannel.sendBuffer.length
delete cirachannel.sendBuffer
} else {
// Send a part of the pending buffer
APFProcessor.SendChannelData(cirachannel.socket, cirachannel.amtchannelid, cirachannel.sendBuffer.substring(0, cirachannel.sendcredits))
cirachannel.sendBuffer = cirachannel.sendBuffer.substring(cirachannel.sendcredits)
cirachannel.sendcredits = 0
}
if (cirachannel.state === 2) {
APFProcessor.SendPendingData(cirachannel)
}
return 9
},
Expand Down Expand Up @@ -232,19 +223,7 @@ const APFProcessor = {
} else {
cirachannel.state = 2
// Send any pending data
if (cirachannel.sendBuffer != null) {
if (cirachannel.sendBuffer.length <= cirachannel.sendcredits) {
// Send the entire pending buffer
APFProcessor.SendChannelData(cirachannel.socket, cirachannel.amtchannelid, cirachannel.sendBuffer)
cirachannel.sendcredits -= cirachannel.sendBuffer.length
delete cirachannel.sendBuffer
} else {
// Send a part of the pending buffer
APFProcessor.SendChannelData(cirachannel.socket, cirachannel.amtchannelid, cirachannel.sendBuffer.substring(0, cirachannel.sendcredits))
cirachannel.sendBuffer = cirachannel.sendBuffer.substring(cirachannel.sendcredits)
cirachannel.sendcredits = 0
}
}
APFProcessor.SendPendingData(cirachannel)
// Indicate the channel is open
if (cirachannel.onStateChange) {
cirachannel.onStateChange.emit('stateChange', cirachannel.state)
Expand Down Expand Up @@ -535,15 +514,39 @@ const APFProcessor = {
APFProcessor.Write(socket, String.fromCharCode(APFProtocol.CHANNEL_CLOSE) + Common.IntToStr(channelid))
},

SendChannelData: (socket: CIRASocket, channelid, data): void => {
SendPendingData: (cirachannel: CIRAChannel): void => {
if (cirachannel.sendBuffer != null) {
if (cirachannel.sendBuffer.length <= cirachannel.sendcredits) {
// Send the entire pending buffer
APFProcessor.SendChannelData(cirachannel.socket, cirachannel.amtchannelid, cirachannel.sendBuffer)
cirachannel.sendcredits -= cirachannel.sendBuffer.length
delete cirachannel.sendBuffer
} else {
// Send a part of the pending buffer
APFProcessor.SendChannelData(cirachannel.socket, cirachannel.amtchannelid, cirachannel.sendBuffer.subarray(0, cirachannel.sendcredits))
cirachannel.sendBuffer = cirachannel.sendBuffer.subarray(cirachannel.sendcredits)
cirachannel.sendcredits = 0
}
}
},

SendChannelData: (socket: CIRASocket, channelid, data: Buffer): void => {
logger.silly(`${messages.MPS_SEND_CHANNEL_DATA}, ${channelid}`)
const b = Buffer.allocUnsafe(9 + data.length)
b[0] = APFProtocol.CHANNEL_DATA
b.writeUInt32BE(channelid, 1)
b.writeUInt32BE(data.length, 5)
data.copy(b, 9)
socket.write(b)
/*
APFProcessor.Write(
socket,
String.fromCharCode(APFProtocol.CHANNEL_DATA) +
Common.IntToStr(channelid) +
Common.IntToStr(data.length) +
data
)
*/
},

SendChannelWindowAdjust: (socket: CIRASocket, channelid, bytestoadd): void => {
Expand Down
20 changes: 19 additions & 1 deletion src/amt/CIRAChannel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,27 @@ describe('CIRA Channel', () => {
expect(sendChannelSpy).toHaveBeenCalledTimes(1)
expect(ciraChannel.sendcredits).toBe(0)
})
it('should write binary data to channel', async () => {
// Tests both the binary data path and appending to the sendBuffer.
// There are enough send credits for the initial sendBuffer only,
// so the string written should appear in sendBuffer.
ciraChannel.state = 2
ciraChannel.sendcredits = 10
ciraChannel.sendBuffer = Buffer.alloc(10)
const data = String.fromCharCode(1, 2, 3, 4, 0xC0, 5)
const sendChannelSpy = jest.spyOn(APFProcessor, 'SendChannelData').mockImplementation(() => {})

const writePromise = ciraChannel.writeData(data, null)
const responseData = await writePromise

expect(responseData).toEqual(null)
expect(sendChannelSpy).toHaveBeenCalledTimes(1)
expect(ciraChannel.sendcredits).toBe(0)
expect(ciraChannel.sendBuffer).toEqual(Buffer.from(data, 'binary'))
})
it('should resolve if data does not contain messageId', async () => {
ciraChannel.state = 2
ciraChannel.sendcredits = 116
ciraChannel.sendcredits = 97
const data = 'KVMR'
const params: connectionParams = {
guid: '4c4c4544-004b-4210-8033-b6c04f504633',
Expand Down
36 changes: 12 additions & 24 deletions src/amt/CIRAChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
**********************************************************************/

// import httpZ, { HttpZResponseModel } from 'http-z'
import { Buffer } from 'node:buffer'
import { CIRASocket } from '../models/models'
import APFProcessor from './APFProcessor'
import { connectionParams, HttpHandler } from './HttpHandler'
Expand All @@ -21,7 +22,7 @@ export class CIRAChannel {
amtCiraWindow: number
ciraWindow: number
write?: (data: string) => Promise<string>
sendBuffer?: string = null
sendBuffer?: Buffer = null
amtchannelid?: number
closing?: number
onStateChange?: EventEmitter // (state: number) => void
Expand Down Expand Up @@ -83,34 +84,21 @@ export class CIRAChannel {
} else {
this.resolve = resolve
}
let wsmanRequest: any = data
if (this.state === 0) return reject(new Error('Closed'))// return false
let wsmanRequest: Buffer
if (params != null) { // this is an API Call
wsmanRequest = this.httpHandler.wrapIt(params, data)
} else {
wsmanRequest = Buffer.from(data, 'binary')
}
if (this.state === 0) return reject(new Error('Closed'))// return false
if (this.state === 1 || this.sendcredits === 0 || this.sendBuffer != null) {
if (this.sendBuffer == null) {
this.sendBuffer = wsmanRequest
} else {
this.sendBuffer += wsmanRequest
}
if (messageId == null) {
return resolve(null)
} else { return }
if (this.sendBuffer == null) {
this.sendBuffer = wsmanRequest
} else {
this.sendBuffer = Buffer.concat([this.sendBuffer, wsmanRequest])
}
// Compute how much data we can send
if (wsmanRequest?.length <= this.sendcredits) {
// Send the entire message
APFProcessor.SendChannelData(this.socket, this.amtchannelid, wsmanRequest)
this.sendcredits -= wsmanRequest.length
if (messageId == null) {
return resolve(null)
} else { return }
if (this.state !== 1 && this.sendcredits !== 0) {
APFProcessor.SendPendingData(this)
}
// Send a part of the message
this.sendBuffer = wsmanRequest.substring(this.sendcredits)
APFProcessor.SendChannelData(this.socket, this.amtchannelid, wsmanRequest.substring(0, this.sendcredits))
this.sendcredits = 0
if (messageId == null) {
resolve(null)
}
Expand Down
15 changes: 13 additions & 2 deletions src/amt/HttpHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class HttpHandler {
this.parser = new xml2js.Parser({ ignoreAttrs: true, mergeAttrs: false, explicitArray: false, tagNameProcessors: [this.stripPrefix], valueProcessors: [myParseNumbers, xml2js.processors.parseBooleans] })
}

wrapIt (connectionParams: connectionParams, data: string): string {
wrapIt (connectionParams: connectionParams, data: string): Buffer {
try {
const url = '/wsman'
const action = 'POST'
Expand Down Expand Up @@ -71,6 +71,7 @@ export class HttpHandler {
})
message += `Authorization: ${authorizationRequestHeader}\r\n`
}
/*
// Use Chunked-Encoding
message += Buffer.from([
`Host: ${connectionParams.guid}:${connectionParams.port}`,
Expand All @@ -82,6 +83,15 @@ export class HttpHandler {
'\r\n'
].join('\r\n'), 'utf8')
return message
*/
const dataBuffer = Buffer.from(data, 'utf8')
message += `Host: ${connectionParams.guid}:${connectionParams.port}\r\nContent-Length: ${dataBuffer.length}\r\n\r\n`
const buffer = Buffer.concat([Buffer.from(message, 'utf8'), dataBuffer])
if (dataBuffer.length !== data.length) {
logger.silly(`wrapIt data length mismatch: Buffer.length = ${dataBuffer.length}, string.length = ${data.length}`)
logger.silly(buffer.toString('utf8'))
}
return buffer
} catch (err) {
logger.error(`${messages.CREATE_HASH_STRING_FAILED}:`, err.message)
return null
Expand Down Expand Up @@ -117,7 +127,8 @@ export class HttpHandler {

parseXML (xmlBody: string): any {
let wsmanResponse: string
this.parser.parseString(xmlBody, (err, result) => {
const xmlDecoded: string = Buffer.from(xmlBody, 'binary').toString('utf8')
this.parser.parseString(xmlDecoded, (err, result) => {
if (err) {
logger.error(`${messages.XML_PARSE_FAILED}:`, err)
wsmanResponse = null
Expand Down
22 changes: 21 additions & 1 deletion src/amt/httpHandler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,27 @@ it('should return a WSMan request', async () => {
password: 'P@ssw0rd'
}
const result = httpHandler.wrapIt(params, xmlRequestBody)
expect(result).toContain('Authorization')
expect(result.toString()).toContain('Authorization')
})
it('should properly encode UTF8 characters', async () => {
// À is codepoint 0x00C0, [0xC3, 0x80] when UTF8 encoded...
const xmlRequestBody = '<tag>FooÀbar</tag>'
const digestChallenge = {
realm: 'Digest:56ABC7BE224EF620C69EB88F01071DC8',
nonce: 'fVNueyEAAAAAAAAAcO8WqJ8s+WdyFUIY',
stale: 'false',
qop: 'auth'
}
const params: connectionParams = {
guid: '4c4c4544-004b-4210-8033-b6c04f504633',
port: 16992,
digestChallenge: digestChallenge,
username: 'admin',
password: 'P@ssw0rd'
}
const result: Buffer = httpHandler.wrapIt(params, xmlRequestBody)
expect(result.toString('binary')).toContain('<tag>Foo' + String.fromCharCode(0xC3, 0x80) + 'bar</tag>')
expect(result.toString('binary')).toContain('\r\nContent-Length: 19\r\n')
})
it('should return a null when no xml is passed to wrap a WSMan request', async () => {
const digestChallenge = {
Expand Down
2 changes: 1 addition & 1 deletion src/utils/redirectInterceptor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ test('handleAuthenticateSession DIGEST with user, pass and digestRealm', () => {
r += String.fromCharCode(ws.authCNonce.length) // CNonce Length
r += ws.authCNonce // CNonce
r += String.fromCharCode(ncs.length) // NonceCount Length
r += ncs // NonceCount // NonceCount
r += ncs // NonceCount
r += String.fromCharCode(digest.length) // Response Length
r += digest // Response
r += String.fromCharCode(amt.digestQOP.length) // QOP Length
Expand Down

0 comments on commit de565d1

Please sign in to comment.