GroupBy not working in tests due to coder inequality
While upgrading scio from 0.12 to the latest 0.13.3 and Apache Beam from 2.41 to 2.50, I observed that Beam's GroupByKey no longer works in tests: it complains about the equality of coders.
This is probably related to this PR https://github.com/apache/beam/pull/22702 that was also mentioned here in this repo.
The following code shows the error:
import com.spotify.scio.testing.PipelineSpec
import org.apache.beam.sdk.transforms.{GroupByKey, SerializableFunction, WithKeys}

class ScioGroupByKeyTest extends PipelineSpec {

  case class TestRecord(key: String, value: String)

  // fails
  it should "work correctly with beam's native group by key" in {
    runWithContext { sc =>
      sc.parallelize(List(TestRecord("key1", "value1")))
        .applyKvTransform(WithKeys.of(new SerializableFunction[TestRecord, String] {
          override def apply(input: TestRecord): String = input.key
        }))
        .applyKvTransform(GroupByKey.create[String, TestRecord]())
    }
  }

  // fails
  it should "work correctly with scio's group by" in {
    runWithContext { sc =>
      sc.parallelize(List(TestRecord("key1", "value1")))
        .groupBy(_.key)
    }
  }

  // works
  it should "work correctly with scio's group by with a simple string value" in {
    runWithContext { sc =>
      sc.parallelize(List("key1"))
        .groupBy(identity)
    }
  }
}
The first two cases fail because a record is involved. The exception states:
java.lang.IllegalStateException: the GroupByKey requires its output coder to be KvCoder(StringUtf8Coder,IterableCoder(RecordCoder[com.aeroficial.pipeline.scio.ScioGroupByKeyTest.TestRecord](key -> StringUtf8Coder, value -> StringUtf8Coder))) but found KvCoder(StringUtf8Coder,IterableCoder(RecordCoder[com.aeroficial.pipeline.scio.ScioGroupByKeyTest.TestRecord](key -> StringUtf8Coder, value -> StringUtf8Coder))).
at org.apache.beam.sdk.transforms.GroupByKey.validate(GroupByKey.java:190)
From the exception the coders seem to be identical. While debugging I discovered that it is related to coder materialization: a materialized coder is compared to a non-materialized value coder.
As mentioned, this only appears in tests; when using the DirectRunner it works because non-materialized coders are compared.
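The effect can be illustrated with plain Beam coders. The wrapper below is hypothetical and only mimics the delegating toString behavior; it is not scio's actual materialized coder, but it shows how two coders that print identically can still be unequal:

import java.io.{InputStream, OutputStream}
import org.apache.beam.sdk.coders.{Coder => BCoder, CustomCoder, StringUtf8Coder}

// Hypothetical wrapper for illustration only: it delegates encode/decode and
// toString, so it prints like the wrapped coder, but equals() still treats
// the two as different coders.
class WrappingCoder[T](inner: BCoder[T]) extends CustomCoder[T] {
  override def encode(value: T, os: OutputStream): Unit = inner.encode(value, os)
  override def decode(is: InputStream): T = inner.decode(is)
  override def toString: String = inner.toString
}

object CoderEqualityDemo {
  def main(args: Array[String]): Unit = {
    val base    = StringUtf8Coder.of()
    val wrapped = new WrappingCoder(base)
    println(wrapped.toString == base.toString) // true: same textual representation
    println(wrapped == base)                   // false: not equal, so validation fails
  }
}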
Am I doing something wrong here, or is there a workaround? I already tried setting the coders manually through setCoder.
Thanks for taking a look at this issue.
This sounds familiar. Thanks for providing the test cases, will look into it ASAP
Here are the debugging results. The 2nd case
it should "work correctly with scio's group by" in {
runWithContext { sc =>
sc.parallelize(List(TestRecord("key1", "value1")))
.groupBy(_.key)
}
}
does not fail with the same exception; instead it fails with:
Found an $outer field in class com.spotify.scio.ScioGroupByKeyTest$$anonfun$14$$anonfun$apply$5$$anon$5.
Possibly it is an attempt to use inner case class in a Scio transformation. Inner case classes are not supported in Scio auto-derived macros. Move the case class to the package level or define a custom coder.
Basically, test classes are not serializable. If you move the TestRecord definition into the test class companion object, it will work as expected.
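For reference, a minimal sketch of that layout (same test as above, with TestRecord moved into the companion object):

import com.spotify.scio.testing.PipelineSpec

class ScioGroupByKeyTest extends PipelineSpec {
  import ScioGroupByKeyTest.TestRecord

  it should "work correctly with scio's group by" in {
    runWithContext { sc =>
      sc.parallelize(List(TestRecord("key1", "value1")))
        .groupBy(_.key)
    }
  }
}

object ScioGroupByKeyTest {
  // Defined outside the test class, so the derived coder does not capture an
  // $outer reference to the non-serializable test class.
  case class TestRecord(key: String, value: String)
}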
Concerning the 1st test case
it should "work correctly with beams native group by key" in {
runWithContext { sc =>
sc.parallelize(List(TestRecord("key1", "value1")))
.applyKvTransform(WithKeys.of(new SerializableFunction[TestRecord, String] {
override def apply(input: TestRecord): String = input.key
}))
.applyKvTransform(GroupByKey.create[String, TestRecord]())
}
}
You are right. Since Beam 2.42 there is an extra check where the aggregation step verifies that the value coder is of type IterableCoder. Unfortunately, this does not play well with scio (scio would not require this verification since it is type checked) and its coder instrumentation (the materialized coder helps debug a failed step with call-site info).
The workaround is to use Coder.aggregate, which changes the materialization behavior and does not wrap the IterableCoder:
// explicit
.applyKvTransform(GroupByKey.create[String, TestRecord]())(Coder.stringCoder, Coder.aggregate)
// implicit
implicit def groupCoder[T: Coder]: Coder[java.lang.Iterable[T]] = Coder.aggregate[T]
.applyKvTransform(GroupByKey.create[String, TestRecord]())
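For completeness, a sketch of the first test case with both adjustments applied (TestRecord in the companion object and the implicit aggregate coder in scope); this mirrors the snippets above and is meant as an illustration, not the exact production code:

import com.spotify.scio.coders.Coder
import com.spotify.scio.testing.PipelineSpec
import org.apache.beam.sdk.transforms.{GroupByKey, SerializableFunction, WithKeys}

class ScioGroupByKeyTest extends PipelineSpec {
  import ScioGroupByKeyTest.TestRecord

  // Workaround: provide the aggregate coder for Iterable values so the
  // IterableCoder is not wrapped during materialization.
  implicit def groupCoder[T: Coder]: Coder[java.lang.Iterable[T]] = Coder.aggregate[T]

  it should "work correctly with beam's native group by key" in {
    runWithContext { sc =>
      sc.parallelize(List(TestRecord("key1", "value1")))
        .applyKvTransform(WithKeys.of(new SerializableFunction[TestRecord, String] {
          override def apply(input: TestRecord): String = input.key
        }))
        .applyKvTransform(GroupByKey.create[String, TestRecord]())
    }
  }
}

object ScioGroupByKeyTest {
  case class TestRecord(key: String, value: String)
}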
Thanks for taking a look, and sorry for mixing up that the second test case failed due to a different error.
I can confirm that the workaround works fine.
I suppose this issue might also appear for other users that need to work with applyKvTransform, as I'm using stateful DoFns where KV is required. Would there be a way for applyKvTransform to handle this automatically, or should it at least be documented somewhere?
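For context, the kind of stateful DoFn I have in mind looks roughly like the sketch below (illustrative only, with a hypothetical CountPerKeyFn and TestRecord as defined above); stateful processing only works on KV input, which is why I go through applyKvTransform:

import org.apache.beam.sdk.state.{StateSpec, StateSpecs, ValueState}
import org.apache.beam.sdk.transforms.{DoFn, ParDo}
import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, StateId}
import org.apache.beam.sdk.values.KV

import ScioGroupByKeyTest.TestRecord

// Counts how many records have been seen per key; Beam requires KV-shaped
// input (and a KvCoder) for stateful DoFns.
class CountPerKeyFn extends DoFn[KV[String, TestRecord], KV[String, java.lang.Long]] {
  @StateId("count")
  private val countSpec: StateSpec[ValueState[java.lang.Long]] = StateSpecs.value[java.lang.Long]()

  @ProcessElement
  def process(ctx: ProcessContext, @StateId("count") count: ValueState[java.lang.Long]): Unit = {
    val next: java.lang.Long = Option(count.read()).map(_ + 1L).getOrElse(1L)
    count.write(next)
    ctx.output(KV.of(ctx.element().getKey, next))
  }
}

// applied to the keyed collection, e.g.
// keyed.applyKvTransform(ParDo.of(new CountPerKeyFn))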
The problem is not the applyKvTransform but the GroupByKey that performs the extra validation on the value coder.
We can add an extra check: if the transform is of type GroupByKey, we use the aggregate coder.
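Something along these lines, as a rough sketch of the idea only (not the actual scio internals):

import com.spotify.scio.coders.Coder
import org.apache.beam.sdk.transforms.{GroupByKey, PTransform}

object KvTransformCoderCheck {
  // If the applied transform is a GroupByKey, build the value coder with
  // Coder.aggregate so the IterableCoder is not wrapped during
  // materialization; otherwise use the regular implicit coder.
  def valueCoderFor[V: Coder](transform: PTransform[_, _]): Coder[_] =
    transform match {
      case _: GroupByKey[_, _] => Coder.aggregate[V]
      case _                   => Coder[V]
    }
}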