NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) from PostgreSQL tables via Logical Replication API

Open davyam opened this issue 2 years ago • 12 comments

NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes from PostgreSQL tables via Logical Replication API

Authors: Davy Machado, Gerdan Santos

Thank you for submitting a contribution to Apache NiFi.

Please provide a short description of the PR here:

Description of PR

The CaptureChangePostgreSQL processor retrieves Change Data Capture (CDC) events from a PostgreSQL database. It works with PostgreSQL 10 and later. Events include INSERT, UPDATE, and DELETE operations and are output as individual flow files, ordered by the time at which each operation occurred. The processor uses a Logical Replication Connection to stream data and a SQL Connection to query system views.
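
As a rough illustration of the two-connection design described above, the sketch below builds the JDBC properties that the PostgreSQL JDBC driver (pgjdbc) expects for a logical replication connection. The class name, method name, and credentials are hypothetical, and this is not taken from the PR's code; it only shows the documented driver parameters `replication`, `preferQueryMode`, and `assumeMinServerVersion`. A regular SQL connection for querying system views would simply omit these three properties.

```java
import java.util.Properties;

// Hypothetical helper, not part of the PR: shows the pgjdbc connection
// parameters typically used to open a logical replication connection.
public class ReplicationConnectionProps {

    public static Properties replicationProps(String user, String password) {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        // Open the session as a replication connection bound to the
        // database named in the JDBC URL.
        props.setProperty("replication", "database");
        // Replication connections only support the simple query protocol.
        props.setProperty("preferQueryMode", "simple");
        // Logical replication requires server version 9.4 or later.
        props.setProperty("assumeMinServerVersion", "9.4");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder credentials for illustration only.
        Properties props = replicationProps("cdc_user", "secret");
        System.out.println(props.getProperty("replication"));
    }
}
```

These properties would be passed to `DriverManager.getConnection(url, props)` together with a JDBC URL; the server must also be configured for logical decoding, which is outside the scope of this sketch.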

This new pull request builds upon PR #5710.

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

For all changes:

  • [X] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?

  • [X] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • [X] Has your PR been rebased against the latest commit within the target branch (typically main)?

  • [ ] Is your initial contribution a single, squashed commit? Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not squash or use --force when pushing to allow for clean monitoring of changes.

For code changes:

  • [X] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
  • [X] Have you written or updated unit tests to verify your changes?
  • [X] Have you verified that the full build is successful on JDK 8?
  • [X] Have you verified that the full build is successful on JDK 11?
  • [X] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • [X] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
  • [X] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
  • [X] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

For documentation related changes:

  • [X] Have you ensured that format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.

davyam avatar May 18 '22 00:05 davyam

@joewitt @mattyb149 @exceptionfactory

Hey guys, this is the new PR for NIFI-4239 (CaptureChangePostgreSQL processor).

Sorry for the mess in #5710 =/

Please, run the workflow. Thanks!

davyam avatar May 18 '22 00:05 davyam

Now, after two years! Will we move forward? I believe so!

gerdansantos avatar May 18 '22 12:05 gerdansantos

@exceptionfactory thanks for the review!

I'll check all suggestions and apply the necessary adjustments as soon as possible.

About the RecordWriter Service: you're right, there are many benefits, but we don't see it as essential for now. Many processors don't use services yet. Since this improvement requires a lot of changes, we'll leave it for a future version of the processor.

davyam avatar May 18 '22 15:05 davyam

About the RecordWriter Service: you're right, there are many benefits, but we don't see it as essential for now. Many processors don't use services yet. Since this improvement requires a lot of changes, we'll leave it for a future version of the processor.

Thanks for the reply @davyam. Although record-oriented support may not be required in some cases, introducing this processor without support for a RecordWriter would lock in support for the JSON-specific output.

Understanding that it requires some changes, and that a lot of work has already gone into this component, it is also important to consider community maintainability. Components designed for more narrow use cases are difficult to maintain over time, so that's why it can be more difficult to introduce new components. Generating large numbers of small FlowFiles leads to poor flow performance, so that's why it is more important to implement that capability initially, as opposed to introducing it later. If you need some assistance on integration with record handling, perhaps this could be an opportunity for additional collaboration on this PR.
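
A minimal sketch of the batching idea behind this concern, in plain Java: instead of emitting one output per change event, events are grouped in order into batches so that each batch could be written as a single file. The class and method names are hypothetical, and this does not use NiFi's RecordWriter API; it only illustrates why grouping reduces the number of outputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration, not NiFi code: groups change events into
// ordered batches of bounded size.
public class CdcBatcher {

    public static <T> List<List<T>> batch(List<T> events, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        // Walk the list in steps of maxBatchSize, preserving event order.
        for (int i = 0; i < events.size(); i += maxBatchSize) {
            int end = Math.min(i + maxBatchSize, events.size());
            batches.add(new ArrayList<>(events.subList(i, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("INSERT", "UPDATE", "UPDATE", "DELETE", "INSERT");
        // Five events with a batch size of two yield three outputs instead of five.
        System.out.println(batch(events, 2).size());
    }
}
```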

exceptionfactory avatar May 18 '22 16:05 exceptionfactory

Generating large numbers of small FlowFiles leads to poor flow performance, so that's why it is more important to implement that capability initially, as opposed to introducing it later. If you need some assistance on integration with record handling, perhaps this could be an opportunity for additional collaboration on this PR.

@exceptionfactory I understand. OK, I'll analyze the effort required to use the RecordWriter Service.

davyam avatar May 18 '22 16:05 davyam

I have a problem. I have a clean NiFi instance running in Docker, and I use these settings in bootstrap.conf:

JVM memory settings
java.arg.2=-Xms2048m
java.arg.3=-Xmx4096m

Initially, docker stats displays:

CONTAINER ID   NAME          CPU %    MEM USAGE / LIMIT     MEM %    NET I/O             BLOCK I/O          PIDS
5017cf957d26   nifi_docker   0.60%    2.185GiB / 7.771GiB   28.12%   40.5 kB / 121kB     757MB / 995kB      109

But as soon as I start the processor on a 370 MB table, docker stats displays:

CONTAINER ID   NAME          CPU %    MEM USAGE / LIMIT     MEM %    NET I/O             BLOCK I/O          PIDS
5017cf957d26   nifi_docker   26.17%   4.985GiB / 7.771GiB   64.15%   1.15 GB / 26.4 MB   767MB / 1.01 MB    113

I also start getting the error: Java heap space

Here are the settings of my processor: [image] [image] [image] [image]

kuleshov01 avatar May 20 '22 06:05 kuleshov01

I have a problem.

Hey @djshura2008, you are using an earlier version of the CaptureChangePostgreSQL processor, maybe from PR 4065. I know that because we no longer have the "Take a initial snapshot?" property. Many improvements have been made since then, so please repeat the tests with the latest version. Thanks for your support.

davyam avatar May 22 '22 19:05 davyam

I have a problem.

Hey @djshura2008, you are using an earlier version of the CaptureChangePostgreSQL processor, maybe from PR 4065. I know that because we no longer have the "Take a initial snapshot?" property. Many improvements have been made since then, so please repeat the tests with the latest version. Thanks for your support.

Okay, but how do you take an initial snapshot of the database and immediately start the CDC process?

kuleshov01 avatar May 23 '22 05:05 kuleshov01

Okay, but how do you take an initial snapshot of the database and immediately start the CDC process?

There were several problems with the snapshot (delay, memory, etc.), so we decided to remove this option when the code was refactored. Our goal is to capture the changes. But you can still do that with a backup/dump/copy before starting the processor.

davyam avatar May 23 '22 18:05 davyam

Hi @davyam - May I know when we can expect the "CaptureChangePostgreSQL" processor in an official NiFi release? We have a requirement that depends on this feature. Thank you.

MaheshKungaria avatar Oct 04 '22 12:10 MaheshKungaria

Hi @davyam - May I know when we can expect the "CaptureChangePostgreSQL" processor in an official NiFi release? We have a requirement that depends on this feature. Thank you.

Hi @MaheshKungaria, we expect to complete the improvements requested by the reviewers by the end of the year. So we believe our processor will be included in NiFi v1.19.

davyam avatar Oct 05 '22 12:10 davyam

Thanks for the confirmation @davyam

MaheshKungaria avatar Oct 06 '22 06:10 MaheshKungaria