
Clean up the consumer interfaces and legacy code

Open Jackie-Jiang opened this issue 1 year ago • 2 comments

Deprecated:

  • PartitionLevelConsumer: Use PartitionGroupConsumer instead
  • In PartitionGroupConsumer:
    • Remove endOffset from fetchMessages(): offsets might not be monotonically increasing for some consumers (e.g. Kinesis), so the end offset should not be used to filter messages
  • In MessageBatch:
    • getMessageAtIndex(): Use getStreamMessage() instead
    • getMessageBytesAtIndex(): Use getStreamMessage() instead
    • getMessageLengthAtIndex(): Use getStreamMessage() instead
    • getMessageOffsetAtIndex(): Use getStreamMessage() instead, where offset info is embedded in the StreamMessageMetadata
    • getMetadataAtIndex(): Use getStreamMessage() instead
    • getNextStreamMessageOffsetAtIndex(): Use getOffsetOfNextBatch() instead
    • getNextStreamPartitionMsgOffsetAtIndex(): Use getOffsetOfNextBatch() instead
  • In StreamPartitionMsgOffset:
    • fromString(): Should be a static method

Removed the metadata extractor interfaces because they are not well designed and not really pluggable (plugging in at the consumer level should be good enough)

Add offset info into RowMetadata:

  • StreamPartitionMsgOffset getOffset()
  • StreamPartitionMsgOffset getNextOffset()
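
A minimal sketch of why both accessors are useful, assuming a stand-in LongOffset class in place of StreamPartitionMsgOffset and a reduced RowMetadata interface with only the two methods listed above (neither is the actual Pinot definition). The next offset is not always the current offset plus one, for example when a stream has gaps, so it is carried explicitly rather than derived by the caller.

```java
// Simplified stand-ins for illustration; not the actual Pinot SPI definitions.
class RowMetadataOffsetSketch {
  // Stand-in for StreamPartitionMsgOffset, reduced to a long for brevity.
  static final class LongOffset {
    final long value;
    LongOffset(long value) { this.value = value; }
    @Override public String toString() { return Long.toString(value); }
  }

  // Reduced to the two offset accessors named in the description.
  interface RowMetadata {
    LongOffset getOffset();
    LongOffset getNextOffset();
  }

  // Hypothetical factory used only by this example.
  static RowMetadata metadata(long offset, long nextOffset) {
    LongOffset current = new LongOffset(offset);
    LongOffset next = new LongOffset(nextOffset);
    return new RowMetadata() {
      @Override public LongOffset getOffset() { return current; }
      @Override public LongOffset getNextOffset() { return next; }
    };
  }

  static String describe(RowMetadata metadata) {
    return "offset=" + metadata.getOffset() + ", next=" + metadata.getNextOffset();
  }

  public static void main(String[] args) {
    System.out.println(describe(metadata(41, 42)));  // contiguous offsets
    System.out.println(describe(metadata(41, 45)));  // gap after this message
  }
}
```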

Bugfix:

  • With offset info properly attached to the message metadata, we can correctly track the offset of each message. Currently we use the next offset for each message
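
One way to picture the fix, as a sketch under assumptions rather than the code in this PR: each consumed row is attributed to its own offset, while the resume point is taken from the last consumed message's next offset. The Message and Result classes, the consume() method, and the maxRows limit are hypothetical, and offsets are plain longs for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical consumption loop; names and types are illustrative only.
class OffsetTrackingSketch {
  static final class Message {
    final long offset;      // offset of this message
    final long nextOffset;  // offset to resume from after this message
    Message(long offset, long nextOffset) {
      this.offset = offset;
      this.nextOffset = nextOffset;
    }
  }

  static final class Result {
    final List<Long> rowOffsets = new ArrayList<>();  // offset attributed to each consumed row
    long resumeOffset;                                // where consumption should continue
  }

  static Result consume(List<Message> batch, long startOffset, int maxRows) {
    Result result = new Result();
    result.resumeOffset = startOffset;
    for (Message message : batch) {
      if (result.rowOffsets.size() >= maxRows) {
        break;  // stop mid-batch, e.g. when a row limit is reached
      }
      result.rowOffsets.add(message.offset);     // the message's own offset, not its next offset
      result.resumeOffset = message.nextOffset;  // advance the resume point past this message
    }
    return result;
  }

  public static void main(String[] args) {
    List<Message> batch = List.of(new Message(5, 6), new Message(6, 9), new Message(9, 10));
    Result result = consume(batch, 5, 2);
    System.out.println(result.rowOffsets);
    System.out.println(result.resumeOffset);
  }
}
```

Keeping the two values separate matters when consumption stops in the middle of a batch: the rows already consumed keep their own offsets, and the resume point still lands on the first message that was not consumed.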

Clean up the code to adapt to the above changes.

Jackie-Jiang, Mar 22 '24 09:03

Codecov Report

Attention: Patch coverage is 54.02844%, with 194 lines in your changes missing coverage. Please review.

Project coverage is 62.00%. Comparing base (59551e4) to head (082aada). Report is 204 commits behind head on master.

| Files | Patch % | Lines |
|---|---|---|
| ...apache/pinot/plugin/stream/pulsar/PulsarUtils.java | 53.26% | 38 Missing and 5 partials :warning: |
| ...lugin/stream/kinesis/KinesisConnectionHandler.java | 5.88% | 32 Missing :warning: |
| ...java/org/apache/pinot/spi/stream/MessageBatch.java | 25.00% | 12 Missing and 3 partials :warning: |
| ...e/pinot/plugin/stream/kinesis/KinesisConsumer.java | 65.85% | 8 Missing and 6 partials :warning: |
| ...a/manager/realtime/RealtimeSegmentDataManager.java | 54.16% | 8 Missing and 3 partials :warning: |
| ...in/stream/kafka20/KafkaPartitionLevelConsumer.java | 68.57% | 10 Missing and 1 partial :warning: |
| ...in/stream/kinesis/KinesisPartitionGroupOffset.java | 33.33% | 10 Missing :warning: |
| ...apache/pinot/spi/stream/StreamMessageMetadata.java | 58.33% | 10 Missing :warning: |
| .../pulsar/PulsarPartitionLevelConnectionHandler.java | 52.94% | 5 Missing and 3 partials :warning: |
| ...ava/org/apache/pinot/spi/stream/StreamMessage.java | 36.36% | 7 Missing :warning: |
... and 13 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #12697      +/-   ##
============================================
+ Coverage     61.75%   62.00%   +0.25%     
+ Complexity      207      198       -9     
============================================
  Files          2436     2458      +22     
  Lines        133233   134667    +1434     
  Branches      20636    20812     +176     
============================================
+ Hits          82274    83500    +1226     
- Misses        44911    45021     +110     
- Partials       6048     6146      +98     
| Flag | Coverage Δ |
|---|---|
| custom-integration1 | <0.01% <0.00%> (-0.01%) :arrow_down: |
| integration | <0.01% <0.00%> (-0.01%) :arrow_down: |
| integration1 | <0.01% <0.00%> (-0.01%) :arrow_down: |
| integration2 | 0.00% <0.00%> (ø) |
| java-11 | 61.94% <54.02%> (+0.23%) :arrow_up: |
| java-21 | 61.87% <54.02%> (+0.24%) :arrow_up: |
| skip-bytebuffers-false | 61.97% <54.02%> (+0.22%) :arrow_up: |
| skip-bytebuffers-true | 61.84% <54.02%> (+34.11%) :arrow_up: |
| temurin | 62.00% <54.02%> (+0.25%) :arrow_up: |
| unittests | 62.00% <54.02%> (+0.25%) :arrow_up: |
| unittests1 | 46.73% <45.36%> (-0.16%) :arrow_down: |
| unittests2 | 27.95% <43.60%> (+0.21%) :arrow_up: |

codecov-commenter, Mar 22 '24 09:03

I've reverted the removal of the separate executor for Kinesis and Pulsar so that this PR focuses only on the interface change

Jackie-Jiang, Apr 02 '24 21:04

Thanks @Jackie-Jiang for cleaning up and fixing these interfaces. It helps address issues like https://github.com/apache/pinot/pull/12602

satishd, Apr 18 '24 10:04