[SUPPORT] How to skip some partitions in a table when using readStream in Spark at the init stage
Tips before filing an issue
- Have you gone through our FAQs?
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an issue directly.
Describe the problem you faced
I have a table partitioned by operation type and ingestion date (e.g. insert/2023-12-11/, update/2023-12-11/, delete/2023-12-11/). When I read this table with spark.readStream, I only want to read the data under the update partition. I found the config 'hoodie.datasource.read.incr.path.glob' and set its value to update/202*. However, the Spark job initializes very slowly, and I found the job stuck reading a parquet file. That parquet file is not under the update partition but under the insert partition, which is very confusing.
So I want to ask: is there a config that reads only the target partition, skips the others, and also does not read the other partitions' data files to infer the schema?
Expected behavior
I want to know whether there is a config that reads only the target partition, skips the others, and does not read the other partitions' data files to infer the schema.
Environment Description
- Hudi version : 0.14.0
- Spark version : 3.4.1
- Hive version :
- Hadoop version :
- Storage (HDFS/S3/GCS..) : GCS
- Running on Docker? (yes/no) : no
Additional context
I use the below configurations to write to the table:

    hudi_write_options = {
        'hoodie.table.name': hudi_table_name,
        'hoodie.datasource.write.partitionpath.field': 'operation_type, ingestion_dt',
        'hoodie.datasource.write.operation': 'insert',
        'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
        'hoodie.parquet.compression.codec': 'zstd',
        'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
        'hoodie.datasource.write.reconcile.schema': True,
        'hoodie.metadata.enable': True
    }
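For reference, a minimal sketch of how I apply these write options; the base path and the DataFrame name are placeholders, not the real ones:

    # Sketch only: 'df' and the GCS path below are placeholders.
    base_path = 'gs://my-bucket/hudi/my_table'

    (df.write
        .format('hudi')
        .options(**hudi_write_options)
        .mode('append')
        .save(base_path))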
And I use the below configurations to read from the table:

    read_streaming_hudi_options = {
        'maxFilesPerTrigger': 5,
        'hoodie.datasource.read.incr.path.glob': 'update/202*',
        'hoodie.read.timeline.holes.resolution.policy': 'BLOCK',
        'hoodie.datasource.read.file.index.listing.partition-path-prefix.analysis.enabled': False,
        'hoodie.file.listing.parallelism': 1000,
        'hoodie.metadata.enable': True,
        'hoodie.datasource.read.schema.use.end.instanttime': True,
        'hoodie.datasource.streaming.startOffset': '20231211000000000'
    }
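And a sketch of how the streaming read is started with these options; the sink and checkpoint location below are placeholders as well:

    # Sketch only: the console sink and checkpoint location are placeholders.
    stream_df = (spark.readStream
        .format('hudi')
        .options(**read_streaming_hudi_options)
        .load(base_path))

    query = (stream_df.writeStream
        .format('console')
        .option('checkpointLocation', 'gs://my-bucket/checkpoints/my_table')
        .start())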
Stacktrace
Add the stacktrace of the error.
Did you try to add a filter condition on the partition fields?
Hi @danny0405, do you mean like this:
I tried this, but it also reads other partitions' data files to resolve the schema. I think this kind of filter takes effect after the source has loaded all partitions' files, but I want a config that tells the source to read only the partition specified in my configs so I do not need to use a filter.
but I want a config that tells the source to read only the partition specified in my configs so I do not need to use a filter
That does not follow the common intuition.
@lei-su-awx If the table is partitioned, then it should only read the files under that partition. Are you seeing behaviour otherwise, i.e. is it reading all the files?
@ad1happy2go I tried to read only the files under that partition using Spark (spark.readStream), but an error was thrown: no .hoodie folder exists in the partition path. I found that the .hoodie folder only exists under the table base path.
@lei-su-awx You don't need to read from that partition directory. You should give the parent (table base) directory only. With your .filter("operation_type = 'update'"), partition pruning will take place and it will only process that partition's data.
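Roughly something like this (a sketch only, reusing the placeholder base path from above):

    # Read from the table base path (not the partition path) and filter on the
    # partition column; Spark/Hudi can then prune the other partitions.
    stream_df = (spark.readStream
        .format('hudi')
        .load(base_path)
        .filter("operation_type = 'update'"))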
@lei-su-awx Did you see the behaviour I mentioned? Any more updates here please?
@lei-su-awx Any updates here? Do you still have this issue?
@ad1happy2go Sorry for the late reply, I did not test this further because I ran into new issues. But I think .filter("operation_type = 'update'") can do what I want, so never mind this issue.
@lei-su-awx Closing this issue then, thanks. Please reopen in case you see the issue again.