Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(jobs): support for parallelism in metronome #63

Merged
merged 1 commit into from
Feb 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions src/jobs/helpers/jobVerifier.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
'use strict';
const testsManager = require('../../tests/models/manager');
const serviceConfig = require('../../config/serviceConfig');
const consts = require('../../common/consts');

module.exports.verifyJobBody = (req, res, next) => {
let errorToThrow;
let jobBody = req.body;
if (!(jobBody.run_immediately || jobBody.cron_expression)) {
errorToThrow = new Error('Please provide run_immediately or cron_expression in order to schedule a job');
errorToThrow.statusCode = 400;
} else if (serviceConfig.jobPlatform !== consts.KUBERNETES && serviceConfig.jobPlatform !== consts.DOCKER && jobBody.parallelism > 1) {
errorToThrow = new Error(`parallelism is only supported in JOB_PLATFORM: ${consts.KUBERNETES}`);
errorToThrow.statusCode = 400;
}

next(errorToThrow);
};

Expand Down
19 changes: 7 additions & 12 deletions src/jobs/models/jobManager.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use strict';
let logger = require('../../common/logger');
let consts = require('../../common/consts');

let uuid = require('uuid');
let CronJob = require('cron').CronJob;
Expand Down Expand Up @@ -156,20 +155,16 @@ function createResponse(jobId, jobBody, runId) {

function createJobRequest(jobId, runId, jobBody, dockerImage) {
let jobName = util.format(JOB_PLATFORM_NAME, jobId);
let parallelism = 1;
let arrivalRatePerRunner = jobBody.arrival_rate;
let rampToPerRunner = jobBody.ramp_to;
let maxVirtualUsersPerRunner = jobBody.max_virtual_users;

if (consts.KUBERNETES === config.jobPlatform || consts.DOCKER === config.jobPlatform) {
parallelism = jobBody.parallelism || 1;
arrivalRatePerRunner = Math.ceil(jobBody.arrival_rate / parallelism);
if (jobBody.ramp_to) {
rampToPerRunner = Math.ceil(jobBody.ramp_to / parallelism);
}
if (jobBody.max_virtual_users) {
maxVirtualUsersPerRunner = Math.ceil(jobBody.max_virtual_users / parallelism);
}
let parallelism = jobBody.parallelism || 1;
let arrivalRatePerRunner = Math.ceil(jobBody.arrival_rate / parallelism);
if (jobBody.ramp_to) {
rampToPerRunner = Math.ceil(jobBody.ramp_to / parallelism);
}
if (jobBody.max_virtual_users) {
maxVirtualUsersPerRunner = Math.ceil(jobBody.max_virtual_users / parallelism);
}

let environmentVariables = {
Expand Down
37 changes: 30 additions & 7 deletions src/jobs/models/metronome/jobConnector.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,48 @@ if (metronomeConfig.metronomeToken) {
}

module.exports.runJob = async (metronomeJobConfig) => {
let parallelism = metronomeJobConfig.parallelism || 1;
delete metronomeJobConfig.parallelism;
let deployJobMethod = await chooseDeployJobMethod(metronomeJobConfig);
let deployJobResponse = await deployJob(deployJobMethod, metronomeJobConfig);
let runJobResponse = await runJob(metronomeJobConfig.id);

let runJobPromises = [];
for (let i = 0; i < parallelism; i++) {
runJobPromises.push(runJob(metronomeJobConfig.id));
}
await Promise.all(runJobPromises);

let genericJobResponse = {
jobName: deployJobResponse.id,
id: runJobResponse.id
jobName: deployJobResponse.id
};
return genericJobResponse;
};

module.exports.stopRun = async (jobPlatformName, platformSpecificInternalRunId) => {
let url = util.format('%s/v1/jobs/%s/runs/%s/actions/stop', metronomeUrl, jobPlatformName, platformSpecificInternalRunId);
module.exports.stopRun = async (jobPlatformName) => {
let url = util.format('%s/v1/jobs/%s/runs', metronomeUrl, jobPlatformName);
let options = {
method: 'POST',
method: 'GET',
url: url,
headers
};
await requestSender.send(options);

let currentJobRuns = await requestSender.send(options);

let stopJobPromises = [];
currentJobRuns.forEach((jobRun) => {
stopJobPromises.push(async () => {
let url = util.format('%s/v1/jobs/%s/runs/%s/actions/stop', metronomeUrl, jobPlatformName, jobRun.id);
let options = {
method: 'POST',
url: url,
headers
};
await requestSender.send(options);
});
});

stopJobPromises.forEach(stopJob => stopJob());
await Promise.all(stopJobPromises);
};

async function deployJob(method, metronomeJobConfig) {
Expand Down
3 changes: 2 additions & 1 deletion src/jobs/models/metronome/jobTemplate.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module.exports.createJobRequest = (jobName, runId, parallelism, environmentVaria
image: dockerImage
},
env: environmentVariables
}
},
parallelism: parallelism
};
};
42 changes: 16 additions & 26 deletions tests/integration-tests/jobs/createJobMetronome-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ describe('Create job specific metronome tests', () => {

it('Stop run', async () => {
nock(metronomeConfig.metronomeUrl)
.post(`/v1/jobs/predator.${jobResponseBody.id}/runs/${jobResponseBody.run_id}/actions/stop`)
.get(`/v1/jobs/predator.${jobResponseBody.id}/runs`)
.reply(200, [{ id: 1 }]);

nock(metronomeConfig.metronomeUrl)
.post(`/v1/jobs/predator.${jobResponseBody.id}/runs/1/actions/stop`)
.reply(200);

let stopRunResponse = await schedulerRequestCreator.stopRun(createJobResponse.body.id, createJobResponse.body.run_id, {
Expand Down Expand Up @@ -123,6 +127,7 @@ describe('Create job specific metronome tests', () => {
test_id: testId,
arrival_rate: 1,
duration: 1,
parallelism: 2,
environment: 'test',
run_immediately: true,
max_virtual_users: 100
Expand All @@ -142,7 +147,7 @@ describe('Create job specific metronome tests', () => {
nock(metronomeConfig.metronomeUrl).post(
url => {
return url.startsWith('/v1/jobs') && url.endsWith('/runs');
}).reply(200, {
}).times(2).reply(200, {
id: 'runId'
});

Expand All @@ -167,7 +172,15 @@ describe('Create job specific metronome tests', () => {

it('Stop run', async () => {
nock(metronomeConfig.metronomeUrl)
.post(`/v1/jobs/predator.${jobResponseBody.id}/runs/${jobResponseBody.run_id}/actions/stop`)
.get(`/v1/jobs/predator.${jobResponseBody.id}/runs`)
.reply(200, [{ id: 1 }, { id: 2 }]);

nock(metronomeConfig.metronomeUrl)
.post(`/v1/jobs/predator.${jobResponseBody.id}/runs/1/actions/stop`)
.reply(200);

nock(metronomeConfig.metronomeUrl)
.post(`/v1/jobs/predator.${jobResponseBody.id}/runs/2/actions/stop`)
.reply(200);

let stopRunResponse = await schedulerRequestCreator.stopRun(createJobResponse.body.id, createJobResponse.body.run_id, {
Expand All @@ -190,29 +203,6 @@ describe('Create job specific metronome tests', () => {
});
});
});

describe('Bad requests', () => {
describe('Create job with parallelism > 1 should return 400', () => {
it('Create the job', async () => {
let validBody = {
test_id: testId,
arrival_rate: 1,
duration: 1,
environment: 'test',
run_immediately: true,
max_virtual_users: 100,
parallelism: 2
};

let response = await schedulerRequestCreator.createJob(validBody, {
'Content-Type': 'application/json'
});

should(response.statusCode).eql(400);
should(response.body.message).eql('parallelism is only supported in JOB_PLATFORM: KUBERNETES');
});
});
});
});
}
}).timeout(20000);
44 changes: 11 additions & 33 deletions tests/unit-tests/jobs/helpers/jobVerifier-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ describe('Jobs verifier tests', function () {

describe('verifyTestExists tests', () => {
it('Should pass test id validation', async () => {
req = { body: { test_id: 'id' } };
req = {body: {test_id: 'id'}};

testsManagerStub.resolves();
await jobVerifier.verifyTestExists(req, res, nextStub);
Expand All @@ -47,9 +47,9 @@ describe('Jobs verifier tests', function () {
});

it('Should fail on test id validation when test not found', async () => {
req = { body: { test_id: 'id' } };
req = {body: {test_id: 'id'}};

testsManagerStub.rejects({ statusCode: 404 });
testsManagerStub.rejects({statusCode: 404});

await jobVerifier.verifyTestExists(req, res, nextStub);
should(nextStub.called).eql(true);
Expand All @@ -58,9 +58,9 @@ describe('Jobs verifier tests', function () {
});

it('Should fail on test id validation when error from performance framework api', async () => {
req = { body: { test_id: 'id' } };
req = {body: {test_id: 'id'}};

testsManagerStub.rejects({ statusCode: 500, message: 'failure' });
testsManagerStub.rejects({statusCode: 500, message: 'failure'});

await jobVerifier.verifyTestExists(req, res, nextStub);
should(nextStub.called).eql(true);
Expand All @@ -71,62 +71,40 @@ describe('Jobs verifier tests', function () {

describe('verifyJobBody tests', () => {
it('Run immediately is true and cron expression does not exist, should pass', async () => {
req = { body: { run_immediately: true } };
req = {body: {run_immediately: true}};
await jobVerifier.verifyJobBody(req, res, nextStub);
should(nextStub.args[0][0]).eql(undefined);
});

it('Run immediately is true and cron expression exist, should pass', async () => {
req = { body: { run_immediately: true, cron_expression: '* * *' } };
req = {body: {run_immediately: true, cron_expression: '* * *'}};
await jobVerifier.verifyJobBody(req, res, nextStub);
should(nextStub.args[0][0]).eql(undefined);
});

it('Run immediately is false and cron expression does not exist, should fail', async () => {
req = { body: { run_immediately: false } };
req = {body: {run_immediately: false}};
await jobVerifier.verifyJobBody(req, res, nextStub);
should(nextStub.args[0][0].message).eql('Please provide run_immediately or cron_expression in order to schedule a job');
should(nextStub.args[0][0].statusCode).eql(400);
});

it('Run immediately is false and cron expression exist, should pass', async () => {
req = { body: { run_immediately: false, cron_expression: '* * *' } };
req = {body: {run_immediately: false, cron_expression: '* * *'}};
await jobVerifier.verifyJobBody(req, res, nextStub);
should(nextStub.args[0][0]).eql(undefined);
});
it('Run immediately does not exits and cron expression exist, should pass', async () => {
req = { body: { cron_expression: '* * *' } };
req = {body: {cron_expression: '* * *'}};
await jobVerifier.verifyJobBody(req, res, nextStub);
should(nextStub.args[0][0]).eql(undefined);
});

it('Run immediately does not exits and cron expression does not exist, should pass', async () => {
req = { body: {} };
req = {body: {}};
await jobVerifier.verifyJobBody(req, res, nextStub);
should(nextStub.args[0][0].message).eql('Please provide run_immediately or cron_expression in order to schedule a job');
should(nextStub.args[0][0].statusCode).eql(400);
});

it('Run immediately is true and parallelism is set to 1 with metronome as job platform, should pass', async () => {
req = { body: { run_immediately: true, parallelism: 1 } };
config.jobPlatform = consts.METRONOME;
await jobVerifier.verifyJobBody(req, res, nextStub);
should(nextStub.args[0][0]).eql(undefined);
});

it('Run immediately is true and parallelism is set to 2 with metronome as job platform, should fail', async () => {
req = { body: { run_immediately: true, parallelism: 2 } };
config.jobPlatform = consts.METRONOME;
await jobVerifier.verifyJobBody(req, res, nextStub);
should(nextStub.args[0][0].message).eql('parallelism is only supported in JOB_PLATFORM: KUBERNETES');
should(nextStub.args[0][0].statusCode).eql(400);
});

it('Run immediately is true and parallelism is set to 2 with kubernetes as job platform, should pass', async () => {
req = { body: { run_immediately: true, parallelism: 2 } };
config.jobPlatform = consts.KUBERNETES;
await jobVerifier.verifyJobBody(req, res, nextStub);
should(nextStub.args[0][0]).eql(undefined);
});
});
});
24 changes: 18 additions & 6 deletions tests/unit-tests/jobs/models/metronome/jobConnector-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ describe('Metronome job connector tests', function () {
let jobResponse = await jobConnector.runJob({ id: 'predator.id' });

jobResponse.should.eql({
'id': '20190115084416zH0Ta',
'jobName': 'predator.d651ba1d-79fa-4970-b078-6f9dc4ae43e6'
});

Expand Down Expand Up @@ -70,7 +69,6 @@ describe('Metronome job connector tests', function () {
let jobResponse = await jobConnector.runJob({ id: 'predator.id' });

jobResponse.should.eql({
'id': '20190115084416zH0Ta',
'jobName': 'predator.d651ba1d-79fa-4970-b078-6f9dc4ae43e6'
});

Expand Down Expand Up @@ -136,11 +134,25 @@ describe('Metronome job connector tests', function () {

describe('Stop running job', () => {
it('Stop a running run of specific job', async () => {
requestSenderSendStub.resolves({ statusCode: 200 });
await jobConnector.stopRun('jobPlatformName', 'runId');
requestSenderSendStub.calledOnce.should.eql(true);
requestSenderSendStub.withArgs(sinon.match({ method: 'GET' })).resolves([{ id: 1 }, { id: 2 }]);
requestSenderSendStub.withArgs(sinon.match({ method: 'POST' }))
.resolves({ statusCode: 200 });

await jobConnector.stopRun('jobPlatformName');
requestSenderSendStub.calledThrice.should.eql(true);

requestSenderSendStub.args[0][0].should.eql({
'url': 'localhost:80/v1/jobs/jobPlatformName/runs/runId/actions/stop',
method: 'GET',
url: 'localhost:80/v1/jobs/jobPlatformName/runs',
headers: {}
});
requestSenderSendStub.args[1][0].should.eql({
'url': 'localhost:80/v1/jobs/jobPlatformName/runs/1/actions/stop',
method: 'POST',
headers: {}
});
requestSenderSendStub.args[2][0].should.eql({
'url': 'localhost:80/v1/jobs/jobPlatformName/runs/2/actions/stop',
method: 'POST',
headers: {}
});
Expand Down