camel-kafka-connector
camel-mongodb-kafka-connector “camel.source.endpoint.streamFilter” invalid
{
"connector.class": "org.apache.camel.kafkaconnector.mongodb.CamelMongodbSourceConnector",
"tasks.max": "1",
"topics": "mongo.test-object.save.source",
"camel.source.endpoint.consumerType": "changeStreams",
"camel.source.path.connectionBean": "mongo",
"errors.deadletterqueue.context.headers.enable": "true",
"camel.source.endpoint.collection": "test_object",
"camel.source.endpoint.database": "ac-test",
"errors.deadletterqueue.topic.name": "mongo.test-object.save.source.deadletter",
"value.converter.schemas.enable": "false",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.replication.factor": "1",
"camel.source.endpoint.streamFilter": "{'$match':{'$or':[{'operationType':'insert'},{'operationType':'update'},{'operationType':'replace'}]}}",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"errors.log.enable": "true",
"camel.source.endpoint.mongoConnection": "#class:com.mongodb.client.MongoClients#create('mongodb://root:root@192.168.23.164:27017/?authSource=admin')",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"camel.source.marshal": "org.apache.camel.component.gson.GsonDataFormat",
"camel.source.pollingConsumerQueueSize": "10000",
"camel.source.contentLogLevel": "INFO"
}
Setting camel.source.endpoint.streamFilter to an invalid JSON value does not raise an exception, so my guess is that this setting is not being applied at all.
In the end I concluded that the problem is in the camel-mongodb component. The following is part of the camel-mongodb code, from org/apache/camel/component/mongodb3/MongoDbChangeStreamsConsumer.java:
String streamFilter = (String) getRoute().getProperties().get(STREAM_FILTER_PROPERTY);
List<BsonDocument> bsonFilter = null;
if (ObjectHelper.isNotEmpty(streamFilter)) {
    bsonFilter = singletonList(BsonDocument.parse(streamFilter));
}
camel.source.endpoint.streamFilter is passed as a URI parameter, but camel-mongodb reads the stream filter from the route properties. Is the camel-mongodb component incompatible with the connector configuration?
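To illustrate why an invalid filter would go unnoticed, here is a minimal sketch using plain maps instead of the real Camel classes. It assumes the connector only places the value among the endpoint URI parameters and that nothing copies it into the route properties. The class name, the `resolveFilter` helper and the value of `STREAM_FILTER_PROPERTY` are hypothetical and are not taken from camel-mongodb.

```java
import java.util.HashMap;
import java.util.Map;

public class StreamFilterLookup {
    // Hypothetical key: the real value of STREAM_FILTER_PROPERTY is not shown in the excerpt.
    static final String STREAM_FILTER_PROPERTY = "streamFilter";

    // Mirrors the lookup in the excerpt: the filter is taken from the route
    // properties only; null means "no filter, nothing to parse".
    static String resolveFilter(Map<String, Object> routeProperties) {
        Object value = routeProperties.get(STREAM_FILTER_PROPERTY);
        if (value instanceof String && !((String) value).isEmpty()) {
            return (String) value;
        }
        return null;
    }

    public static void main(String[] args) {
        // What the connector configuration produces (assumption): a URI parameter.
        Map<String, String> uriParameters = new HashMap<>();
        uriParameters.put("streamFilter", "this is not valid JSON");

        // What the consumer reads: route properties, which stay empty in this scenario.
        Map<String, Object> routeProperties = new HashMap<>();

        String filter = resolveFilter(routeProperties);
        System.out.println(filter == null
                ? "no filter found, parsing skipped"
                : "would parse: " + filter);
    }
}
```

Under that assumption the parse step is never reached, which would be consistent with the missing exception for an invalid JSON value.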
In addition, spaces in the value are escaped as + when the setting contains them:
{
"camel.source.endpoint.streamFilter": "{$match: {operationType: {$in: ['insert', 'update', 'replace']}}}"
}
The log is: MongoDb endpoint: mongodb://mongo?collection=test_object&consumerType=changeStreams&database=ac-test&mongoConnection=%23class%3Acom.mongodb.client.MongoClients%23create%28%27mongodb%3A%2F%2Froot%3Aroot%40192.168.23.164%3A27017%2F%3FauthSource%3Dadmin%27%29&streamFilter=%7B%24match%3A+%7BoperationType%3A+%7B%24in%3A+%5B%27insert%27%2C+%27update%27%2C+%27replace%27%5D%7D%7D%7D
Decoded, this is: MongoDb endpoint: mongodb://mongo?collection=test_object&consumerType=changeStreams&database=ac-test&mongoConnection=#class:com.mongodb.client.MongoClients#create('mongodb://root:root@192.168.23.164:27017/?authSource=admin')&streamFilter={$match:+{operationType:+{$in:+['insert',+'update',+'replace']}}}
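For reference, the + characters are what form-style URL encoding produces for spaces. The sketch below uses only the JDK's `java.net.URLEncoder` and `java.net.URLDecoder` (the `Charset` overloads need Java 10 or later). The class name is hypothetical, and it does not show how Camel itself encodes or decodes endpoint URIs, only that a form-style decoder turns each + back into a space.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class StreamFilterEncoding {
    // Form-style encoding: spaces become '+', reserved characters become %XX.
    static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    // Form-style decoding: '+' becomes a space again.
    static String decode(String value) {
        return URLDecoder.decode(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String filter = "{$match: {operationType: {$in: ['insert', 'update', 'replace']}}}";
        String encoded = encode(filter);
        System.out.println(encoded);          // encoded form, with '+' for each space
        System.out.println(decode(encoded));  // round-trips to the original filter
    }
}
```

The encoded string matches the streamFilter value in the log above. A decoder that only resolves %XX sequences would leave the + characters in place, as in the decoded line.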
@yingbo-wu as you correctly reported, this is a Camel component issue. I have taken the liberty of opening an issue about it: https://issues.apache.org/jira/browse/CAMEL-16716