
db.postgres: implement wait() and post() based on Postgres notifications

Open starius opened this issue 8 years ago • 0 comments

Postgres supports a publish-subscribe feature. One client listens on a channel (identified by a string), while another (or the same) client sends a notification. It can be used to send notifications across machines and nginx workers.
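For reference, here is a minimal sketch of the SQL statements behind this feature, built as strings in plain Lua. The helper names (`quote_ident`, `quote_literal`, `listen_sql`, `unlisten_sql`, `notify_sql`) are hypothetical and are not part of Lapis or pgmoon; `LISTEN`, `UNLISTEN` and `pg_notify()` are standard Postgres. The literal quoting assumes `standard_conforming_strings` is on, so only single quotes need escaping.

```lua
-- Sketch only: helper names are hypothetical, not part of Lapis or pgmoon.

-- Quote a channel name as a SQL identifier (embedded double quotes are doubled).
local function quote_ident(name)
  return '"' .. (name:gsub('"', '""')) .. '"'
end

-- Quote a string literal (embedded single quotes are doubled).
-- Assumes standard_conforming_strings = on, so backslashes are not special.
local function quote_literal(value)
  return "'" .. (value:gsub("'", "''")) .. "'"
end

-- Statement a subscriber sends to start receiving notifications.
local function listen_sql(channel)
  return "LISTEN " .. quote_ident(channel)
end

-- Statement a subscriber sends to stop receiving notifications.
local function unlisten_sql(channel)
  return "UNLISTEN " .. quote_ident(channel)
end

-- Statement a publisher sends; pg_notify(channel, payload) takes both as text.
local function notify_sql(channel, payload)
  return "SELECT pg_notify(" .. quote_literal(channel) .. ", "
    .. quote_literal(payload) .. ")"
end

print(listen_sql("foo"))         -- LISTEN "foo"
print(notify_sql("foo", "bar"))  -- SELECT pg_notify('foo', 'bar')
```

Using `pg_notify()` rather than a bare `NOTIFY` statement keeps both the channel and the payload as ordinary string literals, which is simpler to escape.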

A notification can arrive during any read operation on the connection. To decouple receiving notifications from reading the results of normal queries, a background handler with its own database socket is created. The handler starts two light threads: a reader and a writer. The reader reads new notifications from the database. The writer sends the commands "listen" and "unlisten" to the database. Signals to and from the background handler are delivered using OpenResty semaphores.

Two new methods of "lapis.db.postgres" (and "lapis.db") were added:

  • wait(channel) -- waits on the channel until it is notified with post(). Returns the payload passed by the caller of post().
  • post(channel, payload) -- notifies the channel with the payload.

All waiting light threads in an nginx worker share the same database socket.

All light threads waiting on the same channel share one semaphore object, which is triggered by the reader light thread of the background handler. When a channel gets its first listener, it is handed to the writer light thread of the background handler, which sends the command "listen channel". When a notification is received, the writer light thread "unlistens" the corresponding channel and the corresponding semaphore is removed. This prevents resource leaks in the nginx worker and in the database worker.
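The bookkeeping described above can be simulated in plain Lua. This is only a sketch, not the code from this pull request: coroutines stand in for OpenResty light threads, a list of parked coroutines stands in for the per-channel semaphore, and the `commands` table records what the writer would send to the database. All names are hypothetical.

```lua
-- Simulation in plain Lua; all names are hypothetical.
-- Coroutines stand in for light threads, and a list of parked
-- coroutines stands in for the shared per-channel semaphore.

local commands = {}  -- commands the writer would send to Postgres
local waiters = {}   -- channel -> list of coroutines parked on it

-- Called from inside a coroutine: park until the channel is notified.
local function wait(channel)
  local list = waiters[channel]
  if not list then
    -- First waiter on this channel: create the shared entry and listen.
    list = {}
    waiters[channel] = list
    commands[#commands + 1] = "listen " .. channel
  end
  list[#list + 1] = coroutine.running()
  return coroutine.yield()  -- resumed later with the payload
end

-- Called by the reader when a notification arrives from the database.
-- Returns the number of waiters that were woken.
local function on_notification(channel, payload)
  local list = waiters[channel]
  if not list then
    return 0
  end
  waiters[channel] = nil  -- remove the shared entry to avoid leaks
  commands[#commands + 1] = "unlisten " .. channel
  for _, co in ipairs(list) do
    coroutine.resume(co, payload)
  end
  return #list
end

-- Two waiters share the channel "foo".
local received = {}
for i = 1, 2 do
  local co = coroutine.create(function()
    received[i] = wait("foo")
  end)
  coroutine.resume(co)  -- runs until the coroutine parks inside wait()
end

local woken = on_notification("foo", "bar")
print(woken, received[1], received[2])
print(table.concat(commands, "; "))
```

Both waiters receive "bar", and the recorded commands are a single "listen foo" followed by a single "unlisten foo", regardless of how many waiters shared the channel.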

All methods are non-blocking: a waiting light thread yields instead of blocking the nginx worker. Notifications add no overhead when they are not used.

Example:

  location /wait {
    default_type text/html;
    lua_check_client_abort on;
    content_by_lua '
      local channel = ngx.req.get_uri_args().channel
      while true do
        ngx.say(require("lapis.db").wait(channel))
        ngx.flush()
      end
    ';
  }

  location /post {
    default_type text/html;
    content_by_lua '
      local channel = ngx.req.get_uri_args().channel
      local payload = ngx.req.get_uri_args().payload
      require("lapis.db").post(channel, payload)
    ';
  }

In one terminal:

$ curl 'http://localhost:8080/wait?channel=foo'

In another terminal:

$ curl 'http://localhost:25516/post?channel=foo&payload=bar'

The first terminal should output "bar".

This pull request depends on https://github.com/leafo/pgmoon/pull/28

starius • Apr 06 '16 22:04