Testing pipeline using a parquet input more than once fails
I have a pipeline that does something like this on Scio 0.9.2
val a = sc.parquetAvroFile[MySchema](path, Projection)
val b = a.filter(xyz)
val c = a.map(x => abc)
and a matching JobTest
JobTest[MyJob.type]
  .args(
    "--output=a",
    "--input1=b",
    "--input2=c"
  )
  .input(AvroIO[Input1]("b"), input1s)
  .input(ParquetAvroIO[Input2]("c"), input2s)
  .output(AvroIO[Output]("a"))(_ should containInAnyOrder(output))
  .run()
Running this JobTest results in the following error:
[info] java.lang.IllegalArgumentException: requirement failed: Test input ParquetAvroIO(a) has already been read from once.
[info] at scala.Predef$.require(Predef.scala:281)
[info] at com.spotify.scio.testing.TestInput.apply(TestDataManager.scala:69)
[info] at com.spotify.scio.io.ScioIO.readTest(ScioIO.scala:84)
[info] at com.spotify.scio.io.ScioIO.readTest$(ScioIO.scala:81)
[info] at com.spotify.scio.parquet.avro.ParquetAvroIO.readTest(ParquetAvroIO.scala:46)
[info] at com.spotify.scio.io.ScioIO$$anonfun$readWithContext$1.apply(ScioIO.scala:75)
[info] at com.spotify.scio.io.ScioIO$$anonfun$readWithContext$1.apply(ScioIO.scala:74)
[info] at com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:690)
[info] at com.spotify.scio.io.ScioIO.readWithContext(ScioIO.scala:74)
[info] at com.spotify.scio.io.ScioIO.readWithContext$(ScioIO.scala:70)
[info] at com.spotify.scio.parquet.avro.ParquetAvroIO.readWithContext(ParquetAvroIO.scala:46)
[info] at com.spotify.scio.ScioContext.read(ScioContext.scala:827)
[info] at com.spotify.scio.parquet.avro.package$ParquetAvroFile.map(package.scala:85)
[info] at MyJob$.main(MyJob.scala:42)
I can work around this by not directly referencing the parquet input val, e.g.
val a = sc.parquetAvroFile[MySchema](path, Projection).map(identity)
val b = a.filter(xyz)
val c = a.map(x => abc)
This is sort of by design. sc.parquetAvroFile(path) doesn't initiate the IO right away; .map(f) does, together with f as a projection function, since the projected Avro records might be incomplete and fail ser/de. a.map(f1) and a.map(f2) therefore generate two ParquetAvroIO(path) instances for testing, which causes the duplication. Not sure if it's fixable, though, since we'd then have to uniquely identify the f both in the pipeline code and in the test.
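For illustration, a minimal sketch of how the duplication arises, reusing the placeholder names (MySchema, path, Projection, and the f1/f2 mapping functions) from the example above; the comments restate the behaviour reported in this issue:

import com.spotify.scio.parquet.avro._

val handle = sc.parquetAvroFile[MySchema](path, Projection) // lazy handle, no IO registered yet
val b = handle.map(f1) // first read: a ParquetAvroIO(path) is registered here
val c = handle.map(f2) // second read of the same path: in a JobTest this trips
                       // "has already been read from once"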
The workaround would be something like this:
val a = sc.parquetAvroFile(...).map(f0)
val b = a.map(f1)
val c = a.map(f2)
Here f0 projects the columns needed by both f1 and f2, while f1 and f2 split a into two. That way we only read the file once, with a union of the column projections.
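Concretely, a minimal sketch of that pattern; the foo and bar fields are hypothetical stand-ins for whatever columns f1 and f2 actually need:

import com.spotify.scio.parquet.avro._

// Union of the columns needed downstream, built with the Projection helper.
val projection = Projection[MySchema](_.getFoo, _.getBar)

// f0: single read, mapping the projected records to plain values up front,
// since projected Avro records may be incomplete and fail ser/de.
val a = sc.parquetAvroFile[MySchema](path, projection)
  .map(r => (r.getFoo, r.getBar))

// f1 and f2 then split the single read into two SCollections.
val b = a.map { case (foo, _) => foo }
val c = a.map { case (_, bar) => bar }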