Skip to content

Commit

Permalink
feat: implement createReadStream and createWriteStream on `FileHa…
Browse files Browse the repository at this point in the history
…ndle` (streamich#1076)

* feat: implement `createReadStream` and `createWriteStream` on `FileHandle`

Closes streamich#1063

* test for FileHandle#create{Read,Write}Stream
  • Loading branch information
graue authored Dec 9, 2024
1 parent cf16b76 commit c413df5
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 1 deletion.
34 changes: 34 additions & 0 deletions src/__tests__/promises.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { promisify } from 'util';
import { Volume } from '../volume';
import { Readable } from 'stream';

Expand Down Expand Up @@ -82,6 +83,39 @@ describe('Promises API', () => {
});
});
// close(): covered by all other tests
it('supports createReadStream()', done => {
const vol = Volume.fromJSON({
'/test.txt': 'Hello',
});
vol.promises
.open('/test.txt', 'r')
.then(fh => {
const readStream = fh.createReadStream({});
readStream.setEncoding('utf8');
let readData = '';
readStream.on('readable', () => {
const chunk = readStream.read();
if (chunk != null) readData += chunk;
});
readStream.on('end', () => {
expect(readData).toEqual('Hello');
done();
});
})
.catch(err => {
expect(err).toBeNull();
});
});
it('supports createWriteStream()', async () => {
const vol = new Volume();
const fh = await vol.promises.open('/test.txt', 'wx', 0o600);
const writeStream = fh.createWriteStream({});
await promisify(writeStream.write.bind(writeStream))(Buffer.from('Hello'));
await promisify(writeStream.close.bind(writeStream))();
expect(vol.toJSON()).toEqual({
'/test.txt': 'Hello',
});
});
describe('datasync()', () => {
const vol = new Volume();
const { promises } = vol;
Expand Down
10 changes: 9 additions & 1 deletion src/node/FileHandle.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { promisify } from './util';
import type * as opts from './types/options';
import type { IFileHandle, IStats, TData, TDataOut, TMode, TTime } from './types/misc';
import type { IFileHandle, IReadStream, IWriteStream, IStats, TData, TDataOut, TMode, TTime } from './types/misc';
import type { FsCallbackApi } from './types';

export class FileHandle implements IFileHandle {
Expand Down Expand Up @@ -33,6 +33,14 @@ export class FileHandle implements IFileHandle {
return promisify(this.fs, 'fdatasync')(this.fd);
}

createReadStream(options: opts.IFileHandleReadStreamOptions): IReadStream {
return this.fs.createReadStream('', { ...options, fd: this });
}

createWriteStream(options: opts.IFileHandleWriteStreamOptions): IWriteStream {
return this.fs.createWriteStream('', { ...options, fd: this });
}

readableWebStream(options?: opts.IReadableWebStreamOptions): ReadableStream {
return new ReadableStream({
pull: async controller => {
Expand Down
4 changes: 4 additions & 0 deletions src/node/types/misc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import type { EventEmitter } from 'events';
import type { TSetTimeout } from '../../setTimeoutUnref';
import type {
IAppendFileOptions,
IFileHandleReadStreamOptions,
IFileHandleWriteStreamOptions,
IReadableWebStreamOptions,
IReadFileOptions,
IStatOptions,
Expand Down Expand Up @@ -138,6 +140,8 @@ export interface IFileHandle {
chmod(mode: TMode): Promise<void>;
chown(uid: number, gid: number): Promise<void>;
close(): Promise<void>;
createReadStream(options: IFileHandleReadStreamOptions): IReadStream;
createWriteStream(options: IFileHandleWriteStreamOptions): IWriteStream;
datasync(): Promise<void>;
readableWebStream(options?: IReadableWebStreamOptions): ReadableStream;
read(buffer: Buffer | Uint8Array, offset: number, length: number, position: number): Promise<TFileHandleReadResult>;
Expand Down
20 changes: 20 additions & 0 deletions src/node/types/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,26 @@ export interface IReadableWebStreamOptions {
type?: 'bytes' | undefined;
}

export interface IFileHandleReadStreamOptions {
encoding?: BufferEncoding;
autoClose?: boolean;
emitClose?: boolean;
start?: number | undefined;
end?: number;
highWaterMark?: number;
flush?: boolean;
signal?: AbortSignal | undefined;
}

export interface IFileHandleWriteStreamOptions {
encoding?: BufferEncoding;
autoClose?: boolean;
emitClose?: boolean;
start?: number;
highWaterMark?: number;
flush?: boolean;
}

export interface IReaddirOptions extends IOptions {
recursive?: boolean;
withFileTypes?: boolean;
Expand Down

0 comments on commit c413df5

Please sign in to comment.