aop
aop copied to clipboard
[BUG] Error when using proxy
When I try to use the proxy, I receive this error in the client:
Exception in thread "main" java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:423)
at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:64)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:156)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1110)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1067)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1025)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1187)
at com.gm.pulsarpoc.tutorial.AMQP.amqp_poc_consumer.main(amqp_poc_consumer.java:37)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
... 8 more
Caused by: java.io.EOFException
at java.base/java.io.DataInputStream.readUnsignedByte(DataInputStream.java:296)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:184)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:645)
at java.base/java.lang.Thread.run(Thread.java:833)
Looking in the broker logs I can see this:
2023-04-05T07:57:41,902-0400 [pulsar-io-4-2] ERROR io.streamnative.pulsar.handlers.amqp.proxy.ProxyConnection - Lookup broker failed; may retry.
io.streamnative.pulsar.handlers.amqp.proxy.ProxyException: Unable to locate metadata for the broker of the topic: persistent://public/default/__lookup__
at io.streamnative.pulsar.handlers.amqp.proxy.PulsarServiceLookupHandler.lambda$findBroker$1(PulsarServiceLookupHandler.java:75) ~[cIYN-EeoapXWiBkI2xvGAQ/:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
at org.apache.pulsar.client.impl.BinaryProtoLookupService.lambda$findBroker$1(BinaryProtoLookupService.java:164) ~[org.apache.pulsar-pulsar-client-original-2.9.4.jar:2.9.4]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
at org.apache.pulsar.client.impl.ClientCnx.handleLookupResponse(ClientCnx.java:586) ~[org.apache.pulsar-pulsar-client-original-2.9.4.jar:2.9.4]
at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:140) ~[org.apache.pulsar-pulsar-common-2.9.4.jar:2.9.4]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) ~[io.netty-netty-codec-4.1.77.Final.jar:4.1.77.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299) ~[io.netty-netty-codec-4.1.77.Final.jar:4.1.77.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372) ~[io.netty-netty-handler-4.1.77.Final.jar:4.1.77.Final]
at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1246) ~[io.netty-netty-handler-4.1.77.Final.jar:4.1.77.Final]
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1286) ~[io.netty-netty-handler-4.1.77.Final.jar:4.1.77.Final]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510) ~[io.netty-netty-codec-4.1.77.Final.jar:4.1.77.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449) ~[io.netty-netty-codec-4.1.77.Final.jar:4.1.77.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) ~[io.netty-netty-codec-4.1.77.Final.jar:4.1.77.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) ~[io.netty-netty-transport-classes-epoll-4.1.77.Final.jar:4.1.77.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:487) ~[io.netty-netty-transport-classes-epoll-4.1.77.Final.jar:4.1.77.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:385) ~[io.netty-netty-transport-classes-epoll-4.1.77.Final.jar:4.1.77.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
2023-04-05T07:57:41,902-0400 [pulsar-io-4-2] INFO io.streamnative.pulsar.handlers.amqp.proxy.ProxyConnection - handle connect residue retryTimes: 0
2023-04-05T07:57:41,902-0400 [pulsar-io-4-2] WARN io.streamnative.pulsar.handlers.amqp.proxy.ProxyConnection - Handle connect retryTimes is 0.
2023-04-05T07:57:41,902-0400 [pulsar-io-4-2] INFO io.streamnative.pulsar.handlers.amqp.proxy.ProxyConnection - ProxyConnection close.
2023-04-05T07:57:41,902-0400 [amqp-redirect-io-56-4] INFO io.streamnative.pulsar.handlers.amqp.proxy.ProxyConnection - ProxyConnection close.
I am using this config:
messagingProtocols=amqp
protocolHandlerDirectory=./protocols
amqpListeners=amqp://127.0.0.1:5672
amqpProxyEnable=true
amqpProxyPort=5682
Should create tenant `public` and namespace `default`, and the number of namespace bundles should be 1.
The tenant and namespace have been created like so:
$PULSAR_HOME/bin/pulsar-admin namespaces create -b 1 public/default
$PULSAR_HOME/bin/pulsar-admin namespaces set-retention -s 100M -t 3d public/default
Should connect to proxyPort `5682`.