feat: add `sql_outbox` input
Adds a sql_outbox input that consumes messages from a database table implementing the transaction outbox pattern.
- Outbox items that are processed successfully are deleted from the table.
- Outbox items that fail to be processed will have a retry counter incremented on their corresponding database row.
The current implementation makes hard assumptions about how this pattern is implemented. Namely:
- The outbox table must have an "id" column: we need to target outbox items for deletions and updates based on their processing outcome.
- The outbox table must have a "retry count" column: for failed items we increment a counter on the corresponding database row for bookkeeping. This counter can be used to skip over outbox items that will terminally fail.
- The processing strategy is consume-delete-update.
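To make the strategy concrete, one consume-delete-update cycle roughly corresponds to statements like the following. This is a sketch only, not the exact SQL the input generates: the `FOR UPDATE SKIP LOCKED` locking and the literal id lists are illustrative assumptions, and the column names match the example configs below.

```sql
-- Sketch of one consume-delete-update cycle (PostgreSQL flavour).
BEGIN;

-- Consume: claim a batch of deliverable items that have not exhausted their
-- retries. SKIP LOCKED is one common way to stop concurrent consumers from
-- claiming the same rows (an assumption, not necessarily what the input does).
SELECT payload, created_at, id
FROM outbox_items
WHERE deliverable_at <= now() AND attempt_count < 3
LIMIT 10
FOR UPDATE SKIP LOCKED;

-- Delete: items that were processed successfully are removed.
DELETE FROM outbox_items WHERE id IN (1, 2);

-- Update: items that failed have their retry counter incremented.
UPDATE outbox_items SET attempt_count = attempt_count + 1 WHERE id IN (3);

COMMIT;
```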
Local testing
PostgreSQL
You can use the Docker postgres image to quickly set up a database with sample data for testing this input.
Create a file called `docker-entrypoint-initdb.d/create-outbox.sql`:
```sql
ALTER DATABASE postgres
SET log_statement = 'all';

CREATE TABLE outbox_items (
  id BIGINT GENERATED BY DEFAULT AS IDENTITY,
  created_at TIMESTAMP DEFAULT NOW(),
  deliverable_at TIMESTAMP DEFAULT NOW(),
  payload JSON DEFAULT '{"message": "Hello, World!"}'::json,
  attempt_count INT DEFAULT 0,
  PRIMARY KEY (id)
);

INSERT INTO outbox_items SELECT id FROM generate_series(1, 1000) AS g (id);
```
Then run:
```sh
docker run \
  --name benthospg \
  -p 5432:5432 \
  -e POSTGRES_HOST_AUTH_METHOD=trust \
  -v "${PWD}/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d" \
  --rm \
  postgres:14
```
The database server should now be up and running, bootstrapped with an outbox table containing mock entries.
In a different terminal, run a Benthos pipeline with this config:
```yaml
rate_limit_resources:
  - label: outbox_poll_rate
    local:
      count: 1
      interval: 10s

input:
  sql_outbox:
    driver: postgres
    dsn: postgres://postgres@localhost:5432/postgres?sslmode=disable
    table: outbox_items
    columns:
      - payload
      - created_at
      - id
    where: deliverable_at <= now()
    id_column: id
    retry_count_column: attempt_count
    max_retries: 3
    item_count: 10
    rate_limit: outbox_poll_rate

output:
  stdout: {}
```
Given that the Postgres server is set up to log all statements, you should see database transactions from the Benthos process as it consumes the outbox table.
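You can also confirm the table is draining by polling the row count, e.g. from `psql` inside the container (`docker exec -it benthospg psql -U postgres`); a simple check against the schema above:

```sql
-- Should decrease over time as delivered items are deleted.
SELECT count(*) AS remaining FROM outbox_items;
```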
MySQL
You can use the mysql Docker image to quickly set up a database with sample data for testing this input.
Create a file called `docker-entrypoint-initdb.d/create-outbox.sql`:
```sql
CREATE TABLE outbox_items (
  id BIGINT NOT NULL AUTO_INCREMENT,
  created_at TIMESTAMP DEFAULT NOW(),
  deliverable_at TIMESTAMP DEFAULT NOW(),
  attempt_count INT DEFAULT 0,
  PRIMARY KEY (id)
);

ALTER TABLE outbox_items
  ADD COLUMN payload JSON NOT NULL DEFAULT ( JSON_OBJECT() );

DELIMITER //
CREATE PROCEDURE populate (IN num INT)
BEGIN
  DECLARE i INT DEFAULT 0;
  WHILE i < num DO
    INSERT INTO outbox_items () VALUES ();
    SET i = i + 1;
  END WHILE;
END //
DELIMITER ;

CALL populate(1000);
```
Then run:
```sh
docker run \
  --name benthosmysql \
  -p 3306:3306 \
  -e MYSQL_DATABASE=local \
  -e MYSQL_ALLOW_EMPTY_PASSWORD=true \
  -v "${PWD}/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d" \
  --rm \
  mysql:8 --general-log=1 --general-log-file=/tmp/queries.log --log-output=FILE
```
The database server should now be up and running, bootstrapped with an outbox table containing mock entries.
In a different terminal, run a Benthos pipeline with this config:
```yaml
rate_limit_resources:
  - label: outbox_poll_rate
    local:
      count: 1
      interval: 10s

input:
  sql_outbox:
    driver: mysql
    dsn: root@tcp(localhost:3306)/local
    table: outbox_items
    columns:
      - payload
      - created_at
      - id
    where: deliverable_at <= now()
    id_column: id
    retry_count_column: attempt_count
    max_retries: 3
    item_count: 10
    rate_limit: outbox_poll_rate

output:
  stdout: {}
```
Given that the MySQL server is set up to log all statements, you can watch the database transactions issued by the Benthos process as it consumes the outbox table:

```sh
docker exec benthosmysql tail -f /tmp/queries.log
```
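As with PostgreSQL, you can confirm the table is draining by polling the row count, e.g. from the `mysql` client inside the container (`docker exec -it benthosmysql mysql local`):

```sql
-- Should decrease over time as delivered items are deleted.
SELECT COUNT(*) AS remaining FROM outbox_items;
```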
When you are done, you can terminate MySQL with:

```sh
docker stop benthosmysql
```
SQLite
Create an initialization file called `create-outbox.sql` with the following:
```sql
DROP TABLE IF EXISTS outbox_items;

CREATE TABLE outbox_items (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  deliverable_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  payload TEXT DEFAULT '{"message": "Hello, World!"}',
  attempt_count INT DEFAULT 0,
  discard INT DEFAULT 0
);

-- The discard column only absorbs the series values; every other column is
-- populated from its default.
INSERT INTO outbox_items (discard) SELECT value FROM generate_series(1, 1000);
```
Create a database initialized with the above script:

```sh
sqlite3 local.db < create-outbox.sql
```
Run a Benthos pipeline with the following config:
```yaml
rate_limit_resources:
  - label: outbox_poll_rate
    local:
      count: 1
      interval: 10s

input:
  sql_outbox:
    driver: sqlite
    dsn: file:./local.db
    table: outbox_items
    columns:
      - payload
      - created_at
      - id
    where: deliverable_at <= CURRENT_TIMESTAMP
    id_column: id
    retry_count_column: attempt_count
    max_retries: 3
    item_count: 10
    rate_limit: outbox_poll_rate

output:
  stdout: {}
```
Monitor the size of the table while the pipeline is running with:

```sh
sqlite3 local.db "select count(*) from outbox_items;"
```
This feature seems pretty nice. Did you consider using some kind of “soft delete”? Perhaps run a custom query instead of just deleting (imagine a status field that can be “active” or “deleted”).
Hey @peczenyj, I did consider that, and the implementation of this plugin can be extended to multiple strategies, including the one you mentioned. Ultimately, I decided not to add support for a soft-delete strategy because the outbox table is meant to be a durable buffer, so to speak. You're meant to transfer its contents to a queue or other durable destination that supports multiple consumers and large volumes of events.
If you feel we should add a soft-delete strategy, do you mind describing to me your use case?
Sure @disintegrator
I can see a few use cases, and they are debatable.
Imagine we have a column “status” and it can be “active” or “deleted” (among others, as an enum).
- To make it easier to replay some data.
For instance, we consume a few entries and then detect a bug in another part of the system (wrong path, missing credentials, etc.). By updating the status we allow Benthos to replay those entries again.
- To make auditing possible.
The history (status, retries, etc.) is present in the same table.
- To plug it into existing workflows that already use a state machine in the database.
That is perhaps the most interesting one for me. I know a few programs that use the database like this. Most of them are legacy, but there are no plans to change them any time soon.
When I said it was debatable, it is because we have workarounds: I can perform a custom SQL update on the id when the output finishes with success, for instance.
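Something like this, just a sketch with a hypothetical `status` column:

```sql
-- Soft delete: mark the row instead of removing it.
UPDATE outbox_items SET status = 'deleted' WHERE id = ?;

-- Replay: flip rows back to active so they are picked up again.
UPDATE outbox_items SET status = 'active' WHERE id = ?;
```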
For testing, it is easier to use SQLite files. Just copy from the “master”, run, and compare them with the original.
For audit we have logs, metrics, etc.
However, I have a final argument: adding such a feature helps keep the configuration closer together.
For instance, with soft delete (by running a custom query or changing one particular field where id = ?) I have everything close together in the same input resource. If I instead have different input resources, I need to verify whether the data came from the db to perform such an update. But if I use GCS as an alternative input, I don't need it. However, this scenario is rare IMHO, and people can write the best workaround for such a case.
That is my 2 cents.
BTW I am not a maintainer of this project, just a regular user giving my impressions :)
Thanks for that detailed comment @peczenyj. I'll look at the changes needed to support an additional soft-delete strategy, though I will continue to discourage using this approach. You're not meant to use the outbox table in queries in your app: you write an event to it as part of a db transaction and move on, i.e. it's an event buffer that you write to and drain. In my world, we write hundreds of thousands or millions of rows to it, and it can cause considerable database churn if it keeps growing. We don't care about anything in it once we've published rows to GCP PubSub (and BigQuery from there).
I'm closing this PR because, after researching the subject, there appears to be a mix of opinions around handling deletions, retries, and dead-lettering across various outbox solutions. Maybe I'll revisit this plugin if I can come up with an ergonomic API. Until then, I invite folks to copy the code in this PR and use it to build a custom Benthos binary.