postgresql-dart

How to listen to pg_notify?

Open ember11498 opened this issue 1 year ago • 9 comments

Let's say my Postgres database has a trigger on my friendships table (a lookup table representing the relationship between 2 users, each row = 1 friendship) that sends a notification to the channel "new_friendship" every time a new row is inserted.

How can I listen to that channel in my Dart server?

ember11498 avatar Jan 31 '24 00:01 ember11498

You can listen on it with conn.channels['new_friendship'].listen((e) {}); see these for details: https://pub.dev/documentation/postgres/latest/postgres/Connection/channels.html https://pub.dev/documentation/postgres/latest/postgres/Channels/operator_get.html
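A minimal sketch of that suggestion, assuming the v3 API of package:postgres. The endpoint values and credentials here are hypothetical placeholders, and SSL is disabled on the assumption of a local server without SSL:

```dart
import 'package:postgres/postgres.dart';

Future<void> main() async {
  // Hypothetical local endpoint and credentials; replace with your own.
  final conn = await Connection.open(
    Endpoint(
      host: 'localhost',
      database: 'my_db',
      username: 'postgres',
      password: 'postgres',
    ),
    // Assumption: the local server does not support SSL.
    settings: ConnectionSettings(sslMode: SslMode.disable),
  );

  // Listening on a named channel makes the package run
  // LISTEN "new_friendship" on this connection. Each event on this
  // stream is the notification payload as a String.
  final subscription = conn.channels['new_friendship'].listen((payload) {
    print('new_friendship payload: $payload');
  });

  // Keep the process alive for a while so notifications can arrive.
  await Future<void>.delayed(const Duration(minutes: 1));

  await subscription.cancel();
  await conn.close();
}
```

While this runs, executing SELECT pg_notify('new_friendship', 'hello'); from another session should print the payload.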

isoos avatar Jan 31 '24 07:01 isoos

@isoos I just tried connection.channels, but there isn't any property called channels.

openChannel function

class PostgresDatabase implements RemoteDatabase, Disposable {
  final completer = Completer<PostgreSQLConnection>();
  final EnvService env;

  PostgresDatabase(this.env) {
    _init();
  }

  _init() async {
    final url = env['DATABASE_URL'];
    final uri = Uri.parse(url);
    var connection = PostgreSQLConnection(
      uri.host,
      uri.port,
      uri.pathSegments.first,
      username: uri.userInfo.split(':').first,
      password: uri.userInfo.split(':').last,
    );
    await connection.open();
    completer.complete(connection);
  }

  Future<void> openChannel() async {
    final connection = await completer.future;
    connection.  // there is no channel property
  }

  @override
  Future<List<Map<String, Map<String, dynamic>>>> mapquery(
    String queryText, {
    Map<String, dynamic> variables = const {},
  }) async {
    final connection = await completer.future;
    return await connection.mappedResultsQuery(
      queryText,
      substitutionValues: variables,
    );
  }

  @override
  Future<PostgreSQLResult> query(String queryText) async {
    final connection = await completer.future;
    final result = await connection.query(queryText);
    return result;
  }

  @override
  void dispose() async {
    final connection = await completer.future;
    connection.close();
  }
}

ember11498 avatar Jan 31 '24 10:01 ember11498

@isoos maybe it's something like this?

Future<void> openChannel() async {
  final connection = await completer.future;
  connection.notifications.listen((event) {
    print(event.channel);
    print(event.payload);
    print(event.processID);
  });
}

But this doesn't specify the channel.

ember11498 avatar Jan 31 '24 10:01 ember11498

@isoos I just figured out that I was using the legacy version. I will update to the latest version. Thanks.

ember11498 avatar Jan 31 '24 11:01 ember11498

@isoos after changing my Dart code I launched my backend server again, but I got this error:

Severity.error Server does not support SSL, but it was required.
#0 PgConnectionImplementation._connect (package:postgres/src/v3/connection.dart:302)
#1 PgConnectionImplementation.connect (package:postgres/src/v3/connection.dart:200)

Any idea how to solve this?

My code looks like this:

import 'dart:async';

import 'package:backend_simples/src/services/env/env_service.dart';
import 'package:backend_simples/src/services/database/remote_database.dart';
import 'package:postgres/postgres.dart';
import 'package:shelf_modular/shelf_modular.dart';

class PostgresDatabase implements RemoteDatabase, Disposable {
  final completer = Completer<Connection>();
  final EnvService env;
  late final StreamSubscription listenChannels;

  PostgresDatabase(this.env) {
    _init();
  }

  _init() async {
    final url = env['DATABASE_URL'];
    final uri = Uri.parse(url);
    var conn = await Connection.open(Endpoint(
      host: uri.host,
      port: uri.port,
      database: uri.pathSegments.first,
      username: uri.userInfo.split(':').first,
      password: uri.userInfo.split(':').last,
    ));
    completer.complete(conn);
    listenChannels = await openChannel();
  }

  Future<StreamSubscription> openChannel() async {
    final conn = await completer.future;
    return conn.channels.all.listen((event) {
      print(event.channel);
      print(event.payload);
      print(event.processId);
    });
  }

  @override
  Future<Result> mapquery(
    String queryText, {
    Map<String, dynamic> variables = const {},
  }) async {
    final conn = await completer.future;
    return await conn.execute(
      Sql.named(queryText),
      parameters: variables,
    );
  }

  @override
  Future<Result> query(String queryText) async {
    final conn = await completer.future;
    return await conn.execute(queryText);
  }

  @override
  void dispose() async {
    final conn = await completer.future;
    conn.close();
  }
}

ember11498 avatar Jan 31 '24 17:01 ember11498

You can disable the SSL requirement via SslMode: https://pub.dev/documentation/postgres/latest/postgres/SslMode.html

When opening the connection, use this:

final conn = await Connection.open(
  Endpoint(host: 'localhost', database: 'my_db', port: port),
  settings: ConnectionSettings(
    sslMode: SslMode.disable,
  ),
);

isoos avatar Jan 31 '24 17:01 isoos

After setting sslMode to SslMode.disable, it now runs without errors. However, if I run this query in pgAdmin 4:

SELECT pg_notify('new_friend', '{"id1": 1, "id2": 2}');

nothing is printed in my backend. I was expecting my listenChannels StreamSubscription to receive notifications from any channel. Am I missing something?

Thank you for your help by the way.

ember11498 avatar Jan 31 '24 17:01 ember11498

@isoos this works:

Future<StreamSubscription> openChannel() async {
  final conn = await completer.future;
  return conn.channels['coco'].listen((event) {
    print(event);
    print('yes');
  });
}

but this doesn't:

Future<StreamSubscription> openChannel() async {
  final conn = await completer.future;
  return conn.channels.all.listen((event) {
    print(event);
    print('yes');
  });
}

It's fine, I can easily set up one channel at a time. I just thought channels.all would work without selecting a single channel.

ember11498 avatar Jan 31 '24 18:01 ember11498

The difference is that channels['coco'] executes LISTEN "coco"; on the connection, while channels.all does not execute any LISTEN statement. As far as I recall, Postgres has no LISTEN *, so if you want to use channels.all you need to execute LISTEN "x"; and LISTEN "y"; on your connection yourself.
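As a rough sketch of that approach, assuming the v3 API of package:postgres: the helper below (listenToAll is a hypothetical name, not part of the package) issues one LISTEN per channel and then subscribes to channels.all.

```dart
import 'dart:async';

import 'package:postgres/postgres.dart';

/// Hypothetical helper: subscribes the connection to each named channel
/// with an explicit LISTEN, then prints every notification that arrives
/// through channels.all.
Future<StreamSubscription> listenToAll(
  Connection conn,
  List<String> channelNames,
) async {
  for (final name in channelNames) {
    // Quote the channel name as an SQL identifier, doubling any
    // embedded double quotes.
    final escaped = name.replaceAll('"', '""');
    await conn.execute('LISTEN "$escaped"');
  }

  // channels.all only delivers notifications for channels that this
  // connection has already issued LISTEN for.
  return conn.channels.all.listen((event) {
    print('${event.channel} (pid ${event.processId}): ${event.payload}');
  });
}
```

Calling listenToAll(conn, ['x', 'y']) would then print notifications sent to either channel. Remember to cancel the returned subscription before closing the connection.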

isoos avatar Jan 31 '24 18:01 isoos