Skip to content

Commit

Permalink
Merge pull request #51 from leapfrogtechnology/migrations
Browse files Browse the repository at this point in the history
Migrations support
  • Loading branch information
kabirbaidhya authored Apr 25, 2020
2 parents 87c7c5e + 10fd58e commit 8ffbc30
Show file tree
Hide file tree
Showing 47 changed files with 1,605 additions and 392 deletions.
5 changes: 1 addition & 4 deletions bin/run
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,4 @@
// Set CLI environment as true.
process.env.SYNC_DB_CLI = 'true';

require('@oclif/command')
.run()
.then(require('@oclif/command/flush'))
.catch(require('@oclif/errors/handle'));
require('@oclif/command').run().then(require('@oclif/command/flush')).catch(require('@oclif/errors/handle'));
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
"@oclif/command": "^1",
"@oclif/config": "^1",
"@oclif/plugin-help": "^2",
"chalk": "^4.0.0",
"debug": "^4.1.1",
"globby": "^10.0.2",
"knex": "^0.20.11",
Expand Down
237 changes: 184 additions & 53 deletions src/api.ts
Original file line number Diff line number Diff line change
@@ -1,73 +1,204 @@
import * as Knex from 'knex';
import { mergeDeepRight } from 'ramda';

import { log } from './logger';
import { getConnectionId } from './config';
import SyncParams from './domain/SyncParams';
import SyncConfig from './domain/SyncConfig';
import SyncResult from './domain/SyncResult';
import { DEFAULT_SYNC_PARAMS } from './constants';
import ConnectionConfig from './domain/ConnectionConfig';
import ConnectionReference from './domain/ConnectionReference';
import { isKnexInstance, getConfig, createInstance } from './util/db';

// Services
import { synchronizeDatabase } from './service/sync';
/**
* Programmatic API
* ----------------
* This module defines the Programmatic API of sync-db.
* The functions exposed here are used by the CLI frontend and
* are also meant to be the public interface for the developers using it as a package.
*/

import * as init from './init';
import { log } from './util/logger';
import { withTransaction, mapToConnectionReferences, DatabaseConnections } from './util/db';

import Configuration from './domain/Configuration';
import SynchronizeParams from './domain/SynchronizeParams';
import OperationParams from './domain/operation/OperationParams';
import OperationResult from './domain/operation/OperationResult';

// Service
import { executeProcesses } from './service/execution';
import { runSynchronize, runPrune } from './service/sync';
import { invokeMigrationApi, KnexMigrationAPI } from './migration/service/knexMigrator';

/**
* Synchronize all the configured database connections.
*
* @param {SyncConfig} config
* @param {(ConnectionConfig[] | ConnectionConfig | Knex[] | Knex)} conn
* @param {SyncParams} [options]
* @returns {Promise<SyncResult[]>}
* @param {Configuration} config
* @param {DatabaseConnections} conn
* @param {SynchronizeParams} [options]
* @returns {Promise<OperationResult[]>}
*/
export async function synchronize(
config: SyncConfig,
conn: ConnectionConfig[] | ConnectionConfig | Knex[] | Knex,
options?: SyncParams
): Promise<SyncResult[]> {
log('Starting to synchronize.');
const connectionList = Array.isArray(conn) ? conn : [conn];
const connections = mapToConnectionReferences(connectionList);
const params = mergeDeepRight(DEFAULT_SYNC_PARAMS, options || {});
const processes = connections.map(({ connection, id: connectionId }) => () =>
synchronizeDatabase(connection, {
config,
params,
connectionId
})
config: Configuration,
conn: DatabaseConnections,
options?: SynchronizeParams
): Promise<OperationResult[]> {
log('Synchronize');

const params: SynchronizeParams = {
force: false,
'skip-migration': false,
...options
};

// TODO: Need to preload the SQL source code under this step.
const { knexMigrationConfig } = await init.prepare(config, {
loadSqlSources: true,
loadMigrations: !params['skip-migration']
});

const connections = mapToConnectionReferences(conn);
const processes = connections.map(connection => () =>
withTransaction(connection, trx =>
runSynchronize(trx, {
config,
params,
connectionId: connection.id,
migrateFunc: t =>
invokeMigrationApi(t, KnexMigrationAPI.MIGRATE_LATEST, {
config,
connectionId: connection.id,
knexMigrationConfig: knexMigrationConfig(connection.id),
params: { ...params, onSuccess: params.onMigrationSuccess, onFailed: params.onMigrationFailed }
})
})
)
);

return executeProcesses(processes, config);
}

/**
* Prune all synchronized objects from the databases (except the ones like tables made via migrations).
*
* TODO: An ability to prune only a handful of objects from the last.
*
* @param {Configuration} config
* @param {(DatabaseConnections)} conn
* @param {OperationParams} [options]
* @returns {Promise<OperationResult[]>}
*/
export async function prune(
config: Configuration,
conn: DatabaseConnections,
options?: OperationParams
): Promise<OperationResult[]> {
log('Prune');

const params: OperationParams = { ...options };
const connections = mapToConnectionReferences(conn);

// TODO: Need to preload the SQL source code under this step.
await init.prepare(config, { loadSqlSources: true });

const processes = connections.map(connection => () =>
withTransaction(connection, trx =>
runPrune(trx, {
config,
params,
connectionId: connection.id
})
)
);

// Explicitly suppressing the `| Error` type since
// all errors are already caught inside synchronizeDatabase().
const results = (await executeProcesses(processes, config)) as SyncResult[];
return executeProcesses(processes, config);
}

/**
* Migrate Latest.
*
* @param {Configuration} config
* @param {(DatabaseConnections)} conn
* @param {OperationParams} [options]
* @returns {Promise<OperationResult[]>}
*/
export async function migrateLatest(
config: Configuration,
conn: DatabaseConnections,
options?: OperationParams
): Promise<OperationResult[]> {
log('Migrate Latest');

const params: OperationParams = { ...options };
const connections = mapToConnectionReferences(conn);
const { knexMigrationConfig } = await init.prepare(config, { loadMigrations: true });

log('Synchronization completed.');
const processes = connections.map(connection => () =>
withTransaction(connection, trx =>
invokeMigrationApi(trx, KnexMigrationAPI.MIGRATE_LATEST, {
config,
params,
connectionId: connection.id,
knexMigrationConfig: knexMigrationConfig(connection.id)
})
)
);

return results;
return executeProcesses(processes, config);
}

/**
* Map connection configuration list to the connection instances.
* Migrate Rollback.
*
* @param {((ConnectionConfig | Knex)[])} connectionList
* @returns {ConnectionReference[]}
* @param {Configuration} config
* @param {(DatabaseConnections)} conn
* @param {OperationParams} [options]
* @returns {Promise<OperationResult[]>}
*/
function mapToConnectionReferences(connectionList: (ConnectionConfig | Knex)[]): ConnectionReference[] {
return connectionList.map(connection => {
if (isKnexInstance(connection)) {
log(`Received connection instance to database: ${connection.client.config.connection.database}`);
export async function migrateRollback(
config: Configuration,
conn: DatabaseConnections,
options?: OperationParams
): Promise<OperationResult[]> {
log('Migrate Rollback');

// TODO: Ask for `id` explicitly in for programmatic API,
// when Knex instance is passed directly.
// This implies a breaking change with the programmatic API.
return { connection, id: getConnectionId(getConfig(connection)) };
}
const params: OperationParams = { ...options };
const connections = mapToConnectionReferences(conn);
const { knexMigrationConfig } = await init.prepare(config, { loadMigrations: true });

log(`Creating a connection to database: ${connection.host}/${connection.database}`);
const processes = connections.map(connection => () =>
withTransaction(connection, trx =>
invokeMigrationApi(trx, KnexMigrationAPI.MIGRATE_ROLLBACK, {
config,
params,
connectionId: connection.id,
knexMigrationConfig: knexMigrationConfig(connection.id)
})
)
);

return { connection: createInstance(connection), id: getConnectionId(connection) };
});
return executeProcesses(processes, config);
}

/**
* List Migrations.
*
* @param {Configuration} config
* @param {(DatabaseConnections)} conn
* @param {OperationParams} [options]
* @returns {Promise<OperationResult[]>}
*/
export async function migrateList(
config: Configuration,
conn: DatabaseConnections,
options?: OperationParams
): Promise<OperationResult[]> {
log('Migrate List');

const params: OperationParams = { ...options };
const connections = mapToConnectionReferences(conn);
const { knexMigrationConfig } = await init.prepare(config, { loadMigrations: true });

const processes = connections.map(connection => () =>
withTransaction(connection, trx =>
invokeMigrationApi(trx, KnexMigrationAPI.MIGRATE_LIST, {
config,
params,
connectionId: connection.id,
knexMigrationConfig: knexMigrationConfig(connection.id)
})
)
);

return executeProcesses(processes, config);
}
37 changes: 0 additions & 37 deletions src/cli.ts

This file was deleted.

73 changes: 73 additions & 0 deletions src/commands/migrate-latest.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { Command } from '@oclif/command';
import { bold, red, cyan } from 'chalk';

import { migrateLatest } from '../api';
import { dbLogger } from '../util/logger';
import { loadConfig, resolveConnections } from '..';
import { printLine, printError, printInfo } from '../util/io';
import OperationResult from '../domain/operation/OperationResult';

class MigrateLatest extends Command {
static description = 'Run the migrations up to the latest changes.';

/**
* Success handler.
*/
onSuccess = async (result: OperationResult) => {
const log = dbLogger(result.connectionId);
const [num, list] = result.data;
const alreadyUpToDate = num && list.length === 0;

log('Up to date: ', alreadyUpToDate);

await printLine(bold(` ▸ ${result.connectionId} - Successful`) + ` (${result.timeElapsed}s)`);

if (alreadyUpToDate) {
await printInfo(' Already up to date.\n');

return;
}

// Completed migrations.
for (const item of list) {
await printLine(cyan(` - ${item}`));
}

await printInfo(`\n Ran ${list.length} migrations.\n`);
};

/**
* Failure handler.
*/
onFailed = async (result: OperationResult) => {
printLine(bold(red(` ▸ ${result.connectionId} - Failed`)));

await printError(` ${result.error}\n`);
};

/**
* CLI command execution handler.
*
* @returns {Promise<void>}
*/
async run(): Promise<void> {
const config = await loadConfig();
const connections = await resolveConnections();

const results = await migrateLatest(config, connections, {
onSuccess: this.onSuccess,
onFailed: this.onFailed
});

const failedCount = results.filter(({ success }) => !success).length;

if (failedCount === 0) {
return process.exit(0);
}

printError(`Error: Migration failed for ${failedCount} connection(s).`);
process.exit(-1);
}
}

export default MigrateLatest;
Loading

0 comments on commit 8ffbc30

Please sign in to comment.