
Testing pipeline using a parquet input more than once fails

Open · idreeskhan opened this issue 4 years ago • 1 comment

I have a pipeline that does something like this on Scio 0.9.2

val a = sc.parquetAvroFile[MySchema](path, Projection)
val b = a.filter(xyz)
val c = a.map(x => abc)

and a matching JobTest

    JobTest[MyJob.type]
      .args(
        "--output=a",
        "--input1=b",
        "--input2=c"
      )
      .input(AvroIO[input1]("a"), input1s)
      .input(ParquetAvroIO[input2]("b"), input2s)
      .output(AvroIO[Output]("c"))(_ should containInAnyOrder(output))
      .run()

Running this JobTest results in the following error:

[info]   java.lang.IllegalArgumentException: requirement failed: Test input ParquetAvroIO(a) has already been read from once.
[info]   at scala.Predef$.require(Predef.scala:281)
[info]   at com.spotify.scio.testing.TestInput.apply(TestDataManager.scala:69)
[info]   at com.spotify.scio.io.ScioIO.readTest(ScioIO.scala:84)
[info]   at com.spotify.scio.io.ScioIO.readTest$(ScioIO.scala:81)
[info]   at com.spotify.scio.parquet.avro.ParquetAvroIO.readTest(ParquetAvroIO.scala:46)
[info]   at com.spotify.scio.io.ScioIO$$anonfun$readWithContext$1.apply(ScioIO.scala:75)
[info]   at com.spotify.scio.io.ScioIO$$anonfun$readWithContext$1.apply(ScioIO.scala:74)
[info]   at com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:690)
[info]   at com.spotify.scio.io.ScioIO.readWithContext(ScioIO.scala:74)
[info]   at com.spotify.scio.io.ScioIO.readWithContext$(ScioIO.scala:70)
[info]   at com.spotify.scio.parquet.avro.ParquetAvroIO.readWithContext(ParquetAvroIO.scala:46)
[info]   at com.spotify.scio.ScioContext.read(ScioContext.scala:827)
[info]   at com.spotify.scio.parquet.avro.package$ParquetAvroFile.map(package.scala:85)
[info]   at MyJob$.main(MyJob.scala:42)

I can work around this by not directly referencing the parquet input val, e.g.

val a = sc.parquetAvroFile[MySchema](path, Projection).map(identity)
val b = a.filter(xyz)
val c = a.map(x => abc)

idreeskhan · Aug 03 '20 17:08

This is sort of by design. sc.parquetAvroFile(path) doesn't initiate the IO right away; .map(f) does, with f acting as a projection function, since the projected Avro records might be incomplete and fail ser/de.

a.map(f1) & a.map(f2) actually generate two ParquetAvroIO(path) reads for testing, which causes the duplication. Not sure if it's fixable though, since we'd then have to uniquely identify each f in both the pipeline code and the test.
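
To restate the failing shape in code (projection, f1 and f2 here are placeholders for the real job's projection and functions):

val handle = sc.parquetAvroFile[MySchema](path, projection) // lazy handle, nothing is read yet
val b = handle.map(f1) // goes through ScioContext.read(ParquetAvroIO(path)) and consumes the test input
val c = handle.map(f2) // requests the same ParquetAvroIO(path) again -> "has already been read from once"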

The workaround would be something like this:

val a = sc.parquetAvroFile(...).map(f0)
val b = a.map(f1)
val c = a.map(f2)

Here f0 projects the union of the columns needed by both f1 & f2, while f1 & f2 split a into the two branches. That way we only read the file once, with a union of the column projections.
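
A slightly fuller sketch of that pattern, assuming a hypothetical MySchema record with getId, getAmount and getName fields and made-up arg/output names (substitute the real schema, projection and IOs):

import com.spotify.scio._
import com.spotify.scio.parquet.avro._

object MyJob {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Project the union of all columns needed by both downstream branches.
    val projection = Projection[MySchema](_.getId, _.getAmount, _.getName)

    // The single .map(f0) is the call that actually reads the Parquet file,
    // so only one ParquetAvroIO(path) is created.
    val a = sc
      .parquetAvroFile[MySchema](args("input"), projection)
      .map(r => (r.getId.toInt, r.getAmount.toDouble, r.getName.toString)) // f0

    // f1 and f2 branch off a plain SCollection and never touch the IO again.
    val b = a.collect { case (id, amount, _) if amount > 0 => id.toString } // f1
    val c = a.map { case (id, _, name) => s"$id:$name" }                    // f2

    b.saveAsTextFile(args("output1"))
    c.saveAsTextFile(args("output2"))
    sc.run()
  }
}

With that shape the matching JobTest (a fragment inside a PipelineSpec, with testRecords and the expected values as placeholder data) only needs a single ParquetAvroIO entry for the Parquet path, e.g.:

JobTest[MyJob.type]
  .args("--input=in.parquet", "--output1=out1", "--output2=out2")
  .input(ParquetAvroIO[MySchema]("in.parquet"), testRecords)
  .output(TextIO("out1"))(_ should containInAnyOrder(expectedB))
  .output(TextIO("out2"))(_ should containInAnyOrder(expectedC))
  .run()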

nevillelyh · Aug 03 '20 18:08