Skip to content
This repository has been archived by the owner on Aug 11, 2020. It is now read-only.

Commit

Permalink
quic: implement sendFD() support
Browse files Browse the repository at this point in the history
Fixes: #75
PR-URL: #150
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
addaleax committed Oct 11, 2019
1 parent 27cfb37 commit a19682e
Show file tree
Hide file tree
Showing 6 changed files with 438 additions and 4 deletions.
50 changes: 49 additions & 1 deletion doc/api/quic.md
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ socket.on('ready', () => {
});
```

#### Call Results#
#### Call Results

A call on a socket that is not ready to send or no longer open may throw a
Not running Error.
Expand Down Expand Up @@ -1106,6 +1106,54 @@ added: REPLACEME

The `QuicServerSession` or `QuicClientSession`.

### quicstream.sendFD(fd[, options])
<!-- YAML
added: REPLACEME
-->

* `fd` {number|FileHandle} A readable file descriptor.
* `options` {Object}
* `offset` {number} The offset position at which to begin reading.
Default: `-1`.
* `length` {number} The amount of data from the fd to send.
Default: `-1`.

Instead of using a `Quicstream` as a writable stream, send data from a given file
descriptor.

If `offset` is set to a non-negative number, reading starts from that position
and the file offset will not be advanced.
If `length` is set to a non-negative number, it gives the maximum number of
bytes that are read from the file.

The file descriptor or `FileHandle` is not closed when the stream is closed,
so it will need to be closed manually once it is no longer needed.
Using the same file descriptor concurrently for multiple streams
is not supported and may result in data loss. Re-using a file descriptor
after a stream has finished is supported.

### quicstream.sendFile(path[, options])
<!-- YAML
added: REPLACEME
-->

* `path` {string|Buffer|URL}
* `options` {Object}
* `onError` {Function} Callback function invoked in the case of an
error before send.
* `offset` {number} The offset position at which to begin reading.
Default: `-1`.
* `length` {number} The amount of data from the fd to send.
Default: `-1`.

Instead of using a `QuicStream` as a writable stream, send data from a given file
path.

The `options.onError` callback will be called if the file could not be opened.
If `offset` is set to a non-negative number, reading starts from that position.
If `length` is set to a non-negative number, it gives the maximum number of
bytes that are read from the file.

### quicstream.unidirectional
<!-- YAML
added: REPLACEME
Expand Down
95 changes: 92 additions & 3 deletions lib/internal/quic/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ const {
validateQuicClientSessionOptions,
validateQuicSocketOptions,
} = require('internal/quic/util');
const { validateNumber } = require('internal/validators');
const util = require('util');
const assert = require('internal/assert');
const EventEmitter = require('events');
const fs = require('fs');
const fsPromisesInternal = require('internal/fs/promises');
const { Duplex } = require('stream');
const {
createSecureContext: _createSecureContext
Expand All @@ -32,7 +35,7 @@ const {
translatePeerCertificate
} = require('_tls_common');
const {
defaultTriggerAsyncIdScope, // eslint-disable-line no-unused-vars
defaultTriggerAsyncIdScope,
symbols: {
async_id_symbol,
owner_symbol,
Expand All @@ -52,14 +55,15 @@ const {

const {
ShutdownWrap,
kReadBytesOrError, // eslint-disable-line no-unused-vars
streamBaseState // eslint-disable-line no-unused-vars
kReadBytesOrError,
streamBaseState
} = internalBinding('stream_wrap');

const {
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
ERR_INVALID_OPT_VALUE,
ERR_INVALID_CALLBACK,
ERR_OUT_OF_RANGE,
ERR_QUIC_ERROR,
Expand All @@ -78,6 +82,10 @@ const {
exceptionWithHostPort
} = require('internal/errors');

const { FileHandle } = internalBinding('fs');
const { StreamPipe } = internalBinding('stream_pipe');
const { UV_EOF } = internalBinding('uv');

const {
QuicSocket: QuicSocketHandle,
initSecureContext,
Expand Down Expand Up @@ -2253,6 +2261,87 @@ class QuicStream extends Duplex {
streamOnResume.call(this);
}

sendFile(path, options = {}) {
fs.open(path, 'r', QuicStream.#onFileOpened.bind(this, options));
}

static #onFileOpened = function(options, err, fd) {
const onError = options.onError;
if (err) {
if (onError) {
this.close();
onError(err);
} else {
this.destroy(err);
}
return;
}

if (this.destroyed || this.closed) {
fs.close(fd, (err) => { if (err) throw err; });
return;
}

this.sendFD(fd, options, true);
}

sendFD(fd, { offset = -1, length = -1 } = {}, ownsFd = false) {
if (this.destroyed || this.#closed)
return;

if (typeof offset !== 'number')
throw new ERR_INVALID_OPT_VALUE('options.offset', offset);
if (typeof length !== 'number')
throw new ERR_INVALID_OPT_VALUE('options.length', length);

if (fd instanceof fsPromisesInternal.FileHandle)
fd = fd.fd;
else if (typeof fd !== 'number')
throw new ERR_INVALID_ARG_TYPE('fd', ['number', 'FileHandle'], fd);

this[kUpdateTimer]();
this.ownsFd = ownsFd;

// Close the writable side of the stream, but only as far as the writable
// stream implementation is concerned.
this._final = null;
this.end();

defaultTriggerAsyncIdScope(this[async_id_symbol],
QuicStream.#startFilePipe,
this, fd, offset, length);
}

static #startFilePipe = (stream, fd, offset, length) => {
const handle = new FileHandle(fd, offset, length);
handle.onread = QuicStream.#onPipedFileHandleRead;
handle.stream = stream;

const pipe = new StreamPipe(handle, stream[kHandle]);
pipe.onunpipe = QuicStream.#onFileUnpipe;
pipe.start();

// Exact length of the file doesn't matter here, since the
// stream is closing anyway - just use 1 to signify that
// a write does exist
stream[kTrackWriteState](stream, 1);
}

static #onFileUnpipe = function() { // Called on the StreamPipe instance.
const stream = this.sink[owner_symbol];
if (stream.ownsFd)
this.source.close().catch(stream.destroy.bind(stream));
else
this.source.releaseFD();
}

static #onPipedFileHandleRead = function() {
const err = streamBaseState[kReadBytesOrError];
if (err < 0 && err !== UV_EOF) {
this.stream.destroy(errnoException(err, 'sendFD'));
}
}

get resetReceived() {
return (this.#resetCode !== undefined) ?
{ code: this.#resetCode | 0, finalSize: this.#resetFinalSize | 0 } :
Expand Down
102 changes: 102 additions & 0 deletions test/parallel/test-quic-send-fd.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
'use strict';
const common = require('../common');
if (!common.hasQuic)
common.skip('missing quic');

const assert = require('assert');
const quic = require('quic');
const fs = require('fs');

const fixtures = require('../common/fixtures');
const key = fixtures.readKey('agent1-key.pem', 'binary');
const cert = fixtures.readKey('agent1-cert.pem', 'binary');
const ca = fixtures.readKey('ca1-cert.pem', 'binary');

const variants = [];
for (const variant of ['sendFD', 'sendFile', 'sendFD+fileHandle']) {
for (const offset of [-1, 0, 100]) {
for (const length of [-1, 100]) {
variants.push({ variant, offset, length });
}
}
}

for (const { variant, offset, length } of variants) {
const server = quic.createSocket({ port: 0, validateAddress: true });
let fd;

server.listen({
key,
cert,
ca,
rejectUnauthorized: false,
maxCryptoBuffer: 4096,
alpn: 'meow'
});

server.on('session', common.mustCall((session) => {
session.on('secure', common.mustCall((servername, alpn, cipher) => {
const stream = session.openStream({ halfOpen: false });

stream.on('data', common.mustNotCall());
stream.on('finish', common.mustCall());
stream.on('close', common.mustCall());
stream.on('end', common.mustCall());

if (variant === 'sendFD') {
fd = fs.openSync(__filename, 'r');
stream.sendFD(fd, { offset, length });
} else if (variant === 'sendFD+fileHandle') {
fs.promises.open(__filename, 'r').then(common.mustCall((handle) => {
fd = handle;
stream.sendFD(handle, { offset, length });
}));
} else {
assert.strictEqual(variant, 'sendFile');
stream.sendFile(__filename, { offset, length });
}
}));

session.on('close', common.mustCall());
}));

server.on('ready', common.mustCall(() => {
const client = quic.createSocket({
port: 0,
client: {
key,
cert,
ca,
alpn: 'meow'
}
});

const req = client.connect({
address: 'localhost',
port: server.address.port
});

req.on('stream', common.mustCall((stream) => {
const data = [];
stream.on('data', (chunk) => data.push(chunk));
stream.on('end', common.mustCall(() => {
let expectedContent = fs.readFileSync(__filename);
if (offset !== -1) expectedContent = expectedContent.slice(offset);
if (length !== -1) expectedContent = expectedContent.slice(0, length);
assert.deepStrictEqual(Buffer.concat(data), expectedContent);

stream.end();
client.close();
server.close();
if (fd !== undefined) {
if (fd.close) fd.close().then(common.mustCall());
else fs.closeSync(fd);
}
}));
}));

req.on('close', common.mustCall());
}));

server.on('close', common.mustCall());
}
63 changes: 63 additions & 0 deletions test/parallel/test-quic-send-file-close-before-open.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
'use strict';
const common = require('../common');
if (!common.hasQuic)
common.skip('missing quic');

const quic = require('quic');
const fs = require('fs');

const fixtures = require('../common/fixtures');
const key = fixtures.readKey('agent1-key.pem', 'binary');
const cert = fixtures.readKey('agent1-cert.pem', 'binary');
const ca = fixtures.readKey('ca1-cert.pem', 'binary');

const server = quic.createSocket({ port: 0, validateAddress: true });

server.listen({
key,
cert,
ca,
rejectUnauthorized: false,
maxCryptoBuffer: 4096,
alpn: 'meow'
});

server.on('session', common.mustCall((session) => {
session.on('secure', common.mustCall((servername, alpn, cipher) => {
const stream = session.openStream({ halfOpen: false });

fs.open = common.mustCall(fs.open);
fs.close = common.mustCall(fs.close);

stream.sendFile(__filename);
stream.destroy(); // Destroy the stream before opening the fd finishes.

session.close();
server.close();
}));

session.on('close', common.mustCall());
}));

server.on('ready', common.mustCall(() => {
const client = quic.createSocket({
port: 0,
client: {
key,
cert,
ca,
alpn: 'meow'
}
});

const req = client.connect({
address: 'localhost',
port: server.address.port
});

req.on('stream', common.mustNotCall());

req.on('close', common.mustCall(() => client.close()));
}));

server.on('close', common.mustCall());
Loading

0 comments on commit a19682e

Please sign in to comment.