pgsink
Logically replicate data out of Postgres into sinks (files, Google BigQuery, etc)

Path to v1.0.0: https://github.com/lawrencejones/pgsink/projects/1
Draft docs can be seen at: docs
pgsink is a Postgres change-capture device that supports high-throughput and low-latency capture to a variety of sinks.
You'd use this project if your primary database is Postgres and you want a stress-free, quick-to-setup and easy-to-operate tool to replicate your data to other stores such as BigQuery or Elasticsearch. It works with Postgres databases of any size.
Similar projects
There are many change-capture projects out there, and several support Postgres.
As an example, pgsink is similar to debezium in performance and durability goals, but with a much simpler setup (no Kafka required). We also bear similarity to Netflix's dblog, with the benefit of being open-source and available for use.
We win in these comparisons when you want a simple setup with no additional dependencies. We also benefit from a sole focus on Postgres instead of many upstream sources, as we can optimise our data-access patterns for large, high-transaction-volume Postgres databases. Examples of this are keeping transactions short to help vacuums, and traversing tables using efficient indexes, as sketched below.
This makes pgsink a much safer bet for people managing production critical Postgres databases.
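As an illustration of that traversal pattern, here is a hypothetical sketch (not pgsink's actual import code): batches are keyed on the primary key, so each query is backed by the index and no long-running transaction holds back vacuum. The sendToChangelog callback is an assumed placeholder.

    // A hypothetical sketch of index-backed table traversal, not pgsink's
    // actual import code.
    package example

    import "database/sql"

    func importTable(db *sql.DB, sendToChangelog func(id int64, msg sql.NullString)) error {
        var cursor int64 // last primary key seen; the resume point
        for {
            // Each batch is a short-lived statement using the primary key
            // index, so we never hold a transaction open across the table.
            rows, err := db.Query(
                `select id, msg from public.example where id > $1 order by id limit 1000`,
                cursor)
            if err != nil {
                return err
            }

            count := 0
            for rows.Next() {
                var id int64
                var msg sql.NullString
                if err := rows.Scan(&id, &msg); err != nil {
                    rows.Close()
                    return err
                }
                sendToChangelog(id, msg)
                cursor, count = id, count+1
            }
            rows.Close()
            if err := rows.Err(); err != nil {
                return err
            }

            if count == 0 {
                return nil // no rows past the cursor: import complete
            }
        }
    }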
Developing
As an overview of the important packages, for those exploring the source code:
- changelog: the input to sinks, produced by subscription or import
- decode: configures decoding of Postgres types into Golang (text -> int64; see the sketch after this list)
- imports: create, manage and work import jobs, producing changelog entries
- logical: parsing of the pgoutput logical encoding, used by subscription
- sinks: implements different types of sink, from files to Google BigQuery
- subscription: Postgres change capture via replication, generating a changelog
This project comes with a docker-compose development environment. Boot the environment like so:
$ docker-compose up -d
pgsink_prometheus_1 is up-to-date
pgsink_postgres_1 is up-to-date
pgsink_grafana_1 is up-to-date
Then run make recreatedb to create a pgsink database. You can now access your database like so:
$ psql --host localhost --user pgsink pgsink
pgsink=> \q
pgsink will work with this database: try pgsink --sink=file --decode-only.
Database migrations
We use goose to run database migrations. Create new migrations like so:
$ go run internal/migration/cmd/goose.go --dir internal/migration create create_import_jobs_table go
2019/12/29 14:59:51 Created new file: internal/migration/20191229145951_create_import_jobs_table.go
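Goose migrations here are Go files. Filled in, the generated file looks something like the sketch below, assuming the pressly/goose package; the table definition is illustrative, not the real import_jobs schema.

    package migration

    import (
        "database/sql"

        "github.com/pressly/goose"
    )

    func init() {
        goose.AddMigration(upCreateImportJobsTable, downCreateImportJobsTable)
    }

    // upCreateImportJobsTable applies the migration inside a transaction.
    // The table definition is a placeholder, for illustration only.
    func upCreateImportJobsTable(tx *sql.Tx) error {
        _, err := tx.Exec(`create table pgsink.import_jobs (id bigserial primary key);`)
        return err
    }

    // downCreateImportJobsTable reverts the migration.
    func downCreateImportJobsTable(tx *sql.Tx) error {
        _, err := tx.Exec(`drop table pgsink.import_jobs;`)
        return err
    }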
Running migrations is done using the make target:
$ make migrate structure.sql
$ go run internal/migration/cmd/goose.go --install up
2021/01/09 15:38:29 requested --install, so creating schema 'pgsink'
2021/01/09 15:38:29 goose: no migrations to run. current version: 20210102200953
docker-compose --env-file=/dev/null exec -T postgres pg_dump -U postgres pgsink --schema-only --schema=pgsink >structure.sql
Getting started
Boot a Postgres database, then create an example table.
$ createdb pgsink
$ psql pgsink
psql (11.5)
Type "help" for help.
pgsink=# create table public.example (id bigserial primary key, msg text);
CREATE TABLE
pgsink=# insert into public.example (msg) values ('hello world');
INSERT 0 1
pgsink will stream these changes from the database and send them to the configured sink. Changes are expressed as a stream of messages, either a Schema that describes the structure of a Postgres table, or a Modification corresponding to an insert/update/delete of a row in Postgres.
Our example would produce the following modification, where timestamp is the time at which the change was committed and sequence is the operation index within the transaction:
{
  "timestamp": "2019-10-04T16:05:55.123456+01:00",
  "sequence": 1,
  "namespace": "public",
  "name": "example",
  "before": null,
  "after": {
    "id": "1",
    "msg": "hello world"
  }
}
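For orientation when reading the source, a payload like this maps naturally onto a Go struct along the following lines. This is a sketch with assumed field types, not pgsink's canonical definition, which lives in the changelog package.

    package changelog

    import "time"

    // Modification is a sketch of the message shape above; field types
    // are assumptions for illustration only.
    type Modification struct {
        Timestamp time.Time              `json:"timestamp"` // commit time of the change
        Sequence  uint64                 `json:"sequence"`  // operation index within the transaction
        Namespace string                 `json:"namespace"` // Postgres schema, e.g. "public"
        Name      string                 `json:"name"`      // table name, e.g. "example"
        Before    map[string]interface{} `json:"before"`    // row image before the change, if any
        After     map[string]interface{} `json:"after"`     // row image after the change
    }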
Also sent, arriving before the modification element, will be a schema entry that describes the public.example table. We represent these as Avro schemas, built from the Postgres catalog information.
{
  "timestamp": "2019-10-04T16:05:55.123456+01:00",
  "schema": {
    "namespace": "public.example",
    "type": "record",
    "name": "value",
    "fields": [
      {
        "name": "id",
        "type": ["long", "null"],
        "default": null
      },
      {
        "name": "msg",
        "type": ["string", "null"],
        "default": null
      }
    ]
  }
}
Schemas are published whenever we first discover a relation. Use the timestamp field to order each successive schema event to ensure stale messages don't override more recent data.
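A consumer might enforce that ordering with something like the following hypothetical sketch; the names are illustrative, not part of pgsink's API.

    package example

    import "time"

    // latestSchema records the newest schema timestamp seen per relation.
    var latestSchema = map[string]time.Time{}

    // applySchema reports whether a schema event should be applied: stale
    // (out-of-order) events are dropped rather than overwriting newer data.
    func applySchema(relation string, timestamp time.Time) bool {
        if seen, ok := latestSchema[relation]; ok && !timestamp.After(seen) {
            return false // stale or duplicate schema: ignore
        }
        latestSchema[relation] = timestamp
        return true
    }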