pulsar-client-python
Python client does not retry when fetching topic partition metadata fails during producer creation
Describe the bug: If fetching the topic partition metadata through service_url fails, the client neither retries nor tries the other addresses in the URL; it exits with an error as soon as the first connection fails. Version: pulsar-client=2.9.4
To Reproduce
1. Test conditions: the service at 127.0.0.1:6650 is stopped; the service at 127.0.0.2:6650 is running normally.
2. Test code:
import pulsar

client = pulsar.Client(
    authentication=pulsar.AuthenticationToken("xxxxxxxxxxxx"),
    # First address is down, second is up.
    service_url="pulsar://127.0.0.1:6650,127.0.0.2:6650",
    # operation_timeout_seconds=120
)
# Fails here with ConnectError instead of retrying.
producer = client.create_producer(
    topic='persistent://qlm-test/qlm-ns/python-test3',
    send_timeout_millis=120000,
    block_if_queue_full=True,
    batching_enabled=True,
    batching_max_publish_delay_ms=10,
    batching_max_messages=100,
    batching_max_allowed_size_in_bytes=1024 * 1024,
    max_pending_messages=1000)
i = 0
try:
    while True:
        producer.send(('Hello-%d' % i).encode('utf-8'))
        i += 1
finally:
    # Release resources even when the loop is interrupted.
    producer.close()
    client.close()
Error log:
2023-04-25 14:16:51.866 INFO [139745917667072] ExecutorService:41 | Run io_service in a single thread
2023-04-25 14:16:51.866 INFO [139746086545152] ClientConnection:189 | [<none> -> pulsar://127.0.0.1:6650,127.0.0.2:6650] Create ClientConnection, timeout=10000
2023-04-25 14:16:51.866 INFO [139746086545152] ConnectionPool:96 | Created connection for pulsar://127.0.0.1:6650,127.0.0.2:6650
2023-04-25 14:16:51.867 WARN [139745917667072] ClientConnection:436 | [<none> -> pulsar://127.0.0.1:6650,127.0.0.2:6650] Failed to establish connection: Connection refused
2023-04-25 14:16:51.867 INFO [139745917667072] ClientConnection:1563 | [<none> -> pulsar://127.0.0.1:6650,127.0.0.2:6650] Connection closed
2023-04-25 14:16:51.867 ERROR [139745917667072] ClientImpl:190 | Error Checking/Getting Partition Metadata while creating producer on persistent://qlm-test/qlm-ns/python-test3 -- ConnectError
2023-04-25 14:16:51.867 INFO [139745917667072] ClientConnection:263 | [<none> -> pulsar://127.0.0.1:6650,127.0.0.2:6650] Destroyed connection
Traceback (most recent call last):
File "pulsar-test.py", line 8, in <module>
producer = client.create_producer(
File "/usr/local/python3/lib/python3.8/site-packages/pulsar/__init__.py", line 603, in create_producer
p._producer = self._client.create_producer(topic, conf)
_pulsar.ConnectError: Pulsar error: ConnectError
2023-04-25 14:16:51.871 INFO [139745917667072] ExecutorService:47 | Event loop of ExecutorService exits successfully
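To confirm which of the addresses in the service URL actually accepts connections, a small probe like the one below can help. It is only a diagnostic sketch: `split_service_url` and `reachable` are hypothetical helpers, not part of the Pulsar client, and the parsing assumes plain `host:port` entries (no IPv6 literals), falling back to port 6650 when none is given.

```python
import socket


def split_service_url(service_url):
    """Split 'pulsar://h1:p1,h2:p2' into (scheme, [(host, port), ...])."""
    scheme, sep, rest = service_url.partition("://")
    if not sep:
        raise ValueError("service URL has no scheme: %r" % service_url)
    endpoints = []
    for part in rest.split(","):
        part = part.strip()
        host, colon, port = part.rpartition(":")
        if not colon:
            # Assumption: no explicit port means the default binary port 6650.
            host, port = part, "6650"
        endpoints.append((host, int(port)))
    return scheme, endpoints


def reachable(host, port, timeout_s=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    _, endpoints = split_service_url("pulsar://127.0.0.1:6650,127.0.0.2:6650")
    for host, port in endpoints:
        print(host, port, "reachable" if reachable(host, port) else "unreachable")
```

This only checks TCP reachability; it says nothing about authentication or whether the broker can serve the topic.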
Expected behavior: If the connection cannot be established, the client should retry within the timeout period, as the Java client does:
2023-04-25 14:18:45.078[pulsar-external-listener-3-1] WARN org.apache.pulsar.client.impl.PulsarClientImpl - [topic: persistent://qlm-test/qlm-ns/python-test3] Could not get connection while getPartitionedTopicMetadata -- Will try again in 6345 ms
2023-04-25 14:18:45.093[pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xedb4eab7, L:/10.13.209.102:58831 - R:/10.101.129.65:6650]] Connected to server
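Until the client retries on its own, an application-level workaround is to wrap the failing call in a retry loop with backoff, similar in spirit to the "Will try again in ... ms" behaviour in the Java log above. The following is a minimal sketch, not the client's built-in logic: `retry_with_backoff` and all of its parameters are hypothetical names chosen for illustration.

```python
import random
import time


def retry_with_backoff(fn, timeout_s=30.0, initial_delay_s=0.1, max_delay_s=5.0,
                       retry_on=(Exception,), sleep=time.sleep,
                       clock=time.monotonic):
    """Call fn() until it succeeds or timeout_s has elapsed.

    Waits between attempts with exponential backoff plus jitter, and
    re-raises the last error once the deadline has passed.
    """
    deadline = clock() + timeout_s
    delay = initial_delay_s
    while True:
        try:
            return fn()
        except retry_on:
            remaining = deadline - clock()
            if remaining <= 0:
                raise  # out of time: surface the last failure
            # Never sleep past the deadline; jitter spreads out reconnects.
            wait = min(delay, max_delay_s, remaining) * random.uniform(0.5, 1.0)
            sleep(wait)
            delay = min(delay * 2, max_delay_s)


# Usage sketch (assumes a `client` created as in the reproduction code):
# producer = retry_with_backoff(
#     lambda: client.create_producer('persistent://qlm-test/qlm-ns/python-test3'),
#     timeout_s=120,
# )
```

Passing a narrower `retry_on`, for example the `ConnectError` type shown in the traceback, avoids retrying on errors that will never succeed, such as an invalid topic name.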
Could you try the latest Python client? The retry logic was introduced in 3.0.0.
I also tried pulsar-client=3.1.0, but the problem is still there.
Is it the same issue? If so, could you provide the logs from 3.1.0?
I got the same error on pulsar-client 3.1.0.
code:
`from pulsar import Client, AuthenticationToken, BatchingType

client = Client(
    service_url='pulsar://node01.public.pulsar.test:6650',
    authentication=AuthenticationToken("xxxxxxxxxxxxxxxxxxxxxxxxx"))
producer = client.create_producer(
    'my-topic',
    block_if_queue_full=True,
    batching_enabled=True,
    batching_max_publish_delay_ms=10,
    properties={
        "producer-name": "test-producer-name",
        "producer-id": "test-producer-id"
    },
    batching_type=BatchingType.KeyBased
)
for i in range(10):
    producer.send(('Hello-%d' % i).encode('utf-8'))
client.close()`
log:
`Connected to pydev debugger (build 213.7172.26)
2023-12-21 11:29:56.580 INFO [140635486381888] ClientConnection:190 | [
Process finished with exit code 1 `
I have the same issue with the Java Pulsar client and Pulsar 3.2 when authentication is enabled (authenticationEnabled=true).
- Code:
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://127.0.0.1:6650")
    .authentication(AuthenticationFactory.token(token))
    .build();
- Log on client side:
10:57:51.287 [pulsar-client-io-1-3] INFO org.apache.pulsar.client.impl.ConnectionPool -- [[id: 0x65a00d98, L:/127.0.0.1:64047 - R:/127.0.0.1:6650]] Connected to server
10:57:52.111 [pulsar-client-io-1-3] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl -- Starting Pulsar producer perf with config: {"topicName":"test.topic","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
10:57:52.129 [pulsar-client-io-1-3] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl -- Pulsar client config: {"serviceUrl":"pulsar://127.0.0.1:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":8,"numListenerThreads":8,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
10:57:52.152 [pulsar-client-io-1-3] INFO org.apache.pulsar.client.impl.ProducerImpl -- [test.topic] [null] Creating producer on cnx [id: 0x65a00d98, L:/127.0.0.1:64047 - R:/127.0.0.1:6650]
10:57:52.210 [pulsar-client-io-1-3] WARN org.apache.pulsar.client.impl.ClientCnx -- [id: 0x65a00d98, L:/127.0.0.1:64047 - R:/127.0.0.1:6650] Received error from server: Topic creation encountered an exception by initialize topic policies service. topic_name=persistent://public/default/test.topic error_message=Failed to authenticate
10:57:52.214 [pulsar-client-io-1-3] ERROR org.apache.pulsar.client.impl.ProducerImpl -- [test.topic] [null] Failed to create producer: {"errorMsg":"Topic creation encountered an exception by initialize topic policies service. topic_name=persistent://public/default/test.topic error_message=Failed to authenticate","reqId":3148985659011010964, "remote":"/127.0.0.1:6650", "local":"/127.0.0.1:64047"}
10:57:52.214 [pulsar-client-io-1-3] WARN org.apache.pulsar.client.impl.ConnectionHandler -- [test.topic] [null] Error connecting to broker: org.apache.pulsar.client.api.PulsarClientException$LookupException: {"errorMsg":"Topic creation encountered an exception by initialize topic policies service. topic_name=persistent://public/default/test.topic error_message=Failed to authenticate","reqId":3148985659011010964, "remote":"/127.0.0.1:6650", "local":"/127.0.0.1:64047"}
10:57:52.214 [pulsar-client-io-1-3] WARN org.apache.pulsar.client.impl.ConnectionHandler -- [test.topic] [null] Could not get connection to broker: org.apache.pulsar.client.api.PulsarClientException$LookupException: {"errorMsg":"Topic creation encountered an exception by initialize topic policies service. topic_name=persistent://public/default/test.topic error_message=Failed to authenticate","reqId":3148985659011010964, "remote":"/127.0.0.1:6650", "local":"/127.0.0.1:64047"} -- Will try again in 0.1 s
10:57:52.316 [pulsar-timer-27-1] INFO org.apache.pulsar.client.impl.ConnectionHandler -- [test.topic] [null] Reconnecting after connection was closed
- Log on server side:
2024-08-18T03:22:22,122+0000 [pulsar-io-3-13] DEBUG org.apache.pulsar.broker.service.ServerCnx - [/172.24.0.1:37336] connect state change to : [Connected]
2024-08-18T03:22:22,122+0000 [pulsar-io-3-13] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.24.0.1:37336] connected with role=OWNER using authMethod=token, clientVersion=Pulsar-Java-v3.2.3, clientProtocolVersion=21, proxyVersion=null
2024-08-18T03:22:22,140+0000 [pulsar-io-3-13] DEBUG org.apache.pulsar.common.protocol.PulsarDecoder - [/172.24.0.1:37336] Received cmd PARTITIONED_METADATA
2024-08-18T03:22:22,140+0000 [pulsar-io-3-13] DEBUG org.apache.pulsar.broker.service.ServerCnx - [persistent://public/default/test.topic] Received PartitionMetadataLookup from /172.24.0.1:37336 for 1495031580381226501
2024-08-18T03:22:22,143+0000 [main-SendThread(zookeeper:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply session id: 0x1000073bd71000a, packet:: clientPath:null serverPath:null finished:false header:: 961,22 replyHeader:: 961,987,0 request:: org.apache.zookeeper.MultiOperationRecord@8adefa96 response:: org.apache.zookeeper.MultiResponse@dd603777
2024-08-18T03:22:22,148+0000 [main-SendThread(zookeeper:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply session id: 0x1000073bd71000a, packet:: clientPath:null serverPath:null finished:false header:: 962,22 replyHeader:: 962,987,0 request:: org.apache.zookeeper.MultiOperationRecord@8074b73c response:: org.apache.zookeeper.MultiResponse@ffffff9b
2024-08-18T03:22:22,149+0000 [main-SendThread(zookeeper:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply session id: 0x1000073bd71000a, packet:: clientPath:/managed-ledgers/public/default/persistent/test.topic serverPath:/managed-ledgers/public/default/persistent/test.topic finished:false header:: 963,3 replyHeader:: 963,987,-101 request:: '/managed-ledgers/public/default/persistent/test.topic,F response::
2024-08-18T03:22:22,150+0000 [pulsar-2-8] DEBUG org.apache.pulsar.broker.service.BrokerService - No autoTopicCreateOverride policy found for persistent://public/default/test.topic
2024-08-18T03:22:22,150+0000 [pulsar-2-8] DEBUG org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - Total number of partitions for topic persistent://public/default/test.topic is 0
2024-08-18T03:22:22,288+0000 [pulsar-io-3-13] DEBUG org.apache.pulsar.common.protocol.PulsarDecoder - [/172.24.0.1:37336] Received cmd LOOKUP
2024-08-18T03:22:22,288+0000 [pulsar-io-3-13] DEBUG org.apache.pulsar.broker.service.ServerCnx - [persistent://public/default/test.topic] Received Lookup from /172.24.0.1:37336 for 1495031580381226502 requesting listener (none)
2024-08-18T03:22:22,292+0000 [main-SendThread(zookeeper:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply session id: 0x1000073bd71000a, packet:: clientPath:null serverPath:null finished:false header:: 964,22 replyHeader:: 964,987,0 request:: org.apache.zookeeper.MultiOperationRecord@609b619f response:: org.apache.zookeeper.MultiResponse@ac99062a
2024-08-18T03:22:22,294+0000 [metadata-store-9-1] INFO org.apache.pulsar.broker.loadbalance.extensions.manager.RedirectManager - No need to redirect, current load manager class name: org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl
2024-08-18T03:22:22,294+0000 [metadata-store-9-1] DEBUG org.apache.pulsar.broker.namespace.NamespaceService - findBrokerServiceUrl: public/default/0xc0000000_0xd0000000 - options: LookupOptions(authoritative=false, readOnly=false, loadTopicsInBundle=true, requestHttps=false, advertisedListenerName=null)
2024-08-18T03:22:22,294+0000 [metadata-store-9-1] DEBUG org.apache.pulsar.broker.namespace.NamespaceService - Namespace bundle public/default/0xc0000000_0xd0000000 already owned by Optional[NamespaceEphemeralData(nativeUrl=pulsar://127.0.0.1:6650, nativeUrlTls=null, httpUrl=http://broker:8080, httpUrlTls=null, disabled=false, advertisedListeners={external=AdvertisedListener(brokerServiceUrl=pulsar://127.0.0.1:6650, brokerServiceUrlTls=null, brokerHttpUrl=null, brokerHttpsUrl=null)})]
2024-08-18T03:22:22,294+0000 [metadata-store-9-1] DEBUG org.apache.pulsar.broker.lookup.TopicLookupBase - [persistent://public/default/test.topic] Lookup result Optional[LookupResult [type=BrokerUrl, lookupData=LookupData{brokerUrl=pulsar://127.0.0.1:6650, brokerUrlTls=null, httpUrl=http://broker:8080, httpUrlTls=null}]]
2024-08-18T03:22:22,307+0000 [pulsar-io-3-13] DEBUG org.apache.pulsar.common.protocol.PulsarDecoder - [/172.24.0.1:37336] Received cmd PRODUCER
2024-08-18T03:22:22,307+0000 [pulsar-io-3-13] DEBUG org.apache.pulsar.broker.service.ServerCnx - [/172.24.0.1:37336] Client is authorized to Produce with role OWNER
2024-08-18T03:22:22,307+0000 [pulsar-io-3-13] DEBUG org.apache.pulsar.broker.service.ServerCnx - [/172.24.0.1:37336][persistent://public/default/test.topic] Creating producer. producerId=0, producerName=dev-es-pulsar-cluster-19-9, schema is present
2024-08-18T03:22:22,307+0000 [pulsar-io-3-13] DEBUG org.apache.pulsar.broker.service.BrokerService - No autoTopicCreateOverride policy found for persistent://public/default/test.topic
2024-08-18T03:22:22,307+0000 [pulsar-io-3-13] INFO org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory - Create topic policies system topic client persistent://public/default/__change_events
2024-08-18T03:22:22,343+0000 [pulsar-io-3-13] DEBUG org.apache.pulsar.client.impl.ConnectionPool - Connection for 127.0.0.1/:6650 not found in cache
2024-08-18T03:22:22,345+0000 [pulsar-io-3-14] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x78f03fc2, L:/127.0.0.1:37338 - R:/127.0.0.1:6650]] Connected to server
2024-08-18T03:22:22,345+0000 [pulsar-io-3-14] DEBUG org.apache.pulsar.common.protocol.PulsarHandler - [[id: 0x78f03fc2, L:/127.0.0.1:37338 - R:/127.0.0.1:6650]] Scheduling keep-alive task every 30 s
2024-08-18T03:22:22,346+0000 [pulsar-io-3-14] DEBUG org.apache.pulsar.client.impl.ClientCnx - [id: 0x78f03fc2, L:/127.0.0.1:37338 - R:/127.0.0.1:6650] Connected to broker
2024-08-18T03:22:22,346+0000 [pulsar-io-3-15] DEBUG org.apache.pulsar.common.protocol.PulsarHandler - [[id: 0x9b9ead57, L:/127.0.0.1:6650 - R:/127.0.0.1:37338]] Scheduling keep-alive task every 30 s
2024-08-18T03:22:22,347+0000 [pulsar-io-3-15] DEBUG org.apache.pulsar.broker.service.ServerCnx - New connection from /127.0.0.1:37338
2024-08-18T03:22:22,348+0000 [pulsar-io-3-14] DEBUG org.apache.pulsar.client.impl.ClientCnx - Complete: true
2024-08-18T03:22:22,349+0000 [pulsar-io-3-15] DEBUG org.apache.pulsar.common.protocol.PulsarDecoder - [/127.0.0.1:37338] Received cmd CONNECT
2024-08-18T03:22:22,349+0000 [pulsar-io-3-15] DEBUG org.apache.pulsar.broker.service.ServerCnx - Received CONNECT from /127.0.0.1:37338, auth enabled: true: has original principal = false, original principal = null
2024-08-18T03:22:22,350+0000 [pulsar-io-3-15] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:37338] Failed to authenticate: operation=connect, principal=null, reason=No anonymous role, and no authentication provider configured
2024-08-18T03:22:22,352+0000 [pulsar-io-3-15] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:37338
2024-08-18T03:22:22,352+0000 [pulsar-io-3-14] DEBUG org.apache.pulsar.common.protocol.PulsarDecoder - [/127.0.0.1:6650] Received cmd ERROR
2024-08-18T03:22:22,353+0000 [pulsar-io-3-14] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0x78f03fc2, L:/127.0.0.1:37338 - R:/127.0.0.1:6650] Received error from server: Failed to authenticate
2024-08-18T03:22:22,353+0000 [pulsar-io-3-14] WARN org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x78f03fc2, L:/127.0.0.1:37338 - R:/127.0.0.1:6650]] Connection handshake failed: org.apache.pulsar.client.api.PulsarClientException$AuthenticationException: Failed to authenticate
2024-08-18T03:22:22,353+0000 [pulsar-io-3-14] WARN org.apache.pulsar.client.impl.PulsarClientImpl - [persistent://public/default/__change_events] Failed to get partitioned topic metadata
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$AuthenticationException: Failed to authenticate
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?]