aop icon indicating copy to clipboard operation
aop copied to clipboard

[BUG] Error receiving message with aop.

Open yasu1209 opened this issue 4 years ago • 6 comments
trafficstars

Describe the bug I'm using pulsar 2.8.0 and aop 2.8.0-rc-202106071430. And I'm using the sample code in README.MD to publish and subscrbe message. But no messages returned.

To Reproduce Steps to reproduce the behavior:

  1. I run the pulsar on a server, and add aop configuration into standalone.conf
messagingProtocols=mqtt,amqp,kafka
amqpListeners=amqp://127.0.0.1:5672
advertisedAddress=10.191.5.110
  1. Run the pulsar as standalone.
  2. Use the java code for testing.
@Test
    public void test04() throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("vhost1");
        connectionFactory.setHost("10.191.5.110");
        connectionFactory.setPort(5672);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchange = "ex";
        String queue = "qu";

        // exchage declare
        channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT, true, false, false, null);

        // queue declare and bind
        channel.queueDeclare(queue, true, false, false, null);
        channel.queueBind(queue, exchange, "");

        // publish some messages
        for (int i = 0; i < 100; i++) {
            channel.basicPublish(exchange, "", null, ("hello-" + i).getBytes());
        }

        // consume messages
        CountDownLatch countDownLatch = new CountDownLatch(100);
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("receive msg: " + new String(body));
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();

        // release resource
        channel.close();
        connection.close();
    }
  1. Run the test and no messages returned.

And when publishing messages to pulsar, warning logs are printed.

14:18:09.771 [BookKeeperClientWorker-OrderedExecutor-0-0] WARN  io.streamnative.pulsar.handlers.amqp.AmqpExchangeReplicator - Route message failed.
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
        at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
        at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
        at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:426) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
        at org.apache.pulsar.client.impl.MessageImpl.deserialize(MessageImpl.java:261) ~[org.apache.pulsar-pulsar-client-original-2.8.0.jar:2.8.0]
        at io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange$1.readProcess(PersistentExchange.java:87) ~[?:?]
        at io.streamnative.pulsar.handlers.amqp.AmqpExchangeReplicator.readEntriesComplete(AmqpExchangeReplicator.java:197) ~[?:?]
        at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:156) ~[org.apache.pulsar-managed-ledger-2.8.0.jar:2.8.0]
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.8.0.jar:2.8.0]
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.14.1.jar:4.14.1]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
14:18:09.773 [BookKeeperClientWorker-OrderedExecutor-0-0] WARN  io.streamnative.pulsar.handlers.amqp.AmqpExchangeReplicator - Route message failed.
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
        at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
        at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
        at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:426) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
        at org.apache.pulsar.client.impl.MessageImpl.deserialize(MessageImpl.java:261) ~[org.apache.pulsar-pulsar-client-original-2.8.0.jar:2.8.0]
        at io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange$1.readProcess(PersistentExchange.java:87) ~[?:?]
        at io.streamnative.pulsar.handlers.amqp.AmqpExchangeReplicator.readEntriesComplete(AmqpExchangeReplicator.java:197) ~[?:?]
        at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:156) ~[org.apache.pulsar-managed-ledger-2.8.0.jar:2.8.0]
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.8.0.jar:2.8.0]
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.14.1.jar:4.14.1]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]

And I can receive the messages when using pulsar consumer.

Screenshots In pulsar-manager the subscription type is none. image

I may done something wrong. Please tell me.

yasu1209 avatar Sep 08 '21 06:09 yasu1209

I've tried pulsar 2.7.3 and aop 2.7.2.9, the error during publishment is gone. But still no message returned.

yasu1209 avatar Sep 10 '21 03:09 yasu1209

I have same problem,I've tried both pulsar 2.7.0 and aop 2.7.0.3,pulsar 2.8.0 and aop 2.8.1.0. And no message returned,no error in logs.

WrathLi avatar Sep 10 '21 09:09 WrathLi

I encountered the same problem. So yesterday I tried to solve this problem, and found the problem during testing: the internal development version of AOP is not compatible with the current apache pulsar 2.8.0 version, because the ServiceConfigurationUtils.getAppliedAdvertisedAddress method was modified, resulting in an error when loading the plug-in. At present, apache pulsar 2.8.0 is only compatible with AOP 2.8.0.3 or earlier versions, and version 2.8.0.4 or later versions needs to wait for the official release of apache pulsar 2.8.1.

kuangye098 avatar Sep 13 '21 02:09 kuangye098

I encountered the same problem. So yesterday I tried to solve this problem, and found the problem during testing: the internal development version of AOP is not compatible with the current apache pulsar 2.8.0 version, because the ServiceConfigurationUtils.getAppliedAdvertisedAddress method was modified, resulting in an error when loading the plug-in. At present, apache pulsar 2.8.0 is only compatible with AOP 2.8.0.3 or earlier versions, and version 2.8.0.4 or later versions needs to wait for the official release of apache pulsar 2.8.1.

Thank you for your information. I'll give it a try. Did you try pulsar 2.7? The combination of pulsar 2.7.3 and aop 2.7.2.9 seems not work too.

yasu1209 avatar Sep 13 '21 02:09 yasu1209

I am using apache pulsar 2.8.0 and aop 2.8.0.3 . The official test examples can pass normally. You can try apache pulsar 2.7.3 and aop 2.8.0.3 or 2.7.2 and 2.7.2.9?

kuangye098 avatar Sep 13 '21 02:09 kuangye098

I am using apache pulsar 2.8.0 and aop 2.8.0.3 . The official test examples can pass normally. You can try apache pulsar 2.7.3 and aop 2.8.0.3 or 2.7.2 and 2.7.2.9?

  • 2.7.2(which builds a 2.7.2.2.nar) no messages returned.

  • 2.7.2.9 no messages returned.

  • 2.8.0.3 error when running sample code image

yasu1209 avatar Sep 13 '21 03:09 yasu1209