[cdc]Fix: When the value of entry is null, subsequent java.lang.NullPointerException is thrown
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
Purpose
When the value of an entry is null, a java.lang.NullPointerException is subsequently thrown.
Can you show the full exception stack?
Test Code:
The message contains "name": null:
String oggMsg = """
{
"after": {
"id": 111,
"name": null,
"description": "Big 2-wheel scooter",
"weight": 5.15
},
"op_type": "I",
"op_ts": "2020-05-13 15:40:06.000000",
"current_ts": "2020-05-13 15:40:07.000000",
"primary_keys": [
"id"
],
"pos": "00000000000000000000143",
"table": "PRODUCTS"
}
""";
RecordParser recordParser = new OggRecordParser(false, TypeMapping.defaultMapping(), new ArrayList<>());
recordParser.buildSchema(oggMsg);
Source Code:
public abstract class RecordParser implements FlatMapFunction<String, RichCdcMultiplexRecord> {
...
protected Map<String, String> extractRowData(JsonNode record, LinkedHashMap<String, DataType> paimonFieldTypes) {
paimonFieldTypes.putAll(this.fillDefaultTypes(record));
Map<String, Object> recordMap = (Map)JsonSerdeUtil.convertValue(record, new TypeReference<Map<String, Object>>() {
});
Map<String, String> rowData = (Map)recordMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, (entry) -> {
if (Objects.nonNull(entry.getValue()) && !TypeUtils.isBasicType(entry.getValue())) {
try {
return JsonSerdeUtil.writeValueAsString(entry.getValue());
} catch (JsonProcessingException var2) {
JsonProcessingException e = var2;
LOG.error("Failed to deserialize record.", e);
return Objects.toString(entry.getValue());
}
} else {
return Objects.toString(entry.getValue(), (String)null);
}
}));
this.evalComputedColumns(rowData, paimonFieldTypes);
return rowData;
}
...
}
Exception:
Exception in thread "main" java.lang.NullPointerException
at java.base/java.util.Objects.requireNonNull(Objects.java:208)
at java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:180)
at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at org.apache.paimon.flink.action.cdc.format.RecordParser.extractRowData(RecordParser.java:146)
at org.apache.paimon.flink.action.cdc.format.RecordParser.processRecord(RecordParser.java:192)
at org.apache.paimon.flink.action.cdc.format.ogg.OggRecordParser.extractRecords(OggRecordParser.java:76)
at org.apache.paimon.flink.action.cdc.format.RecordParser.buildSchema(RecordParser.java:96)
at org.example.Main.main(Main.java:33)
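The top frames of the trace (Objects.requireNonNull inside Collectors.uniqKeysMapAccumulator) suggest the cause: Collectors.toMap rejects a null result from the value mapper. Below is a minimal standalone sketch of that behavior, independent of Paimon. The class and method names are hypothetical, and the null-tolerant variant is only one possible approach, not necessarily how the project resolved it.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical demo class; not part of Paimon.
class NullValueToMapDemo {

    // Mirrors the failing pattern: the value mapper may return null,
    // which Collectors.toMap does not accept.
    static Map<String, String> withToMap(Map<String, Object> record) {
        return record.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        e -> Objects.toString(e.getValue(), null)));
    }

    // Null-tolerant alternative (an assumption for illustration):
    // Map.put accepts null values on LinkedHashMap.
    static Map<String, String> withPut(Map<String, Object> record) {
        Map<String, String> rowData = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : record.entrySet()) {
            rowData.put(e.getKey(), Objects.toString(e.getValue(), null));
        }
        return rowData;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("id", 111);
        record.put("name", null); // same shape as the "name": null field above

        try {
            withToMap(record);
        } catch (NullPointerException e) {
            System.out.println("Collectors.toMap threw NullPointerException");
        }

        // The loop-based variant keeps the key and stores a null value.
        System.out.println(withPut(record));
    }
}
```

The loop keeps the key in the resulting map with a null value, which preserves the information that the column was present but empty.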
This bug has already been fixed on the latest branch:
https://github.com/apache/paimon/pull/2904
I will close this PR. If you still have questions, feel free to reopen it and continue the discussion.