refactor(cli): introduce a migration-runner helper for less code duplication and fewer return paths
Thanks to the migration-runner helper, the "up" and "list" commands are now very similar code-wise.
This commit is contained in:
parent
5307e87242
commit
8cc43a8f83
9 changed files with 332 additions and 294 deletions
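For orientation, an editor's sketch (not part of this commit) of how a command can delegate to the new helper. The imports match the files added below; the command name and its parameter list are hypothetical, standing in for however the real commands resolve their reporter and storage plugins:

import { type EmigrateReporter, type Storage } from '@emigrate/plugin-tools/types';
import { migrationRunner } from './migration-runner.js';
import { collectMigrations } from './collect-migrations.js';
import { arrayFromAsync } from './array-from-async.js';

// Hypothetical command, for illustration only.
export async function exampleCommand(
  cwd: string,
  directory: string,
  reporter: EmigrateReporter,
  storage: Storage,
): Promise<number> {
  // Merge the storage history with the migration files on disk...
  const collected = collectMigrations(cwd, directory, storage.getHistory());

  // ...and hand everything to the shared runner, which reports, locks,
  // executes and unlocks, returning the first error (if any).
  const error = await migrationRunner({
    dry: false,
    reporter,
    storage,
    migrations: await arrayFromAsync(collected),
    async validate() {
      // command-specific checks, e.g. "is there a loader plugin for this extension?"
    },
    async execute() {
      // command-specific work, e.g. load and run the migration file
    },
  });

  return error ? 1 : 0;
}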
14 packages/cli/src/array-from-async.ts Normal file
@@ -0,0 +1,14 @@
/**
 * This is a simple polyfill for [Array.fromAsync()](https://github.com/tc39/proposal-array-from-async)
 *
 * It converts an async iterable to an array.
 */
export const arrayFromAsync = async <T>(iterable: AsyncIterable<T>): Promise<T[]> => {
  const array: T[] = [];

  for await (const item of iterable) {
    array.push(item);
  }

  return array;
};
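Illustrative usage (not part of the diff) — any async iterable works as input:

import { arrayFromAsync } from './array-from-async.js';

// Gather every value yielded by an async generator into a plain array.
async function* threeNumbers() {
  yield 1;
  yield 2;
  yield 3;
}

const numbers = await arrayFromAsync(threeNumbers()); // [1, 2, 3]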
33 packages/cli/src/collect-migrations.ts Normal file
@@ -0,0 +1,33 @@
import {
  type MigrationHistoryEntry,
  type MigrationMetadata,
  type MigrationMetadataFinished,
} from '@emigrate/plugin-tools/types';
import { toMigrationMetadata } from './to-migration-metadata.js';
import { getMigrations as getMigrationsOriginal } from './get-migrations.js';

export async function* collectMigrations(
  cwd: string,
  directory: string,
  history: AsyncIterable<MigrationHistoryEntry>,
  getMigrations = getMigrationsOriginal,
): AsyncIterable<MigrationMetadata | MigrationMetadataFinished> {
  const allMigrations = await getMigrations(cwd, directory);
  const seen = new Set<string>();

  for await (const entry of history) {
    const index = allMigrations.findIndex((migrationFile) => migrationFile.name === entry.name);

    if (index === -1) {
      continue;
    }

    yield toMigrationMetadata(entry, { cwd, directory });

    seen.add(entry.name);
  }

  yield* allMigrations.filter((migration) => !seen.has(migration.name));

  seen.clear();
}
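To illustrate the iteration order (an editor's sketch, not part of the diff): entries already present in the history are yielded first, as finished metadata, followed by the on-disk migration files that have not been run yet. The consumer below is hypothetical:

import { isFinishedMigration, type MigrationHistoryEntry } from '@emigrate/plugin-tools/types';
import { collectMigrations } from './collect-migrations.js';

// Hypothetical consumer: separate already-run entries from files still pending.
async function logMigrationState(cwd: string, directory: string, history: AsyncIterable<MigrationHistoryEntry>) {
  for await (const migration of collectMigrations(cwd, directory, history)) {
    if (isFinishedMigration(migration)) {
      console.log(`${migration.name}: ${migration.status}`);
    } else {
      console.log(`${migration.name}: not run yet`);
    }
  }
}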
@@ -1,12 +1,11 @@
import process from 'node:process';
import path from 'node:path';
import { getOrLoadReporter, getOrLoadStorage } from '@emigrate/plugin-tools';
import { type MigrationMetadataFinished } from '@emigrate/plugin-tools/types';
import { BadOptionError, MigrationHistoryError, MissingOptionError, StorageInitError } from '../errors.js';
import { BadOptionError, MissingOptionError, StorageInitError } from '../errors.js';
import { type Config } from '../types.js';
import { withLeadingPeriod } from '../with-leading-period.js';
import { getMigrations } from '../get-migrations.js';
import { exec } from '../exec.js';
import { migrationRunner } from '../migration-runner.js';
import { arrayFromAsync } from '../array-from-async.js';
import { collectMigrations } from '../collect-migrations.js';

const lazyDefaultReporter = async () => import('../reporters/default.js');

@@ -41,57 +40,20 @@ export default async function listCommand({ directory, reporter: reporterConfig,
    return 1;
  }

  const migrationFiles = await getMigrations(cwd, directory);
  const collectedMigrations = collectMigrations(cwd, directory, storage.getHistory());

  let migrationHistoryError: MigrationHistoryError | undefined;
  const error = await migrationRunner({
    dry: true,
    reporter,
    storage,
    migrations: await arrayFromAsync(collectedMigrations),
    async validate() {
      // No-op
    },
    async execute() {
      throw new Error('Unexpected execute call');
    },
  });

  const finishedMigrations: MigrationMetadataFinished[] = [];

  for await (const migrationHistoryEntry of storage.getHistory()) {
    const index = migrationFiles.findIndex((migrationFile) => migrationFile.name === migrationHistoryEntry.name);

    if (index === -1) {
      // Only care about entries that exists in the current migration directory
      continue;
    }

    const filePath = path.resolve(cwd, directory, migrationHistoryEntry.name);
    const finishedMigration: MigrationMetadataFinished = {
      name: migrationHistoryEntry.name,
      status: migrationHistoryEntry.status,
      filePath,
      relativeFilePath: path.relative(cwd, filePath),
      extension: withLeadingPeriod(path.extname(migrationHistoryEntry.name)),
      directory,
      cwd,
      duration: 0,
    };

    if (migrationHistoryEntry.status === 'failed') {
      migrationHistoryError = new MigrationHistoryError(
        `Migration ${migrationHistoryEntry.name} is in a failed state`,
        migrationHistoryEntry,
      );

      await reporter.onMigrationError?.(finishedMigration, migrationHistoryError);
    } else {
      await reporter.onMigrationSuccess?.(finishedMigration);
    }

    finishedMigrations.push(finishedMigration);

    migrationFiles.splice(index, 1);
  }

  for await (const migration of migrationFiles) {
    const finishedMigration: MigrationMetadataFinished = { ...migration, status: 'pending', duration: 0 };
    await reporter.onMigrationSkip?.(finishedMigration);
    finishedMigrations.push(finishedMigration);
  }

  await reporter.onFinished?.(finishedMigrations, migrationHistoryError);

  await storage.end();

  return migrationHistoryError ? 1 : 0;
  return error ? 1 : 0;
}

@@ -1,6 +1,7 @@
import { describe, it, mock, type Mock } from 'node:test';
import assert from 'node:assert';
import path from 'node:path';
import { serializeError } from '@emigrate/plugin-tools';
import {
  type EmigrateReporter,
  type MigrationHistoryEntry,
@@ -49,8 +50,8 @@ describe('up', () => {

    assert.strictEqual(exitCode, 1);
    assert.strictEqual(reporter.onInit.mock.calls.length, 1);
    assert.strictEqual(reporter.onCollectedMigrations.mock.calls.length, 0);
    assert.strictEqual(reporter.onLockedMigrations.mock.calls.length, 0);
    assert.strictEqual(reporter.onCollectedMigrations.mock.calls.length, 1);
    assert.strictEqual(reporter.onLockedMigrations.mock.calls.length, 1);
    assert.strictEqual(reporter.onMigrationStart.mock.calls.length, 0);
    assert.strictEqual(reporter.onMigrationSuccess.mock.calls.length, 0);
    assert.strictEqual(reporter.onMigrationError.mock.calls.length, 1);
@@ -59,11 +60,11 @@
    assert.strictEqual(args?.length, 2);
    const entries = args[0];
    const error = args[1];
    assert.strictEqual(entries.length, 2);
    assert.deepStrictEqual(
      entries.map((entry) => `${entry.name} (${entry.status})`),
      ['some_other.js (skipped)', 'some_file.sql (failed)'],
    );
    assert.strictEqual(entries.length, 2);
    assert.strictEqual(error?.message, 'No loader plugin found for file extension: .sql');
  });

@@ -74,8 +75,8 @@ describe('up', () => {

    assert.strictEqual(exitCode, 1);
    assert.strictEqual(reporter.onInit.mock.calls.length, 1);
    assert.strictEqual(reporter.onCollectedMigrations.mock.calls.length, 0);
    assert.strictEqual(reporter.onLockedMigrations.mock.calls.length, 0);
    assert.strictEqual(reporter.onCollectedMigrations.mock.calls.length, 1);
    assert.strictEqual(reporter.onLockedMigrations.mock.calls.length, 1);
    assert.strictEqual(reporter.onMigrationStart.mock.calls.length, 0);
    assert.strictEqual(reporter.onMigrationSuccess.mock.calls.length, 0);
    assert.strictEqual(reporter.onMigrationError.mock.calls.length, 1);
@@ -119,13 +120,13 @@ describe('up', () => {
    const [entries, error] = reporter.onFinished.mock.calls[0]?.arguments ?? [];
    assert.strictEqual(
      error?.message,
      `Migration ${failedEntry.name} is in a failed state, please fix and remove it first`,
      `Migration ${failedEntry.name} is in a failed state, it should be fixed and removed`,
    );
    assert.strictEqual(getErrorCause(error), failedEntry.error);
    assert.strictEqual(entries?.length, 2);
    assert.deepStrictEqual(
      entries.map((entry) => `${entry.name} (${entry.status})`),
      ['some_failed_migration.js (failed)', 'some_file.js (pending)'],
      ['some_failed_migration.js (failed)', 'some_file.js (skipped)'],
    );
  });

@@ -156,7 +157,7 @@ describe('up', () => {
    const [entries, error] = reporter.onFinished.mock.calls[0]?.arguments ?? [];
    assert.strictEqual(
      error?.message,
      `Migration ${failedEntry.name} is in a failed state, please fix and remove it first`,
      `Migration ${failedEntry.name} is in a failed state, it should be fixed and removed`,
    );
    assert.strictEqual(getErrorCause(error), failedEntry.error);
    assert.strictEqual(entries?.length, 2);
@@ -354,7 +355,7 @@ function toEntry(
    name,
    status,
    date: new Date(),
    error: status === 'failed' ? new Error('Failed') : undefined,
    error: status === 'failed' ? serializeError(new Error('Failed')) : undefined,
  };
}

@@ -1,27 +1,15 @@
import path from 'node:path';
import process from 'node:process';
import { getOrLoadPlugins, getOrLoadReporter, getOrLoadStorage, serializeError } from '@emigrate/plugin-tools';
import {
  type LoaderPlugin,
  type MigrationFunction,
  type MigrationMetadata,
  type MigrationMetadataFinished,
} from '@emigrate/plugin-tools/types';
import {
  BadOptionError,
  EmigrateError,
  MigrationHistoryError,
  MigrationLoadError,
  MigrationRunError,
  MissingOptionError,
  StorageInitError,
  toError,
} from '../errors.js';
import { getOrLoadPlugins, getOrLoadReporter, getOrLoadStorage } from '@emigrate/plugin-tools';
import { isFinishedMigration, type LoaderPlugin } from '@emigrate/plugin-tools/types';
import { BadOptionError, MigrationLoadError, MissingOptionError, StorageInitError } from '../errors.js';
import { type Config } from '../types.js';
import { withLeadingPeriod } from '../with-leading-period.js';
import { getMigrations as getMigrationsOriginal, type GetMigrationsFunction } from '../get-migrations.js';
import { getDuration } from '../get-duration.js';
import { type GetMigrationsFunction } from '../get-migrations.js';
import { exec } from '../exec.js';
import { migrationRunner } from '../migration-runner.js';
import { filterAsync } from '../filter-async.js';
import { collectMigrations } from '../collect-migrations.js';
import { arrayFromAsync } from '../array-from-async.js';

type ExtraFlags = {
  cwd?: string;
@@ -39,7 +27,7 @@ export default async function upCommand({
  dry = false,
  plugins = [],
  cwd = process.cwd(),
  getMigrations = getMigrationsOriginal,
  getMigrations,
}: Config & ExtraFlags): Promise<number> {
  if (!directory) {
    throw new MissingOptionError('directory');
@@ -70,226 +58,52 @@ export default async function upCommand({
    return 1;
  }

  const migrationFiles = await getMigrations(cwd, directory);
  const failedEntries: MigrationMetadataFinished[] = [];
  const collectedMigrations = filterAsync(
    collectMigrations(cwd, directory, storage.getHistory(), getMigrations),
    (migration) => !isFinishedMigration(migration) || migration.status === 'failed',
  );

  for await (const migrationHistoryEntry of storage.getHistory()) {
    const index = migrationFiles.findIndex((migrationFile) => migrationFile.name === migrationHistoryEntry.name);

    if (index === -1) {
      // Only care about entries that exists in the current migration directory
      continue;
    }

    if (migrationHistoryEntry.status === 'failed') {
      const filePath = path.resolve(cwd, directory, migrationHistoryEntry.name);
      const finishedMigration: MigrationMetadataFinished = {
        name: migrationHistoryEntry.name,
        status: migrationHistoryEntry.status,
        filePath,
        relativeFilePath: path.relative(cwd, filePath),
        extension: withLeadingPeriod(path.extname(migrationHistoryEntry.name)),
        error: new MigrationHistoryError(
          `Migration ${migrationHistoryEntry.name} is in a failed state, please fix and remove it first`,
          migrationHistoryEntry,
        ),
        directory,
        cwd,
        duration: 0,
      };
      failedEntries.push(finishedMigration);
    }

    migrationFiles.splice(index, 1);
  }

  const migrationFileExtensions = new Set(migrationFiles.map((migration) => migration.extension));
  const loaderPlugins = await getOrLoadPlugins('loader', [lazyPluginLoaderJs, ...plugins]);

  const loaderByExtension = new Map<string, LoaderPlugin | undefined>(
    [...migrationFileExtensions].map(
      (extension) =>
        [
          extension,
          loaderPlugins.find((plugin) =>
  const loaderByExtension = new Map<string, LoaderPlugin | undefined>();

  const getLoaderByExtension = (extension: string) => {
    if (!loaderByExtension.has(extension)) {
      const loader = loaderPlugins.find((plugin) =>
        plugin.loadableExtensions.some((loadableExtension) => withLeadingPeriod(loadableExtension) === extension),
          ),
        ] as const,
    ),
  );

  for await (const [extension, loader] of loaderByExtension) {
    if (!loader) {
      const finishedMigrations: MigrationMetadataFinished[] = [...failedEntries];

      for await (const failedEntry of failedEntries) {
        await reporter.onMigrationError?.(failedEntry, failedEntry.error!);
      loaderByExtension.set(extension, loader);
      }

      for await (const migration of migrationFiles) {
        if (migration.extension === extension) {
          const error = new BadOptionError('plugin', `No loader plugin found for file extension: ${extension}`);
          const finishedMigration: MigrationMetadataFinished = { ...migration, duration: 0, status: 'failed', error };
          await reporter.onMigrationError?.(finishedMigration, error);
          finishedMigrations.push(finishedMigration);
        } else {
          const finishedMigration: MigrationMetadataFinished = { ...migration, duration: 0, status: 'skipped' };
          await reporter.onMigrationSkip?.(finishedMigration);
          finishedMigrations.push(finishedMigration);
        }
      }

      await reporter.onFinished?.(
        finishedMigrations,
        new BadOptionError('plugin', `No loader plugin found for file extension: ${extension}`),
      );

      await storage.end();

      return 1;
    }
  }

  await reporter.onCollectedMigrations?.([...failedEntries, ...migrationFiles]);

  if (migrationFiles.length === 0 || dry || failedEntries.length > 0) {
    const error = failedEntries.find((migration) => migration.status === 'failed')?.error;
    await reporter.onLockedMigrations?.(migrationFiles);

    const finishedMigrations: MigrationMetadataFinished[] = migrationFiles.map((migration) => ({
      ...migration,
      duration: 0,
      status: 'pending',
    }));

    for await (const failedMigration of failedEntries) {
      await reporter.onMigrationError?.(failedMigration, failedMigration.error!);
    }

    for await (const migration of finishedMigrations) {
      await reporter.onMigrationSkip?.(migration);
    }

    await reporter.onFinished?.([...failedEntries, ...finishedMigrations], error);

    await storage.end();

    return failedEntries.length > 0 ? 1 : 0;
  }

  let lockedMigrationFiles: MigrationMetadata[] = [];

  try {
    lockedMigrationFiles = (await storage.lock(migrationFiles)) ?? [];

    await reporter.onLockedMigrations?.(lockedMigrationFiles);
  } catch (error) {
    for await (const migration of migrationFiles) {
      await reporter.onMigrationSkip?.({ ...migration, duration: 0, status: 'skipped' });
    }

    await reporter.onFinished?.([], toError(error));

    await storage.end();

    return 1;
  }

  const nonLockedMigrations = migrationFiles.filter((migration) => !lockedMigrationFiles.includes(migration));

  for await (const migration of nonLockedMigrations) {
    await reporter.onMigrationSkip?.({ ...migration, duration: 0, status: 'skipped' });
  }

  let cleaningUp: Promise<void> | undefined;

  const cleanup = async () => {
    if (cleaningUp) {
      return cleaningUp;
    }

    process.off('SIGINT', cleanup);
    process.off('SIGTERM', cleanup);

    cleaningUp = storage.unlock(lockedMigrationFiles).then(async () => storage.end());

    return cleaningUp;
    return loaderByExtension.get(extension);
  };

  process.on('SIGINT', cleanup);
  process.on('SIGTERM', cleanup);
  const error = await migrationRunner({
    dry,
    reporter,
    storage,
    migrations: await arrayFromAsync(collectedMigrations),
    async validate(migration) {
      const loader = getLoaderByExtension(migration.extension);

  const finishedMigrations: MigrationMetadataFinished[] = [];

  try {
    for await (const migration of lockedMigrationFiles) {
      const lastMigrationStatus = finishedMigrations.at(-1)?.status;

      if (lastMigrationStatus === 'failed' || lastMigrationStatus === 'skipped') {
        const finishedMigration: MigrationMetadataFinished = { ...migration, status: 'skipped', duration: 0 };
        await reporter.onMigrationSkip?.(finishedMigration);
        finishedMigrations.push(finishedMigration);
        continue;
      if (!loader) {
        throw new BadOptionError('plugin', `No loader plugin found for file extension: ${migration.extension}`);
      }
    },
    async execute(migration) {
      const loader = getLoaderByExtension(migration.extension)!;
      const [migrationFunction, loadError] = await exec(async () => loader.loadMigration(migration));

      await reporter.onMigrationStart?.(migration);

      const loader = loaderByExtension.get(migration.extension)!;
      const start = process.hrtime();

      let migrationFunction: MigrationFunction;

      try {
        try {
          migrationFunction = await loader.loadMigration(migration);
        } catch (error) {
      if (loadError) {
        throw new MigrationLoadError(`Failed to load migration file: ${migration.relativeFilePath}`, migration, {
          cause: error,
          cause: loadError,
        });
      }

      await migrationFunction();
    },
  });

        const duration = getDuration(start);
        const finishedMigration: MigrationMetadataFinished = { ...migration, status: 'done', duration };

        await storage.onSuccess(finishedMigration);
        await reporter.onMigrationSuccess?.(finishedMigration);

        finishedMigrations.push(finishedMigration);
      } catch (error) {
        const errorInstance = toError(error);
        const serializedError = serializeError(errorInstance);
        const duration = getDuration(start);
        const finishedMigration: MigrationMetadataFinished = {
          ...migration,
          status: 'failed',
          duration,
          error: serializedError,
        };

        await storage.onError(finishedMigration, serializedError);
        await reporter.onMigrationError?.(finishedMigration, errorInstance);

        finishedMigrations.push(finishedMigration);
      }
    }

    const firstFailed = finishedMigrations.find((migration) => migration.status === 'failed');

    return firstFailed ? 1 : 0;
  } finally {
    const firstFailed = finishedMigrations.find((migration) => migration.status === 'failed');
    const firstError =
      firstFailed?.error instanceof EmigrateError
        ? firstFailed.error
        : firstFailed
          ? new MigrationRunError(`Failed to run migration: ${firstFailed.relativeFilePath}`, firstFailed, {
              cause: firstFailed?.error,
            })
          : undefined;

    await cleanup();
    await reporter.onFinished?.(finishedMigrations, firstError);
  }
  return error ? 1 : 0;
}

13 packages/cli/src/filter-async.ts Normal file
@@ -0,0 +1,13 @@
export function filterAsync<T, S extends T>(
  iterable: AsyncIterable<T>,
  filter: (item: T) => item is S,
): AsyncIterable<S>;
export function filterAsync<T>(iterable: AsyncIterable<T>, filter: (item: T) => unknown): AsyncIterable<T>;

export async function* filterAsync<T>(iterable: AsyncIterable<T>, filter: (item: T) => unknown): AsyncIterable<T> {
  for await (const item of iterable) {
    if (filter(item)) {
      yield item;
    }
  }
}
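For illustration (not part of the diff), the type-guard overload is what lets the "up" command narrow the combined stream from collectMigrations; a hedged sketch of that pattern, where `migrations` stands in for the AsyncIterable produced above:

import {
  isFinishedMigration,
  type MigrationMetadata,
  type MigrationMetadataFinished,
} from '@emigrate/plugin-tools/types';
import { filterAsync } from './filter-async.js';

// Keep everything that still needs attention: not finished yet, or finished in a failed state.
const onlyRelevant = (migrations: AsyncIterable<MigrationMetadata | MigrationMetadataFinished>) =>
  filterAsync(migrations, (migration) => !isFinishedMigration(migration) || migration.status === 'failed');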
165 packages/cli/src/migration-runner.ts Normal file
@@ -0,0 +1,165 @@
import process from 'node:process';
import {
  isFinishedMigration,
  type EmigrateReporter,
  type MigrationMetadata,
  type MigrationMetadataFinished,
  type Storage,
} from '@emigrate/plugin-tools/types';
import { toError, EmigrateError, MigrationRunError } from './errors.js';
import { exec } from './exec.js';
import { getDuration } from './get-duration.js';

type MigrationRunnerParameters = {
  dry: boolean;
  reporter: EmigrateReporter;
  storage: Storage;
  migrations: Array<MigrationMetadata | MigrationMetadataFinished>;
  validate: (migration: MigrationMetadata) => Promise<void>;
  execute: (migration: MigrationMetadata) => Promise<void>;
};

export const migrationRunner = async ({
  dry,
  reporter,
  storage,
  migrations,
  validate,
  execute,
}: MigrationRunnerParameters): Promise<Error | undefined> => {
  await reporter.onCollectedMigrations?.(migrations);

  const finishedMigrations: MigrationMetadataFinished[] = [];
  const migrationsToRun: MigrationMetadata[] = [];

  let skip = false;

  for await (const migration of migrations) {
    if (isFinishedMigration(migration)) {
      skip ||= migration.status === 'failed' || migration.status === 'skipped';

      finishedMigrations.push(migration);
    } else if (skip) {
      finishedMigrations.push({
        ...migration,
        status: dry ? 'pending' : 'skipped',
        duration: 0,
      });
    } else {
      try {
        await validate(migration);
        migrationsToRun.push(migration);
      } catch (error) {
        for await (const migration of migrationsToRun) {
          finishedMigrations.push({ ...migration, status: 'skipped', duration: 0 });
        }

        migrationsToRun.length = 0;

        finishedMigrations.push({
          ...migration,
          status: 'failed',
          duration: 0,
          error: toError(error),
        });

        skip = true;
      }
    }
  }

  const [lockedMigrations, lockError] = dry ? [migrationsToRun] : await exec(async () => storage.lock(migrationsToRun));

  if (lockError) {
    for await (const migration of migrationsToRun) {
      finishedMigrations.push({ ...migration, duration: 0, status: 'skipped' });
    }

    migrationsToRun.length = 0;

    skip = true;
  } else {
    await reporter.onLockedMigrations?.(lockedMigrations);
  }

  for await (const finishedMigration of finishedMigrations) {
    switch (finishedMigration.status) {
      case 'failed': {
        await reporter.onMigrationError?.(finishedMigration, finishedMigration.error!);
        break;
      }

      case 'pending': {
        await reporter.onMigrationSkip?.(finishedMigration);
        break;
      }

      case 'skipped': {
        await reporter.onMigrationSkip?.(finishedMigration);
        break;
      }

      default: {
        await reporter.onMigrationSuccess?.(finishedMigration);
        break;
      }
    }
  }

  for await (const migration of lockedMigrations ?? []) {
    if (dry || skip) {
      const finishedMigration: MigrationMetadataFinished = {
        ...migration,
        status: dry ? 'pending' : 'skipped',
        duration: 0,
      };

      await reporter.onMigrationSkip?.(finishedMigration);

      finishedMigrations.push(finishedMigration);
      continue;
    }

    await reporter.onMigrationStart?.(migration);

    const start = process.hrtime();

    const [, migrationError] = await exec(async () => execute(migration));

    const duration = getDuration(start);
    const finishedMigration: MigrationMetadataFinished = {
      ...migration,
      status: migrationError ? 'failed' : 'done',
      duration,
      error: migrationError,
    };
    finishedMigrations.push(finishedMigration);

    if (migrationError) {
      await storage.onError(finishedMigration, migrationError);
      await reporter.onMigrationError?.(finishedMigration, migrationError);
      skip = true;
    } else {
      await storage.onSuccess(finishedMigration);
      await reporter.onMigrationSuccess?.(finishedMigration);
    }
  }

  const [, unlockError] = dry ? [] : await exec(async () => storage.unlock(lockedMigrations ?? []));

  const firstFailed = finishedMigrations.find((migration) => migration.status === 'failed');
  const firstError =
    firstFailed?.error instanceof EmigrateError
      ? firstFailed.error
      : firstFailed
        ? new MigrationRunError(`Failed to run migration: ${firstFailed.relativeFilePath}`, firstFailed, {
            cause: firstFailed?.error,
          })
        : undefined;
  const error = unlockError ?? firstError ?? lockError;

  await reporter.onFinished?.(finishedMigrations, error);
  await storage.end();

  return error;
};
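The exec helper imported from ./exec.js is not part of this commit; judging from how it is destructured above it resolves to a [value, error] tuple instead of throwing. A minimal equivalent, under that assumption and for reading purposes only:

// Assumed shape only — the real helper lives in packages/cli/src/exec.ts and is not shown in this diff.
export const exec = async <T>(fn: () => Promise<T>): Promise<[T, undefined] | [undefined, Error]> => {
  try {
    return [await fn(), undefined];
  } catch (error) {
    return [undefined, error instanceof Error ? error : new Error(String(error))];
  }
};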
30 packages/cli/src/to-migration-metadata.ts Normal file
@@ -0,0 +1,30 @@
import path from 'node:path';
import { type MigrationHistoryEntry, type MigrationMetadataFinished } from '@emigrate/plugin-tools/types';
import { withLeadingPeriod } from './with-leading-period.js';
import { MigrationHistoryError } from './errors.js';

export const toMigrationMetadata = (
  entry: MigrationHistoryEntry,
  { cwd, directory }: { cwd: string; directory: string },
): MigrationMetadataFinished => {
  const filePath = path.resolve(cwd, directory, entry.name);
  const finishedMigration: MigrationMetadataFinished = {
    name: entry.name,
    status: entry.status,
    filePath,
    relativeFilePath: path.relative(cwd, filePath),
    extension: withLeadingPeriod(path.extname(entry.name)),
    directory,
    cwd,
    duration: 0,
  };

  if (entry.status === 'failed') {
    finishedMigration.error = new MigrationHistoryError(
      `Migration ${entry.name} is in a failed state, it should be fixed and removed`,
      entry,
    );
  }

  return finishedMigration;
};

@@ -156,7 +156,13 @@ export type MigrationMetadata = {
export type MigrationMetadataFinished = MigrationMetadata & {
  status: MigrationStatus | 'skipped';
  duration: number;
  error?: SerializedError;
  error?: Error;
};

export const isFinishedMigration = (
  migration: MigrationMetadata | MigrationMetadataFinished,
): migration is MigrationMetadataFinished => {
  return 'status' in migration;
};

export type LoaderPlugin = {