emigrate/packages/postgres/src/index.ts

321 lines
8.4 KiB
TypeScript

import process from 'node:process';
import postgres, { type Options, type PostgresType, type Sql } from 'postgres';
import { getTimestampPrefix, sanitizeMigrationName } from '@emigrate/plugin-tools';
import {
type MigrationMetadata,
type EmigrateStorage,
type LoaderPlugin,
type Storage,
type MigrationMetadataFinished,
type GenerateMigrationFunction,
type GeneratorPlugin,
type SerializedError,
type MigrationHistoryEntry,
type Awaitable,
type MigrationFunction,
} from '@emigrate/types';
const defaultTable = 'migrations';
type ConnectionOptions = Options<Record<string, PostgresType>>;
export type PostgresStorageOptions = {
table?: string;
/**
* @see https://github.com/porsager/postgres#connection
*/
connection: ConnectionOptions | string;
};
export type PostgresLoaderOptions = {
/**
* @see https://github.com/porsager/postgres#connection
*/
connection: ConnectionOptions | string;
};
const getPool = async (connection: ConnectionOptions | string): Promise<Sql> => {
const sql = typeof connection === 'string' ? postgres(connection) : postgres(connection);
await sql`SELECT 1`;
return sql;
};
const lockMigration = async (sql: Sql, table: string, migration: MigrationMetadata) => {
const result = await sql`
INSERT INTO ${sql(table)} (name, status, date)
VALUES (${migration.name}, ${'locked'}, NOW())
ON CONFLICT (name) DO NOTHING
`;
return result.count === 1;
};
const unlockMigration = async (sql: Sql, table: string, migration: MigrationMetadata) => {
const result = await sql`
DELETE FROM ${sql(table)}
WHERE
name = ${migration.name}
AND status = ${'locked'}
`;
return result.count === 1;
};
const finishMigration = async (
sql: Sql,
table: string,
migration: MigrationMetadataFinished,
_error?: SerializedError,
) => {
const result = await sql`
UPDATE
${sql(table)}
SET
status = ${migration.status},
date = NOW()
WHERE
name = ${migration.name}
AND status = ${'locked'}
`;
return result.count === 1;
};
const deleteMigration = async (sql: Sql, table: string, migration: MigrationMetadata) => {
const result = await sql`
DELETE FROM ${sql(table)}
WHERE
name = ${migration.name}
AND status <> ${'locked'}
`;
return result.count === 1;
};
const getDatabaseName = (config: ConnectionOptions | string) => {
if (typeof config === 'string') {
const uri = new URL(config);
return uri.pathname.replace(/^\//u, '');
}
return config.database ?? '';
};
const setDatabaseName = <T extends ConnectionOptions | string>(config: T, databaseName: string): T => {
if (typeof config === 'string') {
const uri = new URL(config);
uri.pathname = `/${databaseName}`;
return uri.toString() as T;
}
if (typeof config === 'object') {
return {
...config,
database: databaseName,
};
}
throw new Error('Invalid connection config');
};
const initializeDatabase = async (config: ConnectionOptions | string) => {
let sql: Sql | undefined;
try {
sql = await getPool(config);
await sql.end();
} catch (error) {
await sql?.end();
// The error code 3D000 means that the database does not exist, but the user might have the permissions to create it
if (error && typeof error === 'object' && 'code' in error && error.code === '3D000') {
const databaseName = getDatabaseName(config);
const postgresConfig = setDatabaseName(config, 'postgres');
const postgresSql = await getPool(postgresConfig);
try {
await postgresSql`CREATE DATABASE ${postgresSql(databaseName)}`;
// Any database creation error here will be propagated
} finally {
await postgresSql.end();
}
} else {
// In this case we don't know how to handle the error, so we rethrow it
throw error;
}
}
};
const initializeTable = async (sql: Sql, table: string) => {
const [row] = await sql<Array<{ exists: 1 }>>`
SELECT 1 as exists
FROM
information_schema.tables
WHERE
table_schema = 'public'
AND table_name = ${table}
`;
if (row?.exists) {
return;
}
// This table definition is compatible with the one used by the immigration-postgres package
await sql`
CREATE TABLE ${sql(table)} (
name varchar(255) not null primary key,
status varchar(32),
date timestamptz not null
);
`;
};
export const createPostgresStorage = ({
table = defaultTable,
connection,
}: PostgresStorageOptions): EmigrateStorage => {
return {
async initializeStorage() {
await initializeDatabase(connection);
const sql = await getPool(connection);
try {
await initializeTable(sql, table);
} catch (error) {
await sql.end();
throw error;
}
const storage: Storage = {
async lock(migrations) {
const lockedMigrations: MigrationMetadata[] = [];
for await (const migration of migrations) {
if (await lockMigration(sql, table, migration)) {
lockedMigrations.push(migration);
}
}
return lockedMigrations;
},
async unlock(migrations) {
for await (const migration of migrations) {
await unlockMigration(sql, table, migration);
}
},
async remove(migration) {
await deleteMigration(sql, table, migration);
},
async *getHistory() {
const query = sql<Array<Exclude<MigrationHistoryEntry, 'error'>>>`
SELECT
*
FROM
${sql(table)}
WHERE
status <> ${'locked'}
ORDER BY
date ASC
`.cursor();
for await (const [row] of query) {
if (!row) {
continue;
}
if (row.status === 'failed') {
yield {
...row,
error: { name: 'Error', message: 'Unknown error' },
};
continue;
}
yield row;
}
},
async onSuccess(migration) {
await finishMigration(sql, table, migration);
},
async onError(migration, error) {
await finishMigration(sql, table, migration, error);
},
async end() {
await sql.end();
},
};
return storage;
},
};
};
export const createPostgresLoader = ({ connection }: PostgresLoaderOptions): LoaderPlugin => {
return {
loadableExtensions: ['.sql'],
async loadMigration(migration) {
return async () => {
const sql = await getPool(connection);
try {
// @ts-expect-error The "simple" option is not documented, but it exists
await sql.file(migration.filePath, { simple: true });
} finally {
await sql.end();
}
};
},
};
};
export const generateMigration: GenerateMigrationFunction = async (name) => {
return {
filename: `${getTimestampPrefix()}_${sanitizeMigrationName(name)}.sql`,
content: `-- Migration: ${name}
`,
};
};
const storage = createPostgresStorage({
table: process.env['POSTGRES_TABLE'],
connection: process.env['POSTGRES_URL'] ?? {
host: process.env['POSTGRES_HOST'],
port: process.env['POSTGRES_PORT'] ? Number.parseInt(process.env['POSTGRES_PORT'], 10) : undefined,
user: process.env['POSTGRES_USER'],
password: process.env['POSTGRES_PASSWORD'],
database: process.env['POSTGRES_DB'],
},
});
const loader = createPostgresLoader({
connection: process.env['POSTGRES_URL'] ?? {
host: process.env['POSTGRES_HOST'],
port: process.env['POSTGRES_PORT'] ? Number.parseInt(process.env['POSTGRES_PORT'], 10) : undefined,
user: process.env['POSTGRES_USER'],
password: process.env['POSTGRES_PASSWORD'],
database: process.env['POSTGRES_DB'],
},
});
// eslint-disable-next-line prefer-destructuring
export const initializeStorage: () => Promise<Storage> = storage.initializeStorage;
// eslint-disable-next-line prefer-destructuring
export const loadableExtensions: string[] = loader.loadableExtensions;
// eslint-disable-next-line prefer-destructuring
export const loadMigration: (migration: MigrationMetadata) => Awaitable<MigrationFunction> = loader.loadMigration;
const defaultExport: EmigrateStorage & LoaderPlugin & GeneratorPlugin = {
initializeStorage,
loadableExtensions,
loadMigration,
generateMigration,
};
export default defaultExport;