camel-kafka-connector

Camel Kafka Salesforce Source connector keeps disconnecting/resubscribing and eventually fails

Open hakidehari opened this issue 3 years ago • 7 comments

Hi all,

I have been trying to troubleshoot some issues with the Camel Kafka Salesforce Source connector, to no avail. I have two Kafka topics, each fed by 2 connectors that stream data from the Salesforce Streaming API via the Salesforce source connector. I am seeing a few odd things. The connectors seem to reconnect to the Salesforce streams every minute or so. Here is an example of what I see in the logs:

[2022-02-14 02:36:41,133] WARN [sf_account_change_connector|task-0] Connect failure: {failure={exception=org.cometd.common.TransportException: {httpCode=504}, message={clientId=y98udzrc3x45vv1jfrg9twg1f9e, channel=/meta/connect, id=30557, connectionType=long-polling}, httpCode=504, connectionType=long-polling}, channel=/meta/connect, id=30557, successful=false} (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:174)
[2022-02-14 02:36:41,252] INFO [sf_account_change_connector|task-0] Restarting on unexpected disconnect from Salesforce... (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:262)
[2022-02-14 02:36:41,525] INFO [sf_account_change_connector|task-0] Set Replay extension to replay from -1 for channel /data/AccountChangeEvent (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:547)
[2022-02-14 02:36:41,525] INFO [sf_account_change_connector|task-0] Subscribing to channel /data/AccountChangeEvent... (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:435)
[2022-02-14 02:36:41,526] INFO [sf_account_change_connector|task-0] Successfully restarted! (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:312)
[2022-02-14 02:36:41,569] INFO [sf_account_change_connector|task-0] Subscribed to channel /data/AccountChangeEvent (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:499)
[2022-02-14 02:36:46,193] WARN [sf_order_p_event_connector|task-0] Connect failure: {failure={exception=org.cometd.common.TransportException: {httpCode=504}, message={clientId=139646d4ddo6w8r1v1xmp4rj3bli, channel=/meta/connect, id=30562, connectionType=long-polling}, httpCode=504, connectionType=long-polling}, channel=/meta/connect, id=30562, successful=false} (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:174)
[2022-02-14 02:36:46,210] INFO [sf_account_change_connector|task-0|offsets] WorkerSourceTask{id=sf_account_change_connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2022-02-14 02:36:46,336] INFO [sf_order_p_event_connector|task-0] Restarting on unexpected disconnect from Salesforce... (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:262)
[2022-02-14 02:36:46,392] INFO [sf_order_p_event_connector|task-0|offsets] WorkerSourceTask{id=sf_order_p_event_connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2022-02-14 02:36:46,596] INFO [sf_order_p_event_connector|task-0] Set Replay extension to replay from -1 for channel /event/Order_Completed__e (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:547)
[2022-02-14 02:36:46,596] INFO [sf_order_p_event_connector|task-0] Subscribing to channel /event/Order_Completed__e... (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:435)
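To put a number on "every minute or so", the restart entries in a log like the one above can be timed per connector. The following is only a rough sketch that assumes the entry layout shown here (`[timestamp] LEVEL [connector|task-N] message`); the names `ENTRY`, `restart_times` and `restart_intervals` are made up for illustration and are not part of Camel or Kafka Connect.

```python
import re
from collections import defaultdict
from datetime import datetime

# Start of one log entry, e.g. "[2022-02-14 02:36:41,252] INFO [sf_account_change_connector|task-0] "
# Group 1: timestamp, group 2: level, group 3: connector name.
ENTRY = re.compile(
    r"\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\] (\w+) \[([^|\]]+)\|[^\]]*\] "
)


def restart_times(log_text):
    """Map each connector name to the timestamps of its restart entries."""
    matches = list(ENTRY.finditer(log_text))
    times = defaultdict(list)
    for i, match in enumerate(matches):
        # The message runs until the next entry starts (or to the end of the text),
        # so this works whether entries are on separate lines or run together.
        end = matches[i + 1].start() if i + 1 < len(matches) else len(log_text)
        message = log_text[match.end():end]
        if "Restarting on unexpected disconnect" in message:
            stamp = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S,%f")
            times[match.group(3)].append(stamp)
    return dict(times)


def restart_intervals(log_text):
    """Seconds between consecutive restarts, per connector."""
    return {
        name: [(later - earlier).total_seconds()
               for earlier, later in zip(stamps, stamps[1:])]
        for name, stamps in restart_times(log_text).items()
    }
```

Calling `restart_intervals(open("connect.log").read())` (the file name is a placeholder) returns a dict of connector name to a list of gaps in seconds. Gaps that cluster around a fixed value would point at a timeout somewhere between the client and Salesforce rather than at random network failures.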

Eventually, after a few hours or even a couple of days, some of the connectors fail to reconnect. When that happens I see one of two errors: either 401::Authentication invalid or 403::Unknown client. Examples:

[2022-02-14 04:11:15,324] ERROR [sf_order_p_event_connector|task-0] Error restarting: Error during CONNECT: 403::Unknown client (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:307)
org.apache.camel.CamelException: Error during CONNECT: 403::Unknown client
    at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.connect(SubscriptionHelper.java:230)
    at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.doStart(SubscriptionHelper.java:212)
    at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.performClientRestart(SubscriptionHelper.java:304)
    at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.access$1000(SubscriptionHelper.java:60)
    at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$4.run(SubscriptionHelper.java:249)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:882)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1036)
    at java.lang.Thread.run(Thread.java:748)

and

[2022-02-09 19:58:39,911] ERROR [sf_contact_change_connector|task-0] Error restarting: Exception during HANDSHAKE: 401::Authentication invalid (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:307)
org.apache.camel.CamelException: Exception during HANDSHAKE: 401::Authentication invalid
    at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.connect(SubscriptionHelper.java:223)
    at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.doStart(SubscriptionHelper.java:212)
    at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.performClientRestart(SubscriptionHelper.java:304)
    at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.access$1000(SubscriptionHelper.java:60)
    at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$4.run(SubscriptionHelper.java:249)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:882)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1036)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.camel.component.salesforce.api.SalesforceException: 401::Authentication invalid
    at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.getFailure(SubscriptionHelper.java:340)
    at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.access$300(SubscriptionHelper.java:60)
    at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$1.onMessage(SubscriptionHelper.java:132)
    at org.cometd.common.AbstractClientSession$AbstractSessionChannel.notifyOnMessage(AbstractClientSession.java:583)
    at org.cometd.common.AbstractClientSession$AbstractSessionChannel.notifyMessageListeners(AbstractClientSession.java:568)
    at org.cometd.common.AbstractClientSession.notifyListeners(AbstractClientSession.java:308)
    at org.cometd.common.AbstractClientSession.lambda$receive$4(AbstractClientSession.java:269)
    at org.cometd.bayeux.Promise$2.succeed(Promise.java:103)
    at org.cometd.common.AsyncFoldLeft$AbstractLoop.run(AsyncFoldLeft.java:199)
    at org.cometd.common.AsyncFoldLeft.run(AsyncFoldLeft.java:93)
    at org.cometd.common.AbstractClientSession.extendIncoming(AbstractClientSession.java:103)
    at org.cometd.common.AbstractClientSession.receive(AbstractClientSession.java:263)
    at org.cometd.client.BayeuxClient.failHandshake(BayeuxClient.java:721)
    at org.cometd.client.BayeuxClient.processHandshake(BayeuxClient.java:707)

This is not specific to one connector; it can happen on any of them. Here is an example config for one of my Salesforce Source connectors. I am using the REFRESH_TOKEN authentication method and have set the refresh token in Salesforce to expire only if it goes unused for 365 days. The authentication is good, as is evident from the connectors working fine when I recreate them.

{
  "name": "sf_account_change_connector",
  "config": {
    "camel.source.endpoint.rawPayload": true,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "camel.source.marshal": "json-jackson",
    "connector.class": "org.apache.camel.kafkaconnector.salesforce.CamelSalesforceSourceConnector",
    "camel.component.salesforce.loginUrl": "<redacted>",
    "camel.component.salesforce.instanceUrl": "<redacted>",
    "topics": "CISL_SF_ASSET_INFO_INPUT",
    "errors.tolerance": "all",
    "errors.retry.timeout": "-1",
    "camel.source.path.topicName": "/data/AccountChangeEvent",
    "camel.source.endpoint.replayId": "-1",
    "camel.component.salesforce.authenticationType": "REFRESH_TOKEN",
    "camel.component.salesforce.clientId": "<redacted>",
    "camel.component.salesforce.clientSecret": "<redacted>",
    "camel.component.salesforce.refreshToken": "redacted",
    "camel.component.salesforce.userName": "<redacted>"
  }
}

All the connectors have similar configs; the only differences are the streaming API path and the topic names. So my questions are these:

Why is it reconnecting to the streams every so often? Is this a setting I can change in the connector config itself, or is it something SF-specific? Also, why do occasional 403 and 401 errors arise even though my authentication is perfectly valid? The connectors seem to stop working when one of these errors happens.

Thanks in advance!

Feb 14 '22 17:02 hakidehari

What is the connector version?

Feb 14 '22 18:02 oscerd

Hi @oscerd, I am not sure how to check the version, but I do see these jar files under the Salesforce connector directory. Not sure if this will help:

camel-salesforce-3.10.0.jar
camel-salesforce-kafka-connector-0.10.1.jar
camel-kafka-connector-0.10.1.jar

Feb 14 '22 18:02 hakidehari

@oscerd I have confirmed with our infrastructure team that the version is 0.10.1.

Feb 14 '22 19:02 hakidehari

We upgraded to the latest version, 0.11.0, and are still getting the same Unknown client error after some time. Is there any way to force the connector to restart automatically through some sort of config? @oscerd

Feb 16 '22 16:02 hakidehari

I don't think so. Maybe @zregvart could help.

Feb 16 '22 17:02 oscerd
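Outside the connector config, one possible workaround for the restart question is an external script that polls the Kafka Connect REST API and restarts failed tasks. The sketch below uses the standard `GET /connectors/{name}/status` and `POST /connectors/{name}/tasks/{id}/restart` endpoints. The base URL (`http://localhost:8083`) and all function names are assumptions for illustration. It also assumes the failure shows up as a `FAILED` task state, which this thread does not confirm: the errors above are logged by `SubscriptionHelper`, and the task may still be reported as `RUNNING`.

```python
import json
import urllib.request

# Assumption: the Connect worker's REST interface is reachable here (8083 is the default port).
CONNECT_URL = "http://localhost:8083"


def failed_task_ids(status):
    """Return the ids of tasks marked FAILED in a /connectors/{name}/status response."""
    return [task["id"] for task in status.get("tasks", []) if task.get("state") == "FAILED"]


def get_status(name, base_url=CONNECT_URL):
    """Fetch the connector status document from the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def restart_task(name, task_id, base_url=CONNECT_URL):
    """Ask Kafka Connect to restart one task; returns the HTTP status code."""
    request = urllib.request.Request(
        f"{base_url}/connectors/{name}/tasks/{task_id}/restart", method="POST"
    )
    with urllib.request.urlopen(request) as resp:
        return resp.status


def restart_failed_tasks(name, base_url=CONNECT_URL):
    """Restart every FAILED task of the named connector and return the ids restarted."""
    ids = failed_task_ids(get_status(name, base_url))
    for task_id in ids:
        restart_task(name, task_id, base_url)
    return ids
```

Running `restart_failed_tasks("sf_order_p_event_connector")` from a scheduler such as cron would cover the case where Connect itself notices the failure. If the task stays `RUNNING` while the subscription is dead, a check based on the connector log or on topic throughput would be needed instead.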

@hakidehari we had reports of similar issues in the past and, as far as we're aware, they were fixed as of Camel 3.5; that is, the folks who reported them haven't raised any new concerns. This is the list of issues in the Salesforce component, look for CometD and platform event ones. These issues are really difficult to troubleshoot, and the best we can offer here is anecdotal advice. It seems that connecting via the same Connected App could cause authentication issues, though we have never confirmed this to be the case. So, if you're using the same Connected App configuration for all clients, I would suggest using a different one for each; also make sure that no one else is using the Connected Apps you have configured. Beyond this, to fix the problem we would need a very detailed and clear way of reproducing it.

Feb 16 '22 18:02 zregvart
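The one-Connected-App-per-connector suggestion can be sketched as a small config generator. This is only an illustration: `with_connected_app` and `shared_client_ids` are hypothetical helper names, the `app` dict stands in for credentials obtained from each Connected App, and only config keys that appear in the config posted earlier in this thread are used.

```python
import copy


def with_connected_app(base_config, name, channel, app):
    """Return a connector definition built from base_config with its own OAuth credentials.

    `app` holds the clientId, clientSecret and refreshToken of one Connected App.
    """
    config = copy.deepcopy(base_config)  # leave the shared base settings untouched
    config["camel.source.path.topicName"] = channel
    config["camel.component.salesforce.clientId"] = app["clientId"]
    config["camel.component.salesforce.clientSecret"] = app["clientSecret"]
    config["camel.component.salesforce.refreshToken"] = app["refreshToken"]
    return {"name": name, "config": config}


def shared_client_ids(definitions):
    """Map each clientId used by more than one connector to the connector names using it."""
    users = {}
    for definition in definitions:
        client_id = definition["config"]["camel.component.salesforce.clientId"]
        users.setdefault(client_id, []).append(definition["name"])
    return {cid: names for cid, names in users.items() if len(names) > 1}
```

An empty result from `shared_client_ids` over all connector definitions means no two connectors share a Connected App. It cannot detect other clients outside Kafka Connect that use the same app, which still has to be checked on the Salesforce side.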

Thanks for the response, @zregvart. I also had a hunch that using the same Connected App for each connector could be causing this issue, so I am glad you brought it up. I will go ahead and try using one Connected App per connector and see what happens.

Feb 16 '22 18:02 hakidehari