flink-connectors
Allow inner class being parameter of `withTimestampAssigner` in Table API stream table source
Problem description
Due to the serialization validation for `connectorProperties`, only a public static-inner or outer class that implements `AssignerWithTimeWindows` is supported as a parameter of `withTimestampAssigner` in the Table API stream table source. We should also support inner classes for easier use.
Problem location https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/Pravega.java#L292
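To illustrate the limitation described above, here is a minimal sketch using only plain JDK serialization. It is not the connector's actual validation code. `TimestampAssigner`, `EnclosingJob`, and `SerializationCheck` are hypothetical names invented for this example, and the real `AssignerWithTimeWindows` type is deliberately not used. The sketch shows one likely reason an inner class is rejected: a non-static inner class holds a hidden reference to its enclosing instance, so serializing it also requires the enclosing class to be serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for the connector's assigner interface (not the Pravega API).
interface TimestampAssigner extends Serializable {
    long extractTimestamp(String event);
}

// Deliberately NOT Serializable, like a typical job or test class.
class EnclosingJob {
    private final long offset = 1000L;

    // Non-static inner class: each instance references the enclosing EnclosingJob.
    class InnerAssigner implements TimestampAssigner {
        @Override
        public long extractTimestamp(String event) {
            return offset + event.length(); // uses state of the enclosing instance
        }
    }

    // Static nested class: no reference to any enclosing instance.
    public static class StaticAssigner implements TimestampAssigner {
        @Override
        public long extractTimestamp(String event) {
            return event.length();
        }
    }

    TimestampAssigner newInner() {
        return new InnerAssigner();
    }
}

class SerializationCheck {
    // Returns true if plain Java serialization of the object succeeds.
    static boolean canSerialize(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }
}
```

Under these assumptions, `StaticAssigner` serializes while `InnerAssigner` does not, because writing the inner instance also tries to write the non-serializable `EnclosingJob`.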
Suggestions for an improvement
I'm a little concerned that we're missing an ingredient with how assigners would be configured in Flink SQL. One of the main use cases of Flink SQL is to be able to use Flink without writing Java code. It is therefore important that a table source provide a pure properties-based experience. See the pure-SQL experience depicted here: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector
Basically, we need a way to interoperate with the rowtime schema elements as seen here: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#rowtime-attributes
One approach could be to recommend that the user use `from-field` for timestamp extraction and `from-source` for watermark generation. This may require some enhancement to the underlying source function so that it can emit watermarks without emitting timestamps.
rowtime:
  timestamps:
    type: from-field
  watermarks:
    type: from-source
Also, notice in the Kafka table source that there's a property called `sink-partitioner` (with an enum for possible values) and a property called `sink-partitioner-class`. Consider using this naming convention (`-class`) wherever we take a class name.
Thanks for the comment, @EronWright. Here is what I'd like to explain.
rowtime:
  timestamps:
    type: from-field
  watermarks:
    type: from-source
This is exactly what I considered during the design. The problem is that when users want to use the source with Pravega watermarks, we force them to provide a timestamp-assigning implementation for each event. This implementation of our `AssignerWithTimeWindows` is required to enable Pravega watermarks in the Flink source. We also pass it into the `connectorProperties`, which causes this issue. Therefore, in order not to conflict with the users' assignment in `AssignerWithTimeWindows`, we currently support only the style below to enable Pravega watermarks in the Table API.
rowtime:
  timestamps:
    type: from-source
  watermarks:
    type: from-source