Skip to content

Commit

Permalink
Fix cp promise returns by converting the exec callbacks into promises
Browse files Browse the repository at this point in the history
  • Loading branch information
joeferner committed Sep 21, 2024
1 parent b351ddb commit 4117590
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 56 deletions.
94 changes: 59 additions & 35 deletions src/cp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,39 @@ export class Cp {
command.push(srcPath);
const writerStream = fs.createWriteStream(tmpFileName);
const errStream = new WritableStreamBuffer();
this.execInstance.exec(
namespace,
podName,
containerName,
command,
writerStream,
errStream,
null,
false,
async ({ status }) => {
writerStream.close();
if (status === 'Failure' || errStream.size()) {
throw new Error(`Error from cpFromPod - details: \n ${errStream.getContentsAsString()}`);
}
await tar.x({
file: tmpFileName,
cwd: tgtPath,
});
},
);
return new Promise<void>((resolve, reject) => {
this.execInstance
.exec(
namespace,
podName,
containerName,
command,
writerStream,
errStream,
null,
false,
async ({ status }) => {
try {
writerStream.close();
if (status === 'Failure' || errStream.size()) {
return reject(
new Error(
`Error from cpFromPod - details: \n ${errStream.getContentsAsString()}`,
),
);
}
await tar.x({
file: tmpFileName,
cwd: tgtPath,
});
resolve();
} catch (e) {
reject(e);
}
},
)
.catch(reject);
});
}

/**
Expand All @@ -78,20 +91,31 @@ export class Cp {
await tar.c({ file: tmpFileName, cwd }, [srcPath]);
const readStream = fs.createReadStream(tmpFileName);
const errStream = new WritableStreamBuffer();
this.execInstance.exec(
namespace,
podName,
containerName,
command,
null,
errStream,
readStream,
false,
async ({ status }) => {
if (status === 'Failure' || errStream.size()) {
throw new Error(`Error from cpToPod - details: \n ${errStream.getContentsAsString()}`);
}
},
);
return new Promise<void>((resolve, reject) => {
this.execInstance
.exec(
namespace,
podName,
containerName,
command,
null,
errStream,
readStream,
false,
async ({ status }) => {
await fs.promises.unlink(tmpFileName);
if (status === 'Failure' || errStream.size()) {
reject(
new Error(
`Error from cpToPod - details: \n ${errStream.getContentsAsString()}`,
),
);
} else {
resolve();
}
},
)
.catch(reject);
});
}
}
144 changes: 128 additions & 16 deletions src/cp_test.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,55 @@
import { anything, anyFunction, instance, mock, verify, when } from 'ts-mockito';
import * as querystring from 'querystring';
import { expect } from 'chai';
import WebSocket = require('isomorphic-ws');

import * as fs from 'node:fs';
import * as path from 'node:path';
import { tmpdir } from 'os';
import * as tar from 'tar';
import { CallAwaiter } from '../test';
import { KubeConfig } from './config';
import { Exec } from './exec';
import { Cp } from './cp';
import { WebSocketHandler, WebSocketInterface } from './web-socket-handler';
import { BinaryHandler, WebSocketHandler, WebSocketInterface } from './web-socket-handler';
import { V1Status } from './api';
import { randomUUID } from 'crypto';
import { sleep } from './util';

describe('Cp', () => {
let tmpDir: string | undefined;

beforeEach(() => {
tmpDir = `${tmpdir()}/${randomUUID()}`;
fs.mkdirSync(tmpDir);
});

afterEach(() => {
if (tmpDir) {
fs.rmSync(tmpDir, { recursive: true, force: true });
}
});

describe('cpFromPod', () => {
it('should run create tar command to a url', async () => {
// make the compile happy
if (!tmpDir) {
throw new Error('tmpDir not initialized');
}

const kc = new KubeConfig();
const fakeWebSocket: WebSocketInterface = mock(WebSocketHandler);
const exec = new Exec(kc, instance(fakeWebSocket));
const fakeWebSocketInterface: WebSocketInterface = mock(WebSocketHandler);
const fakeWebSocket: WebSocket.WebSocket = mock(WebSocket);
const fakeConn: WebSocket.WebSocket = instance(fakeWebSocket);
const callAwaiter: CallAwaiter = new CallAwaiter();
const exec = new Exec(kc, instance(fakeWebSocketInterface));
const cp = new Cp(kc, exec);

const namespace = 'somenamespace';
const pod = 'somepod';
const container = 'container';
const srcPath = '/';
const tgtPath = '/';
const cmdArray = ['tar', 'zcf', '-', srcPath];
const path = `/api/v1/namespaces/${namespace}/pods/${pod}/exec`;
const queryPath = `/api/v1/namespaces/${namespace}/pods/${pod}/exec`;

const query = {
stdout: true,
Expand All @@ -34,14 +61,56 @@ describe('Cp', () => {
};
const queryStr = querystring.stringify(query);

await cp.cpFromPod(namespace, pod, container, srcPath, tgtPath);
// tslint:disable-next-line:max-line-length
verify(fakeWebSocket.connect(`${path}?${queryStr}`, null, anyFunction())).called();
when(fakeWebSocketInterface.connect(`${queryPath}?${queryStr}`, null, anyFunction())).thenCall(
callAwaiter.resolveCall('connect', fakeConn),
);
when(fakeWebSocket.close()).thenCall(callAwaiter.resolveCall('close'));

let complete = false;
let lastErr = undefined;
const promise = cp
.cpFromPod(namespace, pod, container, srcPath, tmpDir)
.then(() => {
complete = true;
})
.catch((err) => {
lastErr = err;
});
expect(lastErr).to.be.undefined;
expect(complete).to.be.false;

const binaryHandler: BinaryHandler = (await callAwaiter.awaitCall('connect'))[2];

// simulate a network hope with a sleep
await sleep(1);
const contents = fs.readFileSync('testdata/archive.tgz');
binaryHandler(WebSocketHandler.StdoutStream, contents);

// simulate a network hope with a sleep
await sleep(1);
const status: V1Status = {
status: 'Success',
};
binaryHandler(WebSocketHandler.StatusStream, Buffer.from(JSON.stringify(status)));

await promise;

expect(lastErr).to.be.undefined;
expect(complete).to.be.true;

const found = fs.readFileSync(path.join(tmpDir, 'archive.txt')).toString('utf8');
const expected = fs.readFileSync('testdata/archive.txt').toString('utf8');
expect(found).to.eq(expected);
});
});

describe('cpToPod', () => {
it('should run extract tar command to a url', async () => {
// make the compile happy
if (!tmpDir) {
throw new Error('tmpDir not initialized');
}

const kc = new KubeConfig();
const fakeWebSocketInterface: WebSocketInterface = mock(WebSocketHandler);
const fakeWebSocket: WebSocket.WebSocket = mock(WebSocket) as WebSocket.WebSocket;
Expand All @@ -52,10 +121,11 @@ describe('Cp', () => {
const namespace = 'somenamespace';
const pod = 'somepod';
const container = 'container';
const srcPath = 'testdata/archive.txt';
const srcPath = 'archive.txt';
const tgtPath = '/';
const cmdArray = ['tar', 'xf', '-', '-C', tgtPath];
const path = `/api/v1/namespaces/${namespace}/pods/${pod}/exec`;
const cwd = 'testdata/';
const queryPath = `/api/v1/namespaces/${namespace}/pods/${pod}/exec`;

const query = {
stdout: false,
Expand All @@ -68,14 +138,56 @@ describe('Cp', () => {
const queryStr = querystring.stringify(query);

const fakeConn: WebSocket.WebSocket = instance(fakeWebSocket);
when(fakeWebSocketInterface.connect(`${path}?${queryStr}`, null, anyFunction())).thenResolve(
fakeConn,
when(fakeWebSocketInterface.connect(`${queryPath}?${queryStr}`, null, anyFunction())).thenCall(
callAwaiter.resolveCall('connect', fakeConn),
);
when(fakeWebSocket.send(anything())).thenCall(callAwaiter.resolveCall('send'));

const outFilename = path.join(tmpDir, 'send-data.tar');
const out = fs.createWriteStream(outFilename);
when(fakeWebSocket.send(anything())).thenCall((data) => {
const streamNum = data.readInt8(0);
if (streamNum === WebSocketHandler.StdinStream) {
out.write(data.subarray(1));
} else {
console.log(streamNum);
}
});

when(fakeWebSocket.close()).thenCall(callAwaiter.resolveCall('close'));

await cp.cpToPod(namespace, pod, container, srcPath, tgtPath);
verify(fakeWebSocketInterface.connect(`${path}?${queryStr}`, null, anyFunction())).called();
let complete = false;
let lastErr = undefined;
const promise = cp
.cpToPod(namespace, pod, container, srcPath, tgtPath, cwd)
.then(() => {
complete = true;
})
.catch((err) => {
lastErr = err;
});
expect(lastErr).to.be.undefined;
expect(complete).to.be.false;

const binaryHandler: BinaryHandler = (await callAwaiter.awaitCall('connect'))[2];

// wait for all data to be written and close called
await callAwaiter.awaitCall('close');
out.close();
await tar.x({ f: outFilename, cwd: tmpDir });

// simulate a network hope with a sleep
await sleep(1);
const status: V1Status = {
status: 'Success',
};
binaryHandler(WebSocketHandler.StatusStream, Buffer.from(JSON.stringify(status)));

await promise;

expect(lastErr).to.be.undefined;
expect(complete).to.be.true;

verify(fakeWebSocketInterface.connect(`${queryPath}?${queryStr}`, null, anyFunction())).called();
});
});
});
4 changes: 4 additions & 0 deletions src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,7 @@ export const resolvablePromise = <T>(): ResolvablePromise<T> => {
promise.reject = reject!;
return promise;
};

export const sleep = (ms: number): Promise<void> => {
return new Promise<void>((resolve) => setTimeout(resolve));
};
7 changes: 5 additions & 2 deletions src/web-socket-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import { KubeConfig } from './config';

const protocols = ['v4.channel.k8s.io', 'v3.channel.k8s.io', 'v2.channel.k8s.io', 'channel.k8s.io'];

export type TextHandler = (text: string) => boolean;
export type BinaryHandler = (stream: number, buff: Buffer) => boolean;

export interface WebSocketInterface {
connect(
path: string,
textHandler: ((text: string) => boolean) | null,
binaryHandler: ((stream: number, buff: Buffer) => boolean) | null,
textHandler: TextHandler | null,
binaryHandler: BinaryHandler | null,
): Promise<WebSocket.WebSocket>;
}

Expand Down
9 changes: 6 additions & 3 deletions test/call-awaiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ import { EventEmitter } from 'events';
export class CallAwaiter extends EventEmitter {
public awaitCall(event: string) {
return new Promise<any[]>((resolve) => {
this.once(event, resolve);
this.once(event, (...args: any[]) => resolve(args));
});
}

public resolveCall(event: string) {
return (...args: any[]) => this.emit(event, ...args);
public resolveCall(event: string, returnValue?: any) {
return (...args: any[]) => {
this.emit(event, ...args);
return returnValue;
}
}
}
Binary file added testdata/archive.tgz
Binary file not shown.

0 comments on commit 4117590

Please sign in to comment.