slonik-utilities icon indicating copy to clipboard operation
slonik-utilities copied to clipboard

Implement cancel() and terminate() methods

Open gajus opened this issue 6 years ago • 3 comments

These methods would abstract the following patterns:

await pool.connect(async (connection0) => {
  await pool.connect(async (connection1) => {
    const backendProcessId = await connection1.oneFirst(sql`SELECT pg_backend_pid()`);

    setTimeout(() => {
      connection0.query(sql`SELECT pg_terminate_backend(${backendProcessId})`)
    }, 2000);

    try {
      await connection1.query(sql`SELECT pg_sleep(30)`);
    } catch (error) {

    }

    // This will never be executed.
    await connection1.query(sql`SELECT now()`)
  });
});

and

await pool.connect(async (connection0) => {
  await pool.connect(async (connection1) => {
    const backendProcessId = await connection1.oneFirst(sql`SELECT pg_backend_pid()`);

    setTimeout(() => {
      connection0.query(sql`SELECT pg_cancel_backend(${backendProcessId})`)
    }, 2000);

    try {
      await connection1.query(sql`SELECT pg_sleep(30)`);
    } catch (error) {

    }

    // This will be executed.
    await connection1.query(sql`SELECT now()`)
  });
});

The design challenge:

  • Should the supervisor connection (the one thats responsible for cancelling the query) be created before the query executed?
  • If yes, then this would double the number of connections.
  • We could use a dedicated method pool.cancellableConnect((connection) => {}).
  • If not, then how do we guarantee that there are enough connection slots to start the new connection when required?

gajus avatar Feb 26 '19 17:02 gajus

  • QueryCancelledError can be used to capture pg_cancel_backend.
  • BackendTerminatedError can be used to capture pg_terminate_backend.

https://github.com/gajus/slonik/issues/39

gajus avatar Feb 26 '19 17:02 gajus

Example implementation:

const cancellableConnection = async (pool: DatabasePoolType, handler: (connection: DatabaseConnectionType, cancel: () => Promise<void>) => Promise<*>) => {
  return pool.connect(async (connection0) => {
    let done = false;

    const backendProcessId = await connection0.oneFirst(sql`SELECT pg_backend_pid()`);

    await pool.connect(async (connection1) => {
      await handler(connection1, async () => {
        if (done) {
          return;
        }

        await connection1.query(sql`
          SELECT pg_terminate_backend(${backendProcessId})
        `);
      });

      done = true;
    });
  });
};

Used as:

await cancellableConnection(target, (connection, cancel) => {
  setTimeout(() => {
    cancel();
  }, 1000);

  return connection.query(sql`SELECT pg_sleep(10)`);
});

gajus avatar Feb 26 '19 20:02 gajus

A more correct implementation:

const cancellableConnection = async (pool: DatabasePoolType, cancellableConnectionRoutine: CancellableConnectionRoutineType) => {
  let done = false;

  return pool.connect(async (connection0) => {
    return pool.connect(async (connection1) => {
      const backendProcessId = await connection1.oneFirst(sql`SELECT pg_backend_pid()`);

      const cancel = () => {
        if (done) {
          return;
        }

        done = true;

        connection0.query(sql`
          SELECT pg_terminate_backend(${backendProcessId})
        `)
          .catch((error) => {
            if (!(error instanceof BackendTerminatedError)) {
              throw error;
            }
          });
      };

      let result;

      try {
        result = await cancellableConnectionRoutine(connection1, cancel);

        done = true;
      } catch (error) {
        done = true;

        throw error;
      }

      return result;
    });
  });
};

gajus avatar Feb 27 '19 14:02 gajus