camel-kafka-connector
[issue] [salesforce source] sending replayId to re-run old records
Hi,
I am testing the Salesforce source connector to see whether it can replay old messages in case of an issue or an API limit error. I am on connector version 3.18.2 with a config like:
"tasks.max": "1",
"connector.class": "org.apache.camel.kafkaconnector.salesforcesource.CamelSalesforcesourceSourceConnector",
"topics": "nam_sf_source_test",
"camel.main.streamCachingEnabled": false,
"camel.source.path.topicName": "/data/Fan__ChangeEvent",
"camel.component.salesforce.sObjectQuery": "select Id,Email__c,First_Name__c,LastModifiedDate from Fan__c where IsDeleted=false",
"camel.component.salesforce.updateTopic": true,
"camel.component.salesforce.rawPayload": true,
"camel.component.salesforce.defaultReplayId": "4497",
"camel.source.endpoint.defaultReplayId": "4497",
"camel.source.endpoint.replayId": "4497",
"camel.kamelet.salesforce-source.replayId": "4497",
"camel.kamelet.salesforce-source.defaultReplayId": "4497",
I was not sure which property name would work, so I added every variant I could find in the different documentation. None of them work: the logs always show "Replay from -1".
As a workaround, I found that I can pass it as part of the topic name, like
SF_TOPIC: "/data/Fan__ChangeEvent?replayId=4497&"
and that seems to do the trick.
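For anyone scripting the connector config, the workaround could be sketched as below. This is only an illustration of the behaviour observed above, not a documented feature: the helper name `topic_with_replay_id` is hypothetical, and the trailing `&` is kept because that is the form that worked here (whether it is required is untested).

```python
import json


def topic_with_replay_id(topic: str, replay_id: int) -> str:
    """Append replayId to the topic path (hypothetical helper, not part of the connector)."""
    # The trailing "&" mirrors the value that worked in this report.
    return f"{topic}?replayId={replay_id}&"


# Only the keys relevant to the workaround; the values are taken from the config above.
config = {
    "connector.class": "org.apache.camel.kafkaconnector.salesforcesource.CamelSalesforcesourceSourceConnector",
    "topics": "nam_sf_source_test",
    "camel.source.path.topicName": topic_with_replay_id("/data/Fan__ChangeEvent", 4497),
}

print(json.dumps(config, indent=2))
```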
Questions: Do you know if the next release will have this fixed? What is the correct config name? How does the connector handle the case where the API limit is exceeded?
Both of the options are valid:
https://camel.apache.org/components/4.0.x/salesforce-component.html#_endpoint_query_option_replayId
https://camel.apache.org/components/4.0.x/salesforce-component.html#_endpoint_query_option_defaultReplayId
The original Kamelet, used to build the connector, doesn't expose the replayId option: https://github.com/apache/camel-kamelets/blob/main/kamelets/salesforce-source.kamelet.yaml
We can add the option, but only for the 4.4.1 release, since 4.4.0 is already up for a vote. For the API limits, my suggestion is to throttle requests or to check the limits ahead of time.
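As a rough sketch of checking the limits ahead of time: the Salesforce REST `limits` resource returns a JSON object whose `DailyApiRequests` entry has `Max` and `Remaining` fields. The function below only inspects an already-parsed response. The name `has_api_headroom`, the 10% threshold, and the sample numbers are assumptions for illustration; fetching the response and authenticating are left out.

```python
def has_api_headroom(limits: dict, min_remaining_ratio: float = 0.1) -> bool:
    """Return True if enough of the daily API quota is left (hypothetical helper).

    `limits` is the parsed JSON body of the Salesforce REST `limits` resource.
    """
    daily = limits["DailyApiRequests"]
    if daily["Max"] <= 0:
        # No usable quota reported; treat as no headroom.
        return False
    return daily["Remaining"] / daily["Max"] >= min_remaining_ratio


# Illustrative values only, not real org data.
sample = {"DailyApiRequests": {"Max": 15000, "Remaining": 1200}}
print(has_api_headroom(sample))
```

A caller could pause or slow down the connector when this returns False, which combines the check with the throttling suggestion.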