Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
feat(connection): Implement fast fallback
Browse files Browse the repository at this point in the history
Implements fast fallback and refactors connection
to allow family to be specified for ssl and unsecure
connections.

Fixes NODE-1580
  • Loading branch information
Sophie Saskin authored and mbroadst committed Sep 4, 2018
1 parent 13a59d2 commit 622394a
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 139 deletions.
226 changes: 128 additions & 98 deletions lib/connection/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -529,97 +529,142 @@ function merge(options1, options2) {
}
}

function makeSSLConnection(self, _options) {
let sslOptions = {
socket: self.connection,
rejectUnauthorized: self.rejectUnauthorized
};

// Merge in options
merge(sslOptions, self.options);
merge(sslOptions, _options);

// Set options for ssl
if (self.ca) sslOptions.ca = self.ca;
if (self.crl) sslOptions.crl = self.crl;
if (self.cert) sslOptions.cert = self.cert;
if (self.key) sslOptions.key = self.key;
if (self.passphrase) sslOptions.passphrase = self.passphrase;

// Override checkServerIdentity behavior
if (self.checkServerIdentity === false) {
// Skip the identiy check by retuning undefined as per node documents
// https://nodejs.org/api/tls.html#tls_tls_connect_options_callback
sslOptions.checkServerIdentity = function() {
return undefined;
function prepareConnectionOptions(self, _options) {
let options;
if (self.ssl) {
options = {
socket: self.connection,
rejectUnauthorized: self.rejectUnauthorized
};
} else if (typeof self.checkServerIdentity === 'function') {
sslOptions.checkServerIdentity = self.checkServerIdentity;
}

// Set default sni servername to be the same as host
if (sslOptions.servername == null) {
sslOptions.servername = self.host;
}
// Merge in options
merge(options, self.options);
merge(options, _options);

// Set options for ssl
if (self.ca) options.ca = self.ca;
if (self.crl) options.crl = self.crl;
if (self.cert) options.cert = self.cert;
if (self.key) options.key = self.key;
if (self.passphrase) options.passphrase = self.passphrase;

// Override checkServerIdentity behavior
if (self.checkServerIdentity === false) {
// Skip the identiy check by retuning undefined as per node documents
// https://nodejs.org/api/tls.html#tls_tls_connect_options_callback
options.checkServerIdentity = function() {
return undefined;
};
} else if (typeof self.checkServerIdentity === 'function') {
options.checkServerIdentity = self.checkServerIdentity;
}

// Attempt SSL connection
const connection = tls.connect(self.port, self.host, sslOptions, function() {
// Error on auth or skip
if (connection.authorizationError && self.rejectUnauthorized) {
return self.emit('error', connection.authorizationError, self, { ssl: true });
// Set default sni servername to be the same as host
if (options.servername == null) {
options.servername = self.host;
}

options = Object.assign({}, options, { host: self.host, port: self.port });

return options;
} else {
if (self.domainSocket) {
options = { path: self.host };
} else {
options = { port: self.port, host: self.host };
}
return options;
}
}

function makeConnection(self, options, callback) {
const module = options.ssl ? tls : net;

const connection = module.connect(options, function() {
if (self.ssl) {
// Error on auth or skip
if (connection.authorizationError && self.rejectUnauthorized) {
return self.emit('error', connection.authorizationError, self, { ssl: true });
}
}
// Set socket timeout instead of connection timeout

connection.setTimeout(self.socketTimeout);
// We are done emit connect
self.emit('connect', self);
return callback(null, connection);
});

// Set the options for the connection
connection.setKeepAlive(self.keepAlive, self.keepAliveInitialDelay);
connection.setTimeout(self.connectionTimeout);
connection.setNoDelay(self.noDelay);

return connection;
// Add handlers for events
connection.once('error', err =>
callback({ err: err, type: 'error', family: options.family }, null)
);
connection.once('timeout', err => callback({ err: err, type: 'timeout' }, null));
connection.once('close', err => callback({ err: err, type: 'close' }, null));
connection.on('data', err => callback({ err: err, type: 'data' }, null));
return;
}

function makeUnsecureConnection(self, family) {
// Create new connection instance
let connection_options;
if (self.domainSocket) {
connection_options = { path: self.host };
} else {
connection_options = { port: self.port, host: self.host };
connection_options.family = family;
}

const connection = net.createConnection(connection_options);

// Set the options for the connection
connection.setKeepAlive(self.keepAlive, self.keepAliveInitialDelay);
connection.setTimeout(self.connectionTimeout);
connection.setNoDelay(self.noDelay);

connection.once('connect', function() {
// Set socket timeout instead of connection timeout
connection.setTimeout(self.socketTimeout);
// Emit connect event
self.emit('connect', self);
});
function fastFallback(self, _options, callback) {
const options = prepareConnectionOptions(self, _options);

let connection;
const connectionHandler = (err, _connection) => {
const _errorHandler = errorHandler(self);
const _timeoutHandler = timeoutHandler(self);
const _closeHandler = closeHandler(self);
const _dataHandler = dataHandler(self);

if (err) {
switch (err.type) {
case 'error':
if (err.family === 6) {
return function() {
if (self.logger.isDebug()) {
self.logger.debug(
f(
'connection %s for [%s:%s] errored out with [%s]',
self.id,
self.host,
self.port,
JSON.stringify(err)
)
);
}
};
} else {
return _errorHandler(err.err);
}
case 'timeout':
return _timeoutHandler(err.err);
case 'close':
return _closeHandler(err.err);
case 'data':
return _dataHandler(err.err);
}
}

return connection;
}
if (connection) {
_connection.removeAllListeners('error');
_connection.removeAllListeners('timeout');
_connection.removeAllListeners('close');
_connection.removeAllListeners('data');
_connection.end();
_connection.unref();
} else {
connection = _connection;
}

function doConnect(self, family, _options, _errorHandler) {
self.connection = self.ssl
? makeSSLConnection(self, _options)
: makeUnsecureConnection(self, family);
return callback(null, connection);
};

// Add handlers for events
self.connection.once('error', _errorHandler);
self.connection.once('timeout', timeoutHandler(self));
self.connection.once('close', closeHandler(self));
self.connection.on('data', dataHandler(self));
makeConnection(self, Object.assign({}, { family: 6 }, options), connectionHandler);
setTimeout(() => {
makeConnection(self, Object.assign({}, { family: 4 }, options), connectionHandler);
}, 250);
}

/**
Expand All @@ -637,33 +682,18 @@ Connection.prototype.connect = function(_options) {
this.responseOptions.promoteBuffers = _options.promoteBuffers;
}

const _errorHandler = errorHandler(this);

if (this.family !== void 0) {
return doConnect(this, this.family, _options, _errorHandler);
}

return doConnect(this, 6, _options, err => {
if (this.logger.isDebug()) {
this.logger.debug(
f(
'connection %s for [%s:%s] errored out with [%s]',
this.id,
this.host,
this.port,
JSON.stringify(err)
)
);
return fastFallback(this, _options, (err, connection) => {
if (err) {
return;
}

// clean up existing event handlers
this.connection.removeAllListeners('error');
this.connection.removeAllListeners('timeout');
this.connection.removeAllListeners('close');
this.connection.removeAllListeners('data');
this.connection = undefined;

return doConnect(this, 4, _options, _errorHandler);
// Add handlers for events
connection.once('error', errorHandler(this));
connection.once('timeout', timeoutHandler(this));
connection.once('close', closeHandler(this));
connection.on('data', dataHandler(this));
this.connection = connection;
this.emit('connect', this);
return;
});
};

Expand Down
40 changes: 0 additions & 40 deletions test/tests/functional/pool_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,46 +48,6 @@ describe('Pool tests', function() {
}
});

it('Should only listen on connect once', {
metadata: { requires: { topology: 'single' } },

test: function(done) {
// Attempt to connect
var pool = new Pool(null, {
host: this.configuration.host,
port: this.configuration.port,
bson: new Bson(),
messageHandler: function() {}
});

let connection;

// Add event listeners
pool.on('connect', function() {
var connections = pool.allConnections();

process.nextTick(() => {
// Now that we are in next tick, connection should still exist, but there
// should be no connect listeners
expect(connection.connection.listenerCount('connect')).to.equal(0);
expect(connections).to.have.lengthOf(1);

pool.destroy();
done();
});
});

expect(pool.allConnections()).to.have.lengthOf(0);

// Start connection
pool.connect();

expect(pool.allConnections()).to.have.lengthOf(1);
connection = pool.allConnections()[0];
expect(connection.connection.listenerCount('connect')).to.equal(1);
}
});

it('should correctly write ismaster operation to the server', {
metadata: { requires: { topology: 'single' } },

Expand Down
2 changes: 1 addition & 1 deletion test/tests/functional/rs_mocks/add_remove_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ describe('ReplSet Add Remove (mocks)', function() {
server.destroy();
done();
}, 1000);
}, 500);
}, 630); // This connection implementation slows down conneciton to nodes, need to fix this

server.on('left', function(_type, _server) {
if (_type === 'secondary' && _server.name === 'localhost:32003') {
Expand Down

0 comments on commit 622394a

Please sign in to comment.