
GroupBy not working in tests due to coder inequality

Open f-loris opened this issue 9 months ago • 4 comments

While upgrading scio from 0.12 to the latest 0.13.3 and Apache Beam from 2.41 to 2.50, I observed that Beam's GroupByKey no longer works in tests: it complains that the coders are not equal.

This is probably related to this PR https://github.com/apache/beam/pull/22702, which was also mentioned in this repo.

The following code shows the error:

import com.spotify.scio.testing.PipelineSpec
import org.apache.beam.sdk.transforms.{GroupByKey, SerializableFunction, WithKeys}

class ScioGroupByKeyTest extends PipelineSpec {
  case class TestRecord(key: String, value: String)

  // fails
  it should "work correctly with beams native group by key" in {
    runWithContext { sc =>
      sc.parallelize(List(TestRecord("key1", "value1")))
        .applyKvTransform(WithKeys.of(new SerializableFunction[TestRecord, String] {
          override def apply(input: TestRecord): String = input.key
        }))
        .applyKvTransform(GroupByKey.create[String, TestRecord]())
    }
  }

  // fails
  it should "work correctly with scio's group by" in {
    runWithContext { sc =>
      sc.parallelize(List(TestRecord("key1", "value1")))
        .groupBy(_.key)
    }
  }

  // works
  it should "work correctly with scio's group by with a simple string value" in {
    runWithContext { sc =>
      sc.parallelize(List("key1"))
        .groupBy(identity)
    }
  }
}

The first two cases fail because a record (case class) is involved. The exception states:

java.lang.IllegalStateException: the GroupByKey requires its output coder to be KvCoder(StringUtf8Coder,IterableCoder(RecordCoder[com.aeroficial.pipeline.scio.ScioGroupByKeyTest.TestRecord](key -> StringUtf8Coder, value -> StringUtf8Coder))) but found KvCoder(StringUtf8Coder,IterableCoder(RecordCoder[com.aeroficial.pipeline.scio.ScioGroupByKeyTest.TestRecord](key -> StringUtf8Coder, value -> StringUtf8Coder))).
	at org.apache.beam.sdk.transforms.GroupByKey.validate(GroupByKey.java:190)

From the exception the coders seem to be identical. While debugging I discovered that it is somehow related to coder materialization: a materialized coder is compared to a non-materialized value coder (see attached screenshot).

As mentioned, this only appears in tests; when using the DirectRunner it works, because non-materialized coders are compared there (see attached screenshot).

Am I doing something wrong here, or is there a workaround? I already tried setting the coders manually through setCoder.

Thanks for taking a look at this issue.

f-loris avatar Sep 21 '23 13:09 f-loris

This sounds familiar. Thanks for providing the test cases, will look into it ASAP

RustedBones avatar Sep 21 '23 13:09 RustedBones

Here are the debugging results. The 2nd case

  it should "work correctly with scio's group by" in {
    runWithContext { sc =>
      sc.parallelize(List(TestRecord("key1", "value1")))
        .groupBy(_.key)
    }
  }

does not fail with the same exception; instead it fails with:

Found an $outer field in class com.spotify.scio.ScioGroupByKeyTest$$anonfun$14$$anonfun$apply$5$$anon$5.
Possibly it is an attempt to use inner case class in a Scio transformation. Inner case classes are not supported in Scio auto-derived macros. Move the case class to the package level or define a custom coder.

Basically, test classes are not serializable. If you move the TestRecord definition into the test class companion object, it will work as expected.
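For illustration, the adjusted layout would look roughly like this (a minimal sketch of the suggested change; nothing else is modified):

import com.spotify.scio.testing.PipelineSpec

object ScioGroupByKeyTest {
  // Defined in the companion object so the derived coder does not capture
  // an $outer reference to the (non-serializable) test class.
  case class TestRecord(key: String, value: String)
}

class ScioGroupByKeyTest extends PipelineSpec {
  import ScioGroupByKeyTest.TestRecord

  it should "work correctly with scio's group by" in {
    runWithContext { sc =>
      sc.parallelize(List(TestRecord("key1", "value1")))
        .groupBy(_.key)
    }
  }
}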

Concerning the 1st test case

  it should "work correctly with beams native group by key" in {
    runWithContext { sc =>
      sc.parallelize(List(TestRecord("key1", "value1")))
        .applyKvTransform(WithKeys.of(new SerializableFunction[TestRecord, String] {
          override def apply(input: TestRecord): String = input.key
        }))
        .applyKvTransform(GroupByKey.create[String, TestRecord]())
    }
  }

You are right. Since Beam 2.42 there is an extra check where the aggregation step verifies that the value coder is of type IterableCoder. Unfortunately, this does not play well with scio's coder instrumentation (scio would not need the verification since it is type-checked, and the materialized coder wrapper only exists to help debug failed steps with call-site info).

The workaround is to use Coder.aggregate, which changes the materialization behavior and does not wrap the IterableCoder:

// explicit: pass the key coder and the aggregate value coder directly
.applyKvTransform(GroupByKey.create[String, TestRecord]())(Coder.stringCoder, Coder.aggregate)

// implicit: make the aggregate coder available for java.lang.Iterable values
implicit def groupCoder[T: Coder]: Coder[java.lang.Iterable[T]] = Coder.aggregate[T]
.applyKvTransform(GroupByKey.create[String, TestRecord]())
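Put together with the companion-object fix above, the implicit variant applied to the first test case looks roughly like this (a sketch, not a guaranteed drop-in for every scio version):

import com.spotify.scio.coders.Coder
import com.spotify.scio.testing.PipelineSpec
import org.apache.beam.sdk.transforms.{GroupByKey, SerializableFunction, WithKeys}

object ScioGroupByKeyTest {
  case class TestRecord(key: String, value: String)
}

class ScioGroupByKeyTest extends PipelineSpec {
  import ScioGroupByKeyTest.TestRecord

  // Derive Iterable value coders with Coder.aggregate so GroupByKey's
  // output-coder validation sees the coder it expects.
  implicit def groupCoder[T: Coder]: Coder[java.lang.Iterable[T]] = Coder.aggregate[T]

  it should "work correctly with beams native group by key" in {
    runWithContext { sc =>
      sc.parallelize(List(TestRecord("key1", "value1")))
        .applyKvTransform(WithKeys.of(new SerializableFunction[TestRecord, String] {
          override def apply(input: TestRecord): String = input.key
        }))
        .applyKvTransform(GroupByKey.create[String, TestRecord]())
    }
  }
}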

RustedBones avatar Sep 22 '23 09:09 RustedBones

Thanks for taking a look, and sorry for the confusion; the second test case indeed failed due to a different error.

I can confirm that the workaround works fine.

I suppose this issue might also appear for other users that need to work with applyKvTransform, as I'm using stateful DoFns where KV is required. Would there be a way for applyKvTransform to handle this automatically, or should it at least be documented somewhere?

f-loris avatar Sep 24 '23 18:09 f-loris

The problem is not applyKvTransform but GroupByKey, which performs the extra validation on the value coder. We can add an extra check: if the transform is of type GroupByKey, we use the aggregate coder.
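Until such a check lands, a small user-side helper can bundle the workaround so it does not have to be repeated at every call site; a rough sketch (the helper is hypothetical, not part of scio):

import com.spotify.scio.coders.Coder
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.transforms.GroupByKey
import org.apache.beam.sdk.values.KV

object GroupByKeyWorkaround {
  // Apply Beam's GroupByKey while forcing the aggregate coder for the
  // Iterable values, so the validation introduced in Beam 2.42 passes.
  def nativeGroupByKey[K, V](in: SCollection[KV[K, V]])(
    implicit kc: Coder[K],
    vc: Coder[V]
  ): SCollection[KV[K, java.lang.Iterable[V]]] =
    in.applyKvTransform(GroupByKey.create[K, V]())(kc, Coder.aggregate[V])
}

Call it on the keyed SCollection instead of applying GroupByKey directly.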

RustedBones avatar Oct 04 '23 10:10 RustedBones