diff --git a/src/fsa-to-node/FsaNodeFs.ts b/src/fsa-to-node/FsaNodeFs.ts index 54476a76e..42b0671d1 100644 --- a/src/fsa-to-node/FsaNodeFs.ts +++ b/src/fsa-to-node/FsaNodeFs.ts @@ -48,6 +48,7 @@ import { FsaNodeStats } from './FsaNodeStats'; import process from '../process'; import { FsSynchronousApi } from '../node/types/FsSynchronousApi'; import { FsaNodeWriteStream } from './FsaNodeWriteStream'; +import { FsaNodeReadStream } from './FsaNodeReadStream'; import { FsaNodeCore } from './FsaNodeCore'; import type { FsCallbackApi, FsPromisesApi } from '../node/types'; import type * as misc from '../node/types/misc'; @@ -758,11 +759,32 @@ export class FsaNodeFs extends FsaNodeCore implements FsCallbackApi, FsSynchrono return stream; }; + public readonly createReadStream: FsCallbackApi['createReadStream'] = (path: misc.PathLike, options?: opts.IReadStreamOptions | string): misc.IReadStream => { + const defaults: opts.IReadStreamOptions = { + flags: 'r', + fd: null, + mode: 0o666, + autoClose: true, + emitClose: true, + start: 0, + end: Infinity, + highWaterMark: 64 * 1024, + fs: null, + signal: null, + }; + const optionsObj: opts.IReadStreamOptions = getOptions(defaults, options); + const filename = pathToFilename(path); + const flags = flagsToNumber(optionsObj.flags); + const fd: number = optionsObj.fd ? (typeof optionsObj.fd === 'number' ? optionsObj.fd : optionsObj.fd.fd) : 0; + const handle = fd ? this.getFileByFdAsync(fd) : this.__open(filename, flags, 0); + const stream = new FsaNodeReadStream(this, handle, filename, optionsObj); + return stream; + }; + public readonly symlink: FsCallbackApi['symlink'] = notSupported; public readonly link: FsCallbackApi['link'] = notSupported; public readonly watchFile: FsCallbackApi['watchFile'] = notSupported; public readonly unwatchFile: FsCallbackApi['unwatchFile'] = notSupported; - public readonly createReadStream: FsCallbackApi['createReadStream'] = notSupported; public readonly watch: FsCallbackApi['watch'] = notSupported; // --------------------------------------------------------- FsSynchronousApi @@ -1033,10 +1055,10 @@ export class FsaNodeFs extends FsaNodeCore implements FsCallbackApi, FsSynchrono public readonly Dirent = FsaNodeDirent; public readonly Stats = FsaNodeStats; public readonly WriteStream = FsaNodeWriteStream; + public readonly ReadStream = FsaNodeReadStream; public readonly StatFs = 0 as any; public readonly Dir = 0 as any; public readonly StatsWatcher = 0 as any; public readonly FSWatcher = 0 as any; - public readonly ReadStream = 0 as any; } diff --git a/src/fsa-to-node/FsaNodeReadStream.ts b/src/fsa-to-node/FsaNodeReadStream.ts new file mode 100644 index 000000000..b36807162 --- /dev/null +++ b/src/fsa-to-node/FsaNodeReadStream.ts @@ -0,0 +1,93 @@ +import { Readable } from 'stream'; +import { Defer } from 'thingies/es6/Defer'; +import { concurrency } from 'thingies/es6/concurrency'; +import type { FsaNodeFsOpenFile } from './FsaNodeFsOpenFile'; +import type { IReadStream } from '../node/types/misc'; +import type { IReadStreamOptions } from '../node/types/options'; +import type { FsaNodeFs } from './FsaNodeFs'; + +export class FsaNodeReadStream extends Readable implements IReadStream { + protected __pending__: boolean = true; + protected __closed__: boolean = false; + protected __bytes__: number = 0; + protected readonly __mutex__ = concurrency(1); + protected readonly __file__ = new Defer(); + + public constructor( + protected readonly fs: FsaNodeFs, + protected readonly handle: Promise, + public readonly path: string, + protected readonly options: IReadStreamOptions, + ) { + super(); + handle + .then((file) => { + if (this.__closed__) return; + this.__file__.resolve(file); + if (this.options.fd !== undefined) this.emit('open', file.fd); + this.emit('ready'); + }) + .catch(error => { + this.__file__.reject(error); + }) + .finally(() => { + this.__pending__ = false; + }); + } + + private async __read__(): Promise { + return await this.__mutex__(async () => { + if (this.__closed__) return; + const {file} = await this.__file__.promise; + const blob = await file.getFile(); + const buffer = await blob.arrayBuffer(); + const start = this.options.start || 0; + let end = typeof this.options.end === 'number' ? this.options.end + 1 : buffer.byteLength; + if (end > buffer.byteLength) end = buffer.byteLength; + const uint8 = new Uint8Array(buffer, start, end - start); + return uint8; + }); + } + + private __close__(): void { + if (this.__closed__) return; + this.__closed__ = true; + if (this.options.autoClose) { + this.__file__.promise + .then(file => { + this.fs.close(file.fd, () => { + this.emit('close'); + }) + return file.close(); + }) + .catch(error => {}); + } + } + + // -------------------------------------------------------------- IReadStream + + public get bytesRead(): number { + return this.__bytes__; + } + + public get pending(): boolean { + return this.__pending__; + } + + // ----------------------------------------------------------------- Readable + + _read() { + this.__read__() + .then((uint8: Uint8Array) => { + if (this.__closed__) return; + if (!uint8) return this.push(null); + this.__bytes__ += uint8.length; + this.__close__(); + this.push(uint8); + this.push(null); + }, (error) => { + this.__close__(); + this.destroy(error); + }); + } +} diff --git a/src/fsa-to-node/FsaNodeWriteStream copy.ts b/src/fsa-to-node/FsaNodeWriteStream copy.ts deleted file mode 100644 index de25dbba4..000000000 --- a/src/fsa-to-node/FsaNodeWriteStream copy.ts +++ /dev/null @@ -1,25 +0,0 @@ -import { Readable } from 'stream'; -import type { IReadStream } from '../node/types/misc'; - -export class FsaNodeReadStream extends Readable implements IReadStream { - public constructor() { - super(); - } - - // -------------------------------------------------------------- IReadStream - public get bytesRead(): number { - throw new Error('Method not implemented.'); - } - - public get path(): string | Buffer { - throw new Error('Method not implemented.'); - } - - public get pending(): boolean { - throw new Error('Method not implemented.'); - } - - // ----------------------------------------------------------------- Readable - - _read(size: number) {} -} diff --git a/src/fsa-to-node/__tests__/FsaNodeFs.test.ts b/src/fsa-to-node/__tests__/FsaNodeFs.test.ts index 318d79ca7..493a98f0d 100644 --- a/src/fsa-to-node/__tests__/FsaNodeFs.test.ts +++ b/src/fsa-to-node/__tests__/FsaNodeFs.test.ts @@ -816,4 +816,70 @@ onlyOnNode20('FsaNodeFs', () => { } }); }); + + describe('.createReadStream()', () => { + test('can pipe fs.ReadStream to fs.WriteStream', async () => { + const { fs, mfs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' }); + const readStream = fs.createReadStream('/folder/file'); + const writeStream = fs.createWriteStream('/folder/file2'); + readStream.pipe(writeStream); + await new Promise(resolve => writeStream.once('close', resolve)); + expect(mfs.__vol.toJSON()).toStrictEqual({ + '/mountpoint/folder/file': 'test', + '/mountpoint/folder/file2': 'test', + '/mountpoint/empty-folder': null, + '/mountpoint/f.html': 'test', + }); + }); + + test('emits "open" event', async () => { + const { fs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' }); + const readStream = fs.createReadStream('/folder/file'); + const fd = await new Promise(resolve => readStream.once('open', resolve)); + expect(typeof fd).toBe('number'); + }); + + test('emits "ready" event', async () => { + const { fs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' }); + const readStream = fs.createReadStream('/folder/file'); + await new Promise(resolve => readStream.once('ready', resolve)); + }); + + test('emits "close" event', async () => { + const { fs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' }); + const readStream = fs.createReadStream('/folder/file', {emitClose: true}); + const writeStream = fs.createWriteStream('/folder/file2'); + readStream.pipe(writeStream); + await new Promise(resolve => readStream.once('close', resolve)); + }); + + test('can write to already open file', async () => { + const { fs, mfs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' }); + const handle = await fs.promises.open('/folder/file'); + const readStream = fs.createReadStream('xyz', {fd: handle.fd}); + const writeStream = fs.createWriteStream('/folder/file2'); + readStream.pipe(writeStream); + await new Promise(resolve => writeStream.once('close', resolve)); + expect(mfs.__vol.toJSON()).toStrictEqual({ + '/mountpoint/folder/file': 'test', + '/mountpoint/folder/file2': 'test', + '/mountpoint/empty-folder': null, + '/mountpoint/f.html': 'test', + }); + }); + + test('can read a specified slice of a file', async () => { + const { fs, mfs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' }); + const readStream = fs.createReadStream('/folder/file', {start: 1, end: 2}); + const writeStream = fs.createWriteStream('/folder/file2'); + readStream.pipe(writeStream); + await new Promise(resolve => writeStream.once('close', resolve)); + expect(mfs.__vol.toJSON()).toStrictEqual({ + '/mountpoint/folder/file': 'test', + '/mountpoint/folder/file2': 'es', + '/mountpoint/empty-folder': null, + '/mountpoint/f.html': 'test', + }); + }); + }); }); diff --git a/src/node/types/options.ts b/src/node/types/options.ts index 5b4edacef..9ec0b9237 100644 --- a/src/node/types/options.ts +++ b/src/node/types/options.ts @@ -60,11 +60,11 @@ export interface IWatchFileOptions { interval?: number; } -export interface IReadStreamOptions { +export interface IReadStreamOptions extends IOptions { /** Defaults to `'r'`. */ flags?: TFlags; /** Defaults to `null`. */ - encoding?: BufferEncoding | null; + encoding?: BufferEncoding; /** Defaults to `null`. */ fd?: number | IFileHandle | null; /** Defaults to 0o666 */