
Python client does not retry when getting topic partition metadata fails during producer creation

Open · yebai1105 opened this issue 2 years ago · 5 comments

Describe the bug
If the client fails to get the topic partition metadata through service_url, it neither retries nor tries the other IP address listed in the URL. Instead, create_producer exits with an error as soon as the first connection fails.
Version: pulsar-client=2.9.4

To Reproduce
1. Test conditions: the service at 127.0.0.1:6650 is stopped; the service at 127.0.0.2:6650 is running normally.
2. Test code:

import pulsar
client = pulsar.Client(
    authentication=pulsar.AuthenticationToken(
            "xxxxxxxxxxxx"),
    service_url="pulsar://127.0.0.1:6650,127.0.0.2:6650",
    #operation_timeout_seconds=120
)
producer = client.create_producer(
    topic='persistent://qlm-test/qlm-ns/python-test3',    
    send_timeout_millis=120000,
    block_if_queue_full=True,
    batching_enabled=True,
    batching_max_publish_delay_ms=10,
    batching_max_messages=100,
    batching_max_allowed_size_in_bytes=1024 * 1024,
    max_pending_messages=1000)

try:
    i = 0
    while True:  # send until interrupted (e.g. Ctrl+C)
        producer.send(('Hello-%d' % i).encode('utf-8'))
        i += 1
finally:
    producer.close()
    client.close()

Error log:

2023-04-25 14:16:51.866 INFO  [139745917667072] ExecutorService:41 | Run io_service in a single thread
2023-04-25 14:16:51.866 INFO  [139746086545152] ClientConnection:189 | [<none> -> pulsar://127.0.0.1:6650,127.0.0.2:6650] Create ClientConnection, timeout=10000
2023-04-25 14:16:51.866 INFO  [139746086545152] ConnectionPool:96 | Created connection for pulsar://127.0.0.1:6650,127.0.0.2:6650
2023-04-25 14:16:51.867 WARN  [139745917667072] ClientConnection:436 | [<none> -> pulsar://127.0.0.1:6650,127.0.0.2:6650] Failed to establish connection: Connection refused
2023-04-25 14:16:51.867 INFO  [139745917667072] ClientConnection:1563 | [<none> -> pulsar://127.0.0.1:6650,127.0.0.2:6650] Connection closed
2023-04-25 14:16:51.867 ERROR [139745917667072] ClientImpl:190 | Error Checking/Getting Partition Metadata while creating producer on persistent://qlm-test/qlm-ns/python-test3 -- ConnectError
2023-04-25 14:16:51.867 INFO  [139745917667072] ClientConnection:263 | [<none> -> pulsar://127.0.0.1:6650,127.0.0.2:6650] Destroyed connection
Traceback (most recent call last):
  File "pulsar-test.py", line 8, in <module>
    producer = client.create_producer(
  File "/usr/local/python3/lib/python3.8/site-packages/pulsar/__init__.py", line 603, in create_producer
    p._producer = self._client.create_producer(topic, conf)
_pulsar.ConnectError: Pulsar error: ConnectError
2023-04-25 14:16:51.871 INFO  [139745917667072] ExecutorService:47 | Event loop of ExecutorService exits successfully

Expected behavior
If no connection can be obtained, the client should keep retrying until the operation timeout expires, as the Java client does:

2023-04-25 14:18:45.078[pulsar-external-listener-3-1] WARN  org.apache.pulsar.client.impl.PulsarClientImpl - [topic: persistent://qlm-test/qlm-ns/python-test3] Could not get connection while getPartitionedTopicMetadata -- Will try again in 6345 ms
2023-04-25 14:18:45.093[pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xedb4eab7, L:/10.13.209.102:58831 - R:/10.101.129.65:6650]] Connected to server

yebai1105 · Apr 25 '23
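Until the client retries on its own, one workaround is an application-level retry loop. The sketch below is a hypothetical helper, not part of the Pulsar API. It splits a multi-host `service_url` such as `pulsar://127.0.0.1:6650,127.0.0.2:6650` into one URL per host, tries each host in turn, and backs off exponentially with random jitter between rounds until a deadline, roughly in the spirit of the Java client's "Will try again in N ms". The defaults (0.1 s initial backoff, 60 s maximum, 30 s overall) are borrowed from the Java client config logged later in this thread (`initialBackoffIntervalNanos`, `maxBackoffIntervalNanos`, `operationTimeoutMs`). The caller supplies both the operation and the exception types to retry on. The traceback above shows `_pulsar.ConnectError`; whether the `pulsar` package re-exports it under a public name depends on the client version, so treat that name as an assumption.

```python
import random
import time
from typing import Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")


def split_service_url(service_url: str) -> List[str]:
    """Split 'pulsar://h1:p1,h2:p2' into ['pulsar://h1:p1', 'pulsar://h2:p2']."""
    scheme, sep, hosts = service_url.partition("://")
    urls = [f"{scheme}://{h.strip()}" for h in hosts.split(",") if h.strip()]
    if not sep or not urls:
        raise ValueError(f"invalid service URL: {service_url!r}")
    return urls


def connect_with_retry(
    service_url: str,
    connect: Callable[[str], T],
    retry_on: Tuple[Type[BaseException], ...],
    timeout_s: float = 30.0,
    initial_backoff_s: float = 0.1,
    max_backoff_s: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> T:
    """Call connect(url) for each host in turn until one succeeds.

    Errors listed in retry_on move on to the next host; after a full round of
    failures, sleep with exponential backoff plus jitter and start again.
    Any other exception propagates immediately. Once timeout_s has elapsed,
    the last retryable error is re-raised.
    """
    urls = split_service_url(service_url)
    deadline = clock() + timeout_s
    backoff = initial_backoff_s
    while True:
        last_error: BaseException = RuntimeError("unreachable")
        for url in urls:
            try:
                return connect(url)
            except retry_on as exc:
                last_error = exc  # remember it and try the next host
        remaining = deadline - clock()
        if remaining <= 0:
            raise last_error
        # Random jitter in [0, backoff], never sleeping past the deadline.
        sleep(min(random.uniform(0, backoff), remaining))
        backoff = min(backoff * 2, max_backoff_s)
```

For example, `connect` could be `lambda url: pulsar.Client(url, authentication=auth).create_producer(topic)` with `retry_on` set to the client's connection-error class. As written, each failed attempt leaves a `Client` object behind, so a real version should close the client when `create_producer` raises.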

Could you try the latest Python client? The retry logic was introduced in 3.0.0.

BewareMyPower · Apr 25 '23
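Because the behavior depends on the release, it can help to confirm which client version is actually installed before comparing results. Below is a small standard-library sketch. It assumes the PyPI distribution name is `pulsar-client` and takes 3.0.0 as the cutoff from the comment above. The helper names are hypothetical and not part of the Pulsar package.

```python
from importlib.metadata import PackageNotFoundError, version
from itertools import takewhile
from typing import Optional, Tuple

# Per the comment above, the retry logic arrived in 3.0.0.
RETRY_MIN_VERSION = (3, 0, 0)


def parse_version(v: str) -> Tuple[int, ...]:
    """Parse the numeric release part, e.g. '2.9.4' -> (2, 9, 4), '3.0' -> (3, 0, 0).

    Pre-release or local suffixes such as 'rc1' are ignored.
    """
    nums = []
    for piece in v.split("."):
        digits = "".join(takewhile(str.isdigit, piece))
        if not digits:
            break
        nums.append(int(digits))
        if len(digits) < len(piece):
            break  # a suffix like '0rc1' ends the release segment
    nums += [0] * (3 - len(nums))
    return tuple(nums[:3])


def has_lookup_retry(v: str) -> bool:
    return parse_version(v) >= RETRY_MIN_VERSION


def installed_client_version() -> Optional[str]:
    """Version of the installed pulsar-client distribution, or None if absent."""
    try:
        return version("pulsar-client")
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    v = installed_client_version()
    if v is None:
        print("pulsar-client is not installed")
    else:
        print(v, "includes" if has_lookup_retry(v) else "predates", "the 3.0.0 retry logic")
```

A version check only rules out one variable: as the following replies show, the failure was still reported on 3.1.0.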

I tried pulsar-client 3.1.0 too, but the problem is still there.

yebai1105 · Apr 27 '23

> I tried this version too, but there is also a problem

Is it the same issue? Could you provide the logs from 3.1.0?

tisonkun · Jun 15 '23

I got the same error with pulsar-client 3.1.0.

Code:

from pulsar import Client, AuthenticationToken, BatchingType

client = Client(
    service_url='pulsar://node01.public.pulsar.test:6650',
    authentication=AuthenticationToken(
        "xxxxxxxxxxxxxxxxxxxxxxxxx"))
producer = client.create_producer(
    'my-topic',
    block_if_queue_full=True,
    batching_enabled=True,
    batching_max_publish_delay_ms=10,
    properties={
        "producer-name": "test-producer-name",
        "producer-id": "test-producer-id"
    },
    batching_type=BatchingType.KeyBased
)

for i in range(10):
    producer.send(('Hello-%d' % i).encode('utf-8'))

client.close()

Log:

Connected to pydev debugger (build 213.7172.26)
2023-12-21 11:29:56.580 INFO [140635486381888] ClientConnection:190 | [<none> -> pulsar://node01.public.pulsar.test:6650] Create ClientConnection, timeout=10000
2023-12-21 11:29:56.580 INFO [140635486381888] ConnectionPool:97 | Created connection for pulsar://node01.public.pulsar.test:6650
2023-12-21 11:29:56.635 INFO [140633529005824] ClientConnection:388 | [192.168.1.158:37378 -> 172.16.20.97:6650] Connected to broker
2023-12-21 11:29:56.688 INFO [140633529005824] ClientConnection:1600 | [192.168.1.158:37378 -> 172.16.20.97:6650] Connection closed with ConnectError
2023-12-21 11:29:56.688 ERROR [140633529005824] ClientImpl:183 | Error Checking/Getting Partition Metadata while creating producer on persistent://public/default/my-topic -- ConnectError
2023-12-21 11:29:56.688 INFO [140633529005824] ClientConnection:269 | [192.168.1.158:37378 -> 172.16.20.97:6650] Destroyed connection
Traceback (most recent call last):
  File "/root/.pycharm_helpers/pydev/pydevd.py", line 1483, in _exec
    pydev_imports.execfile(file, globals, locals) # execute the script
  File "/root/.pycharm_helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/home/work/wangxx/projects/llm_qa/src/utils/pilsar_host.py", line 19, in <module>
    producer = client.create_producer(
  File "/root/anaconda3/envs/MOSS/lib/python3.8/site-packages/pulsar/__init__.py", line 639, in create_producer
    p._producer = self._client.create_producer(topic, conf)
_pulsar.ConnectError: Pulsar error: ConnectError

Process finished with exit code 1

iamxinxin · Dec 21 '23

I have the same issue with the Java Pulsar client and Pulsar 3.2 when authentication is enabled (authenticationEnabled=true):

  • Code:
    PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://127.0.0.1:6650")
        .authentication(AuthenticationFactory.token(token))
        .build();
  • Log on client side:
    10:57:51.287 [pulsar-client-io-1-3] INFO org.apache.pulsar.client.impl.ConnectionPool -- [[id: 0x65a00d98, L:/127.0.0.1:64047 - R:/127.0.0.1:6650]] Connected to server
    10:57:52.111 [pulsar-client-io-1-3] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl -- Starting Pulsar producer perf with config: {"topicName":"test.topic","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
    10:57:52.129 [pulsar-client-io-1-3] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl -- Pulsar client config: {"serviceUrl":"pulsar://127.0.0.1:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":8,"numListenerThreads":8,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
    10:57:52.152 [pulsar-client-io-1-3] INFO org.apache.pulsar.client.impl.ProducerImpl -- [test.topic] [null] Creating producer on cnx [id: 0x65a00d98, L:/127.0.0.1:64047 - R:/127.0.0.1:6650]
    10:57:52.210 [pulsar-client-io-1-3] WARN org.apache.pulsar.client.impl.ClientCnx -- [id: 0x65a00d98, L:/127.0.0.1:64047 - R:/127.0.0.1:6650] Received error from server: Topic creation encountered an exception by initialize topic policies service. topic_name=persistent://public/default/test.topic error_message=Failed to authenticate
    10:57:52.214 [pulsar-client-io-1-3] ERROR org.apache.pulsar.client.impl.ProducerImpl -- [test.topic] [null] Failed to create producer: {"errorMsg":"Topic creation encountered an exception by initialize topic policies service. topic_name=persistent://public/default/test.topic error_message=Failed to authenticate","reqId":3148985659011010964, "remote":"/127.0.0.1:6650", "local":"/127.0.0.1:64047"}
    10:57:52.214 [pulsar-client-io-1-3] WARN org.apache.pulsar.client.impl.ConnectionHandler -- [test.topic] [null] Error connecting to broker: org.apache.pulsar.client.api.PulsarClientException$LookupException: {"errorMsg":"Topic creation encountered an exception by initialize topic policies service. topic_name=persistent://public/default/test.topic error_message=Failed to authenticate","reqId":3148985659011010964, "remote":"/127.0.0.1:6650", "local":"/127.0.0.1:64047"}
    10:57:52.214 [pulsar-client-io-1-3] WARN org.apache.pulsar.client.impl.ConnectionHandler -- [test.topic] [null] Could not get connection to broker: org.apache.pulsar.client.api.PulsarClientException$LookupException: {"errorMsg":"Topic creation encountered an exception by initialize topic policies service. topic_name=persistent://public/default/test.topic error_message=Failed to authenticate","reqId":3148985659011010964, "remote":"/127.0.0.1:6650", "local":"/127.0.0.1:64047"} -- Will try again in 0.1 s
    10:57:52.316 [pulsar-timer-27-1] INFO org.apache.pulsar.client.impl.ConnectionHandler -- [test.topic] [null] Reconnecting after connection was closed
  • Log on server side:
    2024-08-18T03:22:22,122+0000 [pulsar-io-3-13] DEBUG org.apache.pulsar.broker.service.ServerCnx - [/172.24.0.1:37336] connect state change to : [Connected]
    2024-08-18T03:22:22,122+0000 [pulsar-io-3-13] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.24.0.1:37336] connected with role=OWNER using authMethod=token, clientVersion=Pulsar-Java-v3.2.3, clientProtocolVersion=21, proxyVersion=null
    2024-08-18T03:22:22,140+0000 [pulsar-io-3-13] DEBUG org.apache.pulsar.common.protocol.PulsarDecoder - [/172.24.0.1:37336] Received cmd PARTITIONED_METADATA
    2024-08-18T03:22:22,140+0000 [pulsar-io-3-13] DEBUG org.apache.pulsar.broker.service.ServerCnx - [persistent://public/default/test.topic] Received PartitionMetadataLookup from /172.24.0.1:37336 for 1495031580381226501
    2024-08-18T03:22:22,143+0000 [main-SendThread(zookeeper:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply session id: 0x1000073bd71000a, packet:: clientPath:null serverPath:null finished:false header:: 961,22 replyHeader:: 961,987,0 request:: org.apache.zookeeper.MultiOperationRecord@8adefa96 response:: org.apache.zookeeper.MultiResponse@dd603777
    2024-08-18T03:22:22,148+0000 [main-SendThread(zookeeper:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply session id: 0x1000073bd71000a, packet:: clientPath:null serverPath:null finished:false header:: 962,22 replyHeader:: 962,987,0 request:: org.apache.zookeeper.MultiOperationRecord@8074b73c response:: org.apache.zookeeper.MultiResponse@ffffff9b
    2024-08-18T03:22:22,149+0000 [main-SendThread(zookeeper:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply session id: 0x1000073bd71000a, packet:: clientPath:/managed-ledgers/public/default/persistent/test.topic serverPath:/managed-ledgers/public/default/persistent/test.topic finished:false header:: 963,3 replyHeader:: 963,987,-101 request:: '/managed-ledgers/public/default/persistent/test.topic,F response::
    2024-08-18T03:22:22,150+0000 [pulsar-2-8] DEBUG org.apache.pulsar.broker.service.BrokerService - No autoTopicCreateOverride policy found for persistent://public/default/test.topic
    2024-08-18T03:22:22,150+0000 [pulsar-2-8] DEBUG org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - Total number of partitions for topic persistent://public/default/test.topic is 0
    2024-08-18T03:22:22,288+0000 [pulsar-io-3-13] DEBUG org.apache.pulsar.common.protocol.PulsarDecoder - [/172.24.0.1:37336] Received cmd LOOKUP
    2024-08-18T03:22:22,288+0000 [pulsar-io-3-13] DEBUG org.apache.pulsar.broker.service.ServerCnx - [persistent://public/default/test.topic] Received Lookup from /172.24.0.1:37336 for 1495031580381226502 requesting listener (none)
    2024-08-18T03:22:22,292+0000 [main-SendThread(zookeeper:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply session id: 0x1000073bd71000a, packet:: clientPath:null serverPath:null finished:false header:: 964,22 replyHeader:: 964,987,0 request:: org.apache.zookeeper.MultiOperationRecord@609b619f response:: org.apache.zookeeper.MultiResponse@ac99062a
    2024-08-18T03:22:22,294+0000 [metadata-store-9-1] INFO org.apache.pulsar.broker.loadbalance.extensions.manager.RedirectManager - No need to redirect, current load manager class name: org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl
    2024-08-18T03:22:22,294+0000 [metadata-store-9-1] DEBUG org.apache.pulsar.broker.namespace.NamespaceService - findBrokerServiceUrl: public/default/0xc0000000_0xd0000000 - options: LookupOptions(authoritative=false, readOnly=false, loadTopicsInBundle=true, requestHttps=false, advertisedListenerName=null)
    2024-08-18T03:22:22,294+0000 [metadata-store-9-1] DEBUG org.apache.pulsar.broker.namespace.NamespaceService - Namespace bundle public/default/0xc0000000_0xd0000000 already owned by Optional[NamespaceEphemeralData(nativeUrl=pulsar://127.0.0.1:6650, nativeUrlTls=null, httpUrl=http://broker:8080, httpUrlTls=null, disabled=false, advertisedListeners={external=AdvertisedListener(brokerServiceUrl=pulsar://127.0.0.1:6650, brokerServiceUrlTls=null, brokerHttpUrl=null, brokerHttpsUrl=null)})]
    2024-08-18T03:22:22,294+0000 [metadata-store-9-1] DEBUG org.apache.pulsar.broker.lookup.TopicLookupBase - [persistent://public/default/test.topic] Lookup result Optional[LookupResult [type=BrokerUrl, lookupData=LookupData{brokerUrl=pulsar://127.0.0.1:6650, brokerUrlTls=null, httpUrl=http://broker:8080, httpUrlTls=null}]]
    2024-08-18T03:22:22,307+0000 [pulsar-io-3-13] DEBUG org.apache.pulsar.common.protocol.PulsarDecoder - [/172.24.0.1:37336] Received cmd PRODUCER
    2024-08-18T03:22:22,307+0000 [pulsar-io-3-13] DEBUG org.apache.pulsar.broker.service.ServerCnx - [/172.24.0.1:37336] Client is authorized to Produce with role OWNER
    2024-08-18T03:22:22,307+0000 [pulsar-io-3-13] DEBUG org.apache.pulsar.broker.service.ServerCnx - [/172.24.0.1:37336][persistent://public/default/test.topic] Creating producer. producerId=0, producerName=dev-es-pulsar-cluster-19-9, schema is present
    2024-08-18T03:22:22,307+0000 [pulsar-io-3-13] DEBUG org.apache.pulsar.broker.service.BrokerService - No autoTopicCreateOverride policy found for persistent://public/default/test.topic
    2024-08-18T03:22:22,307+0000 [pulsar-io-3-13] INFO org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory - Create topic policies system topic client persistent://public/default/__change_events
    2024-08-18T03:22:22,343+0000 [pulsar-io-3-13] DEBUG org.apache.pulsar.client.impl.ConnectionPool - Connection for 127.0.0.1/:6650 not found in cache
    2024-08-18T03:22:22,345+0000 [pulsar-io-3-14] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x78f03fc2, L:/127.0.0.1:37338 - R:/127.0.0.1:6650]] Connected to server
    2024-08-18T03:22:22,345+0000 [pulsar-io-3-14] DEBUG org.apache.pulsar.common.protocol.PulsarHandler - [[id: 0x78f03fc2, L:/127.0.0.1:37338 - R:/127.0.0.1:6650]] Scheduling keep-alive task every 30 s
    2024-08-18T03:22:22,346+0000 [pulsar-io-3-14] DEBUG org.apache.pulsar.client.impl.ClientCnx - [id: 0x78f03fc2, L:/127.0.0.1:37338 - R:/127.0.0.1:6650] Connected to broker
    2024-08-18T03:22:22,346+0000 [pulsar-io-3-15] DEBUG org.apache.pulsar.common.protocol.PulsarHandler - [[id: 0x9b9ead57, L:/127.0.0.1:6650 - R:/127.0.0.1:37338]] Scheduling keep-alive task every 30 s
    2024-08-18T03:22:22,347+0000 [pulsar-io-3-15] DEBUG org.apache.pulsar.broker.service.ServerCnx - New connection from /127.0.0.1:37338
    2024-08-18T03:22:22,348+0000 [pulsar-io-3-14] DEBUG org.apache.pulsar.client.impl.ClientCnx - Complete: true
    2024-08-18T03:22:22,349+0000 [pulsar-io-3-15] DEBUG org.apache.pulsar.common.protocol.PulsarDecoder - [/127.0.0.1:37338] Received cmd CONNECT
    2024-08-18T03:22:22,349+0000 [pulsar-io-3-15] DEBUG org.apache.pulsar.broker.service.ServerCnx - Received CONNECT from /127.0.0.1:37338, auth enabled: true: has original principal = false, original principal = null
    2024-08-18T03:22:22,350+0000 [pulsar-io-3-15] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:37338] Failed to authenticate: operation=connect, principal=null, reason=No anonymous role, and no authentication provider configured
    2024-08-18T03:22:22,352+0000 [pulsar-io-3-15] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:37338
    2024-08-18T03:22:22,352+0000 [pulsar-io-3-14] DEBUG org.apache.pulsar.common.protocol.PulsarDecoder - [/127.0.0.1:6650] Received cmd ERROR
    2024-08-18T03:22:22,353+0000 [pulsar-io-3-14] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0x78f03fc2, L:/127.0.0.1:37338 - R:/127.0.0.1:6650] Received error from server: Failed to authenticate
    2024-08-18T03:22:22,353+0000 [pulsar-io-3-14] WARN org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x78f03fc2, L:/127.0.0.1:37338 - R:/127.0.0.1:6650]] Connection handshake failed: org.apache.pulsar.client.api.PulsarClientException$AuthenticationException: Failed to authenticate
    2024-08-18T03:22:22,353+0000 [pulsar-io-3-14] WARN org.apache.pulsar.client.impl.PulsarClientImpl - [persistent://public/default/__change_events] Failed to get partitioned topic metadata
    java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$AuthenticationException: Failed to authenticate
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?]

hero6-coder · Aug 18 '24
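In the server log above, the producer's own connection authenticates fine (`connected with role=OWNER using authMethod=token`). The broker then opens an internal client connection to create the `__change_events` topic-policies system topic, and that connection is rejected with `principal=null` and `No anonymous role, and no authentication provider configured`. This pattern suggests, though the thread does not confirm it, that the broker's internal client has no credentials configured. In broker.conf that is governed by `brokerClientAuthenticationPlugin` and `brokerClientAuthenticationParameters`. If so, retrying from the application side cannot fix it, and the Java client's 0.1 s retries would keep failing. Below is a heuristic sketch for checking a broker.conf (or standalone.conf) for that gap. The function names are hypothetical and the check is not exhaustive.

```python
from typing import Dict, List


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines, as in broker.conf; lines starting with '#' are comments."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")  # split on the first '=' only
        props[key.strip()] = value.strip()
    return props


def broker_auth_warnings(conf_text: str) -> List[str]:
    """Heuristic: with authentication enabled, the broker's internal client needs credentials too."""
    props = parse_properties(conf_text)
    if props.get("authenticationEnabled", "false").lower() != "true":
        return []
    warnings = []
    if not props.get("authenticationProviders"):
        warnings.append("authenticationProviders is empty")
    if not props.get("brokerClientAuthenticationPlugin"):
        warnings.append(
            "brokerClientAuthenticationPlugin is empty: "
            "internal broker clients connect without credentials"
        )
    elif not props.get("brokerClientAuthenticationParameters"):
        warnings.append("brokerClientAuthenticationParameters is empty")
    return warnings
```

For example, passing the text of conf/broker.conf to `broker_auth_warnings` returns one message per gap it finds. An empty list only means these particular keys are set, not that authentication is configured correctly.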