feat(cli): add support for "reporter" plugins and include a default reporter
This commit is contained in:
parent
b61072a3b6
commit
8f35812fed
5 changed files with 286 additions and 61 deletions
39
packages/cli/src/plugin-reporter-default.ts
Normal file
39
packages/cli/src/plugin-reporter-default.ts
Normal file
|
|
@ -0,0 +1,39 @@
|
|||
import { type ReporterPlugin } from '@emigrate/plugin-tools/types';
|
||||
|
||||
const reporterDefault: ReporterPlugin = {
|
||||
onInit({ dry, directory }) {
|
||||
console.log(`Running migrations in: ${directory}${dry ? ' (dry run)' : ''}`);
|
||||
},
|
||||
onCollectedMigrations(migrations) {
|
||||
console.log(`Found ${migrations.length} pending migrations`);
|
||||
},
|
||||
onLockedMigrations(migrations) {
|
||||
console.log(`Locked ${migrations.length} migrations`);
|
||||
},
|
||||
onMigrationStart(migration) {
|
||||
console.log(`- ${migration.relativeFilePath} (running)`);
|
||||
},
|
||||
onMigrationSuccess(migration) {
|
||||
console.log(`- ${migration.relativeFilePath} (success) [${migration.duration}ms]`);
|
||||
},
|
||||
onMigrationError(migration, error) {
|
||||
console.error(`- ${migration.relativeFilePath} (failed!) [${migration.duration}ms]`);
|
||||
console.error(error.cause ?? error);
|
||||
},
|
||||
onMigrationSkip(migration) {
|
||||
console.log(`- ${migration.relativeFilePath} (skipped)`);
|
||||
},
|
||||
onFinished(migrations, error) {
|
||||
const totalDuration = migrations.reduce((total, migration) => total + migration.duration, 0);
|
||||
|
||||
if (error) {
|
||||
console.error('Failed to run migrations! [total duration: %dms]', totalDuration);
|
||||
console.error(error.cause ?? error);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`Successfully ran ${migrations.length} migrations! [total duration: ${totalDuration}ms]`);
|
||||
},
|
||||
};
|
||||
|
||||
export default reporterDefault;
|
||||
|
|
@ -1,6 +1,14 @@
|
|||
import process from 'node:process';
|
||||
import { getOrLoadPlugin, getOrLoadPlugins } from '@emigrate/plugin-tools';
|
||||
import { type LoaderPlugin, type MigrationFunction } from '@emigrate/plugin-tools/types';
|
||||
import {
|
||||
type LoaderPlugin,
|
||||
type MigrationFunction,
|
||||
type Plugin,
|
||||
type PluginType,
|
||||
type PluginFromType,
|
||||
type MigrationMetadata,
|
||||
type MigrationMetadataFinished,
|
||||
} from '@emigrate/plugin-tools/types';
|
||||
import {
|
||||
BadOptionError,
|
||||
EmigrateError,
|
||||
|
|
@ -12,26 +20,45 @@ import {
|
|||
import { type Config } from './types.js';
|
||||
import { stripLeadingPeriod } from './strip-leading-period.js';
|
||||
import pluginLoaderJs from './plugin-loader-js.js';
|
||||
import pluginReporterDefault from './plugin-reporter-default.js';
|
||||
|
||||
type ExtraFlags = {
|
||||
dry?: boolean;
|
||||
};
|
||||
|
||||
export default async function upCommand({ directory, dry, plugins = [] }: Config & ExtraFlags) {
|
||||
const requirePlugin = async <T extends PluginType>(
|
||||
type: T,
|
||||
plugins: Array<Plugin | string>,
|
||||
): Promise<PluginFromType<T>> => {
|
||||
const plugin = await getOrLoadPlugin(type, plugins);
|
||||
|
||||
if (!plugin) {
|
||||
throw new BadOptionError(
|
||||
'plugin',
|
||||
`No ${type} plugin found, please specify a ${type} plugin using the plugin option`,
|
||||
);
|
||||
}
|
||||
|
||||
return plugin;
|
||||
};
|
||||
|
||||
const getDuration = (start: [number, number]) => {
|
||||
const [seconds, nanoseconds] = process.hrtime(start);
|
||||
return seconds * 1000 + nanoseconds / 1_000_000;
|
||||
};
|
||||
|
||||
export default async function upCommand({ directory, dry = false, plugins = [] }: Config & ExtraFlags) {
|
||||
if (!directory) {
|
||||
throw new MissingOptionError('directory');
|
||||
}
|
||||
|
||||
const storagePlugin = await getOrLoadPlugin('storage', plugins);
|
||||
|
||||
if (!storagePlugin) {
|
||||
throw new BadOptionError(
|
||||
'plugin',
|
||||
'No storage plugin found, please specify a storage plugin using the plugin option',
|
||||
);
|
||||
}
|
||||
|
||||
const cwd = process.cwd();
|
||||
const storagePlugin = await requirePlugin('storage', plugins);
|
||||
const storage = await storagePlugin.initializeStorage();
|
||||
const reporter = await requirePlugin('reporter', [pluginReporterDefault, ...plugins]);
|
||||
|
||||
await reporter.onInit?.({ cwd, dry, directory });
|
||||
|
||||
const path = await import('node:path');
|
||||
const fs = await import('node:fs/promises');
|
||||
|
||||
|
|
@ -39,26 +66,41 @@ export default async function upCommand({ directory, dry, plugins = [] }: Config
|
|||
withFileTypes: true,
|
||||
});
|
||||
|
||||
const migrationFiles = allFilesInMigrationDirectory
|
||||
const migrationFiles: MigrationMetadata[] = allFilesInMigrationDirectory
|
||||
.filter((file) => file.isFile() && !file.name.startsWith('.') && !file.name.startsWith('_'))
|
||||
.sort((a, b) => a.name.localeCompare(b.name))
|
||||
.map((file) => file.name);
|
||||
.map(({ name }) => {
|
||||
const filePath = path.resolve(process.cwd(), directory, name);
|
||||
|
||||
return {
|
||||
name,
|
||||
filePath,
|
||||
relativeFilePath: path.relative(cwd, filePath),
|
||||
extension: stripLeadingPeriod(path.extname(name)),
|
||||
directory,
|
||||
cwd,
|
||||
};
|
||||
});
|
||||
|
||||
let migrationHistoryError: MigrationHistoryError | undefined;
|
||||
|
||||
for await (const migrationHistoryEntry of storage.getHistory()) {
|
||||
if (migrationHistoryEntry.status === 'failed') {
|
||||
throw new MigrationHistoryError(
|
||||
migrationHistoryError = new MigrationHistoryError(
|
||||
`Migration ${migrationHistoryEntry.name} is in a failed state, please fix it first`,
|
||||
migrationHistoryEntry,
|
||||
);
|
||||
}
|
||||
|
||||
if (migrationFiles.includes(migrationHistoryEntry.name)) {
|
||||
migrationFiles.splice(migrationFiles.indexOf(migrationHistoryEntry.name), 1);
|
||||
const index = migrationFiles.findIndex((migrationFile) => migrationFile.name === migrationHistoryEntry.name);
|
||||
|
||||
if (index !== -1) {
|
||||
migrationFiles.splice(index, 1);
|
||||
}
|
||||
}
|
||||
|
||||
const migrationFileExtensions = new Set(migrationFiles.map((file) => stripLeadingPeriod(path.extname(file))));
|
||||
const loaderPlugins = await getOrLoadPlugins('loader', [...plugins, pluginLoaderJs]);
|
||||
const migrationFileExtensions = new Set(migrationFiles.map((migration) => migration.extension));
|
||||
const loaderPlugins = await getOrLoadPlugins('loader', [pluginLoaderJs, ...plugins]);
|
||||
|
||||
const loaderByExtension = new Map<string, LoaderPlugin | undefined>(
|
||||
[...migrationFileExtensions].map(
|
||||
|
|
@ -78,10 +120,19 @@ export default async function upCommand({ directory, dry, plugins = [] }: Config
|
|||
}
|
||||
}
|
||||
|
||||
if (dry) {
|
||||
console.log('Pending migrations:');
|
||||
console.log(migrationFiles.map((file) => ` - ${file}`).join('\n'));
|
||||
console.log('\nDry run, exiting...');
|
||||
await reporter.onCollectedMigrations?.(migrationFiles);
|
||||
|
||||
if (migrationFiles.length === 0 || dry || migrationHistoryError) {
|
||||
await reporter.onLockedMigrations?.([]);
|
||||
|
||||
for await (const migration of migrationFiles) {
|
||||
await reporter.onMigrationSkip?.(migration);
|
||||
}
|
||||
|
||||
await reporter.onFinished?.(
|
||||
migrationFiles.map((migration) => ({ ...migration, status: 'skipped', duration: 0 })),
|
||||
migrationHistoryError,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -104,48 +155,70 @@ export default async function upCommand({ directory, dry, plugins = [] }: Config
|
|||
process.on('SIGINT', cleanup);
|
||||
process.on('SIGTERM', cleanup);
|
||||
|
||||
const finishedMigrations: MigrationMetadataFinished[] = [];
|
||||
|
||||
try {
|
||||
for await (const name of lockedMigrationFiles) {
|
||||
console.log(' -', name, '...');
|
||||
for await (const migration of lockedMigrationFiles) {
|
||||
const lastMigrationStatus = finishedMigrations.at(-1)?.status;
|
||||
|
||||
const extension = stripLeadingPeriod(path.extname(name));
|
||||
const cwd = process.cwd();
|
||||
const filePath = path.resolve(cwd, directory, name);
|
||||
const relativeFilePath = path.relative(cwd, filePath);
|
||||
const loader = loaderByExtension.get(extension)!;
|
||||
const metadata = { name, filePath, relativeFilePath, cwd, directory, extension };
|
||||
if (lastMigrationStatus === 'failed' || lastMigrationStatus === 'skipped') {
|
||||
await reporter.onMigrationSkip?.(migration);
|
||||
finishedMigrations.push({ ...migration, status: 'skipped', duration: 0 });
|
||||
continue;
|
||||
}
|
||||
|
||||
let migration: MigrationFunction;
|
||||
await reporter.onMigrationStart?.(migration);
|
||||
|
||||
const loader = loaderByExtension.get(migration.extension)!;
|
||||
const start = process.hrtime();
|
||||
|
||||
let migrationFunction: MigrationFunction;
|
||||
|
||||
try {
|
||||
try {
|
||||
migration = await loader.loadMigration(metadata);
|
||||
migrationFunction = await loader.loadMigration(migration);
|
||||
} catch (error) {
|
||||
throw new MigrationLoadError(`Failed to load migration file: ${relativeFilePath}`, metadata, {
|
||||
throw new MigrationLoadError(`Failed to load migration file: ${migration.relativeFilePath}`, migration, {
|
||||
cause: error,
|
||||
});
|
||||
}
|
||||
|
||||
await migration();
|
||||
await migrationFunction();
|
||||
|
||||
console.log(' -', name, 'done');
|
||||
const duration = getDuration(start);
|
||||
const finishedMigration: MigrationMetadataFinished = { ...migration, status: 'done', duration };
|
||||
|
||||
await storage.onSuccess(name);
|
||||
await storage.onSuccess(finishedMigration);
|
||||
await reporter.onMigrationSuccess?.(finishedMigration);
|
||||
|
||||
finishedMigrations.push(finishedMigration);
|
||||
} catch (error) {
|
||||
const errorInstance = error instanceof Error ? error : new Error(String(error));
|
||||
let errorInstance = error instanceof Error ? error : new Error(String(error));
|
||||
|
||||
console.error(' -', name, 'failed:', errorInstance.message);
|
||||
|
||||
await storage.onError(name, errorInstance);
|
||||
|
||||
if (!(error instanceof EmigrateError)) {
|
||||
throw new MigrationRunError(`Failed to run migration: ${relativeFilePath}`, metadata, { cause: error });
|
||||
if (!(errorInstance instanceof EmigrateError)) {
|
||||
errorInstance = new MigrationRunError(`Failed to run migration: ${migration.relativeFilePath}`, migration, {
|
||||
cause: error,
|
||||
});
|
||||
}
|
||||
|
||||
throw error;
|
||||
const duration = getDuration(start);
|
||||
const finishedMigration: MigrationMetadataFinished = {
|
||||
...migration,
|
||||
status: 'done',
|
||||
duration,
|
||||
error: errorInstance,
|
||||
};
|
||||
|
||||
await storage.onError(finishedMigration, errorInstance);
|
||||
await reporter.onMigrationError?.(finishedMigration, errorInstance);
|
||||
|
||||
finishedMigrations.push(finishedMigration);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
const firstError = finishedMigrations.find((migration) => migration.status === 'failed')?.error;
|
||||
|
||||
await reporter.onFinished?.(finishedMigrations, firstError);
|
||||
await cleanup();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import {
|
|||
type StoragePlugin,
|
||||
type Plugin,
|
||||
type LoaderPlugin,
|
||||
type ReporterPlugin,
|
||||
} from './types.js';
|
||||
|
||||
export const isGeneratorPlugin = (plugin: any): plugin is GeneratorPlugin => {
|
||||
|
|
@ -32,6 +33,25 @@ export const isLoaderPlugin = (plugin: any): plugin is LoaderPlugin => {
|
|||
return typeof plugin.loadMigration === 'function' && Array.isArray(plugin.loadableExtensions);
|
||||
};
|
||||
|
||||
export const isReporterPlugin = (plugin: any): plugin is ReporterPlugin => {
|
||||
if (!plugin || typeof plugin !== 'object') {
|
||||
return false;
|
||||
}
|
||||
|
||||
const reporterFunctions = [
|
||||
'onInit',
|
||||
'onCollectedMigrations',
|
||||
'onLockedMigrations',
|
||||
'onMigrationStart',
|
||||
'onMigrationSuccess',
|
||||
'onMigrationError',
|
||||
'onMigrationSkip',
|
||||
'onFinished',
|
||||
];
|
||||
|
||||
return reporterFunctions.some((fn) => typeof plugin[fn] === 'function');
|
||||
};
|
||||
|
||||
export const isPluginOfType = <T extends PluginType>(type: T, plugin: any): plugin is PluginFromType<T> => {
|
||||
if (type === 'generator') {
|
||||
return isGeneratorPlugin(plugin);
|
||||
|
|
@ -45,6 +65,10 @@ export const isPluginOfType = <T extends PluginType>(type: T, plugin: any): plug
|
|||
return isLoaderPlugin(plugin);
|
||||
}
|
||||
|
||||
if (type === 'reporter') {
|
||||
return isReporterPlugin(plugin);
|
||||
}
|
||||
|
||||
throw new Error(`Unknown plugin type: ${type}`);
|
||||
};
|
||||
|
||||
|
|
@ -52,7 +76,9 @@ export const getOrLoadPlugin = async <T extends PluginType>(
|
|||
type: T,
|
||||
plugins: Array<Plugin | string>,
|
||||
): Promise<PluginFromType<T> | undefined> => {
|
||||
for await (const plugin of plugins) {
|
||||
const reversePlugins = [...plugins].reverse();
|
||||
|
||||
for await (const plugin of reversePlugins) {
|
||||
if (isPluginOfType(type, plugin)) {
|
||||
return plugin;
|
||||
}
|
||||
|
|
@ -72,8 +98,9 @@ export const getOrLoadPlugins = async <T extends PluginType>(
|
|||
plugins: Array<Plugin | string>,
|
||||
): Promise<Array<PluginFromType<T>>> => {
|
||||
const result: Array<PluginFromType<T>> = [];
|
||||
const reversePlugins = [...plugins].reverse();
|
||||
|
||||
for await (const plugin of plugins) {
|
||||
for await (const plugin of reversePlugins) {
|
||||
if (isPluginOfType(type, plugin)) {
|
||||
result.push(plugin);
|
||||
continue;
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
type Awaitable<T> = T | PromiseLike<T>;
|
||||
|
||||
export type MigrationStatus = 'failed' | 'done';
|
||||
|
||||
export type MigrationHistoryEntry = {
|
||||
|
|
@ -19,7 +21,7 @@ export type Storage = {
|
|||
*
|
||||
* @returns The migrations that were successfully locked.
|
||||
*/
|
||||
lock(migrations: string[]): Promise<string[]>;
|
||||
lock(migrations: MigrationMetadata[]): Promise<MigrationMetadata[]>;
|
||||
/**
|
||||
* The unlock method is called after all migrations have been executed or when the process is interrupted (e.g. by a SIGTERM or SIGINT signal).
|
||||
*
|
||||
|
|
@ -27,7 +29,7 @@ export type Storage = {
|
|||
*
|
||||
* @param migrations The previously successfully locked migrations that should now be unlocked.
|
||||
*/
|
||||
unlock(migrations: string[]): Promise<void>;
|
||||
unlock(migrations: MigrationMetadata[]): Promise<void>;
|
||||
/**
|
||||
* Get the history of previously executed migrations.
|
||||
*
|
||||
|
|
@ -46,14 +48,14 @@ export type Storage = {
|
|||
*
|
||||
* @param migration The name of the migration that should be marked as done.
|
||||
*/
|
||||
onSuccess(migration: string): Promise<void>;
|
||||
onSuccess(migration: MigrationMetadataFinished): Promise<void>;
|
||||
/**
|
||||
* Called when a migration has failed.
|
||||
*
|
||||
* @param migration The name of the migration that should be marked as failed.
|
||||
* @param error The error that caused the migration to fail.
|
||||
*/
|
||||
onError(migration: string, error: Error): Promise<void>;
|
||||
onError(migration: MigrationMetadataFinished, error: Error): Promise<void>;
|
||||
};
|
||||
|
||||
export type StoragePlugin = {
|
||||
|
|
@ -87,7 +89,7 @@ export type GeneratorPlugin = {
|
|||
|
||||
export type GenerateMigrationFunction = GeneratorPlugin['generateMigration'];
|
||||
|
||||
export type MigrationFunction = () => Promise<void>;
|
||||
export type MigrationFunction = () => Awaitable<void>;
|
||||
|
||||
export type MigrationMetadata = {
|
||||
/**
|
||||
|
|
@ -126,6 +128,12 @@ export type MigrationMetadata = {
|
|||
extension: string;
|
||||
};
|
||||
|
||||
export type MigrationMetadataFinished = MigrationMetadata & {
|
||||
status: MigrationStatus | 'skipped';
|
||||
duration: number;
|
||||
error?: Error;
|
||||
};
|
||||
|
||||
export type LoaderPlugin = {
|
||||
/**
|
||||
* The file extensions that this plugin can load.
|
||||
|
|
@ -137,17 +145,89 @@ export type LoaderPlugin = {
|
|||
* @param migration Some metadata about the migration file that should be loaded.
|
||||
* @returns A function that will execute the migration.
|
||||
*/
|
||||
loadMigration(migration: MigrationMetadata): Promise<MigrationFunction>;
|
||||
loadMigration(migration: MigrationMetadata): Awaitable<MigrationFunction>;
|
||||
};
|
||||
|
||||
export type Plugin = StoragePlugin | GeneratorPlugin | LoaderPlugin;
|
||||
type InitParameters = {
|
||||
/**
|
||||
* The directory where the migration files are located
|
||||
*/
|
||||
directory: string;
|
||||
/**
|
||||
* The current working directory (the same as process.cwd())
|
||||
*/
|
||||
cwd: string;
|
||||
/**
|
||||
* Specifies whether the migration process is a dry run or not.
|
||||
*/
|
||||
dry: boolean;
|
||||
};
|
||||
|
||||
export type PluginType = 'storage' | 'generator' | 'loader';
|
||||
export type ReporterPlugin = Partial<{
|
||||
/**
|
||||
* Called when the plugin is initialized, which happens before the migrations are collected.
|
||||
*/
|
||||
onInit(parameters: InitParameters): Awaitable<void>;
|
||||
/**
|
||||
* Called when all pending migrations that should be executed have been collected.
|
||||
*
|
||||
* @param migrations The pending migrations that will be executed.
|
||||
*/
|
||||
onCollectedMigrations(migrations: MigrationMetadata[]): Awaitable<void>;
|
||||
/**
|
||||
* Called when the migrations have been successfully locked.
|
||||
*
|
||||
* Usually the migrations passed to this method are the same as the migrations passed to the onCollectedMigrations method,
|
||||
* but in case of a concurrent migration attempt, some or all migrations might already be locked by another process.
|
||||
*
|
||||
* @param migrations The migrations that have been successfully locked so they can be executed.
|
||||
*/
|
||||
onLockedMigrations(migrations: MigrationMetadata[]): Awaitable<void>;
|
||||
/**
|
||||
* Called when a migration is about to be executed.
|
||||
*
|
||||
* @param migration Information about the migration that is about to be executed.
|
||||
*/
|
||||
onMigrationStart(migration: MigrationMetadata): Awaitable<void>;
|
||||
/**
|
||||
* Called when a migration has been successfully executed.
|
||||
*
|
||||
* @param migration Information about the migration that was executed.
|
||||
*/
|
||||
onMigrationSuccess(migration: MigrationMetadataFinished): Awaitable<void>;
|
||||
/**
|
||||
* Called when a migration has failed.
|
||||
*
|
||||
* @param migration Information about the migration that failed.
|
||||
* @param error The error that caused the migration to fail.
|
||||
*/
|
||||
onMigrationError(migration: MigrationMetadataFinished, error: Error): Awaitable<void>;
|
||||
/**
|
||||
* Called when a migration has been skipped because a previous migration failed, it couldn't be successfully locked, or in case of a dry run.
|
||||
*
|
||||
* @param migration Information about the migration that was skipped.
|
||||
*/
|
||||
onMigrationSkip(migration: MigrationMetadata): Awaitable<void>;
|
||||
/**
|
||||
* Called when the migration process has finished.
|
||||
*
|
||||
* This is called either after all migrations have been executed successfully, at the end of a dry run, or when a migration has failed.
|
||||
*
|
||||
* @param migrations Information about all migrations that were executed, their status and any error that occurred.
|
||||
* @param error If the migration process failed, this will be the error that caused the failure.
|
||||
*/
|
||||
onFinished(migrations: MigrationMetadataFinished[], error?: Error): Awaitable<void>;
|
||||
}>;
|
||||
|
||||
export type PluginFromType<T extends PluginType> = T extends 'storage'
|
||||
? StoragePlugin
|
||||
: T extends 'generator'
|
||||
? GeneratorPlugin
|
||||
: T extends 'loader'
|
||||
? LoaderPlugin
|
||||
: never;
|
||||
export type Plugin = StoragePlugin | GeneratorPlugin | LoaderPlugin | ReporterPlugin;
|
||||
|
||||
type PluginTypeMap = {
|
||||
storage: StoragePlugin;
|
||||
generator: GeneratorPlugin;
|
||||
loader: LoaderPlugin;
|
||||
reporter: ReporterPlugin;
|
||||
};
|
||||
|
||||
export type PluginType = keyof PluginTypeMap;
|
||||
|
||||
export type PluginFromType<T extends PluginType> = PluginTypeMap[T];
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue