feat(storage): add "end" method to storage for cleaning up resources when commands are finished

This commit is contained in:
Joakim Carlstein 2023-12-08 13:01:19 +01:00
parent 334e2099bb
commit 703e6f028a
15 changed files with 150 additions and 77 deletions

View file

@ -0,0 +1,5 @@
---
'@emigrate/plugin-tools': minor
---
Add "end" method to storage plugins so they can cleanup resources when a command is finished

View file

@ -0,0 +1,5 @@
---
'@emigrate/mysql': patch
---
Fix issue with closing the connection pool too early

View file

@ -0,0 +1,5 @@
---
'@emigrate/storage-fs': minor
---
Implement an empty "end" method for cleaning up

View file

@ -0,0 +1,5 @@
---
'@emigrate/cli': patch
---
Handle storage initialization errors in the "list" and "remove" commands

View file

@ -0,0 +1,5 @@
---
'@emigrate/cli': minor
---
Call storage.end() to cleanup resources when a command has finished

View file

@ -230,7 +230,7 @@ Examples:
try { try {
const { default: listCommand } = await import('./commands/list.js'); const { default: listCommand } = await import('./commands/list.js');
await listCommand({ directory, storage, reporter }); process.exitCode = await listCommand({ directory, storage, reporter });
} catch (error) { } catch (error) {
if (error instanceof ShowUsageError) { if (error instanceof ShowUsageError) {
console.error(error.message, '\n'); console.error(error.message, '\n');
@ -306,7 +306,7 @@ Examples:
try { try {
const { default: removeCommand } = await import('./commands/remove.js'); const { default: removeCommand } = await import('./commands/remove.js');
await removeCommand({ directory, storage, reporter, force }, positionals[0] ?? ''); process.exitCode = await removeCommand({ directory, storage, reporter, force }, positionals[0] ?? '');
} catch (error) { } catch (error) {
if (error instanceof ShowUsageError) { if (error instanceof ShowUsageError) {
console.error(error.message, '\n'); console.error(error.message, '\n');

View file

@ -2,10 +2,11 @@ import process from 'node:process';
import path from 'node:path'; import path from 'node:path';
import { getOrLoadReporter, getOrLoadStorage } from '@emigrate/plugin-tools'; import { getOrLoadReporter, getOrLoadStorage } from '@emigrate/plugin-tools';
import { type MigrationMetadataFinished } from '@emigrate/plugin-tools/types'; import { type MigrationMetadataFinished } from '@emigrate/plugin-tools/types';
import { BadOptionError, MigrationHistoryError, MissingOptionError } from '../errors.js'; import { BadOptionError, MigrationHistoryError, MissingOptionError, StorageInitError } from '../errors.js';
import { type Config } from '../types.js'; import { type Config } from '../types.js';
import { withLeadingPeriod } from '../with-leading-period.js'; import { withLeadingPeriod } from '../with-leading-period.js';
import { getMigrations } from '../get-migrations.js'; import { getMigrations } from '../get-migrations.js';
import { exec } from '../exec.js';
const lazyDefaultReporter = async () => import('../reporters/default.js'); const lazyDefaultReporter = async () => import('../reporters/default.js');
@ -21,7 +22,6 @@ export default async function listCommand({ directory, reporter: reporterConfig,
throw new BadOptionError('storage', 'No storage found, please specify a storage using the storage option'); throw new BadOptionError('storage', 'No storage found, please specify a storage using the storage option');
} }
const storage = await storagePlugin.initializeStorage();
const reporter = await getOrLoadReporter([reporterConfig ?? lazyDefaultReporter]); const reporter = await getOrLoadReporter([reporterConfig ?? lazyDefaultReporter]);
if (!reporter) { if (!reporter) {
@ -33,6 +33,14 @@ export default async function listCommand({ directory, reporter: reporterConfig,
await reporter.onInit?.({ command: 'list', cwd, dry: false, directory }); await reporter.onInit?.({ command: 'list', cwd, dry: false, directory });
const [storage, storageError] = await exec(async () => storagePlugin.initializeStorage());
if (storageError) {
await reporter.onFinished?.([], new StorageInitError('Could not initialize storage', { cause: storageError }));
return 1;
}
const migrationFiles = await getMigrations(cwd, directory); const migrationFiles = await getMigrations(cwd, directory);
let migrationHistoryError: MigrationHistoryError | undefined; let migrationHistoryError: MigrationHistoryError | undefined;
@ -82,4 +90,8 @@ export default async function listCommand({ directory, reporter: reporterConfig,
} }
await reporter.onFinished?.(finishedMigrations, migrationHistoryError); await reporter.onFinished?.(finishedMigrations, migrationHistoryError);
await storage.end();
return migrationHistoryError ? 1 : 0;
} }

View file

@ -7,10 +7,12 @@ import {
MissingArgumentsError, MissingArgumentsError,
MissingOptionError, MissingOptionError,
OptionNeededError, OptionNeededError,
StorageInitError,
} from '../errors.js'; } from '../errors.js';
import { type Config } from '../types.js'; import { type Config } from '../types.js';
import { getMigration } from '../get-migration.js'; import { getMigration } from '../get-migration.js';
import { getDuration } from '../get-duration.js'; import { getDuration } from '../get-duration.js';
import { exec } from '../exec.js';
type ExtraFlags = { type ExtraFlags = {
force?: boolean; force?: boolean;
@ -37,7 +39,6 @@ export default async function removeCommand(
throw new BadOptionError('storage', 'No storage found, please specify a storage using the storage option'); throw new BadOptionError('storage', 'No storage found, please specify a storage using the storage option');
} }
const storage = await storagePlugin.initializeStorage();
const reporter = await getOrLoadReporter([reporterConfig ?? lazyDefaultReporter]); const reporter = await getOrLoadReporter([reporterConfig ?? lazyDefaultReporter]);
if (!reporter) { if (!reporter) {
@ -47,6 +48,16 @@ export default async function removeCommand(
); );
} }
const [storage, storageError] = await exec(async () => storagePlugin.initializeStorage());
if (storageError) {
await reporter.onFinished?.([], new StorageInitError('Could not initialize storage', { cause: storageError }));
return 1;
}
await reporter.onInit?.({ command: 'remove', cwd, dry: false, directory });
const migrationFile = await getMigration(cwd, directory, name, !force); const migrationFile = await getMigration(cwd, directory, name, !force);
const finishedMigrations: MigrationMetadataFinished[] = []; const finishedMigrations: MigrationMetadataFinished[] = [];
@ -59,16 +70,14 @@ export default async function removeCommand(
} }
if (migrationHistoryEntry.status === 'done' && !force) { if (migrationHistoryEntry.status === 'done' && !force) {
throw new OptionNeededError( removalError = new OptionNeededError(
'force', 'force',
`The migration "${migrationFile.name}" is not in a failed state. Use the "force" option to force its removal`, `The migration "${migrationFile.name}" is not in a failed state. Use the "force" option to force its removal`,
); );
} } else {
historyEntry = migrationHistoryEntry; historyEntry = migrationHistoryEntry;
} }
}
await reporter.onInit?.({ command: 'remove', cwd, dry: false, directory });
await reporter.onMigrationRemoveStart?.(migrationFile); await reporter.onMigrationRemoveStart?.(migrationFile);
@ -107,4 +116,8 @@ export default async function removeCommand(
} }
await reporter.onFinished?.(finishedMigrations, removalError); await reporter.onFinished?.(finishedMigrations, removalError);
await storage.end();
return removalError ? 1 : 0;
} }

View file

@ -218,6 +218,7 @@ function getStorage(historyEntries: Array<string | MigrationHistoryEntry>) {
remove: mock.fn(), remove: mock.fn(),
onSuccess: mock.fn(), onSuccess: mock.fn(),
onError: mock.fn(), onError: mock.fn(),
end: mock.fn(),
}; };
return storage; return storage;

View file

@ -15,11 +15,13 @@ import {
MigrationRunError, MigrationRunError,
MissingOptionError, MissingOptionError,
StorageInitError, StorageInitError,
toError,
} from '../errors.js'; } from '../errors.js';
import { type Config } from '../types.js'; import { type Config } from '../types.js';
import { withLeadingPeriod } from '../with-leading-period.js'; import { withLeadingPeriod } from '../with-leading-period.js';
import { getMigrations as getMigrationsOriginal, type GetMigrationsFunction } from '../get-migrations.js'; import { getMigrations as getMigrationsOriginal, type GetMigrationsFunction } from '../get-migrations.js';
import { getDuration } from '../get-duration.js'; import { getDuration } from '../get-duration.js';
import { exec } from '../exec.js';
type ExtraFlags = { type ExtraFlags = {
cwd?: string; cwd?: string;
@ -30,29 +32,6 @@ type ExtraFlags = {
const lazyDefaultReporter = async () => import('../reporters/default.js'); const lazyDefaultReporter = async () => import('../reporters/default.js');
const lazyPluginLoaderJs = async () => import('../plugin-loader-js.js'); const lazyPluginLoaderJs = async () => import('../plugin-loader-js.js');
const toError = (error: unknown) => (error instanceof Error ? error : new Error(String(error)));
type Fn<Args extends any[], Result> = (...args: Args) => Result;
type Result<T> = [value: T, error: undefined] | [value: undefined, error: Error];
/**
* Execute a function and return a result tuple
*
* This is a helper function to make it easier to handle errors without the extra nesting of try/catch
*/
const exec = async <Args extends any[], Return extends Promise<any>>(
fn: Fn<Args, Return>,
...args: Args
): Promise<Result<Awaited<Return>>> => {
try {
const result = await fn(...args);
return [result, undefined];
} catch (error) {
return [undefined, toError(error)];
}
};
export default async function upCommand({ export default async function upCommand({
storage: storageConfig, storage: storageConfig,
reporter: reporterConfig, reporter: reporterConfig,
@ -165,6 +144,8 @@ export default async function upCommand({
new BadOptionError('plugin', `No loader plugin found for file extension: ${extension}`), new BadOptionError('plugin', `No loader plugin found for file extension: ${extension}`),
); );
await storage.end();
return 1; return 1;
} }
} }
@ -191,6 +172,8 @@ export default async function upCommand({
await reporter.onFinished?.([...failedEntries, ...finishedMigrations], error); await reporter.onFinished?.([...failedEntries, ...finishedMigrations], error);
await storage.end();
return failedEntries.length > 0 ? 1 : 0; return failedEntries.length > 0 ? 1 : 0;
} }
@ -207,6 +190,8 @@ export default async function upCommand({
await reporter.onFinished?.([], toError(error)); await reporter.onFinished?.([], toError(error));
await storage.end();
return 1; return 1;
} }
@ -226,7 +211,7 @@ export default async function upCommand({
process.off('SIGINT', cleanup); process.off('SIGINT', cleanup);
process.off('SIGTERM', cleanup); process.off('SIGTERM', cleanup);
cleaningUp = storage.unlock(lockedMigrationFiles); cleaningUp = storage.unlock(lockedMigrationFiles).then(async () => storage.end());
return cleaningUp; return cleaningUp;
}; };

View file

@ -2,6 +2,8 @@ import { type MigrationHistoryEntry, type MigrationMetadata } from '@emigrate/pl
const formatter = new Intl.ListFormat('en', { style: 'long', type: 'disjunction' }); const formatter = new Intl.ListFormat('en', { style: 'long', type: 'disjunction' });
export const toError = (error: unknown) => (error instanceof Error ? error : new Error(String(error)));
export class EmigrateError extends Error { export class EmigrateError extends Error {
constructor( constructor(
public code: string, public code: string,

22
packages/cli/src/exec.ts Normal file
View file

@ -0,0 +1,22 @@
import { toError } from './errors.js';
type Fn<Args extends any[], Result> = (...args: Args) => Result;
type Result<T> = [value: T, error: undefined] | [value: undefined, error: Error];
/**
* Execute a function and return a result tuple
*
* This is a helper function to make it easier to handle errors without the extra nesting of try/catch
*/
export const exec = async <Args extends any[], Return extends Promise<any>>(
fn: Fn<Args, Return>,
...args: Args
): Promise<Result<Awaited<Return>>> => {
try {
const result = await fn(...args);
return [result, undefined];
} catch (error) {
return [undefined, toError(error)];
}
};

View file

@ -167,6 +167,10 @@ export const createMysqlStorage = ({ table = defaultTable, connection }: MysqlSt
try { try {
await initializeTable(pool, table); await initializeTable(pool, table);
} catch (error) {
await pool.end();
throw error;
}
const storage: Storage = { const storage: Storage = {
async lock(migrations) { async lock(migrations) {
@ -219,12 +223,12 @@ export const createMysqlStorage = ({ table = defaultTable, connection }: MysqlSt
async onError(migration, error) { async onError(migration, error) {
await finishMigration(pool, table, { ...migration, status: 'failed', error }); await finishMigration(pool, table, { ...migration, status: 'failed', error });
}, },
async end() {
await pool.end();
},
}; };
return storage; return storage;
} finally {
await pool.end();
}
}, },
}; };
}; };

View file

@ -74,6 +74,12 @@ export type Storage = {
* @param error The error that caused the migration to fail. * @param error The error that caused the migration to fail.
*/ */
onError(migration: MigrationMetadataFinished, error: SerializedError): Promise<void>; onError(migration: MigrationMetadataFinished, error: SerializedError): Promise<void>;
/**
* Called when the command is finished or aborted (e.g. by a SIGTERM or SIGINT signal).
*
* Use this to clean up any resources like database connections or file handles.
*/
end(): Promise<void>;
}; };
export type EmigrateStorage = { export type EmigrateStorage = {

View file

@ -112,6 +112,9 @@ export default function storageFs({ filename }: StorageFsOptions): EmigrateStora
async onError(migration, error) { async onError(migration, error) {
await update(migration.name, 'failed', error); await update(migration.name, 'failed', error);
}, },
async end() {
// Nothing to do
},
}; };
}, },
}; };