Add delimiter option for reading CSV files for Feathr
Signed-off-by: Chang Yong Lik [email protected]
Description
- Added delimiter options
Resolves https://github.com/linkedin/feathr/issues/241
How was this patch tested?
Tested locally with sbt
sbt 'testOnly com.linkedin.feathr.offline.source.dataloader.TestCsvDataLoader'
sbt 'testOnly com.linkedin.feathr.offline.util.TestSourceUtils'
sbt 'testOnly com.linkedin.feathr.offline.source.dataloader.TestBatchDataLoader'
sbt 'testOnly com.linkedin.feathr.offline.source.dataloader.hdfs.TestFileFormat'
sbt 'testOnly com.linkedin.feathr.offline.source.dataloader.TestDataLoaderFactory'
Progress Tracker
- [x] Use the Job configurations like here to implement, i.e. add a section called spark.feathr.inputFormat.csvOptions.sep which allows end users to pass the delimiters as options
- [x] In the Scala code, if you search for ss.read.format("csv").option("header", "true"), there will be a bunch of places that you need to modify. Eventually they will use something like the CSV reader here (https://spark.apache.org/docs/3.2.0/sql-data-sources-csv.html).
- [x] You can get the config in different places through something like this: sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",") (see the sketch after this list)
- [x] Test case
- [x] Also, please update the job configuration docs (https://linkedin.github.io/feathr/how-to-guides/feathr-job-configuration.html) to make sure the options are clear to end users.
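For reference, a minimal sketch of how the option could be read and applied when loading a CSV, assuming a SparkSession named ss; the helper name loadCsvWithDelimiter and the csvPath parameter are illustrative only, not the merged code:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Read the delimiter from the SQL conf, falling back to a comma, then pass it to Spark's CSV reader.
// The helper name and path parameter are hypothetical.
def loadCsvWithDelimiter(ss: SparkSession, csvPath: String): DataFrame = {
  val sep = ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
  ss.read
    .format("csv")
    .option("header", "true")
    .option("delimiter", sep)
    .load(csvPath)
}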
Does this PR introduce any user-facing changes?
Allows users to specify delimiters
@ahlag thanks for the contribution! This PR looks good to me, but I'm not sure why the test fails. I spent a bit of time investigating and feel it might be caused by the newly added tests?
@xiaoyongzhu
Did it fail in GitHub Actions or on the command line, e.g. sbt 'testOnly com.linkedin.feathr.offline.util.TestSourceUtils'? I tried rerunning the following commands locally but the unit tests were not failing.
sbt 'testOnly com.linkedin.feathr.offline.source.dataloader.TestCsvDataLoader'
sbt 'testOnly com.linkedin.feathr.offline.util.TestSourceUtils'
sbt 'testOnly com.linkedin.feathr.offline.source.dataloader.TestBatchDataLoader'
sbt 'testOnly com.linkedin.feathr.offline.source.dataloader.hdfs.TestFileFormat'
sbt 'testOnly com.linkedin.feathr.offline.source.dataloader.TestDataLoaderFactory'
Talked with @ahlag offline and asked him to run sbt test. I feel the issue is mostly because those failed tests were relying on loadDataFrame to read the CSV files.
@xiaoyongzhu I think I found the problem. The delimiter was not passed successfully when I tried passing the option with sqlContext. Is there a way to set the config with SparkSession in unit tests?
TestFileFormat.scala
val sqlContext = ss.sqlContext
sqlContext.setConf("spark.feathr.inputFormat.csvOptions.sep", "\t")
FileFormat.scala
val csvDelimiterOption = ss.sparkContext.getConf.get("spark.feathr.inputFormat.csvOptions.sep", ",")
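A minimal sketch (assuming the snippets above) of why the value may not round-trip: sqlContext.setConf writes to the session's runtime SQL conf, while ss.sparkContext.getConf returns the SparkConf captured when the context was created, so the tab set by the test never shows up there. Reading it back through the SQL conf does work:

// Assumed sketch, not the merged code: set via the SQL conf, then read it back two ways.
val sqlContext = ss.sqlContext
sqlContext.setConf("spark.feathr.inputFormat.csvOptions.sep", "\t")

// Reads the SparkConf snapshot taken at context creation: still returns the default ","
val viaSparkConf = ss.sparkContext.getConf.get("spark.feathr.inputFormat.csvOptions.sep", ",")

// Reads the session's runtime SQL conf: returns "\t" as expected
val viaSqlConf = ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")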
Hmm it's a bit weird. Is it possible to force set the delimiters?
Looks like there can only be one SparkContext:
[info] TestFileFormat:
[info] - testLoadDataFrame
[info] - testLoadDataFrameWithCsvDelimiterOption *** FAILED ***
[info] org.apache.spark.SparkException: Only one SparkContext should be running in this JVM (see SPARK-2243).The currently running SparkContext was created at:
I think I will try a new approach. Since an existing SparkContext cannot be edited and another one cannot be created, I will test it end-to-end by passing the config from the client.
I did some research and found this answer: https://stackoverflow.com/a/44613011
sqlContext.setConf("spark.sql.shuffle.partitions", "10") will set the property parameter for whole application before logicalPlan is generated.
sqlContext.sql("set spark.sql.shuffle.partitions=15") will also set the property but only for particular query and is generated at the time of logicalPlan creation.
Choosing between them depends on what your requirement is.
Maybe you can try sqlContext.sql?
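For example, something along these lines inside the test's existing SparkSession (a sketch, not verified against the test code):

// Sketch of the suggestion, using the test's existing SparkSession (ss):
ss.sql("set spark.feathr.inputFormat.csvOptions.sep=\t")

// Equivalent runtime-conf call; may be simpler when the value is a raw tab,
// since it avoids any whitespace handling in the SQL parser:
ss.conf.set("spark.feathr.inputFormat.csvOptions.sep", "\t")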
Ok, I'll give this a shot
Hi @xiaoyongzhu,
Sorry for the holdup.
I fixed the test failures. Seems like my hunch was correct: when spark.feathr.inputFormat.csvOptions.sep was passed, it affected and mutated the dataLoader of the other unit tests. What I did was forcibly default csvDelimiterOption to "," if csvDelimiterOption is not set or empty.
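Roughly, the defaulting looks like this (a sketch using the names from the earlier snippet; the merged code may differ):

// Fall back to "," whenever the configured separator is missing or blank,
// so tests that never set the option are unaffected.
val rawSep = ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
val csvDelimiterOption = if (rawSep == null || rawSep.trim.isEmpty) "," else rawSep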
By the way, what version do you want this feature to be in?
https://github.com/linkedin/feathr/blob/ebc43eebd518510c16e49f808369111760d2b0bb/docs/how-to-guides/feathr-job-configuration.md?plain=1#L15
Test passed: https://github.com/linkedin/feathr/runs/7472178857?check_suite_focus=true
@xiaoyongzhu Ok! I have updated the release version.
@xiaoyongzhu @hangfei I have finished the changes. Could you review?
@ahlag would you mind merging the latest main and resolving the conflicts so that we can get this merged? Thanks for your time!
@xiaoyongzhu @blrchen Done! Could you merge it today? I am afraid it might otherwise run into new conflicts.