Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Improve health check #6205

Merged
merged 4 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 18 additions & 32 deletions packages/cli/src/AbstractServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,33 +159,17 @@ export abstract class AbstractServer {
protected setupPushServer() {}

private async setupHealthCheck() {
this.app.use((req, res, next) => {
if (!Db.isInitialized) {
sendErrorResponse(res, new ServiceUnavailableError('Database is not ready!'));
} else next();
});

// Does very basic health check
// health check should not care about DB connections
this.app.get('/healthz', async (req, res) => {
Logger.debug('Health check started!');

const connection = Db.getConnection();

try {
if (!connection.isInitialized) {
// Connection is not active
throw new ServiceUnavailableError('No active database connection!');
}
// DB ping
await connection.query('SELECT 1');
} catch (error) {
ErrorReporter.error(error);
Logger.error('No Database connection!');
return sendErrorResponse(res, new ServiceUnavailableError('No Database connection!'));
}
res.send({ status: 'ok' });
});

Logger.debug('Health check completed successfully!');
sendSuccessResponse(res, { status: 'ok' }, true, 200);
const { connectionState } = Db;
this.app.use((req, res, next) => {
if (connectionState.connected) {
if (connectionState.migrated) next();
else res.send('n8n is starting up. Please wait');
} else sendErrorResponse(res, new ServiceUnavailableError('Database is not ready!'));
});

if (config.getEnv('executions.mode') === 'queue') {
Expand Down Expand Up @@ -400,8 +384,8 @@ export abstract class AbstractServer {
);
}

async start(): Promise<void> {
const { app, externalHooks, protocol, sslKey, sslCert } = this;
async init(): Promise<void> {
const { app, protocol, sslKey, sslCert } = this;

if (protocol === 'https' && sslKey && sslCert) {
const https = await import('https');
Expand Down Expand Up @@ -431,26 +415,28 @@ export abstract class AbstractServer {

await new Promise<void>((resolve) => this.server.listen(PORT, ADDRESS, () => resolve()));

await this.setupHealthCheck();

console.log(`n8n ready on ${ADDRESS}, port ${PORT}`);
}

async start(): Promise<void> {
await this.setupErrorHandlers();
this.setupPushServer();
await this.setupCommonMiddlewares();
if (inDevelopment) {
this.setupDevMiddlewares();
}

await this.setupHealthCheck();

await this.configure();

console.log(`n8n ready on ${ADDRESS}, port ${PORT}`);
console.log(`Version: ${N8N_VERSION}`);

const defaultLocale = config.getEnv('defaultLocale');
if (defaultLocale !== 'en') {
console.log(`Locale: ${defaultLocale}`);
}

await externalHooks.run('n8n.ready', [this, config]);
await this.externalHooks.run('n8n.ready', [this, config]);
}
}

Expand Down
6 changes: 0 additions & 6 deletions packages/cli/src/CredentialsHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -453,12 +453,6 @@ export class CredentialsHelper extends ICredentialsHelper {
): Promise<void> {
const credentials = await this.getCredentials(nodeCredentials, type);

if (!Db.isInitialized) {
// The first time executeWorkflow gets called the Database has
// to get initialized first
await Db.init();
}

credentials.setData(data, this.encryptionKey);
const newCredentialsData = credentials.getDataToSave() as ICredentialsDb;

Expand Down
75 changes: 59 additions & 16 deletions packages/cli/src/Db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { Container } from 'typedi';
import type { DataSourceOptions as ConnectionOptions, EntityManager, LoggerOptions } from 'typeorm';
import { DataSource as Connection } from 'typeorm';
import type { TlsOptions } from 'tls';
import { ErrorReporterProxy as ErrorReporter } from 'n8n-workflow';

import type { IDatabaseCollections } from '@/Interfaces';

import config from '@/config';
Expand All @@ -19,6 +21,7 @@ import {
getPostgresConnectionOptions,
getSqliteConnectionOptions,
} from '@db/config';
import { inTest } from '@/constants';
import { wrapMigration } from '@db/utils/migrationHelpers';
import type { DatabaseType, Migration } from '@db/types';
import {
Expand All @@ -43,13 +46,42 @@ import {
WorkflowTagMappingRepository,
} from '@db/repositories';

export let isInitialized = false;
export const collections = {} as IDatabaseCollections;

let connection: Connection;

export const getConnection = () => connection!;

type ConnectionState = {
connected: boolean;
migrated: boolean;
};

export const connectionState: ConnectionState = {
connected: false,
migrated: false,
};

// Ping DB connection every 2 seconds
let pingTimer: NodeJS.Timer | undefined;
if (!inTest) {
const pingDBFn = async () => {
if (connection?.isInitialized) {
try {
await connection.query('SELECT 1');
connectionState.connected = true;
return;
} catch (error) {
ErrorReporter.error(error);
} finally {
pingTimer = setTimeout(pingDBFn, 2000);
}
}
connectionState.connected = false;
};
pingTimer = setTimeout(pingDBFn, 2000);
}

export async function transaction<T>(fn: (entityManager: EntityManager) => Promise<T>): Promise<T> {
return connection.transaction(fn);
}
Expand Down Expand Up @@ -94,10 +126,14 @@ export function getConnectionOptions(dbType: DatabaseType): ConnectionOptions {
}
}

export async function init(
testConnectionOptions?: ConnectionOptions,
): Promise<IDatabaseCollections> {
if (isInitialized) return collections;
const openConnection = async (options: ConnectionOptions) => {
connection = new Connection(options);
await connection.initialize();
Container.set(Connection, connection);
};

export async function init(testConnectionOptions?: ConnectionOptions): Promise<void> {
if (connectionState.connected) return;

const dbType = config.getEnv('database.type');
const connectionOptions = testConnectionOptions ?? getConnectionOptions(dbType);
Expand All @@ -124,9 +160,7 @@ export async function init(
migrationsRun: false,
});

connection = new Connection(connectionOptions);
Container.set(Connection, connection);
await connection.initialize();
await openConnection(connectionOptions);

if (dbType === 'postgresdb') {
const schema = config.getEnv('database.postgresdb.schema');
Expand All @@ -138,9 +172,13 @@ export async function init(
await connection.query(`SET search_path TO ${searchPath.join(',')};`);
}

(connectionOptions.migrations as Migration[]).forEach(wrapMigration);
connectionState.connected = true;
}

export async function migrate() {
(connection.options.migrations as Migration[]).forEach(wrapMigration);

if (!testConnectionOptions && dbType === 'sqlite') {
if (!inTest && connection.options.type === 'sqlite') {
// This specific migration changes database metadata.
// A field is now nullable. We need to reconnect so that
// n8n knows it has changed. Happens only on sqlite.
Expand All @@ -161,9 +199,7 @@ export async function init(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
if (migrations.length === 0) {
await connection.destroy();
connection = new Connection(connectionOptions);
Container.set(Connection, connection);
await connection.initialize();
await openConnection(connection.options);
}
} else {
await connection.runMigrations({ transaction: 'each' });
Expand All @@ -189,7 +225,14 @@ export async function init(
collections.WorkflowStatistics = Container.get(WorkflowStatisticsRepository);
collections.WorkflowTagMapping = Container.get(WorkflowTagMappingRepository);

isInitialized = true;

return collections;
connectionState.migrated = true;
}

export const close = async () => {
if (pingTimer) {
clearTimeout(pingTimer);
pingTimer = undefined;
}

if (connection.isInitialized) await connection.destroy();
};
Loading