pulsar-client-python

Pulsar Client not working in Google App Engine Standard environment.

Open jesumyip opened this issue 2 years ago • 0 comments

Consider the following Python code:

    import pulsar

    print("----------> 1")
    auth_token = "<my token>"
    print("----------> 2")
    # Connect to the TLS endpoint using token authentication.
    pulsar_client = pulsar.Client(
        "pulsar+ssl://dev-stream-int.datamanaged.io:6651",
        authentication=pulsar.AuthenticationToken(auth_token),
    )
    print("----------> 3")
    # This is the call that raises ConnectError on App Engine.
    producer = pulsar_client.create_producer('persistent://bladerunner/notification/subscribe')
    print("----------> 4")
    producer.send("Hammurabi".encode('utf-8'), None)
    print("----------> 5")

When this code is executed, it crashes with the following error:

  File "/workspace/api/pulsartest.py", line 43, in pulsartest
    producer = pulsar_client.create_producer('persistent://bladerunner/notification/subscribe')
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/pulsar/__init__.py", line 603, in create_producer
    p._producer = self._client.create_producer(topic, conf)
_pulsar.ConnectError: Pulsar error: ConnectError

From the logs for the Python code, I am able to see:

[image: screenshot of the application log output]

This corroborates the traceback above: the following line of code caused the crash:

    producer = pulsar_client.create_producer('persistent://bladerunner/notification/subscribe')

When I run the same Python code in an Ubuntu VM, it works fine.

Some background:

  1. Pulsar (v2.10.2) is installed in a GKE cluster in GCP Project-A.
  2. I deployed an Ubuntu VM outside of the cluster but within the same VPC network as the GKE cluster. This VM runs the same Python code so that I can compare the results against App Engine. In this VM, the code executes successfully.
  3. My App Engine code uses FastAPI, and the deployment YAML contains: entrypoint: gunicorn main:app -k uvicorn.workers.UvicornWorker. The App Engine instance runs in a different GCP project (let's call it Project-B) that has VPC peering established with Project-A, and it uses the Python 3.9 runtime.
  4. I have pulsar-client==2.10.2 in my requirements.txt file.
  5. I have Istio sitting in front of pulsar-proxy doing TLS termination.
  6. TLS is not enabled within the Pulsar instance itself.

When the App Engine code is executed, I see these logs in GKE for pulsar-proxy:

2022-12-08T08:26:26,015+0000 [pulsar-proxy-io-2-1] INFO org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:39297] New connection opened
2022-12-08T08:26:26,158+0000 [pulsar-proxy-io-2-1] INFO org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:39297] complete connection, init proxy handler. authenticated with token role admin, hasProxyToBrokerUrl: false
2022-12-08T08:26:26,246+0000 [pulsar-proxy-io-2-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x77f9b9a7, L:/10.44.3.20:49572 - R:pulsar-dev-2-broker.hulk-dev-2.svc.cluster.local/10.44.14.33:6650]] Connected to server
2022-12-08T08:26:26,313+0000 [pulsar-proxy-io-2-1] INFO org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:44881] New connection opened
2022-12-08T08:26:26,354+0000 [pulsar-proxy-io-2-1] INFO org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:44881] complete connection, init proxy handler. authenticated with token role admin, hasProxyToBrokerUrl: true
2022-12-08T08:26:26,355+0000 [pulsar-proxy-io-2-1] WARN org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:44881] Unable to authenticate:
java.lang.NumberFormatException: For input string: ""
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[?:?]
    at java.lang.Integer.parseInt(Integer.java:662) ~[?:?]
    at java.lang.Integer.parseInt(Integer.java:770) ~[?:?]
    at org.apache.pulsar.proxy.server.BrokerProxyValidator.resolveAndCheckTargetAddress(BrokerProxyValidator.java:118) ~[org.apache.pulsar-pulsar-proxy-2.10.2.jar:2.10.2]
    at org.apache.pulsar.proxy.server.ProxyConnection.completeConnect(ProxyConnection.java:304) ~[org.apache.pulsar-pulsar-proxy-2.10.2.jar:2.10.2]
    at org.apache.pulsar.proxy.server.ProxyConnection.doAuthentication(ProxyConnection.java:389) ~[org.apache.pulsar-pulsar-proxy-2.10.2.jar:2.10.2]
    at org.apache.pulsar.proxy.server.ProxyConnection.handleConnect(ProxyConnection.java:471) ~[org.apache.pulsar-pulsar-proxy-2.10.2.jar:2.10.2]
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:167) ~[org.apache.pulsar-pulsar-common-2.10.2.jar:2.10.2]
    at org.apache.pulsar.proxy.server.ProxyConnection.channelRead(ProxyConnection.java:234) ~[org.apache.pulsar-pulsar-proxy-2.10.2.jar:2.10.2]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) ~[io.netty-netty-codec-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299) ~[io.netty-netty-codec-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[io.netty-netty-handler-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[io.netty-netty-handler-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) ~[io.netty-netty-transport-classes-epoll-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:487) ~[io.netty-netty-transport-classes-epoll-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:385) ~[io.netty-netty-transport-classes-epoll-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]
2022-12-08T08:26:26,368+0000 [pulsar-proxy-io-2-1] INFO org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:44881] Connection closed
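
Grouping the proxy log lines by client address makes the sequence easier to follow. The following is only a minimal sketch over abbreviated copies of the lines above; the regular expression and the `summarize` helper are illustrative assumptions, not part of Pulsar.

```python
import re
from collections import defaultdict

# Abbreviated copies of the pulsar-proxy log lines quoted above
# (timestamps, thread names and logger names removed).
LOG = """\
[/127.0.0.6:39297] New connection opened
[/127.0.0.6:39297] complete connection, init proxy handler. authenticated with token role admin, hasProxyToBrokerUrl: false
[/127.0.0.6:44881] New connection opened
[/127.0.0.6:44881] complete connection, init proxy handler. authenticated with token role admin, hasProxyToBrokerUrl: true
[/127.0.0.6:44881] Unable to authenticate:
[/127.0.0.6:44881] Connection closed
"""

# Matches "[/ip:port] message" as it appears in the ProxyConnection lines.
LINE = re.compile(r"\[/(?P<peer>[\d.]+:\d+)\] (?P<msg>.*)")


def summarize(log_text):
    """Group messages by client address and record two facts per connection."""
    summary = defaultdict(lambda: {"has_proxy_to_broker_url": None, "failed": False})
    for line in log_text.splitlines():
        match = LINE.search(line)
        if not match:
            continue
        entry = summary[match["peer"]]
        flag = re.search(r"hasProxyToBrokerUrl: (true|false)", match["msg"])
        if flag:
            entry["has_proxy_to_broker_url"] = flag.group(1) == "true"
        if "Unable to authenticate" in match["msg"]:
            entry["failed"] = True
    return dict(summary)


if __name__ == "__main__":
    for peer, info in summarize(LOG).items():
        print(peer, info)
```

On these lines, the first connection (hasProxyToBrokerUrl: false) completes, and only the second connection, which carries a proxy-to-broker URL, hits the "Unable to authenticate" warning.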

Based on the logs, I am quite sure network packets are flowing correctly between Project-A and Project-B, so App Engine is able to reach pulsar-proxy. I also know that the combination of TLS + Istio + GKE works, because the same Python code in the Ubuntu VM succeeds 100% of the time. The Ubuntu VM was deliberately deployed outside of GKE, rather than running the Python code in an Ubuntu pod within the cluster, to simulate remote network connectivity.

The error log from pulsar-proxy seems to point to this line:
https://github.com/apache/pulsar/blob/da87e40aca848c0cb1ede7ba56605bdcd5f96137/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java#L354
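
The `NumberFormatException: For input string: ""` raised from `Integer.parseInt` inside `BrokerProxyValidator.resolveAndCheckTargetAddress` would be consistent with a target address whose port part is empty. Below is a rough Python analogue of that failure mode, assuming (not verified against the Pulsar source) that the address is split on its last colon and the remainder is parsed as an integer. `split_host_port` is a hypothetical helper for illustration; the host name is the broker address from the proxy log.

```python
def split_host_port(host_and_port):
    """Split 'host:port' on the last colon and parse the port as an integer.

    Hypothetical stand-in for the proxy's target-address check; it only
    illustrates how an empty port string makes integer parsing fail.
    """
    host, _, port_text = host_and_port.rpartition(":")
    # int("") raises ValueError, much like Integer.parseInt("") raises
    # NumberFormatException in Java.
    return host, int(port_text)


if __name__ == "__main__":
    broker = "pulsar-dev-2-broker.hulk-dev-2.svc.cluster.local"
    print(split_host_port(broker + ":6650"))
    try:
        split_host_port(broker + ":")  # nothing after the colon
    except ValueError as exc:
        print("rejected:", exc)
```

If this reading is right, the question becomes why the address sent on the second connection would lack a port when the client runs on App Engine but not when it runs on the VM.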

jesumyip, Dec 08 '22 09:12