From 096695be5f55bad5f286a2135539ba2589a59cf1 Mon Sep 17 00:00:00 2001 From: Otavio Jacobi Date: Sat, 26 Oct 2024 09:41:35 -0300 Subject: [PATCH 1/2] Add test for tasks over multiple pine processes Change-type: patch --- test/08-tasks.test.ts | 61 ++++++++++++++++++++++++- test/fixtures/08-tasks/example.sbvr | 8 +++- test/fixtures/08-tasks/task-handlers.ts | 29 +++++++++++- test/lib/common.ts | 13 ++++++ 4 files changed, 108 insertions(+), 3 deletions(-) diff --git a/test/08-tasks.test.ts b/test/08-tasks.test.ts index a531c01d6..cfd6f4a1e 100644 --- a/test/08-tasks.test.ts +++ b/test/08-tasks.test.ts @@ -7,7 +7,8 @@ import { testInit, testDeInit, testLocalServer } from './lib/test-init'; import { tasks as tasksEnv } from '../src/config-loader/env'; import type Model from '../src/tasks/tasks'; import * as cronParser from 'cron-parser'; -import { PINE_TEST_SIGNALS } from './lib/common'; +import { getAvailableTCPPort, PINE_TEST_SIGNALS } from './lib/common'; +import type { ChildProcess } from 'node:child_process'; const actorId = 1; const fixturesBasePath = __dirname + '/fixtures/08-tasks/'; @@ -468,5 +469,63 @@ describe('08 task tests', function () { attempt_count: 1, }); }); + + describe('tasks over multiple instances', async () => { + let pineServers: ChildProcess[] = []; + const testConcurrency = 5; + before(async () => { + const pineServerPromises: Array> = []; + for (let i = 0; i < testConcurrency - 1; i++) { + const PORT = await getAvailableTCPPort(); + pineServerPromises.push( + testInit({ + listenPort: PORT, + configPath: fixturesBasePath + 'config.js', + deleteDb: false, + taskHandlersPath: fixturesBasePath + 'task-handlers.js', + }), + ); + } + pineServers = await Promise.all(pineServerPromises); + }); + + after(() => { + pineServers.map(testDeInit); + }); + + it('should only run task once with multiple pine instances', async () => { + const { body: device } = await pineTest + .post({ + apiPrefix: 'example/', + resource: 'device', + body: { + name: randomUUID(), + type: randomUUID(), + count: 0, + }, + }) + .expect(201); + + for (let j = 0; j < 30; j++) { + await createTask(pineTest, apikey, { + is_executed_by__handler: 'increment_device_count', + is_executed_with__parameter_set: { + deviceId: device.id, + }, + }); + + await waitFor(async () => { + const { body: deviceAfter } = await pineTest + .get({ + apiPrefix: 'example/', + resource: 'device', + id: device.id, + }) + .expect(200); + return deviceAfter?.count === j + 1; + }); + } + }); + }); }); }); diff --git a/test/fixtures/08-tasks/example.sbvr b/test/fixtures/08-tasks/example.sbvr index 8f5cd292f..0029afd5c 100644 --- a/test/fixtures/08-tasks/example.sbvr +++ b/test/fixtures/08-tasks/example.sbvr @@ -9,6 +9,9 @@ Term: note Term: type Concept Type: Short Text (Type) +Term: count + Concept Type: Integer (Type) + Term: device Fact Type: device has name @@ -18,4 +21,7 @@ Fact Type: device has note Necessity: each device has at most one note. Fact Type: device has type - Necessity: each device has exactly one type. \ No newline at end of file + Necessity: each device has exactly one type. + +Fact Type: device has count + Necessity: each device has at most one count. \ No newline at end of file diff --git a/test/fixtures/08-tasks/task-handlers.ts b/test/fixtures/08-tasks/task-handlers.ts index 446fa03fa..1a4de9164 100644 --- a/test/fixtures/08-tasks/task-handlers.ts +++ b/test/fixtures/08-tasks/task-handlers.ts @@ -1,5 +1,5 @@ import type { FromSchema } from 'json-schema-to-ts'; -import { tasks } from '../../../src/server-glue/module'; +import { sbvrUtils, tasks } from '../../../src/server-glue/module'; // Define JSON schema for accepted parameters const createDeviceParamsSchema = { @@ -16,8 +16,22 @@ const createDeviceParamsSchema = { additionalProperties: false, } as const; +const incrementDeviceCountParamsSchema = { + type: 'object', + properties: { + deviceId: { + type: 'number', + }, + }, + required: ['deviceId'], + additionalProperties: false, +} as const; + // Generate type from schema and export for callers to use export type CreateDeviceParams = FromSchema; +export type IncrementDeviceCountParams = FromSchema< + typeof incrementDeviceCountParamsSchema +>; export const initTaskHandlers = () => { tasks.addTaskHandler( @@ -57,5 +71,18 @@ export const initTaskHandlers = () => { } }); + tasks.addTaskHandler( + 'increment_device_count', + async (options) => { + const deviceId = options.params.deviceId; + await sbvrUtils.db.executeSql( + 'UPDATE device SET count = count + 1 WHERE id = $1', + [deviceId], + ); + return { status: 'succeeded' }; + }, + incrementDeviceCountParamsSchema, + ); + void tasks.worker?.start(); }; diff --git a/test/lib/common.ts b/test/lib/common.ts index 8910564ca..e7897700d 100644 --- a/test/lib/common.ts +++ b/test/lib/common.ts @@ -1,4 +1,5 @@ import { expect } from 'chai'; +import net, { type AddressInfo } from 'node:net'; export function assertExists(v: unknown): asserts v is NonNullable { expect(v).to.exist; @@ -8,3 +9,15 @@ export const PINE_TEST_SIGNALS = { STOP_TASK_WORKER: 'PINEJS_TEST_STOP_TASK_WORKER', START_TASK_WORKER: 'PINEJS_TEST_START_TASK_WORKER', }; + +export async function getAvailableTCPPort(): Promise { + return new Promise((res) => { + const srv = net.createServer(); + srv.listen(0, () => { + const port = (srv.address() as AddressInfo).port; + srv.close(() => { + res(port); + }); + }); + }); +} From 886567f31ae8a5a7842cca6cff4c643e59f6bc84 Mon Sep 17 00:00:00 2001 From: Otavio Jacobi Date: Sat, 26 Oct 2024 10:17:50 -0300 Subject: [PATCH 2/2] Fix async tasks race condition for selecting task to be executed Change-type: patch --- src/tasks/worker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tasks/worker.ts b/src/tasks/worker.ts index 53b0cfca9..a8cdf3770 100644 --- a/src/tasks/worker.ts +++ b/src/tasks/worker.ts @@ -220,7 +220,7 @@ export class Worker { return; } await sbvrUtils.db.transaction(async (tx) => { - const result = await sbvrUtils.db.executeSql( + const result = await tx.executeSql( `SELECT ${selectColumns} FROM task AS t WHERE