feat(storage): add "end" method to storage for cleaning up resources when commands are finished

This commit is contained in:
Joakim Carlstein 2023-12-08 13:01:19 +01:00
parent 334e2099bb
commit 703e6f028a
15 changed files with 150 additions and 77 deletions

View file

@ -167,30 +167,34 @@ export const createMysqlStorage = ({ table = defaultTable, connection }: MysqlSt
try {
await initializeTable(pool, table);
} catch (error) {
await pool.end();
throw error;
}
const storage: Storage = {
async lock(migrations) {
const lockedMigrations: MigrationMetadata[] = [];
const storage: Storage = {
async lock(migrations) {
const lockedMigrations: MigrationMetadata[] = [];
for await (const migration of migrations) {
if (await lockMigration(pool, table, migration)) {
lockedMigrations.push(migration);
}
for await (const migration of migrations) {
if (await lockMigration(pool, table, migration)) {
lockedMigrations.push(migration);
}
}
return lockedMigrations;
},
async unlock(migrations) {
for await (const migration of migrations) {
await unlockMigration(pool, table, migration);
}
},
async remove(migration) {
await deleteMigration(pool, table, migration);
},
async *getHistory() {
const [rows] = await pool.execute<Array<RowDataPacket & HistoryEntry>>({
sql: `
return lockedMigrations;
},
async unlock(migrations) {
for await (const migration of migrations) {
await unlockMigration(pool, table, migration);
}
},
async remove(migration) {
await deleteMigration(pool, table, migration);
},
async *getHistory() {
const [rows] = await pool.execute<Array<RowDataPacket & HistoryEntry>>({
sql: `
SELECT
*
FROM
@ -200,31 +204,31 @@ export const createMysqlStorage = ({ table = defaultTable, connection }: MysqlSt
ORDER BY
date ASC
`,
values: ['locked'],
});
values: ['locked'],
});
for (const row of rows) {
yield {
name: row.name,
status: row.status,
date: new Date(row.date),
// FIXME: Migrate the migrations table to support the error column
error: row.status === 'failed' ? new Error('Unknown error reason') : undefined,
};
}
},
async onSuccess(migration) {
await finishMigration(pool, table, migration);
},
async onError(migration, error) {
await finishMigration(pool, table, { ...migration, status: 'failed', error });
},
};
for (const row of rows) {
yield {
name: row.name,
status: row.status,
date: new Date(row.date),
// FIXME: Migrate the migrations table to support the error column
error: row.status === 'failed' ? new Error('Unknown error reason') : undefined,
};
}
},
async onSuccess(migration) {
await finishMigration(pool, table, migration);
},
async onError(migration, error) {
await finishMigration(pool, table, { ...migration, status: 'failed', error });
},
async end() {
await pool.end();
},
};
return storage;
} finally {
await pool.end();
}
return storage;
},
};
};