[Feature Request]: UDF support for lambda & RecordType parameters
What would you like to happen?
I want to create a UDF that accepts a lambda parameter and a Row parameter, for example:
static class LambdaUDFA implements BeamSqlUdf {
    public static List<Row> eval(@Parameter(name = "rows") List<Row> rows,
                                 @Parameter(name = "filter") Predicate1<Row> filter) {
        return rows.stream().filter(filter::apply).collect(Collectors.toList());
    }
}
However, Beam SQL throws the following exception:
sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl cannot be cast to java.lang.Class
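The message suggests that, somewhere during UDF registration, a java.lang.reflect.Type describing a generic parameter (such as List<Row>) is cast directly to Class. Exactly where this happens in Beam is not stated in this issue; the sketch below only reproduces the underlying JDK behavior. It assumes String in place of Row and java.util.function.Predicate in place of Predicate1 so that it runs without Beam or Calcite, and GenericParamDemo and rawClass are hypothetical names. For a parameter declared with type arguments, Method.getGenericParameterTypes() returns a ParameterizedType rather than a Class, and getRawType() recovers the erased class.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.function.Predicate;

class GenericParamDemo {
    // Hypothetical stand-in for the UDF's eval signature (String replaces Row).
    public static List<String> eval(List<String> rows, Predicate<String> filter) {
        return rows;
    }

    // Resolve a reflective Type to its Class without a blind (Class<?>) cast.
    static Class<?> rawClass(Type t) {
        if (t instanceof Class) {
            return (Class<?>) t;
        }
        if (t instanceof ParameterizedType) {
            // List<String> -> List, Predicate<String> -> Predicate
            return (Class<?>) ((ParameterizedType) t).getRawType();
        }
        throw new IllegalArgumentException("Unsupported type: " + t);
    }

    public static void main(String[] args) throws Exception {
        Method m = GenericParamDemo.class.getMethod("eval", List.class, Predicate.class);
        for (Type t : m.getGenericParameterTypes()) {
            // A naive (Class<?>) t cast here would throw ClassCastException,
            // because t is a ParameterizedType, not a Class.
            System.out.println(t + " -> ParameterizedType? " + (t instanceof ParameterizedType)
                    + ", raw class: " + rawClass(t).getName());
        }
    }
}
```

Recovering the raw class only avoids the cast failure. It does not by itself give Calcite a RelDataType for List<Row> or for a lambda, which is what the difficulties listed later in this thread are about.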
Issue Priority
Priority: 2 (default / most feature requests should be filed as P2)
Issue Components
- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
There are several difficulties in supporting UDFs with @Parameter(...) Row and @Parameter(...) Predicate1 parameters:
- Each parameter type must be translatable to a Calcite RelDataType.
- To translate the Row type to Calcite's RelRecordType, we need a way to pass the Schema of that Row. This is currently not implemented when registering UDFs.
- There is no appropriate Calcite RelDataType for lambda types.
- At the moment a UDF is applied, all Rows are represented as Object[], and Object[] cannot be automatically cast to Row.
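The Schema and Object[] points can be illustrated without Beam. This is a minimal sketch, assuming hypothetical MiniSchema and MiniRow classes that stand in for Beam's Schema and Row (they are not Beam APIs), with java.util.function.Predicate standing in for Calcite's Predicate1. It shows that an Object[] carries only positional values and becomes a usable, field-addressable row only once a schema is supplied alongside it, which is why the UDF machinery would need access to the Row's Schema.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical stand-in for a schema: just an ordered list of field names.
final class MiniSchema {
    final List<String> fieldNames;

    MiniSchema(String... names) {
        this.fieldNames = Collections.unmodifiableList(Arrays.asList(names));
    }
}

// Hypothetical stand-in for a row: positional values interpreted through a schema.
final class MiniRow {
    final MiniSchema schema;
    final Object[] values;

    MiniRow(MiniSchema schema, Object[] values) {
        if (values.length != schema.fieldNames.size()) {
            throw new IllegalArgumentException("Value count does not match schema");
        }
        this.schema = schema;
        this.values = values.clone();
    }

    Object get(String name) {
        int i = schema.fieldNames.indexOf(name);
        if (i < 0) {
            throw new IllegalArgumentException("No field " + name);
        }
        return values[i];
    }
}

class ObjectArrayToRowDemo {
    // An Object[] alone has no field names; the schema is what makes it a row.
    static List<MiniRow> toRows(MiniSchema schema, List<Object[]> raw) {
        return raw.stream().map(v -> new MiniRow(schema, v)).collect(Collectors.toList());
    }

    // Same shape as the UDF's eval, with stand-in types.
    static List<MiniRow> eval(List<MiniRow> rows, Predicate<MiniRow> filter) {
        return rows.stream().filter(filter).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        MiniSchema schema = new MiniSchema("name", "age");
        List<Object[]> raw = Arrays.asList(new Object[] {"alice", 30}, new Object[] {"bob", 17});
        List<MiniRow> adults = eval(toRows(schema, raw), r -> (Integer) r.get("age") >= 18);
        System.out.println(adults.size() + " " + adults.get(0).get("name"));
    }
}
```

In Beam itself, the analogous conversion would presumably go through Row.withSchema(schema).addValues(values).build(), which likewise cannot be done without the Schema.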
@gabrywu FYI