From f116410e5f8505ab04e9c0349dd9af44a062b4a5 Mon Sep 17 00:00:00 2001 From: enudler Date: Sun, 24 Feb 2019 12:38:22 +0200 Subject: [PATCH] feat(jobs): support for parallelism in metronome (#63) --- src/jobs/helpers/jobVerifier.js | 6 --- src/jobs/models/jobManager.js | 19 +++----- src/jobs/models/metronome/jobConnector.js | 37 +++++++++++++--- src/jobs/models/metronome/jobTemplate.js | 3 +- .../jobs/createJobMetronome-test.js | 42 +++++++----------- .../jobs/helpers/jobVerifier-test.js | 44 +++++-------------- .../models/metronome/jobConnector-test.js | 24 +++++++--- 7 files changed, 84 insertions(+), 91 deletions(-) diff --git a/src/jobs/helpers/jobVerifier.js b/src/jobs/helpers/jobVerifier.js index b453d4bb0..44b54a046 100644 --- a/src/jobs/helpers/jobVerifier.js +++ b/src/jobs/helpers/jobVerifier.js @@ -1,7 +1,5 @@ 'use strict'; const testsManager = require('../../tests/models/manager'); -const serviceConfig = require('../../config/serviceConfig'); -const consts = require('../../common/consts'); module.exports.verifyJobBody = (req, res, next) => { let errorToThrow; @@ -9,11 +7,7 @@ module.exports.verifyJobBody = (req, res, next) => { if (!(jobBody.run_immediately || jobBody.cron_expression)) { errorToThrow = new Error('Please provide run_immediately or cron_expression in order to schedule a job'); errorToThrow.statusCode = 400; - } else if (serviceConfig.jobPlatform !== consts.KUBERNETES && serviceConfig.jobPlatform !== consts.DOCKER && jobBody.parallelism > 1) { - errorToThrow = new Error(`parallelism is only supported in JOB_PLATFORM: ${consts.KUBERNETES}`); - errorToThrow.statusCode = 400; } - next(errorToThrow); }; diff --git a/src/jobs/models/jobManager.js b/src/jobs/models/jobManager.js index e75bd91ed..c76161860 100644 --- a/src/jobs/models/jobManager.js +++ b/src/jobs/models/jobManager.js @@ -1,6 +1,5 @@ 'use strict'; let logger = require('../../common/logger'); -let consts = require('../../common/consts'); let uuid = require('uuid'); let CronJob = require('cron').CronJob; @@ -156,20 +155,16 @@ function createResponse(jobId, jobBody, runId) { function createJobRequest(jobId, runId, jobBody, dockerImage) { let jobName = util.format(JOB_PLATFORM_NAME, jobId); - let parallelism = 1; - let arrivalRatePerRunner = jobBody.arrival_rate; let rampToPerRunner = jobBody.ramp_to; let maxVirtualUsersPerRunner = jobBody.max_virtual_users; - if (consts.KUBERNETES === config.jobPlatform || consts.DOCKER === config.jobPlatform) { - parallelism = jobBody.parallelism || 1; - arrivalRatePerRunner = Math.ceil(jobBody.arrival_rate / parallelism); - if (jobBody.ramp_to) { - rampToPerRunner = Math.ceil(jobBody.ramp_to / parallelism); - } - if (jobBody.max_virtual_users) { - maxVirtualUsersPerRunner = Math.ceil(jobBody.max_virtual_users / parallelism); - } + let parallelism = jobBody.parallelism || 1; + let arrivalRatePerRunner = Math.ceil(jobBody.arrival_rate / parallelism); + if (jobBody.ramp_to) { + rampToPerRunner = Math.ceil(jobBody.ramp_to / parallelism); + } + if (jobBody.max_virtual_users) { + maxVirtualUsersPerRunner = Math.ceil(jobBody.max_virtual_users / parallelism); } let environmentVariables = { diff --git a/src/jobs/models/metronome/jobConnector.js b/src/jobs/models/metronome/jobConnector.js index ad3e60d21..430f7eecd 100644 --- a/src/jobs/models/metronome/jobConnector.js +++ b/src/jobs/models/metronome/jobConnector.js @@ -12,25 +12,48 @@ if (metronomeConfig.metronomeToken) { } module.exports.runJob = async (metronomeJobConfig) => { + let parallelism = metronomeJobConfig.parallelism || 1; + delete metronomeJobConfig.parallelism; let deployJobMethod = await chooseDeployJobMethod(metronomeJobConfig); let deployJobResponse = await deployJob(deployJobMethod, metronomeJobConfig); - let runJobResponse = await runJob(metronomeJobConfig.id); + + let runJobPromises = []; + for (let i = 0; i < parallelism; i++) { + runJobPromises.push(runJob(metronomeJobConfig.id)); + } + await Promise.all(runJobPromises); let genericJobResponse = { - jobName: deployJobResponse.id, - id: runJobResponse.id + jobName: deployJobResponse.id }; return genericJobResponse; }; -module.exports.stopRun = async (jobPlatformName, platformSpecificInternalRunId) => { - let url = util.format('%s/v1/jobs/%s/runs/%s/actions/stop', metronomeUrl, jobPlatformName, platformSpecificInternalRunId); +module.exports.stopRun = async (jobPlatformName) => { + let url = util.format('%s/v1/jobs/%s/runs', metronomeUrl, jobPlatformName); let options = { - method: 'POST', + method: 'GET', url: url, headers }; - await requestSender.send(options); + + let currentJobRuns = await requestSender.send(options); + + let stopJobPromises = []; + currentJobRuns.forEach((jobRun) => { + stopJobPromises.push(async () => { + let url = util.format('%s/v1/jobs/%s/runs/%s/actions/stop', metronomeUrl, jobPlatformName, jobRun.id); + let options = { + method: 'POST', + url: url, + headers + }; + await requestSender.send(options); + }); + }); + + stopJobPromises.forEach(stopJob => stopJob()); + await Promise.all(stopJobPromises); }; async function deployJob(method, metronomeJobConfig) { diff --git a/src/jobs/models/metronome/jobTemplate.js b/src/jobs/models/metronome/jobTemplate.js index e47a5ff7d..4f63a3f83 100644 --- a/src/jobs/models/metronome/jobTemplate.js +++ b/src/jobs/models/metronome/jobTemplate.js @@ -13,6 +13,7 @@ module.exports.createJobRequest = (jobName, runId, parallelism, environmentVaria image: dockerImage }, env: environmentVariables - } + }, + parallelism: parallelism }; }; diff --git a/tests/integration-tests/jobs/createJobMetronome-test.js b/tests/integration-tests/jobs/createJobMetronome-test.js index 19155a4eb..c0f162728 100644 --- a/tests/integration-tests/jobs/createJobMetronome-test.js +++ b/tests/integration-tests/jobs/createJobMetronome-test.js @@ -90,7 +90,11 @@ describe('Create job specific metronome tests', () => { it('Stop run', async () => { nock(metronomeConfig.metronomeUrl) - .post(`/v1/jobs/predator.${jobResponseBody.id}/runs/${jobResponseBody.run_id}/actions/stop`) + .get(`/v1/jobs/predator.${jobResponseBody.id}/runs`) + .reply(200, [{ id: 1 }]); + + nock(metronomeConfig.metronomeUrl) + .post(`/v1/jobs/predator.${jobResponseBody.id}/runs/1/actions/stop`) .reply(200); let stopRunResponse = await schedulerRequestCreator.stopRun(createJobResponse.body.id, createJobResponse.body.run_id, { @@ -123,6 +127,7 @@ describe('Create job specific metronome tests', () => { test_id: testId, arrival_rate: 1, duration: 1, + parallelism: 2, environment: 'test', run_immediately: true, max_virtual_users: 100 @@ -142,7 +147,7 @@ describe('Create job specific metronome tests', () => { nock(metronomeConfig.metronomeUrl).post( url => { return url.startsWith('/v1/jobs') && url.endsWith('/runs'); - }).reply(200, { + }).times(2).reply(200, { id: 'runId' }); @@ -167,7 +172,15 @@ describe('Create job specific metronome tests', () => { it('Stop run', async () => { nock(metronomeConfig.metronomeUrl) - .post(`/v1/jobs/predator.${jobResponseBody.id}/runs/${jobResponseBody.run_id}/actions/stop`) + .get(`/v1/jobs/predator.${jobResponseBody.id}/runs`) + .reply(200, [{ id: 1 }, { id: 2 }]); + + nock(metronomeConfig.metronomeUrl) + .post(`/v1/jobs/predator.${jobResponseBody.id}/runs/1/actions/stop`) + .reply(200); + + nock(metronomeConfig.metronomeUrl) + .post(`/v1/jobs/predator.${jobResponseBody.id}/runs/2/actions/stop`) .reply(200); let stopRunResponse = await schedulerRequestCreator.stopRun(createJobResponse.body.id, createJobResponse.body.run_id, { @@ -190,29 +203,6 @@ describe('Create job specific metronome tests', () => { }); }); }); - - describe('Bad requests', () => { - describe('Create job with parallelism > 1 should return 400', () => { - it('Create the job', async () => { - let validBody = { - test_id: testId, - arrival_rate: 1, - duration: 1, - environment: 'test', - run_immediately: true, - max_virtual_users: 100, - parallelism: 2 - }; - - let response = await schedulerRequestCreator.createJob(validBody, { - 'Content-Type': 'application/json' - }); - - should(response.statusCode).eql(400); - should(response.body.message).eql('parallelism is only supported in JOB_PLATFORM: KUBERNETES'); - }); - }); - }); }); } }).timeout(20000); \ No newline at end of file diff --git a/tests/unit-tests/jobs/helpers/jobVerifier-test.js b/tests/unit-tests/jobs/helpers/jobVerifier-test.js index e60b48012..b062c8b47 100644 --- a/tests/unit-tests/jobs/helpers/jobVerifier-test.js +++ b/tests/unit-tests/jobs/helpers/jobVerifier-test.js @@ -38,7 +38,7 @@ describe('Jobs verifier tests', function () { describe('verifyTestExists tests', () => { it('Should pass test id validation', async () => { - req = { body: { test_id: 'id' } }; + req = {body: {test_id: 'id'}}; testsManagerStub.resolves(); await jobVerifier.verifyTestExists(req, res, nextStub); @@ -47,9 +47,9 @@ describe('Jobs verifier tests', function () { }); it('Should fail on test id validation when test not found', async () => { - req = { body: { test_id: 'id' } }; + req = {body: {test_id: 'id'}}; - testsManagerStub.rejects({ statusCode: 404 }); + testsManagerStub.rejects({statusCode: 404}); await jobVerifier.verifyTestExists(req, res, nextStub); should(nextStub.called).eql(true); @@ -58,9 +58,9 @@ describe('Jobs verifier tests', function () { }); it('Should fail on test id validation when error from performance framework api', async () => { - req = { body: { test_id: 'id' } }; + req = {body: {test_id: 'id'}}; - testsManagerStub.rejects({ statusCode: 500, message: 'failure' }); + testsManagerStub.rejects({statusCode: 500, message: 'failure'}); await jobVerifier.verifyTestExists(req, res, nextStub); should(nextStub.called).eql(true); @@ -71,62 +71,40 @@ describe('Jobs verifier tests', function () { describe('verifyJobBody tests', () => { it('Run immediately is true and cron expression does not exist, should pass', async () => { - req = { body: { run_immediately: true } }; + req = {body: {run_immediately: true}}; await jobVerifier.verifyJobBody(req, res, nextStub); should(nextStub.args[0][0]).eql(undefined); }); it('Run immediately is true and cron expression exist, should pass', async () => { - req = { body: { run_immediately: true, cron_expression: '* * *' } }; + req = {body: {run_immediately: true, cron_expression: '* * *'}}; await jobVerifier.verifyJobBody(req, res, nextStub); should(nextStub.args[0][0]).eql(undefined); }); it('Run immediately is false and cron expression does not exist, should fail', async () => { - req = { body: { run_immediately: false } }; + req = {body: {run_immediately: false}}; await jobVerifier.verifyJobBody(req, res, nextStub); should(nextStub.args[0][0].message).eql('Please provide run_immediately or cron_expression in order to schedule a job'); should(nextStub.args[0][0].statusCode).eql(400); }); it('Run immediately is false and cron expression exist, should pass', async () => { - req = { body: { run_immediately: false, cron_expression: '* * *' } }; + req = {body: {run_immediately: false, cron_expression: '* * *'}}; await jobVerifier.verifyJobBody(req, res, nextStub); should(nextStub.args[0][0]).eql(undefined); }); it('Run immediately does not exits and cron expression exist, should pass', async () => { - req = { body: { cron_expression: '* * *' } }; + req = {body: {cron_expression: '* * *'}}; await jobVerifier.verifyJobBody(req, res, nextStub); should(nextStub.args[0][0]).eql(undefined); }); it('Run immediately does not exits and cron expression does not exist, should pass', async () => { - req = { body: {} }; + req = {body: {}}; await jobVerifier.verifyJobBody(req, res, nextStub); should(nextStub.args[0][0].message).eql('Please provide run_immediately or cron_expression in order to schedule a job'); should(nextStub.args[0][0].statusCode).eql(400); }); - - it('Run immediately is true and parallelism is set to 1 with metronome as job platform, should pass', async () => { - req = { body: { run_immediately: true, parallelism: 1 } }; - config.jobPlatform = consts.METRONOME; - await jobVerifier.verifyJobBody(req, res, nextStub); - should(nextStub.args[0][0]).eql(undefined); - }); - - it('Run immediately is true and parallelism is set to 2 with metronome as job platform, should fail', async () => { - req = { body: { run_immediately: true, parallelism: 2 } }; - config.jobPlatform = consts.METRONOME; - await jobVerifier.verifyJobBody(req, res, nextStub); - should(nextStub.args[0][0].message).eql('parallelism is only supported in JOB_PLATFORM: KUBERNETES'); - should(nextStub.args[0][0].statusCode).eql(400); - }); - - it('Run immediately is true and parallelism is set to 2 with kubernetes as job platform, should pass', async () => { - req = { body: { run_immediately: true, parallelism: 2 } }; - config.jobPlatform = consts.KUBERNETES; - await jobVerifier.verifyJobBody(req, res, nextStub); - should(nextStub.args[0][0]).eql(undefined); - }); }); }); \ No newline at end of file diff --git a/tests/unit-tests/jobs/models/metronome/jobConnector-test.js b/tests/unit-tests/jobs/models/metronome/jobConnector-test.js index 0c6eb7392..5086b9292 100644 --- a/tests/unit-tests/jobs/models/metronome/jobConnector-test.js +++ b/tests/unit-tests/jobs/models/metronome/jobConnector-test.js @@ -37,7 +37,6 @@ describe('Metronome job connector tests', function () { let jobResponse = await jobConnector.runJob({ id: 'predator.id' }); jobResponse.should.eql({ - 'id': '20190115084416zH0Ta', 'jobName': 'predator.d651ba1d-79fa-4970-b078-6f9dc4ae43e6' }); @@ -70,7 +69,6 @@ describe('Metronome job connector tests', function () { let jobResponse = await jobConnector.runJob({ id: 'predator.id' }); jobResponse.should.eql({ - 'id': '20190115084416zH0Ta', 'jobName': 'predator.d651ba1d-79fa-4970-b078-6f9dc4ae43e6' }); @@ -136,11 +134,25 @@ describe('Metronome job connector tests', function () { describe('Stop running job', () => { it('Stop a running run of specific job', async () => { - requestSenderSendStub.resolves({ statusCode: 200 }); - await jobConnector.stopRun('jobPlatformName', 'runId'); - requestSenderSendStub.calledOnce.should.eql(true); + requestSenderSendStub.withArgs(sinon.match({ method: 'GET' })).resolves([{ id: 1 }, { id: 2 }]); + requestSenderSendStub.withArgs(sinon.match({ method: 'POST' })) + .resolves({ statusCode: 200 }); + + await jobConnector.stopRun('jobPlatformName'); + requestSenderSendStub.calledThrice.should.eql(true); + requestSenderSendStub.args[0][0].should.eql({ - 'url': 'localhost:80/v1/jobs/jobPlatformName/runs/runId/actions/stop', + method: 'GET', + url: 'localhost:80/v1/jobs/jobPlatformName/runs', + headers: {} + }); + requestSenderSendStub.args[1][0].should.eql({ + 'url': 'localhost:80/v1/jobs/jobPlatformName/runs/1/actions/stop', + method: 'POST', + headers: {} + }); + requestSenderSendStub.args[2][0].should.eql({ + 'url': 'localhost:80/v1/jobs/jobPlatformName/runs/2/actions/stop', method: 'POST', headers: {} });