
Extract record keys, headers and metadata from Stream sources

[Open] navina opened this issue 3 years ago • 4 comments

Design doc: https://docs.google.com/document/d/1kTUfBud1SBSh_703mvu6ybkbIwiKKH9eXpdcpEmhC2E/edit

This is an extension of PR #9096

Motivation

Most stream systems provide a message envelope that encapsulates the record payload along with record headers, keys, and other system-specific metadata. For example:

  1. Kafka allows keyed records and additionally provides headers.
  2. Kinesis requires keyed records and includes some additional metadata such as sequenceId.
  3. Pulsar also supports keyed records and allows including arbitrary properties.
  4. Pubsub supports keyed messages, along with user-defined attributes and message metadata.

Today, Pinot drops everything from the message envelope other than the record value itself. Hence, there needs to be a way to extract these values and present them in the Pinot table as regular columns (of course, they have to be defined in the Pinot schema).

This can be very useful for Pinot users, as they don't have to "pre-process" the stream to make the record metadata available in the data payload. It also avoids the need for custom solutions (such as this).

Context

To clarify the terminology: in most streaming systems, a record is typically composed of the following (a Kafka-based sketch follows this list):

  1. Record key - usually a string, although Kafka allows any type (today, the pinot-kafka connector assumes the key is always a string).
  2. Record value - the actual data payload. Pinot extracts only this value and decodes it.
  3. Record headers - user-defined headers that can be specific to the publishing application. Typically, headers are meant to be small and efficient. For example, Kafka allows <String, byte[]> pairs. Technically, byte[] can be anything, and we can make a call on whether or not to support arbitrary header value types.
  4. Record metadata - system-defined fields that may or may not be included in the record payload. For example, for message identifiers, Kinesis has sequenceId, Kafka has offset, and Pubsub has messageId. While these may not be useful to the user-facing application, they come in handy for debugging.
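
For concreteness, here is a minimal, illustrative sketch (not code from this patch) of how these four pieces surface in Kafka's consumer API; other systems expose equivalents, e.g. partitionKey/sequenceNumber in Kinesis or key/properties in Pulsar:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public class MessageEnvelopeExample {
  // Illustrative only: maps the terminology above onto Kafka's ConsumerRecord accessors.
  static void inspect(ConsumerRecord<String, byte[]> record) {
    String key = record.key();                  // 1. record key
    byte[] value = record.value();              // 2. record value - the only part Pinot decodes today
    for (Header header : record.headers()) {    // 3. record headers - <String, byte[]> pairs
      String headerKey = header.key();
      byte[] headerValue = header.value();
    }
    long offset = record.offset();              // 4. record metadata - system-defined (offset, timestamp, ...)
    long timestamp = record.timestamp();
  }
}
```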

What does this PR do?

This PR extracts the key, headers, and other metadata from any supported streaming connector. The feature is opt-in: it is enabled by setting stream.$streamType.metadata.populate to true.
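
For example, a hypothetical streamConfigs fragment for a Kafka table could look like the following (the topic name is a placeholder and other required stream properties are omitted):

```json
"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.topic.name": "my-topic",
  "stream.kafka.metadata.populate": "true"
}
```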

Please note:

  1. I am in the process of adding some unit tests. I have tested with a Pinot realtime quickstart. Some more cleanup is needed.
  2. For whatever reason, the integration tests fail in the CI pipeline here, whereas they run fine on my laptop. Still fixing forward.
  3. Documentation for this feature will follow after this PR is merged.

For Reviewers, things to discuss:

  1. In the current patch, the record key (when available) is extracted as the __key column, whereas headers are extracted as header$<HEADER_KEY_NAME>. Does this sound like a good convention to follow for all stream connectors: header columns are always prefixed with header$, and any other metadata such as key or offset is prefixed with __? (A hypothetical schema snippet follows this list.)
  2. In MessageBatch, I have marked one of the methods as @Deprecated since I hope to eventually eliminate the need for a typed interface there. The current changes are backwards compatible. Let me know if there is a better way.
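
For illustration of point 1 above, a hypothetical Pinot schema fragment declaring the key and one header as regular columns might look like this (traceId is a made-up header name and the data types are assumptions, not part of this patch):

```json
"dimensionFieldSpecs": [
  { "name": "__key", "dataType": "STRING" },
  { "name": "header$traceId", "dataType": "STRING" }
]
```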

navina · Aug 16 '22 22:08

Codecov Report

Merging #9224 (92de7ca) into master (83b7f15) will increase coverage by 0.02%. The diff coverage is 61.80%.

@@             Coverage Diff              @@
##             master    #9224      +/-   ##
============================================
+ Coverage     69.89%   69.91%   +0.02%     
- Complexity     4742     4822      +80     
============================================
  Files          1910     1914       +4     
  Lines        101787   101886      +99     
  Branches      15445    15457      +12     
============================================
+ Hits          71139    71231      +92     
- Misses        25628    25630       +2     
- Partials       5020     5025       +5     
| Flag | Coverage Δ |
| --- | --- |
| integration1 | 26.10% <18.05%> (+0.06%) ↑ |
| integration2 | 24.79% <20.13%> (+0.08%) ↑ |
| unittests1 | 67.18% <67.05%> (+0.03%) ↑ |
| unittests2 | 15.54% <18.75%> (+0.03%) ↑ |

Flags with carried forward coverage won't be shown.

| Impacted Files | Coverage Δ |
| --- | --- |
| ...lugin/stream/kafka20/server/KafkaDataProducer.java | 33.33% <0.00%> (-24.36%) ↓ |
| ...rg/apache/pinot/spi/stream/StreamDataProducer.java | 32.35% <0.00%> (-11.65%) ↓ |
| .../pinot/common/function/scalar/StringFunctions.java | 73.14% <14.28%> (-4.31%) ↓ |
| ...inot/plugin/stream/kafka20/KafkaStreamMessage.java | 50.00% <40.00%> (ø) |
| .../java/org/apache/pinot/spi/stream/RowMetadata.java | 40.00% <40.00%> (ø) |
| ...pinot/plugin/stream/kafka20/KafkaMessageBatch.java | 68.75% <50.00%> (-24.11%) ↓ |
| .../plugin/stream/kafka20/KafkaMetadataExtractor.java | 82.35% <82.35%> (ø) |
| ...apache/pinot/spi/stream/StreamDataDecoderImpl.java | 82.60% <82.60%> (ø) |
| ...in/stream/kafka20/KafkaPartitionLevelConsumer.java | 72.72% <85.71%> (+1.29%) ↑ |
| ...manager/realtime/LLRealtimeSegmentDataManager.java | 69.98% <100.00%> (-0.24%) ↓ |
| ... and 44 more | |


codecov-commenter · Aug 18 '22 00:08

Is there a backward compatibility story for this change?

  • If the header is broken but somehow the payload is ok, what's the behavior before/after this change?
  • If the header doesn't agree with the pre-process info, which one should we trust?
  • If the payload doesn't agree with the pre-process info but aligns with the header, which one should we trust?

Another question is: what's the performance cost of skipping headers vs. parsing headers? Do we have any benchmarks?

walterddr · Aug 18 '22 18:08

> If the header is broken but somehow the payload is ok, what's the behavior before/after this change?

Not sure I follow what you mean by "header is broken". Today, if a record cannot be decoded, it throws an exception and stops ingestion. The behavior will remain the same.

> If the header doesn't agree with the pre-process info, which one should we trust? If the payload doesn't agree with the pre-process info but aligns with the header, which one should we trust?

I think the PR description needs more details on what kind of pre-processing I was referring to: pre-processing at the source, outside of Pinot.

> Another question is: what's the performance cost of skipping headers vs. parsing headers? Do we have any benchmarks?

There shouldn't be any performance cost for parsing headers since, typically, the headers are already decoded when they are returned via the client API. If the user-specified headers are non-primitives, then it will add to the cost. But in my current implementation, I haven't even added support for a header value decoder; it assumes that the header key is a string and the value is a primitive. This is a good question. I will add more details in the PR description.
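
As a rough sketch of that assumption (string header keys, primitive values), Kafka headers could be flattened into column values roughly as follows; this is illustrative and not the exact code in this PR:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public class HeaderFlatteningSketch {
  // Illustrative only: flattens each header into a "header$<name>" column value,
  // treating the raw byte[] as a UTF-8 string. Supporting arbitrary (non-primitive)
  // header values would require a pluggable header value decoder.
  static Map<String, String> flattenHeaders(ConsumerRecord<String, byte[]> record) {
    Map<String, String> columns = new HashMap<>();
    for (Header header : record.headers()) {
      if (header.value() != null) {
        columns.put("header$" + header.key(), new String(header.value(), StandardCharsets.UTF_8));
      }
    }
    return columns;
  }
}
```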

navina · Aug 18 '22 20:08

Let's continue any design-related discussions in the design doc. Thanks!

navina · Aug 19 '22 00:08

Would prefer if we're able to keep it all consistent in terms of the prefix (if going with __, then __key, __header$headerName, __metadata)

npawar · Sep 15 '22 01:09

Drew out the class diagram for easy reference. RowMetadata seems redundant. But I think we can keep it around for now.

[class diagram: stream-message-apis]

navina · Sep 23 '22 08:09

We would like to review this. Thanks.

mcvsubbu · Sep 23 '22 22:09