NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key
Summary
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
- [X] Apache NiFi Jira issue created
Pull Request Tracking
- [X] Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
- [X] Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000
Pull Request Formatting
- [X] Pull Request based on current revision of the main branch
- [X] Pull Request refers to a feature branch with one commit containing changes
Verification
Please indicate the verification steps performed prior to pull request creation.
Build
- [X] Build completed using `mvn clean install -P contrib-check`
- [X] JDK 8
- [X] JDK 11
- [X] JDK 17
Licensing
- [ ] New dependencies are compatible with the Apache License 2.0 according to the License Policy
- [ ] New dependencies are documented in applicable LICENSE and NOTICE files
Documentation
- [X] Documentation formatting appears as expected in rendered files
Thanks for pushing this @greyp9 ! I think this gets us really far down field. I left some comments inline, mostly around terminology / naming conventions, and documentation. I also think the tests that were added are really helpful, but I feel like some of the code is super repetitive and some of the sets of assertions can be easily refactored into a method so that the code is a little cleaner & less verbose - unless I missed some discrepancy between very similar chunks of code that are actually different.
Just pushed another commit; it should address the issues you noted.
There were some efficiencies to be gained in the unit tests, but the variables needed to verify behavior made the suggested refactor difficult. Hope the updates reach the needed bar.
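(As an aside for readers following along: the kind of assertion helper being discussed might look like the sketch below. The method and parameter names are hypothetical, not taken from the actual test classes.)

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

// Hypothetical helper consolidating the repeated per-record assertions.
private void assertProducedRecord(final ProducerRecord<byte[], byte[]> record,
                                  final String expectedKey,
                                  final String expectedValue,
                                  final Map<String, String> expectedHeaders) {
    // compare key and value as UTF-8 strings; a null key stays null
    assertEquals(expectedKey, record.key() == null
            ? null : new String(record.key(), StandardCharsets.UTF_8));
    assertEquals(expectedValue, new String(record.value(), StandardCharsets.UTF_8));
    // each expected header must be present with the expected value
    for (final Map.Entry<String, String> entry : expectedHeaders.entrySet()) {
        final Header header = record.headers().lastHeader(entry.getKey());
        assertNotNull(header);
        assertEquals(entry.getValue(), new String(header.value(), StandardCharsets.UTF_8));
    }
}
```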
Thanks for updating @greyp9 . Trying this out again. I just tried sending the following content via PublishKafkaRecord_2_6:
```json
{
  "key": {
    "type": "person"
  },
  "value": {
    "name": "Mark",
    "number": 49
  },
  "headers": {
    "headerA": "headerAValue"
  }
}
```
I set Publish Strategy to "Use Wrapper" and used a JsonTreeReader as the record reader. But I encountered a NullPointerException:
```
2022-08-22 15:58:22,792 ERROR [Timer-Driven Process Thread-2] o.a.n.p.k.pubsub.PublishKafkaRecord_2_6 PublishKafkaRecord_2_6[id=c71d2b54-0182-1000-4979-53ab4cc8d555] Failed to send StandardFlowFileRecord[uuid=e8d03807-fa13-4620-be6a-72e014ac0838,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1661197835339-1, container=default, section=1], offset=3200, length=145],offset=0,name=e8d03807-fa13-4620-be6a-72e014ac0838,size=145] to Kafka
java.lang.NullPointerException: null
    at org.apache.nifi.processors.kafka.pubsub.PublisherLease.toWrapperRecord(PublisherLease.java:262)
    at org.apache.nifi.processors.kafka.pubsub.PublisherLease.publish(PublisherLease.java:210)
    at org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6$1.process(PublishKafkaRecord_2_6.java:521)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2693)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2661)
    at org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6.onTrigger(PublishKafkaRecord_2_6.java:513)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1357)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
```
One other thing that I noticed, though it's minor: When I change the "Publish Strategy" to "Use Wrapper" I'm given the option to configure the "Record Key Writer" property. But this is way down the list, about 8 or 10 properties down, so it's not at all obvious that it's made available. Probably want to move the "Record Key Writer" just after the "Publish Strategy" property since it depends on it.
I also tried changing the "Publish Strategy" to "Use Record Content as Value" so that I could set the "Key Field." I then set the Key Field to "key" and then changed the Publish Strategy back to "Use Wrapper".
This time, because the Key Field was specified, it sent the data. But what it sent was not correct.
It sent the key correctly. But for the Kafka message, it sent the entire payload, not just the value section. And it sent no headers. So it appears to behave as if the Publish Strategy was still set to "Use Record Content as Value".
> I set Publish Strategy to "Use Wrapper" and used a JsonTreeReader as the record reader. But I encountered a NullPointerException:
Updated to validate content of message key.
> One other thing that I noticed, though it's minor: When I change the "Publish Strategy" to "Use Wrapper" I'm given the option to configure the "Record Key Writer" property. But this is way down the list, about 8 or 10 properties down, so it's not at all obvious that it's made available. Probably want to move the "Record Key Writer" just after the "Publish Strategy" property since it depends on it.
Moved to just after "Publish Strategy".
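(For reference, the visibility relationship can also be expressed directly on the descriptor via dependsOn; the sketch below shows the idea, with PUBLISH_STRATEGY and PUBLISH_USE_WRAPPER as assumed stand-ins for the actual constants in the processor.)

```java
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.serialization.RecordSetWriterFactory;

// Assumed stand-ins for the processor's existing strategy property and its
// "Use Wrapper" allowable value; actual names in the codebase may differ.
static final AllowableValue PUBLISH_USE_WRAPPER =
        new AllowableValue("use-wrapper", "Use Wrapper");
static final PropertyDescriptor PUBLISH_STRATEGY = new PropertyDescriptor.Builder()
        .name("publish-strategy")
        .displayName("Publish Strategy")
        .allowableValues(PUBLISH_USE_WRAPPER /* , other strategies */)
        .required(true)
        .build();

// The key writer only appears when Publish Strategy is "Use Wrapper"; listing
// it immediately after PUBLISH_STRATEGY keeps the two adjacent in the UI.
static final PropertyDescriptor RECORD_KEY_WRITER = new PropertyDescriptor.Builder()
        .name("record-key-writer")
        .displayName("Record Key Writer")
        .description("The Record Writer to use for serializing the message key")
        .identifiesControllerService(RecordSetWriterFactory.class)
        .dependsOn(PUBLISH_STRATEGY, PUBLISH_USE_WRAPPER)
        .build();
```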
{ "key": { "type": "person" }, "value": { "name": "Mark", "number": 49 }, "headers": { "headerA": "headerAValue" } }
Thanks. There are enough permutations related to these changes that I'm not sure what should be emitted for all of them. I've made some simplifying assumptions; those may need adjustments.
Maybe a concrete example would help. Given the record above, and the processor PublishKafkaRecord_2_6:
[Processor Property] Publish Strategy

```
if (Use Wrapper) {
    Kafka Record:
        Key = ?
        Value = ?
        Headers = ?
} else {
    Kafka Record:
        Key = ?
        Value = ?
        Headers = ?
}
```
[Processor Property] Attributes To Send as Headers (Regex)

```
if (Use Wrapper) {
    send attributes in wrapper record (do not add to kafka headers)
} else {
    send attributes in kafka headers
}
```
is this high-level logic valid?
[Processor Property] Message Key Field

```
if (Use Wrapper) {
    ignore (null key)
} else {
    send key (if record field exists) in Kafka key
}
```
is this right?
@greyp9 So, given the above (I will address these in reverse order, because I think you started with the hardest to describe and they got easier as you went down the list) :) ...
The last point, about the "Message Key Field" property, I think is perfectly accurate.
As for the headers:
- I think what you said is accurate, but to clarify:
- If (Use Wrapper) - the headers to send would be a single header. Its name would be "headerA" and its value would be "headerAValue". FlowFile attributes would not be sent as headers.
- Else, the headers would be any FlowFile attribute that matches the "Attributes to Send as Headers (Regex)" property
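(To make the two branches concrete, here is a sketch of the header derivation using the Kafka client's header types. The helper itself, its "headers" field handling, and the regex matching are illustrative assumptions, not the processor's actual code.)

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.nifi.serialization.record.Record;

// Hypothetical helper: derive the Kafka headers for one message under each
// publish strategy, per the behavior described above.
static List<Header> toHeaders(final boolean useWrapper, final Record record,
                              final Map<String, String> flowFileAttributes,
                              final Pattern attributesToSendRegex) {
    final List<Header> headers = new ArrayList<>();
    if (useWrapper) {
        // Use Wrapper: headers come from the record's "headers" field;
        // FlowFile attributes are not sent.
        final Object field = record.getValue("headers");
        if (field instanceof Record) {
            final Record headersRecord = (Record) field;
            for (final String name : headersRecord.getRawFieldNames()) {
                final String value = headersRecord.getAsString(name);
                if (value != null) {
                    headers.add(new RecordHeader(name, value.getBytes(StandardCharsets.UTF_8)));
                }
            }
        }
    } else {
        // Otherwise: headers are the FlowFile attributes matching the
        // "Attributes to Send as Headers (Regex)" property.
        if (attributesToSendRegex != null) {
            for (final Map.Entry<String, String> attribute : flowFileAttributes.entrySet()) {
                if (attributesToSendRegex.matcher(attribute.getKey()).matches()) {
                    headers.add(new RecordHeader(attribute.getKey(),
                            attribute.getValue().getBytes(StandardCharsets.UTF_8)));
                }
            }
        }
    }
    return headers;
}
```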
Now, as for the other...
```
if (Use Wrapper) {
    Kafka Record:
        Key = { "type": "person" }
        Value = { "name": "Mark", "number": 49 }
        Headers = A single header with name "headerA", value "headerAValue"
} else {
    Kafka Record:
        Key = <Depends on the value of the 'Message Key Field' property>
        Value = <The entire JSON payload, i.e.:>
            {
              "key": { "type": "person" },
              "value": { "name": "Mark", "number": 49 },
              "headers": { "headerA": "headerAValue" }
            }
        Headers = <Whatever matches the 'Attributes to Send as Headers (Regex)' property>
}
```
So, in short, if Use Wrapper, the incoming FlowFile must have 3 fields:
- Key: this becomes the Kafka message key.
- Value: this becomes the contents of the Kafka message.
- Headers: this becomes the headers attached to the Kafka message.

Any other fields, such as metadata, would be ignored.
If NOT using wrapper, it would function as it always has. The entire contents of the Record go as the Kafka message payload. The key and headers are determined based on the configured "Message Key Field" and "Attributes to Send as Headers (Regex)" properties.
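(Expressed as a runnable sketch against the example record above, with serialization pre-baked as JSON strings for brevity; the real processor would use the configured reader and writers.)

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

public class WrapperMappingExample {
    public static void main(final String[] args) {
        final String keyJson = "{\"type\":\"person\"}";
        final String valueJson = "{\"name\":\"Mark\",\"number\":49}";
        final String wholeJson = "{\"key\":" + keyJson + ",\"value\":" + valueJson
                + ",\"headers\":{\"headerA\":\"headerAValue\"}}";

        // Use Wrapper: key, value, and headers all come from the record itself.
        final List<Header> wrapperHeaders = List.<Header>of(
                new RecordHeader("headerA", "headerAValue".getBytes(StandardCharsets.UTF_8)));
        final ProducerRecord<byte[], byte[]> useWrapper = new ProducerRecord<>(
                "topic", null,
                keyJson.getBytes(StandardCharsets.UTF_8),
                valueJson.getBytes(StandardCharsets.UTF_8),
                wrapperHeaders);

        // Use Record Content as Value: the entire payload is the message value;
        // the key comes from "Message Key Field" (null here) and headers from
        // FlowFile attributes matching the configured regex (none here).
        final ProducerRecord<byte[], byte[]> useContent = new ProducerRecord<>(
                "topic", null, null,
                wholeJson.getBytes(StandardCharsets.UTF_8),
                List.<Header>of());

        System.out.println("wrapper key:   " + new String(useWrapper.key(), StandardCharsets.UTF_8));
        System.out.println("wrapper value: " + new String(useWrapper.value(), StandardCharsets.UTF_8));
        System.out.println("content value: " + new String(useContent.value(), StandardCharsets.UTF_8));
    }
}
```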
In this way, if the wrapper is being used by both, we can have ConsumeKafkaRecord_2_6 -> PublishKafkaRecord_2_6, and this will pull Kafka messages from one topic and push them to another topic. Each message in the destination would have the same key, value, and headers as the original message.
Does that make sense? Or have I confused things even worse? :)
> Does that make sense? Or have I confused things even worse? :)
Much clearer now; thanks. I've added a new unit test that calls out the output differences between the publish strategies VALUE and WRAPPER, and I leveraged this to adjust the wrapper to fit the expected behavior. (And picked up some useful Jackson knowledge along the way.) :)
It should be easier to add test cases if we notice more discrepancies from the intended outputs.