[cdc] Fix: java.lang.NullPointerException is thrown when the value of an entry is null

Open scottxing opened this issue 1 year ago • 4 comments

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java

Purpose

When the value of an entry is null, extractRowData subsequently throws a java.lang.NullPointerException.

scottxing avatar Apr 08 '24 03:04 scottxing

Can you show the full exception stack?

JingsongLi avatar Apr 15 '24 07:04 JingsongLi

Can you show the full exception stack?

Test Code:

The message contains "name": null:

        String oggMsg = """
                        {
                          "after": {
                            "id": 111,
                            "name": null,
                            "description": "Big 2-wheel scooter",
                            "weight": 5.15
                          },
                          "op_type": "I",
                          "op_ts": "2020-05-13 15:40:06.000000",
                          "current_ts": "2020-05-13 15:40:07.000000",
                          "primary_keys": [
                            "id"
                          ],
                          "pos": "00000000000000000000143",
                          "table": "PRODUCTS"
                        }
                """;

        RecordParser recordParser = new OggRecordParser(false, TypeMapping.defaultMapping(), new ArrayList<>());
        recordParser.buildSchema(oggMsg);

Source Code:

public abstract class RecordParser implements FlatMapFunction<String, RichCdcMultiplexRecord> {

  ...

    protected Map<String, String> extractRowData(JsonNode record, LinkedHashMap<String, DataType> paimonFieldTypes) {
        paimonFieldTypes.putAll(this.fillDefaultTypes(record));
        Map<String, Object> recordMap = (Map)JsonSerdeUtil.convertValue(record, new TypeReference<Map<String, Object>>() {
        });
        Map<String, String> rowData = (Map)recordMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, (entry) -> {
            if (Objects.nonNull(entry.getValue()) && !TypeUtils.isBasicType(entry.getValue())) {
                try {
                    return JsonSerdeUtil.writeValueAsString(entry.getValue());
                } catch (JsonProcessingException e) {
                    LOG.error("Failed to deserialize record.", e);
                    return Objects.toString(entry.getValue());
                }
            } else {
                return Objects.toString(entry.getValue(), (String)null);
            }
        }));
        this.evalComputedColumns(rowData, paimonFieldTypes);
        return rowData;
    }
...

}

Exception:

Exception in thread "main" java.lang.NullPointerException
	at java.base/java.util.Objects.requireNonNull(Objects.java:208)
	at java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:180)
	at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
	at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
	at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
	at org.apache.paimon.flink.action.cdc.format.RecordParser.extractRowData(RecordParser.java:146)
	at org.apache.paimon.flink.action.cdc.format.RecordParser.processRecord(RecordParser.java:192)
	at org.apache.paimon.flink.action.cdc.format.ogg.OggRecordParser.extractRecords(OggRecordParser.java:76)
	at org.apache.paimon.flink.action.cdc.format.RecordParser.buildSchema(RecordParser.java:96)
	at org.example.Main.main(Main.java:33)
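
The top frames of the trace point at Collectors.toMap rather than at Paimon code: its accumulator rejects null values, so any entry whose value mapper returns null fails in this way. The following is a minimal sketch that reproduces this outside Paimon and shows one null-tolerant alternative. The class and method names are hypothetical, and this is not necessarily the fix adopted upstream.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical demo class; it does not depend on Paimon.
final class NullSafeRowData {

    // Returns true if Collectors.toMap throws a NullPointerException for this
    // record, which happens when any mapped value is null.
    static boolean toMapThrowsOnNull(Map<String, Object> record) {
        try {
            Map<String, String> ignored = record.entrySet().stream()
                    .collect(Collectors.toMap(
                            e -> e.getKey(),
                            e -> Objects.toString(e.getValue(), null)));
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Null-tolerant alternative: a plain loop with put(), which accepts null
    // values and keeps the key with a null value in the result.
    static Map<String, String> toStringMap(Map<String, Object> record) {
        Map<String, String> rowData = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : record.entrySet()) {
            rowData.put(e.getKey(), Objects.toString(e.getValue(), null));
        }
        return rowData;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("id", 111);
        record.put("name", null); // mirrors "name": null in the OGG message

        System.out.println("toMap throws: " + toMapThrowsOnNull(record));
        System.out.println("loop result:  " + toStringMap(record));
    }
}
```

The loop keeps the null-valued key in the map. Whether a null column should be kept, dropped, or replaced with a default is a design decision for the parser and is left open in this sketch.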

scottxing avatar Apr 15 '24 13:04 scottxing

The latest branch has fixed this bug.

https://github.com/apache/paimon/pull/2904

zhuangchong avatar Apr 16 '24 04:04 zhuangchong

I will close this PR. If you still have questions, feel free to reopen it and add them.

zhuangchong avatar Apr 22 '24 01:04 zhuangchong