Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: store queue status in the database to persist queue status between restarts #18

Merged
merged 3 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions server/scripts/database/database-status.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { StatusTable } from '../../../types/database';
import { database } from './database';

// export function InitializeStatus() {
// const;
// }

export function GetStatusFromDatabase(id: string) {
try {
const statusStatement = database?.prepare<{ id: string }, StatusTable>(
'SELECT state FROM status WHERE id = $id'
);
const statusQuery = statusStatement?.get({ id: id });
return statusQuery;
} catch (err) {
console.error(`[server] [database] [error] Could not get the status of '${id}'.`);
console.error(err);
}
}

export function UpdateStatusInDatabase(id: string, state: number) {
try {
const updateStatement = database?.prepare(
'INSERT INTO status (id, state) VALUES ($id, $state) ON CONFLICT (id) DO UPDATE SET state = $state'
);
updateStatement?.run({ id: id, state: state });
console.log(`[server] [database] Sucessfully updated the status of '${id}' to '${state}'.`);
} catch (err) {
console.error(`[server] [database] [error] Could not update the status of '${id}'.`);
console.error(err);
}
}
108 changes: 9 additions & 99 deletions server/scripts/database/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,109 +13,19 @@ export function DatabaseConnect() {
try {
const db = new Database(databasePath, {});

// if (!db) throw new Error('[database] [error] Could not connect to the database...');

// Create the queue table if it doesn't exist
const initQueueStatement = db.prepare(
'CREATE TABLE IF NOT EXISTS queue(id TEXT NOT NULL, job TEXT NOT NULL, PRIMARY KEY (id))'
);
initQueueStatement.run();
const initQueueStatement = [
db.prepare(
'CREATE TABLE IF NOT EXISTS queue(id TEXT NOT NULL, job TEXT NOT NULL, PRIMARY KEY (id))'
),
db.prepare(
'CREATE TABLE IF NOT EXISTS status(id TEXT NOT NULL, state INTEGER NOT NULL, PRIMARY KEY (id))'
),
];
initQueueStatement.forEach((statement) => statement.run());
database = db;
console.log('[server] [database] The database connection has been initalized!');
} catch (err) {
console.error(err);
}
}

// export function GetQueueFromDatabase() {
// try {
// if (!database) throw new Error('[database] [error] There is no database connection...');

// const queueStatement = database.prepare<[], QueueTable>('SELECT * FROM queue');
// const queueTable = queueStatement.all();
// const queueResult = queueTable
// .map((entry) => {
// const newEntry: QueueEntry = {
// id: entry.id,
// job: JSON.parse(entry.job),
// };
// return newEntry;
// })
// .reduce<Queue>((prev, curr) => {
// prev[curr.id] = curr.job;
// return prev;
// }, {});
// console.log(
// `[server] [database] Retrieved ${
// Object.keys(queueResult).length
// } queue jobs from the database.`
// );
// return queueResult;
// } catch (err) {
// console.error(`[database] [error] Could not get jobs from the queue table.`);
// console.error(err);
// return null;
// }
// }

// export function GetJobFromDatabase(id: string): Job | undefined {
// try {
// if (!database) throw new Error('[database] [error] There is no database connection...');

// const jobStatement = database.prepare<{ id: string }, QueueTable>(
// 'SELECT job FROM queue WHERE id = $id'
// );
// const jobQuery = jobStatement.get({ id: id });
// const jobResult: Job = jobQuery ? JSON.parse(jobQuery.job) : jobQuery;
// return jobResult;
// } catch (err) {
// console.error(`[database] [error] Could not get jobs from the queue table.`);
// console.error(err);
// }
// }

// export function InsertJobToDatabase(id: string, job: Job) {
// try {
// if (!database) throw new Error('[database] [error] There is no database connection...');

// const jobJSON = JSON.stringify(job);
// const insertStatement = database.prepare<QueueTable, QueueTable>(
// 'INSERT INTO queue (id, job) VALUES ($id, $job)'
// );
// const insertResult = insertStatement.run({ id: id, job: jobJSON });
// console.log(`[server] [database] Successfully inserted job '${id}' into the database.`);
// return insertResult;
// } catch (err) {
// console.error(`[server] [error] [database] Could not insert job '${id}' into queue table.`);
// console.error(err);
// }
// }

// export function UpdateJobInDatabase(id: string, job: Job) {
// try {
// if (!database) throw new Error('[database] [error] There is no database connection...');

// const jobJSON = JSON.stringify(job);
// const updateStatement = database.prepare('UPDATE queue SET job = $job WHERE id = $id');
// const updateResult = updateStatement.run({ id: id, job: jobJSON });
// console.log(`[server] [database] Successfully updated job '${id}' in the database.`);
// return updateResult;
// } catch (err) {
// console.error(`[server] [error] [database] Could not update job '${id}'.`);
// console.error(err);
// }
// }

// export function RemoveJobFromDatabase(id: string) {
// try {
// if (!database) throw new Error('[database] [error] There is no database connection...');

// const removalStatement = database.prepare('DELETE FROM queue WHERE id = $id');
// const removalResult = removalStatement.run({ id: id });
// console.log(`[server] [database] Successfully removed job '${id}' from the database.`);
// return removalResult;
// } catch (err) {
// console.error(`[server] [error] [database] Could not remove job '${id}'.`);
// console.error(err);
// }
// }
106 changes: 61 additions & 45 deletions server/scripts/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,30 @@ import {
RemoveJobFromDatabase,
UpdateJobInDatabase,
} from './database/database-queue';
import { GetStatusFromDatabase, UpdateStatusInDatabase } from './database/database-status';
import { GetPresets } from './presets';
import { SearchForWorker } from './worker';
import hash from 'object-hash';

export const queuePath: string = './data/queue.json';

// export let queue: Queue = {};

export let state: QueueStatus = QueueStatus.Stopped;

let workerSearchInterval: null | NodeJS.Timeout = null;

export function GetQueue() {
return GetQueueFromDatabase();
}

// Init --------------------------------------------------------------------------------------------
export function InitializeQueue() {
// Queue Status
const status = GetQueueStatus();
if (status) {
console.log(
`[server] [queue] Existing queue status '${QueueStatus[status]}' retreived from the database.`
);
EmitToAllClients('queue-status-update', status);
} else {
SetQueueStatus(QueueStatus.Stopped);
console.error(
`[server] [queue] The queue status does not exist in the database, initializing to the state 'stopped'.`
);
}

// Queue Data
const queue = GetQueueFromDatabase();
if (queue) {
Object.keys(queue).forEach((jobID) => {
Expand All @@ -55,14 +62,57 @@ export function InitializeQueue() {
}
}

// Status ------------------------------------------------------------------------------------------
export function GetQueueStatus() {
const status = GetStatusFromDatabase('queue')?.state as QueueStatus;
return status;
}

export function SetQueueStatus(newState: QueueStatus) {
UpdateStatusInDatabase('queue', newState);
EmitToAllClients('queue-status-update', newState);
}

export function StartQueue(clientID: string) {
if (GetQueueStatus() == QueueStatus.Stopped) {
const newStatus = QueueStatus.Active;
SetQueueStatus(newStatus);

console.log(`[server] The queue has been started by client '${clientID}'`);

workerSearchInterval = setInterval(SearchForWorker, 1000);
}
}

export function StopQueue(clientID?: string) {
if (GetQueueStatus() != QueueStatus.Stopped) {
const newStatus = QueueStatus.Stopped;
SetQueueStatus(newStatus);

const stoppedBy = clientID ? `client '${clientID}'` : 'the server.';

console.log(`[server] The queue has been stopped by ${stoppedBy}.`);

if (workerSearchInterval) {
clearInterval(workerSearchInterval);
}
EmitToAllConnections('queue-status-changed', newStatus);
}
}

// Queue -------------------------------------------------------------------------------------------
export function GetQueue() {
return GetQueueFromDatabase();
}

export function UpdateQueue() {
const updatedQueue = GetQueueFromDatabase();
if (updatedQueue) {
// queue = updatedQueue;
EmitToAllClients('queue-update', updatedQueue);
}
}

// Job Actions -------------------------------------------------------------------------------------
export function AddJob(data: QueueRequest) {
const jobID =
new Date().getTime().toString() +
Expand Down Expand Up @@ -173,40 +223,6 @@ export function RemoveJob(id: string) {
}
}

export function GetQueueStatus() {
return state;
}

export function SetQueueStatus(newState: QueueStatus) {
state = newState;
EmitToAllClients('queue-status-update', state);
}

export function StartQueue(clientID: string) {
if (state == QueueStatus.Stopped) {
SetQueueStatus(QueueStatus.Active);

console.log(`[server] The queue has been started by client '${clientID}'`);

workerSearchInterval = setInterval(SearchForWorker, 1000);
}
}

export function StopQueue(clientID?: string) {
if (state != QueueStatus.Stopped) {
SetQueueStatus(QueueStatus.Stopped);

const stoppedBy = clientID ? `client '${clientID}'` : 'the server.';

console.log(`[server] The queue has been stopped by ${stoppedBy}.`);

if (workerSearchInterval) {
clearInterval(workerSearchInterval);
}
EmitToAllConnections('queue-status-changed', state);
}
}

export function ClearQueue(clientID: string, finishedOnly: boolean = false) {
console.log(
`[server] [queue] Client '${clientID}' has requested to clear ${
Expand Down
5 changes: 5 additions & 0 deletions types/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,8 @@ export type QueueTable = {
id: string;
job: string;
};

export type StatusTable = {
id: string;
state: number;
};