From bc0a6e1b3551c77b17eedfb96b4aecedf5670638 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Wed, 22 Mar 2023 17:15:43 +0100 Subject: [PATCH 1/4] remove unnecesary Db re-initialization this is from before we added `Db.init` in `WorkflowRunnerProcess` --- packages/cli/src/CredentialsHelper.ts | 6 ------ packages/cli/src/WorkflowExecuteAdditionalData.ts | 6 ------ 2 files changed, 12 deletions(-) diff --git a/packages/cli/src/CredentialsHelper.ts b/packages/cli/src/CredentialsHelper.ts index cfc86bfd263af..5b20ff91b267d 100644 --- a/packages/cli/src/CredentialsHelper.ts +++ b/packages/cli/src/CredentialsHelper.ts @@ -453,12 +453,6 @@ export class CredentialsHelper extends ICredentialsHelper { ): Promise { const credentials = await this.getCredentials(nodeCredentials, type); - if (!Db.isInitialized) { - // The first time executeWorkflow gets called the Database has - // to get initialized first - await Db.init(); - } - credentials.setData(data, this.encryptionKey); const newCredentialsData = credentials.getDataToSave() as ICredentialsDb; diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index 9cc7aac265e1a..f19278700b86f 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -927,12 +927,6 @@ export async function getWorkflowData( let workflowData: IWorkflowBase | null; if (workflowInfo.id !== undefined) { - if (!Db.isInitialized) { - // The first time executeWorkflow gets called the Database has - // to get initialized first - await Db.init(); - } - const relations = config.getEnv('workflowTagsDisabled') ? [] : ['tags']; workflowData = await WorkflowsService.get({ id: workflowInfo.id }, { relations }); From cc19eb37c193abbd0455eaf7b8bab481e854bbaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Thu, 23 Mar 2023 11:52:03 +0100 Subject: [PATCH 2/4] feat(core): Improved health check --- packages/cli/src/AbstractServer.ts | 22 +-- packages/cli/src/Db.ts | 31 ++-- packages/cli/src/Server.ts | 165 +++++++++--------- packages/cli/src/commands/BaseCommand.ts | 11 +- packages/cli/src/commands/start.ts | 7 +- packages/cli/src/commands/webhook.ts | 4 +- .../cli/test/integration/shared/testDb.ts | 8 +- 7 files changed, 134 insertions(+), 114 deletions(-) diff --git a/packages/cli/src/AbstractServer.ts b/packages/cli/src/AbstractServer.ts index 55eae9376a520..2c764ffa0f318 100644 --- a/packages/cli/src/AbstractServer.ts +++ b/packages/cli/src/AbstractServer.ts @@ -160,9 +160,9 @@ export abstract class AbstractServer { private async setupHealthCheck() { this.app.use((req, res, next) => { - if (!Db.isInitialized) { - sendErrorResponse(res, new ServiceUnavailableError('Database is not ready!')); - } else next(); + if (Db.connectionState === 'ready') next(); + else if (Db.connectionState === 'connected') res.send('n8n is starting'); + else sendErrorResponse(res, new ServiceUnavailableError('Database is not ready!')); }); // Does very basic health check @@ -400,8 +400,8 @@ export abstract class AbstractServer { ); } - async start(): Promise { - const { app, externalHooks, protocol, sslKey, sslCert } = this; + async init(): Promise { + const { app, protocol, sslKey, sslCert } = this; if (protocol === 'https' && sslKey && sslCert) { const https = await import('https'); @@ -431,6 +431,12 @@ export abstract class AbstractServer { await new Promise((resolve) => this.server.listen(PORT, ADDRESS, () => resolve())); + await this.setupHealthCheck(); + + console.log(`n8n ready on ${ADDRESS}, port ${PORT}`); + } + + async start(): Promise { await this.setupErrorHandlers(); this.setupPushServer(); await this.setupCommonMiddlewares(); @@ -438,11 +444,7 @@ export abstract class AbstractServer { this.setupDevMiddlewares(); } - await this.setupHealthCheck(); - await this.configure(); - - console.log(`n8n ready on ${ADDRESS}, port ${PORT}`); console.log(`Version: ${N8N_VERSION}`); const defaultLocale = config.getEnv('defaultLocale'); @@ -450,7 +452,7 @@ export abstract class AbstractServer { console.log(`Locale: ${defaultLocale}`); } - await externalHooks.run('n8n.ready', [this, config]); + await this.externalHooks.run('n8n.ready', [this, config]); } } diff --git a/packages/cli/src/Db.ts b/packages/cli/src/Db.ts index 114aa22a3116f..7a83435803268 100644 --- a/packages/cli/src/Db.ts +++ b/packages/cli/src/Db.ts @@ -19,6 +19,7 @@ import { getPostgresConnectionOptions, getSqliteConnectionOptions, } from '@db/config'; +import { inTest } from './constants'; import { wrapMigration } from '@db/utils/migrationHelpers'; import type { DatabaseType, Migration } from '@db/types'; import { @@ -43,7 +44,9 @@ import { WorkflowTagMappingRepository, } from '@db/repositories'; -export let isInitialized = false; +type ConnectionState = undefined | 'connected' | 'ready'; +export let connectionState: ConnectionState; + export const collections = {} as IDatabaseCollections; let connection: Connection; @@ -94,10 +97,8 @@ export function getConnectionOptions(dbType: DatabaseType): ConnectionOptions { } } -export async function init( - testConnectionOptions?: ConnectionOptions, -): Promise { - if (isInitialized) return collections; +export async function init(testConnectionOptions?: ConnectionOptions): Promise { + if (connectionState) return; const dbType = config.getEnv('database.type'); const connectionOptions = testConnectionOptions ?? getConnectionOptions(dbType); @@ -125,8 +126,8 @@ export async function init( }); connection = new Connection(connectionOptions); - Container.set(Connection, connection); await connection.initialize(); + Container.set(Connection, connection); if (dbType === 'postgresdb') { const schema = config.getEnv('database.postgresdb.schema'); @@ -138,9 +139,13 @@ export async function init( await connection.query(`SET search_path TO ${searchPath.join(',')};`); } - (connectionOptions.migrations as Migration[]).forEach(wrapMigration); + connectionState = 'connected'; +} + +export async function migrate() { + (connection.options.migrations as Migration[]).forEach(wrapMigration); - if (!testConnectionOptions && dbType === 'sqlite') { + if (!inTest && connection.options.type === 'sqlite') { // This specific migration changes database metadata. // A field is now nullable. We need to reconnect so that // n8n knows it has changed. Happens only on sqlite. @@ -161,14 +166,16 @@ export async function init( // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access if (migrations.length === 0) { await connection.destroy(); - connection = new Connection(connectionOptions); - Container.set(Connection, connection); + connection = new Connection(connection.options); await connection.initialize(); + Container.set(Connection, connection); } } else { await connection.runMigrations({ transaction: 'each' }); } + connectionState = 'ready'; + collections.AuthIdentity = Container.get(AuthIdentityRepository); collections.AuthProviderSyncHistory = Container.get(AuthProviderSyncHistoryRepository); collections.Credentials = Container.get(CredentialsRepository); @@ -188,8 +195,4 @@ export async function init( collections.Workflow = Container.get(WorkflowRepository); collections.WorkflowStatistics = Container.get(WorkflowStatisticsRepository); collections.WorkflowTagMapping = Container.get(WorkflowTagMappingRepository); - - isInitialized = true; - - return collections; } diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index ddaf2a6f9093a..f5406d2b2d212 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -170,7 +170,7 @@ import { VersionControlController } from '@/environments/versionControl/versionC const exec = promisify(callbackExec); -class Server extends AbstractServer { +export class Server extends AbstractServer { endpointPresetCredentials: string; waitTracker: WaitTracker; @@ -198,23 +198,6 @@ class Server extends AbstractServer { this.app.set('view engine', 'handlebars'); this.app.set('views', TEMPLATES_DIR); - this.loadNodesAndCredentials = Container.get(LoadNodesAndCredentials); - this.credentialTypes = Container.get(CredentialTypes); - this.nodeTypes = Container.get(NodeTypes); - - this.activeExecutionsInstance = Container.get(ActiveExecutions); - this.waitTracker = Container.get(WaitTracker); - this.postHog = Container.get(PostHogClient); - - this.presetCredentialsLoaded = false; - this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint'); - - this.push = Container.get(Push); - - if (process.env.E2E_TESTS === 'true') { - this.app.use('/e2e', require('./api/e2e.api').e2eController); - } - const urlBaseWebhook = WebhookHelpers.getWebhookBaseUrl(); const telemetrySettings: ITelemetrySettings = { enabled: config.getEnv('diagnostics.enabled'), @@ -339,6 +322,88 @@ class Server extends AbstractServer { }; } + async start() { + this.loadNodesAndCredentials = Container.get(LoadNodesAndCredentials); + this.credentialTypes = Container.get(CredentialTypes); + this.nodeTypes = Container.get(NodeTypes); + + this.activeExecutionsInstance = Container.get(ActiveExecutions); + this.waitTracker = Container.get(WaitTracker); + this.postHog = Container.get(PostHogClient); + + this.presetCredentialsLoaded = false; + this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint'); + + this.push = Container.get(Push); + + if (process.env.E2E_TESTS === 'true') { + this.app.use('/e2e', require('./api/e2e.api').e2eController); + } + + await super.start(); + + const cpus = os.cpus(); + const binaryDataConfig = config.getEnv('binaryDataManager'); + const diagnosticInfo: IDiagnosticInfo = { + basicAuthActive: config.getEnv('security.basicAuth.active'), + databaseType: config.getEnv('database.type'), + disableProductionWebhooksOnMainProcess: config.getEnv( + 'endpoints.disableProductionWebhooksOnMainProcess', + ), + notificationsEnabled: config.getEnv('versionNotifications.enabled'), + versionCli: N8N_VERSION, + systemInfo: { + os: { + type: os.type(), + version: os.version(), + }, + memory: os.totalmem() / 1024, + cpus: { + count: cpus.length, + model: cpus[0].model, + speed: cpus[0].speed, + }, + }, + executionVariables: { + executions_process: config.getEnv('executions.process'), + executions_mode: config.getEnv('executions.mode'), + executions_timeout: config.getEnv('executions.timeout'), + executions_timeout_max: config.getEnv('executions.maxTimeout'), + executions_data_save_on_error: config.getEnv('executions.saveDataOnError'), + executions_data_save_on_success: config.getEnv('executions.saveDataOnSuccess'), + executions_data_save_on_progress: config.getEnv('executions.saveExecutionProgress'), + executions_data_save_manual_executions: config.getEnv( + 'executions.saveDataManualExecutions', + ), + executions_data_prune: config.getEnv('executions.pruneData'), + executions_data_max_age: config.getEnv('executions.pruneDataMaxAge'), + executions_data_prune_timeout: config.getEnv('executions.pruneDataTimeout'), + }, + deploymentType: config.getEnv('deployment.type'), + binaryDataMode: binaryDataConfig.mode, + n8n_multi_user_allowed: isUserManagementEnabled(), + smtp_set_up: config.getEnv('userManagement.emails.mode') === 'smtp', + ldap_allowed: isLdapCurrentAuthenticationMethod(), + saml_enabled: isSamlCurrentAuthenticationMethod(), + }; + + // Set up event handling + initEvents(); + + if (inDevelopment && process.env.N8N_DEV_RELOAD === 'true') { + const { reloadNodesAndCredentials } = await import('@/ReloadNodesAndCredentials'); + await reloadNodesAndCredentials(this.loadNodesAndCredentials, this.nodeTypes, this.push); + } + + void Db.collections.Workflow.findOne({ + select: ['createdAt'], + order: { createdAt: 'ASC' }, + where: {}, + }).then(async (workflow) => + Container.get(InternalHooks).onServerStarted(diagnosticInfo, workflow?.createdAt), + ); + } + /** * Returns the current settings for the frontend */ @@ -1379,67 +1444,3 @@ class Server extends AbstractServer { setupPushServer(restEndpoint, server, app); } } - -export async function start(): Promise { - const app = new Server(); - await app.start(); - - const cpus = os.cpus(); - const binaryDataConfig = config.getEnv('binaryDataManager'); - const diagnosticInfo: IDiagnosticInfo = { - basicAuthActive: config.getEnv('security.basicAuth.active'), - databaseType: config.getEnv('database.type'), - disableProductionWebhooksOnMainProcess: config.getEnv( - 'endpoints.disableProductionWebhooksOnMainProcess', - ), - notificationsEnabled: config.getEnv('versionNotifications.enabled'), - versionCli: N8N_VERSION, - systemInfo: { - os: { - type: os.type(), - version: os.version(), - }, - memory: os.totalmem() / 1024, - cpus: { - count: cpus.length, - model: cpus[0].model, - speed: cpus[0].speed, - }, - }, - executionVariables: { - executions_process: config.getEnv('executions.process'), - executions_mode: config.getEnv('executions.mode'), - executions_timeout: config.getEnv('executions.timeout'), - executions_timeout_max: config.getEnv('executions.maxTimeout'), - executions_data_save_on_error: config.getEnv('executions.saveDataOnError'), - executions_data_save_on_success: config.getEnv('executions.saveDataOnSuccess'), - executions_data_save_on_progress: config.getEnv('executions.saveExecutionProgress'), - executions_data_save_manual_executions: config.getEnv('executions.saveDataManualExecutions'), - executions_data_prune: config.getEnv('executions.pruneData'), - executions_data_max_age: config.getEnv('executions.pruneDataMaxAge'), - executions_data_prune_timeout: config.getEnv('executions.pruneDataTimeout'), - }, - deploymentType: config.getEnv('deployment.type'), - binaryDataMode: binaryDataConfig.mode, - n8n_multi_user_allowed: isUserManagementEnabled(), - smtp_set_up: config.getEnv('userManagement.emails.mode') === 'smtp', - ldap_allowed: isLdapCurrentAuthenticationMethod(), - saml_enabled: isSamlCurrentAuthenticationMethod(), - }; - - // Set up event handling - initEvents(); - - if (inDevelopment && process.env.N8N_DEV_RELOAD === 'true') { - const { reloadNodesAndCredentials } = await import('@/ReloadNodesAndCredentials'); - await reloadNodesAndCredentials(app.loadNodesAndCredentials, app.nodeTypes, app.push); - } - - void Db.collections.Workflow.findOne({ - select: ['createdAt'], - order: { createdAt: 'ASC' }, - where: {}, - }).then(async (workflow) => - Container.get(InternalHooks).onServerStarted(diagnosticInfo, workflow?.createdAt), - ); -} diff --git a/packages/cli/src/commands/BaseCommand.ts b/packages/cli/src/commands/BaseCommand.ts index 2bd7a88572f3b..f55ae424ad283 100644 --- a/packages/cli/src/commands/BaseCommand.ts +++ b/packages/cli/src/commands/BaseCommand.ts @@ -5,6 +5,7 @@ import type { INodeTypes } from 'n8n-workflow'; import { LoggerProxy, ErrorReporterProxy as ErrorReporter, sleep } from 'n8n-workflow'; import type { IUserSettings } from 'n8n-core'; import { BinaryDataManager, UserSettings } from 'n8n-core'; +import type { AbstractServer } from '@/AbstractServer'; import { getLogger } from '@/Logger'; import config from '@/config'; import * as Db from '@/Db'; @@ -36,6 +37,8 @@ export abstract class BaseCommand extends Command { protected instanceId: string; + protected server?: AbstractServer; + async init(): Promise { await initErrorHandling(); @@ -55,6 +58,12 @@ export abstract class BaseCommand extends Command { this.exitWithCrash('There was an error initializing DB', error), ); + await this.server?.init(); + + await Db.migrate().catch(async (error: Error) => + this.exitWithCrash('There was an error running database migrations', error), + ); + if (process.env.WEBHOOK_TUNNEL_URL) { LoggerProxy.warn( 'You are still using the WEBHOOK_TUNNEL_URL environment variable. It has been deprecated and will be removed in a future version of n8n. Please switch to using WEBHOOK_URL instead.', @@ -112,7 +121,7 @@ export abstract class BaseCommand extends Command { async finally(error: Error | undefined) { if (inTest || this.id === 'start') return; - if (Db.isInitialized) { + if (Db.connectionState === 'ready') { await sleep(100); // give any in-flight query some time to finish await Db.getConnection().destroy(); } diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 358d8091eb692..016fc48a9ef1b 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -21,7 +21,7 @@ import { ActiveExecutions } from '@/ActiveExecutions'; import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import * as Db from '@/Db'; import * as GenericHelpers from '@/GenericHelpers'; -import * as Server from '@/Server'; +import { Server } from '@/Server'; import { TestWebhooks } from '@/TestWebhooks'; import { getAllInstalledPackages } from '@/CommunityNodes/packageModel'; import { EDITOR_UI_DIST_DIR, GENERATED_STATIC_DIR } from '@/constants'; @@ -62,6 +62,8 @@ export class Start extends BaseCommand { protected activeWorkflowRunner: ActiveWorkflowRunner; + protected server = new Server(); + /** * Opens the UI in browser */ @@ -208,6 +210,7 @@ export class Start extends BaseCommand { async init() { await this.initCrashJournal(); + await super.init(); this.logger.info('Initializing n8n process'); this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner); @@ -351,7 +354,7 @@ export class Start extends BaseCommand { ); } - await Server.start(); + await this.server.start(); // Start to get active workflows and run their triggers await this.activeWorkflowRunner.init(); diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index cc1ac53b35389..da2fc44315ce6 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -16,6 +16,8 @@ export class Webhook extends BaseCommand { help: flags.help({ char: 'h' }), }; + protected server = new WebhookServer(); + /** * Stops n8n in a graceful way. * Make for example sure that all the webhooks from third party services @@ -81,7 +83,7 @@ export class Webhook extends BaseCommand { async run() { await Container.get(Queue).init(); - await new WebhookServer().start(); + await this.server.start(); this.logger.info('Webhook listener waiting for requests.'); // Make sure that the process does not close diff --git a/packages/cli/test/integration/shared/testDb.ts b/packages/cli/test/integration/shared/testDb.ts index 5cc910367d6b9..c21a397818279 100644 --- a/packages/cli/test/integration/shared/testDb.ts +++ b/packages/cli/test/integration/shared/testDb.ts @@ -58,7 +58,7 @@ export async function init() { if (dbType === 'sqlite') { // no bootstrap connection required - return Db.init(getSqliteOptions({ name: testDbName })); + await Db.init(getSqliteOptions({ name: testDbName })); } if (dbType === 'postgresdb') { @@ -89,7 +89,7 @@ export async function init() { await bootstrapPostgres.query(`CREATE DATABASE ${testDbName}`); await bootstrapPostgres.destroy(); - return Db.init(getDBOptions('postgres', testDbName)); + await Db.init(getDBOptions('postgres', testDbName)); } if (dbType === 'mysqldb') { @@ -97,10 +97,10 @@ export async function init() { await bootstrapMysql.query(`CREATE DATABASE ${testDbName}`); await bootstrapMysql.destroy(); - return Db.init(getDBOptions('mysql', testDbName)); + await Db.init(getDBOptions('mysql', testDbName)); } - throw new Error(`Unrecognized DB type: ${dbType}`); + await Db.migrate(); } /** From 3985353d7a5eb231e253fe4a2fb4d6b9cb85fc26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Tue, 9 May 2023 12:32:21 +0200 Subject: [PATCH 3/4] make health check not care about DB connections --- packages/cli/src/AbstractServer.ts | 34 ++++++--------------- packages/cli/src/Db.ts | 39 +++++++++++++++++++----- packages/cli/src/commands/BaseCommand.ts | 2 +- 3 files changed, 41 insertions(+), 34 deletions(-) diff --git a/packages/cli/src/AbstractServer.ts b/packages/cli/src/AbstractServer.ts index 2c764ffa0f318..10bebc44b819f 100644 --- a/packages/cli/src/AbstractServer.ts +++ b/packages/cli/src/AbstractServer.ts @@ -159,33 +159,17 @@ export abstract class AbstractServer { protected setupPushServer() {} private async setupHealthCheck() { - this.app.use((req, res, next) => { - if (Db.connectionState === 'ready') next(); - else if (Db.connectionState === 'connected') res.send('n8n is starting'); - else sendErrorResponse(res, new ServiceUnavailableError('Database is not ready!')); - }); - - // Does very basic health check + // health check should not care about DB connections this.app.get('/healthz', async (req, res) => { - Logger.debug('Health check started!'); - - const connection = Db.getConnection(); - - try { - if (!connection.isInitialized) { - // Connection is not active - throw new ServiceUnavailableError('No active database connection!'); - } - // DB ping - await connection.query('SELECT 1'); - } catch (error) { - ErrorReporter.error(error); - Logger.error('No Database connection!'); - return sendErrorResponse(res, new ServiceUnavailableError('No Database connection!')); - } + res.send({ status: 'ok' }); + }); - Logger.debug('Health check completed successfully!'); - sendSuccessResponse(res, { status: 'ok' }, true, 200); + const { connectionState } = Db; + this.app.use((req, res, next) => { + if (connectionState.connected) { + if (connectionState.migrated) next(); + else res.send('n8n is starting up. Please wait'); + } else sendErrorResponse(res, new ServiceUnavailableError('Database is not ready!')); }); if (config.getEnv('executions.mode') === 'queue') { diff --git a/packages/cli/src/Db.ts b/packages/cli/src/Db.ts index 7a83435803268..0e5ade5b20271 100644 --- a/packages/cli/src/Db.ts +++ b/packages/cli/src/Db.ts @@ -7,6 +7,8 @@ import { Container } from 'typedi'; import type { DataSourceOptions as ConnectionOptions, EntityManager, LoggerOptions } from 'typeorm'; import { DataSource as Connection } from 'typeorm'; import type { TlsOptions } from 'tls'; +import { ErrorReporterProxy as ErrorReporter } from 'n8n-workflow'; + import type { IDatabaseCollections } from '@/Interfaces'; import config from '@/config'; @@ -19,7 +21,7 @@ import { getPostgresConnectionOptions, getSqliteConnectionOptions, } from '@db/config'; -import { inTest } from './constants'; +import { inTest } from '@/constants'; import { wrapMigration } from '@db/utils/migrationHelpers'; import type { DatabaseType, Migration } from '@db/types'; import { @@ -44,15 +46,36 @@ import { WorkflowTagMappingRepository, } from '@db/repositories'; -type ConnectionState = undefined | 'connected' | 'ready'; -export let connectionState: ConnectionState; - export const collections = {} as IDatabaseCollections; let connection: Connection; export const getConnection = () => connection!; +type ConnectionState = { + connected: boolean; + migrated: boolean; +}; + +export const connectionState: ConnectionState = { + connected: false, + migrated: false, +}; + +// Ping DB connection every 2 seconds +setInterval(async () => { + if (connection?.isInitialized) { + try { + await connection.query('SELECT 1'); + connectionState.connected = true; + return; + } catch (error) { + ErrorReporter.error(error); + } + } + connectionState.connected = false; +}, 2000); + export async function transaction(fn: (entityManager: EntityManager) => Promise): Promise { return connection.transaction(fn); } @@ -98,7 +121,7 @@ export function getConnectionOptions(dbType: DatabaseType): ConnectionOptions { } export async function init(testConnectionOptions?: ConnectionOptions): Promise { - if (connectionState) return; + if (connectionState.connected) return; const dbType = config.getEnv('database.type'); const connectionOptions = testConnectionOptions ?? getConnectionOptions(dbType); @@ -139,7 +162,7 @@ export async function init(testConnectionOptions?: ConnectionOptions): Promise Date: Tue, 9 May 2023 21:38:16 +0200 Subject: [PATCH 4/4] close DB connections, and shutdown the timer --- packages/cli/src/Db.ts | 51 ++++++++++++------- packages/cli/src/commands/BaseCommand.ts | 2 +- .../cli/test/integration/shared/testDb.ts | 3 +- 3 files changed, 36 insertions(+), 20 deletions(-) diff --git a/packages/cli/src/Db.ts b/packages/cli/src/Db.ts index 0e5ade5b20271..af274427e2fd7 100644 --- a/packages/cli/src/Db.ts +++ b/packages/cli/src/Db.ts @@ -63,18 +63,24 @@ export const connectionState: ConnectionState = { }; // Ping DB connection every 2 seconds -setInterval(async () => { - if (connection?.isInitialized) { - try { - await connection.query('SELECT 1'); - connectionState.connected = true; - return; - } catch (error) { - ErrorReporter.error(error); +let pingTimer: NodeJS.Timer | undefined; +if (!inTest) { + const pingDBFn = async () => { + if (connection?.isInitialized) { + try { + await connection.query('SELECT 1'); + connectionState.connected = true; + return; + } catch (error) { + ErrorReporter.error(error); + } finally { + pingTimer = setTimeout(pingDBFn, 2000); + } } - } - connectionState.connected = false; -}, 2000); + connectionState.connected = false; + }; + pingTimer = setTimeout(pingDBFn, 2000); +} export async function transaction(fn: (entityManager: EntityManager) => Promise): Promise { return connection.transaction(fn); @@ -120,6 +126,12 @@ export function getConnectionOptions(dbType: DatabaseType): ConnectionOptions { } } +const openConnection = async (options: ConnectionOptions) => { + connection = new Connection(options); + await connection.initialize(); + Container.set(Connection, connection); +}; + export async function init(testConnectionOptions?: ConnectionOptions): Promise { if (connectionState.connected) return; @@ -148,9 +160,7 @@ export async function init(testConnectionOptions?: ConnectionOptions): Promise { + if (pingTimer) { + clearTimeout(pingTimer); + pingTimer = undefined; + } + + if (connection.isInitialized) await connection.destroy(); +}; diff --git a/packages/cli/src/commands/BaseCommand.ts b/packages/cli/src/commands/BaseCommand.ts index e47ce428a3cee..e629458b91b1a 100644 --- a/packages/cli/src/commands/BaseCommand.ts +++ b/packages/cli/src/commands/BaseCommand.ts @@ -123,7 +123,7 @@ export abstract class BaseCommand extends Command { if (inTest || this.id === 'start') return; if (Db.connectionState.connected) { await sleep(100); // give any in-flight query some time to finish - await Db.getConnection().destroy(); + await Db.close(); } const exitCode = error instanceof ExitError ? error.oclif.exit : error ? 1 : 0; this.exit(exitCode); diff --git a/packages/cli/test/integration/shared/testDb.ts b/packages/cli/test/integration/shared/testDb.ts index c21a397818279..08eb9fce2e9f8 100644 --- a/packages/cli/test/integration/shared/testDb.ts +++ b/packages/cli/test/integration/shared/testDb.ts @@ -107,8 +107,7 @@ export async function init() { * Drop test DB, closing bootstrap connection if existing. */ export async function terminate() { - const connection = Db.getConnection(); - if (connection.isInitialized) await connection.destroy(); + await Db.close(); } /**