diff --git a/src/cp.ts b/src/cp.ts index 3cb8117b92..b6a4a6bc49 100644 --- a/src/cp.ts +++ b/src/cp.ts @@ -35,26 +35,39 @@ export class Cp { command.push(srcPath); const writerStream = fs.createWriteStream(tmpFileName); const errStream = new WritableStreamBuffer(); - this.execInstance.exec( - namespace, - podName, - containerName, - command, - writerStream, - errStream, - null, - false, - async ({ status }) => { - writerStream.close(); - if (status === 'Failure' || errStream.size()) { - throw new Error(`Error from cpFromPod - details: \n ${errStream.getContentsAsString()}`); - } - await tar.x({ - file: tmpFileName, - cwd: tgtPath, - }); - }, - ); + return new Promise((resolve, reject) => { + this.execInstance + .exec( + namespace, + podName, + containerName, + command, + writerStream, + errStream, + null, + false, + async ({ status }) => { + try { + writerStream.close(); + if (status === 'Failure' || errStream.size()) { + return reject( + new Error( + `Error from cpFromPod - details: \n ${errStream.getContentsAsString()}`, + ), + ); + } + await tar.x({ + file: tmpFileName, + cwd: tgtPath, + }); + resolve(); + } catch (e) { + reject(e); + } + }, + ) + .catch(reject); + }); } /** @@ -78,20 +91,31 @@ export class Cp { await tar.c({ file: tmpFileName, cwd }, [srcPath]); const readStream = fs.createReadStream(tmpFileName); const errStream = new WritableStreamBuffer(); - this.execInstance.exec( - namespace, - podName, - containerName, - command, - null, - errStream, - readStream, - false, - async ({ status }) => { - if (status === 'Failure' || errStream.size()) { - throw new Error(`Error from cpToPod - details: \n ${errStream.getContentsAsString()}`); - } - }, - ); + return new Promise((resolve, reject) => { + this.execInstance + .exec( + namespace, + podName, + containerName, + command, + null, + errStream, + readStream, + false, + async ({ status }) => { + await fs.promises.unlink(tmpFileName); + if (status === 'Failure' || errStream.size()) { + reject( + new Error( + `Error from cpToPod - details: \n ${errStream.getContentsAsString()}`, + ), + ); + } else { + resolve(); + } + }, + ) + .catch(reject); + }); } } diff --git a/src/cp_test.ts b/src/cp_test.ts index 7c1bb70585..3a7771bb37 100644 --- a/src/cp_test.ts +++ b/src/cp_test.ts @@ -1,28 +1,55 @@ import { anything, anyFunction, instance, mock, verify, when } from 'ts-mockito'; import * as querystring from 'querystring'; +import { expect } from 'chai'; import WebSocket = require('isomorphic-ws'); - +import * as fs from 'node:fs'; +import * as path from 'node:path'; +import { tmpdir } from 'os'; +import * as tar from 'tar'; import { CallAwaiter } from '../test'; import { KubeConfig } from './config'; import { Exec } from './exec'; import { Cp } from './cp'; -import { WebSocketHandler, WebSocketInterface } from './web-socket-handler'; +import { BinaryHandler, WebSocketHandler, WebSocketInterface } from './web-socket-handler'; +import { V1Status } from './api'; +import { randomUUID } from 'crypto'; +import { sleep } from './util'; describe('Cp', () => { + let tmpDir: string | undefined; + + beforeEach(() => { + tmpDir = `${tmpdir()}/${randomUUID()}`; + fs.mkdirSync(tmpDir); + }); + + afterEach(() => { + if (tmpDir) { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } + }); + describe('cpFromPod', () => { it('should run create tar command to a url', async () => { + // make the compile happy + if (!tmpDir) { + throw new Error('tmpDir not initialized'); + } + const kc = new KubeConfig(); - const fakeWebSocket: WebSocketInterface = mock(WebSocketHandler); - const exec = new Exec(kc, instance(fakeWebSocket)); + const fakeWebSocketInterface: WebSocketInterface = mock(WebSocketHandler); + const fakeWebSocket: WebSocket.WebSocket = mock(WebSocket); + const fakeConn: WebSocket.WebSocket = instance(fakeWebSocket); + const callAwaiter: CallAwaiter = new CallAwaiter(); + const exec = new Exec(kc, instance(fakeWebSocketInterface)); const cp = new Cp(kc, exec); const namespace = 'somenamespace'; const pod = 'somepod'; const container = 'container'; const srcPath = '/'; - const tgtPath = '/'; const cmdArray = ['tar', 'zcf', '-', srcPath]; - const path = `/api/v1/namespaces/${namespace}/pods/${pod}/exec`; + const queryPath = `/api/v1/namespaces/${namespace}/pods/${pod}/exec`; const query = { stdout: true, @@ -34,14 +61,56 @@ describe('Cp', () => { }; const queryStr = querystring.stringify(query); - await cp.cpFromPod(namespace, pod, container, srcPath, tgtPath); - // tslint:disable-next-line:max-line-length - verify(fakeWebSocket.connect(`${path}?${queryStr}`, null, anyFunction())).called(); + when(fakeWebSocketInterface.connect(`${queryPath}?${queryStr}`, null, anyFunction())).thenCall( + callAwaiter.resolveCall('connect', fakeConn), + ); + when(fakeWebSocket.close()).thenCall(callAwaiter.resolveCall('close')); + + let complete = false; + let lastErr = undefined; + const promise = cp + .cpFromPod(namespace, pod, container, srcPath, tmpDir) + .then(() => { + complete = true; + }) + .catch((err) => { + lastErr = err; + }); + expect(lastErr).to.be.undefined; + expect(complete).to.be.false; + + const binaryHandler: BinaryHandler = (await callAwaiter.awaitCall('connect'))[2]; + + // simulate a network hope with a sleep + await sleep(1); + const contents = fs.readFileSync('testdata/archive.tgz'); + binaryHandler(WebSocketHandler.StdoutStream, contents); + + // simulate a network hope with a sleep + await sleep(1); + const status: V1Status = { + status: 'Success', + }; + binaryHandler(WebSocketHandler.StatusStream, Buffer.from(JSON.stringify(status))); + + await promise; + + expect(lastErr).to.be.undefined; + expect(complete).to.be.true; + + const found = fs.readFileSync(path.join(tmpDir, 'archive.txt')).toString('utf8'); + const expected = fs.readFileSync('testdata/archive.txt').toString('utf8'); + expect(found).to.eq(expected); }); }); describe('cpToPod', () => { it('should run extract tar command to a url', async () => { + // make the compile happy + if (!tmpDir) { + throw new Error('tmpDir not initialized'); + } + const kc = new KubeConfig(); const fakeWebSocketInterface: WebSocketInterface = mock(WebSocketHandler); const fakeWebSocket: WebSocket.WebSocket = mock(WebSocket) as WebSocket.WebSocket; @@ -52,10 +121,11 @@ describe('Cp', () => { const namespace = 'somenamespace'; const pod = 'somepod'; const container = 'container'; - const srcPath = 'testdata/archive.txt'; + const srcPath = 'archive.txt'; const tgtPath = '/'; const cmdArray = ['tar', 'xf', '-', '-C', tgtPath]; - const path = `/api/v1/namespaces/${namespace}/pods/${pod}/exec`; + const cwd = 'testdata/'; + const queryPath = `/api/v1/namespaces/${namespace}/pods/${pod}/exec`; const query = { stdout: false, @@ -68,14 +138,56 @@ describe('Cp', () => { const queryStr = querystring.stringify(query); const fakeConn: WebSocket.WebSocket = instance(fakeWebSocket); - when(fakeWebSocketInterface.connect(`${path}?${queryStr}`, null, anyFunction())).thenResolve( - fakeConn, + when(fakeWebSocketInterface.connect(`${queryPath}?${queryStr}`, null, anyFunction())).thenCall( + callAwaiter.resolveCall('connect', fakeConn), ); - when(fakeWebSocket.send(anything())).thenCall(callAwaiter.resolveCall('send')); + + const outFilename = path.join(tmpDir, 'send-data.tar'); + const out = fs.createWriteStream(outFilename); + when(fakeWebSocket.send(anything())).thenCall((data) => { + const streamNum = data.readInt8(0); + if (streamNum === WebSocketHandler.StdinStream) { + out.write(data.subarray(1)); + } else { + console.log(streamNum); + } + }); + when(fakeWebSocket.close()).thenCall(callAwaiter.resolveCall('close')); - await cp.cpToPod(namespace, pod, container, srcPath, tgtPath); - verify(fakeWebSocketInterface.connect(`${path}?${queryStr}`, null, anyFunction())).called(); + let complete = false; + let lastErr = undefined; + const promise = cp + .cpToPod(namespace, pod, container, srcPath, tgtPath, cwd) + .then(() => { + complete = true; + }) + .catch((err) => { + lastErr = err; + }); + expect(lastErr).to.be.undefined; + expect(complete).to.be.false; + + const binaryHandler: BinaryHandler = (await callAwaiter.awaitCall('connect'))[2]; + + // wait for all data to be written and close called + await callAwaiter.awaitCall('close'); + out.close(); + await tar.x({ f: outFilename, cwd: tmpDir }); + + // simulate a network hope with a sleep + await sleep(1); + const status: V1Status = { + status: 'Success', + }; + binaryHandler(WebSocketHandler.StatusStream, Buffer.from(JSON.stringify(status))); + + await promise; + + expect(lastErr).to.be.undefined; + expect(complete).to.be.true; + + verify(fakeWebSocketInterface.connect(`${queryPath}?${queryStr}`, null, anyFunction())).called(); }); }); }); diff --git a/src/util.ts b/src/util.ts index 024b4416f9..3436c350a2 100644 --- a/src/util.ts +++ b/src/util.ts @@ -194,3 +194,7 @@ export const resolvablePromise = (): ResolvablePromise => { promise.reject = reject!; return promise; }; + +export const sleep = (ms: number): Promise => { + return new Promise((resolve) => setTimeout(resolve)); +}; diff --git a/src/web-socket-handler.ts b/src/web-socket-handler.ts index 628b1fb323..9b50d2ba5b 100644 --- a/src/web-socket-handler.ts +++ b/src/web-socket-handler.ts @@ -6,11 +6,14 @@ import { KubeConfig } from './config'; const protocols = ['v4.channel.k8s.io', 'v3.channel.k8s.io', 'v2.channel.k8s.io', 'channel.k8s.io']; +export type TextHandler = (text: string) => boolean; +export type BinaryHandler = (stream: number, buff: Buffer) => boolean; + export interface WebSocketInterface { connect( path: string, - textHandler: ((text: string) => boolean) | null, - binaryHandler: ((stream: number, buff: Buffer) => boolean) | null, + textHandler: TextHandler | null, + binaryHandler: BinaryHandler | null, ): Promise; } diff --git a/test/call-awaiter.ts b/test/call-awaiter.ts index e42048c7c7..3f8e3b84c8 100644 --- a/test/call-awaiter.ts +++ b/test/call-awaiter.ts @@ -3,11 +3,14 @@ import { EventEmitter } from 'events'; export class CallAwaiter extends EventEmitter { public awaitCall(event: string) { return new Promise((resolve) => { - this.once(event, resolve); + this.once(event, (...args: any[]) => resolve(args)); }); } - public resolveCall(event: string) { - return (...args: any[]) => this.emit(event, ...args); + public resolveCall(event: string, returnValue?: any) { + return (...args: any[]) => { + this.emit(event, ...args); + return returnValue; + } } } diff --git a/testdata/archive.tgz b/testdata/archive.tgz new file mode 100644 index 0000000000..2cfdc37436 Binary files /dev/null and b/testdata/archive.tgz differ