slonik-utilities
Implement cancel() and terminate() methods
These methods would abstract the following patterns:
await pool.connect(async (connection0) => {
  await pool.connect(async (connection1) => {
    const backendProcessId = await connection1.oneFirst(sql`SELECT pg_backend_pid()`);
    // connection0 acts as the supervisor and terminates connection1's backend after 2 seconds.
    setTimeout(() => {
      connection0.query(sql`SELECT pg_terminate_backend(${backendProcessId})`);
    }, 2000);
    try {
      await connection1.query(sql`SELECT pg_sleep(30)`);
    } catch (error) {
      // The backend has been terminated; connection1 is no longer usable.
    }
    // This will never be executed.
    await connection1.query(sql`SELECT now()`);
  });
});
and
await pool.connect(async (connection0) => {
  await pool.connect(async (connection1) => {
    const backendProcessId = await connection1.oneFirst(sql`SELECT pg_backend_pid()`);
    // connection0 acts as the supervisor and cancels connection1's running query after 2 seconds.
    setTimeout(() => {
      connection0.query(sql`SELECT pg_cancel_backend(${backendProcessId})`);
    }, 2000);
    try {
      await connection1.query(sql`SELECT pg_sleep(30)`);
    } catch (error) {
      // Only the query was cancelled; connection1 stays open.
    }
    // This will be executed.
    await connection1.query(sql`SELECT now()`);
  });
});
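The practical difference between the two patterns is what the catch block receives and whether connection1 can be reused afterwards. Below is a minimal sketch of branching on that. It uses local stand-in classes named after slonik's `QueryCancelledError` and `BackendTerminatedError` so that it runs without a database; `classifyInterruption` is a hypothetical helper name, not part of slonik-utilities.

```javascript
// Stand-ins for the error classes that slonik exports, defined locally so the
// sketch runs without a database. In real code they would be imported from slonik.
class QueryCancelledError extends Error {}
class BackendTerminatedError extends Error {}

// Hypothetical helper: reports how a query was interrupted, if at all.
const classifyInterruption = (error) => {
  if (error instanceof QueryCancelledError) {
    // pg_cancel_backend: only the running query was aborted.
    return 'cancelled';
  }
  if (error instanceof BackendTerminatedError) {
    // pg_terminate_backend: the backend process behind the connection is gone.
    return 'terminated';
  }
  // Anything else is an ordinary error and should be handled as usual.
  return 'other';
};
```

A catch block could then keep using the connection only when the result is 'cancelled'.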
The design challenge:
- Should the supervisor connection (the one that is responsible for cancelling the query) be created before the query is executed?
  - If yes, then this would double the number of connections.
    - We could use a dedicated method pool.cancellableConnect((connection) => {}).
  - If not, then how do we guarantee that there are enough connection slots to start the new connection when required?
- QueryCancelledError can be used to capture pg_cancel_backend.
- BackendTerminatedError can be used to capture pg_terminate_backend.
https://github.com/gajus/slonik/issues/39
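One way to explore the "If not" branch of the design challenge is to acquire the supervisor connection lazily, only when cancel is actually called. The following is a rough sketch under stated assumptions, not an existing slonik-utilities API: `lazyCancellableConnect` is a hypothetical name, and `pool` and `sql` are passed in as arguments and assumed to behave like slonik's pool and `sql` tag. It holds one connection per routine in the common case, but it does not solve the slot problem: if the pool is exhausted when `cancel` is called, the cancellation waits for a free slot.

```javascript
// Hypothetical sketch: the supervisor connection is acquired only on cancel().
// `pool` and `sql` are assumed to behave like slonik's pool and sql tag.
const lazyCancellableConnect = (pool, sql, routine) => {
  return pool.connect(async (connection) => {
    const backendProcessId = await connection.oneFirst(sql`SELECT pg_backend_pid()`);
    let done = false;

    const cancel = async () => {
      if (done) {
        return;
      }
      done = true;
      // A second pool slot is needed only now; this waits if none is free.
      await pool.connect((supervisor) => {
        return supervisor.query(sql`SELECT pg_cancel_backend(${backendProcessId})`);
      });
    };

    try {
      return await routine(connection, cancel);
    } finally {
      // After the routine settles, cancel() becomes a no-op.
      done = true;
    }
  });
};
```

The trade-off is the one named in the list: fewer connections held up front, in exchange for no guarantee that cancellation can start promptly under load.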
Example implementation:
const cancellableConnection = async (
  pool: DatabasePoolType,
  handler: (connection: DatabaseConnectionType, cancel: () => Promise<void>) => Promise<*>
) => {
  return pool.connect(async (connection0) => {
    let done = false;
    // Caveat: the PID is read from connection0, while the handler runs on connection1,
    // so cancel() targets the wrong backend. The implementation further below swaps the roles.
    const backendProcessId = await connection0.oneFirst(sql`SELECT pg_backend_pid()`);
    await pool.connect(async (connection1) => {
      await handler(connection1, async () => {
        if (done) {
          return;
        }
        await connection1.query(sql`
          SELECT pg_terminate_backend(${backendProcessId})
        `);
      });
      done = true;
    });
  });
};
Used as:
await cancellableConnection(target, (connection, cancel) => {
  setTimeout(() => {
    cancel();
  }, 1000);
  return connection.query(sql`SELECT pg_sleep(10)`);
});
A more correct implementation:
const cancellableConnection = async (pool: DatabasePoolType, cancellableConnectionRoutine: CancellableConnectionRoutineType) => {
  let done = false;
  return pool.connect(async (connection0) => {
    return pool.connect(async (connection1) => {
      const backendProcessId = await connection1.oneFirst(sql`SELECT pg_backend_pid()`);
      const cancel = () => {
        if (done) {
          return;
        }
        done = true;
        connection0.query(sql`
          SELECT pg_terminate_backend(${backendProcessId})
        `)
          .catch((error) => {
            if (!(error instanceof BackendTerminatedError)) {
              throw error;
            }
          });
      };
      let result;
      try {
        result = await cancellableConnectionRoutine(connection1, cancel);
        done = true;
      } catch (error) {
        done = true;
        throw error;
      }
      return result;
    });
  });
};
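A common reason to call cancel is a time limit, as in the setTimeout usage shown earlier. A small sketch of wrapping that into a helper which also clears the timer when the routine finishes early follows. `withTimeout` is a hypothetical name, and `connect` is assumed to be any function shaped like cancellableConnection with the pool already bound, for example (routine) => cancellableConnection(pool, routine).

```javascript
// Hypothetical helper: runs `routine` and calls cancel() if it exceeds `timeoutMs`.
// `connect` is assumed to accept a routine of the form (connection, cancel) => Promise.
const withTimeout = (connect, timeoutMs, routine) => {
  return connect(async (connection, cancel) => {
    const timer = setTimeout(() => {
      cancel();
    }, timeoutMs);
    try {
      return await routine(connection);
    } finally {
      // Clear the timer so a fast routine does not leave a pending cancel.
      clearTimeout(timer);
    }
  });
};
```

With the implementation above, which uses pg_terminate_backend, a routine that overruns the limit would be expected to reject with BackendTerminatedError, so the caller still needs to handle that error.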