[SEDONA-142] Add ST_Collect to Flink Catalog

Open • Elephantusparvus opened this issue 3 years ago • 4 comments

Did you read the Contributor Guide?

Is this PR related to a JIRA ticket?

  • Yes, the URL of the associated JIRA ticket is https://issues.apache.org/jira/browse/SEDONA-142. The PR name follows the format [SEDONA-142] Add ST_Collect to Flink Catalog.

I accidentally created two JIRA tickets (the other one is SEDONA-141). One of them could/should be deleted. Sorry about that.

What changes were proposed in this PR?

This is a first attempt to add the ST_Collect aggregation to the Flink API.

How was this patch tested?

In progress. So far I have only tested aggregations with GROUP BY. I guess it would be necessary to test window/OVER aggregations as well. Does anyone have specific data/queries for these in mind?

Did this PR include necessary documentation updates?

Not yet.

Up front: this pull request is mainly meant to discuss whether this is a feasible approach to implementing ST_Collect. The PR would have to be cleaned up (split up), tests would need to be added, and it would need to be documented.

I basically took Flink's internal CollectAggFunction from flink-table and adjusted its types so that it returns a GeometryCollection. getTypeInference needs to be overridden; otherwise reflection fails for the MapView and the right serializer cannot be selected. One problem is/was that this function is not instantiated from a BuiltInFunctionDefinition, so it is not possible to implement it as a BuiltInAggregateFunction that receives a SpecializedFunction.SpecializedContext in its constructor. Flink's own collect function is constructed through its AggFunctionFactory, which also passes the argument type. So I had to pass the DataType for the geometry manually when constructing the function and adding it to the catalog, which is why the registrator needs a util function.
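To make the accumulator idea concrete, here is a minimal plain-Java sketch of the counting logic a COLLECT-style aggregate relies on, modeled loosely on how Flink's CollectAggFunction keeps a MapView<T, Integer> of element counts. It is not the code in this PR and deliberately avoids Flink and JTS: geometries are stood in for by WKT strings, a TreeMap replaces MapView, and getValue() renders a WKT GEOMETRYCOLLECTION instead of building a JTS GeometryCollection. The class name CollectAccumulatorSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a COLLECT-style accumulator; not the PR's code.
// Geometries are WKT strings and a TreeMap replaces Flink's MapView, so this
// compiles without Flink or JTS on the classpath.
class CollectAccumulatorSketch {
    // geometry (as WKT) -> multiplicity; a negative count records a retraction
    // that arrived before the matching accumulation
    final Map<String, Integer> counts = new TreeMap<>();

    // Apply a count change and drop entries whose additions and retractions cancel out.
    private void add(String wkt, int delta) {
        int updated = counts.getOrDefault(wkt, 0) + delta;
        if (updated == 0) {
            counts.remove(wkt);
        } else {
            counts.put(wkt, updated);
        }
    }

    // Called once per input row; null inputs are ignored.
    void accumulate(String wkt) {
        if (wkt != null) {
            add(wkt, 1);
        }
    }

    // Called when an input row is withdrawn (e.g. retraction streams, OVER windows).
    void retract(String wkt) {
        if (wkt != null) {
            add(wkt, -1);
        }
    }

    // Combines partial accumulators (e.g. two-phase or session-window aggregation).
    void merge(CollectAccumulatorSketch other) {
        for (Map.Entry<String, Integer> e : other.counts.entrySet()) {
            add(e.getKey(), e.getValue());
        }
    }

    // Flattens the multiset into one collection; the real function would build a
    // JTS GeometryCollection here instead of a WKT string.
    String getValue() {
        List<String> members = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            for (int i = 0; i < e.getValue(); i++) { // non-positive counts contribute nothing
                members.add(e.getKey());
            }
        }
        if (members.isEmpty()) {
            return "GEOMETRYCOLLECTION EMPTY";
        }
        return "GEOMETRYCOLLECTION (" + String.join(", ", members) + ")";
    }
}
```

In the actual Flink function, the accumulator would hold a MapView, getTypeInference would be overridden so the MapView and the geometry DataType can be resolved, and getValue would return a GeometryCollection. Since retract and merge are only invoked by some query shapes, GROUP BY tests alone may not exercise them, which is one reason window/OVER aggregations deserve their own tests.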

If needed, I can add separate PRs for ST_MakeValid, which is an almost 1:1 port of the Spark version, and for ST_Intersection and ST_Difference, which just wrap the LocationTech JTS functions.

Elephantusparvus, Aug 15 '22

Thanks for your contribution. I have a much neater implementation of spatial aggregation in Flink using the UDAF API. It has been there for a while; I just forgot to create a PR.

I will create a new PR in one or two days. Then you can follow my implementation to tweak your ST_Collect a bit.

jiayuasu, Aug 17 '22

Sounds good. As stated earlier, this really is just a copy/minimal port of Flink's built-in CollectAggFunction with a different output type (GeometryCollection instead of MultiSet<Geometry>), using the AggregateFunction interface instead of BuiltInAggregateFunction.

Elephantusparvus, Aug 17 '22

Please see PR #672 for the updates.

jiayuasu, Aug 23 '22

Hi, sorry for the late response. I saw that this was marked as part of the milestone. Unfortunately, my time is limited right now and I will not be able to work on this for the next 1-2 weeks. If there is a scheduled timeline for the 1.3 release, this PR should not hold it back. If someone else wants to improve this PR in the meantime, feel free to do so; otherwise, I hope to get back to it by roughly the beginning of October.

Elephantusparvus, Sep 07 '22