starrocks-connector-for-apache-flink

Get the table name from Kafka and sink each record into the corresponding table: can this be done by modifying the connector source code?

Open · awk333 opened this issue 3 years ago · 0 comments

I read the target table name from each Kafka message and want to sink the record into that table. Can this be achieved by modifying the connector source code, and what would need to change in the invoke method? I tried to initialize StarRocksSinkOptions and StarRocksSinkManager inside invoke, but it didn't work.

Example: the Kafka stream is a DataStream<Tuple2<String, String>> whose first field is the JSON record and whose second field is the target table name, e.g. ({"score": "99", "name": "stephen"}, "tmp_sr_test_api_2").

StarRocksSink is currently configured like this:

```java
StarRocksSinkOptions.builder()
    .withProperty("jdbc-url", ConfigCommon.FLINK_STARROCKS_JDBC_URL)
    .withProperty("load-url", ConfigCommon.FLINK_STARROCKS_LOAD_URL)
    .withProperty("username", ConfigCommon.STARROCKS_USER)
    .withProperty("password", ConfigCommon.STARROCKS_PASSWORD)
    .withProperty("table-name", "tmp_sr_test_api_1")
    .withProperty("database-name", ConfigCommon.DATABASE_NAME)
    .withProperty("sink.properties.format", "json")
    .withProperty("sink.properties.strip_outer_array", "true")
    .build()
```
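One way to avoid hard-coding the table is to parameterize the builder by table name. The helper below is a minimal sketch only: the method name `buildOptionsFor` is made up for illustration, the property values are copied from the snippet above, and the `StarRocksSinkOptions` import path may differ across connector versions.

```java
// Import path assumed for recent connector versions; adjust for your version.
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;

// Hypothetical helper: builds the same sink options as above,
// but with the target table name supplied as a parameter.
public static StarRocksSinkOptions buildOptionsFor(String tableName) {
    return StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", ConfigCommon.FLINK_STARROCKS_JDBC_URL)
            .withProperty("load-url", ConfigCommon.FLINK_STARROCKS_LOAD_URL)
            .withProperty("username", ConfigCommon.STARROCKS_USER)
            .withProperty("password", ConfigCommon.STARROCKS_PASSWORD)
            .withProperty("table-name", tableName)   // table name is now dynamic
            .withProperty("database-name", ConfigCommon.DATABASE_NAME)
            .withProperty("sink.properties.format", "json")
            .withProperty("sink.properties.strip_outer_array", "true")
            .build();
}
```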

Finally, I want that record to be loaded into the "tmp_sr_test_api_2" table.
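As a sketch of a workaround that does not modify the connector: if the set of candidate tables is known when the job is submitted, the Tuple2 stream can be split by table name and each branch attached to its own sink. This assumes the connector's documented `StarRocksSink.sink(StarRocksSinkOptions)` factory for JSON string records and the hypothetical `buildOptionsFor` helper sketched above; fully dynamic routing to tables unknown at submission time would still require changes inside the sink function itself, along the lines the question asks about.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

import com.starrocks.connector.flink.StarRocksSink;

public class PerTableRouting {

    // Sketch: attach one StarRocks sink per known target table to the (json, tableName) stream.
    public static void attachSinks(DataStream<Tuple2<String, String>> kafkaStream) {
        List<String> knownTables = Arrays.asList("tmp_sr_test_api_1", "tmp_sr_test_api_2");

        for (String table : knownTables) {
            kafkaStream
                .filter(t -> table.equals(t.f1))   // keep only records destined for this table
                .map(t -> t.f0)                    // drop the table name, keep the JSON payload
                .addSink(StarRocksSink.sink(buildOptionsFor(table)))  // hypothetical helper from above
                .name("starrocks-sink-" + table);
        }
    }
}
```

Each branch is an ordinary StarRocks sink with its own "table-name" property, so checkpointing and stream-load batching behave exactly as in the single-table case; the trade-off is that new tables require a job restart.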

awk333 · Feb 22 '22 11:02