nifi
NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API
NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes from PostgreSQL tables via Logical Replication API
Authors: Davy Machado, Gerdan Santos
Thank you for submitting a contribution to Apache NiFi.
Please provide a short description of the PR here:
Description of PR
The CaptureChangePostgreSQL processor retrieves Change Data Capture (CDC) events from a PostgreSQL database. It works with PostgreSQL 10 and later. Events include INSERT, UPDATE, and DELETE operations and are output as individual FlowFiles, ordered by the time at which each operation occurred. The processor uses a Logical Replication connection to stream data and a SQL connection to query system views.
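To make the output model concrete, the description above can be sketched in plain Java: CDC events are sorted and each one becomes the content of a single output FlowFile. Everything here is hypothetical: the `CdcEventOrdering`, `CdcEvent`, and `Operation` names and the JSON field names are illustrative, not the processor's actual classes or output format, and a log sequence number (LSN) field stands in for the time-based ordering the description mentions. No NiFi or PostgreSQL APIs are used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch, not the processor's code: models CDC events and emits
// one JSON document per event, ordered by WAL position (LSN).
public class CdcEventOrdering {

    enum Operation { INSERT, UPDATE, DELETE }

    static final class CdcEvent {
        final long lsn;            // assumed ordering key: position of the change in the WAL
        final Operation operation;
        final String table;
        final String rowJson;      // row image, assumed to be pre-rendered JSON

        CdcEvent(long lsn, Operation operation, String table, String rowJson) {
            this.lsn = lsn;
            this.operation = operation;
            this.table = table;
            this.rowJson = rowJson;
        }

        // Renders this event as the content of one (hypothetical) output FlowFile.
        String toJson() {
            return "{\"lsn\":" + lsn
                    + ",\"type\":\"" + operation.name() + "\""
                    + ",\"table\":\"" + table + "\""
                    + ",\"data\":" + rowJson + "}";
        }
    }

    // Sorts events by LSN and returns one JSON document per event.
    static List<String> toFlowFileContents(List<CdcEvent> events) {
        List<CdcEvent> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparingLong(e -> e.lsn));
        List<String> contents = new ArrayList<>();
        for (CdcEvent event : sorted) {
            contents.add(event.toJson());
        }
        return contents;
    }

    public static void main(String[] args) {
        List<CdcEvent> events = Arrays.asList(
                new CdcEvent(200L, Operation.DELETE, "public.users", "{\"id\":1}"),
                new CdcEvent(100L, Operation.INSERT, "public.users", "{\"id\":1,\"name\":\"a\"}"));
        for (String content : toFlowFileContents(events)) {
            System.out.println(content);
        }
    }
}
```

Sorting by LSN rather than by a wall-clock timestamp is an assumed design choice in this sketch: WAL positions increase monotonically on a server, while timestamps can collide.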
This new pull request builds upon PR #5710.
In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:
For all changes:
- [X] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [X] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [X] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
- [ ] Is your initial contribution a single, squashed commit? Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes.
For code changes:
- [X] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
- [X] Have you written or updated unit tests to verify your changes?
- [X] Have you verified that the full build is successful on JDK 8?
- [X] Have you verified that the full build is successful on JDK 11?
- [X] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
- [X] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
- [X] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
- [X] If adding new Properties, have you added `.displayName` in addition to `.name` (programmatic access) for each of the new properties?
For documentation related changes:
- [X] Have you ensured that format looks appropriate for the output in which it is rendered?
Note:
Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
@joewitt @mattyb149 @exceptionfactory
Hey guys, this is the new PR for NIFI-4239 (CaptureChangePostgreSQL processor).
Sorry for the mess in #5710 =/
Please run the workflow. Thanks!
Now, after two years, will we finally move forward? I believe so!
@exceptionfactory thanks for the review!
I'll check all suggestions and apply the necessary adjustments as soon as possible.
Regarding the RecordWriter service, you're right that it brings many benefits, but we don't see it as essential for now. Many processors don't use record services yet. Since this improvement requires a lot of changes, we'll leave it for a future version of the processor.
Thanks for the reply @davyam. Although record-oriented support may not be required in some cases, introducing this processor without support for a RecordWriter would lock in support for the JSON-specific output.
Understanding that it requires some changes, and that a lot of work has already gone into this component, it is also important to consider community maintainability. Components designed for more narrow use cases are difficult to maintain over time, so that's why it can be more difficult to introduce new components. Generating large numbers of small FlowFiles leads to poor flow performance, so that's why it is more important to implement that capability initially, as opposed to introducing it later. If you need some assistance on integration with record handling, perhaps this could be an opportunity for additional collaboration on this PR.
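The concern about many small FlowFiles can be illustrated with a plain-Java sketch that groups serialized events into batches, similar in spirit to a record writer writing many records into one FlowFile. This is not NiFi's RecordSetWriter API; the `CdcBatching` class, the `toBatches` method, and the batch size are hypothetical, chosen only to show how batching reduces the number of outputs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: contrasts one output per event with grouping events
// into batches, as a record-oriented writer would do within a single FlowFile.
public class CdcBatching {

    // Groups serialized events into JSON arrays of at most maxPerBatch elements.
    static List<String> toBatches(List<String> eventJson, int maxPerBatch) {
        if (maxPerBatch < 1) {
            throw new IllegalArgumentException("maxPerBatch must be >= 1");
        }
        List<String> batches = new ArrayList<>();
        for (int start = 0; start < eventJson.size(); start += maxPerBatch) {
            int end = Math.min(start + maxPerBatch, eventJson.size());
            batches.add("[" + String.join(",", eventJson.subList(start, end)) + "]");
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            events.add("{\"id\":" + i + "}");
        }
        // One output per event versus batches of 1,000 events each.
        System.out.println("per-event outputs: " + events.size());
        System.out.println("batched outputs:   " + toBatches(events, 1_000).size());
    }
}
```

With 10,000 events and a batch size of 1,000, the sketch produces 10 outputs instead of 10,000. In NiFi, a RecordSetWriter obtained from a RecordSetWriterFactory controller service would additionally decouple the output format (JSON, Avro, CSV, and so on) from the processor itself.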
@exceptionfactory I understand. OK, I'll analyze the effort required to use the RecordWriter Service.
I have a problem. I have a clean NiFi instance running in Docker, with these JVM memory settings in bootstrap.conf: java.arg.2=-Xms2048m and java.arg.3=-Xmx4096m.
Initially, docker stats shows:

| CONTAINER ID | NAME | CPU % | MEM USAGE / LIMIT | MEM % | NET I/O | BLOCK I/O | PIDS |
|---|---|---|---|---|---|---|---|
| 5017cf957d26 | nifi_docker | 0.60% | 2.185GiB / 7.771GiB | 28.12% | 40.5kB / 121kB | 757MB / 995kB | 109 |

But as soon as I start the processor on a 370 MB table, docker stats shows:

| CONTAINER ID | NAME | CPU % | MEM USAGE / LIMIT | MEM % | NET I/O | BLOCK I/O | PIDS |
|---|---|---|---|---|---|---|---|
| 5017cf957d26 | nifi_docker | 26.17% | 4.985GiB / 7.771GiB | 64.15% | 1.15GB / 26.4MB | 767MB / 1.01MB | 113 |
I also start getting the error: Java heap space
Here are my processor settings:
Hey @djshura2008, you are using an earlier version of the CaptureChangePostgreSQL processor, perhaps the one from PR #4065. I can tell because the "Take a initial snapshot?" property no longer exists. Many improvements have been made since then, so please repeat the tests with the latest version. Thanks for your support.
Okay, but how do you take an initial snapshot of the database and immediately start the CDC process?
The snapshot option had several problems (delays, memory usage, etc.), so we decided to remove it when the code was refactored. Our goal is to capture changes. You can still get an initial copy by taking a backup/dump/copy before starting the processor.
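One way to combine a dump with change capture, sketched under assumptions: create the logical replication slot first (so changes from that point on are retained), then take the dump, then start capturing. Changes committed between slot creation and the dump then appear in both, so the consumer must apply events idempotently. The Java sketch below models only that replay step with an in-memory map keyed by primary key; the `SnapshotThenReplay` and `Change` names are hypothetical, and it assumes each INSERT/UPDATE event carries the full new row. PostgreSQL can also export a snapshot when a slot is created (the `EXPORT_SNAPSHOT` option of `CREATE_REPLICATION_SLOT`, usable with `pg_dump --snapshot`), which avoids the overlap; whether that fits this processor's slot handling is not covered here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: replays captured changes on top of a snapshot.
// Assumes every INSERT/UPDATE carries the full new row and every event has a primary key.
public class SnapshotThenReplay {

    enum Operation { INSERT, UPDATE, DELETE }

    static final class Change {
        final Operation operation;
        final int id;        // primary key of the affected row
        final String value;  // full new row image; null for DELETE

        Change(Operation operation, int id, String value) {
            this.operation = operation;
            this.id = id;
            this.value = value;
        }
    }

    // Idempotent apply: INSERT/UPDATE become upserts and DELETE of a missing key is a no-op,
    // so events already reflected in the snapshot can be replayed without harm.
    static void apply(Map<Integer, String> table, Change change) {
        switch (change.operation) {
            case INSERT:
            case UPDATE:
                table.put(change.id, change.value);
                break;
            case DELETE:
                table.remove(change.id);
                break;
            default:
                throw new IllegalStateException("Unknown operation: " + change.operation);
        }
    }

    public static void main(String[] args) {
        // Snapshot (dump) taken after the slot was created: already contains rows 1 and 2.
        Map<Integer, String> table = new TreeMap<>();
        table.put(1, "alice");
        table.put(2, "bob");

        // Captured stream: the first event overlaps the snapshot, the rest are newer.
        List<Change> changes = Arrays.asList(
                new Change(Operation.INSERT, 2, "bob"),
                new Change(Operation.UPDATE, 1, "alice2"),
                new Change(Operation.DELETE, 2, null),
                new Change(Operation.INSERT, 3, "carol"));
        for (Change change : changes) {
            apply(table, change);
        }
        System.out.println(table); // prints {1=alice2, 3=carol}
    }
}
```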
Hi @davyam, may I know when we can expect the "CaptureChangePostgreSQL" processor in a released version of NiFi? We have a requirement that needs this feature. Thank you.
Hi @MaheshKungaria, we expect to complete the improvements requested by the reviewers by the end of the year, so we believe our processor will be included in NiFi v1.19.
Thanks for the confirmation @davyam