feat(cli): add graceful process abort

Using an AbortSignal and Promise.race we abandon running migrations that take longer to complete after the process is aborted than the given abortRespite period
This commit is contained in:
Joakim Carlstein 2024-01-22 10:53:01 +01:00 committed by Joakim Carlstein
parent ce15648251
commit a4da353d5a
17 changed files with 378 additions and 31 deletions

View file

@ -0,0 +1,5 @@
---
'@emigrate/docs': minor
---
Document the --abort-respite CLI option and the corresponding abortRespite config

View file

@ -0,0 +1,5 @@
---
'@emigrate/reporter-pino': minor
---
Handle the new onAbort method

View file

@ -0,0 +1,5 @@
---
'@emigrate/cli': minor
---
Handle process interruptions gracefully, e.g. due to receiving a SIGINT or SIGTERM signal. If a migration is currently running when the process is about to shutdown it will have a maximum of 10 more seconds to finish before being deserted (there's no way to cancel a promise sadly, and many database queries are not easy to abort either). The 10 second respite length can be customized using the --abort-respite CLI option or the abortRespite config.

View file

@ -65,6 +65,7 @@ Options:
--no-color Disable color output (this option is passed to the reporter)
--no-execution Mark the migrations as executed and successful without actually running them,
which is useful if you want to mark migrations as successful after running them manually
--abort-respite <sec> The number of seconds to wait before abandoning running migrations after the command has been aborted (default: 10)
Examples:

View file

@ -66,6 +66,8 @@ List the pending migrations that would be run without actually running them
### `-l, --limit <count>`
**type:** `number`
Limit the number of migrations to run. Can be combined with `--dry` which will show "pending" for the migrations that would be run if not in dry-run mode,
and "skipped" for the migrations that also haven't been run but won't because of the set limit.
@ -155,3 +157,10 @@ which is useful if you want to mark migrations as successful after running them
:::tip
See the <Link href="/guides/baseline/">Baseline guide</Link> for example usage of the `--no-execution` option
:::
### `--abort-respite`
**type:** `number`
**default:** `10`
Customize the number of seconds to wait before abandoning a running migration when the process is about to shutdown, for instance when the user presses `Ctrl+C` or when the container is being stopped (if running inside a container).

View file

@ -157,3 +157,16 @@ export default {
```
Will create new migration files with the `.ts` extension.
### `abortRespite`
**type:** `number`
**default:** `10`
Customize the number of seconds to wait before abandoning a running migration when the process is about to shutdown, for instance when the user presses `Ctrl+C` or when the container is being stopped (if running inside a container).
```js title="emigrate.config.js" {2}
export default {
abortRespite: 10,
};
```

View file

@ -44,6 +44,7 @@ Options:
--no-color Disable color output (this option is passed to the reporter)
--no-execution Mark the migrations as executed and successful without actually running them,
which is useful if you want to mark migrations as successful after running them manually
--abort-respite <sec> The number of seconds to wait before abandoning running migrations after the command has been aborted (default: 10)
Examples:

View file

@ -2,10 +2,11 @@
import process from 'node:process';
import { parseArgs } from 'node:util';
import importFromEsm from 'import-from-esm';
import { ShowUsageError } from './errors.js';
import { CommandAbortError, ShowUsageError } from './errors.js';
import { getConfig } from './get-config.js';
import { DEFAULT_RESPITE_SECONDS } from './defaults.js';
type Action = (args: string[]) => Promise<void>;
type Action = (args: string[], abortSignal: AbortSignal) => Promise<void>;
const useColors = (values: { color?: boolean; 'no-color'?: boolean }) => {
if (values['no-color']) {
@ -21,7 +22,7 @@ const importAll = async (cwd: string, modules: string[]) => {
}
};
const up: Action = async (args) => {
const up: Action = async (args, abortSignal) => {
const config = await getConfig('up');
const { values } = parseArgs({
args,
@ -78,6 +79,9 @@ const up: Action = async (args) => {
'no-color': {
type: 'boolean',
},
'abort-respite': {
type: 'string',
},
},
allowPositionals: false,
});
@ -105,6 +109,7 @@ Options:
--no-color Disable color output (this option is passed to the reporter)
--no-execution Mark the migrations as executed and successful without actually running them,
which is useful if you want to mark migrations as successful after running them manually
--abort-respite <sec> The number of seconds to wait before abandoning running migrations after the command has been aborted (default: ${DEFAULT_RESPITE_SECONDS})
Examples:
@ -133,11 +138,13 @@ Examples:
to,
limit: limitString,
import: imports = [],
'abort-respite': abortRespiteString,
'no-execution': noExecution,
} = values;
const plugins = [...(config.plugins ?? []), ...(values.plugin ?? [])];
const limit = limitString === undefined ? undefined : Number.parseInt(limitString, 10);
const abortRespite = abortRespiteString === undefined ? config.abortRespite : Number.parseInt(abortRespiteString, 10);
if (Number.isNaN(limit)) {
console.error('Invalid limit value, expected an integer but was:', limitString);
@ -146,6 +153,16 @@ Examples:
return;
}
if (Number.isNaN(abortRespite)) {
console.error(
'Invalid abortRespite value, expected an integer but was:',
abortRespiteString ?? config.abortRespite,
);
console.log(usage);
process.exitCode = 1;
return;
}
await importAll(cwd, imports);
try {
@ -161,6 +178,8 @@ Examples:
from,
to,
noExecution,
abortSignal,
abortRespite: (abortRespite ?? DEFAULT_RESPITE_SECONDS) * 1000,
color: useColors(values),
});
} catch (error) {
@ -479,7 +498,7 @@ const commands: Record<string, Action> = {
new: newMigration,
};
const main: Action = async (args) => {
const main: Action = async (args, abortSignal) => {
const { values, positionals } = parseArgs({
args,
options: {
@ -531,12 +550,9 @@ Commands:
return;
}
await action(process.argv.slice(3));
};
try {
await main(process.argv.slice(2));
} catch (error) {
try {
await action(process.argv.slice(3), abortSignal);
} catch (error) {
if (error instanceof Error) {
console.error(error);
if (error.cause instanceof Error) {
@ -547,4 +563,30 @@ try {
}
process.exitCode = 1;
}
}
};
const controller = new AbortController();
process.on('SIGINT', () => {
controller.abort(CommandAbortError.fromSignal('SIGINT'));
});
process.on('SIGTERM', () => {
controller.abort(CommandAbortError.fromSignal('SIGTERM'));
});
process.on('uncaughtException', (error) => {
controller.abort(CommandAbortError.fromReason('Uncaught exception', error));
});
process.on('unhandledRejection', (error) => {
controller.abort(CommandAbortError.fromReason('Unhandled rejection', error));
});
await main(process.argv.slice(2), controller.signal);
setTimeout(() => {
console.error('Process did not exit within 10 seconds, forcing exit');
process.exit(1);
}, 10_000).unref();

View file

@ -12,9 +12,16 @@ import {
type NonFailedMigrationHistoryEntry,
type MigrationMetadataFinished,
} from '@emigrate/types';
import { deserializeError } from 'serialize-error';
import { deserializeError, serializeError } from 'serialize-error';
import { version } from '../get-package-info.js';
import { BadOptionError, MigrationHistoryError, MigrationRunError, StorageInitError } from '../errors.js';
import {
BadOptionError,
CommandAbortError,
ExecutionDesertedError,
MigrationHistoryError,
MigrationRunError,
StorageInitError,
} from '../errors.js';
import upCommand from './up.js';
type Mocked<T> = {
@ -481,6 +488,123 @@ describe('up', () => {
]);
assert.strictEqual(migration.mock.calls.length, 0);
});
describe("aborting the migration process before it's finished", () => {
it('returns 1 and finishes with a command abort error when the migration process is aborted prematurely', async () => {
const controller = new AbortController();
const migration = mock.fn(
async () => {
// Success on second call, and abort
controller.abort(CommandAbortError.fromSignal('SIGINT'));
},
async () => {
// Success on first call
},
{ times: 1 },
);
const { reporter, run } = getUpCommand(
[
'1_some_already_run_migration.js',
'2_some_migration.js',
'3_another_migration.js',
'4_some_other_migration.js',
'5_yet_another_migration.js',
'6_some_more_migration.js',
],
getStorage(['1_some_already_run_migration.js']),
[
{
loadableExtensions: ['.js'],
async loadMigration() {
return migration;
},
},
],
);
const exitCode = await run({
abortSignal: controller.signal,
});
assert.strictEqual(exitCode, 1, 'Exit code');
assertPreconditionsFulfilled(
{ dry: false },
reporter,
[
{ name: '2_some_migration.js', status: 'done', started: true },
{ name: '3_another_migration.js', status: 'done', started: true },
{ name: '4_some_other_migration.js', status: 'skipped' },
{ name: '5_yet_another_migration.js', status: 'skipped' },
{ name: '6_some_more_migration.js', status: 'skipped' },
],
CommandAbortError.fromSignal('SIGINT'),
);
assert.strictEqual(reporter.onAbort.mock.calls.length, 1);
assert.strictEqual(migration.mock.calls.length, 2);
});
});
it('returns 1 and finishes with a deserted error with a command abort error as cause when the migration process is aborted prematurely and stops waiting on migrations taking longer than the respite period after the abort', async () => {
const controller = new AbortController();
const migration = mock.fn(
async () => {
// Success on second call, and abort
controller.abort(CommandAbortError.fromSignal('SIGINT'));
return new Promise((resolve) => {
setTimeout(resolve, 100); // Take longer than the respite period
});
},
async () => {
// Success on first call
},
{ times: 1 },
);
const { reporter, run } = getUpCommand(
[
'1_some_already_run_migration.js',
'2_some_migration.js',
'3_another_migration.js',
'4_some_other_migration.js',
'5_yet_another_migration.js',
'6_some_more_migration.js',
],
getStorage(['1_some_already_run_migration.js']),
[
{
loadableExtensions: ['.js'],
async loadMigration() {
return migration;
},
},
],
);
const exitCode = await run({
abortSignal: controller.signal,
abortRespite: 10,
});
assert.strictEqual(exitCode, 1, 'Exit code');
assertPreconditionsFulfilled(
{ dry: false },
reporter,
[
{ name: '2_some_migration.js', status: 'done', started: true },
{
name: '3_another_migration.js',
status: 'failed',
started: true,
error: ExecutionDesertedError.fromReason('Deserted after 10ms', CommandAbortError.fromSignal('SIGINT')),
},
{ name: '4_some_other_migration.js', status: 'skipped' },
{ name: '5_yet_another_migration.js', status: 'skipped' },
{ name: '6_some_more_migration.js', status: 'skipped' },
],
ExecutionDesertedError.fromReason('Deserted after 10ms', CommandAbortError.fromSignal('SIGINT')),
);
assert.strictEqual(reporter.onAbort.mock.calls.length, 1);
assert.strictEqual(migration.mock.calls.length, 2);
});
});
function getErrorCause(error: Error | undefined): Error | SerializedError | undefined {
@ -570,6 +694,7 @@ function getUpCommand(migrationFiles: string[], storage?: Mocked<Storage>, plugi
const reporter: Mocked<Required<EmigrateReporter>> = {
onFinished: mock.fn(noop),
onInit: mock.fn(noop),
onAbort: mock.fn(noop),
onCollectedMigrations: mock.fn(noop),
onLockedMigrations: mock.fn(noop),
onNewMigration: mock.fn(noop),
@ -689,7 +814,17 @@ function assertPreconditionsFulfilled(
assert.strictEqual(reporter.onMigrationSkip.mock.calls.length, pending + skipped, 'Total pending and skipped');
assert.strictEqual(reporter.onFinished.mock.calls.length, 1, 'Finished called once');
const [entries, error] = reporter.onFinished.mock.calls[0]?.arguments ?? [];
if (finishedError instanceof DOMException || error instanceof DOMException) {
// The assert library doesn't support DOMException apparently, so ugly workaround here:
assert.deepStrictEqual(
deserializeError(serializeError(error)),
deserializeError(serializeError(finishedError)),
'Finished error',
);
} else {
assert.deepStrictEqual(error, finishedError, 'Finished error');
}
const cause = getErrorCause(error);
const expectedCause = finishedError?.cause;
assert.deepStrictEqual(

View file

@ -19,6 +19,8 @@ type ExtraFlags = {
to?: string;
noExecution?: boolean;
getMigrations?: GetMigrationsFunction;
abortSignal?: AbortSignal;
abortRespite?: number;
};
const lazyDefaultReporter = async () => import('../reporters/default.js');
@ -33,6 +35,8 @@ export default async function upCommand({
from,
to,
noExecution,
abortSignal,
abortRespite,
dry = false,
plugins = [],
cwd,
@ -94,6 +98,8 @@ export default async function upCommand({
limit,
from,
to,
abortSignal,
abortRespite,
reporter,
storage,
migrations: await arrayFromAsync(collectedMigrations),

View file

@ -0,0 +1,2 @@
// eslint-disable-next-line @typescript-eslint/naming-convention
export const DEFAULT_RESPITE_SECONDS = 10;

View file

@ -146,6 +146,30 @@ export class StorageInitError extends EmigrateError {
}
}
export class CommandAbortError extends EmigrateError {
static fromSignal(signal: NodeJS.Signals) {
return new CommandAbortError(`Command aborted due to signal: ${signal}`);
}
static fromReason(reason: string, cause?: unknown) {
return new CommandAbortError(`Command aborted: ${reason}`, { cause });
}
constructor(message: string | undefined, options?: ErrorOptions) {
super(message, options, 'ERR_COMMAND_ABORT');
}
}
export class ExecutionDesertedError extends EmigrateError {
static fromReason(reason: string, cause?: Error) {
return new ExecutionDesertedError(`Execution deserted: ${reason}`, { cause });
}
constructor(message: string | undefined, options?: ErrorOptions) {
super(message, options, 'ERR_EXECUTION_DESERTED');
}
}
errorConstructors.set('EmigrateError', EmigrateError as ErrorConstructor);
errorConstructors.set('ShowUsageError', ShowUsageError as ErrorConstructor);
errorConstructors.set('MissingOptionError', MissingOptionError as unknown as ErrorConstructor);
@ -158,3 +182,5 @@ errorConstructors.set('MigrationLoadError', MigrationLoadError as unknown as Err
errorConstructors.set('MigrationRunError', MigrationRunError as unknown as ErrorConstructor);
errorConstructors.set('MigrationNotRunError', MigrationNotRunError as unknown as ErrorConstructor);
errorConstructors.set('StorageInitError', StorageInitError as unknown as ErrorConstructor);
errorConstructors.set('CommandAbortError', CommandAbortError as unknown as ErrorConstructor);
errorConstructors.set('ExecutionDesertedError', ExecutionDesertedError as unknown as ErrorConstructor);

View file

@ -1,22 +1,65 @@
import { toError } from './errors.js';
import prettyMs from 'pretty-ms';
import { ExecutionDesertedError, toError } from './errors.js';
import { DEFAULT_RESPITE_SECONDS } from './defaults.js';
type Fn<Args extends any[], Result> = (...args: Args) => Result;
type Result<T> = [value: T, error: undefined] | [value: undefined, error: Error];
type ExecOptions = {
abortSignal?: AbortSignal;
abortRespite?: number;
};
/**
* Execute a function and return a result tuple
*
* This is a helper function to make it easier to handle errors without the extra nesting of try/catch
* If an abort signal is provided the function will reject with an ExecutionDesertedError if the signal is aborted
* and the given function has not yet resolved within the given respite time (or a default of 30 seconds)
*
* @param fn The function to execute
* @param options Options for the execution
*/
export const exec = async <Args extends any[], Return extends Promise<any>>(
fn: Fn<Args, Return>,
...args: Args
export const exec = async <Return extends Promise<any>>(
fn: () => Return,
options: ExecOptions = {},
): Promise<Result<Awaited<Return>>> => {
try {
const result = await fn(...args);
const aborter = options.abortSignal ? getAborter(options.abortSignal, options.abortRespite) : undefined;
const result = await Promise.race(aborter ? [aborter, fn()] : [fn()]);
return [result, undefined];
} catch (error) {
return [undefined, toError(error)];
}
};
/**
* Returns a promise that rejects after a given time after the given signal is aborted
*
* @param signal The abort signal to listen to
* @param respite The time in milliseconds to wait before rejecting
*/
const getAborter = async (signal: AbortSignal, respite = DEFAULT_RESPITE_SECONDS * 1000): Promise<never> => {
return new Promise((_, reject) => {
if (signal.aborted) {
setTimeout(
reject,
respite,
ExecutionDesertedError.fromReason(`Deserted after ${prettyMs(respite)}`, toError(signal.reason)),
).unref();
return;
}
signal.addEventListener(
'abort',
() => {
setTimeout(
reject,
respite,
ExecutionDesertedError.fromReason(`Deserted after ${prettyMs(respite)}`, toError(signal.reason)),
).unref();
},
{ once: true },
);
});
};

View file

@ -18,6 +18,8 @@ type MigrationRunnerParameters = {
limit?: number;
from?: string;
to?: string;
abortSignal?: AbortSignal;
abortRespite?: number;
reporter: EmigrateReporter;
storage: Storage;
migrations: Array<MigrationMetadata | MigrationMetadataFinished>;
@ -30,6 +32,8 @@ export const migrationRunner = async ({
limit,
from,
to,
abortSignal,
abortRespite,
reporter,
storage,
migrations,
@ -43,6 +47,22 @@ export const migrationRunner = async ({
let skip = false;
abortSignal?.addEventListener(
'abort',
() => {
skip = true;
reporter.onAbort?.(toError(abortSignal.reason))?.then(
() => {
/* noop */
},
() => {
/* noop */
},
);
},
{ once: true },
);
for await (const migration of migrations) {
if (isFinishedMigration(migration)) {
skip ||= migration.status === 'failed' || migration.status === 'skipped';
@ -89,7 +109,7 @@ export const migrationRunner = async ({
const [lockedMigrations, lockError] = dry
? [migrationsToLock]
: await exec(async () => storage.lock(migrationsToLock));
: await exec(async () => storage.lock(migrationsToLock), { abortSignal, abortRespite });
if (lockError) {
for (const migration of migrationsToLock) {
@ -167,7 +187,7 @@ export const migrationRunner = async ({
const start = hrtime();
const [, migrationError] = await exec(async () => execute(migration));
const [, migrationError] = await exec(async () => execute(migration), { abortSignal, abortRespite });
const duration = getDuration(start);
@ -194,7 +214,9 @@ export const migrationRunner = async ({
}
}
const [, unlockError] = dry ? [] : await exec(async () => storage.unlock(lockedMigrations ?? []));
const [, unlockError] = dry
? []
: await exec(async () => storage.unlock(lockedMigrations ?? []), { abortSignal, abortRespite });
// eslint-disable-next-line unicorn/no-array-callback-reference
const firstFailed = finishedMigrations.find(isFailedMigration);
@ -204,7 +226,8 @@ export const migrationRunner = async ({
: firstFailed
? MigrationRunError.fromMetadata(firstFailed)
: undefined;
const error = unlockError ?? firstError ?? lockError;
const error =
unlockError ?? firstError ?? lockError ?? (abortSignal?.aborted ? toError(abortSignal.reason) : undefined);
await reporter.onFinished?.(finishedMigrations, error);

View file

@ -165,6 +165,20 @@ const getError = (error?: ErrorLike, indent = ' ') => {
return parts.join('\n');
};
const getAbortMessage = (reason?: Error) => {
if (!reason) {
return '';
}
const parts = [` ${red.bold(reason.message)}`];
if (isErrorLike(reason.cause)) {
parts.push(getError(reason.cause, ' '));
}
return parts.join('\n');
};
const getSummary = (
command: ReporterInitParameters['command'],
migrations: Array<MigrationMetadata | MigrationMetadataFinished> = [],
@ -281,6 +295,7 @@ class DefaultFancyReporter implements Required<EmigrateReporter> {
#error: Error | undefined;
#parameters!: ReporterInitParameters;
#interval: NodeJS.Timeout | undefined;
#abortReason: Error | undefined;
onInit(parameters: ReporterInitParameters): void | PromiseLike<void> {
this.#parameters = parameters;
@ -288,6 +303,10 @@ class DefaultFancyReporter implements Required<EmigrateReporter> {
this.#start();
}
onAbort(reason: Error): void | PromiseLike<void> {
this.#abortReason = reason;
}
onCollectedMigrations(migrations: MigrationMetadata[]): void | PromiseLike<void> {
this.#migrations = migrations;
}
@ -358,6 +377,7 @@ class DefaultFancyReporter implements Required<EmigrateReporter> {
getTitle(this.#parameters),
getHeaderMessage(this.#parameters.command, this.#migrations, this.#lockedMigrations),
this.#migrations?.map((migration) => getMigrationText(migration, this.#activeMigration)).join('\n') ?? '',
getAbortMessage(this.#abortReason),
getSummary(this.#parameters.command, this.#migrations),
getError(this.#error),
];
@ -403,6 +423,12 @@ class DefaultReporter implements Required<EmigrateReporter> {
console.log('');
}
onAbort(reason: Error): void | PromiseLike<void> {
console.log('');
console.error(getAbortMessage(reason));
console.log('');
}
onCollectedMigrations(migrations: MigrationMetadata[]): void | PromiseLike<void> {
this.#migrations = migrations;
}

View file

@ -12,6 +12,7 @@ export type Config = {
template?: string;
extension?: string;
color?: boolean;
abortRespite?: number;
};
export type EmigrateConfig = Config & {

View file

@ -57,6 +57,10 @@ class PinoReporter implements Required<EmigrateReporter> {
this.#logger.info({ parameters }, `Emigrate "${command}" initialized${parameters.dry ? ' (dry-run)' : ''}`);
}
onAbort(reason: Error): Awaitable<void> {
this.#logger.error({ reason }, `Emigrate "${this.#command}" shutting down`);
}
onCollectedMigrations(migrations: MigrationMetadata[]): Awaitable<void> {
this.#migrations = migrations;
}