chunjun icon indicating copy to clipboard operation
chunjun copied to clipboard

Mongodb-sink会报空指针问题

Open frank-zkz opened this issue 3 years ago • 1 comments

Describe the bug sink会报空指针问题

To Reproduce 1、在类MongodbDynamicTableSink中走的是

MongodbOutputFormatBuilder builder =
                new MongodbOutputFormatBuilder(
                        mongoClientConf, null, MongodbOutputFormat.WriteMode.INSERT);

这个构造方法,如下代码所示

public class MongodbDynamicTableSink implements DynamicTableSink {
  .......
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
        String[] fieldNames = physicalSchema.getFieldNames();
        MongodbOutputFormatBuilder builder =
                new MongodbOutputFormatBuilder(
                        mongoClientConf, null, MongodbOutputFormat.WriteMode.INSERT);
        FlinkxCommonConf flinkxCommonConf = new FlinkxCommonConf();
        flinkxCommonConf.setBatchSize(mongoWriteConf.getFlushMaxRows());
        flinkxCommonConf.setFlushIntervalMills(mongoWriteConf.getFlushInterval());
        builder.setConfig(flinkxCommonConf);
        builder.setRowConverter(new MongodbRowConverter(rowType, fieldNames));
        return SinkFunctionProvider.of(
                new DtOutputFormatSinkFunction(builder.finish()), mongoWriteConf.getParallelism());
    }
..........
}

2、在类MongodbOutputFormatBuilder.checkFormat方法中 String upsertKey = mongodbDataSyncConf.getReplaceKey();会报空指针异常

public class MongodbOutputFormatBuilder extends BaseRichOutputFormatBuilder {
    MongodbDataSyncConf mongodbDataSyncConf;
    public MongodbOutputFormatBuilder(MongodbDataSyncConf mongodbDataSyncConf) {
        this.mongodbDataSyncConf = mongodbDataSyncConf;
        MongoClientConf mongoClientConf =
                MongoClientConfFactory.createMongoClientConf(mongodbDataSyncConf);
        MongodbOutputFormat.WriteMode writeMode =
                parseWriteMode(mongodbDataSyncConf.getWriteMode());
        this.format =
                new MongodbOutputFormat(
                        mongoClientConf, mongodbDataSyncConf.getReplaceKey(), writeMode);
    }

    //走的是这个构造方法,不会给mongodbDataSyncConf赋值
    public MongodbOutputFormatBuilder(
            MongoClientConf mongoClientConf, String key, MongodbOutputFormat.WriteMode writeMode) {
        this.format = new MongodbOutputFormat(mongoClientConf, key, writeMode);
    }

    @Override
    protected void checkFormat() {
        String upsertKey = mongodbDataSyncConf.getReplaceKey();
        if (!StringUtils.isBlank(upsertKey)) {
            List<FieldConf> fields = mongodbDataSyncConf.getColumn();
            boolean flag = false;
            for (FieldConf field : fields) {
                if (field.getName().equalsIgnoreCase(upsertKey)) {
                    flag = true;
                    break;
                }
            }
            if (!flag) {
                throw new IllegalArgumentException(
                        String.format(
                                "upsertKey must be included in the column,upsertKey=[%s]",
                                upsertKey));
            }
        }
    }

    private MongodbOutputFormat.WriteMode parseWriteMode(String str) {
        if (WriteMode.REPLACE.getMode().equals(str) || WriteMode.UPDATE.getMode().equals(str)) {
            return MongodbOutputFormat.WriteMode.UPSERT;
        } else {
            return MongodbOutputFormat.WriteMode.INSERT;
        }
    }
}

frank-zkz avatar May 16 '22 07:05 frank-zkz

Thanks for this issue, can you fix it?

FlechazoW avatar May 16 '22 07:05 FlechazoW