Skip to content

Commit

Permalink
feat: 🎸 improve write stream, better flag handling
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Jun 20, 2023
1 parent b80f7b7 commit 531f2a7
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 26 deletions.
52 changes: 33 additions & 19 deletions src/fsa-to-node/FsaNodeFs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
getRealpathOptsAndCb,
writeFileDefaults,
getWriteFileOptions,
getOptions,
} from '../node/options';
import {
bufferToEncoding,
Expand Down Expand Up @@ -152,6 +153,16 @@ export class FsaNodeFs implements FsCallbackApi, FsSynchronousApi, FsCommonObjec
return await dir.getFileHandle(name, { create: true });
}

private async __open(filename: string, flags: number, mode: number): Promise<FsaNodeFsOpenFile> {
const [folder, name] = pathToLocation(filename);
const createIfMissing = !!(flags & FLAG.O_CREAT);
const fsaFile = await this.getFile(folder, name, 'open', createIfMissing);
const fd = this.newFdNumber();
const file = new FsaNodeFsOpenFile(fd, mode, flags, fsaFile);
this.fds.set(fd, file);
return file;
}

// ------------------------------------------------------------ FsCallbackApi

public readonly open: FsCallbackApi['open'] = (
Expand All @@ -170,16 +181,7 @@ export class FsaNodeFs implements FsCallbackApi, FsSynchronousApi, FsCommonObjec
const modeNum = modeToNumber(mode);
const filename = pathToFilename(path);
const flagsNum = flagsToNumber(flags);
const [folder, name] = pathToLocation(filename);
const createIfMissing = !!(flagsNum & FLAG.O_CREAT);
this.getFile(folder, name, 'open', createIfMissing)
.then(file => {
const fd = this.newFdNumber();
const openFile = new FsaNodeFsOpenFile(fd, modeNum, flagsNum, file);
this.fds.set(fd, openFile);
callback(null, fd);
})
.catch(error => callback(error));
this.__open(filename, flagsNum, modeNum).then(openFile => callback(null, openFile.fd), error => callback(error));
};

public readonly close: FsCallbackApi['close'] = (fd: number, callback: misc.TCallback<void>): void => {
Expand Down Expand Up @@ -822,17 +824,29 @@ export class FsaNodeFs implements FsCallbackApi, FsSynchronousApi, FsCommonObjec
path: misc.PathLike,
options?: opts.IWriteStreamOptions | string,
): FsaNodeWriteStream => {
const optionsObj: opts.IWriteStreamOptions = !options
? {}
: typeof options === 'object'
? options
: ({ encoding: options } as opts.IWriteStreamOptions);
const defaults: opts.IWriteStreamOptions = {
encoding: 'utf8',
flags: 'w',
autoClose: true,
emitClose: true
};
const optionsObj: opts.IWriteStreamOptions = getOptions(defaults, options);
const filename = pathToFilename(path);
const location = pathToLocation(filename);
const flags = flagsToNumber(optionsObj.flags ?? 'w');
const createIfMissing = !!(flags & FLAG.O_CREAT);
const handle = this.getFile(location[0], location[1], 'createWriteStream', createIfMissing);
return new FsaNodeWriteStream(handle, filename, optionsObj);
const fd: number = optionsObj.fd ? typeof optionsObj.fd === 'number' ? optionsObj.fd : optionsObj.fd.fd : 0;
const handle = fd
? this.getFileByFd(fd)
: this.__open(filename, flags, 0)
const stream = new FsaNodeWriteStream(handle, filename, optionsObj);
if (optionsObj.autoClose) {
stream.once('finish', () => {
handle.then(file => this.close(file.fd, () => {}));
});
stream.once('error', () => {
handle.then(file => this.close(file.fd, () => {}));
});
}
return stream;
};

public readonly symlink: FsCallbackApi['symlink'] = notSupported;
Expand Down
37 changes: 30 additions & 7 deletions src/fsa-to-node/FsaNodeWriteStream.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { Writable } from 'stream';
import { Defer } from 'thingies/es6/Defer';
import { concurrency } from 'thingies/es6/concurrency';
import type { IFileSystemFileHandle, IFileSystemWritableFileStream } from '../fsa/types';
import {flagsToNumber} from '../node/util';
import {FLAG} from '../consts/FLAG';
import {FsaNodeFsOpenFile} from './FsaNodeFsOpenFile';
import type { IFileSystemWritableFileStream } from '../fsa/types';
import type { IWriteStream } from '../node/types/misc';
import type { IWriteStreamOptions } from '../node/types/options';

Expand All @@ -20,6 +23,9 @@ import type { IWriteStreamOptions } from '../node/types/options';
* file only once the stream is closed. The downside is that the written data
* is not immediately visible to other processes (because it is written to the
* swap file), but that is the trade-off we have to make.
*
* @todo Could make this flush the data to the original file periodically, so that
* the data is visible to other processes.
*/
export class FsaNodeWriteStream extends Writable implements IWriteStream {
protected __pending__: boolean = true;
Expand All @@ -29,16 +35,32 @@ export class FsaNodeWriteStream extends Writable implements IWriteStream {
protected readonly __mutex__ = concurrency(1);

public constructor(
handle: Promise<IFileSystemFileHandle>,
handle: Promise<FsaNodeFsOpenFile>,
public readonly path: string,
protected readonly options?: IWriteStreamOptions,
protected readonly options: IWriteStreamOptions,
) {
super();
if (options.start !== undefined) {
if (typeof options.start !== 'number') {
throw new TypeError('"start" option must be a Number');
}
if (options.start < 0) {
throw new TypeError('"start" must be >= zero');
}
}
const stream = new Defer<IFileSystemWritableFileStream>();
this.__stream__ = stream.promise;
(async () => {
const fsaHandle = await handle;
const writable = await fsaHandle.createWritable({keepExistingData: true});
const fileWasOpened = !options.fd;
if (fileWasOpened) this.emit('open', fsaHandle.fd);
const flags = flagsToNumber(options.flags ?? 'w');
const keepExistingData = flags & FLAG.O_APPEND ? true : false;
const writable = await fsaHandle.file.createWritable({keepExistingData});
if (keepExistingData) {
const start = Number(options.start ?? 0);
if (start) await writable.seek(start);
}
this.__pending__ = false;
stream.resolve(writable);
})().catch(error => {
Expand All @@ -59,19 +81,20 @@ export class FsaNodeWriteStream extends Writable implements IWriteStream {
}

private async __close__(): Promise<void> {
const emitClose = this.options.emitClose;
await this.__mutex__(async () => {
if (this.__closed__) {
if (this.__closed__ && emitClose) {
process.nextTick(() => this.emit('close'));
return;
}
try {
const writable = await this.__stream__;
this.__closed__ = true;
await writable.close();
this.emit('close');
if (emitClose) this.emit('close');
} catch (error) {
this.emit('error', error);
this.emit('close', error);
if (emitClose) this.emit('close', error);
}
});
}
Expand Down
117 changes: 117 additions & 0 deletions src/fsa-to-node/__tests__/FsaNodeFs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { AMODE } from '../../consts/AMODE';
import { nodeToFsa } from '../../node-to-fsa';
import { IDirent, IStats } from '../../node/types/misc';
import { FsaNodeFs } from '../FsaNodeFs';
import { tick, until, of } from 'thingies';

const setup = (json: NestedDirectoryJSON | null = null, mode: 'read' | 'readwrite' = 'readwrite') => {
const mfs = memfs({ mountpoint: json }) as IFsWithVolume;
Expand Down Expand Up @@ -649,4 +650,120 @@ describe('.createWriteStream()', () => {
'/mountpoint/f.html': 'test',
});
});

test('can use stream to overwrite existing file', async () => {
const { fs, mfs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' });
const stream = fs.createWriteStream('/folder/file');
stream.write(Buffer.from('A'));
stream.write(Buffer.from('BC'));
stream.end();
await new Promise(resolve => stream.once('close', resolve));
expect(stream.bytesWritten).toBe(3);
expect(mfs.__vol.toJSON()).toStrictEqual({
'/mountpoint/folder/file': 'ABC',
'/mountpoint/empty-folder': null,
'/mountpoint/f.html': 'test',
});
});

test('can write by file descriptor', async () => {
const { fs, mfs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' });
const handle = await fs.promises.open('/folder/file', 'a');
const stream = fs.createWriteStream('', { fd: handle.fd, start: 1, flags: 'a' });
stream.write(Buffer.from('BC'));
stream.end();
await new Promise(resolve => stream.once('close', resolve));
expect(stream.bytesWritten).toBe(2);
expect(mfs.__vol.toJSON()).toStrictEqual({
'/mountpoint/folder/file': 'tBCt',
'/mountpoint/empty-folder': null,
'/mountpoint/f.html': 'test',
});
});

test('closes file once stream ends', async () => {
const { fs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' });
const handle = await fs.promises.open('/folder/file', 'a');
const stream = fs.createWriteStream('', { fd: handle.fd, start: 1, flags: 'a' });
stream.write(Buffer.from('BC'));
const stat = async () => await new Promise((resolve, reject) => fs.fstat(handle.fd, (err, stats) => {
if (err) reject(err);
else resolve(stats);
}));
await stat();
stream.end();
await until(async () => {
const [, error] = await of(stat());
return !!error;
});
const [, error] = await of(stat());
expect((error as any).code).toBe('EBADF');
});

test('does not close file if "autoClose" is false', async () => {
const { fs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' });
const handle = await fs.promises.open('/folder/file', 'a');
const stream = fs.createWriteStream('', { fd: handle.fd, start: 1, flags: 'a', autoClose: false });
stream.write(Buffer.from('BC'));
const stat = async () => await new Promise((resolve, reject) => fs.fstat(handle.fd, (err, stats) => {
if (err) reject(err);
else resolve(stats);
}));
await stat();
stream.end();
await tick(200);
await stat();
});

test('can use stream to add to existing file', async () => {
const { fs, mfs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' });
const stream = fs.createWriteStream('/folder/file', {flags: 'a'});
stream.write(Buffer.from('A'));
stream.write(Buffer.from('BC'));
stream.end();
await new Promise(resolve => stream.once('close', resolve));
expect(stream.bytesWritten).toBe(3);
expect(mfs.__vol.toJSON()).toStrictEqual({
'/mountpoint/folder/file': 'ABCt',
'/mountpoint/empty-folder': null,
'/mountpoint/f.html': 'test',
});
});

test('can use stream to add to existing file at specified offset', async () => {
const { fs, mfs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' });
const stream = fs.createWriteStream('/folder/file', {flags: 'a', start: 1});
stream.write(Buffer.from('A'));
stream.write(Buffer.from('B'));
stream.end();
await new Promise(resolve => stream.once('close', resolve));
expect(stream.bytesWritten).toBe(2);
expect(mfs.__vol.toJSON()).toStrictEqual({
'/mountpoint/folder/file': 'tABt',
'/mountpoint/empty-folder': null,
'/mountpoint/f.html': 'test',
});
});

test('throws if "start" option is not a number', async () => {
const { fs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' });
try {
fs.createWriteStream('/folder/file', {flags: 'a', start: '1' as any});
throw new Error('should have thrown');
} catch (error) {
expect(error).toBeInstanceOf(TypeError);
expect(error.message).toBe('"start" option must be a Number');
}
});

test('throws if "start" option is negative', async () => {
const { fs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' });
try {
fs.createWriteStream('/folder/file', {flags: 'a', start: -1});
throw new Error('should have thrown');
} catch (error) {
expect(error).toBeInstanceOf(TypeError);
expect(error.message).toBe('"start" must be >= zero');
}
});
});

0 comments on commit 531f2a7

Please sign in to comment.