Adds intermediate dataType to schema and use it for ingestion aggregation

Open noob-se7en opened this issue 3 months ago • 5 comments

Problem: Related to https://github.com/apache/pinot/issues/16317. TL;DR: When an ingestion aggregation or transformation references a source column that is not present in the schema, data type conversions can throw exceptions, because Pinot has no type information for that column. Example: for the ingestion aggregation sum(price), if the price column is not part of the schema, Pinot assumes it is a Number, but it may be a String in the source.
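To make the failure mode concrete, here is a minimal, self-contained sketch. It is not Pinot code: `UntypedSourceColumnDemo`, `sumAssumingNumber`, and `failsOn` are hypothetical names, and the cast only stands in for whatever conversion the real aggregator performs when it assumes a numeric value.

```java
import java.util.Map;

public class UntypedSourceColumnDemo {
  // Hypothetical stand-in for an aggregator that assumes the raw source value is numeric.
  public static double sumAssumingNumber(double runningSum, Object rawValue) {
    return runningSum + ((Number) rawValue).doubleValue();
  }

  // Returns true when the numeric assumption fails for the "price" value of the given row.
  public static boolean failsOn(Map<String, ?> row) {
    try {
      sumAssumingNumber(0.0, row.get("price"));
      return false;
    } catch (ClassCastException e) {
      // The source delivered a String, so the cast to Number is invalid.
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(failsOn(Map.of("price", 12.5)));   // numeric source value: prints false
    System.out.println(failsOn(Map.of("price", "12.5"))); // String source value: prints true
  }
}
```

With no schema entry for `price`, nothing tells the aggregation path which of the two rows to expect.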

PR: Add a new intermediate field spec to the schema, as shown below, and use its type information in ingestion aggregation.

  "intermediateFieldSpecs": [
    {
      "name": "price",
      "dataType": "STRING"
    }
  ],
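As a rough illustration of how a declared intermediate `dataType` could drive the conversion before aggregation, consider the sketch below. It assumes the declared type is available as a plain string; `IntermediateTypeSketch`, `toDouble`, and `sum` are hypothetical names and do not reflect the PR's actual `IntermediateFieldSpec` or aggregator code.

```java
import java.math.BigDecimal;

public class IntermediateTypeSketch {
  // Hypothetical helper: convert a raw source value to double using the declared dataType.
  public static double toDouble(Object rawValue, String declaredDataType) {
    switch (declaredDataType) {
      case "STRING":
        // The source sends text such as "12.5", so parse it instead of casting.
        return Double.parseDouble(rawValue.toString());
      case "INT":
      case "LONG":
      case "FLOAT":
      case "DOUBLE":
        return ((Number) rawValue).doubleValue();
      case "BIG_DECIMAL":
        return new BigDecimal(rawValue.toString()).doubleValue();
      default:
        throw new IllegalArgumentException("Unsupported intermediate dataType: " + declaredDataType);
    }
  }

  // Hypothetical SUM step that consults the declared type before adding.
  public static double sum(double runningSum, Object rawValue, String declaredDataType) {
    return runningSum + toDouble(rawValue, declaredDataType);
  }

  public static void main(String[] args) {
    double total = sum(sum(0.0, "12.5", "STRING"), "7.5", "STRING");
    System.out.println(total); // prints 20.0
  }
}
```

The point of the design is that the conversion rule comes from the schema rather than from a guess about the source value.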

Pending: Adding more tests. Opening this PR to get early reviews.

noob-se7en avatar Sep 22 '25 18:09 noob-se7en

Codecov Report

:x: Patch coverage is 53.74150% with 68 lines in your changes missing coverage. Please review. :white_check_mark: Project coverage is 63.28%. Comparing base (2eeecc5) to head (a22c5cf).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...t/local/aggregator/MinMaxRangeValueAggregator.java | 19.04% | 12 Missing and 5 partials :warning: |
| ...t/segment/local/aggregator/AvgValueAggregator.java | 15.78% | 11 Missing and 5 partials :warning: |
| ...rc/main/java/org/apache/pinot/spi/data/Schema.java | 18.75% | 12 Missing and 1 partial :warning: |
| ...local/indexsegment/mutable/MutableSegmentImpl.java | 75.00% | 9 Missing and 3 partials :warning: |
| ...g/apache/pinot/spi/data/IntermediateFieldSpec.java | 0.00% | 3 Missing :warning: |
| .../local/aggregator/SumPrecisionValueAggregator.java | 80.00% | 1 Missing and 1 partial :warning: |
| ...segment/local/aggregator/ValueAggregatorUtils.java | 66.66% | 1 Missing and 1 partial :warning: |
| ...t/segment/local/aggregator/MaxValueAggregator.java | 85.71% | 1 Missing :warning: |
| ...t/segment/local/aggregator/MinValueAggregator.java | 85.71% | 1 Missing :warning: |
| ...t/segment/local/aggregator/SumValueAggregator.java | 85.71% | 1 Missing :warning: |

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #16868      +/-   ##
============================================
- Coverage     63.30%   63.28%   -0.02%     
  Complexity     1474     1474              
============================================
  Files          3155     3157       +2     
  Lines        188119   188223     +104     
  Branches      28792    28805      +13     
============================================
+ Hits         119088   119121      +33     
- Misses        59800    59857      +57     
- Partials       9231     9245      +14     

| Flag | Coverage Δ |
|---|---|
| custom-integration1 | 100.00% <ø> (ø) |
| integration | 100.00% <ø> (ø) |
| integration1 | 100.00% <ø> (ø) |
| integration2 | 0.00% <ø> (ø) |
| java-11 | 63.25% <53.74%> (-0.03%) :arrow_down: |
| java-21 | 63.26% <53.74%> (+7.64%) :arrow_up: |
| temurin | 63.28% <53.74%> (-0.02%) :arrow_down: |
| unittests | 63.28% <53.74%> (-0.02%) :arrow_down: |
| unittests1 | 55.61% <26.53%> (-0.05%) :arrow_down: |
| unittests2 | 34.00% <51.70%> (+0.03%) :arrow_up: |

codecov-commenter avatar Sep 23 '25 08:09 codecov-commenter

@Jackie-Jiang I added an intermediate field spec to the schema, for example:

  "intermediateFieldSpecs": [
    {
      "name": "random",
      "dataType": "STRING"
    }
  ],

noob-se7en avatar Oct 16 '25 08:10 noob-se7en

@noob-se7en

  1. Will this impact segment reload (due to the schema change), etc.?
    • Impact on existing segments: given that these transformations happen at ingestion time, were we failing segment builds in such scenarios (referring to the issues mentioned above)?
    • Impact on pauseless ingestion, i.e. continued ingestion without a segment build: will we rely on DR here?
  2. How are we handling transformations in such scenarios? Is the expectation that the column being transformed is part of the schema?

9aman avatar Nov 03 '25 06:11 9aman

I guess that for transformations, ingestion itself will throw exceptions at the row level, so we won't wait until the segment build?

9aman avatar Nov 03 '25 07:11 9aman

I don't fully understand the questions. Code changes are only in MutableSegmentImpl, so it should not impact segment reload, right?

This PR is only meant to support realtime ingestion aggregation (which happens during indexing of mutable segments).
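For readers less familiar with the feature, the following is a simplified model of what aggregating during indexing means: rows that share the same dimension values are folded into one stored row as they arrive. The class and method names (`IngestionAggregationSketch`, `index`, `numDocs`, `sumFor`) are hypothetical, and the map is only a stand-in for the mutable segment; this is not how MutableSegmentImpl is actually implemented.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class IngestionAggregationSketch {
  // Stand-in for the mutable segment: one aggregated SUM value per dimension key.
  private final Map<String, Double> _sumByKey = new LinkedHashMap<>();

  // Index one incoming row: merge into the existing row for this key, or create a new one.
  public void index(String dimensionKey, double price) {
    _sumByKey.merge(dimensionKey, price, Double::sum);
  }

  // Number of stored rows after aggregation.
  public int numDocs() {
    return _sumByKey.size();
  }

  // Aggregated SUM for a key that has been indexed at least once.
  public double sumFor(String dimensionKey) {
    return _sumByKey.get(dimensionKey);
  }

  public static void main(String[] args) {
    IngestionAggregationSketch segment = new IngestionAggregationSketch();
    segment.index("storeA", 10.0);
    segment.index("storeB", 4.0);
    segment.index("storeA", 2.5);
    System.out.println(segment.numDocs());        // prints 2
    System.out.println(segment.sumFor("storeA")); // prints 12.5
  }
}
```

In this model the type conversion of `price` has to happen per row at `index` time, which is why the source column's type matters during realtime ingestion rather than at segment build or reload.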

noob-se7en avatar Nov 03 '25 16:11 noob-se7en