fix(mysql): make sure migrations are run in order when run concurrently

Now we either lock all or none of the migrations to run,
to make sure they are not out of order when multiple instances of Emigrate run concurrently.
This commit is contained in:
Joakim Carlstein 2025-04-24 15:06:32 +02:00 committed by Joakim Carlstein
parent 6eb60177c5
commit 26240f49ff
13 changed files with 922 additions and 98 deletions

View file

@ -11,6 +11,7 @@ import {
StorageInitError,
} from '../errors.js';
import {
assertErrorEqualEnough,
getErrorCause,
getMockedReporter,
getMockedStorage,
@ -199,6 +200,11 @@ function assertPreconditionsFailed(reporter: Mocked<Required<EmigrateReporter>>,
assert.strictEqual(reporter.onMigrationSkip.mock.calls.length, 0, 'Total pending and skipped');
assert.strictEqual(reporter.onFinished.mock.calls.length, 1, 'Finished called once');
const [entries, error] = reporter.onFinished.mock.calls[0]?.arguments ?? [];
// hackety hack:
if (finishedError) {
finishedError.stack = error?.stack;
}
assert.deepStrictEqual(error, finishedError, 'Finished error');
const cause = getErrorCause(error);
const expectedCause = finishedError?.cause;
@ -288,14 +294,7 @@ function assertPreconditionsFulfilled(
assert.strictEqual(reporter.onMigrationSkip.mock.calls.length, 0, 'Total pending and skipped');
assert.strictEqual(reporter.onFinished.mock.calls.length, 1, 'Finished called once');
const [entries, error] = reporter.onFinished.mock.calls[0]?.arguments ?? [];
assert.deepStrictEqual(error, finishedError, 'Finished error');
const cause = getErrorCause(error);
const expectedCause = finishedError?.cause;
assert.deepStrictEqual(
cause,
expectedCause ? deserializeError(expectedCause) : expectedCause,
'Finished error cause',
);
assertErrorEqualEnough(error, finishedError, 'Finished error');
assert.strictEqual(entries?.length, expected.length, 'Finished entries length');
assert.deepStrictEqual(
entries.map((entry) => `${entry.name} (${entry.status})`),

View file

@ -1,13 +1,6 @@
import { describe, it, mock } from 'node:test';
import assert from 'node:assert';
import {
type EmigrateReporter,
type Storage,
type Plugin,
type SerializedError,
type MigrationMetadataFinished,
} from '@emigrate/types';
import { deserializeError } from 'serialize-error';
import { type EmigrateReporter, type Storage, type Plugin, type MigrationMetadataFinished } from '@emigrate/types';
import { version } from '../get-package-info.js';
import {
BadOptionError,
@ -16,7 +9,6 @@ import {
MigrationHistoryError,
MigrationRunError,
StorageInitError,
toSerializedError,
} from '../errors.js';
import {
type Mocked,
@ -24,7 +16,7 @@ import {
toMigrations,
getMockedReporter,
getMockedStorage,
getErrorCause,
assertErrorEqualEnough,
} from '../test-utils.js';
import upCommand from './up.js';
@ -930,15 +922,13 @@ function assertPreconditionsFulfilled(
for (const [index, entry] of failedEntries.entries()) {
if (entry.status === 'failed') {
const error = reporter.onMigrationError.mock.calls[index]?.arguments[1];
assert.deepStrictEqual(error, entry.error, 'Error');
const cause = entry.error?.cause;
assert.deepStrictEqual(error?.cause, cause ? deserializeError(cause) : cause, 'Error cause');
assertErrorEqualEnough(error, entry.error, 'Error');
if (entry.started) {
const [finishedMigration, error] = storage.onError.mock.calls[index]?.arguments ?? [];
assert.strictEqual(finishedMigration?.name, entry.name);
assert.strictEqual(finishedMigration?.status, entry.status);
assertErrorEqualEnough(error, entry.error);
assertErrorEqualEnough(error, entry.error, `Entry error (${entry.name})`);
}
}
}
@ -946,15 +936,7 @@ function assertPreconditionsFulfilled(
assert.strictEqual(reporter.onMigrationSkip.mock.calls.length, pending + skipped, 'Total pending and skipped');
assert.strictEqual(reporter.onFinished.mock.calls.length, 1, 'Finished called once');
const [entries, error] = reporter.onFinished.mock.calls[0]?.arguments ?? [];
assertErrorEqualEnough(error, finishedError);
const cause = getErrorCause(error);
const expectedCause = finishedError?.cause;
assert.deepStrictEqual(
cause,
expectedCause ? deserializeError(expectedCause) : expectedCause,
'Finished error cause',
);
assertErrorEqualEnough(error, finishedError, 'Finished error');
assert.strictEqual(entries?.length, expected.length, 'Finished entries length');
assert.deepStrictEqual(
entries.map((entry) => `${entry.name} (${entry.status})`),
@ -995,33 +977,6 @@ function assertPreconditionsFailed(
assert.strictEqual(reporter.onMigrationSkip.mock.calls.length, 0, 'Total pending and skipped');
assert.strictEqual(reporter.onFinished.mock.calls.length, 1, 'Finished called once');
const [entries, error] = reporter.onFinished.mock.calls[0]?.arguments ?? [];
assert.deepStrictEqual(error, finishedError, 'Finished error');
const cause = getErrorCause(error);
const expectedCause = finishedError?.cause;
assert.deepStrictEqual(
cause,
expectedCause ? deserializeError(expectedCause) : expectedCause,
'Finished error cause',
);
assertErrorEqualEnough(error, finishedError, 'Finished error');
assert.strictEqual(entries?.length, 0, 'Finished entries length');
}
function assertErrorEqualEnough(actual?: Error | SerializedError, expected?: Error) {
if (expected === undefined) {
assert.strictEqual(actual, undefined);
return;
}
const {
cause: actualCause,
stack: actualStack,
...actualError
} = actual instanceof Error ? toSerializedError(actual) : actual ?? {};
const { cause: expectedCause, stack: expectedStack, ...expectedError } = toSerializedError(expected);
// @ts-expect-error Ignore
const { stack: actualCauseStack, ...actualCauseRest } = actualCause ?? {};
// @ts-expect-error Ignore
const { stack: expectedCauseStack, ...expectedCauseRest } = expectedCause ?? {};
assert.deepStrictEqual(actualError, expectedError);
assert.deepStrictEqual(actualCauseRest, expectedCauseRest);
}

View file

@ -1,5 +1,6 @@
import { mock, type Mock } from 'node:test';
import path from 'node:path';
import assert from 'node:assert';
import {
type SerializedError,
type EmigrateReporter,
@ -9,6 +10,7 @@ import {
type NonFailedMigrationHistoryEntry,
type Storage,
} from '@emigrate/types';
import { toSerializedError } from './errors.js';
export type Mocked<T> = {
// @ts-expect-error - This is a mock
@ -110,3 +112,23 @@ export function toEntries(
): MigrationHistoryEntry[] {
return names.map((name) => (typeof name === 'string' ? toEntry(name, status) : name));
}
export function assertErrorEqualEnough(actual?: Error | SerializedError, expected?: Error, message?: string): void {
if (expected === undefined) {
assert.strictEqual(actual, undefined);
return;
}
const {
cause: actualCause,
stack: actualStack,
...actualError
} = actual instanceof Error ? toSerializedError(actual) : actual ?? {};
const { cause: expectedCause, stack: expectedStack, ...expectedError } = toSerializedError(expected);
// @ts-expect-error Ignore
const { stack: actualCauseStack, ...actualCauseRest } = actualCause ?? {};
// @ts-expect-error Ignore
const { stack: expectedCauseStack, ...expectedCauseRest } = expectedCause ?? {};
assert.deepStrictEqual(actualError, expectedError, message);
assert.deepStrictEqual(actualCauseRest, expectedCauseRest, message ? `${message} (cause)` : undefined);
}

View file

@ -17,12 +17,16 @@
},
"files": [
"dist",
"!dist/*.tsbuildinfo"
"!dist/*.tsbuildinfo",
"!dist/**/*.test.js",
"!dist/tests/*"
],
"scripts": {
"build": "tsc --pretty",
"build:watch": "tsc --pretty --watch",
"lint": "xo --cwd=../.. $(pwd)"
"lint": "xo --cwd=../.. $(pwd)",
"test-disabled": "glob -c \"node --import tsx --test-reporter spec --test\" \"./src/**/*.test.ts\"",
"test:watch": "glob -c \"node --watch --import tsx --test-reporter spec --test\" \"./src/**/*.test.ts\""
},
"keywords": [
"emigrate",

View file

@ -0,0 +1,92 @@
import assert from 'node:assert';
import path from 'node:path';
import { before, after, describe, it } from 'node:test';
import type { MigrationMetadata } from '@emigrate/types';
import { startDatabase, stopDatabase } from './tests/database.js';
import { createMysqlStorage } from './index.js';
let db: { port: number; host: string };
describe('emigrate-mysql', async () => {
before(
async () => {
db = await startDatabase();
},
{ timeout: 60_000 },
);
after(
async () => {
await stopDatabase();
},
{ timeout: 10_000 },
);
describe('migration locks', async () => {
it('either locks none or all of the given migrations', async () => {
const { initializeStorage } = createMysqlStorage({
table: 'migrations',
connection: {
host: db.host,
user: 'emigrate',
password: 'emigrate',
database: 'emigrate',
port: db.port,
},
});
const [storage1, storage2] = await Promise.all([initializeStorage(), initializeStorage()]);
const migrations = toMigrations('/emigrate', 'migrations', [
'2023-10-01-01-test.js',
'2023-10-01-02-test.js',
'2023-10-01-03-test.js',
'2023-10-01-04-test.js',
'2023-10-01-05-test.js',
'2023-10-01-06-test.js',
'2023-10-01-07-test.js',
'2023-10-01-08-test.js',
'2023-10-01-09-test.js',
'2023-10-01-10-test.js',
'2023-10-01-11-test.js',
'2023-10-01-12-test.js',
'2023-10-01-13-test.js',
'2023-10-01-14-test.js',
'2023-10-01-15-test.js',
'2023-10-01-16-test.js',
'2023-10-01-17-test.js',
'2023-10-01-18-test.js',
'2023-10-01-19-test.js',
'2023-10-01-20-test.js',
]);
const [locked1, locked2] = await Promise.all([storage1.lock(migrations), storage2.lock(migrations)]);
assert.strictEqual(
locked1.length === 0 || locked2.length === 0,
true,
'One of the processes should have no locks',
);
assert.strictEqual(
locked1.length === 20 || locked2.length === 20,
true,
'One of the processes should have all locks',
);
});
});
});
function toMigration(cwd: string, directory: string, name: string): MigrationMetadata {
return {
name,
filePath: `${cwd}/${directory}/${name}`,
relativeFilePath: `${directory}/${name}`,
extension: path.extname(name),
directory,
cwd,
};
}
function toMigrations(cwd: string, directory: string, names: string[]): MigrationMetadata[] {
return names.map((name) => toMigration(cwd, directory, name));
}

View file

@ -1,5 +1,6 @@
import process from 'node:process';
import fs from 'node:fs/promises';
import { setTimeout } from 'node:timers/promises';
import {
createConnection,
createPool,
@ -54,6 +55,7 @@ const getConnection = async (options: ConnectionOptions | string) => {
// best to leave this at 0 (disabled)
uri.searchParams.set('connectTimeout', '0');
uri.searchParams.set('multipleStatements', 'true');
uri.searchParams.set('flags', '-FOUND_ROWS');
connection = await createConnection(uri.toString());
} else {
@ -64,6 +66,7 @@ const getConnection = async (options: ConnectionOptions | string) => {
// best to leave this at 0 (disabled)
connectTimeout: 0,
multipleStatements: true,
flags: ['-FOUND_ROWS'],
});
}
@ -84,6 +87,7 @@ const getPool = (connection: PoolOptions | string) => {
// it throws an error you can't catch and crashes node
// best to leave this at 0 (disabled)
uri.searchParams.set('connectTimeout', '0');
uri.searchParams.set('flags', '-FOUND_ROWS');
return createPool(uri.toString());
}
@ -94,6 +98,7 @@ const getPool = (connection: PoolOptions | string) => {
// it throws an error you can't catch and crashes node
// best to leave this at 0 (disabled)
connectTimeout: 0,
flags: ['-FOUND_ROWS'],
});
};
@ -104,8 +109,8 @@ type HistoryEntry = {
error?: SerializedError;
};
const lockMigration = async (pool: Pool, table: string, migration: MigrationMetadata) => {
const [result] = await pool.execute<ResultSetHeader>({
const lockMigration = async (connection: Connection, table: string, migration: MigrationMetadata) => {
const [result] = await connection.execute<ResultSetHeader>({
sql: `
INSERT INTO ${escapeId(table)} (name, status, date)
VALUES (?, ?, NOW())
@ -228,8 +233,10 @@ const initializeDatabase = async (config: ConnectionOptions | string) => {
}
};
const initializeTable = async (pool: Pool, table: string) => {
const [result] = await pool.execute<RowDataPacket[]>({
const lockWaitTimeout = 10; // seconds
const isHistoryTableExisting = async (connection: Connection, table: string) => {
const [result] = await connection.execute<RowDataPacket[]>({
sql: `
SELECT
1 as table_exists
@ -242,24 +249,70 @@ const initializeTable = async (pool: Pool, table: string) => {
values: [table],
});
if (result[0]?.['table_exists']) {
return result[0]?.['table_exists'] === 1;
};
const initializeTable = async (config: ConnectionOptions | string, table: string) => {
const connection = await getConnection(config);
if (await isHistoryTableExisting(connection, table)) {
await connection.end();
return;
}
// This table definition is compatible with the one used by the immigration-mysql package
await pool.execute(`
CREATE TABLE ${escapeId(table)} (
name varchar(255) not null primary key,
status varchar(32),
date datetime not null
) Engine=InnoDB;
`);
const lockName = `emigrate_init_table_lock_${table}`;
const [lockResult] = await connection.query<RowDataPacket[]>(`SELECT GET_LOCK(?, ?) AS got_lock`, [
lockName,
lockWaitTimeout,
]);
const didGetLock = lockResult[0]?.['got_lock'] === 1;
if (didGetLock) {
try {
// This table definition is compatible with the one used by the immigration-mysql package
await connection.execute(`
CREATE TABLE IF NOT EXISTS ${escapeId(table)} (
name varchar(255) not null primary key,
status varchar(32),
date datetime not null
) Engine=InnoDB;
`);
} finally {
await connection.query(`SELECT RELEASE_LOCK(?)`, [lockName]);
await connection.end();
}
return;
}
// Didn't get the lock, wait to see if the table was created by another process
const maxWait = lockWaitTimeout * 1000; // milliseconds
const checkInterval = 250; // milliseconds
const start = Date.now();
try {
while (Date.now() - start < maxWait) {
// eslint-disable-next-line no-await-in-loop
if (await isHistoryTableExisting(connection, table)) {
return;
}
// eslint-disable-next-line no-await-in-loop
await setTimeout(checkInterval);
}
throw new Error(`Timeout waiting for table ${table} to be created by other process`);
} finally {
await connection.end();
}
};
export const createMysqlStorage = ({ table = defaultTable, connection }: MysqlStorageOptions): EmigrateStorage => {
return {
async initializeStorage() {
await initializeDatabase(connection);
await initializeTable(connection, table);
const pool = getPool(connection);
@ -271,24 +324,35 @@ export const createMysqlStorage = ({ table = defaultTable, connection }: MysqlSt
});
}
try {
await initializeTable(pool, table);
} catch (error) {
await pool.end();
throw error;
}
const storage: Storage = {
async lock(migrations) {
const lockedMigrations: MigrationMetadata[] = [];
const connection = await pool.getConnection();
for await (const migration of migrations) {
if (await lockMigration(pool, table, migration)) {
lockedMigrations.push(migration);
try {
await connection.beginTransaction();
const lockedMigrations: MigrationMetadata[] = [];
for await (const migration of migrations) {
if (await lockMigration(connection, table, migration)) {
lockedMigrations.push(migration);
}
}
}
return lockedMigrations;
if (lockedMigrations.length === migrations.length) {
await connection.commit();
return lockedMigrations;
}
await connection.rollback();
return [];
} catch (error) {
await connection.rollback();
throw error;
} finally {
connection.release();
}
},
async unlock(migrations) {
for await (const migration of migrations) {

View file

@ -0,0 +1,45 @@
/* eslint @typescript-eslint/naming-convention:0, import/no-extraneous-dependencies: 0 */
import process from 'node:process';
import { GenericContainer, type StartedTestContainer } from 'testcontainers';
let container: StartedTestContainer | undefined;
export const startDatabase = async (): Promise<{ port: number; host: string }> => {
if (process.env['CI']) {
return {
port: process.env['MYSQL_PORT'] ? Number.parseInt(process.env['MYSQL_PORT'], 10) : 3306,
// eslint-disable-next-line @typescript-eslint/prefer-nullish-coalescing
host: process.env['MYSQL_HOST'] || 'localhost',
};
}
if (!container) {
console.log('Starting MySQL container...');
const containerSetup = new GenericContainer('mysql:8.2')
.withEnvironment({
MYSQL_ROOT_PASSWORD: 'admin',
MYSQL_USER: 'emigrate',
MYSQL_PASSWORD: 'emigrate',
MYSQL_DATABASE: 'emigrate',
})
.withTmpFs({ '/var/lib/mysql': 'rw' })
.withCommand(['--sql-mode=NO_ENGINE_SUBSTITUTION', '--default-authentication-plugin=mysql_native_password'])
.withExposedPorts(3306)
.withReuse();
container = await containerSetup.start();
console.log('MySQL container started');
}
return { port: container.getMappedPort(3306), host: container.getHost() };
};
export const stopDatabase = async (): Promise<void> => {
if (container) {
console.log('Stopping MySQL container...');
await container.stop();
console.log('MySQL container stopped');
container = undefined;
}
};