starrocks-connector-for-apache-flink icon indicating copy to clipboard operation
starrocks-connector-for-apache-flink copied to clipboard

The format is JSON, and the dynamic column update is specified. Why not add columns in the code by default like CSV?

Open fs3085 opened this issue 2 years ago • 2 comments

Version: flink-connector-starrocks:1.2.3_flink-1.15 flink:1.15.0

environment: idea

doubt: When dynamic column update is specified, CSV does not need to set sink.properties Columns is set according to fieldnames in the Flink connector starlocks code when the Flink SQL table is built. However, JSON does not. It is because JSON has any different operations from CSV or it is forgotten to add them in the Flink connector starlocks code of JSON

Relevant source code:

//CLASS:StarRocksStreamLoadVisitor.java 
//METHOD:doHttpPut
        try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
            HttpPut httpPut = new HttpPut(loadUrl);
            Map<String, String> props = sinkOptions.getSinkStreamLoadProperties();
            for (Map.Entry<String,String> entry : props.entrySet()) {
                httpPut.setHeader(entry.getKey(), entry.getValue());
            }
            if (!props.containsKey("columns") && ((sinkOptions.supportUpsertDelete() && !__opAutoProjectionInJson) || StarRocksSinkOptions.StreamLoadFormat.CSV.equals(sinkOptions.getStreamLoadFormat()))) {
                String cols = String.join(",", Arrays.asList(fieldNames).stream().map(f -> String.format("`%s`", f.trim().replace("`", ""))).collect(Collectors.toList()));
                if (cols.length() > 0 && sinkOptions.supportUpsertDelete()) {
                    cols += String.format(",%s", StarRocksSinkOP.COLUMN_KEY); //COLUMN_KEY="__op"
                }
                httpPut.setHeader("columns", cols);
            }

last:

  1. If the logic about JSON is omitted from the code, I can submit PR to add relevant logic.
  2. If it is designed in this way, please explain the reason for the design.

fs3085 avatar Aug 19 '22 03:08 fs3085

Look forward to your reply @hffariel

fs3085 avatar Aug 19 '22 03:08 fs3085

you can see StarRocksSerializerFactory#createSerializer

xlfjcg avatar Dec 08 '22 14:12 xlfjcg