starrocks-connector-for-apache-flink
The format is JSON and dynamic column update is specified. Why aren't columns added in the code by default, as with CSV?
Version: flink-connector-starrocks:1.2.3_flink-1.15, Flink 1.15.0
Environment: IntelliJ IDEA
Question: when dynamic column update is specified, CSV does not require setting sink.properties.columns, because the flink-connector-starrocks code derives the columns from the field names of the Flink SQL table. JSON, however, gets no such treatment. Is this because JSON needs handling that differs from CSV, or was the equivalent logic simply forgotten for the JSON path in the flink-connector-starrocks code?
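In the meantime, the stream-load columns property can be set explicitly so that a JSON sink behaves like the CSV path. Here is a minimal sketch using the builder API shown in the connector README; the connection values and the k1/v1 column names are placeholders, and the import path may differ between connector versions:

// Import path as of flink-connector-starrocks 1.2.x; adjust to your version.
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;

StarRocksSinkOptions options = StarRocksSinkOptions.builder()
        .withProperty("jdbc-url", "jdbc:mysql://fe-host:9030")  // placeholder
        .withProperty("load-url", "fe-host:8030")               // placeholder
        .withProperty("database-name", "demo_db")               // placeholder
        .withProperty("table-name", "demo_table")               // placeholder
        .withProperty("username", "root")
        .withProperty("password", "")
        .withProperty("sink.properties.format", "json")
        // Set the stream-load `columns` header by hand, which the code below
        // only derives automatically for CSV.
        .withProperty("sink.properties.columns", "k1,v1")
        .build();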
Relevant source code:
// CLASS: StarRocksStreamLoadVisitor.java
// METHOD: doHttpPut
try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
    HttpPut httpPut = new HttpPut(loadUrl);
    Map<String, String> props = sinkOptions.getSinkStreamLoadProperties();
    for (Map.Entry<String, String> entry : props.entrySet()) {
        httpPut.setHeader(entry.getKey(), entry.getValue());
    }
    // The `columns` header is derived from fieldNames only when the format is
    // CSV, or when upsert/delete is enabled and __op is not auto-projected
    // into the JSON payload.
    if (!props.containsKey("columns")
            && ((sinkOptions.supportUpsertDelete() && !__opAutoProjectionInJson)
                || StarRocksSinkOptions.StreamLoadFormat.CSV.equals(sinkOptions.getStreamLoadFormat()))) {
        String cols = String.join(",", Arrays.asList(fieldNames).stream()
                .map(f -> String.format("`%s`", f.trim().replace("`", "")))
                .collect(Collectors.toList()));
        if (cols.length() > 0 && sinkOptions.supportUpsertDelete()) {
            cols += String.format(",%s", StarRocksSinkOP.COLUMN_KEY); // COLUMN_KEY = "__op"
        }
        httpPut.setHeader("columns", cols);
    }
    // ... remainder of doHttpPut omitted ...
}
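For context on why the two formats can legitimately differ here: a CSV row is purely positional, so StarRocks needs the columns header to map values to columns, whereas a JSON row already names every column through its keys. The following is a minimal illustration of that difference; it is not the connector's actual serializer code, and the class and variable names are made up:

import java.util.StringJoiner;

public class FormatSketch {
    public static void main(String[] args) {
        // Hypothetical table columns and one row of values.
        String[] fieldNames = {"k1", "v1"};
        Object[] values = {1, "hello"};

        // CSV: positional, so the server needs the `columns` header to know
        // which value belongs to which column.
        StringJoiner csv = new StringJoiner("\t");
        for (Object v : values) {
            csv.add(String.valueOf(v));
        }

        // JSON: keys name the columns, so the payload is self-describing and
        // a `columns` header is redundant for plain appends.
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (int i = 0; i < fieldNames.length; i++) {
            Object v = values[i];
            String rendered = (v instanceof String) ? "\"" + v + "\"" : String.valueOf(v);
            json.add(String.format("\"%s\":%s", fieldNames[i], rendered));
        }

        System.out.println(csv);  // 1	hello
        System.out.println(json); // {"k1":1,"v1":"hello"}
    }
}

The __opAutoProjectionInJson flag in the quoted condition appears to point the same way: for JSON, the __op marker can be written into the payload itself rather than appended to the header.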
Finally:
- If the JSON logic was simply omitted from the code, I can submit a PR to add it.
- If it was designed this way, please explain the reasoning behind the design.
Looking forward to your reply, @hffariel.
You can see StarRocksSerializerFactory#createSerializer.
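For readers landing here: that factory is where the two formats diverge. The sketch below paraphrases the dispatch from memory rather than quoting it verbatim, so check the source of your connector version; the key point is that the JSON serializer is handed fieldNames and writes them as keys into every row, so the column mapping travels inside the payload and no columns header is needed:

// Paraphrased sketch of StarRocksSerializerFactory#createSerializer
// (not a verbatim quote; verify against your connector version).
public static StarRocksISerializer createSerializer(StarRocksSinkOptions sinkOptions, String[] fieldNames) {
    if (StarRocksSinkOptions.StreamLoadFormat.CSV.equals(sinkOptions.getStreamLoadFormat())) {
        // CSV rows are positional; the column mapping must come from the
        // HTTP `columns` header set in doHttpPut above.
        return new StarRocksCsvSerializer(
                sinkOptions.getSinkStreamLoadProperties().get("column_separator"));
    }
    if (StarRocksSinkOptions.StreamLoadFormat.JSON.equals(sinkOptions.getStreamLoadFormat())) {
        // The JSON serializer receives fieldNames and emits them as keys in
        // every row, so the payload itself carries the column mapping.
        return new StarRocksJsonSerializer(fieldNames);
    }
    throw new RuntimeException("Failed to create serializer: unsupported format.");
}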