quarkus icon indicating copy to clipboard operation
quarkus copied to clipboard

Upgraded to 3.10.0.CR1 failed In-Memory Message Tests

Open gsmet opened this issue 9 months ago • 6 comments

Discussed in https://github.com/quarkusio/quarkus/discussions/40219

Originally posted by hantsy April 23, 2024 All messaging examples in my repository are failed when upgrading to the newest Quarkus 3.10.0.CR1, eg. https://github.com/hantsy/quarkus-sandbox/tree/master/amqp

I followed this guide to clean up the dependency artifact id and use the new name in all messaging examples, https://github.com/quarkusio/quarkus/wiki/Migration-Guide-3.9#resteasy-reactive-extensions-renamed-to-quarkus-rest-gear-white_check_mark

@QuarkusTest
@TestProfile(InMemoryProfile.class)
class MessageHandlerTest {

    @Inject
    @Any
    InMemoryConnector connector;

    @Inject
    MessageHandler handler;

    @BeforeEach
    void setUp() {
    }

    @Test
    void receive() {
        InMemorySource<String> messages = connector.source("messages");
        InMemorySink<String> sink = connector.sink("send");
        InMemorySink<Message> dataStream = connector.sink("data-stream");


        handler.send("hello");
        assertThat(sink.received().getFirst().getPayload()).isEqualTo("hello");

        messages.send("hello-123");
        assertThat(dataStream.received().getFirst().getPayload().body()).isEqualTo("hello-123");
    }
}

The assertThat(dataStream.received().getFirst().getPayload().body()).isEqualTo("hello-123"); throws the following exceptions

java.util.NoSuchElementException
	at java.base/java.util.ArrayList.getFirst(ArrayList.java:439)
	at com.example.MessageHandlerTest.receive(MessageHandlerTest.java:41)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at io.quarkus.test.junit.QuarkusTestExtension.runExtensionMethod(QuarkusTestExtension.java:1018)
	at io.quarkus.test.junit.QuarkusTestExtension.interceptTestMethod(QuarkusTestExtension.java:832)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)

2024-04-23 22:38:03,206 INFO  [io.quarkus] (main) quarkus-example-rabbitmq stopped in 0.033s

Process finished with exit code -1
```</div>

gsmet avatar Apr 26 '24 15:04 gsmet

@ozangunalp could you have a look at this one? It looks a bit concerning and AFAICS there is a reproducer.

gsmet avatar Apr 26 '24 15:04 gsmet

This is normal, you should not assume that the send operation, even with the in-memory connector, is synchronous. So in the next line, you should have something like the following :

await().untilAsserted(() -> assertThat(sink.received().get(0).getPayload()).isEqualTo("hello"));

The change, I think, is related to the change in Mutiny 2.6.0, which changes the queue implementation.

ozangunalp avatar Apr 26 '24 17:04 ozangunalp

Check the 3.7.2 build, https://github.com/hantsy/quarkus-sandbox/commit/295c6852e02d116aa47bff5d19ac9fb41c6dd101, amqp, amqp-rabbit, rabbitmq kafka example projects worked well.

But the pulsar example in-memory test did not work due to an issue I have not found a solution. see: https://github.com/quarkusio/quarkus/discussions/36764

hantsy avatar Apr 27 '24 00:04 hantsy

@ozangunalp Will try it with await().untilAsserted(...).

But I am not sure why two different exceptions thrown.

https://github.com/quarkusio/quarkus/discussions/40219#discussioncomment-9206892

In the kafka and pulsar example, the same tests got different errors:

https://github.com/hantsy/quarkus-sandbox/actions/runs/8800974801/job/24153288400

Invalid channel configuration -  the `connector` attribute must be set for channel `data-stream`

hantsy avatar Apr 27 '24 01:04 hantsy

The Smallrye Reactive Messaging testing does not explain the sync/async when using in-memory connector.

https://smallrye.io/smallrye-reactive-messaging/4.21.0/concepts/testing/

hantsy avatar Apr 27 '24 01:04 hantsy

Added await().untilAsserted(...) to all messaging example projects: https://github.com/hantsy/quarkus-sandbox/commit/9411a96de41102d5619e133d9b674523c3132f09

Check the build result:

  • The kakfa build error: https://github.com/hantsy/quarkus-sandbox/actions/runs/8856423669/job/24322679423 (worked in the 3.7.2 commit)
  • The pulsar build error: https://github.com/hantsy/quarkus-sandbox/actions/runs/8856423673/job/24322679420

hantsy avatar Apr 27 '24 01:04 hantsy

No, it wouldn't work either on 3.7.2. Again you are trying to replace an internal channel data-stream with an in-memory test connector.

ozangunalp avatar Apr 29 '24 07:04 ozangunalp

No, it wouldn't work either on 3.7.2.

I am sure kafka example project is working well before, check my link of the build result for it. And it never threw such an exception like the connector attribute must be set for channel data-stream

Again you are trying to replace an internal channel data-stream with an in-memory test connector.

So I can change the channel name to fix it?

hantsy avatar Apr 29 '24 07:04 hantsy

For the async aspect of in-memory tests, I agree that it wasn't the case before. @jponge do you think this can be related to the queue implementation change in Mutiny 2.6.0?

ozangunalp avatar Apr 29 '24 08:04 ozangunalp

I am sure kafka example project is working well before, check my link of the build result for it. And it never threw such an exception like the connector attribute must be set for channel data-stream

Yes for the same build pulsar tests are failing as they are supposed to. I'll need to debug the 3.7.2 to understand what was going on to pass that check. But for me it is correct that it fails.

ozangunalp avatar Apr 29 '24 08:04 ozangunalp

For the async aspect of in-memory tests, I agree that it wasn't the case before. @jponge do you think this can be related to the queue implementation change in Mutiny 2.6.0?

I'd be surprised if the queue provider change has had any impact, but if you find something let's dig it 👍

jponge avatar Apr 29 '24 08:04 jponge

@ozangunalp Got new exceptions in the kafka and pulsar examples after renamed the data-stream to messages-stream.

Caused by: jakarta.enterprise.inject.spi.DeploymentException: 
java.lang.IllegalArgumentException: 
SRMSG00071: Invalid channel configuration - 
 the `connector` attribute must be set for channel `messages-stream`

Check the build of this commit: https://github.com/hantsy/quarkus-sandbox/commit/7991825fa6fb1b6dd196b409155582f012fe2a41

hantsy avatar Apr 29 '24 10:04 hantsy

I'd be surprised if the queue provider change has had any impact, but if you find something let's dig it 👍

I'll also need to debug that...

ozangunalp avatar Apr 29 '24 11:04 ozangunalp

/cc @cescoffier (reactive-messaging)

quarkus-bot[bot] avatar Apr 29 '24 14:04 quarkus-bot[bot]

@ozangunalp Got new exceptions in the kafka and pulsar examples after renamed the data-stream to messages-stream.

Caused by: jakarta.enterprise.inject.spi.DeploymentException: 
java.lang.IllegalArgumentException: 
SRMSG00071: Invalid channel configuration - 
 the `connector` attribute must be set for channel `messages-stream`

Check the build of this commit: hantsy/quarkus-sandbox@7991825

It is not about the specific name of that channel. It is the fact that it is not already connected to a connector. If you replace the outgoing data-stream channel with the in-memory connector, the incoming data-stream becomes unconnected to a downstream. So Quarkus tries to link that incoming data-stream channel to the connector, and also detect the serde (or schema in case of pulsar). Because there is a serde/schema configuration, at startup time, a related connector configuration is required for the incoming data-stream and because there is none the startup fails.

Replacing internal channels with in-memory connector creates, in most cases, an unresolved graph of channels. That's why it is discouraged.

Then why was it working before with Kafka extension? There was an issue with the Kafka serde detection which prevented the discovery for @Channels with Flow.Publisher signature. When this was resolved it started to fail.

As a current workaround, you can disable the serde/schema detection : quarkus.messaging.kafka.serializer-autodetection.enabled=false and quarkus.messaging.pulsar.schema-autodetection.enabled=false in your InMemoryProfile test profile.

All that being said, there is (probably) an issue with the schema auto-detection which should not trigger on that incoming @Channel("data-stream"), because it is not a connector channel. Again replacing an internal channel with in-memory messes up with some assumptions.

ozangunalp avatar Apr 29 '24 14:04 ozangunalp

If you replace the outgoing data-stream channel with the in-memory connector, the incoming data-stream becomes unconnected to a downstream.

I replaced all occurrences of data-stream to messages-stream, include the Java codes and application.properties.

hantsy avatar May 01 '24 02:05 hantsy

As a current workaround, you can disable the serde/schema detection : quarkus.messaging.kafka.serializer-autodetection.enabled=false and quarkus.messaging.pulsar.schema-autodetection.enabled=false in your InMemoryProfile test profile.

This resolved the issues of kafka and pulsar example projects, Thanks.

hantsy avatar May 01 '24 03:05 hantsy