Allow inner class being parameter of `withTimestampAssigner` in Table API stream table source

Open crazyzhou opened this issue 5 years ago • 3 comments

Problem description Due to the serialization validation of connectorProperties, only a public top-level class or a public static nested class that implements AssignerWithTimeWindows is currently supported as the parameter of `withTimestampAssigner` in the Table API stream table source. We should also support inner classes for easier use.
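To illustrate the limitation, here is a minimal, self-contained sketch using plain Java serialization. It is not the connector's actual validation code: `DemoAssigner`, `StaticAssigner`, `InnerAssigner` and `isJavaSerializable` are hypothetical names invented for this example, and the real check in `Pravega.java` may work differently. It only shows the general reason a non-static inner class tends to fail a serialization check: it carries an implicit reference to its enclosing instance, which is often not serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class InnerClassSerializationDemo {

    // Hypothetical stand-in for a serializable assigner interface (not the Pravega type).
    public interface DemoAssigner extends Serializable {
        long extractTimestamp(long eventTime);
    }

    // Read by the inner class below, so that class really depends on its outer instance.
    private long offsetMillis = 5L;

    // Static nested class: holds no reference to an enclosing instance.
    public static class StaticAssigner implements DemoAssigner {
        @Override
        public long extractTimestamp(long eventTime) {
            return eventTime;
        }
    }

    // Non-static inner class: implicitly references the enclosing instance,
    // and InnerClassSerializationDemo itself is not Serializable.
    public class InnerAssigner implements DemoAssigner {
        @Override
        public long extractTimestamp(long eventTime) {
            return eventTime + offsetMillis;
        }
    }

    // Returns true if the object can be written with standard Java serialization.
    public static boolean isJavaSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        InnerClassSerializationDemo outer = new InnerClassSerializationDemo();
        System.out.println("static nested: " + isJavaSerializable(new StaticAssigner()));
        System.out.println("inner:         " + isJavaSerializable(outer.new InnerAssigner()));
    }
}
```

In this sketch the static nested assigner serializes, while the inner one fails with a `NotSerializableException` raised for the enclosing class. Making the enclosing class serializable would let the inner assigner pass, at the cost of serializing the whole outer object along with it.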

Problem location https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/Pravega.java#L292

Suggestions for an improvement

crazyzhou, Oct 17 '19 01:10

I'm a little concerned that we're missing an ingredient with how assigners would be configured in Flink SQL. One of the main use cases of Flink SQL is to be able to use Flink without writing Java code. It is therefore important that a table source provide a pure properties-based experience. See the pure-SQL experience depicted here: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector

Basically, we need a way to interoperate with the rowtime schema elements as seen here: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#rowtime-attributes

One approach could be to recommend that the user extract the timestamp using from-field, while using from-source for watermark generation. This may require some enhancement to the underlying source function so that it emits watermarks without emitting timestamps.

rowtime:
  timestamps:
    type: from-field
  watermarks:
    type: from-source

EronWright, Oct 17 '19 21:10

Also, notice in the Kafka table source that there's a property called sink-partitioner (with an enum for possible values) and a property called sink-partitioner-class. Consider using this naming convention (-class) wherever we take a class name.
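As a rough sketch of what a `-class` style property could look like on the consuming side, the example below resolves a class name from plain string properties and instantiates it by reflection. Everything here is an assumption made for illustration: the key `connector.reader.timestamp-assigner-class`, the `EventTimeAssigner` class and the use of `LongUnaryOperator` as a stand-in assigner type are hypothetical and are not part of the Pravega or Kafka connectors.

```java
import java.util.Properties;
import java.util.function.LongUnaryOperator;

public class ClassNamePropertyDemo {

    // Hypothetical property key following the "-class" naming convention.
    public static final String KEY = "connector.reader.timestamp-assigner-class";

    // Hypothetical assigner; LongUnaryOperator stands in for the real assigner interface.
    public static class EventTimeAssigner implements LongUnaryOperator {
        @Override
        public long applyAsLong(long eventTime) {
            return eventTime;
        }
    }

    // Loads the class named in the properties and calls its no-argument constructor.
    public static LongUnaryOperator instantiate(Properties props) {
        String className = props.getProperty(KEY);
        if (className == null) {
            throw new IllegalArgumentException("Missing property: " + KEY);
        }
        try {
            Class<?> clazz = Class.forName(className);
            return (LongUnaryOperator) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(KEY, EventTimeAssigner.class.getName());
        LongUnaryOperator assigner = instantiate(props);
        System.out.println(assigner.applyAsLong(42L));
    }
}
```

A class-name property keeps the configuration purely string-based, but it has its own constraint: a non-static inner class has no no-argument constructor from the point of view of reflection, because its constructor takes the enclosing instance, so this lookup would only work for top-level or static nested classes.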

EronWright, Oct 17 '19 21:10

Thanks for the comment, @EronWright. Here is what I'd like to explain.

rowtime:
  timestamps:
    type: from-field
  watermarks:
    type: from-source

This is exactly what I considered during the design. The problem is that when users want to use the source with Pravega watermarks, we force them to provide an implementation that assigns a timestamp to each event. This implementation of our AssignerWithTimeWindows is required to enable Pravega watermarks in the Flink source. We also pass it into connectorProperties, which is what causes this issue. Therefore, to avoid conflicting with the user's assignment in the AssignerWithTimeWindows, we currently support only the style below for enabling Pravega watermarks in the Table API.

rowtime:
  timestamps:
    type: from-source
  watermarks:
    type: from-source

crazyzhou, Oct 18 '19 07:10