pulsar-client-python
Pulsar Client not working in Google App Engine Standard environment.
Consider the following Python code:
import pulsar

print("----------> 1")
auth_token = "<my token>"
print("----------> 2")
pulsar_client = pulsar.Client("pulsar+ssl://dev-stream-int.datamanaged.io:6651", authentication=pulsar.AuthenticationToken(auth_token))
print("----------> 3")
producer = pulsar_client.create_producer('persistent://bladerunner/notification/subscribe')
print("----------> 4")
producer.send("Hammurabi".encode('utf-8'), None)
print("----------> 5")
When this code is executed, it crashes with the following error:
File "/workspace/api/pulsartest.py", line 43, in pulsartest
producer = pulsar_client.create_producer('persistent://bladerunner/notification/subscribe')
File "/layers/google.python.pip/pip/lib/python3.9/site-packages/pulsar/__init__.py", line 603, in create_producer
p._producer = self._client.create_producer(topic, conf)
_pulsar.ConnectError: Pulsar error: ConnectError
From the Python code's log output, I can see:
This corroborates the traceback above: the crash happens on this line:
producer = pulsar_client.create_producer('persistent://bladerunner/notification/subscribe')
However, when I run the same Python code on an Ubuntu VM, it works fine.
Some background:
- Pulsar (v2.10.2) is installed in a GKE cluster in GCP Project-A.
- I deployed an Ubuntu VM outside the cluster but within the same VPC network as the GKE cluster. I use this VM to run the same Python code and compare the results against App Engine. On this VM, the code executes successfully.
- My App Engine code uses FastAPI and has the following entrypoint in the deployment YAML:
entrypoint: gunicorn main:app -k uvicorn.workers.UvicornWorker
- The App Engine instance runs in a different GCP project (let's call it Project-B), which has VPC peering established with Project-A. It uses the Python 3.9 runtime.
- I have pulsar-client==2.10.2 in my requirements.txt file.
- I have Istio sitting in front of pulsar-proxy, doing TLS termination.
- TLS is not enabled within the Pulsar instance itself.
When the App Engine code is executed, I see these logs in GKE for pulsar-proxy:
2022-12-08T08:26:26,015+0000 [pulsar-proxy-io-2-1] INFO org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:39297] New connection opened
2022-12-08T08:26:26,158+0000 [pulsar-proxy-io-2-1] INFO org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:39297] complete connection, init proxy handler. authenticated with token role admin, hasProxyToBrokerUrl: false
2022-12-08T08:26:26,246+0000 [pulsar-proxy-io-2-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x77f9b9a7, L:/10.44.3.20:49572 - R:pulsar-dev-2-broker.hulk-dev-2.svc.cluster.local/10.44.14.33:6650]] Connected to server
2022-12-08T08:26:26,313+0000 [pulsar-proxy-io-2-1] INFO org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:44881] New connection opened
2022-12-08T08:26:26,354+0000 [pulsar-proxy-io-2-1] INFO org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:44881] complete connection, init proxy handler. authenticated with token role admin, hasProxyToBrokerUrl: true
2022-12-08T08:26:26,355+0000 [pulsar-proxy-io-2-1] WARN org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:44881] Unable to authenticate:
java.lang.NumberFormatException: For input string: ""
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[?:?]
	at java.lang.Integer.parseInt(Integer.java:662) ~[?:?]
	at java.lang.Integer.parseInt(Integer.java:770) ~[?:?]
	at org.apache.pulsar.proxy.server.BrokerProxyValidator.resolveAndCheckTargetAddress(BrokerProxyValidator.java:118) ~[org.apache.pulsar-pulsar-proxy-2.10.2.jar:2.10.2]
	at org.apache.pulsar.proxy.server.ProxyConnection.completeConnect(ProxyConnection.java:304) ~[org.apache.pulsar-pulsar-proxy-2.10.2.jar:2.10.2]
	at org.apache.pulsar.proxy.server.ProxyConnection.doAuthentication(ProxyConnection.java:389) ~[org.apache.pulsar-pulsar-proxy-2.10.2.jar:2.10.2]
	at org.apache.pulsar.proxy.server.ProxyConnection.handleConnect(ProxyConnection.java:471) ~[org.apache.pulsar-pulsar-proxy-2.10.2.jar:2.10.2]
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:167) ~[org.apache.pulsar-pulsar-common-2.10.2.jar:2.10.2]
	at org.apache.pulsar.proxy.server.ProxyConnection.channelRead(ProxyConnection.java:234) ~[org.apache.pulsar-pulsar-proxy-2.10.2.jar:2.10.2]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) ~[io.netty-netty-codec-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299) ~[io.netty-netty-codec-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[io.netty-netty-handler-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[io.netty-netty-handler-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) ~[io.netty-netty-transport-classes-epoll-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:487) ~[io.netty-netty-transport-classes-epoll-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:385) ~[io.netty-netty-transport-classes-epoll-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
	at java.lang.Thread.run(Thread.java:829) ~[?:?]
2022-12-08T08:26:26,368+0000 [pulsar-proxy-io-2-1] INFO org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:44881] Connection closed
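The proxy log above shows two client connections: one that reports hasProxyToBrokerUrl: false and one that reports hasProxyToBrokerUrl: true, and only the second fails. As a minimal sketch, assuming only the "ProxyConnection - [<client address>] <message>" layout visible in these lines, the following groups the log lines by client address so the failing connection stands out. The function name summarize_proxy_connections is hypothetical and not part of any Pulsar tooling.

```python
import re
from collections import defaultdict

# Matches "ProxyConnection - [<client address>] <message>" as in the log lines above.
CONN_RE = re.compile(r"ProxyConnection - \[(/[\d.]+:\d+)\] (.*)")
FLAG_RE = re.compile(r"hasProxyToBrokerUrl: (true|false)")


def summarize_proxy_connections(lines):
    """Group pulsar-proxy ProxyConnection log lines by client address.

    Returns {client_address: {"has_proxy_to_broker_url": bool or None,
                              "auth_failed": bool}} in first-seen order.
    """
    summary = defaultdict(
        lambda: {"has_proxy_to_broker_url": None, "auth_failed": False}
    )
    for line in lines:
        match = CONN_RE.search(line)
        if match is None:
            continue  # e.g. ConnectionPool lines, which carry no client address
        address, message = match.groups()
        entry = summary[address]
        flag = FLAG_RE.search(message)
        if flag:
            entry["has_proxy_to_broker_url"] = flag.group(1) == "true"
        if message.startswith("Unable to authenticate"):
            entry["auth_failed"] = True
    return dict(summary)


# Log lines copied from the pulsar-proxy output above (timestamps and thread names trimmed).
SAMPLE_LINES = [
    "INFO org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:39297] New connection opened",
    "INFO org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:39297] complete connection, init proxy handler. authenticated with token role admin, hasProxyToBrokerUrl: false",
    "INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x77f9b9a7, L:/10.44.3.20:49572 - R:pulsar-dev-2-broker.hulk-dev-2.svc.cluster.local/10.44.14.33:6650]] Connected to server",
    "INFO org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:44881] New connection opened",
    "INFO org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:44881] complete connection, init proxy handler. authenticated with token role admin, hasProxyToBrokerUrl: true",
    "WARN org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:44881] Unable to authenticate:",
    "INFO org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:44881] Connection closed",
]

for address, info in summarize_proxy_connections(SAMPLE_LINES).items():
    print(address, info)
# /127.0.0.6:39297 {'has_proxy_to_broker_url': False, 'auth_failed': False}
# /127.0.0.6:44881 {'has_proxy_to_broker_url': True, 'auth_failed': True}
```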
Based on these logs, I am quite sure that network packets flow correctly between Project-A and Project-B, so App Engine is able to reach pulsar-proxy. I also know that the combination of TLS + Istio + GKE works, because the same Python code succeeds 100% of the time on the Ubuntu VM. I deliberately installed the Ubuntu VM outside GKE, rather than running the Python code in an Ubuntu pod inside the cluster, to simulate remote network connectivity.
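The reachability claim can also be checked directly from the App Engine side, independently of the Pulsar client. The following is a minimal stdlib-only sketch, assuming the endpoint dev-stream-int.datamanaged.io:6651 from the code above presents a certificate trusted by the default system store; the helper name check_tls_endpoint is hypothetical. It only exercises TCP and the TLS layer in front of pulsar-proxy and sends no Pulsar protocol traffic.

```python
import socket
import ssl


def check_tls_endpoint(host, port, timeout=5.0):
    """Try a TCP connect plus TLS handshake to host:port.

    Returns (True, negotiated TLS version) on success, or
    (False, "<ExceptionType>: <message>") on failure. No Pulsar protocol
    traffic or token authentication is sent.
    """
    # Default context: verifies the server certificate and hostname
    # against the system trust store.
    context = ssl.create_default_context()
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            with context.wrap_socket(sock, server_hostname=host) as tls_sock:
                return True, tls_sock.version()
    except OSError as exc:  # ssl.SSLError, DNS errors and timeouts are OSError subclasses
        return False, f"{type(exc).__name__}: {exc}"
```

Calling check_tls_endpoint("dev-stream-int.datamanaged.io", 6651) from a request handler in the App Engine instance and from the Ubuntu VM, then comparing the two results, would help separate a network or TLS problem from one at the Pulsar protocol level. Because the default context verifies the certificate, a verification error here would point at the certificate rather than at the network path.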
The error log from pulsar-proxy seems to point to this line:
https://github.com/apache/pulsar/blob/da87e40aca848c0cb1ede7ba56605bdcd5f96137/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java#L354
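The stack trace shows Integer.parseInt failing on an empty string inside BrokerProxyValidator.resolveAndCheckTargetAddress, reached from ProxyConnection.completeConnect on the connection that logged hasProxyToBrokerUrl: true. One possible reading, which is an assumption and not confirmed against the Pulsar source, is that the proxy splits the broker address requested by the client into host and port, and the port part arrived empty. The Python analogue below only illustrates that failure mode; split_host_port is a hypothetical helper, not Pulsar code.

```python
def split_host_port(address):
    """Split a 'host:port' string on its last ':' and parse the port.

    Hypothetical helper for illustration only; it is NOT Pulsar's
    BrokerProxyValidator code. int("") raises ValueError here, the Python
    counterpart of Java's NumberFormatException: For input string: "".
    """
    host, separator, port_text = address.rpartition(":")
    if not separator:
        raise ValueError(f"no ':' in address {address!r}")
    return host, int(port_text)


# Well-formed address, like the broker address in the ConnectionPool log line.
print(split_host_port("pulsar-dev-2-broker.hulk-dev-2.svc.cluster.local:6650"))
# ('pulsar-dev-2-broker.hulk-dev-2.svc.cluster.local', 6650)

# An address whose port part is empty fails the way the proxy did.
try:
    split_host_port("pulsar-dev-2-broker.hulk-dev-2.svc.cluster.local:")
except ValueError as exc:
    print("rejected:", exc)
```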