
[Pinot 1.2.0] Batch upload for realtime table using Spark fails with error "Creation time must be set for uploaded realtime segment name generator"

Open · ajeydudhe opened this issue

Steps to reproduce

  • Create a schema for the realtime table and define the table config with full upsert enabled.
  • Use the attached job spec for spark-submit command.
  • Note that there was an issue with using the HTTP endpoint to fetch the table config, since it seems to expect the config to be returned only for an OFFLINE table. Hence, a local file path is used for the realtime table config. This is a separate issue.
  • Following is the segmentNameGeneratorSpec used.
  • The input file has the format: uploaded__myTable__0__20220101T0000Z__suffix
  • Tried using both inputFile and uploadedRealtime as the type.
  • If type = uploadedRealtime, it fails with the error "Creation time must be set for uploaded realtime segment name generator".
  • If type = inputFile and the generated segment has the same name format, the segment gets uploaded but fails to load on the server.
segmentNameGeneratorSpec:

  # type: Current supported type is 'simple' and 'normalizedDate'.
  type: uploadedRealtime
  #type: inputFile

  # configs: Configs to init SegmentNameGenerator.
  configs:
    #segment.name.prefix: 'uploaded__myTable__0__20220101T0000Z__suffix'
    #exclude.sequence.id: true
    # Below is for using file name as segment name
    file.path.pattern: '.+/(.+)\.json'
    segment.name.template: '\${filePathPattern:\1}'
  • Please confirm which segmentNameGeneratorSpec.type should be used to generate segments from JSON files for a realtime table using Spark.
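For reference, the example input name above breaks down into the parts an uploaded realtime segment name carries. The following is a hypothetical sketch based only on the example string in this issue (assumed layout: prefix__table__partitionId__creationTime__suffix), not Pinot's actual UploadedRealtimeSegmentName parser:

```python
# Hypothetical sketch: split the example uploaded realtime segment name
# from this issue into its assumed parts. Not Pinot's real parser.
def parse_uploaded_segment_name(name: str) -> dict:
    parts = name.split("__")
    if len(parts) != 5:
        raise ValueError(f"unexpected segment name: {name}")
    prefix, table, partition_id, creation_time, suffix = parts
    return {
        "prefix": prefix,
        "table": table,
        "partitionId": int(partition_id),
        "creationTime": creation_time,
        "suffix": suffix,
    }

print(parse_uploaded_segment_name("uploaded__myTable__0__20220101T0000Z__suffix"))
```

The error in this issue suggests the generator cannot fill the creationTime part when running from the Spark job spec.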

sparkIngestionJobSpec_myTable.yaml.txt

ajeydudhe avatar Sep 25 '24 14:09 ajeydudhe

@rohityadav1993 Can you help take a look? This is related to changes introduced in #13107

Jackie-Jiang avatar Sep 26 '24 22:09 Jackie-Jiang

I'm also working on batch upload for a realtime table using the Spark job spec. Reading through the source code, it looks like this is not supported for uploadedRealtime. All fields are read from segmentGeneratorConfig instead of the spec map segmentNameGeneratorConfigs: https://github.com/apache/pinot/blob/master/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java#L169-L182

I think we need to update the code to read these fields from the job spec; we can reuse segment.name.prefix, use.global.directory.sequence.id, and segment.partitionId defined in BatchConfigProperties.
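The idea above can be sketched as follows: resolve the generator settings from the job spec's segmentNameGeneratorSpec.configs map with sensible defaults, instead of relying only on fields precomputed in segmentGeneratorConfig. This is illustrative Python pseudo-logic, not the actual Java change; the key names are the ones mentioned above, and the defaults are assumptions:

```python
# Illustrative only: mimic resolving name-generator settings from the
# job-spec config map (segmentNameGeneratorSpec.configs) with assumed
# defaults, rather than from segmentGeneratorConfig alone.
def resolve_generator_configs(spec_configs: dict) -> dict:
    return {
        "prefix": spec_configs.get("segment.name.prefix", "uploaded"),
        "use_global_sequence_id": spec_configs.get(
            "use.global.directory.sequence.id", "false") == "true",
        "partition_id": int(spec_configs.get("segment.partitionId", "0")),
    }

print(resolve_generator_configs({"segment.name.prefix": "uploaded",
                                 "segment.partitionId": "3"}))
```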

@Jackie-Jiang do you know why these properties are defined in both SegmentGenerationTaskRunner and BatchConfigProperties? Could you also help confirm if my above assumptions are correct?

pengding-stripe avatar Oct 31 '24 05:10 pengding-stripe

@ajeydudhe, can you share the stack trace? There might be some implementation gap on the Spark connector side. The Flink connector was refactored to support uploaded realtime segments.

rohityadav1993 avatar Oct 31 '24 09:10 rohityadav1993

@pengding-stripe , I believe the reason for the two to exist is that batchConfigMap can come from the tableConfig, and jobs should consider both (the flink-connector does this). I looked at the segmentGenerationJob: it gets the configs from the spec only, and currently it does not read the configs needed for generating uploadedRealtime segments.

To support this, the following is needed:

  • Creation time: Usually the segment creation time can be currentTimeMs(), but some use cases may want a more deterministic time, e.g. an upload time.

  • Prefix can be anything.

  • Suffix: generally good to keep as the sequence id; the current Spark jobs already index based on files in a directory, and this can be reused.

  • PartitionId: For an append-only table it does not matter, but we should try to spread partition ids out as much as possible to avoid data skew within a partition. For upsert tables it must be consistent with the partitioning of the stream based on primary keys. I think the Spark jobs are not implemented in a way that generates partitioned segments for upserts to work when uploaded. (I'll cover this as part of #12987)
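To illustrate the partitionId point for upsert tables: uploaded segments must land on the same partition the stream producer would pick for the same primary key. A hedged sketch, assuming a simple hash-mod partitioner; a real job must use the exact partition function of its ingestion stream (e.g. Kafka's default murmur2 partitioner), which this md5-based stand-in does not replicate:

```python
import hashlib

# Sketch only: derive a deterministic partition id from a primary key so
# batch-uploaded segments agree with a hash-mod stream partitioner.
# Real jobs must match the stream's actual partition function.
def partition_for_key(primary_key: str, num_partitions: int) -> int:
    digest = hashlib.md5(primary_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

pid = partition_for_key("user-123", 8)
assert 0 <= pid < 8
```

The important property is determinism: the same primary key always maps to the same partition id, so an uploaded segment's rows stay colocated with the streaming rows they upsert.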

For non-upsert realtime use cases, I'll raise a PR to support uploadedRealtime segments conforming to UploadedRealtimeSegmentName.

rohityadav1993 avatar Oct 31 '24 12:10 rohityadav1993

@rohityadav1993 thanks for the response. I think the spark-connector only supports read operations, so everyone using Spark for ingestion is using the job spec (though I see some efforts to make the spark-connector support ingestion were merged recently).

> I think the spark jobs are not implemented in way to generate partitioned segments for upserts to work when uploaded

I think we just need to fix this part to read these fields from the job spec? Or are you suggesting we should read them from the table config?

pengding-stripe avatar Oct 31 '24 16:10 pengding-stripe

@pengding-stripe , I'm not suggesting we use the table config right now, just comparing with the flink-connector (code), which also uses table configs and additionally overrides the necessary configs. For UploadedRealtimeSegmentName, you mentioned the right place to make changes; the fields will have to be populated using the logic I shared previously.

rohityadav1993 avatar Nov 04 '24 05:11 rohityadav1993

@rohityadav1993 I made a PR to fix this: https://github.com/apache/pinot/pull/14443

pengding-stripe avatar Nov 13 '24 23:11 pengding-stripe