[Failing Test]:
What happened?
I tried running this example script: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_xlang.py
$ python wordcount_xlang.py --output ./ --expansion_service_jar beam-sdks-java-io-expansion-service-2.55.0.jar
I downloaded the expansion service JAR from: https://repo1.maven.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.55.0/
Here is the output:
Starting expansion service at localhost:8096
Apr 23, 2024 1:35:21 PM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
INFO: Registering external transforms: [beam:transform:org.apache.beam:kafka_write:v2, beam:transform:group_into_batches:v1, beam:transform:org.apache.beam:kafka_read_with_metadata:v2, beam:transform:org.apache.beam:kafka_write:v1, beam:transform:combine_grouped_values:v1, beam:transform:group_into_batches_with_sharded_key:v1, beam:transform:create_view:v1, beam:transform:teststream:v1, beam:transform:sdf_process_keyed_elements:v1, beam:transform:combine_globally:v1, beam:external:java:generate_sequence:v1, beam:transform:window_into:v1, beam:transform:flatten:v1, beam:transform:impulse:v1, beam:runners_core:transforms:splittable_process:v1, beam:transform:write_files:v1, beam:transform:combine_per_key:v1, beam:transform:org.apache.beam:kafka_read_without_metadata:v1, beam:transform:org.apache.beam:kafka_read_with_metadata:v1, beam:transform:group_by_key:v1, beam:transform:reshuffle:v1]
Registered transforms:
beam:transform:org.apache.beam:kafka_write:v2: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@c430e6c
beam:transform:group_into_batches:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@312aa7c
beam:transform:org.apache.beam:kafka_read_with_metadata:v2: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@536f2a7e
beam:transform:org.apache.beam:kafka_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@72bc6553
beam:transform:combine_grouped_values:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@66982506
beam:transform:group_into_batches_with_sharded_key:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@70cf32e3
beam:transform:create_view:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@5a59ca5e
beam:transform:teststream:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@4bdeaabb
beam:transform:sdf_process_keyed_elements:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@6c4906d3
beam:transform:combine_globally:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@65987993
beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@71075444
beam:transform:window_into:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@4f32a3ad
beam:transform:flatten:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@6b695b06
beam:transform:impulse:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@4d1bf319
beam:runners_core:transforms:splittable_process:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@6f53b8a
beam:transform:write_files:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@5c80cf32
beam:transform:combine_per_key:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@7d900ecf
beam:transform:org.apache.beam:kafka_read_without_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@6f01b95f
beam:transform:org.apache.beam:kafka_read_with_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@4007f65e
beam:transform:group_by_key:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@1a245833
beam:transform:reshuffle:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@673fdbce
Registered SchemaTransformProviders:
beam:schematransform:org.apache.beam:yaml:filter-java:v1
beam:schematransform:org.apache.beam:yaml:flatten:v1
beam:schematransform:org.apache.beam:kafka_read:v1
beam:schematransform:org.apache.beam:kafka_write:v1
beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1
beam:schematransform:org.apache.beam:yaml:window_into_strategy:v1
beam:schematransform:org.apache.beam:generate_sequence:v1
beam:schematransform:org.apache.beam:yaml:log_for_testing:v1
beam:schematransform:org.apache.beam:yaml:explode:v1
INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
Apr 23, 2024 1:35:27 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO: Expanding 'count' with URN 'beam:transforms:xlang:count'
Dependencies list: {}
Traceback (most recent call last):
File "/home/steeve/wordcount_xlang.py", line 129, in <module>
main()
File "/home/steeve/wordcount_xlang.py", line 122, in main
build_pipeline(p, known_args.input, known_args.output)
File "/home/steeve/wordcount_xlang.py", line 62, in build_pipeline
lines
File "/home/steeve/.local/lib/python3.10/site-packages/apache_beam/pvalue.py", line 138, in __or__
return self.pipeline.apply(ptransform, self)
File "/home/steeve/.local/lib/python3.10/site-packages/apache_beam/pipeline.py", line 667, in apply
return self.apply(
File "/home/steeve/.local/lib/python3.10/site-packages/apache_beam/pipeline.py", line 678, in apply
return self.apply(transform, pvalueish)
File "/home/steeve/.local/lib/python3.10/site-packages/apache_beam/pipeline.py", line 731, in apply
pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
File "/home/steeve/.local/lib/python3.10/site-packages/apache_beam/runners/runner.py", line 203, in apply
return self.apply_PTransform(transform, input, options)
File "/home/steeve/.local/lib/python3.10/site-packages/apache_beam/runners/runner.py", line 207, in apply_PTransform
return transform.expand(input)
File "/home/steeve/.local/lib/python3.10/site-packages/apache_beam/transforms/external.py", line 753, in expand
raise RuntimeError(response.error)
RuntimeError: java.lang.UnsupportedOperationException: Unknown urn: beam:transforms:xlang:count
at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:599)
at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:710)
at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:306)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:351)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:861)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Can anyone please help me test this out? The failure appears to come from this error: RuntimeError: java.lang.UnsupportedOperationException: Unknown urn: beam:transforms:xlang:count. That URN does not appear in the list of registered transforms printed above.
Issue Failure
Failure: Test is flaky
Issue Priority
Priority: 1 (unhealthy code / failing or flaky postcommit so we cannot be sure the product is healthy)
Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
I think the documentation for this part is lacking. The Java external transform behind that URN is defined in the test expansion service: https://github.com/apache/beam/blob/master/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java#L80. The IO expansion service JAR you downloaded does not register it, so you need to add that transform to the expansion service you run, along the lines of https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/#choose-an-expansion-service.
Alternatively, follow https://github.com/apache/beam/tree/master/examples/multi-language and use the Maven JAR that Beam releases: beam-examples-multi-language*
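A quick way to confirm the mismatch is to compare the URN the pipeline asks for against the URNs the service logs at startup. The snippet below is only a sketch: `registered_urns` is a hypothetical helper (not part of Beam), it assumes the "Registering external transforms: [...]" line format seen in the output above, and the log string is a shortened copy of that line.

```python
import re
from typing import Set


def registered_urns(startup_log: str) -> Set[str]:
    """Extract URNs from a 'Registering external transforms: [...]' log line.

    Hypothetical helper; assumes the log format shown in this issue.
    """
    match = re.search(r"Registering external transforms: \[(.*?)\]", startup_log)
    if match is None:
        return set()
    return {urn.strip() for urn in match.group(1).split(",") if urn.strip()}


# Shortened copy of the startup line from the output in this issue.
log = (
    "INFO: Registering external transforms: "
    "[beam:transform:org.apache.beam:kafka_write:v2, "
    "beam:external:java:generate_sequence:v1, "
    "beam:transform:group_by_key:v1, beam:transform:reshuffle:v1]"
)

urns = registered_urns(log)
requested = "beam:transforms:xlang:count"  # URN from the error message

if requested in urns:
    print(f"{requested} is registered")
else:
    print(f"{requested} is NOT registered by this expansion service")
```

If the requested URN is missing from the startup list, the JAR passed via --expansion_service_jar is not the one that provides that transform.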