feat(storage): add "end" method to storage for cleaning up resources when commands are finished
This commit is contained in:
parent
334e2099bb
commit
703e6f028a
15 changed files with 150 additions and 77 deletions
5
.changeset/calm-grapes-arrive.md
Normal file
5
.changeset/calm-grapes-arrive.md
Normal file
|
|
@ -0,0 +1,5 @@
|
||||||
|
---
|
||||||
|
'@emigrate/plugin-tools': minor
|
||||||
|
---
|
||||||
|
|
||||||
|
Add "end" method to storage plugins so they can cleanup resources when a command is finished
|
||||||
5
.changeset/eight-moles-tell.md
Normal file
5
.changeset/eight-moles-tell.md
Normal file
|
|
@ -0,0 +1,5 @@
|
||||||
|
---
|
||||||
|
'@emigrate/mysql': patch
|
||||||
|
---
|
||||||
|
|
||||||
|
Fix issue with closing the connection pool too early
|
||||||
5
.changeset/healthy-flies-divide.md
Normal file
5
.changeset/healthy-flies-divide.md
Normal file
|
|
@ -0,0 +1,5 @@
|
||||||
|
---
|
||||||
|
'@emigrate/storage-fs': minor
|
||||||
|
---
|
||||||
|
|
||||||
|
Implement an empty "end" method for cleaning up
|
||||||
5
.changeset/large-plants-smoke.md
Normal file
5
.changeset/large-plants-smoke.md
Normal file
|
|
@ -0,0 +1,5 @@
|
||||||
|
---
|
||||||
|
'@emigrate/cli': patch
|
||||||
|
---
|
||||||
|
|
||||||
|
Handle storage initialization errors in the "list" and "remove" commands
|
||||||
5
.changeset/thick-forks-knock.md
Normal file
5
.changeset/thick-forks-knock.md
Normal file
|
|
@ -0,0 +1,5 @@
|
||||||
|
---
|
||||||
|
'@emigrate/cli': minor
|
||||||
|
---
|
||||||
|
|
||||||
|
Call storage.end() to cleanup resources when a command has finished
|
||||||
|
|
@ -230,7 +230,7 @@ Examples:
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const { default: listCommand } = await import('./commands/list.js');
|
const { default: listCommand } = await import('./commands/list.js');
|
||||||
await listCommand({ directory, storage, reporter });
|
process.exitCode = await listCommand({ directory, storage, reporter });
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
if (error instanceof ShowUsageError) {
|
if (error instanceof ShowUsageError) {
|
||||||
console.error(error.message, '\n');
|
console.error(error.message, '\n');
|
||||||
|
|
@ -306,7 +306,7 @@ Examples:
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const { default: removeCommand } = await import('./commands/remove.js');
|
const { default: removeCommand } = await import('./commands/remove.js');
|
||||||
await removeCommand({ directory, storage, reporter, force }, positionals[0] ?? '');
|
process.exitCode = await removeCommand({ directory, storage, reporter, force }, positionals[0] ?? '');
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
if (error instanceof ShowUsageError) {
|
if (error instanceof ShowUsageError) {
|
||||||
console.error(error.message, '\n');
|
console.error(error.message, '\n');
|
||||||
|
|
|
||||||
|
|
@ -2,10 +2,11 @@ import process from 'node:process';
|
||||||
import path from 'node:path';
|
import path from 'node:path';
|
||||||
import { getOrLoadReporter, getOrLoadStorage } from '@emigrate/plugin-tools';
|
import { getOrLoadReporter, getOrLoadStorage } from '@emigrate/plugin-tools';
|
||||||
import { type MigrationMetadataFinished } from '@emigrate/plugin-tools/types';
|
import { type MigrationMetadataFinished } from '@emigrate/plugin-tools/types';
|
||||||
import { BadOptionError, MigrationHistoryError, MissingOptionError } from '../errors.js';
|
import { BadOptionError, MigrationHistoryError, MissingOptionError, StorageInitError } from '../errors.js';
|
||||||
import { type Config } from '../types.js';
|
import { type Config } from '../types.js';
|
||||||
import { withLeadingPeriod } from '../with-leading-period.js';
|
import { withLeadingPeriod } from '../with-leading-period.js';
|
||||||
import { getMigrations } from '../get-migrations.js';
|
import { getMigrations } from '../get-migrations.js';
|
||||||
|
import { exec } from '../exec.js';
|
||||||
|
|
||||||
const lazyDefaultReporter = async () => import('../reporters/default.js');
|
const lazyDefaultReporter = async () => import('../reporters/default.js');
|
||||||
|
|
||||||
|
|
@ -21,7 +22,6 @@ export default async function listCommand({ directory, reporter: reporterConfig,
|
||||||
throw new BadOptionError('storage', 'No storage found, please specify a storage using the storage option');
|
throw new BadOptionError('storage', 'No storage found, please specify a storage using the storage option');
|
||||||
}
|
}
|
||||||
|
|
||||||
const storage = await storagePlugin.initializeStorage();
|
|
||||||
const reporter = await getOrLoadReporter([reporterConfig ?? lazyDefaultReporter]);
|
const reporter = await getOrLoadReporter([reporterConfig ?? lazyDefaultReporter]);
|
||||||
|
|
||||||
if (!reporter) {
|
if (!reporter) {
|
||||||
|
|
@ -33,6 +33,14 @@ export default async function listCommand({ directory, reporter: reporterConfig,
|
||||||
|
|
||||||
await reporter.onInit?.({ command: 'list', cwd, dry: false, directory });
|
await reporter.onInit?.({ command: 'list', cwd, dry: false, directory });
|
||||||
|
|
||||||
|
const [storage, storageError] = await exec(async () => storagePlugin.initializeStorage());
|
||||||
|
|
||||||
|
if (storageError) {
|
||||||
|
await reporter.onFinished?.([], new StorageInitError('Could not initialize storage', { cause: storageError }));
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
const migrationFiles = await getMigrations(cwd, directory);
|
const migrationFiles = await getMigrations(cwd, directory);
|
||||||
|
|
||||||
let migrationHistoryError: MigrationHistoryError | undefined;
|
let migrationHistoryError: MigrationHistoryError | undefined;
|
||||||
|
|
@ -82,4 +90,8 @@ export default async function listCommand({ directory, reporter: reporterConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
await reporter.onFinished?.(finishedMigrations, migrationHistoryError);
|
await reporter.onFinished?.(finishedMigrations, migrationHistoryError);
|
||||||
|
|
||||||
|
await storage.end();
|
||||||
|
|
||||||
|
return migrationHistoryError ? 1 : 0;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,10 +7,12 @@ import {
|
||||||
MissingArgumentsError,
|
MissingArgumentsError,
|
||||||
MissingOptionError,
|
MissingOptionError,
|
||||||
OptionNeededError,
|
OptionNeededError,
|
||||||
|
StorageInitError,
|
||||||
} from '../errors.js';
|
} from '../errors.js';
|
||||||
import { type Config } from '../types.js';
|
import { type Config } from '../types.js';
|
||||||
import { getMigration } from '../get-migration.js';
|
import { getMigration } from '../get-migration.js';
|
||||||
import { getDuration } from '../get-duration.js';
|
import { getDuration } from '../get-duration.js';
|
||||||
|
import { exec } from '../exec.js';
|
||||||
|
|
||||||
type ExtraFlags = {
|
type ExtraFlags = {
|
||||||
force?: boolean;
|
force?: boolean;
|
||||||
|
|
@ -37,7 +39,6 @@ export default async function removeCommand(
|
||||||
throw new BadOptionError('storage', 'No storage found, please specify a storage using the storage option');
|
throw new BadOptionError('storage', 'No storage found, please specify a storage using the storage option');
|
||||||
}
|
}
|
||||||
|
|
||||||
const storage = await storagePlugin.initializeStorage();
|
|
||||||
const reporter = await getOrLoadReporter([reporterConfig ?? lazyDefaultReporter]);
|
const reporter = await getOrLoadReporter([reporterConfig ?? lazyDefaultReporter]);
|
||||||
|
|
||||||
if (!reporter) {
|
if (!reporter) {
|
||||||
|
|
@ -47,6 +48,16 @@ export default async function removeCommand(
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const [storage, storageError] = await exec(async () => storagePlugin.initializeStorage());
|
||||||
|
|
||||||
|
if (storageError) {
|
||||||
|
await reporter.onFinished?.([], new StorageInitError('Could not initialize storage', { cause: storageError }));
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
await reporter.onInit?.({ command: 'remove', cwd, dry: false, directory });
|
||||||
|
|
||||||
const migrationFile = await getMigration(cwd, directory, name, !force);
|
const migrationFile = await getMigration(cwd, directory, name, !force);
|
||||||
|
|
||||||
const finishedMigrations: MigrationMetadataFinished[] = [];
|
const finishedMigrations: MigrationMetadataFinished[] = [];
|
||||||
|
|
@ -59,17 +70,15 @@ export default async function removeCommand(
|
||||||
}
|
}
|
||||||
|
|
||||||
if (migrationHistoryEntry.status === 'done' && !force) {
|
if (migrationHistoryEntry.status === 'done' && !force) {
|
||||||
throw new OptionNeededError(
|
removalError = new OptionNeededError(
|
||||||
'force',
|
'force',
|
||||||
`The migration "${migrationFile.name}" is not in a failed state. Use the "force" option to force its removal`,
|
`The migration "${migrationFile.name}" is not in a failed state. Use the "force" option to force its removal`,
|
||||||
);
|
);
|
||||||
|
} else {
|
||||||
|
historyEntry = migrationHistoryEntry;
|
||||||
}
|
}
|
||||||
|
|
||||||
historyEntry = migrationHistoryEntry;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
await reporter.onInit?.({ command: 'remove', cwd, dry: false, directory });
|
|
||||||
|
|
||||||
await reporter.onMigrationRemoveStart?.(migrationFile);
|
await reporter.onMigrationRemoveStart?.(migrationFile);
|
||||||
|
|
||||||
const start = process.hrtime();
|
const start = process.hrtime();
|
||||||
|
|
@ -107,4 +116,8 @@ export default async function removeCommand(
|
||||||
}
|
}
|
||||||
|
|
||||||
await reporter.onFinished?.(finishedMigrations, removalError);
|
await reporter.onFinished?.(finishedMigrations, removalError);
|
||||||
|
|
||||||
|
await storage.end();
|
||||||
|
|
||||||
|
return removalError ? 1 : 0;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -218,6 +218,7 @@ function getStorage(historyEntries: Array<string | MigrationHistoryEntry>) {
|
||||||
remove: mock.fn(),
|
remove: mock.fn(),
|
||||||
onSuccess: mock.fn(),
|
onSuccess: mock.fn(),
|
||||||
onError: mock.fn(),
|
onError: mock.fn(),
|
||||||
|
end: mock.fn(),
|
||||||
};
|
};
|
||||||
|
|
||||||
return storage;
|
return storage;
|
||||||
|
|
|
||||||
|
|
@ -15,11 +15,13 @@ import {
|
||||||
MigrationRunError,
|
MigrationRunError,
|
||||||
MissingOptionError,
|
MissingOptionError,
|
||||||
StorageInitError,
|
StorageInitError,
|
||||||
|
toError,
|
||||||
} from '../errors.js';
|
} from '../errors.js';
|
||||||
import { type Config } from '../types.js';
|
import { type Config } from '../types.js';
|
||||||
import { withLeadingPeriod } from '../with-leading-period.js';
|
import { withLeadingPeriod } from '../with-leading-period.js';
|
||||||
import { getMigrations as getMigrationsOriginal, type GetMigrationsFunction } from '../get-migrations.js';
|
import { getMigrations as getMigrationsOriginal, type GetMigrationsFunction } from '../get-migrations.js';
|
||||||
import { getDuration } from '../get-duration.js';
|
import { getDuration } from '../get-duration.js';
|
||||||
|
import { exec } from '../exec.js';
|
||||||
|
|
||||||
type ExtraFlags = {
|
type ExtraFlags = {
|
||||||
cwd?: string;
|
cwd?: string;
|
||||||
|
|
@ -30,29 +32,6 @@ type ExtraFlags = {
|
||||||
const lazyDefaultReporter = async () => import('../reporters/default.js');
|
const lazyDefaultReporter = async () => import('../reporters/default.js');
|
||||||
const lazyPluginLoaderJs = async () => import('../plugin-loader-js.js');
|
const lazyPluginLoaderJs = async () => import('../plugin-loader-js.js');
|
||||||
|
|
||||||
const toError = (error: unknown) => (error instanceof Error ? error : new Error(String(error)));
|
|
||||||
|
|
||||||
type Fn<Args extends any[], Result> = (...args: Args) => Result;
|
|
||||||
type Result<T> = [value: T, error: undefined] | [value: undefined, error: Error];
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Execute a function and return a result tuple
|
|
||||||
*
|
|
||||||
* This is a helper function to make it easier to handle errors without the extra nesting of try/catch
|
|
||||||
*/
|
|
||||||
const exec = async <Args extends any[], Return extends Promise<any>>(
|
|
||||||
fn: Fn<Args, Return>,
|
|
||||||
...args: Args
|
|
||||||
): Promise<Result<Awaited<Return>>> => {
|
|
||||||
try {
|
|
||||||
const result = await fn(...args);
|
|
||||||
|
|
||||||
return [result, undefined];
|
|
||||||
} catch (error) {
|
|
||||||
return [undefined, toError(error)];
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
export default async function upCommand({
|
export default async function upCommand({
|
||||||
storage: storageConfig,
|
storage: storageConfig,
|
||||||
reporter: reporterConfig,
|
reporter: reporterConfig,
|
||||||
|
|
@ -165,6 +144,8 @@ export default async function upCommand({
|
||||||
new BadOptionError('plugin', `No loader plugin found for file extension: ${extension}`),
|
new BadOptionError('plugin', `No loader plugin found for file extension: ${extension}`),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
await storage.end();
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -191,6 +172,8 @@ export default async function upCommand({
|
||||||
|
|
||||||
await reporter.onFinished?.([...failedEntries, ...finishedMigrations], error);
|
await reporter.onFinished?.([...failedEntries, ...finishedMigrations], error);
|
||||||
|
|
||||||
|
await storage.end();
|
||||||
|
|
||||||
return failedEntries.length > 0 ? 1 : 0;
|
return failedEntries.length > 0 ? 1 : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -207,6 +190,8 @@ export default async function upCommand({
|
||||||
|
|
||||||
await reporter.onFinished?.([], toError(error));
|
await reporter.onFinished?.([], toError(error));
|
||||||
|
|
||||||
|
await storage.end();
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -226,7 +211,7 @@ export default async function upCommand({
|
||||||
process.off('SIGINT', cleanup);
|
process.off('SIGINT', cleanup);
|
||||||
process.off('SIGTERM', cleanup);
|
process.off('SIGTERM', cleanup);
|
||||||
|
|
||||||
cleaningUp = storage.unlock(lockedMigrationFiles);
|
cleaningUp = storage.unlock(lockedMigrationFiles).then(async () => storage.end());
|
||||||
|
|
||||||
return cleaningUp;
|
return cleaningUp;
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,8 @@ import { type MigrationHistoryEntry, type MigrationMetadata } from '@emigrate/pl
|
||||||
|
|
||||||
const formatter = new Intl.ListFormat('en', { style: 'long', type: 'disjunction' });
|
const formatter = new Intl.ListFormat('en', { style: 'long', type: 'disjunction' });
|
||||||
|
|
||||||
|
export const toError = (error: unknown) => (error instanceof Error ? error : new Error(String(error)));
|
||||||
|
|
||||||
export class EmigrateError extends Error {
|
export class EmigrateError extends Error {
|
||||||
constructor(
|
constructor(
|
||||||
public code: string,
|
public code: string,
|
||||||
|
|
|
||||||
22
packages/cli/src/exec.ts
Normal file
22
packages/cli/src/exec.ts
Normal file
|
|
@ -0,0 +1,22 @@
|
||||||
|
import { toError } from './errors.js';
|
||||||
|
|
||||||
|
type Fn<Args extends any[], Result> = (...args: Args) => Result;
|
||||||
|
type Result<T> = [value: T, error: undefined] | [value: undefined, error: Error];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a function and return a result tuple
|
||||||
|
*
|
||||||
|
* This is a helper function to make it easier to handle errors without the extra nesting of try/catch
|
||||||
|
*/
|
||||||
|
export const exec = async <Args extends any[], Return extends Promise<any>>(
|
||||||
|
fn: Fn<Args, Return>,
|
||||||
|
...args: Args
|
||||||
|
): Promise<Result<Awaited<Return>>> => {
|
||||||
|
try {
|
||||||
|
const result = await fn(...args);
|
||||||
|
|
||||||
|
return [result, undefined];
|
||||||
|
} catch (error) {
|
||||||
|
return [undefined, toError(error)];
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
@ -167,30 +167,34 @@ export const createMysqlStorage = ({ table = defaultTable, connection }: MysqlSt
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await initializeTable(pool, table);
|
await initializeTable(pool, table);
|
||||||
|
} catch (error) {
|
||||||
|
await pool.end();
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
|
||||||
const storage: Storage = {
|
const storage: Storage = {
|
||||||
async lock(migrations) {
|
async lock(migrations) {
|
||||||
const lockedMigrations: MigrationMetadata[] = [];
|
const lockedMigrations: MigrationMetadata[] = [];
|
||||||
|
|
||||||
for await (const migration of migrations) {
|
for await (const migration of migrations) {
|
||||||
if (await lockMigration(pool, table, migration)) {
|
if (await lockMigration(pool, table, migration)) {
|
||||||
lockedMigrations.push(migration);
|
lockedMigrations.push(migration);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return lockedMigrations;
|
return lockedMigrations;
|
||||||
},
|
},
|
||||||
async unlock(migrations) {
|
async unlock(migrations) {
|
||||||
for await (const migration of migrations) {
|
for await (const migration of migrations) {
|
||||||
await unlockMigration(pool, table, migration);
|
await unlockMigration(pool, table, migration);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
async remove(migration) {
|
async remove(migration) {
|
||||||
await deleteMigration(pool, table, migration);
|
await deleteMigration(pool, table, migration);
|
||||||
},
|
},
|
||||||
async *getHistory() {
|
async *getHistory() {
|
||||||
const [rows] = await pool.execute<Array<RowDataPacket & HistoryEntry>>({
|
const [rows] = await pool.execute<Array<RowDataPacket & HistoryEntry>>({
|
||||||
sql: `
|
sql: `
|
||||||
SELECT
|
SELECT
|
||||||
*
|
*
|
||||||
FROM
|
FROM
|
||||||
|
|
@ -200,31 +204,31 @@ export const createMysqlStorage = ({ table = defaultTable, connection }: MysqlSt
|
||||||
ORDER BY
|
ORDER BY
|
||||||
date ASC
|
date ASC
|
||||||
`,
|
`,
|
||||||
values: ['locked'],
|
values: ['locked'],
|
||||||
});
|
});
|
||||||
|
|
||||||
for (const row of rows) {
|
for (const row of rows) {
|
||||||
yield {
|
yield {
|
||||||
name: row.name,
|
name: row.name,
|
||||||
status: row.status,
|
status: row.status,
|
||||||
date: new Date(row.date),
|
date: new Date(row.date),
|
||||||
// FIXME: Migrate the migrations table to support the error column
|
// FIXME: Migrate the migrations table to support the error column
|
||||||
error: row.status === 'failed' ? new Error('Unknown error reason') : undefined,
|
error: row.status === 'failed' ? new Error('Unknown error reason') : undefined,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
async onSuccess(migration) {
|
async onSuccess(migration) {
|
||||||
await finishMigration(pool, table, migration);
|
await finishMigration(pool, table, migration);
|
||||||
},
|
},
|
||||||
async onError(migration, error) {
|
async onError(migration, error) {
|
||||||
await finishMigration(pool, table, { ...migration, status: 'failed', error });
|
await finishMigration(pool, table, { ...migration, status: 'failed', error });
|
||||||
},
|
},
|
||||||
};
|
async end() {
|
||||||
|
await pool.end();
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
return storage;
|
return storage;
|
||||||
} finally {
|
|
||||||
await pool.end();
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -74,6 +74,12 @@ export type Storage = {
|
||||||
* @param error The error that caused the migration to fail.
|
* @param error The error that caused the migration to fail.
|
||||||
*/
|
*/
|
||||||
onError(migration: MigrationMetadataFinished, error: SerializedError): Promise<void>;
|
onError(migration: MigrationMetadataFinished, error: SerializedError): Promise<void>;
|
||||||
|
/**
|
||||||
|
* Called when the command is finished or aborted (e.g. by a SIGTERM or SIGINT signal).
|
||||||
|
*
|
||||||
|
* Use this to clean up any resources like database connections or file handles.
|
||||||
|
*/
|
||||||
|
end(): Promise<void>;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type EmigrateStorage = {
|
export type EmigrateStorage = {
|
||||||
|
|
|
||||||
|
|
@ -112,6 +112,9 @@ export default function storageFs({ filename }: StorageFsOptions): EmigrateStora
|
||||||
async onError(migration, error) {
|
async onError(migration, error) {
|
||||||
await update(migration.name, 'failed', error);
|
await update(migration.name, 'failed', error);
|
||||||
},
|
},
|
||||||
|
async end() {
|
||||||
|
// Nothing to do
|
||||||
|
},
|
||||||
};
|
};
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue