diff --git a/lib/graphql/job/acquire.js b/lib/graphql/job/acquire.js index cef4276..aaf7039 100644 --- a/lib/graphql/job/acquire.js +++ b/lib/graphql/job/acquire.js @@ -37,6 +37,7 @@ var __generator = (this && this.__generator) || function (thisArg, body) { }; exports.__esModule = true; var graphql_1 = require("graphql"); +var sequelize_1 = require("sequelize"); function AcquireJobDefinition(graphqlTypes, models) { var _this = this; return { @@ -50,25 +51,31 @@ function AcquireJobDefinition(graphqlTypes, models) { }, resolve: function (source, args, context) { return __awaiter(_this, void 0, void 0, function () { var transaction, job; - return __generator(this, function (_a) { - switch (_a.label) { + var _a, _b; + return __generator(this, function (_c) { + switch (_c.label) { case 0: return [4 /*yield*/, models.sequelize.transaction()]; case 1: - transaction = _a.sent(); + transaction = _c.sent(); return [4 /*yield*/, models.job.findOne({ - where: { - type: args.typeList, - status: 'queued' - }, + where: (_a = { + type: args.typeList, + status: 'queued' + }, + _a[sequelize_1.Op.or] = [ + { startAfter: null }, + { startAfter: (_b = {}, _b[sequelize_1.Op.lt] = new Date(), _b) } + ], + _a), order: [['id', 'ASC']], transaction: transaction })]; case 2: - job = _a.sent(); + job = _c.sent(); if (!!job) return [3 /*break*/, 4]; return [4 /*yield*/, transaction.commit()]; case 3: - _a.sent(); + _c.sent(); return [2 /*return*/, null]; case 4: return [4 /*yield*/, job.update({ workerId: args.workerId, @@ -76,10 +83,10 @@ function AcquireJobDefinition(graphqlTypes, models) { startedAt: new Date() }, { transaction: transaction })]; case 5: - _a.sent(); + _c.sent(); return [4 /*yield*/, transaction.commit()]; case 6: - _a.sent(); + _c.sent(); return [2 /*return*/, job]; } }); diff --git a/lib/models/job.js b/lib/models/job.js index 38f7d6f..d475947 100644 --- a/lib/models/job.js +++ b/lib/models/job.js @@ -62,6 +62,11 @@ function Job(sequelize) { allowNull: true, defaultValue: null }, + startAfter: { + type: sequelize_1.DataTypes.DATE, + allowNull: true, + defaultValue: null + }, endedAt: { type: sequelize_1.DataTypes.DATE, allowNull: true, diff --git a/migrations/20200323172523-add-job-start-after.js b/migrations/20200323172523-add-job-start-after.js new file mode 100644 index 0000000..a4f30b7 --- /dev/null +++ b/migrations/20200323172523-add-job-start-after.js @@ -0,0 +1,13 @@ +module.exports = { + up: async function(queryInterface, Sequelize) { + await queryInterface.addColumn('job', 'startAfter', { + type: Sequelize.DATE, + allowNull: true, + defaultValue: null + }) + }, + + down: async function(queryInterface) { + await queryInterface.removeColumn('job', 'startAfter') + } +} diff --git a/package.json b/package.json index eff59cf..9c9a235 100644 --- a/package.json +++ b/package.json @@ -12,10 +12,6 @@ "migrations/*" ], "dependencies": { - "@types/debug": "^4.1.5", - "@types/node-fetch": "^2.5.4", - "@types/spdy": "^3.4.4", - "@types/uuid": "^3.4.6", "apollo-cache-inmemory": "^1.6.5", "apollo-client": "^2.6.8", "apollo-link-http": "^1.5.16", @@ -40,6 +36,11 @@ "node": ">=12.14.0" }, "devDependencies": { + "@types/debug": "^4.1.5", + "@types/node-fetch": "^2.5.4", + "@types/spdy": "^3.4.4", + "@types/uuid": "^3.4.6", + "date-fns": "^2.11.0", "express": "^4.17.1", "graphql": "^14.0.2", "graphql-relay": "^0.5.5", diff --git a/src/graphql/job/acquire.ts b/src/graphql/job/acquire.ts index 738a6a6..b2fbc19 100644 --- a/src/graphql/job/acquire.ts +++ b/src/graphql/job/acquire.ts @@ -1,4 +1,5 @@ import { GraphQLString, GraphQLNonNull, GraphQLList } from 'graphql' +import { Op } from 'sequelize' import { InAndOutGraphqlTypes, SequelizeModels, @@ -24,7 +25,11 @@ export default function AcquireJobDefinition( const job = await models.job.findOne({ where: { type: args.typeList, - status: 'queued' + status: 'queued', + [Op.or]: [ + { startAfter: null }, + { startAfter: { [Op.lt]: new Date() } } + ] }, order: [['id', 'ASC']], transaction diff --git a/src/models/job.ts b/src/models/job.ts index fefe2de..d06c9de 100644 --- a/src/models/job.ts +++ b/src/models/job.ts @@ -63,6 +63,11 @@ export default function Job(sequelize: any) { allowNull: true, defaultValue: null }, + startAfter: { + type: DataTypes.DATE, + allowNull: true, + defaultValue: null + }, endedAt: { type: DataTypes.DATE, allowNull: true, diff --git a/tests/job.spec.js b/tests/job.spec.js index 3e88fdb..8453025 100644 --- a/tests/job.spec.js +++ b/tests/job.spec.js @@ -1,4 +1,6 @@ const request = require('supertest') +const addMinutes = require('date-fns/addMinutes') + const { migrateDatabase, seedDatabase, @@ -292,4 +294,35 @@ describe('Test the job endpoint', () => { expect(jobEntity.output).toMatchSnapshot() expect(jobEntity.processingInfo).toMatchSnapshot() }) + + it('When a job is planified to be run in the future, it cannot be acquired.', async () => { + const date = new Date() + const job = await models.job.findByPk(1) + await job.update({ startAfter: addMinutes(date, 5) }) + + const response = await request(server) + .post('/graphql') + .send( + acquireJob({ + typeList: ['a'] + }) + ) + + expect(response.body.errors).toBeUndefined() + expect(response.body.data.acquireJob).toBe(null) + + // A few milli-seconds passed, so the job should be returned + await job.update({ startAfter: date }) + + const response2 = await request(server) + .post('/graphql') + .send( + acquireJob({ + typeList: ['a'] + }) + ) + + expect(response2.body.errors).toBeUndefined() + expect(response2.body.data.acquireJob).not.toBe(null) + }) }) diff --git a/yarn.lock b/yarn.lock index a43cf0e..1f9d091 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1937,6 +1937,11 @@ date-fns@1.30.1: resolved "https://registry.yarnpkg.com/date-fns/-/date-fns-1.30.1.tgz#2e71bf0b119153dbb4cc4e88d9ea5acfb50dc05c" integrity sha512-hBSVCvSmWC+QypYObzwGOd9wqdDpOt+0wl0KbU+R+uuZBS1jN8VsD1ss3irQDknRj5NvxiTF6oj/nDRnN/UQNw== +date-fns@^2.11.0: + version "2.11.0" + resolved "https://registry.yarnpkg.com/date-fns/-/date-fns-2.11.0.tgz#ec2b44977465b9dcb370021d5e6c019b19f36d06" + integrity sha512-8P1cDi8ebZyDxUyUprBXwidoEtiQAawYPGvpfb+Dg0G6JrQ+VozwOmm91xYC0vAv1+0VmLehEPb+isg4BGUFfA== + dateformat@^3.0.0: version "3.0.3" resolved "https://registry.yarnpkg.com/dateformat/-/dateformat-3.0.3.tgz#a6e37499a4d9a9cf85ef5872044d62901c9889ae"