covary* doesn't work with transform

It looks like the newly added covary* functions don't work when applied directly to the input SCollection of a transform,
e.g.:
def main(cmdlineArgs: Array[String]): Unit = {
  val (sc, args) = ContextAndArgs(cmdlineArgs)
  val files = sc.parallelize(Seq(
    "gs://kirill/files/1/file1.txt",
    "gs://kirill/files/2/file2.txt"
  ))
  val fileContents =
    files.transform("x")(_.readFiles).map(x => println(x))
  sc.run()
}
The pipeline defined above would fail with the following error:
Exception in thread "main" java.lang.IllegalStateException: cannot change the Coder of parallelize@{AvroExample.scala:57}:1/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper).output [PCollection@1360332263] once it's been used
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:588)
at org.apache.beam.sdk.values.PCollection.setCoder(PCollection.java:292)
at com.spotify.scio.values.SCollection.setCoder(SCollection.scala:144)
at com.spotify.scio.values.SCollection.setCoder$(SCollection.scala:142)
at com.spotify.scio.values.SCollectionImpl.setCoder(SCollection.scala:1582)
at com.spotify.scio.values.SCollection$$anonfun$unsafeCastElementWithCoder$2.apply(SCollection.scala:303)
at com.spotify.scio.values.SCollection$$anonfun$unsafeCastElementWithCoder$2.apply(SCollection.scala:303)
at scala.util.Either.fold(Either.scala:189)
at com.spotify.scio.values.SCollection.unsafeCastElementWithCoder(SCollection.scala:303)
at com.spotify.scio.values.SCollection.unsafeCastElementWithCoder$(SCollection.scala:301)
at com.spotify.scio.values.SCollectionImpl.unsafeCastElementWithCoder(SCollection.scala:1582)
at com.spotify.scio.values.SCollection.covary_(SCollection.scala:334)
at com.spotify.scio.values.SCollection.covary_$(SCollection.scala:334)
at com.spotify.scio.values.SCollectionImpl.covary_(SCollection.scala:1582)
at com.spotify.scio.values.SCollection.readFiles(SCollection.scala:1346)
at com.spotify.scio.values.SCollection.readFiles$(SCollection.scala:1336)
at com.spotify.scio.values.SCollectionImpl.readFiles(SCollection.scala:1582)
at com.spotify.scio.values.SCollection.readFiles(SCollection.scala:1280)
at com.spotify.scio.values.SCollection.readFiles$(SCollection.scala:1279)
at com.spotify.scio.values.SCollectionImpl.readFiles(SCollection.scala:1582)
at com.spotify.scio.examples.extra.AvroExample$$anonfun$1.apply(AvroExample.scala:63)
at com.spotify.scio.examples.extra.AvroExample$$anonfun$1.apply(AvroExample.scala:63)
at com.spotify.scio.values.SCollection$$anonfun$transform$1.apply(SCollection.scala:255)
at com.spotify.scio.values.SCollection$$anonfun$transform$1.apply(SCollection.scala:255)
at com.spotify.scio.values.SCollection$$anon$1.expand(SCollection.scala:264)
at com.spotify.scio.values.SCollection$$anon$1.expand(SCollection.scala:263)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:498)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
at com.spotify.scio.values.PCollectionWrapper.applyInternal(PCollectionWrapper.scala:42)
at com.spotify.scio.values.PCollectionWrapper.applyInternal$(PCollectionWrapper.scala:38)
at com.spotify.scio.values.SCollectionImpl.applyInternal(SCollection.scala:1582)
at com.spotify.scio.values.PCollectionWrapper.applyInternal(PCollectionWrapper.scala:53)
at com.spotify.scio.values.PCollectionWrapper.applyInternal$(PCollectionWrapper.scala:49)
at com.spotify.scio.values.SCollectionImpl.applyInternal(SCollection.scala:1582)
at com.spotify.scio.values.SCollection.transform_(SCollection.scala:263)
at com.spotify.scio.values.SCollection.transform_$(SCollection.scala:260)
at com.spotify.scio.values.SCollectionImpl.transform_(SCollection.scala:1582)
at com.spotify.scio.values.SCollection.transform(SCollection.scala:255)
at com.spotify.scio.values.SCollection.transform$(SCollection.scala:254)
at com.spotify.scio.values.SCollectionImpl.transform(SCollection.scala:1582)
According to Beam's docs, the setCoder function (which is invoked by covary*) can only be called before a PCollection is finalized, but apparently the PCollection is already finalized by the time it arrives as the argument to transform's underlying PTransform:
private[scio] def transform_[U <: POutput](name: String)(f: SCollection[T] => U): U = {
  applyInternal(
    name,
    new PTransform[PCollection[T], U]() {
      override def expand(input: PCollection[T]): U = f(context.wrap(input))
    }
  )
}
The input: PCollection[T] here is already finalized, so covary/setCoder cannot be invoked on it. A simple trick of mapping the collection through an identity function fixes the problem, because a new, non-finalized collection is created even though it is equivalent to the original one:
files.transform("x")(_.map(identity).readFiles).map(x => println(x))
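To make the failure mode concrete, here is a minimal plain-Scala sketch (no Beam dependency; FinalizableValue and its members are hypothetical names invented for illustration, not Beam's actual API) of the precondition behind PCollection.setCoder: once a value has been consumed by a downstream step it is finalized, and changing its coder is rejected, which is exactly what covary runs into inside transform.

```scala
// Hypothetical model (not Beam's actual class) of a PCollection-like value
// that is "finalized" on first use, mirroring the Preconditions.checkState
// call in org.apache.beam.sdk.values.PCollection#setCoder.
final class FinalizableValue(private var coder: String) {
  private var finalized = false

  def getCoder: String = coder

  // Applying any downstream transform finalizes the value.
  def consume(): Unit = finalized = true

  // Succeeds only while the value is not yet finalized.
  def setCoder(newCoder: String): Unit = {
    require(!finalized, "cannot change the Coder once it's been used")
    coder = newCoder
  }
}

// Before finalization the coder can still be changed (covary on a fresh
// SCollection); after consumption it cannot (covary inside transform).
val fresh = new FinalizableValue("StringUtf8Coder")
fresh.setCoder("CustomCoder") // fine: not yet used

val used = new FinalizableValue("StringUtf8Coder")
used.consume() // e.g. handed to a PTransform's expand
// used.setCoder("CustomCoder") would now throw IllegalArgumentException
```

In this model, the map(identity) workaround corresponds to creating a brand-new, not-yet-finalized value, which is why setCoder succeeds on it.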
I don't think we can add the identity mapping to the transform_ implementation itself, because it would create a separate node in the DAG; there might also be performance considerations. I don't have any other ideas for making covary work with transform for now. Looking for ideas!
FWIW, I'm seeing the same exception when using a PTransform we use to write data. It works without issue in a trivial vanilla Beam job but fails in a Scio job that is a line-by-line conversion.
Use site:
sc.customInput(...)
  ...
  .applyTransform(customWritePTransform)
Deep inside, the PTransform does:
.apply(
  FileIO
    .write()
    .via(new ElephantBirdSink)
    .to(dir)
    .withSuffix(".lzo")
    .withNumShards(shards))
.getPerDestinationOutputFilenames
.apply(Values.create[String]())
The same "cannot change the Coder ... once it's been used" exception comes up; it seems to come from the Values step at the end. Using a MapElements instead doesn't fix it.
As a workaround I switched the code to:
sc.customInput(...)
  ...
  .saveAsCustomOutput("Write", customWritePTransform)
This works, but misses out on the output being a collection of the output file names. In this case it's not a big deal, since the files get registered in a metadata service and it's unlikely users process the list of files. My worry is that at some point a PTransform whose output we really need can't be used with applyTransform.
We are on Scio 0.9.2 and Beam 2.29.0. I understand this might not be the most useful report.
This may not be tractable, per the description in #3969.