chunjun
chunjun copied to clipboard
Mongodb-sink会报空指针问题
Describe the bug: Mongodb sink 在运行时会抛出空指针异常(NullPointerException)
To Reproduce 1、在类MongodbDynamicTableSink中走的是
MongodbOutputFormatBuilder builder =
new MongodbOutputFormatBuilder(
mongoClientConf, null, MongodbOutputFormat.WriteMode.INSERT);
这个构造方法,如下代码所示
public class MongodbDynamicTableSink implements DynamicTableSink {
.......
/**
 * Creates the runtime provider that writes rows to MongoDB.
 *
 * <p>The output format is built in INSERT mode with no key, configured with the
 * batch size and flush interval taken from {@code mongoWriteConf}, and given a
 * row converter derived from the physical schema.
 *
 * @param context the sink context supplied by the planner (not used here)
 * @return a sink function provider wrapping the MongoDB output format, using the
 *     parallelism from {@code mongoWriteConf}
 */
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    // Schema-derived inputs for the row converter.
    final String[] columnNames = physicalSchema.getFieldNames();
    final RowType logicalRowType =
            (RowType) physicalSchema.toRowDataType().getLogicalType();

    // This constructor takes the client configuration directly; no key is passed.
    final MongodbOutputFormatBuilder formatBuilder =
            new MongodbOutputFormatBuilder(
                    mongoClientConf, null, MongodbOutputFormat.WriteMode.INSERT);

    // Batch and flush settings come from the write configuration.
    final FlinkxCommonConf commonConf = new FlinkxCommonConf();
    commonConf.setBatchSize(mongoWriteConf.getFlushMaxRows());
    commonConf.setFlushIntervalMills(mongoWriteConf.getFlushInterval());

    formatBuilder.setConfig(commonConf);
    formatBuilder.setRowConverter(new MongodbRowConverter(logicalRowType, columnNames));

    return SinkFunctionProvider.of(
            new DtOutputFormatSinkFunction(formatBuilder.finish()),
            mongoWriteConf.getParallelism());
}
..........
}
2、在类MongodbOutputFormatBuilder的checkFormat方法中,由于上面使用的构造方法没有给mongodbDataSyncConf赋值(该字段为null),执行 String upsertKey = mongodbDataSyncConf.getReplaceKey(); 时会报空指针异常
/**
 * Builder for {@link MongodbOutputFormat}.
 *
 * <p>There are two construction paths:
 *
 * <ul>
 *   <li>from a {@link MongodbDataSyncConf}, which is retained so that
 *       {@link #checkFormat()} can validate the replace key against the columns;
 *   <li>from a {@link MongoClientConf} plus key and write mode, in which case no
 *       sync configuration exists and {@code mongodbDataSyncConf} stays {@code null}.
 * </ul>
 */
public class MongodbOutputFormatBuilder extends BaseRichOutputFormatBuilder {
    /** Sync configuration; {@code null} when the three-argument constructor was used. */
    MongodbDataSyncConf mongodbDataSyncConf;

    /**
     * Creates a builder from a sync configuration.
     *
     * @param mongodbDataSyncConf configuration providing the client settings, the
     *     write mode and the replace key
     */
    public MongodbOutputFormatBuilder(MongodbDataSyncConf mongodbDataSyncConf) {
        this.mongodbDataSyncConf = mongodbDataSyncConf;
        MongoClientConf mongoClientConf =
                MongoClientConfFactory.createMongoClientConf(mongodbDataSyncConf);
        MongodbOutputFormat.WriteMode writeMode =
                parseWriteMode(mongodbDataSyncConf.getWriteMode());
        this.format =
                new MongodbOutputFormat(
                        mongoClientConf, mongodbDataSyncConf.getReplaceKey(), writeMode);
    }

    /**
     * Creates a builder directly from client settings.
     *
     * <p>This constructor does not assign {@code mongodbDataSyncConf}, so
     * {@link #checkFormat()} must tolerate that field being {@code null}.
     *
     * @param mongoClientConf client configuration passed to the output format
     * @param key key passed to the output format; may be {@code null}
     * @param writeMode write mode passed to the output format
     */
    public MongodbOutputFormatBuilder(
            MongoClientConf mongoClientConf, String key, MongodbOutputFormat.WriteMode writeMode) {
        this.format = new MongodbOutputFormat(mongoClientConf, key, writeMode);
    }

    /**
     * Verifies that a configured replace key is one of the configured columns.
     *
     * <p>Nothing is validated when the builder was created without a sync
     * configuration, or when the replace key is blank.
     *
     * @throws IllegalArgumentException if the replace key is set but no column has
     *     that name (compared case-insensitively)
     */
    @Override
    protected void checkFormat() {
        // Fix: the three-argument constructor leaves the sync configuration null;
        // dereferencing it here used to throw a NullPointerException.
        if (mongodbDataSyncConf == null) {
            return;
        }

        String upsertKey = mongodbDataSyncConf.getReplaceKey();
        if (StringUtils.isBlank(upsertKey)) {
            return;
        }

        if (!containsColumn(mongodbDataSyncConf.getColumn(), upsertKey)) {
            throw new IllegalArgumentException(
                    String.format(
                            "upsertKey must be included in the column,upsertKey=[%s]",
                            upsertKey));
        }
    }

    /** Returns whether any field in {@code fields} is named {@code name}, ignoring case. */
    private static boolean containsColumn(List<FieldConf> fields, String name) {
        if (fields == null) {
            // A missing column list cannot contain the key.
            return false;
        }
        for (FieldConf field : fields) {
            // Called on the non-null key so that a null field name cannot throw.
            if (field != null && name.equalsIgnoreCase(field.getName())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Maps the configured write-mode string to the output format's write mode.
     *
     * @param str write-mode string from the configuration; may be {@code null}
     * @return {@code UPSERT} for the replace and update modes, otherwise {@code INSERT}
     */
    private MongodbOutputFormat.WriteMode parseWriteMode(String str) {
        if (WriteMode.REPLACE.getMode().equals(str) || WriteMode.UPDATE.getMode().equals(str)) {
            return MongodbOutputFormat.WriteMode.UPSERT;
        } else {
            return MongodbOutputFormat.WriteMode.INSERT;
        }
    }
}
Thanks for this issue, can you fix it?