[SEDONA-142] Add ST_Collect to Flink Catalog
Did you read the Contributor Guide?
- Yes, I have read the Contributor Rules and Contributor Development Guide.
Is this PR related to a JIRA ticket?
- Yes, the URL of the associated JIRA ticket is https://issues.apache.org/jira/browse/SEDONA-142. The PR name follows the format [SEDONA-142] Add ST_Collect to Flink Catalog.
I accidentally created two JIRA tickets (the other one is SEDONA-141). One of them could/should be deleted. Sorry about that.
What changes were proposed in this PR?
This is a first attempt at adding the ST_Collect aggregation to the Flink API.
How was this patch tested?
In progress. So far I have only tested aggregations with GROUP BY. It would probably be necessary to test window/OVER aggregations as well. Does anyone have specific data or queries for these in mind?
Did this PR include necessary documentation updates?
Currently not.
To be clear up front: this pull request is mainly meant to discuss whether this is a feasible approach to implementing ST_Collect. The PR would have to be cleaned up (split up), tests would need to be added, and it would need to be documented.
I basically took the internal collect aggregator from flink-table and adjusted its types, letting it return a GeometryCollection. getTypeInference needs to be overridden; otherwise reflection fails for the MapView (selecting the right serializer fails). One problem is that this function is not instantiated from a BuiltInFunctionDefinition, so it is not possible to implement it as a BuiltInAggregateFunction and have a SpecializedFunction.SpecializedContext passed to its constructor. Flink's collect function, by contrast, is constructed through its AggFunctionFactory, which also passes the argType. So I had to pass the DataType for the geometry manually when constructing the function and adding it to the catalog, which is why a util function is needed in the registrator.
If needed, I can open separate PRs for ST_MakeValid, which is an almost 1:1 port/copy of the Spark version, and for ST_Intersection and ST_Difference, which just wrap the JTS (LocationTech) functions.
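To make the accumulator logic easier to discuss, here is a minimal, dependency-free sketch of the counting approach described above. It is not the code in this PR: WKT strings stand in for JTS Geometry objects, a plain Map stands in for Flink's MapView, and the class name CollectAccumulatorSketch is hypothetical. The method names mirror the accumulate/retract/merge/getValue contract of Flink's AggregateFunction, but the bodies are only an approximation of what the collect aggregator does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the accumulator plus aggregate function.
// Assumption: geometries are represented by their WKT text, so no JTS
// or Flink dependency is needed to run this sketch.
final class CollectAccumulatorSketch {
    // geometry (as WKT) -> number of times it is currently accumulated
    final Map<String, Integer> counts = new LinkedHashMap<>();

    // Called once per input row; null inputs are ignored.
    void accumulate(String wkt) {
        if (wkt != null) {
            counts.merge(wkt, 1, Integer::sum);
        }
    }

    // Called when a previously accumulated row is withdrawn,
    // for example when a row leaves an OVER window.
    void retract(String wkt) {
        if (wkt == null) {
            return;
        }
        Integer count = counts.get(wkt);
        if (count == null) {
            return; // nothing to retract in this simplified version
        }
        if (count <= 1) {
            counts.remove(wkt);
        } else {
            counts.put(wkt, count - 1);
        }
    }

    // Combines partial accumulators, as needed for session windows
    // or two-phase aggregation.
    void merge(Iterable<CollectAccumulatorSketch> others) {
        for (CollectAccumulatorSketch other : others) {
            other.counts.forEach((wkt, c) -> counts.merge(wkt, c, Integer::sum));
        }
    }

    // Flattens the counts into a single collection, repeating each
    // member as often as it was accumulated.
    String getValue() {
        List<String> members = new ArrayList<>();
        counts.forEach((wkt, c) -> {
            for (int i = 0; i < c; i++) {
                members.add(wkt);
            }
        });
        if (members.isEmpty()) {
            return "GEOMETRYCOLLECTION EMPTY";
        }
        return "GEOMETRYCOLLECTION (" + String.join(", ", members) + ")";
    }
}
```

The point of keeping counts instead of a plain list is that retract stays cheap and duplicates survive: the result is a flattened collection rather than a multiset of element-to-count pairs.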
Thanks for your contribution. I have a much neater implementation of spatial aggregation in Flink using the UDAF API. It has been there for a while and I just forgot to create a PR.
I will create a new PR in one or two days. Then you can follow my implementation to tweak your ST_Collect a bit.
Sounds good. As stated earlier, this really is just a copy/minimal port of Flink's built-in CollectAggFunction with a different output type (GeometryCollection instead of a MultiSet<Geometry>), using the AggregateFunction interface instead of BuiltInAggregateFunction.
Please see PR #672 for the updates
Hi, sorry for the late response. I saw that this was marked as part of the milestone. Unfortunately, my time is limited right now and I will not be able to work on this for the next 1-2 weeks. If there is a scheduled timeline for the release of 1.3, I guess this PR should not hold it back. If someone else wants to improve this PR in the meantime, feel free to do so; otherwise I will hopefully be able to do this by roughly the beginning of October.