Skip to content

Commit

Permalink
feat(jobs): support for parallelism in metronome (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
enudler authored and NivLipetz committed Feb 24, 2019
1 parent 348d9c2 commit f116410
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 91 deletions.
6 changes: 0 additions & 6 deletions src/jobs/helpers/jobVerifier.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
'use strict';
const testsManager = require('../../tests/models/manager');
const serviceConfig = require('../../config/serviceConfig');
const consts = require('../../common/consts');

module.exports.verifyJobBody = (req, res, next) => {
let errorToThrow;
let jobBody = req.body;
if (!(jobBody.run_immediately || jobBody.cron_expression)) {
errorToThrow = new Error('Please provide run_immediately or cron_expression in order to schedule a job');
errorToThrow.statusCode = 400;
} else if (serviceConfig.jobPlatform !== consts.KUBERNETES && serviceConfig.jobPlatform !== consts.DOCKER && jobBody.parallelism > 1) {
errorToThrow = new Error(`parallelism is only supported in JOB_PLATFORM: ${consts.KUBERNETES}`);
errorToThrow.statusCode = 400;
}

next(errorToThrow);
};

Expand Down
19 changes: 7 additions & 12 deletions src/jobs/models/jobManager.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use strict';
let logger = require('../../common/logger');
let consts = require('../../common/consts');

let uuid = require('uuid');
let CronJob = require('cron').CronJob;
Expand Down Expand Up @@ -156,20 +155,16 @@ function createResponse(jobId, jobBody, runId) {

function createJobRequest(jobId, runId, jobBody, dockerImage) {
let jobName = util.format(JOB_PLATFORM_NAME, jobId);
let parallelism = 1;
let arrivalRatePerRunner = jobBody.arrival_rate;
let rampToPerRunner = jobBody.ramp_to;
let maxVirtualUsersPerRunner = jobBody.max_virtual_users;

if (consts.KUBERNETES === config.jobPlatform || consts.DOCKER === config.jobPlatform) {
parallelism = jobBody.parallelism || 1;
arrivalRatePerRunner = Math.ceil(jobBody.arrival_rate / parallelism);
if (jobBody.ramp_to) {
rampToPerRunner = Math.ceil(jobBody.ramp_to / parallelism);
}
if (jobBody.max_virtual_users) {
maxVirtualUsersPerRunner = Math.ceil(jobBody.max_virtual_users / parallelism);
}
let parallelism = jobBody.parallelism || 1;
let arrivalRatePerRunner = Math.ceil(jobBody.arrival_rate / parallelism);
if (jobBody.ramp_to) {
rampToPerRunner = Math.ceil(jobBody.ramp_to / parallelism);
}
if (jobBody.max_virtual_users) {
maxVirtualUsersPerRunner = Math.ceil(jobBody.max_virtual_users / parallelism);
}

let environmentVariables = {
Expand Down
37 changes: 30 additions & 7 deletions src/jobs/models/metronome/jobConnector.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,48 @@ if (metronomeConfig.metronomeToken) {
}

module.exports.runJob = async (metronomeJobConfig) => {
let parallelism = metronomeJobConfig.parallelism || 1;
delete metronomeJobConfig.parallelism;
let deployJobMethod = await chooseDeployJobMethod(metronomeJobConfig);
let deployJobResponse = await deployJob(deployJobMethod, metronomeJobConfig);
let runJobResponse = await runJob(metronomeJobConfig.id);

let runJobPromises = [];
for (let i = 0; i < parallelism; i++) {
runJobPromises.push(runJob(metronomeJobConfig.id));
}
await Promise.all(runJobPromises);

let genericJobResponse = {
jobName: deployJobResponse.id,
id: runJobResponse.id
jobName: deployJobResponse.id
};
return genericJobResponse;
};

module.exports.stopRun = async (jobPlatformName, platformSpecificInternalRunId) => {
let url = util.format('%s/v1/jobs/%s/runs/%s/actions/stop', metronomeUrl, jobPlatformName, platformSpecificInternalRunId);
module.exports.stopRun = async (jobPlatformName) => {
let url = util.format('%s/v1/jobs/%s/runs', metronomeUrl, jobPlatformName);
let options = {
method: 'POST',
method: 'GET',
url: url,
headers
};
await requestSender.send(options);

let currentJobRuns = await requestSender.send(options);

let stopJobPromises = [];
currentJobRuns.forEach((jobRun) => {
stopJobPromises.push(async () => {
let url = util.format('%s/v1/jobs/%s/runs/%s/actions/stop', metronomeUrl, jobPlatformName, jobRun.id);
let options = {
method: 'POST',
url: url,
headers
};
await requestSender.send(options);
});
});

stopJobPromises.forEach(stopJob => stopJob());
await Promise.all(stopJobPromises);
};

async function deployJob(method, metronomeJobConfig) {
Expand Down
3 changes: 2 additions & 1 deletion src/jobs/models/metronome/jobTemplate.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module.exports.createJobRequest = (jobName, runId, parallelism, environmentVaria
image: dockerImage
},
env: environmentVariables
}
},
parallelism: parallelism
};
};
42 changes: 16 additions & 26 deletions tests/integration-tests/jobs/createJobMetronome-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ describe('Create job specific metronome tests', () => {

it('Stop run', async () => {
nock(metronomeConfig.metronomeUrl)
.post(`/v1/jobs/predator.${jobResponseBody.id}/runs/${jobResponseBody.run_id}/actions/stop`)
.get(`/v1/jobs/predator.${jobResponseBody.id}/runs`)
.reply(200, [{ id: 1 }]);

nock(metronomeConfig.metronomeUrl)
.post(`/v1/jobs/predator.${jobResponseBody.id}/runs/1/actions/stop`)
.reply(200);

let stopRunResponse = await schedulerRequestCreator.stopRun(createJobResponse.body.id, createJobResponse.body.run_id, {
Expand Down Expand Up @@ -123,6 +127,7 @@ describe('Create job specific metronome tests', () => {
test_id: testId,
arrival_rate: 1,
duration: 1,
parallelism: 2,
environment: 'test',
run_immediately: true,
max_virtual_users: 100
Expand All @@ -142,7 +147,7 @@ describe('Create job specific metronome tests', () => {
nock(metronomeConfig.metronomeUrl).post(
url => {
return url.startsWith('/v1/jobs') && url.endsWith('/runs');
}).reply(200, {
}).times(2).reply(200, {
id: 'runId'
});

Expand All @@ -167,7 +172,15 @@ describe('Create job specific metronome tests', () => {

it('Stop run', async () => {
nock(metronomeConfig.metronomeUrl)
.post(`/v1/jobs/predator.${jobResponseBody.id}/runs/${jobResponseBody.run_id}/actions/stop`)
.get(`/v1/jobs/predator.${jobResponseBody.id}/runs`)
.reply(200, [{ id: 1 }, { id: 2 }]);

nock(metronomeConfig.metronomeUrl)
.post(`/v1/jobs/predator.${jobResponseBody.id}/runs/1/actions/stop`)
.reply(200);

nock(metronomeConfig.metronomeUrl)
.post(`/v1/jobs/predator.${jobResponseBody.id}/runs/2/actions/stop`)
.reply(200);

let stopRunResponse = await schedulerRequestCreator.stopRun(createJobResponse.body.id, createJobResponse.body.run_id, {
Expand All @@ -190,29 +203,6 @@ describe('Create job specific metronome tests', () => {
});
});
});

describe('Bad requests', () => {
describe('Create job with parallelism > 1 should return 400', () => {
it('Create the job', async () => {
let validBody = {
test_id: testId,
arrival_rate: 1,
duration: 1,
environment: 'test',
run_immediately: true,
max_virtual_users: 100,
parallelism: 2
};

let response = await schedulerRequestCreator.createJob(validBody, {
'Content-Type': 'application/json'
});

should(response.statusCode).eql(400);
should(response.body.message).eql('parallelism is only supported in JOB_PLATFORM: KUBERNETES');
});
});
});
});
}
}).timeout(20000);
44 changes: 11 additions & 33 deletions tests/unit-tests/jobs/helpers/jobVerifier-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ describe('Jobs verifier tests', function () {

describe('verifyTestExists tests', () => {
it('Should pass test id validation', async () => {
req = { body: { test_id: 'id' } };
req = {body: {test_id: 'id'}};

testsManagerStub.resolves();
await jobVerifier.verifyTestExists(req, res, nextStub);
Expand All @@ -47,9 +47,9 @@ describe('Jobs verifier tests', function () {
});

it('Should fail on test id validation when test not found', async () => {
req = { body: { test_id: 'id' } };
req = {body: {test_id: 'id'}};

testsManagerStub.rejects({ statusCode: 404 });
testsManagerStub.rejects({statusCode: 404});

await jobVerifier.verifyTestExists(req, res, nextStub);
should(nextStub.called).eql(true);
Expand All @@ -58,9 +58,9 @@ describe('Jobs verifier tests', function () {
});

it('Should fail on test id validation when error from performance framework api', async () => {
req = { body: { test_id: 'id' } };
req = {body: {test_id: 'id'}};

testsManagerStub.rejects({ statusCode: 500, message: 'failure' });
testsManagerStub.rejects({statusCode: 500, message: 'failure'});

await jobVerifier.verifyTestExists(req, res, nextStub);
should(nextStub.called).eql(true);
Expand All @@ -71,62 +71,40 @@ describe('Jobs verifier tests', function () {

describe('verifyJobBody tests', () => {
it('Run immediately is true and cron expression does not exist, should pass', async () => {
req = { body: { run_immediately: true } };
req = {body: {run_immediately: true}};
await jobVerifier.verifyJobBody(req, res, nextStub);
should(nextStub.args[0][0]).eql(undefined);
});

it('Run immediately is true and cron expression exist, should pass', async () => {
req = { body: { run_immediately: true, cron_expression: '* * *' } };
req = {body: {run_immediately: true, cron_expression: '* * *'}};
await jobVerifier.verifyJobBody(req, res, nextStub);
should(nextStub.args[0][0]).eql(undefined);
});

it('Run immediately is false and cron expression does not exist, should fail', async () => {
req = { body: { run_immediately: false } };
req = {body: {run_immediately: false}};
await jobVerifier.verifyJobBody(req, res, nextStub);
should(nextStub.args[0][0].message).eql('Please provide run_immediately or cron_expression in order to schedule a job');
should(nextStub.args[0][0].statusCode).eql(400);
});

it('Run immediately is false and cron expression exist, should pass', async () => {
req = { body: { run_immediately: false, cron_expression: '* * *' } };
req = {body: {run_immediately: false, cron_expression: '* * *'}};
await jobVerifier.verifyJobBody(req, res, nextStub);
should(nextStub.args[0][0]).eql(undefined);
});
it('Run immediately does not exits and cron expression exist, should pass', async () => {
req = { body: { cron_expression: '* * *' } };
req = {body: {cron_expression: '* * *'}};
await jobVerifier.verifyJobBody(req, res, nextStub);
should(nextStub.args[0][0]).eql(undefined);
});

it('Run immediately does not exits and cron expression does not exist, should pass', async () => {
req = { body: {} };
req = {body: {}};
await jobVerifier.verifyJobBody(req, res, nextStub);
should(nextStub.args[0][0].message).eql('Please provide run_immediately or cron_expression in order to schedule a job');
should(nextStub.args[0][0].statusCode).eql(400);
});

it('Run immediately is true and parallelism is set to 1 with metronome as job platform, should pass', async () => {
req = { body: { run_immediately: true, parallelism: 1 } };
config.jobPlatform = consts.METRONOME;
await jobVerifier.verifyJobBody(req, res, nextStub);
should(nextStub.args[0][0]).eql(undefined);
});

it('Run immediately is true and parallelism is set to 2 with metronome as job platform, should fail', async () => {
req = { body: { run_immediately: true, parallelism: 2 } };
config.jobPlatform = consts.METRONOME;
await jobVerifier.verifyJobBody(req, res, nextStub);
should(nextStub.args[0][0].message).eql('parallelism is only supported in JOB_PLATFORM: KUBERNETES');
should(nextStub.args[0][0].statusCode).eql(400);
});

it('Run immediately is true and parallelism is set to 2 with kubernetes as job platform, should pass', async () => {
req = { body: { run_immediately: true, parallelism: 2 } };
config.jobPlatform = consts.KUBERNETES;
await jobVerifier.verifyJobBody(req, res, nextStub);
should(nextStub.args[0][0]).eql(undefined);
});
});
});
24 changes: 18 additions & 6 deletions tests/unit-tests/jobs/models/metronome/jobConnector-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ describe('Metronome job connector tests', function () {
let jobResponse = await jobConnector.runJob({ id: 'predator.id' });

jobResponse.should.eql({
'id': '20190115084416zH0Ta',
'jobName': 'predator.d651ba1d-79fa-4970-b078-6f9dc4ae43e6'
});

Expand Down Expand Up @@ -70,7 +69,6 @@ describe('Metronome job connector tests', function () {
let jobResponse = await jobConnector.runJob({ id: 'predator.id' });

jobResponse.should.eql({
'id': '20190115084416zH0Ta',
'jobName': 'predator.d651ba1d-79fa-4970-b078-6f9dc4ae43e6'
});

Expand Down Expand Up @@ -136,11 +134,25 @@ describe('Metronome job connector tests', function () {

describe('Stop running job', () => {
it('Stop a running run of specific job', async () => {
requestSenderSendStub.resolves({ statusCode: 200 });
await jobConnector.stopRun('jobPlatformName', 'runId');
requestSenderSendStub.calledOnce.should.eql(true);
requestSenderSendStub.withArgs(sinon.match({ method: 'GET' })).resolves([{ id: 1 }, { id: 2 }]);
requestSenderSendStub.withArgs(sinon.match({ method: 'POST' }))
.resolves({ statusCode: 200 });

await jobConnector.stopRun('jobPlatformName');
requestSenderSendStub.calledThrice.should.eql(true);

requestSenderSendStub.args[0][0].should.eql({
'url': 'localhost:80/v1/jobs/jobPlatformName/runs/runId/actions/stop',
method: 'GET',
url: 'localhost:80/v1/jobs/jobPlatformName/runs',
headers: {}
});
requestSenderSendStub.args[1][0].should.eql({
'url': 'localhost:80/v1/jobs/jobPlatformName/runs/1/actions/stop',
method: 'POST',
headers: {}
});
requestSenderSendStub.args[2][0].should.eql({
'url': 'localhost:80/v1/jobs/jobPlatformName/runs/2/actions/stop',
method: 'POST',
headers: {}
});
Expand Down

0 comments on commit f116410

Please sign in to comment.