pdo-event-store

Make use of Postgres pub/sub?

Open codeliner opened this issue 8 years ago • 14 comments

I read recently that PostgreSQL supports durable pub/sub.

I'm wondering if this is useful for our projections to avoid polling event streams directly ... The mechanism is relatively simple: When creating a stream or appending to it we could use NOTIFY to indicate that new events are available.

Running projections can use pg_get_notify to check for new notifications and only query affected event streams if new events are available.
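A minimal sketch of that idea with the plain pgsql extension follows. It assumes the NOTIFY payload identifies the stream that changed, which is a hypothetical convention and not something pdo-event-store defines. The names drainNotifications, runProjectionLoop and the $project callback are invented for illustration.

```php
<?php
declare(strict_types=1);

/**
 * Drain all pending notifications and return the distinct payloads.
 * $next must return a notification array or false when none is left,
 * which matches the return shape of pg_get_notify().
 */
function drainNotifications(callable $next): array
{
    $streams = [];
    while (($notification = $next()) !== false) {
        // Assumption: the payload identifies the stream that changed.
        $streams[] = $notification['payload'];
    }
    return array_values(array_unique($streams));
}

/**
 * Hypothetical wrapper: run $project($stream) only for streams
 * that were announced on $channel. This still polls, just cheaply.
 */
function runProjectionLoop(string $dsn, string $channel, callable $project): void
{
    $conn = pg_connect($dsn);
    if ($conn === false) {
        throw new RuntimeException('Could not connect to PostgreSQL');
    }
    pg_query($conn, 'LISTEN ' . pg_escape_identifier($conn, $channel));

    while (true) {
        $next = function () use ($conn) {
            return pg_get_notify($conn, PGSQL_ASSOC);
        };
        foreach (drainNotifications($next) as $stream) {
            $project($stream);
        }
        usleep(200000); // Sleep 200 ms between checks.
    }
}
```

It could be started with something like runProjectionLoop('host=localhost user=postgres', 'event_appended', $project), where the channel name is again only an example.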

pg_get_notify still uses polling, so I'm not sure if it is worth the effort. I also checked amphp but they use pg_get_notify, too. But an AmphpProjectionManager would be nice so that a single non-blocking Postgres connection is shared between multiple projections. But that's a different story.

Thoughts @prolic @basz @sandrokeil @oqq @shochdoerfer ?

@shochdoerfer Do you use that feature or do you know someone who uses it?

codeliner avatar Jun 20 '17 08:06 codeliner

I am aware of that feature but have never tried it myself.

shochdoerfer avatar Jun 20 '17 08:06 shochdoerfer

IMO, if you go down this road, it's more efficient to just do everything in the DB here.

Ocramius avatar Jun 20 '17 15:06 Ocramius

This should be more efficient than polling the table, especially at a high polling frequency. We should give it a try. Like @Ocramius mentioned, a trigger should work.

sandrokeil avatar Jun 20 '17 16:06 sandrokeil

Interestingly, I was reading about this stuff yesterday:

  • https://github.com/jfischoff/postgresql-queue
  • https://hackernoon.com/testing-postgresql-for-fun-af891047e5fc

That's Haskell keeping a connection to the DB, but the NOTIFY should probably stay in the database directly, instead of looping through PHP (which can become an extreme bottleneck).

Ocramius avatar Jun 20 '17 16:06 Ocramius

@Ocramius

Interestingly, I was reading about this stuff yesterday:

Follow my first link "durable pub/sub". You will be surprised :D

codeliner avatar Jun 20 '17 16:06 codeliner

A DB trigger for NOTIFY is a great idea. This would keep the pdo-event-store logic untouched. Even better, we can keep everything untouched and just wrap a projection with a pg_get_notify loop and run the projection manually whenever a new notification is received. One can put that into an amphp loop or use Node.js or even Haskell projections :D
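A rough sketch of such a trigger, generated from PHP so it can be installed with pg_query or PDO::exec. The helper notifyTriggerSql, the table name and the channel name are all hypothetical, and the table name is sent as the payload only as an example convention.

```php
<?php
declare(strict_types=1);

/**
 * Build the SQL for a statement-level trigger that calls pg_notify()
 * after every INSERT on $table. Names are restricted to simple
 * identifiers so they can be placed into the SQL text safely.
 */
function notifyTriggerSql(string $table, string $channel): string
{
    foreach ([$table, $channel] as $name) {
        if (preg_match('/^[A-Za-z_][A-Za-z0-9_]*$/', $name) !== 1) {
            throw new InvalidArgumentException("Unsafe identifier: $name");
        }
    }

    // Nowdoc: no PHP interpolation, so the $fn$ dollar quotes stay literal.
    $template = <<<'SQL'
CREATE OR REPLACE FUNCTION %1$s_notify() RETURNS trigger AS $fn$
BEGIN
    PERFORM pg_notify('%2$s', TG_TABLE_NAME);
    RETURN NULL;
END;
$fn$ LANGUAGE plpgsql;

CREATE TRIGGER %1$s_notify_trigger
AFTER INSERT ON %1$s
FOR EACH STATEMENT EXECUTE PROCEDURE %1$s_notify();
SQL;

    return sprintf($template, $table, $channel);
}
```

For example, notifyTriggerSql('my_event_stream', 'event_appended') yields DDL that notifies once per INSERT statement, so a batch append produces a single notification.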

codeliner avatar Jun 20 '17 16:06 codeliner

You will be surprised :D

More like: "I don't know how to read, duh"

Ocramius avatar Jun 20 '17 16:06 Ocramius

Amp v2's postgres package supports listening and notifying that is entirely async. pg_get_notify is used in the watcher callback that is only invoked when data is available (never blocks). The code below is taken from the listening example.

<?php

require __DIR__ . '/vendor/autoload.php';

use Amp\Loop;
use Amp\Postgres;

Loop::run(function () {
    $pool = Postgres\pool('host=localhost user=postgres');

    $channel = "test";

    /** @var \Amp\Postgres\Listener $listener */
    $listener = yield $pool->listen($channel);

    printf("Listening on channel '%s'\n", $listener->getChannel());

    Loop::delay(3000, function () use ($listener) { // Unlisten in 3 seconds.
        printf("Unlistening from channel '%s'\n", $listener->getChannel());
        return $listener->unlisten();
    });

    Loop::delay(1000, function () use ($pool, $channel) {
        return $pool->notify($channel, "Data 1"); // Send first notification.
    });

    Loop::delay(2000, function () use ($pool, $channel) {
        return $pool->notify($channel, "Data 2"); // Send second notification.
    });

    while (yield $listener->advance()) {
        $notification = $listener->getCurrent();
        printf(
            "Received notification from PID %d on channel '%s' with payload: %s\n",
            $notification->pid,
            $notification->channel,
            $notification->payload
        );
    }
});

trowski avatar Jun 21 '17 04:06 trowski

@trowski wow, that's nice. I did not read the source code carefully enough. Just to make sure I understand correctly: the watcher callback is only invoked if data is available. But it will also be invoked if I notify on a different channel, say otherChannel, right?

Anyway, that is a killer feature for an amp postgres projection. Thx for the reply!

codeliner avatar Jun 21 '17 10:06 codeliner

@codeliner Yes, the watcher callback is only invoked when the connection associated with that watcher has data available. This is done by creating the watcher using the stream socket resource returned from pg_socket(). The watcher is invoked anytime data arrives from a listener, query, prepare, etc.

You can listen and notify on multiple channels. All connection ops return promises that resolve with various objects depending on the op when complete. Make a new project requiring amphp/postgres and give the script below a try.

#!/usr/bin/env php
<?php

require __DIR__ . '/vendor/autoload.php';

use Amp\Iterator;
use Amp\Loop;
use Amp\Postgres;

Loop::run(function () {
    $pool = Postgres\pool('host=localhost user=postgres');

    $channel1 = "test1";
    $channel2 = "test2";

    /** @var \Amp\Postgres\Listener $listener1 */
    $listener1 = yield $pool->listen($channel1);

    printf("Listening on channel '%s'\n", $listener1->getChannel());

    /** @var \Amp\Postgres\Listener $listener2 */
    $listener2 = yield $pool->listen($channel2);

    printf("Listening on channel '%s'\n", $listener2->getChannel());

    Loop::delay(6000, function () use ($listener1) { // Unlisten in 6 seconds.
        printf("Unlistening from channel '%s'\n", $listener1->getChannel());
        return $listener1->unlisten();
    });

    Loop::delay(4000, function () use ($listener2) { // Unlisten in 4 seconds.
        printf("Unlistening from channel '%s'\n", $listener2->getChannel());
        return $listener2->unlisten();
    });

    Loop::delay(1000, function () use ($pool, $channel1) {
        return $pool->notify($channel1, "Data 1.1");
    });

    Loop::delay(2000, function () use ($pool, $channel2) {
        return $pool->notify($channel2, "Data 2.1");
    });

    Loop::delay(3000, function () use ($pool, $channel2) {
        return $pool->notify($channel2, "Data 2.2");
    });

    Loop::delay(5000, function () use ($pool, $channel1) {
        return $pool->notify($channel1, "Data 1.2");
    });

    // Merge both listeners into single iterator.
    $listener = Iterator\merge([$listener1, $listener2]);

    while (yield $listener->advance()) {
        $notification = $listener->getCurrent();
        printf(
            "Received notification from PID %d on channel '%s' with payload: %s\n",
            $notification->pid,
            $notification->channel,
            $notification->payload
        );
    }
});

A connection can have multiple listeners, though a connection can only perform a single query at a time. The library also provides a connection pool that sends concurrent queries over multiple connections. In general you will always want to use a connection pool.

trowski avatar Jun 21 '17 17:06 trowski

Coming to this again today (more than one year later) as we are planning event-store v8, I think this is another reason to pin this new major release to Postgres only. I'll leave this issue open for now, until we actually get to work on v8.

prolic avatar Sep 23 '18 16:09 prolic

I think this is another reason to pin this new major release to postgres only.

But MongoDB 4 has such a feature too and it is implemented here.

sandrokeil avatar Sep 23 '18 18:09 sandrokeil

For reference/inspiration, a similar PR is opened on symfony/messenger (https://github.com/symfony/symfony/pull/35485).

gquemener avatar Jan 30 '20 06:01 gquemener

@gquemener thanks for the link. That's indeed a good inspiration.

codeliner avatar Feb 21 '20 22:02 codeliner