starrocks-connector-for-apache-flink
Get the table name from Kafka and sink each record into the corresponding table. Can this be done by modifying the source code? What needs to change in the invoke method? I tried initializing StarRocksSinkOptions and the sink manager inside the invoke method, but it didn't work.
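Example: the Kafka stream is a DataStream<Tuple2<String, String>>, where the first field is the JSON payload and the second is the target table name, e.g. ("{\"score\": \"99\", \"name\": \"stephen\"}", "tmp_sr_test_api_2")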
StarRocksSink:
    StarRocksSinkOptions.builder()
        .withProperty("jdbc-url", ConfigCommon.FLINK_STARROCKS_JDBC_URL)
        .withProperty("load-url", ConfigCommon.FLINK_STARROCKS_LOAD_URL)
        .withProperty("username", ConfigCommon.STARROCKS_USER)
        .withProperty("password", ConfigCommon.STARROCKS_PASSWORD)
        .withProperty("table-name", "tmp_sr_test_api_1")
        .withProperty("database-name", ConfigCommon.DATABASE_NAME)
        .withProperty("sink.properties.format", "json")
        .withProperty("sink.properties.strip_outer_array", "true")
        .build()
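In the end, I want this record to be written to the "tmp_sr_test_api_2" table (the name carried in the record), not to the "tmp_sr_test_api_1" table configured in the sink options.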
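To make the intended behavior concrete, here is a minimal sketch of the routing logic in plain Java, with no Flink or connector classes involved. TableRouter and TableWriter are hypothetical names used only for illustration. The assumption is that one writer is created lazily per table name and then reused, instead of being rebuilt on every invoke call. Whether the connector's own sink manager can be created per table in this way is not confirmed here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class TableRouter {
    /** Hypothetical stand-in for a per-table sink; not a connector class. */
    public interface TableWriter {
        void write(String json);
    }

    // One writer per table name, created on first use and then reused.
    private final Map<String, TableWriter> writers = new HashMap<>();

    // Builds a writer for a given table name (assumed to be supplied by the caller).
    private final Function<String, TableWriter> factory;

    public TableRouter(Function<String, TableWriter> factory) {
        this.factory = factory;
    }

    /** Routes one (json, tableName) record to the writer for that table. */
    public void invoke(String json, String tableName) {
        writers.computeIfAbsent(tableName, factory).write(json);
    }

    /** Number of distinct tables seen so far. */
    public int writerCount() {
        return writers.size();
    }
}
```

With the record from the example, invoke("{\"score\": \"99\", \"name\": \"stephen\"}", "tmp_sr_test_api_2") would reach only the writer built for "tmp_sr_test_api_2". In a real job the factory would have to build whatever per-table sink object the connector supports, and the writers would also need to be flushed and closed with the sink.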