Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: issue #729 and use Buffer rather than string for CIRAChannel's s… #732

Merged
merged 1 commit into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions src/amt/APFProcessor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
**********************************************************************/

import { Buffer } from 'node:buffer'
import Common from '../utils/common'
import { logger } from '../logging'
import APFProcessor, { APFProtocol } from './APFProcessor'
Expand Down Expand Up @@ -259,9 +260,7 @@ describe('APFProcessor Tests', () => {

it('should return 9 if sending entire pending buffer', () => {
const fakeCiraChannel: CIRAChannel = {
sendBuffer: {
length: 1000
},
sendBuffer: Buffer.alloc(1000),
sendcredits: 1000,
socket: {
write: jest.fn()
Expand All @@ -287,7 +286,7 @@ describe('APFProcessor Tests', () => {

it('should return 9 if sending partial pending buffer', () => {
const fakeCiraChannel: CIRAChannel = {
sendBuffer: 'my fake buffer',
sendBuffer: Buffer.from('my fake buffer'),
sendcredits: 5,
socket: {
write: jest.fn()
Expand Down Expand Up @@ -461,7 +460,7 @@ describe('APFProcessor Tests', () => {
const length = 17
const data = ''
fakeCiraChannel.closing = 0
fakeCiraChannel.sendBuffer = 'fake buffer'
fakeCiraChannel.sendBuffer = Buffer.from('fake buffer')
fakeCiraChannel.onStateChange = new EventEmitter()
const result = APFProcessor.channelOpenConfirmation(fakeCiraSocket, length, data)
expect(result).toEqual(17)
Expand All @@ -475,7 +474,7 @@ describe('APFProcessor Tests', () => {
const data = ''
const readIntSpy = jest.spyOn(Common, 'ReadInt').mockReturnValue(1)
fakeCiraChannel.closing = 0
fakeCiraChannel.sendBuffer = 'fake buffer'
fakeCiraChannel.sendBuffer = Buffer.from('fake buffer')
fakeCiraChannel.onStateChange = new EventEmitter()
const result = APFProcessor.channelOpenConfirmation(fakeCiraSocket, length, data)
expect(result).toEqual(17)
Expand Down Expand Up @@ -1139,13 +1138,16 @@ describe('APFProcessor Tests', () => {
})

it('should SendChannelData', () => {
APFProcessor.SendChannelData(fakeCiraSocket, channelid, data)
const dataExpected =
writeSpy = jest.spyOn(fakeCiraSocket, 'write')
APFProcessor.SendChannelData(fakeCiraSocket, channelid, Buffer.from(data))
const dataExpected = Buffer.from(
String.fromCharCode(APFProtocol.CHANNEL_DATA) +
Common.IntToStr(channelid) +
Common.IntToStr(data.length) +
data
expect(writeSpy).toHaveBeenCalledWith(fakeCiraSocket, dataExpected)
Common.IntToStr(channelid) +
Common.IntToStr(data.length) +
data,
'binary'
)
expect(writeSpy).toHaveBeenCalledWith(dataExpected)
})

it('should SendChannelWindowAdjust', () => {
Expand Down
57 changes: 30 additions & 27 deletions src/amt/APFProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
**********************************************************************/

import { Buffer } from 'node:buffer'
import { logger, messages } from '../logging'
import Common from '../utils/common'
import { CIRASocket } from '../models/models'
import { CIRAChannel } from './CIRAChannel'
import { EventEmitter } from 'stream'
const KEEPALIVE_INTERVAL = 30 // 30 seconds is typical keepalive interval for AMT CIRA connection

Expand Down Expand Up @@ -155,19 +157,8 @@ const APFProcessor = {
}
cirachannel.sendcredits += ByteToAdd
logger.silly(`${messages.MPS_WINDOW_ADJUST}, ${RecipientChannel.toString()}, ${ByteToAdd.toString()}, ${cirachannel.sendcredits}`)
if (cirachannel.state === 2 && cirachannel.sendBuffer != null) {
// Compute how much data we can send
if (cirachannel.sendBuffer.length <= cirachannel.sendcredits) {
// Send the entire pending buffer
APFProcessor.SendChannelData(cirachannel.socket, cirachannel.amtchannelid, cirachannel.sendBuffer)
cirachannel.sendcredits -= cirachannel.sendBuffer.length
delete cirachannel.sendBuffer
} else {
// Send a part of the pending buffer
APFProcessor.SendChannelData(cirachannel.socket, cirachannel.amtchannelid, cirachannel.sendBuffer.substring(0, cirachannel.sendcredits))
cirachannel.sendBuffer = cirachannel.sendBuffer.substring(cirachannel.sendcredits)
cirachannel.sendcredits = 0
}
if (cirachannel.state === 2) {
APFProcessor.SendPendingData(cirachannel)
}
return 9
},
Expand Down Expand Up @@ -232,19 +223,7 @@ const APFProcessor = {
} else {
cirachannel.state = 2
// Send any pending data
if (cirachannel.sendBuffer != null) {
if (cirachannel.sendBuffer.length <= cirachannel.sendcredits) {
// Send the entire pending buffer
APFProcessor.SendChannelData(cirachannel.socket, cirachannel.amtchannelid, cirachannel.sendBuffer)
cirachannel.sendcredits -= cirachannel.sendBuffer.length
delete cirachannel.sendBuffer
} else {
// Send a part of the pending buffer
APFProcessor.SendChannelData(cirachannel.socket, cirachannel.amtchannelid, cirachannel.sendBuffer.substring(0, cirachannel.sendcredits))
cirachannel.sendBuffer = cirachannel.sendBuffer.substring(cirachannel.sendcredits)
cirachannel.sendcredits = 0
}
}
APFProcessor.SendPendingData(cirachannel)
// Indicate the channel is open
if (cirachannel.onStateChange) {
cirachannel.onStateChange.emit('stateChange', cirachannel.state)
Expand Down Expand Up @@ -535,15 +514,39 @@ const APFProcessor = {
APFProcessor.Write(socket, String.fromCharCode(APFProtocol.CHANNEL_CLOSE) + Common.IntToStr(channelid))
},

SendChannelData: (socket: CIRASocket, channelid, data): void => {
SendPendingData: (cirachannel: CIRAChannel): void => {
if (cirachannel.sendBuffer != null) {
if (cirachannel.sendBuffer.length <= cirachannel.sendcredits) {
// Send the entire pending buffer
APFProcessor.SendChannelData(cirachannel.socket, cirachannel.amtchannelid, cirachannel.sendBuffer)
cirachannel.sendcredits -= cirachannel.sendBuffer.length
delete cirachannel.sendBuffer
} else {
// Send a part of the pending buffer
APFProcessor.SendChannelData(cirachannel.socket, cirachannel.amtchannelid, cirachannel.sendBuffer.subarray(0, cirachannel.sendcredits))
cirachannel.sendBuffer = cirachannel.sendBuffer.subarray(cirachannel.sendcredits)
cirachannel.sendcredits = 0
}
}
},

SendChannelData: (socket: CIRASocket, channelid, data: Buffer): void => {
logger.silly(`${messages.MPS_SEND_CHANNEL_DATA}, ${channelid}`)
const b = Buffer.allocUnsafe(9 + data.length)
b[0] = APFProtocol.CHANNEL_DATA
b.writeUInt32BE(channelid, 1)
b.writeUInt32BE(data.length, 5)
data.copy(b, 9)
socket.write(b)
/*
APFProcessor.Write(
socket,
String.fromCharCode(APFProtocol.CHANNEL_DATA) +
Common.IntToStr(channelid) +
Common.IntToStr(data.length) +
data
)
*/
},

SendChannelWindowAdjust: (socket: CIRASocket, channelid, bytestoadd): void => {
Expand Down
20 changes: 19 additions & 1 deletion src/amt/CIRAChannel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,27 @@ describe('CIRA Channel', () => {
expect(sendChannelSpy).toHaveBeenCalledTimes(1)
expect(ciraChannel.sendcredits).toBe(0)
})
it('should write binary data to channel', async () => {
// Tests both the binary data path and appending to the sendBuffer.
// There are enough send credits for the initial sendBuffer only,
// so the string written should appear in sendBuffer.
ciraChannel.state = 2
ciraChannel.sendcredits = 10
ciraChannel.sendBuffer = Buffer.alloc(10)
const data = String.fromCharCode(1, 2, 3, 4, 0xC0, 5)
const sendChannelSpy = jest.spyOn(APFProcessor, 'SendChannelData').mockImplementation(() => {})

const writePromise = ciraChannel.writeData(data, null)
const responseData = await writePromise

expect(responseData).toEqual(null)
expect(sendChannelSpy).toHaveBeenCalledTimes(1)
expect(ciraChannel.sendcredits).toBe(0)
expect(ciraChannel.sendBuffer).toEqual(Buffer.from(data, 'binary'))
})
it('should resolve if data does not contain messageId', async () => {
ciraChannel.state = 2
ciraChannel.sendcredits = 116
ciraChannel.sendcredits = 97
const data = 'KVMR'
const params: connectionParams = {
guid: '4c4c4544-004b-4210-8033-b6c04f504633',
Expand Down
36 changes: 12 additions & 24 deletions src/amt/CIRAChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
**********************************************************************/

// import httpZ, { HttpZResponseModel } from 'http-z'
import { Buffer } from 'node:buffer'
import { CIRASocket } from '../models/models'
import APFProcessor from './APFProcessor'
import { connectionParams, HttpHandler } from './HttpHandler'
Expand All @@ -21,7 +22,7 @@ export class CIRAChannel {
amtCiraWindow: number
ciraWindow: number
write?: (data: string) => Promise<string>
sendBuffer?: string = null
sendBuffer?: Buffer = null
amtchannelid?: number
closing?: number
onStateChange?: EventEmitter // (state: number) => void
Expand Down Expand Up @@ -83,34 +84,21 @@ export class CIRAChannel {
} else {
this.resolve = resolve
}
let wsmanRequest: any = data
if (this.state === 0) return reject(new Error('Closed'))// return false
let wsmanRequest: Buffer
if (params != null) { // this is an API Call
wsmanRequest = this.httpHandler.wrapIt(params, data)
} else {
wsmanRequest = Buffer.from(data, 'binary')
}
if (this.state === 0) return reject(new Error('Closed'))// return false
if (this.state === 1 || this.sendcredits === 0 || this.sendBuffer != null) {
if (this.sendBuffer == null) {
this.sendBuffer = wsmanRequest
} else {
this.sendBuffer += wsmanRequest
}
if (messageId == null) {
return resolve(null)
} else { return }
if (this.sendBuffer == null) {
this.sendBuffer = wsmanRequest
} else {
this.sendBuffer = Buffer.concat([this.sendBuffer, wsmanRequest])
}
// Compute how much data we can send
if (wsmanRequest?.length <= this.sendcredits) {
// Send the entire message
APFProcessor.SendChannelData(this.socket, this.amtchannelid, wsmanRequest)
this.sendcredits -= wsmanRequest.length
if (messageId == null) {
return resolve(null)
} else { return }
if (this.state !== 1 && this.sendcredits !== 0) {
APFProcessor.SendPendingData(this)
}
// Send a part of the message
this.sendBuffer = wsmanRequest.substring(this.sendcredits)
APFProcessor.SendChannelData(this.socket, this.amtchannelid, wsmanRequest.substring(0, this.sendcredits))
this.sendcredits = 0
if (messageId == null) {
resolve(null)
}
Expand Down
15 changes: 13 additions & 2 deletions src/amt/HttpHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class HttpHandler {
this.parser = new xml2js.Parser({ ignoreAttrs: true, mergeAttrs: false, explicitArray: false, tagNameProcessors: [this.stripPrefix], valueProcessors: [myParseNumbers, xml2js.processors.parseBooleans] })
}

wrapIt (connectionParams: connectionParams, data: string): string {
wrapIt (connectionParams: connectionParams, data: string): Buffer {
try {
const url = '/wsman'
const action = 'POST'
Expand Down Expand Up @@ -71,6 +71,7 @@ export class HttpHandler {
})
message += `Authorization: ${authorizationRequestHeader}\r\n`
}
/*
// Use Chunked-Encoding
message += Buffer.from([
`Host: ${connectionParams.guid}:${connectionParams.port}`,
Expand All @@ -82,6 +83,15 @@ export class HttpHandler {
'\r\n'
].join('\r\n'), 'utf8')
return message
*/
const dataBuffer = Buffer.from(data, 'utf8')
message += `Host: ${connectionParams.guid}:${connectionParams.port}\r\nContent-Length: ${dataBuffer.length}\r\n\r\n`
const buffer = Buffer.concat([Buffer.from(message, 'utf8'), dataBuffer])
if (dataBuffer.length !== data.length) {
logger.silly(`wrapIt data length mismatch: Buffer.length = ${dataBuffer.length}, string.length = ${data.length}`)
logger.silly(buffer.toString('utf8'))
}
return buffer
} catch (err) {
logger.error(`${messages.CREATE_HASH_STRING_FAILED}:`, err.message)
return null
Expand Down Expand Up @@ -117,7 +127,8 @@ export class HttpHandler {

parseXML (xmlBody: string): any {
let wsmanResponse: string
this.parser.parseString(xmlBody, (err, result) => {
const xmlDecoded: string = Buffer.from(xmlBody, 'binary').toString('utf8')
this.parser.parseString(xmlDecoded, (err, result) => {
if (err) {
logger.error(`${messages.XML_PARSE_FAILED}:`, err)
wsmanResponse = null
Expand Down
22 changes: 21 additions & 1 deletion src/amt/httpHandler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,27 @@ it('should return a WSMan request', async () => {
password: 'P@ssw0rd'
}
const result = httpHandler.wrapIt(params, xmlRequestBody)
expect(result).toContain('Authorization')
expect(result.toString()).toContain('Authorization')
})
it('should properly encode UTF8 characters', async () => {
// À is codepoint 0x00C0, [0xC3, 0x80] when UTF8 encoded...
const xmlRequestBody = '<tag>FooÀbar</tag>'
const digestChallenge = {
realm: 'Digest:56ABC7BE224EF620C69EB88F01071DC8',
nonce: 'fVNueyEAAAAAAAAAcO8WqJ8s+WdyFUIY',
stale: 'false',
qop: 'auth'
}
const params: connectionParams = {
guid: '4c4c4544-004b-4210-8033-b6c04f504633',
port: 16992,
digestChallenge: digestChallenge,
username: 'admin',
password: 'P@ssw0rd'
}
const result: Buffer = httpHandler.wrapIt(params, xmlRequestBody)
expect(result.toString('binary')).toContain('<tag>Foo' + String.fromCharCode(0xC3, 0x80) + 'bar</tag>')
expect(result.toString('binary')).toContain('\r\nContent-Length: 19\r\n')
})
it('should return a null when no xml is passed to wrap a WSMan request', async () => {
const digestChallenge = {
Expand Down
2 changes: 1 addition & 1 deletion src/utils/redirectInterceptor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ test('handleAuthenticateSession DIGEST with user, pass and digestRealm', () => {
r += String.fromCharCode(ws.authCNonce.length) // CNonce Length
r += ws.authCNonce // CNonce
r += String.fromCharCode(ncs.length) // NonceCount Length
r += ncs // NonceCount // NonceCount
r += ncs // NonceCount
r += String.fromCharCode(digest.length) // Response Length
r += digest // Response
r += String.fromCharCode(amt.digestQOP.length) // QOP Length
Expand Down