Skip to content

Commit

Permalink
New API to send data from Node throught SCTP DataConsumer (versatica#444
Browse files Browse the repository at this point in the history
)

* New API to send data from Node throught SCTP DataConsumer

* Update CHANGELOG.md

* DataConsumer: onQueuedCallback

* Place static methods above static members

* typo

* Enhance error message

* Enhance error message
  • Loading branch information
jmillan authored Aug 7, 2020
1 parent 8d01ce0 commit fd08981
Show file tree
Hide file tree
Showing 39 changed files with 1,025 additions and 181 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

* `SctpAssociation.cpp`: Fix `OnSctpAssociationBufferedAmount()` call.
* Update deps.
* New API to send data from Node throught SCTP DataConsumer.


### 3.6.15
Expand Down
2 changes: 1 addition & 1 deletion lib/Channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class Channel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
});
}
_processMessage(msg) {
// If a response retrieve its associated request.
// If a response, retrieve its associated request.
if (msg.id) {
const sent = this._sents.get(msg.id);
if (!sent) {
Expand Down
16 changes: 9 additions & 7 deletions lib/DataConsumer.d.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/// <reference types="node" />
import { EnhancedEventEmitter } from './EnhancedEventEmitter';
import { Channel } from './Channel';
import { PayloadChannel } from './PayloadChannel';
Expand Down Expand Up @@ -51,8 +52,6 @@ export declare class DataConsumer extends EnhancedEventEmitter {
private readonly _channel;
private readonly _payloadChannel;
private _closed;
private _bufferedAmountLowThreshold;
private _bufferedAmount;
private readonly _appData?;
private readonly _observer;
/**
Expand Down Expand Up @@ -80,11 +79,6 @@ export declare class DataConsumer extends EnhancedEventEmitter {
* Associated DataProducer id.
*/
get dataProducerId(): string;
/**
* Buffered amount threshold.
*/
get bufferedAmountLowThreshold(): number;
set bufferedAmountLowThreshold(value: number);
/**
* Whether the DataConsumer is closed.
*/
Expand Down Expand Up @@ -137,6 +131,14 @@ export declare class DataConsumer extends EnhancedEventEmitter {
* Get DataConsumer stats.
*/
getStats(): Promise<DataConsumerStat[]>;
/**
* Set buffered amount low threshold.
*/
setBufferedAmountLowThreshold(threshold: number): Promise<void>;
/**
* Send data.
*/
send(message: string | Buffer, ppid?: number): Promise<void>;
/**
* Get buffered amount size.
*/
Expand Down
2 changes: 1 addition & 1 deletion lib/DataConsumer.d.ts.map

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 45 additions & 20 deletions lib/DataConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ class DataConsumer extends EnhancedEventEmitter_1.EnhancedEventEmitter {
super();
// Closed flag.
this._closed = false;
// Buffered amount threshold.
this._bufferedAmountLowThreshold = 0;
// Buffered amount.
this._bufferedAmount = 0;
// Observer instance.
this._observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
logger.debug('constructor()');
Expand All @@ -44,15 +40,6 @@ class DataConsumer extends EnhancedEventEmitter_1.EnhancedEventEmitter {
get dataProducerId() {
return this._internal.dataProducerId;
}
/**
* Buffered amount threshold.
*/
get bufferedAmountLowThreshold() {
return this._bufferedAmountLowThreshold;
}
set bufferedAmountLowThreshold(value) {
this._bufferedAmountLowThreshold = value;
}
/**
* Whether the DataConsumer is closed.
*/
Expand Down Expand Up @@ -149,6 +136,49 @@ class DataConsumer extends EnhancedEventEmitter_1.EnhancedEventEmitter {
logger.debug('getStats()');
return this._channel.request('dataConsumer.getStats', this._internal);
}
/**
* Set buffered amount low threshold.
*/
async setBufferedAmountLowThreshold(threshold) {
logger.debug('setBufferedAmountLowThreshold() [threshold:%s]', threshold);
const reqData = { threshold };
await this._channel.request('dataConsumer.setBufferedAmountLowThreshold', this._internal, reqData);
}
/**
* Send data.
*/
async send(message, ppid) {
if (typeof message !== 'string' && !Buffer.isBuffer(message)) {
throw new TypeError('message must be a string or a Buffer');
}
/*
* +-------------------------------+----------+
* | Value | SCTP |
* | | PPID |
* +-------------------------------+----------+
* | WebRTC String | 51 |
* | WebRTC Binary Partial | 52 |
* | (Deprecated) | |
* | WebRTC Binary | 53 |
* | WebRTC String Partial | 54 |
* | (Deprecated) | |
* | WebRTC String Empty | 56 |
* | WebRTC Binary Empty | 57 |
* +-------------------------------+----------+
*/
if (typeof ppid !== 'number') {
ppid = (typeof message === 'string')
? message.length > 0 ? 51 : 56
: message.length > 0 ? 53 : 57;
}
// Ensure we honor PPIDs.
if (ppid === 56)
message = ' ';
else if (ppid === 57)
message = Buffer.alloc(1);
const requestData = { ppid };
await this._payloadChannel.request('dataConsumer.send', this._internal, requestData, message);
}
/**
* Get buffered amount size.
*/
Expand Down Expand Up @@ -178,15 +208,10 @@ class DataConsumer extends EnhancedEventEmitter_1.EnhancedEventEmitter {
this.safeEmit('sctpsendbufferfull');
break;
}
case 'bufferedamount':
case 'bufferedamountlow':
{
const { bufferedAmount } = data;
const previousBufferedAmount = this._bufferedAmount;
this._bufferedAmount = bufferedAmount;
if (previousBufferedAmount > this._bufferedAmountLowThreshold &&
this._bufferedAmount <= this._bufferedAmountLowThreshold) {
this.safeEmit('bufferedamountlow', bufferedAmount);
}
this.safeEmit('bufferedamountlow', bufferedAmount);
break;
}
default:
Expand Down
6 changes: 6 additions & 0 deletions lib/PayloadChannel.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ export declare class PayloadChannel extends EnhancedEventEmitter {
private _closed;
private readonly _producerSocket;
private readonly _consumerSocket;
private _nextId;
private readonly _sents;
private _recvBuffer?;
private _ongoingNotification?;
/**
Expand All @@ -21,6 +23,10 @@ export declare class PayloadChannel extends EnhancedEventEmitter {
* @private
*/
notify(event: string, internal: object, data: any | undefined, payload: string | Buffer): void;
/**
* @private
*/
request(method: string, internal: object, data: any, payload: string | Buffer): Promise<any>;
private _processData;
}
//# sourceMappingURL=PayloadChannel.d.ts.map
2 changes: 1 addition & 1 deletion lib/PayloadChannel.d.ts.map

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

98 changes: 90 additions & 8 deletions lib/PayloadChannel.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ class PayloadChannel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
super();
// Closed flag.
this._closed = false;
// Next id for messages sent to the worker process.
this._nextId = 0;
// Map of pending sent requests.
this._sents = new Map();
logger.debug('constructor()');
this._producerSocket = producerSocket;
this._consumerSocket = consumerSocket;
Expand Down Expand Up @@ -123,6 +127,56 @@ class PayloadChannel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
return;
}
}
/**
* @private
*/
async request(method, internal, data, payload) {
this._nextId < 4294967295 ? ++this._nextId : (this._nextId = 1);
const id = this._nextId;
logger.debug('request() [method:%s, id:%s]', method, id);
if (this._closed)
throw new errors_1.InvalidStateError('Channel closed');
const request = { id, method, internal, data };
const ns1 = netstring.nsWrite(JSON.stringify(request));
const ns2 = netstring.nsWrite(payload);
if (Buffer.byteLength(ns1) > NS_MESSAGE_MAX_LEN)
throw new Error('Channel request too big');
else if (Buffer.byteLength(ns2) > NS_MESSAGE_MAX_LEN)
throw new Error('PayloadChannel payload too big');
// This may throw if closed or remote side ended.
this._producerSocket.write(ns1);
this._producerSocket.write(ns2);
return new Promise((pResolve, pReject) => {
const timeout = 1000 * (15 + (0.1 * this._sents.size));
const sent = {
id: id,
method: method,
resolve: (data2) => {
if (!this._sents.delete(id))
return;
clearTimeout(sent.timer);
pResolve(data2);
},
reject: (error) => {
if (!this._sents.delete(id))
return;
clearTimeout(sent.timer);
pReject(error);
},
timer: setTimeout(() => {
if (!this._sents.delete(id))
return;
pReject(new Error('Channel request timeout'));
}, timeout),
close: () => {
clearTimeout(sent.timer);
pReject(new errors_1.InvalidStateError('Channel closed'));
}
};
// Add sent stuff to the map.
this._sents.set(id, sent);
});
}
_processData(data) {
if (!this._ongoingNotification) {
let msg;
Expand All @@ -133,16 +187,44 @@ class PayloadChannel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
logger.error('received invalid data from the worker process: %s', String(error));
return;
}
if (!msg.targetId || !msg.event) {
logger.error('received message is not a notification');
// If a response, retrieve its associated request.
if (msg.id) {
const sent = this._sents.get(msg.id);
if (!sent) {
logger.error('received response does not match any sent request [id:%s]', msg.id);
return;
}
if (msg.accepted) {
logger.debug('request succeeded [method:%s, id:%s]', sent.method, sent.id);
sent.resolve(msg.data);
}
else if (msg.error) {
logger.warn('request failed [method:%s, id:%s]: %s', sent.method, sent.id, msg.reason);
switch (msg.error) {
case 'TypeError':
sent.reject(new TypeError(msg.reason));
break;
default:
sent.reject(new Error(msg.reason));
}
}
else {
logger.error('received response is not accepted nor rejected [method:%s, id:%s]', sent.method, sent.id);
}
}
// If a notification, create the ongoing notification instance.
else if (msg.targetId && msg.event) {
this._ongoingNotification =
{
targetId: msg.targetId,
event: msg.event,
data: msg.data
};
}
else {
logger.error('received data is not a notification nor a response');
return;
}
this._ongoingNotification =
{
targetId: msg.targetId,
event: msg.event,
data: msg.data
};
}
else {
const payload = data;
Expand Down
2 changes: 1 addition & 1 deletion lib/Worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class Worker extends EnhancedEventEmitter_1.EnhancedEventEmitter {
// options
{
env: {
MEDIASOUP_VERSION: '3.6.15'
MEDIASOUP_VERSION: '3.6.14'
},
detached: false,
// fd 0 (stdin) : Just ignore it.
Expand Down
2 changes: 1 addition & 1 deletion src/Channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ export class Channel extends EnhancedEventEmitter

private _processMessage(msg: any): void
{
// If a response retrieve its associated request.
// If a response, retrieve its associated request.
if (msg.id)
{
const sent = this._sents.get(msg.id);
Expand Down
Loading

0 comments on commit fd08981

Please sign in to comment.