
[issue] In effectively_once mode, when a single pod fails, the entire function fails because it cannot create the producer.

Open · graysonzeng opened this issue 2 years ago · 15 comments

  1. When I enable effectively_once and deploy the function in Kubernetes, for example with 5 pods, and one of the pods crashes, its subscription is transferred to another pod because of the failover subscription mode. That pod then fails to create the producer, because the producer on the server side has not been closed:
Failed to create producer: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'persistent://pulsar/default/input_test-partition-5-0' is already connected to topic","reqId":1766584259806202457, "remote":"21.21.47.12/21.21.47.12:6650", "local":"/9.165.174.197:46786"}

After this, the function restarts because of the exception, failover moves the subscription yet again, and producer creation fails once more, so all of the function pods end up restarting constantly. Therefore, when I need to enable effectively_once, I have to deploy multiple functions that each consume part of the partitioned topic separately, but that is not easy to maintain.

  2. In addition, the function can easily run into the following error and stay stuck on it until the broker is restarted:
WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0x24fe09d6, L:/9.165.182.50:36944 ! R:21.21.134.241/21.21.134.241:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time

If you have any optimization suggestions, I would appreciate them. Thank you very much.
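For context on why the producer name matters here: Pulsar broker de-duplication, which effectively_once relies on, is keyed on the producer name plus the message sequence id, which appears to be why the producer name in the error above is derived from the input partition and must stay stable across restarts. Below is a minimal sketch with the plain Pulsar Java client, not the function runtime's actual code; the service URL is a placeholder, and the topic and producer name are taken from this report.

import java.nio.charset.StandardCharsets;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class DedupProducerSketch {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://broker.example:6650") // placeholder service URL
                .build();

        // Broker de-duplication tracks (producer name, sequence id) per topic, so
        // the producer name must stay stable across restarts -- hence the
        // partition-derived name below.
        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
                .topic("persistent://pulsar/default2/alltables3")
                .producerName("persistent://pulsar/default/input_test-partition-5-0")
                .create(); // fails with the "already connected" NamingException if the
                           // broker still holds a live producer under this name

        // An explicit sequence id lets the broker recognise a replayed message
        // as a duplicate instead of persisting it twice.
        producer.newMessage()
                .sequenceId(42L)
                .value("payload".getBytes(StandardCharsets.UTF_8))
                .send();

        producer.close();
        client.close();
    }
}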

graysonzeng avatar Dec 25 '23 11:12 graysonzeng

@freeznet @nlu90 @jiangpengcheng PTAL

graysonzeng avatar Dec 25 '23 11:12 graysonzeng

It looks like the server should close the producer when the related client has crashed without closing it.

Which version of Pulsar are you using? @graysonzeng

jiangpengcheng avatar Jan 03 '24 02:01 jiangpengcheng

Thanks for your reply; the Pulsar version is 3.1.1. I agree with you. It seems that some mechanism is needed to properly shut down the old producer before creating a new one.

graysonzeng avatar Jan 04 '24 07:01 graysonzeng

I will check it

jiangpengcheng avatar Jan 09 '24 08:01 jiangpengcheng

I can't reproduce the error. When I manually delete one function pod, its producer for the output topic is also closed by the server, so the next active consumer can create the producer.

Could you share the YAML you used? @graysonzeng

jiangpengcheng avatar Jan 12 '24 01:01 jiangpengcheng

Of course @jiangpengcheng

apiVersion: compute.functionmesh.io/v1alpha1
kind: FunctionMesh
metadata:
  name: functionmesh-001
spec:
  functions:
    - name: functions-dedup-v1
      className: com.tencent.functionNoTag
      image: mirrors.tencent.com/g_k_cdp/pulsar-functions-test:v1.0.1
      replicas: 10
      processingGuarantee: "effectively_once" #effectively_once manual
      pod:
        terminationGracePeriodSeconds: 30
      input:
        topics:
          - persistent://pulsar/default2/input_test
        typeClassName: "[B"
      output:
        topic: persistent://pulsar/default2/alltables3
        typeClassName: "[B"
      pulsar:
        pulsarConfig: "pulsar-dedup-gtmz-167-sink-test02-config-v1"
        authSecret: "sink-dedup-test02-config-auth"
      java:
        extraDependenciesDir: ""
        jar: /pulsar/connectors//DynamicTopic-1.0.nar # the NAR location in image.
        jarLocation: "" # leave empty since we will not download package from Pulsar Packages
      clusterName: pulsar-gtmz-167
      forwardSourceMessageProperty: true
      resources:
        requests:
          cpu: "2"
          memory: 2G
        limits:
          cpu: "2"
          memory: 2G
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: pulsar-dedup-gtmz-167-sink-test02-config-v1
data:
  webServiceURL: http://xx.xx.47.12:8080
  brokerServiceURL: pulsar://xx.xx.47.12:6650
  authPlugin: "org.apache.pulsar.client.impl.auth.AuthenticationToken"
  authParams: "eyJhxxxx"
---
apiVersion: v1
kind: Secret
metadata:
  name: sink-dedup-test02-config-auth
stringData:
  clientAuthenticationPlugin: "org.apache.pulsar.client.impl.auth.AuthenticationToken"
  clientAuthenticationParameters: "token:eyxxxx"
  

graysonzeng avatar Jan 12 '24 08:01 graysonzeng

I see, the error is caused by the following:

When pod-1 crashes, another pod (say pod-2) takes over pod-1's subscription to the partition it was consuming (say partition-5). If a message then arrives on partition-5, pod-2 creates a new producer with the name persistent://pulsar/default/input_test-partition-5-0. When pod-1 restarts and receives a new message, it cannot create its own producer with the name persistent://pulsar/default/input_test-partition-5-0, because that name is still held by pod-2.

This issue is caused by JavaInstanceRunnable; I will fix it in the apache/pulsar repo.
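The race described above can be reproduced with two plain Pulsar clients competing for the same producer name; a sketch under that assumption (placeholder service URL, names taken from this thread), not the JavaInstanceRunnable code itself:

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

public class ProducerNameCollisionSketch {
    private static final String OUTPUT_TOPIC = "persistent://pulsar/default2/alltables3";
    // Name derived from the input partition, as in the error message above.
    private static final String PRODUCER_NAME =
            "persistent://pulsar/default/input_test-partition-5-0";

    static Producer<byte[]> createNamedProducer(PulsarClient client) throws PulsarClientException {
        return client.newProducer(Schema.BYTES)
                .topic(OUTPUT_TOPIC)
                .producerName(PRODUCER_NAME)
                .create();
    }

    public static void main(String[] args) throws Exception {
        // pod-2 takes over partition-5 after the failover and creates the producer first.
        PulsarClient pod2 = PulsarClient.builder().serviceUrl("pulsar://broker.example:6650").build();
        Producer<byte[]> pod2Producer = createNamedProducer(pod2);

        // pod-1 restarts, receives a new message, and tries to create the same producer.
        PulsarClient pod1 = PulsarClient.builder().serviceUrl("pulsar://broker.example:6650").build();
        try {
            createNamedProducer(pod1); // rejected while pod-2 still holds the name
        } catch (PulsarClientException e) {
            System.err.println("Failed to create producer: " + e.getMessage());
        }

        pod2Producer.close();
        pod1.close();
        pod2.close();
    }
}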

@graysonzeng

jiangpengcheng avatar Jan 16 '24 07:01 jiangpengcheng

thanks!

graysonzeng avatar Jan 17 '24 02:01 graysonzeng

I created a PR here: https://github.com/apache/pulsar/pull/21912 and built a jar based on it. Could you add it to your runner image to check whether the issue is resolved? @graysonzeng

jiangpengcheng avatar Jan 17 '24 11:01 jiangpengcheng

This issue is hard to fix, since we cannot use a different producer name without breaking the de-duplication. Below is a workaround for when the error happens (the spec excerpt after the list shows the replicas change):

  1. set the function's replicas to 0
  2. wait until all function pods stop so that no producer exists for the output topic
  3. set the function's replicas back to the original value
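For reference, steps 1 and 3 only change the replicas field of the Function spec shown earlier; a sketch of the relevant excerpt (apply it with kubectl apply, then revert it once no producer remains):

spec:
  functions:
    - name: functions-dedup-v1
      # Step 1: scale to zero so every pod stops and its named producer is released.
      replicas: 0
      # Step 3: after confirming no producer remains on the output topic,
      # set this back to the original value (10 in the spec above).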

cc @graysonzeng

jiangpengcheng avatar Jan 23 '24 06:01 jiangpengcheng

Thanks for the suggestion. I have an idea: how about using an independent subscription name for each pod? For example, with 10 partitions on the consumer side and two pods consuming, the two pods would use subscription names A and B respectively. Then even if a pod fails, its subscription is not switched to another surviving pod by failover, and it can continue consuming once Kubernetes brings the pod back up. @jiangpengcheng

graysonzeng avatar Jan 24 '24 01:01 graysonzeng

Using different subscription names would make pod A and pod B both consume all ten partitions and lead to duplication.

jiangpengcheng avatar Jan 24 '24 02:01 jiangpengcheng

A and B would each consume part of the partitions. For example, A consumes partitions 0-4 and B consumes partitions 5-9. That would not lead to duplication.
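A sketch of what that manual split would look like with the plain Pulsar Java client (placeholder service URL; subscription name and partition range taken from the example above). Each pod would need its own hard-coded partition list, which is where the manual configuration comes in:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

public class ManualPartitionSplitSketch {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://broker.example:6650") // placeholder service URL
                .build();

        // "Pod A": subscription A on partitions 0-4 of the input topic;
        // "pod B" would do the same with subscription "B" on partitions 5-9.
        List<String> podAPartitions = IntStream.rangeClosed(0, 4)
                .mapToObj(i -> "persistent://pulsar/default2/input_test-partition-" + i)
                .collect(Collectors.toList());

        Consumer<byte[]> consumerA = client.newConsumer(Schema.BYTES)
                .topics(podAPartitions)
                .subscriptionName("A")
                .subscriptionType(SubscriptionType.Failover)
                .subscribe();

        // ... consume, process, and write to the output topic here ...

        consumerA.close();
        client.close();
    }
}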

graysonzeng avatar Jan 24 '24 03:01 graysonzeng

A and B would each consume part of the partitions.

That needs to be specified manually, which is just like what you are doing now:

Therefore, when I need to enable effectively_once, I have to deploy multiple functions that each consume part of the partitioned topic separately, but that is not easy to maintain.

jiangpengcheng avatar Jan 25 '24 06:01 jiangpengcheng

Thanks. I originally thought that the subscriptions would not need to be specified manually, so that a user could create a sink with multiple replicas without repeating the configuration in the YAML for each one. But it seems that is the only way for now. @jiangpengcheng

graysonzeng avatar Jan 30 '24 04:01 graysonzeng