[GH-2402] Add Sedona Flink SQL module

Open radekaadek opened this issue 2 months ago • 11 comments

Did you read the Contributor Guide?

Is this PR related to a ticket?

  • Yes, and the PR name follows the format [GH-2402] my subject. Closes #2402

What changes were proposed in this PR?

Add a Sedona Flink module for people who use the Flink SQL Gateway.

How was this patch tested?

Did this PR include necessary documentation updates?

radekaadek avatar Oct 25 '25 13:10 radekaadek

I have tested the module manually and it loads in Flink but the functions are not usable.

I can call LOAD MODULE sedona and after calling SHOW FUNCTIONS I get a list that shows all of them but after trying to actually call one of them, for example:

SELECT ST_Degrees(2.0)

I get

org.apache.flink.table.api.ValidationException: SQL validation failed. SQL validation failed. From line 1, column 26 to line 1, column 56: No match found for function signature ST_Degrees(<NUMERIC>)

When I explicitly cast the number to a double or an integer, the same issue appears. Do you have any ideas on what might be going on @Imbruced?

EDIT: I fixed the issue, it was linked to incorrect casing and I made everything lowercase.
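The casing fix can be pictured with a small sketch. This is not the module's code: `LowercaseFunctionRegistry` and its methods are hypothetical names, and the sketch only assumes that the name used at lookup time can arrive in a different case than the one used at registration. Storing and looking up names in lowercase makes both sides agree.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for a module's function table; not Sedona or Flink code.
class LowercaseFunctionRegistry {
  private final Map<String, String> functions = new LinkedHashMap<>();

  // Store every function under a lowercase key.
  public void register(String name, String implementation) {
    functions.put(name.toLowerCase(Locale.ROOT), implementation);
  }

  // The names a listing of the registry would report.
  public Set<String> listFunctions() {
    return functions.keySet();
  }

  // Normalize the requested name the same way before looking it up.
  public Optional<String> lookup(String name) {
    return Optional.ofNullable(functions.get(name.toLowerCase(Locale.ROOT)));
  }

  public static void main(String[] args) {
    LowercaseFunctionRegistry registry = new LowercaseFunctionRegistry();
    registry.register("ST_Degrees", "Functions$ST_Degrees");
    System.out.println(registry.listFunctions());
    System.out.println(registry.lookup("st_degrees").isPresent());
    System.out.println(registry.lookup("ST_DEGREES").isPresent());
  }
}
```

With mixed-case keys and an exact-match lookup, a function could show up in a listing and still fail to resolve, which would match the symptom described above.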

radekaadek avatar Oct 25 '25 14:10 radekaadek

When I call SELECT ST_Point(2,2), it produces ugly jobs in which the 2s are cast from NUMERIC to DOUBLE: SELECT ST_Point(CAST(2 AS DOUBLE), CAST(2 AS DOUBLE))

Other than that, I'll work on the types tomorrow.

radekaadek avatar Oct 25 '25 18:10 radekaadek

I have been looking through the code and it seems to me that either there was a mistake or I'm missing something because all of the functions in flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java have this: @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)

For example:

  public static class ST_Centroid extends ScalarFunction {
    @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
    public Geometry eval(
        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
            Object o) {
      Geometry geom = (Geometry) o;
      return org.apache.sedona.common.Functions.getCentroid(geom);
    }
  }

The documentation of DataTypeHint states that defining hints this way results in Flink's default serializer being used:

 * <p>{@code @DataTypeHint(value = "RAW", bridgedTo = MyCustomClass.class)} defines a RAW data type
 * with Flink's default serializer for class {@code MyCustomClass}.

Making the functions use custom serializers requires passing a rawSerializer:

 * <p>{@code @DataTypeHint(value = "RAW", rawSerializer = MyCustomSerializer.class)} defines a RAW
 * data type with a custom serializer class.

What do you think about this @Imbruced?

radekaadek avatar Oct 26 '25 16:10 radekaadek

@radekaadek, thanks for the research 🙇 Are you able to verify whether it applies correctly with your changes? cc: @jiayuasu (iirc, you have been working on the Flink Java code)

Imbruced avatar Oct 26 '25 21:10 Imbruced

@radekaadek I think the only reason I did that is that there was otherwise no way to register those functions. If there is a clear path now, please feel free to use the customized serializer. In that case, please use Sedona's own geometry serializer: https://github.com/apache/sedona/tree/master/common/src/main/java/org/apache/sedona/common/geometrySerde

Our serializer performs much better than the default Java / Kryo serializer.

jiayuasu avatar Oct 26 '25 23:10 jiayuasu

Well, looks like the current code already uses our serializer

jiayuasu avatar Oct 27 '25 01:10 jiayuasu

I guess the reason it works now is that Flink automatically infers the serializer that was passed in when the type was created. I suppose I would need to change all of the functions to use the custom serializer in order to get it to work with the module.

Can you tell me how you were able to verify that the code actually uses custom serializers @jiayuasu ? I had been trying to put print statements inside of the geometry serialization class you mentioned but they never triggered when I was testing my code.

I don't know if I'll be able to serialize the index, as I don't see it called in a function anywhere, but I'll try to instantiate its type and serializer when the module is loaded.

The types in the module resolve to this RAW type where a custom serializer can be specified and the types themselves should already be handled correctly by Flink.

I'll write some tests and documentation for the module once I have some time on my hands.

radekaadek avatar Oct 27 '25 06:10 radekaadek

I have added some tests for the module and made the functions use the custom serializer. I still don't know how to verify that the functions in the module are actually using the custom serializer, so I would appreciate it if someone could help me with that.

I also noticed that all tests are initialized with the initialize method:

  static void initialize(boolean enableWebUI) {
    env =
        enableWebUI
            ? StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())
            : StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
    tableEnv = SedonaContext.create(env, StreamTableEnvironment.create(env, settings));
  }

It occurred to me that the module could reuse most, if not all, of the test cases if this method were modified to something like this:

  static void initialize(boolean enableWebUI) {
    env =
        enableWebUI
            ? StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())
            : StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
    tableEnv = StreamTableEnvironment.create(env, settings);
    tableEnv.executeSql("LOAD MODULE sedona");
  }

This would require some refactoring, but let me know what you think.

radekaadek avatar Nov 01 '25 13:11 radekaadek

@radekaadek regarding logs, did you try to use log4j in Flink to print logs?

jiayuasu avatar Nov 02 '25 04:11 jiayuasu

@jiayuasu I solved the issue by making the serializer write some files on my machine, which it did, but I don't know how you would like to see this tested in unit tests. I also found that I had missed the Geometry[] type and have added it too.
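One way the file-based check could become a unit-test assertion is a counting wrapper around the serializer. The sketch below is only an illustration under stated assumptions: `Serde`, `Counting`, and `utf8()` are hypothetical names and not Sedona or Flink types, and it assumes the job under test runs in the same JVM as the test, so static counters are visible to it.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; none of these types are Sedona or Flink classes.
class CountingSerde {

  // Minimal stand-in for a serializer contract.
  interface Serde<T> {
    byte[] serialize(T value);

    T deserialize(byte[] bytes);
  }

  // Wraps a delegate and counts every call in static counters, so a test can
  // read them even when it never holds the serializer instance itself.
  static final class Counting<T> implements Serde<T> {
    static final AtomicLong SERIALIZE_CALLS = new AtomicLong();
    static final AtomicLong DESERIALIZE_CALLS = new AtomicLong();

    private final Serde<T> delegate;

    Counting(Serde<T> delegate) {
      this.delegate = delegate;
    }

    @Override
    public byte[] serialize(T value) {
      SERIALIZE_CALLS.incrementAndGet();
      return delegate.serialize(value);
    }

    @Override
    public T deserialize(byte[] bytes) {
      DESERIALIZE_CALLS.incrementAndGet();
      return delegate.deserialize(bytes);
    }
  }

  // Toy delegate that stores text as UTF-8 bytes.
  static Serde<String> utf8() {
    return new Serde<String>() {
      @Override
      public byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
      }

      @Override
      public String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
      }
    };
  }

  public static void main(String[] args) {
    Serde<String> serde = new Counting<>(utf8());
    String roundTripped = serde.deserialize(serde.serialize("POINT (2 2)"));
    System.out.println(roundTripped);
    System.out.println(Counting.SERIALIZE_CALLS.get() + " serialize call(s)");
    System.out.println(Counting.DESERIALIZE_CALLS.get() + " deserialize call(s)");
  }
}
```

A test would reset the counters, run a query that has to move a geometry between operators, and assert that the counters are greater than zero. Whether a given query forces serialization at all depends on how the operators are chained, so the query would need to be chosen with that in mind.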

I have now noticed an error in the CI that looks like this:

testTableToDS(org.apache.sedona.flink.AdapterTest)  Time elapsed: 0.889 sec  <<< ERROR!
org.apache.flink.table.api.ValidationException: Data type 'RAW('java.lang.Object', '...')' does not support an output conversion to class 'org.locationtech.jts.geom.Geometry'.

and I don't know where it's coming from.

After implementing the Geometry[] serializer I also noticed more errors:

testDump(org.apache.sedona.flink.FunctionTest)  Time elapsed: 0.006 sec  <<< ERROR!
org.apache.flink.table.api.ValidationException: Could not extract a valid type inference for function class 'org.apache.sedona.flink.expressions.Functions$ST_Dump'. Please check for implementation mistakes and/or provide a corresponding hint.

The rest of the tests that are currently failing all seem to be connected to the Geometry[] type:

Tests in error:
  AdapterTest.testTableToDS:51 » Validation Data type 'RAW('java.lang.Object', '...
  FunctionTest.testCollectWithArray:277->TestBase.first:464 » Runtime Failed to ...
  FunctionTest.testDump:378 » Validation Could not extract a valid type inferenc...
  FunctionTest.testDumpPoints:387 » Validation Could not extract a valid type in...
  FunctionTest.testH3ToGeom:2088 » Validation SQL validation failed. An error oc...
  FunctionTest.testLineSegments:1632 » Validation Could not extract a valid type...
  FunctionTest.testMakeLine:1668->TestBase.first:464 » Runtime Failed to fetch n...
  FunctionTest.testMakePolygonWithHoles:1722->TestBase.first:464 » Runtime Faile...
  FunctionTest.testS2ToGeom:2009 » Validation SQL validation failed. An error oc...
  FunctionTest.testSubdivide:1936 » Validation Could not extract a valid type in...
  FunctionTest.testUnionArrayVariant:532->TestBase.first:464 » Runtime Failed to...

The Geometry[] class is never added in flink/src/main/java/org/apache/sedona/flink/SedonaContext.java, and it is a different type from GeometryCollection, which is probably why I'm seeing all of these errors. Was it ever verified that Geometry[] is also properly registered and serialized?
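The point about Geometry[] being its own type can be shown in plain Java. In this sketch the registry and the nested `Geometry` class are hypothetical stand-ins (the latter for org.locationtech.jts.geom.Geometry), and it assumes registration is keyed by the exact class. Whether Flink or Sedona resolve types exactly this way is not confirmed here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; the registry and the Geometry stand-in are not Sedona or Flink classes.
class ArrayTypeRegistryDemo {

  // Stand-in for org.locationtech.jts.geom.Geometry.
  static class Geometry {}

  // Serializer names keyed by the exact class they were registered for.
  private final Map<Class<?>, String> serializers = new HashMap<>();

  public void register(Class<?> type, String serializerName) {
    serializers.put(type, serializerName);
  }

  public boolean isRegistered(Class<?> type) {
    return serializers.containsKey(type);
  }

  public static void main(String[] args) {
    ArrayTypeRegistryDemo registry = new ArrayTypeRegistryDemo();
    registry.register(Geometry.class, "geometry serializer");

    // The array class is a separate Class object from its element class.
    System.out.println(Geometry[].class.equals(Geometry.class));
    System.out.println(registry.isRegistered(Geometry[].class));

    // Registering the array type explicitly closes the gap.
    registry.register(Geometry[].class, "geometry array serializer");
    System.out.println(registry.isRegistered(Geometry[].class));
  }
}
```

Under that assumption, a serializer registered only for the element type is never found for the array type, so functions that take or return Geometry[] need their own registration or hint.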

radekaadek avatar Nov 02 '25 16:11 radekaadek

Sorry, I am traveling this week, so my response might be slow.

jiayuasu avatar Nov 04 '25 17:11 jiayuasu

@radekaadek I've fixed the failed test cases. It turns out that you just need to add back the bridgedTo, and make the GeometryArrayTypeSerializer public.

jiayuasu avatar Dec 17 '25 05:12 jiayuasu