Skip to content

Commit

Permalink
feat(workerMonitoring): add model & logic (#39)
Browse files Browse the repository at this point in the history
* feat(workerMonitoring): add model & logic

* fix test

* Review : add debounce & update migration of workerMonitoring

* Review: add NO_ASYNC in debounce

* Add default env variables for test

* Fix snapshot
  • Loading branch information
germainvictor authored Jun 4, 2021
1 parent 970ba94 commit 6c1f25e
Show file tree
Hide file tree
Showing 14 changed files with 430 additions and 4 deletions.
4 changes: 3 additions & 1 deletion lib/graphql/getApolloServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var batch_1 = __importDefault(require("./batch"));
var pipeline_1 = __importDefault(require("./pipeline"));
var pipelineStep_1 = __importDefault(require("./pipelineStep"));
var jobHoldType_1 = __importDefault(require("./jobHoldType"));
var workerMonitoring_1 = __importDefault(require("./workerMonitoring"));
/**
* @param dbConfig Sequelize database configuration object
* @param gsgParams Params from graphql-sequelize-generator that overwrite the default ones.
Expand All @@ -79,7 +80,8 @@ function getApolloServer(dbConfig, gsgParams, customMutations) {
batch: batch_1["default"](types, models),
pipeline: pipeline_1["default"](types, models),
pipelineStep: pipelineStep_1["default"](types, models),
jobHoldType: jobHoldType_1["default"](types, models)
jobHoldType: jobHoldType_1["default"](types, models),
workerMonitoring: workerMonitoring_1["default"](types, models)
};
return [2 /*return*/, graphql_sequelize_generator_1.generateApolloServer(__assign({ graphqlSchemaDeclaration: graphqlSchemaDeclaration,
types: types,
Expand Down
62 changes: 60 additions & 2 deletions lib/graphql/job/acquire.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,34 @@ var __generator = (this && this.__generator) || function (thisArg, body) {
if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true };
}
};
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
exports.__esModule = true;
var graphql_1 = require("graphql");
var sequelize_1 = require("sequelize");
var debounce_1 = __importDefault(require("debounce"));
var allInstanceOfDebounceWorker = [];
function getInstanceOfDebounceWorker(workerId) {
var _this = this;
var instance = allInstanceOfDebounceWorker.filter(function (instance) { return instance.workerId === workerId; });
if (!instance.length) {
allInstanceOfDebounceWorker.push({
workerId: workerId,
debounce: debounce_1["default"](function (callback) { return callback(); }, 50)
});
return process.env.NO_ASYNC === 'true'
? function (callback) { return __awaiter(_this, void 0, void 0, function () { return __generator(this, function (_a) {
return [2 /*return*/, callback()];
}); }); }
: allInstanceOfDebounceWorker.filter(function (instance) { return instance.workerId === workerId; })[0].debounce;
}
return process.env.NO_ASYNC === 'true'
? function (callback) { return __awaiter(_this, void 0, void 0, function () { return __generator(this, function (_a) {
return [2 /*return*/, callback()];
}); }); }
: instance[0].debounce;
}
function AcquireJobDefinition(graphqlTypes, models) {
var _this = this;
return {
Expand All @@ -50,8 +75,9 @@ function AcquireJobDefinition(graphqlTypes, models) {
workerId: { type: graphql_1.GraphQLString }
},
resolve: function (source, args, context) { return __awaiter(_this, void 0, void 0, function () {
var transaction, allJobHoldType, heldTypes, job;
var transaction, allJobHoldType, heldTypes, job, debounceWorker;
var _a, _b, _c, _d;
var _this = this;
return __generator(this, function (_e) {
switch (_e.label) {
case 0: return [4 /*yield*/, models.sequelize.transaction()];
Expand Down Expand Up @@ -98,7 +124,39 @@ function AcquireJobDefinition(graphqlTypes, models) {
return [4 /*yield*/, transaction.commit()];
case 7:
_e.sent();
return [2 /*return*/, job];
if (!args.workerId) return [3 /*break*/, 9];
debounceWorker = getInstanceOfDebounceWorker(args.workerId);
return [4 /*yield*/, debounceWorker(function () { return __awaiter(_this, void 0, void 0, function () {
var workerMonitoring;
return __generator(this, function (_a) {
switch (_a.label) {
case 0: return [4 /*yield*/, models.workerMonitoring.findOne({
where: { workerId: args.workerId }
})];
case 1:
workerMonitoring = _a.sent();
if (!workerMonitoring) return [3 /*break*/, 3];
return [4 /*yield*/, workerMonitoring.update({
lastCalledAt: new Date()
})];
case 2:
_a.sent();
return [3 /*break*/, 5];
case 3: return [4 /*yield*/, models.workerMonitoring.create({
workerId: args.workerId,
lastCalledAt: new Date()
})];
case 4:
_a.sent();
_a.label = 5;
case 5: return [2 /*return*/];
}
});
}); })];
case 8:
_e.sent();
_e.label = 9;
case 9: return [2 /*return*/, job];
}
});
}); }
Expand Down
14 changes: 14 additions & 0 deletions lib/graphql/workerMonitoring.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"use strict";
exports.__esModule = true;
function WorkerMonitoringConfiguration(types, models) {
return {
model: models.workerMonitoring,
actions: ['list'],
list: {
before: function (findOptions) {
return findOptions;
}
}
};
}
exports["default"] = WorkerMonitoringConfiguration;
39 changes: 39 additions & 0 deletions lib/models/workerMonitoring.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"use strict";
exports.__esModule = true;
var sequelize_1 = require("sequelize");
function WorkerMonitoring(sequelize) {
var WorkerMonitoring = sequelize.define('workerMonitoring', {
id: {
type: sequelize_1.DataTypes.INTEGER,
primaryKey: true,
autoIncrement: true
},
workerId: {
type: sequelize_1.DataTypes.INTEGER,
allowNull: false
},
lastCalledAt: {
type: sequelize_1.DataTypes.DATE,
allowNull: false
},
createdAt: {
type: sequelize_1.DataTypes.DATE,
allowNull: false
},
updatedAt: {
type: sequelize_1.DataTypes.DATE,
allowNull: false
},
deletedAt: {
type: sequelize_1.DataTypes.DATE,
allowNull: true,
defaultValue: null
}
}, {
freezeTableName: true,
tableName: 'workerMonitoring',
paranoid: true
});
return WorkerMonitoring;
}
exports["default"] = WorkerMonitoring;
34 changes: 34 additions & 0 deletions migrations/20201130182919-create-workerMonitoring.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
'use strict'
module.exports = {
up: function (queryInterface, Sequelize) {
return queryInterface.createTable('workerMonitoring', {
id: {
allowNull: false,
autoIncrement: true,
primaryKey: true,
type: Sequelize.INTEGER,
},
workerId: {
type: Sequelize.STRING,
},
lastCalledAt: {
type: Sequelize.STRING,
},
createdAt: {
allowNull: false,
type: Sequelize.DATE,
},
updatedAt: {
allowNull: false,
type: Sequelize.DATE,
},
deletedAt: {
allowNull: true,
type: Sequelize.DATE,
},
})
},
down: function (queryInterface) {
return queryInterface.dropTable('workerMonitoring')
},
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
"graphql-node-jobs": "node ./lib/gnj.js",
"dev": "pm2 delete all -s; pm2 startOrReload ecosystem.dev.config.js",
"build": "tsc --lib es2019,dom --esModuleInterop --outDir ./lib ./src/index.ts ./src/models/*.ts ./src/gnj.ts",
"test": "NODE_ENV=test PORT=3332 jest --runInBand --ci --forceExit --verbose",
"test": "NO_ASYNC=true NODE_ENV=test PORT=3332 jest --runInBand --ci --forceExit --verbose",
"release": "standard-version",
"gnj": "node ./lib/gnj.js",
"start": "node ./tests/server.js"
Expand Down
2 changes: 2 additions & 0 deletions src/graphql/getApolloServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import batch from './batch'
import pipeline from './pipeline'
import pipelineStep from './pipelineStep'
import jobHoldType from './jobHoldType'
import workerMonitoring from './workerMonitoring'

/**
* @param dbConfig Sequelize database configuration object
Expand All @@ -33,6 +34,7 @@ export default async function getApolloServer(
pipeline: pipeline(types, models),
pipelineStep: pipelineStep(types, models),
jobHoldType: jobHoldType(types, models),
workerMonitoring: workerMonitoring(types, models),
}

return generateApolloServer({
Expand Down
47 changes: 47 additions & 0 deletions src/graphql/job/acquire.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,32 @@ import {
SequelizeModels,
} from 'graphql-sequelize-generator/types'
import { Op } from 'sequelize'
import debounce from 'debounce'

const allInstanceOfDebounceWorker: any = []

function getInstanceOfDebounceWorker(workerId: number) {
const instance = allInstanceOfDebounceWorker.filter(
(instance: any) => instance.workerId === workerId
)

if (!instance.length) {
allInstanceOfDebounceWorker.push({
workerId: workerId,
debounce: debounce((callback: Function) => callback(), 50),
})

return process.env.NO_ASYNC === 'true'
? async (callback: Function) => callback()
: allInstanceOfDebounceWorker.filter(
(instance: any) => instance.workerId === workerId
)[0].debounce
}

return process.env.NO_ASYNC === 'true'
? async (callback: Function) => callback()
: instance[0].debounce
}

export default function AcquireJobDefinition(
graphqlTypes: InAndOutTypes,
Expand Down Expand Up @@ -61,6 +87,27 @@ export default function AcquireJobDefinition(
)

await transaction.commit()

if (args.workerId) {
const debounceWorker = getInstanceOfDebounceWorker(args.workerId)
await debounceWorker(async () => {
const workerMonitoring = await models.workerMonitoring.findOne({
where: { workerId: args.workerId },
})

if (workerMonitoring) {
await workerMonitoring.update({
lastCalledAt: new Date(),
})
} else {
await models.workerMonitoring.create({
workerId: args.workerId,
lastCalledAt: new Date(),
})
}
})
}

return job
},
}
Expand Down
20 changes: 20 additions & 0 deletions src/graphql/workerMonitoring.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import {
InAndOutTypes,
ModelDeclarationType,
SequelizeModels,
} from 'graphql-sequelize-generator/types'

export default function WorkerMonitoringConfiguration(
types: InAndOutTypes,
models: SequelizeModels
): ModelDeclarationType {
return {
model: models.workerMonitoring,
actions: ['list'],
list: {
before: (findOptions) => {
return findOptions
},
},
}
}
41 changes: 41 additions & 0 deletions src/models/workerMonitoring.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { DataTypes } from 'sequelize'

export default function WorkerMonitoring(sequelize: any) {
var WorkerMonitoring = sequelize.define(
'workerMonitoring',
{
id: {
type: DataTypes.INTEGER,
primaryKey: true,
autoIncrement: true,
},
workerId: {
type: DataTypes.INTEGER,
allowNull: false,
},
lastCalledAt: {
type: DataTypes.DATE,
allowNull: false,
},
createdAt: {
type: DataTypes.DATE,
allowNull: false,
},
updatedAt: {
type: DataTypes.DATE,
allowNull: false,
},
deletedAt: {
type: DataTypes.DATE,
allowNull: true,
defaultValue: null,
},
},
{
freezeTableName: true,
tableName: 'workerMonitoring',
paranoid: true,
}
)
return WorkerMonitoring
}
23 changes: 23 additions & 0 deletions tests/__snapshots__/workerMonitoring.spec.js.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`Test the workerMonitoring endpoint WorkerMonitoring is updated when a new job is acquired 1`] = `
Object {
"acquireJob": Object {
"id": 6,
"name": null,
"output": null,
"status": "processing",
},
}
`;

exports[`Test the workerMonitoring endpoint WorkerMonitoring is updated when a new job is acquired 2`] = `
Object {
"acquireJob": Object {
"id": 7,
"name": null,
"output": null,
"status": "processing",
},
}
`;
4 changes: 4 additions & 0 deletions tests/job.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ const {
CancelRequestedError,
} = require('./../lib/index')

const { cleanupTestsEnv } = require('./tools')

// This is the maximum amount of time the band of test can run before timing-out
jest.setTimeout(600000)

Expand Down Expand Up @@ -170,6 +172,7 @@ describe('Test the job endpoint', () => {
await migrateDatabase()
await seedDatabase()
server = await getNewServer()
cleanupTestsEnv()
})

beforeEach(async () => {
Expand Down Expand Up @@ -752,6 +755,7 @@ describe('Test the job endpoint', () => {
status: 'waiting',
},
}

const timeout = async (ms) =>
new Promise((resolve) => setTimeout(resolve, ms))

Expand Down
3 changes: 3 additions & 0 deletions tests/tools.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
exports.cleanupTestsEnv = () => {
process.env.NO_ASYNC = true
}
Loading

0 comments on commit 6c1f25e

Please sign in to comment.