
covary* doesn't work with transform

Open stormy-ua opened this issue 2 years ago • 2 comments

It looks like the newly added covary* functions don't work when applied directly to the input SCollection of a transform, e.g.:

  import com.spotify.scio._

  object ReadFilesExample { // hypothetical enclosing object, for a complete snippet
    def main(cmdlineArgs: Array[String]): Unit = {
      val (sc, args) = ContextAndArgs(cmdlineArgs)

      val files = sc.parallelize(Seq(
        "gs://kirill/files/1/file1.txt",
        "gs://kirill/files/2/file2.txt"
      ))

      // Fails: readFiles calls covary_ on the already-finalized input of transform
      val fileContents =
        files.transform("x")(_.readFiles).map(x => println(x))

      sc.run()
    }
  }

The pipeline defined above fails with the following error:

Exception in thread "main" java.lang.IllegalStateException: cannot change the Coder of parallelize@{AvroExample.scala:57}:1/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper).output [PCollection@1360332263] once it's been used
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:588)
	at org.apache.beam.sdk.values.PCollection.setCoder(PCollection.java:292)
	at com.spotify.scio.values.SCollection.setCoder(SCollection.scala:144)
	at com.spotify.scio.values.SCollection.setCoder$(SCollection.scala:142)
	at com.spotify.scio.values.SCollectionImpl.setCoder(SCollection.scala:1582)
	at com.spotify.scio.values.SCollection$$anonfun$unsafeCastElementWithCoder$2.apply(SCollection.scala:303)
	at com.spotify.scio.values.SCollection$$anonfun$unsafeCastElementWithCoder$2.apply(SCollection.scala:303)
	at scala.util.Either.fold(Either.scala:189)
	at com.spotify.scio.values.SCollection.unsafeCastElementWithCoder(SCollection.scala:303)
	at com.spotify.scio.values.SCollection.unsafeCastElementWithCoder$(SCollection.scala:301)
	at com.spotify.scio.values.SCollectionImpl.unsafeCastElementWithCoder(SCollection.scala:1582)
	at com.spotify.scio.values.SCollection.covary_(SCollection.scala:334)
	at com.spotify.scio.values.SCollection.covary_$(SCollection.scala:334)
	at com.spotify.scio.values.SCollectionImpl.covary_(SCollection.scala:1582)
	at com.spotify.scio.values.SCollection.readFiles(SCollection.scala:1346)
	at com.spotify.scio.values.SCollection.readFiles$(SCollection.scala:1336)
	at com.spotify.scio.values.SCollectionImpl.readFiles(SCollection.scala:1582)
	at com.spotify.scio.values.SCollection.readFiles(SCollection.scala:1280)
	at com.spotify.scio.values.SCollection.readFiles$(SCollection.scala:1279)
	at com.spotify.scio.values.SCollectionImpl.readFiles(SCollection.scala:1582)
	at com.spotify.scio.examples.extra.AvroExample$$anonfun$1.apply(AvroExample.scala:63)
	at com.spotify.scio.examples.extra.AvroExample$$anonfun$1.apply(AvroExample.scala:63)
	at com.spotify.scio.values.SCollection$$anonfun$transform$1.apply(SCollection.scala:255)
	at com.spotify.scio.values.SCollection$$anonfun$transform$1.apply(SCollection.scala:255)
	at com.spotify.scio.values.SCollection$$anon$1.expand(SCollection.scala:264)
	at com.spotify.scio.values.SCollection$$anon$1.expand(SCollection.scala:263)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:498)
	at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
	at com.spotify.scio.values.PCollectionWrapper.applyInternal(PCollectionWrapper.scala:42)
	at com.spotify.scio.values.PCollectionWrapper.applyInternal$(PCollectionWrapper.scala:38)
	at com.spotify.scio.values.SCollectionImpl.applyInternal(SCollection.scala:1582)
	at com.spotify.scio.values.PCollectionWrapper.applyInternal(PCollectionWrapper.scala:53)
	at com.spotify.scio.values.PCollectionWrapper.applyInternal$(PCollectionWrapper.scala:49)
	at com.spotify.scio.values.SCollectionImpl.applyInternal(SCollection.scala:1582)
	at com.spotify.scio.values.SCollection.transform_(SCollection.scala:263)
	at com.spotify.scio.values.SCollection.transform_$(SCollection.scala:260)
	at com.spotify.scio.values.SCollectionImpl.transform_(SCollection.scala:1582)
	at com.spotify.scio.values.SCollection.transform(SCollection.scala:255)
	at com.spotify.scio.values.SCollection.transform$(SCollection.scala:254)
	at com.spotify.scio.values.SCollectionImpl.transform(SCollection.scala:1582)

According to Beam's docs, the setCoder function invoked by covary* may only be called before a PCollection is finalized, but the PCollection is apparently already finalized by the time it is passed as the input of the PTransform underlying transform:

private[scio] def transform_[U <: POutput](name: String)(f: SCollection[T] => U): U = {
    applyInternal(
      name,
      new PTransform[PCollection[T], U]() {
        override def expand(input: PCollection[T]): U = f(context.wrap(input))
      }
    )
  }

The input: PCollection[T] here is already finalized, so covary/setCoder cannot be invoked on it. A simple trick of mapping the collection through an identity function fixes the problem, because it creates a new, non-finalized collection that is element-wise equivalent to the original:

files.transform("x")(_.map(identity).readFiles).map(x => println(x))

I don't think we can add the identity mapping to the transform_ implementation itself, because it would create a separate node in the DAG. There might also be performance considerations.

I don't have any other ideas for making covary work with transform at the moment. Looking for suggestions!
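For concreteness, the rejected approach would amount to something like this, a sketch only, based on the transform_ code above; the extra map(identity) is exactly the DAG-node and performance cost mentioned:

```scala
// Sketch only: wrap the input so f receives a fresh, non-finalized
// PCollection whose coder can still be changed by covary*/setCoder.
private[scio] def transform_[U <: POutput](name: String)(f: SCollection[T] => U): U =
  applyInternal(
    name,
    new PTransform[PCollection[T], U]() {
      override def expand(input: PCollection[T]): U =
        f(context.wrap(input).map(identity)) // adds a separate node to the DAG
    }
  )
```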

stormy-ua avatar Jul 06 '21 20:07 stormy-ua

FWIW, I'm seeing the same exception when using a PTransform we use to write data. It works without issue in a trivial vanilla Beam job, but fails from a Scio job that is a line-by-line conversion of it.

Use site:

sc.customInput(...)
...
.applyTransform(customWritePTransform)

Very deep inside the PTransform does:

.apply(
  FileIO
    .write()
    .via(new ElephantBirdSink)
    .to(dir)
    .withSuffix(".lzo")
    .withNumShards(shards))
.getPerDestinationOutputFilenames
.apply(Values.create[String]())

The same exception about the Coder already being set comes up; it seems to come from the Values step at the end. Using a MapElements instead doesn't fix it.

As a workaround I switched the code to:

sc.customInput(...)
...
.saveAsCustomOutput("Write", customWritePTransform)

This works, but misses out on the output being a collection of the output file names. In this case it's not a big deal, since the files get registered in a metadata service and it's unlikely users process the list of files. My worry is that at some point we'll need the output of a PTransform that can't be used with applyTransform.
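An untested sketch, assuming the identity-map workaround from the original report applies here too, and reusing the customWritePTransform value from the snippets above:

```scala
sc.customInput(...)
  // Fresh, non-finalized PCollection, so coders can still be set inside the PTransform
  .map(identity)
  // Keeps the output: the collection of per-destination output file names
  .applyTransform(customWritePTransform)
```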

We are on Scio 0.9.2 and Beam 2.29.0; I understand this might not be the most useful report.

tlazaro avatar Jul 30 '21 23:07 tlazaro

May not be tractable per the description in #3969

kellen avatar Jul 08 '22 14:07 kellen