
NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key

Open greyp9 opened this issue 2 years ago • 7 comments

Summary

NIFI-9822

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • [X] Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • [X] Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000

Pull Request Formatting

  • [X] Pull Request based on current revision of the main branch
  • [X] Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • [X] Build completed using mvn clean install -P contrib-check
    • [X] JDK 8
    • [X] JDK 11
    • [X] JDK 17

Licensing

  • [ ] New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • [ ] New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • [X] Documentation formatting appears as expected in rendered files

greyp9 avatar Jun 16 '22 16:06 greyp9

Thanks for pushing this @greyp9 ! I think this gets us really far down the field. I left some comments inline, mostly around terminology / naming conventions and documentation. I also think the tests that were added are really helpful, but some of the code is quite repetitive, and several sets of assertions could be refactored into a shared method so that the code is a little cleaner and less verbose - unless I missed a discrepancy between very similar chunks of code that are actually different.

Just pushed another commit; it should address the issues you noted.

There were some efficiencies to be gained in the unit tests, but the variables needed to verify behavior made the suggested refactor difficult. I hope the updates meet the bar.

greyp9 avatar Jul 15 '22 19:07 greyp9

Thanks for updating @greyp9 . Trying this out again. I just tried sending the following content via PublishKafkaRecord_2_6:

{
  "key": {
    "type": "person"
  },
  "value": {
    "name": "Mark",
    "number": 49
  },
  "headers": {
     "headerA": "headerAValue"
  }
}

I set Publish Strategy to "Use Wrapper" and used a JsonTreeReader as the record reader. But I encountered a NullPointerException:

2022-08-22 15:58:22,792 ERROR [Timer-Driven Process Thread-2] o.a.n.p.k.pubsub.PublishKafkaRecord_2_6 PublishKafkaRecord_2_6[id=c71d2b54-0182-1000-4979-53ab4cc8d555] Failed to send StandardFlowFileRecord[uuid=e8d03807-fa13-4620-be6a-72e014ac0838,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1661197835339-1, container=default, section=1], offset=3200, length=145],offset=0,name=e8d03807-fa13-4620-be6a-72e014ac0838,size=145] to Kafka
java.lang.NullPointerException: null
	at org.apache.nifi.processors.kafka.pubsub.PublisherLease.toWrapperRecord(PublisherLease.java:262)
	at org.apache.nifi.processors.kafka.pubsub.PublisherLease.publish(PublisherLease.java:210)
	at org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6$1.process(PublishKafkaRecord_2_6.java:521)
	at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2693)
	at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2661)
	at org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6.onTrigger(PublishKafkaRecord_2_6.java:513)
	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1357)
	at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
	at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
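
For reference, the trace suggests toWrapperRecord dereferences a wrapper field that is absent from the incoming record. A minimal sketch of the kind of guard that would avoid this, using the NiFi Record API - the method name and shape here are assumptions for illustration, not the actual PublisherLease code:

import org.apache.nifi.serialization.record.Record;

// Sketch only: tolerate a record that lacks one of the wrapper fields
// ("key", "value", "headers") instead of dereferencing null downstream.
// The real toWrapperRecord code may be shaped quite differently.
private static Record getWrapperField(final Record wrapper, final String fieldName) {
    final Object value = wrapper.getValue(fieldName);
    return (value instanceof Record) ? (Record) value : null;
}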

One other thing that I noticed, though it's minor: when I change the "Publish Strategy" to "Use Wrapper" I'm given the option to configure the "Record Key Writer" property. But this is way down the list, about 8 or 10 properties down, so it's not at all obvious that it's been made available. We probably want to move the "Record Key Writer" to just after the "Publish Strategy" property, since it depends on it.

markap14 avatar Aug 22 '22 20:08 markap14

I also tried changing the "Publish Strategy" to "Use Record Content as Value" so that I could set the "Key Field." I then set the Key Field to key, and then changed the Publish Strategy back to "Use Wrapper". This time, because the Key Field was specified, it sent the data. But what it sent was not correct. It sent the key correctly, but for the Kafka message it sent the entire payload, not just the value section, and it sent no headers. So it appears to behave as if the Publish Strategy were still set to "Use Record Content as Value".

markap14 avatar Aug 22 '22 20:08 markap14

Thanks for updating @greyp9 . Trying this out again. I just tried sending the following content via PublishKafkaRecord_2_6:

I set Publish Strategy to "Use Wrapper" and used a JsonTreeReader as the record reader. But I encountered a NullPointerException:

Updated to validate content of message key.

One other thing that I noticed, though it's minor: when I change the "Publish Strategy" to "Use Wrapper" I'm given the option to configure the "Record Key Writer" property. But this is way down the list, about 8 or 10 properties down, so it's not at all obvious that it's been made available. We probably want to move the "Record Key Writer" to just after the "Publish Strategy" property, since it depends on it.

Moved to just after "Publish Strategy".

greyp9 avatar Aug 23 '22 18:08 greyp9

{
  "key": {
    "type": "person"
  },
  "value": {
    "name": "Mark",
    "number": 49
  },
  "headers": {
     "headerA": "headerAValue"
  }
}

Thanks. There are enough permutations related to these changes that I'm not sure what should be emitted for each of them. I've made some simplifying assumptions; those may need adjustment.

Maybe a concrete example would help. Given the record above, and the processor PublishKafkaRecord_2_6:

[Processor Property] PublishStrategy

if (Use Wrapper) {
    Kafka Record:
    Key = ?
    Value = ?
    Headers = ?
} else {
    Kafka Record:
    Key = ?
    Value = ?
    Headers = ?
}

[Processor Property] Attributes To Send as Headers (Regex)

if (Use Wrapper) {
    send attributes in wrapper record (do not add to kafka headers)
} else {
    send attributes in kafka headers
}

is this high-level logic valid?
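
As a sketch of how I'm reading the non-wrapper branch (plain Java; the names here are illustrative, not the PR's actual identifiers):

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.kafka.common.header.internals.RecordHeaders;

// Sketch only: in non-wrapper mode, FlowFile attributes matching the
// configured regex become Kafka headers; in wrapper mode this step is
// skipped and headers come from the record's "headers" field instead.
static RecordHeaders toKafkaHeaders(final Map<String, String> attributes, final Pattern attributeNamePattern) {
    final RecordHeaders headers = new RecordHeaders();
    for (final Map.Entry<String, String> attribute : attributes.entrySet()) {
        if (attributeNamePattern.matcher(attribute.getKey()).matches()) {
            headers.add(attribute.getKey(), attribute.getValue().getBytes(StandardCharsets.UTF_8));
        }
    }
    return headers;
}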


[Processor Property] Message Key Field

if (Use Wrapper) {
    ignore (null key)
} else {
    send key (if record field exists) in Kafka key
}

is this right?
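
And the same idea for the key, as I understand it (again, illustrative names rather than the PR code):

import java.nio.charset.StandardCharsets;
import org.apache.nifi.serialization.record.Record;

// Sketch only: 'Message Key Field' is consulted in non-wrapper mode only;
// wrapper mode ignores it, since the key comes from the record's "key" field.
static byte[] resolveMessageKey(final boolean useWrapper, final Record record, final String messageKeyField) {
    if (useWrapper) {
        return null;
    }
    final Object keyValue = record.getValue(messageKeyField);
    return (keyValue == null) ? null : keyValue.toString().getBytes(StandardCharsets.UTF_8);
}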

greyp9 avatar Aug 23 '22 19:08 greyp9

@greyp9 So, given the above (I will address these in reverse order, because I think you started with the hardest to describe and they got easier as you went down the list) :) ...

The last point, about the "Message Key Field" property, I think is perfectly accurate.

As for the headers, I think what you said is accurate, but to clarify:

  • If (Use Wrapper) - the headers to send would be a single header. Its name would be "headerA" and its value would be "headerAValue". FlowFile attributes would not be sent as headers.
  • Else, the headers would be any FlowFile attributes that match the "Attributes to Send as Headers (Regex)" property.

Now, as for the other...

if (Use Wrapper) {
    Kafka Record:
    Key = { "type": "person" }
    Value = { "name": "Mark", "number": 49 }
    Headers = A single header with name "headerA", value "headerAValue"
} else {
    Kafka Record:
    Key = <Depends on the value of the 'Message Key Field' property>
    Value = <The entire JSON payload, i.e.:>
    {
        "key": {
            "type": "person"
        },
        "value": {
            "name": "Mark",
            "number": 49
        },
        "headers": {
            "headerA": "headerAValue"
        }
    }
    Headers = <Whatever matches the 'Attributes to Send as Headers (Regex)' property>
}

So, in short, if Use Wrapper, the incoming FlowFile must have 3 fields:

  • Key - becomes the Kafka message key
  • Value - becomes the contents of the Kafka message
  • Headers - becomes the headers attached to the Kafka message

Any other fields, such as metadata, would be ignored.
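
Putting the wrapper branch together, a rough sketch against the Kafka client API - here topic, serialize(...), and headersOf(...) are hypothetical stand-ins for the processor's configuration and the configured record writers:

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.nifi.serialization.record.Record;

// Sketch only: wrapper mode pulls all three parts from the record itself.
final Record key = (Record) record.getValue("key");
final Record value = (Record) record.getValue("value");
final Record headers = (Record) record.getValue("headers");

final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(
    topic,                // destination topic name
    null,                 // partition: let Kafka decide
    serialize(key),       // becomes the Kafka message key
    serialize(value),     // becomes the Kafka message payload
    headersOf(headers));  // one header per field, e.g. headerA=headerAValue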

If NOT using wrapper, it would function as it always has. The entire contents of the Record go as the kafka message payload. The key and headers are determined based on the configured "Message Key Field" and "Attributes to Send as Headers (Regex)" properties.

In this way, if the wrapper is being used by both, we can have ConsumeKafkaRecord_2_6 -> PublishKafkaRecord_2_6, and this will pull Kafka messages from one topic and push them to another. Each message in the destination topic would have the same key, value, and headers as the original message.

Does that make sense? Or have I confused things even worse? :)

markap14 avatar Aug 23 '22 20:08 markap14

Does that make sense? Or have I confused things even worse? :)

Much clearer now; thanks. I've added a new unit test that calls out the output differences between the publish strategies VALUE and WRAPPER, and I used it to adjust the wrapper implementation to match the expected behavior. (And picked up some useful Jackson knowledge along the way.) :)

It should be easier to add test cases if we notice more discrepancies from the intended outputs.

greyp9 avatar Aug 25 '22 20:08 greyp9