
Add delimiter option for reading CSV files for Feathr

Open ahlag opened this issue 2 years ago • 12 comments

Signed-off-by: Chang Yong Lik [email protected]

Description

  • Added delimiter options

Resolves https://github.com/linkedin/feathr/issues/241

How was this patch tested?

Tested locally with sbt

sbt 'testOnly com.linkedin.feathr.offline.source.dataloader.TestCsvDataLoader'
sbt 'testOnly com.linkedin.feathr.offline.util.TestSourceUtils'
sbt 'testOnly com.linkedin.feathr.offline.source.dataloader.TestBatchDataLoader'
sbt 'testOnly com.linkedin.feathr.offline.source.dataloader.hdfs.TestFileFormat'
sbt 'testOnly com.linkedin.feathr.offline.source.dataloader.TestDataLoaderFactory'

Progress Tracker

  • [x] Use the job configurations (like here) to implement this, i.e. add a section called spark.feathr.inputFormat.csvOptions.sep which allows end users to pass the delimiter as an option
  • [x] In the Scala code, if you search for ss.read.format("csv").option("header", "true"), there are a bunch of places that you need to modify. Eventually they will use something like the CSV reader options described here (https://spark.apache.org/docs/3.2.0/sql-data-sources-csv.html).
  • [x] You can get the config in different places through something like this: sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
  • [x] Test case
  • [x] Also please help update the job configuration docs (https://linkedin.github.io/feathr/how-to-guides/feathr-job-configuration.html) to make sure the options are clear to end users.
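A hedged sketch of how the checklist items could fit together (not the exact Feathr code): `confLookup` stands in for the `sqlContext.getConf` lookup, and the resulting map would be passed to `ss.read.format("csv").options(...)`.

```scala
// Sketch only: assemble CSV reader options, falling back to "," when the
// Feathr separator config is absent or empty. `confLookup` is a hypothetical
// stand-in for sqlContext.getConf so the logic can be shown without Spark.
def csvReadOptions(confLookup: String => Option[String]): Map[String, String] = {
  val sep = confLookup("spark.feathr.inputFormat.csvOptions.sep")
    .filter(_.nonEmpty)
    .getOrElse(",")
  Map("header" -> "true", "delimiter" -> sep)
}
```

With this shape, every call site that previously hard-coded `.option("header", "true")` can share one options map instead of repeating the lookup.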

Does this PR introduce any user-facing changes?

Allows users to specify delimiters
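For example, assuming the option name introduced in this PR, an end user would pass the delimiter as a Spark configuration value, e.g. for tab-separated files:

```
spark.feathr.inputFormat.csvOptions.sep "\t"
```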

ahlag avatar May 30 '22 14:05 ahlag

@ahlag thanks for the contribution! This PR looks good to me, but I'm not sure why the test fails. I spent a bit of time investigating and feel it might be caused by the newly added tests?

xiaoyongzhu avatar Jul 08 '22 16:07 xiaoyongzhu

@xiaoyongzhu Did it fail in GitHub Actions or on the command line, e.g. sbt 'testOnly com.linkedin.feathr.offline.util.TestSourceUtils'? I tried rerunning the following commands locally, but the unit tests were not failing.

sbt 'testOnly com.linkedin.feathr.offline.source.dataloader.TestCsvDataLoader'
sbt 'testOnly com.linkedin.feathr.offline.util.TestSourceUtils'
sbt 'testOnly com.linkedin.feathr.offline.source.dataloader.TestBatchDataLoader'
sbt 'testOnly com.linkedin.feathr.offline.source.dataloader.hdfs.TestFileFormat'
sbt 'testOnly com.linkedin.feathr.offline.source.dataloader.TestDataLoaderFactory'

ahlag avatar Jul 09 '22 05:07 ahlag

Talked with @ahlag offline and asked him to run sbt test. I feel the issue is mostly that those failing tests were relying on loadDataFrame to read the CSV files.

xiaoyongzhu avatar Jul 09 '22 13:07 xiaoyongzhu

@xiaoyongzhu I think I found the problem. The delimiter was not passed successfully when I tried setting the option with sqlContext. Is there a way to set the config on the SparkSession in unit tests?

TestFileFormat.scala

    val sqlContext = ss.sqlContext
    sqlContext.setConf("spark.feathr.inputFormat.csvOptions.sep", "\t")

FileFormat.scala

val csvDelimiterOption = ss.sparkContext.getConf.get("spark.feathr.inputFormat.csvOptions.sep", ",")
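A plain-Scala model of why these two snippets don't see each other's value (this is an illustration, not Spark itself): `sparkContext.getConf` reads the SparkConf snapshot captured when the context started, while `sqlContext.setConf` writes to a separate, mutable runtime SQL conf. Reading the startup snapshot therefore misses later `setConf` calls.

```scala
// Toy model of the two config stores. The real fix is to read the runtime
// SQL conf (e.g. ss.conf / sqlContext.getConf) rather than the startup
// SparkConf (ss.sparkContext.getConf).
val startupConf = Map.empty[String, String]                          // ~ ss.sparkContext.getConf
val runtimeConf = scala.collection.mutable.Map.empty[String, String] // ~ sqlContext conf store

runtimeConf("spark.feathr.inputFormat.csvOptions.sep") = "\t"        // ~ sqlContext.setConf(...)

val fromStartup = startupConf.getOrElse("spark.feathr.inputFormat.csvOptions.sep", ",")
val fromRuntime = runtimeConf.getOrElse("spark.feathr.inputFormat.csvOptions.sep", ",")
// fromStartup stays at the default ",", while fromRuntime sees "\t" --
// matching the behavior observed in the failing test.
```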

ahlag avatar Jul 10 '22 10:07 ahlag

> @xiaoyongzhu I think I found the problem. The delimiter was not passed successfully when I tried setting the option with sqlContext. Is there a way to set the config on the SparkSession in unit tests?
>
> TestFileFormat.scala
>
>     val sqlContext = ss.sqlContext
>     sqlContext.setConf("spark.feathr.inputFormat.csvOptions.sep", "\t")
>
> FileFormat.scala
>
>     val csvDelimiterOption = ss.sparkContext.getConf.get("spark.feathr.inputFormat.csvOptions.sep", ",")

Hmm it's a bit weird. Is it possible to force set the delimiters?

xiaoyongzhu avatar Jul 10 '22 10:07 xiaoyongzhu

Looks like there can only be one SparkContext:

[info] TestFileFormat:
[info] - testLoadDataFrame
[info] - testLoadDataFrameWithCsvDelimiterOption *** FAILED ***
[info]   org.apache.spark.SparkException: Only one SparkContext should be running in this JVM (see SPARK-2243).The currently running SparkContext was created at:

ahlag avatar Jul 10 '22 11:07 ahlag

I think I will try a new approach. Since an existing SparkContext cannot be edited, nor can another one be created, I will test it end-to-end by passing the config from the client.

ahlag avatar Jul 10 '22 13:07 ahlag

> I think I will try a new approach. Since an existing SparkContext cannot be edited, nor can another one be created, I will test it end-to-end by passing the config from the client.

I did some research and found this answer: https://stackoverflow.com/a/44613011

> sqlContext.setConf("spark.sql.shuffle.partitions", "10") will set the property for the whole application before the logicalPlan is generated.
>
> sqlContext.sql("set spark.sql.shuffle.partitions=15") will also set the property, but only for a particular query, and is applied at the time of logicalPlan creation.
>
> Choosing between them depends on what your requirement is.

Maybe you can try sqlContext.sql?

xiaoyongzhu avatar Jul 11 '22 04:07 xiaoyongzhu

Ok, I'll give this a shot.

ahlag avatar Jul 11 '22 05:07 ahlag

Hi @Xiaoyong Zhu,

Sorry for the hold-up. I fixed the test failures. It seems my hunch was correct: when spark.feathr.inputFormat.csvOptions.sep was passed, it affected and mutated the dataLoader of the other unit tests. What I did was forcibly default csvDelimiterOption to , if it is not set or is empty. By the way, which version do you want this feature in? https://github.com/linkedin/feathr/blob/ebc43eebd518510c16e49f808369111760d2b0bb/docs/how-to-guides/feathr-job-configuration.md?plain=1#L15

Test passed: https://github.com/linkedin/feathr/runs/7472178857?check_suite_focus=true
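The defaulting behavior described above could be isolated in a small helper like this (a sketch; `effectiveCsvDelimiter` is a hypothetical name, not the actual Feathr function):

```scala
// Hypothetical helper mirroring the fix: treat a null, unset, or empty
// separator as the default comma, so a test that sets the config cannot
// leave other tests' loaders with a broken delimiter.
def effectiveCsvDelimiter(configured: String): String =
  Option(configured).filter(_.nonEmpty).getOrElse(",")
```

Note that the check is for emptiness rather than `trim.nonEmpty`, since a tab separator is itself whitespace and must not be collapsed to the default.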

ahlag avatar Jul 22 '22 16:07 ahlag

@xiaoyongzhu Ok! I have updated the release version.

ahlag avatar Jul 27 '22 03:07 ahlag

@xiaoyongzhu @hangfei I have finished the changes. Could you review?

ahlag avatar Aug 04 '22 13:08 ahlag

@ahlag would you mind merge latest main and resolve the conflicts so that we can get this merged, thanks for your time!

blrchen avatar Aug 16 '22 12:08 blrchen

@xiaoyongzhu @blrchen Done! Could you merge it today? Otherwise I am afraid it might pick up new conflicts.

ahlag avatar Aug 16 '22 14:08 ahlag