Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix async tasks race condition for selecting task to be executed #838

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/tasks/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ export class Worker {
return;
}
await sbvrUtils.db.transaction(async (tx) => {
const result = await sbvrUtils.db.executeSql(
const result = await tx.executeSql(
`SELECT ${selectColumns}
FROM task AS t
WHERE
Expand Down
61 changes: 60 additions & 1 deletion test/08-tasks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import { testInit, testDeInit, testLocalServer } from './lib/test-init';
import { tasks as tasksEnv } from '../src/config-loader/env';
import type Model from '../src/tasks/tasks';
import * as cronParser from 'cron-parser';
import { PINE_TEST_SIGNALS } from './lib/common';
import { getAvailableTCPPort, PINE_TEST_SIGNALS } from './lib/common';
import type { ChildProcess } from 'node:child_process';

const actorId = 1;
const fixturesBasePath = __dirname + '/fixtures/08-tasks/';
Expand Down Expand Up @@ -468,5 +469,63 @@ describe('08 task tests', function () {
attempt_count: 1,
});
});

describe('tasks over multiple instances', async () => {
let pineServers: ChildProcess[] = [];
const testConcurrency = 5;
before(async () => {
const pineServerPromises: Array<Promise<ChildProcess>> = [];
for (let i = 0; i < testConcurrency - 1; i++) {
const PORT = await getAvailableTCPPort();
pineServerPromises.push(
testInit({
listenPort: PORT,
configPath: fixturesBasePath + 'config.js',
deleteDb: false,
taskHandlersPath: fixturesBasePath + 'task-handlers.js',
}),
);
}
pineServers = await Promise.all(pineServerPromises);
});

after(() => {
pineServers.map(testDeInit);
});

it('should only run task once with multiple pine instances', async () => {
const { body: device } = await pineTest
.post({
apiPrefix: 'example/',
resource: 'device',
body: {
name: randomUUID(),
type: randomUUID(),
count: 0,
},
})
.expect(201);

for (let j = 0; j < 30; j++) {
await createTask(pineTest, apikey, {
is_executed_by__handler: 'increment_device_count',
is_executed_with__parameter_set: {
deviceId: device.id,
},
});

await waitFor(async () => {
const { body: deviceAfter } = await pineTest
.get({
apiPrefix: 'example/',
resource: 'device',
id: device.id,
})
.expect(200);
return deviceAfter?.count === j + 1;
});
}
});
});
});
});
8 changes: 7 additions & 1 deletion test/fixtures/08-tasks/example.sbvr
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ Term: note
Term: type
Concept Type: Short Text (Type)

Term: count
Concept Type: Integer (Type)

Term: device

Fact Type: device has name
Expand All @@ -18,4 +21,7 @@ Fact Type: device has note
Necessity: each device has at most one note.

Fact Type: device has type
Necessity: each device has exactly one type.
Necessity: each device has exactly one type.

Fact Type: device has count
Necessity: each device has at most one count.
29 changes: 28 additions & 1 deletion test/fixtures/08-tasks/task-handlers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { FromSchema } from 'json-schema-to-ts';
import { tasks } from '../../../src/server-glue/module';
import { sbvrUtils, tasks } from '../../../src/server-glue/module';

// Define JSON schema for accepted parameters
const createDeviceParamsSchema = {
Expand All @@ -16,8 +16,22 @@ const createDeviceParamsSchema = {
additionalProperties: false,
} as const;

const incrementDeviceCountParamsSchema = {
type: 'object',
properties: {
deviceId: {
type: 'number',
},
},
required: ['deviceId'],
additionalProperties: false,
} as const;

// Generate type from schema and export for callers to use
export type CreateDeviceParams = FromSchema<typeof createDeviceParamsSchema>;
export type IncrementDeviceCountParams = FromSchema<
typeof incrementDeviceCountParamsSchema
>;

export const initTaskHandlers = () => {
tasks.addTaskHandler(
Expand Down Expand Up @@ -57,5 +71,18 @@ export const initTaskHandlers = () => {
}
});

tasks.addTaskHandler(
'increment_device_count',
async (options) => {
const deviceId = options.params.deviceId;
await sbvrUtils.db.executeSql(
'UPDATE device SET count = count + 1 WHERE id = $1',
[deviceId],
);
return { status: 'succeeded' };
},
incrementDeviceCountParamsSchema,
);

void tasks.worker?.start();
};
13 changes: 13 additions & 0 deletions test/lib/common.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { expect } from 'chai';
import net, { type AddressInfo } from 'node:net';

export function assertExists(v: unknown): asserts v is NonNullable<typeof v> {
expect(v).to.exist;
Expand All @@ -8,3 +9,15 @@ export const PINE_TEST_SIGNALS = {
STOP_TASK_WORKER: 'PINEJS_TEST_STOP_TASK_WORKER',
START_TASK_WORKER: 'PINEJS_TEST_START_TASK_WORKER',
};

export async function getAvailableTCPPort(): Promise<number> {
return new Promise((res) => {
const srv = net.createServer();
srv.listen(0, () => {
const port = (srv.address() as AddressInfo).port;
srv.close(() => {
res(port);
});
});
});
}
Loading