feat(cli): add graceful process abort
Using an AbortSignal and Promise.race we abandon running migrations that take longer to complete after the process is aborted than the given abortRespite period
This commit is contained in:
parent
ce15648251
commit
a4da353d5a
17 changed files with 378 additions and 31 deletions
5
.changeset/bright-poems-fold.md
Normal file
5
.changeset/bright-poems-fold.md
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
'@emigrate/docs': minor
|
||||
---
|
||||
|
||||
Document the --abort-respite CLI option and the corresponding abortRespite config
|
||||
5
.changeset/pink-hairs-kiss.md
Normal file
5
.changeset/pink-hairs-kiss.md
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
'@emigrate/reporter-pino': minor
|
||||
---
|
||||
|
||||
Handle the new onAbort method
|
||||
5
.changeset/wicked-turkeys-smile.md
Normal file
5
.changeset/wicked-turkeys-smile.md
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
'@emigrate/cli': minor
|
||||
---
|
||||
|
||||
Handle process interruptions gracefully, e.g. due to receiving a SIGINT or SIGTERM signal. If a migration is currently running when the process is about to shutdown it will have a maximum of 10 more seconds to finish before being deserted (there's no way to cancel a promise sadly, and many database queries are not easy to abort either). The 10 second respite length can be customized using the --abort-respite CLI option or the abortRespite config.
|
||||
|
|
@ -65,6 +65,7 @@ Options:
|
|||
--no-color Disable color output (this option is passed to the reporter)
|
||||
--no-execution Mark the migrations as executed and successful without actually running them,
|
||||
which is useful if you want to mark migrations as successful after running them manually
|
||||
--abort-respite <sec> The number of seconds to wait before abandoning running migrations after the command has been aborted (default: 10)
|
||||
|
||||
Examples:
|
||||
|
||||
|
|
|
|||
|
|
@ -66,6 +66,8 @@ List the pending migrations that would be run without actually running them
|
|||
|
||||
### `-l, --limit <count>`
|
||||
|
||||
**type:** `number`
|
||||
|
||||
Limit the number of migrations to run. Can be combined with `--dry` which will show "pending" for the migrations that would be run if not in dry-run mode,
|
||||
and "skipped" for the migrations that also haven't been run but won't because of the set limit.
|
||||
|
||||
|
|
@ -155,3 +157,10 @@ which is useful if you want to mark migrations as successful after running them
|
|||
:::tip
|
||||
See the <Link href="/guides/baseline/">Baseline guide</Link> for example usage of the `--no-execution` option
|
||||
:::
|
||||
|
||||
### `--abort-respite`
|
||||
|
||||
**type:** `number`
|
||||
**default:** `10`
|
||||
|
||||
Customize the number of seconds to wait before abandoning a running migration when the process is about to shutdown, for instance when the user presses `Ctrl+C` or when the container is being stopped (if running inside a container).
|
||||
|
|
|
|||
|
|
@ -157,3 +157,16 @@ export default {
|
|||
```
|
||||
|
||||
Will create new migration files with the `.ts` extension.
|
||||
|
||||
### `abortRespite`
|
||||
|
||||
**type:** `number`
|
||||
**default:** `10`
|
||||
|
||||
Customize the number of seconds to wait before abandoning a running migration when the process is about to shutdown, for instance when the user presses `Ctrl+C` or when the container is being stopped (if running inside a container).
|
||||
|
||||
```js title="emigrate.config.js" {2}
|
||||
export default {
|
||||
abortRespite: 10,
|
||||
};
|
||||
```
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ Options:
|
|||
--no-color Disable color output (this option is passed to the reporter)
|
||||
--no-execution Mark the migrations as executed and successful without actually running them,
|
||||
which is useful if you want to mark migrations as successful after running them manually
|
||||
--abort-respite <sec> The number of seconds to wait before abandoning running migrations after the command has been aborted (default: 10)
|
||||
|
||||
Examples:
|
||||
|
||||
|
|
|
|||
|
|
@ -2,10 +2,11 @@
|
|||
import process from 'node:process';
|
||||
import { parseArgs } from 'node:util';
|
||||
import importFromEsm from 'import-from-esm';
|
||||
import { ShowUsageError } from './errors.js';
|
||||
import { CommandAbortError, ShowUsageError } from './errors.js';
|
||||
import { getConfig } from './get-config.js';
|
||||
import { DEFAULT_RESPITE_SECONDS } from './defaults.js';
|
||||
|
||||
type Action = (args: string[]) => Promise<void>;
|
||||
type Action = (args: string[], abortSignal: AbortSignal) => Promise<void>;
|
||||
|
||||
const useColors = (values: { color?: boolean; 'no-color'?: boolean }) => {
|
||||
if (values['no-color']) {
|
||||
|
|
@ -21,7 +22,7 @@ const importAll = async (cwd: string, modules: string[]) => {
|
|||
}
|
||||
};
|
||||
|
||||
const up: Action = async (args) => {
|
||||
const up: Action = async (args, abortSignal) => {
|
||||
const config = await getConfig('up');
|
||||
const { values } = parseArgs({
|
||||
args,
|
||||
|
|
@ -78,6 +79,9 @@ const up: Action = async (args) => {
|
|||
'no-color': {
|
||||
type: 'boolean',
|
||||
},
|
||||
'abort-respite': {
|
||||
type: 'string',
|
||||
},
|
||||
},
|
||||
allowPositionals: false,
|
||||
});
|
||||
|
|
@ -105,6 +109,7 @@ Options:
|
|||
--no-color Disable color output (this option is passed to the reporter)
|
||||
--no-execution Mark the migrations as executed and successful without actually running them,
|
||||
which is useful if you want to mark migrations as successful after running them manually
|
||||
--abort-respite <sec> The number of seconds to wait before abandoning running migrations after the command has been aborted (default: ${DEFAULT_RESPITE_SECONDS})
|
||||
|
||||
Examples:
|
||||
|
||||
|
|
@ -133,11 +138,13 @@ Examples:
|
|||
to,
|
||||
limit: limitString,
|
||||
import: imports = [],
|
||||
'abort-respite': abortRespiteString,
|
||||
'no-execution': noExecution,
|
||||
} = values;
|
||||
const plugins = [...(config.plugins ?? []), ...(values.plugin ?? [])];
|
||||
|
||||
const limit = limitString === undefined ? undefined : Number.parseInt(limitString, 10);
|
||||
const abortRespite = abortRespiteString === undefined ? config.abortRespite : Number.parseInt(abortRespiteString, 10);
|
||||
|
||||
if (Number.isNaN(limit)) {
|
||||
console.error('Invalid limit value, expected an integer but was:', limitString);
|
||||
|
|
@ -146,6 +153,16 @@ Examples:
|
|||
return;
|
||||
}
|
||||
|
||||
if (Number.isNaN(abortRespite)) {
|
||||
console.error(
|
||||
'Invalid abortRespite value, expected an integer but was:',
|
||||
abortRespiteString ?? config.abortRespite,
|
||||
);
|
||||
console.log(usage);
|
||||
process.exitCode = 1;
|
||||
return;
|
||||
}
|
||||
|
||||
await importAll(cwd, imports);
|
||||
|
||||
try {
|
||||
|
|
@ -161,6 +178,8 @@ Examples:
|
|||
from,
|
||||
to,
|
||||
noExecution,
|
||||
abortSignal,
|
||||
abortRespite: (abortRespite ?? DEFAULT_RESPITE_SECONDS) * 1000,
|
||||
color: useColors(values),
|
||||
});
|
||||
} catch (error) {
|
||||
|
|
@ -479,7 +498,7 @@ const commands: Record<string, Action> = {
|
|||
new: newMigration,
|
||||
};
|
||||
|
||||
const main: Action = async (args) => {
|
||||
const main: Action = async (args, abortSignal) => {
|
||||
const { values, positionals } = parseArgs({
|
||||
args,
|
||||
options: {
|
||||
|
|
@ -531,20 +550,43 @@ Commands:
|
|||
return;
|
||||
}
|
||||
|
||||
await action(process.argv.slice(3));
|
||||
try {
|
||||
await action(process.argv.slice(3), abortSignal);
|
||||
} catch (error) {
|
||||
if (error instanceof Error) {
|
||||
console.error(error);
|
||||
if (error.cause instanceof Error) {
|
||||
console.error(error.cause);
|
||||
}
|
||||
} else {
|
||||
console.error(error);
|
||||
}
|
||||
|
||||
process.exitCode = 1;
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
await main(process.argv.slice(2));
|
||||
} catch (error) {
|
||||
if (error instanceof Error) {
|
||||
console.error(error);
|
||||
if (error.cause instanceof Error) {
|
||||
console.error(error.cause);
|
||||
}
|
||||
} else {
|
||||
console.error(error);
|
||||
}
|
||||
const controller = new AbortController();
|
||||
|
||||
process.exitCode = 1;
|
||||
}
|
||||
process.on('SIGINT', () => {
|
||||
controller.abort(CommandAbortError.fromSignal('SIGINT'));
|
||||
});
|
||||
|
||||
process.on('SIGTERM', () => {
|
||||
controller.abort(CommandAbortError.fromSignal('SIGTERM'));
|
||||
});
|
||||
|
||||
process.on('uncaughtException', (error) => {
|
||||
controller.abort(CommandAbortError.fromReason('Uncaught exception', error));
|
||||
});
|
||||
|
||||
process.on('unhandledRejection', (error) => {
|
||||
controller.abort(CommandAbortError.fromReason('Unhandled rejection', error));
|
||||
});
|
||||
|
||||
await main(process.argv.slice(2), controller.signal);
|
||||
|
||||
setTimeout(() => {
|
||||
console.error('Process did not exit within 10 seconds, forcing exit');
|
||||
process.exit(1);
|
||||
}, 10_000).unref();
|
||||
|
|
|
|||
|
|
@ -12,9 +12,16 @@ import {
|
|||
type NonFailedMigrationHistoryEntry,
|
||||
type MigrationMetadataFinished,
|
||||
} from '@emigrate/types';
|
||||
import { deserializeError } from 'serialize-error';
|
||||
import { deserializeError, serializeError } from 'serialize-error';
|
||||
import { version } from '../get-package-info.js';
|
||||
import { BadOptionError, MigrationHistoryError, MigrationRunError, StorageInitError } from '../errors.js';
|
||||
import {
|
||||
BadOptionError,
|
||||
CommandAbortError,
|
||||
ExecutionDesertedError,
|
||||
MigrationHistoryError,
|
||||
MigrationRunError,
|
||||
StorageInitError,
|
||||
} from '../errors.js';
|
||||
import upCommand from './up.js';
|
||||
|
||||
type Mocked<T> = {
|
||||
|
|
@ -481,6 +488,123 @@ describe('up', () => {
|
|||
]);
|
||||
assert.strictEqual(migration.mock.calls.length, 0);
|
||||
});
|
||||
|
||||
describe("aborting the migration process before it's finished", () => {
|
||||
it('returns 1 and finishes with a command abort error when the migration process is aborted prematurely', async () => {
|
||||
const controller = new AbortController();
|
||||
const migration = mock.fn(
|
||||
async () => {
|
||||
// Success on second call, and abort
|
||||
controller.abort(CommandAbortError.fromSignal('SIGINT'));
|
||||
},
|
||||
async () => {
|
||||
// Success on first call
|
||||
},
|
||||
{ times: 1 },
|
||||
);
|
||||
const { reporter, run } = getUpCommand(
|
||||
[
|
||||
'1_some_already_run_migration.js',
|
||||
'2_some_migration.js',
|
||||
'3_another_migration.js',
|
||||
'4_some_other_migration.js',
|
||||
'5_yet_another_migration.js',
|
||||
'6_some_more_migration.js',
|
||||
],
|
||||
getStorage(['1_some_already_run_migration.js']),
|
||||
[
|
||||
{
|
||||
loadableExtensions: ['.js'],
|
||||
async loadMigration() {
|
||||
return migration;
|
||||
},
|
||||
},
|
||||
],
|
||||
);
|
||||
|
||||
const exitCode = await run({
|
||||
abortSignal: controller.signal,
|
||||
});
|
||||
|
||||
assert.strictEqual(exitCode, 1, 'Exit code');
|
||||
assertPreconditionsFulfilled(
|
||||
{ dry: false },
|
||||
reporter,
|
||||
[
|
||||
{ name: '2_some_migration.js', status: 'done', started: true },
|
||||
{ name: '3_another_migration.js', status: 'done', started: true },
|
||||
{ name: '4_some_other_migration.js', status: 'skipped' },
|
||||
{ name: '5_yet_another_migration.js', status: 'skipped' },
|
||||
{ name: '6_some_more_migration.js', status: 'skipped' },
|
||||
],
|
||||
CommandAbortError.fromSignal('SIGINT'),
|
||||
);
|
||||
assert.strictEqual(reporter.onAbort.mock.calls.length, 1);
|
||||
assert.strictEqual(migration.mock.calls.length, 2);
|
||||
});
|
||||
});
|
||||
|
||||
it('returns 1 and finishes with a deserted error with a command abort error as cause when the migration process is aborted prematurely and stops waiting on migrations taking longer than the respite period after the abort', async () => {
|
||||
const controller = new AbortController();
|
||||
const migration = mock.fn(
|
||||
async () => {
|
||||
// Success on second call, and abort
|
||||
controller.abort(CommandAbortError.fromSignal('SIGINT'));
|
||||
return new Promise((resolve) => {
|
||||
setTimeout(resolve, 100); // Take longer than the respite period
|
||||
});
|
||||
},
|
||||
async () => {
|
||||
// Success on first call
|
||||
},
|
||||
{ times: 1 },
|
||||
);
|
||||
const { reporter, run } = getUpCommand(
|
||||
[
|
||||
'1_some_already_run_migration.js',
|
||||
'2_some_migration.js',
|
||||
'3_another_migration.js',
|
||||
'4_some_other_migration.js',
|
||||
'5_yet_another_migration.js',
|
||||
'6_some_more_migration.js',
|
||||
],
|
||||
getStorage(['1_some_already_run_migration.js']),
|
||||
[
|
||||
{
|
||||
loadableExtensions: ['.js'],
|
||||
async loadMigration() {
|
||||
return migration;
|
||||
},
|
||||
},
|
||||
],
|
||||
);
|
||||
|
||||
const exitCode = await run({
|
||||
abortSignal: controller.signal,
|
||||
abortRespite: 10,
|
||||
});
|
||||
|
||||
assert.strictEqual(exitCode, 1, 'Exit code');
|
||||
assertPreconditionsFulfilled(
|
||||
{ dry: false },
|
||||
reporter,
|
||||
[
|
||||
{ name: '2_some_migration.js', status: 'done', started: true },
|
||||
{
|
||||
name: '3_another_migration.js',
|
||||
status: 'failed',
|
||||
started: true,
|
||||
error: ExecutionDesertedError.fromReason('Deserted after 10ms', CommandAbortError.fromSignal('SIGINT')),
|
||||
},
|
||||
{ name: '4_some_other_migration.js', status: 'skipped' },
|
||||
{ name: '5_yet_another_migration.js', status: 'skipped' },
|
||||
{ name: '6_some_more_migration.js', status: 'skipped' },
|
||||
],
|
||||
ExecutionDesertedError.fromReason('Deserted after 10ms', CommandAbortError.fromSignal('SIGINT')),
|
||||
);
|
||||
assert.strictEqual(reporter.onAbort.mock.calls.length, 1);
|
||||
assert.strictEqual(migration.mock.calls.length, 2);
|
||||
});
|
||||
});
|
||||
|
||||
function getErrorCause(error: Error | undefined): Error | SerializedError | undefined {
|
||||
|
|
@ -570,6 +694,7 @@ function getUpCommand(migrationFiles: string[], storage?: Mocked<Storage>, plugi
|
|||
const reporter: Mocked<Required<EmigrateReporter>> = {
|
||||
onFinished: mock.fn(noop),
|
||||
onInit: mock.fn(noop),
|
||||
onAbort: mock.fn(noop),
|
||||
onCollectedMigrations: mock.fn(noop),
|
||||
onLockedMigrations: mock.fn(noop),
|
||||
onNewMigration: mock.fn(noop),
|
||||
|
|
@ -689,7 +814,17 @@ function assertPreconditionsFulfilled(
|
|||
assert.strictEqual(reporter.onMigrationSkip.mock.calls.length, pending + skipped, 'Total pending and skipped');
|
||||
assert.strictEqual(reporter.onFinished.mock.calls.length, 1, 'Finished called once');
|
||||
const [entries, error] = reporter.onFinished.mock.calls[0]?.arguments ?? [];
|
||||
assert.deepStrictEqual(error, finishedError, 'Finished error');
|
||||
if (finishedError instanceof DOMException || error instanceof DOMException) {
|
||||
// The assert library doesn't support DOMException apparently, so ugly workaround here:
|
||||
assert.deepStrictEqual(
|
||||
deserializeError(serializeError(error)),
|
||||
deserializeError(serializeError(finishedError)),
|
||||
'Finished error',
|
||||
);
|
||||
} else {
|
||||
assert.deepStrictEqual(error, finishedError, 'Finished error');
|
||||
}
|
||||
|
||||
const cause = getErrorCause(error);
|
||||
const expectedCause = finishedError?.cause;
|
||||
assert.deepStrictEqual(
|
||||
|
|
|
|||
|
|
@ -19,6 +19,8 @@ type ExtraFlags = {
|
|||
to?: string;
|
||||
noExecution?: boolean;
|
||||
getMigrations?: GetMigrationsFunction;
|
||||
abortSignal?: AbortSignal;
|
||||
abortRespite?: number;
|
||||
};
|
||||
|
||||
const lazyDefaultReporter = async () => import('../reporters/default.js');
|
||||
|
|
@ -33,6 +35,8 @@ export default async function upCommand({
|
|||
from,
|
||||
to,
|
||||
noExecution,
|
||||
abortSignal,
|
||||
abortRespite,
|
||||
dry = false,
|
||||
plugins = [],
|
||||
cwd,
|
||||
|
|
@ -94,6 +98,8 @@ export default async function upCommand({
|
|||
limit,
|
||||
from,
|
||||
to,
|
||||
abortSignal,
|
||||
abortRespite,
|
||||
reporter,
|
||||
storage,
|
||||
migrations: await arrayFromAsync(collectedMigrations),
|
||||
|
|
|
|||
2
packages/cli/src/defaults.ts
Normal file
2
packages/cli/src/defaults.ts
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
// eslint-disable-next-line @typescript-eslint/naming-convention
|
||||
export const DEFAULT_RESPITE_SECONDS = 10;
|
||||
|
|
@ -146,6 +146,30 @@ export class StorageInitError extends EmigrateError {
|
|||
}
|
||||
}
|
||||
|
||||
export class CommandAbortError extends EmigrateError {
|
||||
static fromSignal(signal: NodeJS.Signals) {
|
||||
return new CommandAbortError(`Command aborted due to signal: ${signal}`);
|
||||
}
|
||||
|
||||
static fromReason(reason: string, cause?: unknown) {
|
||||
return new CommandAbortError(`Command aborted: ${reason}`, { cause });
|
||||
}
|
||||
|
||||
constructor(message: string | undefined, options?: ErrorOptions) {
|
||||
super(message, options, 'ERR_COMMAND_ABORT');
|
||||
}
|
||||
}
|
||||
|
||||
export class ExecutionDesertedError extends EmigrateError {
|
||||
static fromReason(reason: string, cause?: Error) {
|
||||
return new ExecutionDesertedError(`Execution deserted: ${reason}`, { cause });
|
||||
}
|
||||
|
||||
constructor(message: string | undefined, options?: ErrorOptions) {
|
||||
super(message, options, 'ERR_EXECUTION_DESERTED');
|
||||
}
|
||||
}
|
||||
|
||||
errorConstructors.set('EmigrateError', EmigrateError as ErrorConstructor);
|
||||
errorConstructors.set('ShowUsageError', ShowUsageError as ErrorConstructor);
|
||||
errorConstructors.set('MissingOptionError', MissingOptionError as unknown as ErrorConstructor);
|
||||
|
|
@ -158,3 +182,5 @@ errorConstructors.set('MigrationLoadError', MigrationLoadError as unknown as Err
|
|||
errorConstructors.set('MigrationRunError', MigrationRunError as unknown as ErrorConstructor);
|
||||
errorConstructors.set('MigrationNotRunError', MigrationNotRunError as unknown as ErrorConstructor);
|
||||
errorConstructors.set('StorageInitError', StorageInitError as unknown as ErrorConstructor);
|
||||
errorConstructors.set('CommandAbortError', CommandAbortError as unknown as ErrorConstructor);
|
||||
errorConstructors.set('ExecutionDesertedError', ExecutionDesertedError as unknown as ErrorConstructor);
|
||||
|
|
|
|||
|
|
@ -1,22 +1,65 @@
|
|||
import { toError } from './errors.js';
|
||||
import prettyMs from 'pretty-ms';
|
||||
import { ExecutionDesertedError, toError } from './errors.js';
|
||||
import { DEFAULT_RESPITE_SECONDS } from './defaults.js';
|
||||
|
||||
type Fn<Args extends any[], Result> = (...args: Args) => Result;
|
||||
type Result<T> = [value: T, error: undefined] | [value: undefined, error: Error];
|
||||
|
||||
type ExecOptions = {
|
||||
abortSignal?: AbortSignal;
|
||||
abortRespite?: number;
|
||||
};
|
||||
|
||||
/**
|
||||
* Execute a function and return a result tuple
|
||||
*
|
||||
* This is a helper function to make it easier to handle errors without the extra nesting of try/catch
|
||||
* If an abort signal is provided the function will reject with an ExecutionDesertedError if the signal is aborted
|
||||
* and the given function has not yet resolved within the given respite time (or a default of 30 seconds)
|
||||
*
|
||||
* @param fn The function to execute
|
||||
* @param options Options for the execution
|
||||
*/
|
||||
export const exec = async <Args extends any[], Return extends Promise<any>>(
|
||||
fn: Fn<Args, Return>,
|
||||
...args: Args
|
||||
export const exec = async <Return extends Promise<any>>(
|
||||
fn: () => Return,
|
||||
options: ExecOptions = {},
|
||||
): Promise<Result<Awaited<Return>>> => {
|
||||
try {
|
||||
const result = await fn(...args);
|
||||
const aborter = options.abortSignal ? getAborter(options.abortSignal, options.abortRespite) : undefined;
|
||||
const result = await Promise.race(aborter ? [aborter, fn()] : [fn()]);
|
||||
|
||||
return [result, undefined];
|
||||
} catch (error) {
|
||||
return [undefined, toError(error)];
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Returns a promise that rejects after a given time after the given signal is aborted
|
||||
*
|
||||
* @param signal The abort signal to listen to
|
||||
* @param respite The time in milliseconds to wait before rejecting
|
||||
*/
|
||||
const getAborter = async (signal: AbortSignal, respite = DEFAULT_RESPITE_SECONDS * 1000): Promise<never> => {
|
||||
return new Promise((_, reject) => {
|
||||
if (signal.aborted) {
|
||||
setTimeout(
|
||||
reject,
|
||||
respite,
|
||||
ExecutionDesertedError.fromReason(`Deserted after ${prettyMs(respite)}`, toError(signal.reason)),
|
||||
).unref();
|
||||
return;
|
||||
}
|
||||
|
||||
signal.addEventListener(
|
||||
'abort',
|
||||
() => {
|
||||
setTimeout(
|
||||
reject,
|
||||
respite,
|
||||
ExecutionDesertedError.fromReason(`Deserted after ${prettyMs(respite)}`, toError(signal.reason)),
|
||||
).unref();
|
||||
},
|
||||
{ once: true },
|
||||
);
|
||||
});
|
||||
};
|
||||
|
|
|
|||
|
|
@ -18,6 +18,8 @@ type MigrationRunnerParameters = {
|
|||
limit?: number;
|
||||
from?: string;
|
||||
to?: string;
|
||||
abortSignal?: AbortSignal;
|
||||
abortRespite?: number;
|
||||
reporter: EmigrateReporter;
|
||||
storage: Storage;
|
||||
migrations: Array<MigrationMetadata | MigrationMetadataFinished>;
|
||||
|
|
@ -30,6 +32,8 @@ export const migrationRunner = async ({
|
|||
limit,
|
||||
from,
|
||||
to,
|
||||
abortSignal,
|
||||
abortRespite,
|
||||
reporter,
|
||||
storage,
|
||||
migrations,
|
||||
|
|
@ -43,6 +47,22 @@ export const migrationRunner = async ({
|
|||
|
||||
let skip = false;
|
||||
|
||||
abortSignal?.addEventListener(
|
||||
'abort',
|
||||
() => {
|
||||
skip = true;
|
||||
reporter.onAbort?.(toError(abortSignal.reason))?.then(
|
||||
() => {
|
||||
/* noop */
|
||||
},
|
||||
() => {
|
||||
/* noop */
|
||||
},
|
||||
);
|
||||
},
|
||||
{ once: true },
|
||||
);
|
||||
|
||||
for await (const migration of migrations) {
|
||||
if (isFinishedMigration(migration)) {
|
||||
skip ||= migration.status === 'failed' || migration.status === 'skipped';
|
||||
|
|
@ -89,7 +109,7 @@ export const migrationRunner = async ({
|
|||
|
||||
const [lockedMigrations, lockError] = dry
|
||||
? [migrationsToLock]
|
||||
: await exec(async () => storage.lock(migrationsToLock));
|
||||
: await exec(async () => storage.lock(migrationsToLock), { abortSignal, abortRespite });
|
||||
|
||||
if (lockError) {
|
||||
for (const migration of migrationsToLock) {
|
||||
|
|
@ -167,7 +187,7 @@ export const migrationRunner = async ({
|
|||
|
||||
const start = hrtime();
|
||||
|
||||
const [, migrationError] = await exec(async () => execute(migration));
|
||||
const [, migrationError] = await exec(async () => execute(migration), { abortSignal, abortRespite });
|
||||
|
||||
const duration = getDuration(start);
|
||||
|
||||
|
|
@ -194,7 +214,9 @@ export const migrationRunner = async ({
|
|||
}
|
||||
}
|
||||
|
||||
const [, unlockError] = dry ? [] : await exec(async () => storage.unlock(lockedMigrations ?? []));
|
||||
const [, unlockError] = dry
|
||||
? []
|
||||
: await exec(async () => storage.unlock(lockedMigrations ?? []), { abortSignal, abortRespite });
|
||||
|
||||
// eslint-disable-next-line unicorn/no-array-callback-reference
|
||||
const firstFailed = finishedMigrations.find(isFailedMigration);
|
||||
|
|
@ -204,7 +226,8 @@ export const migrationRunner = async ({
|
|||
: firstFailed
|
||||
? MigrationRunError.fromMetadata(firstFailed)
|
||||
: undefined;
|
||||
const error = unlockError ?? firstError ?? lockError;
|
||||
const error =
|
||||
unlockError ?? firstError ?? lockError ?? (abortSignal?.aborted ? toError(abortSignal.reason) : undefined);
|
||||
|
||||
await reporter.onFinished?.(finishedMigrations, error);
|
||||
|
||||
|
|
|
|||
|
|
@ -165,6 +165,20 @@ const getError = (error?: ErrorLike, indent = ' ') => {
|
|||
return parts.join('\n');
|
||||
};
|
||||
|
||||
const getAbortMessage = (reason?: Error) => {
|
||||
if (!reason) {
|
||||
return '';
|
||||
}
|
||||
|
||||
const parts = [` ${red.bold(reason.message)}`];
|
||||
|
||||
if (isErrorLike(reason.cause)) {
|
||||
parts.push(getError(reason.cause, ' '));
|
||||
}
|
||||
|
||||
return parts.join('\n');
|
||||
};
|
||||
|
||||
const getSummary = (
|
||||
command: ReporterInitParameters['command'],
|
||||
migrations: Array<MigrationMetadata | MigrationMetadataFinished> = [],
|
||||
|
|
@ -281,6 +295,7 @@ class DefaultFancyReporter implements Required<EmigrateReporter> {
|
|||
#error: Error | undefined;
|
||||
#parameters!: ReporterInitParameters;
|
||||
#interval: NodeJS.Timeout | undefined;
|
||||
#abortReason: Error | undefined;
|
||||
|
||||
onInit(parameters: ReporterInitParameters): void | PromiseLike<void> {
|
||||
this.#parameters = parameters;
|
||||
|
|
@ -288,6 +303,10 @@ class DefaultFancyReporter implements Required<EmigrateReporter> {
|
|||
this.#start();
|
||||
}
|
||||
|
||||
onAbort(reason: Error): void | PromiseLike<void> {
|
||||
this.#abortReason = reason;
|
||||
}
|
||||
|
||||
onCollectedMigrations(migrations: MigrationMetadata[]): void | PromiseLike<void> {
|
||||
this.#migrations = migrations;
|
||||
}
|
||||
|
|
@ -358,6 +377,7 @@ class DefaultFancyReporter implements Required<EmigrateReporter> {
|
|||
getTitle(this.#parameters),
|
||||
getHeaderMessage(this.#parameters.command, this.#migrations, this.#lockedMigrations),
|
||||
this.#migrations?.map((migration) => getMigrationText(migration, this.#activeMigration)).join('\n') ?? '',
|
||||
getAbortMessage(this.#abortReason),
|
||||
getSummary(this.#parameters.command, this.#migrations),
|
||||
getError(this.#error),
|
||||
];
|
||||
|
|
@ -403,6 +423,12 @@ class DefaultReporter implements Required<EmigrateReporter> {
|
|||
console.log('');
|
||||
}
|
||||
|
||||
onAbort(reason: Error): void | PromiseLike<void> {
|
||||
console.log('');
|
||||
console.error(getAbortMessage(reason));
|
||||
console.log('');
|
||||
}
|
||||
|
||||
onCollectedMigrations(migrations: MigrationMetadata[]): void | PromiseLike<void> {
|
||||
this.#migrations = migrations;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ export type Config = {
|
|||
template?: string;
|
||||
extension?: string;
|
||||
color?: boolean;
|
||||
abortRespite?: number;
|
||||
};
|
||||
|
||||
export type EmigrateConfig = Config & {
|
||||
|
|
|
|||
|
|
@ -57,6 +57,10 @@ class PinoReporter implements Required<EmigrateReporter> {
|
|||
this.#logger.info({ parameters }, `Emigrate "${command}" initialized${parameters.dry ? ' (dry-run)' : ''}`);
|
||||
}
|
||||
|
||||
onAbort(reason: Error): Awaitable<void> {
|
||||
this.#logger.error({ reason }, `Emigrate "${this.#command}" shutting down`);
|
||||
}
|
||||
|
||||
onCollectedMigrations(migrations: MigrationMetadata[]): Awaitable<void> {
|
||||
this.#migrations = migrations;
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue