Skip to content

Commit

Permalink
feat(Update): add the ability to specify a pipeline to an update comm…
Browse files Browse the repository at this point in the history
…and (mongodb#2017)

* feat(Update): add the ability to specify a pipeline to an update command

NODE-1920
  • Loading branch information
Sam Pal authored and Rosemary Yin committed Aug 16, 2019
1 parent 7304f26 commit d248aca
Show file tree
Hide file tree
Showing 2 changed files with 317 additions and 0 deletions.
90 changes: 90 additions & 0 deletions test-runner-poc/config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
'use strict';
const f = require('util').format;
const url = require('url');
const qs = require('querystring');
const core = require('../lib/core');
class NativeConfiguration {
constructor(environment) {
this.options = environment || {};
this.host = environment.host || 'localhost';
this.port = environment.port || 27017;
this.db = environment.db || 'integration_tests';
this.mongo = environment.mongo;
this.setName = environment.setName || 'rs';
this.require = this.mongo;
this.writeConcern = function() {
return { w: 1 };
};
this.topology = environment.topology || this.defaultTopology;
this.environment = environment;
}
usingUnifiedTopology() {
return !!process.env.MONGODB_UNIFIED_TOPOLOGY;
}
newClient(dbOptions, serverOptions) {
// support MongoClient contructor form (url, options) for `newClient`
if (typeof dbOptions === 'string') {
return new this.mongo.MongoClient(
dbOptions,
this.usingUnifiedTopology()
? Object.assign({ useUnifiedTopology: true }, serverOptions)
: serverOptions
);
}
dbOptions = dbOptions || {};
serverOptions = Object.assign({}, { haInterval: 100 }, serverOptions);
if (this.usingUnifiedTopology()) serverOptions.useUnifiedTopology = true;
// Set up the options
const keys = Object.keys(this.options);
if (keys.indexOf('sslOnNormalPorts') !== -1) serverOptions.ssl = true;
// Fall back
let dbHost = (serverOptions && serverOptions.host) || 'localhost';
const dbPort = (serverOptions && serverOptions.port) || this.options.port || 27017;
if (dbHost.indexOf('.sock') !== -1) {
dbHost = qs.escape(dbHost);
}
if (this.options.setName) {
Object.assign(dbOptions, { replicaSet: this.options.setName, auto_reconnect: false });
}
const connectionString = url.format({
protocol: 'mongodb',
slashes: true,
hostname: dbHost,
port: dbPort,
query: dbOptions,
pathname: '/'
});
return new this.mongo.MongoClient(connectionString, serverOptions);
}
newTopology(host, port, options) {
options = options || {};
return this.topology(host, port, options);
}
url(username, password) {
const url = this.options.url || 'mongodb://%slocalhost:27017/' + this.db;
// Fall back
const auth = username && password ? f('%s:%s@', username, password) : '';
return f(url, auth);
}
writeConcernMax() {
return Object.assign({}, this.options.writeConcernMax || { w: 1 });
}
server37631Workaround() {
console.log('[applying SERVER-37631 workaround]');
const configServers = this.manager.configurationServers.managers;
const proxies = this.manager.proxies;
const configServersPromise = configServers.reduce((result, server) => {
return result.then(() =>
server.executeCommand('admin.$cmd', { refreshLogicalSessionCacheNow: 1 })
);
}, Promise.resolve());
return configServersPromise.then(() => {
return proxies.reduce((promise, proxy) => {
return promise.then(() =>
proxy.executeCommand('admin.$cmd', { refreshLogicalSessionCacheNow: 1 })
);
}, Promise.resolve());
});
}
}
module.exports = NativeConfiguration;
227 changes: 227 additions & 0 deletions test-runner-poc/environments.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
'use strict';
const semver = require('semver');
const path = require('path');
const core = require('../lib/core');
/**
* Base class for environments in projects that use the test
* runner
*/
class EnvironmentBase {
/**
* The default implementation of the environment setup
*
* @param {*} callback
*/
setup(callback) {
callback();
}
constructor(status) {
if (status.primary) {
this.port = parseInt(status.primary.split(':')[1]);
this.host = status.primary.split(':')[0];
}
}
}
const genReplsetConfig = (port, options) => {
return Object.assign(
{
options: {
bind_ip: 'localhost',
port: port,
dbpath: `${__dirname}/../db/${port}`,
setParameter: ['enableTestCommands=1']
}
},
options
);
};
function usingUnifiedTopology() {
return !!process.env.MONGODB_UNIFIED_TOPOLOGY;
}
/**
*
* @param {*} discoverResult
*/
class ReplicaSetEnvironment extends EnvironmentBase {
constructor(status, version) {
super(status);
this.setName = status.setName || 'rs';
this.url = `mongodb://${this.host}:${this.port}/integration_tests?rs_name=${this.setName}`;
this.writeConcernMax = { w: 'majority', wtimeout: 30000 };
this.replicasetName = this.setName;
this.topology = function(topologyHost, topologyPort, options) {
topologyHost = topologyHost || this.host;
topologyPort = topologyPort || this.port;
options = Object.assign({}, options);
options.replicaSet = this.setName;
options.poolSize = 1;
options.autoReconnect = false;
if (usingUnifiedTopology()) {
return new core.Topology([{ topologyHost, topologyPort }], options);
}
return new core.ReplSet([{ topologyHost, topologyPort }], options);
};

// Do we have 3.2+
if (semver.satisfies(version, '>=3.2.0')) {
this.nodes = this.nodes.map(function(x) {
x.options.enableMajorityReadConcern = null;
return x;
});
}
}
}

function generateNodesArray(hosts, configFunc){
let nodesArray = [];
for (let i = 0; i < hosts.length; i++) {
nodesArray[i] = configFunc(hosts[i].port || hosts[0].port + i);
}
return nodesArray;
}

/**
*
*/
class SingleEnvironment extends EnvironmentBase {
constructor(parsedURI) {
super(parsedURI);
}
}
const genShardedConfig = (port, options, shardOptions) => {
return Object.assign(
{
options: {
bind_ip: 'localhost',
port: port,
dbpath: `${__dirname}/../db/${port}`,
shardsvr: null
}
},
options,
shardOptions
);
};
const genConfigNode = (port, options) => {
return Object.assign(
{
options: {
bind_ip: 'localhost',
port: port,
dbpath: `${__dirname}/../db/${port}`,
setParameter: ['enableTestCommands=1']
}
},
options
);
};
/**
*
*/
class ShardedEnvironment extends EnvironmentBase {
constructor(parsedURI, version) {
super(parsedURI);
// NOTE: only connect to a single shard because there can be consistency issues using
// more, revolving around the inability for shards to keep up-to-date views of
// changes to the world (such as dropping a database).
this.url = `mongodb://%s${this.host}:${this.port}/integration_tests`;
this.writeConcernMax = { w: 'majority', wtimeout: 30000 };
this.topology = (topologyHost, topologyPort, options) => {
topologyHost = topologyHost || this.host;
topologyPort = topologyPort || this.port;
options = options || {};
if (usingUnifiedTopology()) {
return new core.Topology([{ topologyHost, topologyPort }], options);
}
return new core.Mongos([{ topologyHost, topologyPort }], options);
};
this.server37631WorkaroundNeeded = semver.satisfies(version, '3.6.x');
}
setup(callback) {
const shardOptions = this.options && this.options.shard ? this.options.shard : {};
// First set of nodes
//const nodes1 = generateNodesArray(parsedURI.hosts, genShardedConfig);

const configOptions = this.options && this.options.config ? this.options.config : {};
//const configNodes = [genConfigNode(35000, configOptions)];
let proxyNodes = [
{
bind_ip: 'localhost',
port: 51000,
configdb: 'localhost:35000,localhost:35001,localhost:35002',
setParameter: ['enableTestCommands=1']
},
{
bind_ip: 'localhost',
port: 51001,
configdb: 'localhost:35000,localhost:35001,localhost:35002',
setParameter: ['enableTestCommands=1']
}
];
// Additional mapping
const self = this;
proxyNodes = proxyNodes.map(function(x) {
if (self.options && self.options.proxy) {
for (let name in self.options.proxy) {
x[name] = self.options.proxy[name];
}
}
return x;
});
this.proxies = proxyNodes.map(proxy => {
return { host: proxy.bind_ip, port: proxy.port };
});
}
}
/**
*
*/
class SslEnvironment extends EnvironmentBase {
constructor(parsedURI) {
super(parsedURI);
this.sslOnNormalPorts = null;
this.fork = null;
this.sslPEMKeyFile = __dirname + '/functional/ssl/server.pem';
this.url = `mongodb://%s${this.host}:${this.port}/integration_tests?ssl=true&sslValidate=false`;
this.topology = function(topologyHost, topologyPort, serverOptions) {
topologyHost = topologyHost || this.host;
topologyPort = topologyPort || this.port;
serverOptions = Object.assign({}, serverOptions);
serverOptions.poolSize = 1;
serverOptions.ssl = true;
serverOptions.sslValidate = false;
return new core.Server(topologyHost, topologyPort, serverOptions);
};
}
}
/**
*
*/
class AuthEnvironment extends EnvironmentBase {
constructor(parsedURI) {
super(parsedURI);
this.url = `mongodb://%s${this.host}:${this.port}/integration_tests`;
this.topology = function(topologyHost, topologyPort, serverOptions) {
topologyHost = topologyHost || this.host;
topologyPort = topologyPort || this.port;
serverOptions = Object.assign({}, serverOptions);
serverOptions.poolSize = 1;
return new core.Server(topologyHost, topologyPort, serverOptions);
};
}
}
module.exports = {
single: SingleEnvironment,
replicaset: ReplicaSetEnvironment,
sharded: ShardedEnvironment,
ssl: SslEnvironment,
auth: AuthEnvironment,
// informational aliases
kerberos: SingleEnvironment,
ldap: SingleEnvironment,
sni: SingleEnvironment,
// for compatability with evergreen template
server: SingleEnvironment,
replica_set: ReplicaSetEnvironment,
sharded_cluster: ShardedEnvironment
};

0 comments on commit d248aca

Please sign in to comment.