Skip to content

Commit

Permalink
fix(acquire): Fix transaction allowing repeatable/uncommitted reads. …
Browse files Browse the repository at this point in the history
…the Acquire() function must work as a semaphore and always return an unique job without throwing in case of conflict.
  • Loading branch information
vincentdesmares committed Apr 15, 2024
1 parent dd51e2b commit 52cd5ed
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 15 deletions.
27 changes: 16 additions & 11 deletions lib/graphql/job/acquire.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,21 @@ function AcquireJobDefinition(graphqlTypes, models) {
var _a, _b, _c, _d;
return __generator(this, function (_e) {
switch (_e.label) {
case 0: return [4 /*yield*/, models.sequelize.transaction()];
case 0: return [4 /*yield*/, models.sequelize.transaction({
isolationLevel: sequelize_1.Transaction.ISOLATION_LEVELS.SERIALIZABLE
})];
case 1:
transaction = _e.sent();
return [4 /*yield*/, models.jobHoldType.findAll({ transaction: transaction })];
case 2:
allJobHoldType = _e.sent();
heldTypes = allJobHoldType.map(function (heldType) { return heldType.type; });
if (heldTypes.includes('all')) {
return [2 /*return*/, null];
}
if (!heldTypes.includes('all')) return [3 /*break*/, 4];
return [4 /*yield*/, transaction.rollback()];
case 3:
_e.sent();
return [2 /*return*/, null];
case 4:
conditions = [
(_a = {
type: args.typeList,
Expand All @@ -88,22 +93,22 @@ function AcquireJobDefinition(graphqlTypes, models) {
order: [['id', 'ASC']],
transaction: transaction
})];
case 3:
case 5:
job = _e.sent();
if (!!job) return [3 /*break*/, 5];
return [4 /*yield*/, transaction.commit()];
case 4:
if (!!job) return [3 /*break*/, 7];
return [4 /*yield*/, transaction.rollback()];
case 6:
_e.sent();
return [2 /*return*/, null];
case 5: return [4 /*yield*/, job.update({
case 7: return [4 /*yield*/, job.update({
workerId: args.workerId,
status: 'processing',
startedAt: new Date()
}, { transaction: transaction })];
case 6:
case 8:
_e.sent();
return [4 /*yield*/, transaction.commit()];
case 7:
case 9:
_e.sent();
return [2 /*return*/, job];
}
Expand Down
12 changes: 8 additions & 4 deletions src/graphql/job/acquire.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { GraphQLString, GraphQLNonNull, GraphQLList } from 'graphql'
import {
CustomMutationConfiguration,
InAndOutTypes,
SequelizeModels,
} from '@teamstarter/graphql-sequelize-generator/types'
import { Op } from 'sequelize'
import { GraphQLList, GraphQLNonNull, GraphQLString } from 'graphql'
import { Op, Transaction } from 'sequelize'

export default function AcquireJobDefinition(
graphqlTypes: InAndOutTypes,
Expand All @@ -24,11 +24,14 @@ export default function AcquireJobDefinition(
workerType: { type: GraphQLString },
},
resolve: async (source, args, context) => {
const transaction = await models.sequelize.transaction()
const transaction = await models.sequelize.transaction({
isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE
})
const allJobHoldType = await models.jobHoldType.findAll({ transaction })
const heldTypes = allJobHoldType.map((heldType: any) => heldType.type)

if (heldTypes.includes('all')) {
await transaction.rollback()
return null
}

Expand Down Expand Up @@ -56,8 +59,9 @@ export default function AcquireJobDefinition(
order: [['id', 'ASC']],
transaction,
})

if (!job) {
await transaction.commit()
await transaction.rollback()
return null
}

Expand Down

0 comments on commit 52cd5ed

Please sign in to comment.